function_result = self.function(*args, **kwargs)
except FileNotFoundError as result: # filename文件 没找到
print("Error: file is not found %s" % result)
except AttributeError as result:
print("Error: %s" % result)
except Exception as result:
print("unkown Error: %s" % result)
else:
return function_result
合约模型类
class PositionModel:
@classmethod
def position_key(cls):
return "pos"
@classmethod
def entryprice_key(cls):
return "entryprice"
@classmethod
def target_position_key(cls):
return "target_pos"
@classmethod
def target_position_er_key(cls):
return "target_position"
@classmethod
def buy_position_key(cls):
return "buy_position"
@classmethod
def sell_position_key(cls):
return "sell_position"
@classmethod
def net_position_key(cls):
return "net_position"
def __init__(self, position_dictionary):
if PositionModel.position_key() in position_dictionary.keys():
self.lots = position_dictionary[PositionModel.position_key()]
else:
self.lots = None
if PositionModel.entryprice_key() in position_dictionary.keys():
self.entry_price = position_dictionary[PositionModel.entryprice_key()]
else:
self.entry_price = None
if PositionModel.target_position_key() in position_dictionary.keys():
self.target_position = position_dictionary[PositionModel.target_position_key()]
else:
self.target_position = None
def model_to_dictionary(self):
empty_dictionary = {}
if self.lots is not None:
empty_dictionary[PositionModel.position_key()] = self.lots
if self.entry_price is not None:
empty_dictionary[PositionModel.entryprice_key()] = self.entry_price
if self.target_position is not None:
empty_dictionary[PositionModel.target_position_key()] = self.target_position
return empty_dictionary
class ContractModel:
def __init__(self, contract_name, position_model):
self.name = contract_name
self.__position_model = position_model
@property
def position_model(self):
return self.__position_model
@position_model.setter
def position_model(self, position_model):
self.__position_model = position_model
@position_model.getter
def position_model(self):
return self.__position_model
@classmethod
def dictionary_to_models(cls, data_dictionary):
contract_models = []
for contract_name, position_dictionary in data_dictionary.items():
contract_model = ContractModel(
contract_name=contract_name,
position_model=PositionModel(position_dictionary=position_dictionary)
contract_models.append(contract_model)
return contract_models
@classmethod
def models_to_dictionary(cls, *models):
empty_dictionary = {}
for model in models:
empty_dictionary[model.name] = model.position_model.model_to_dictionary()
return empty_dictionary
持仓json文件操作类
class PositionJsonOperator:
操作 json文件 的 类
ps: 使用时, 需要导入 json、math库
@classmethod
def __read_type(cls):
return "r"
@classmethod
def __write_type(cls):
return "w"
@classmethod
def __utf_8_encoding(cls):
return "utf-8"
# ------------------ public part ------------------
@classmethod
def tqz_load_jsonfile(cls, jsonfile=None):
if jsonfile is None:
exception = Exception("Error: filename is None")
raise exception
else:
return cls.__writeReadFile_except_operation(jsonfile=jsonfile, operation_type=cls.__read_type())
@classmethod
def tqz_write_jsonfile(cls, content=None, target_jsonfile=None):
if target_jsonfile is None:
exception = Exception("Error: filename is None")
raise exception
else:
cls.__writeReadFile_except_operation(jsonfile=target_jsonfile, content=content, operation_type=cls.__write_type())
# 用来转网页格式
@classmethod
def tqz_get_single_jsonfile_buy_sell(cls, jsonfile=None):
content_dictionary = cls.tqz_load_jsonfile(jsonfile=jsonfile)
empty_content_dictionary = {}
for vt_symbol_strategy, data in content_dictionary.items():
for position_key, position_data in data.items():
if position_key == PositionModel.position_key():
if position_data > 0:
buy_position, sell_position, net_position = abs(position_data), 0, position_data
else:
buy_position, sell_position, net_position = 0, abs(position_data), position_data
empty_content_dictionary[vt_symbol_strategy] = {
PositionModel.buy_position_key(): buy_position,
PositionModel.sell_position_key(): sell_position,
PositionModel.entryprice_key(): data[PositionModel.entryprice_key()],
PositionModel.net_position_key(): net_position
return empty_content_dictionary
@classmethod
def tqz_get_sum_position_jsonfile_data_buy_sell(cls, *jsonfile_list):
jsonfile_content_list = []
[jsonfile_content_list.append(cls.tqz_get_single_jsonfile_buy_sell(jsonfile)) for jsonfile in jsonfile_list]
return cls.tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell(
*jsonfile_content_list
@classmethod
def tqz_get_multi_position_jsonfile_data_buy_sell(cls, *jsonfile_list, multi):
sum_content = cls.tqz_get_sum_position_jsonfile_data_buy_sell(*jsonfile_list)
for vt_symbol, data in sum_content.items():
before_buy_position = data[PositionModel.buy_position_key()]
before_sell_position = data[PositionModel.sell_position_key()]
before_net_position = data[PositionModel.net_position_key()]
data[PositionModel.buy_position_key()] = math.floor(before_buy_position * multi)
data[PositionModel.sell_position_key()] = math.floor(before_sell_position * multi)
if before_net_position > 0:
data[PositionModel.net_position_key()] = math.floor(before_net_position * multi)
else:
data[PositionModel.net_position_key()] = math.ceil(before_net_position * multi)
if before_net_position > 0 and data[PositionModel.net_position_key()] is 0:
data[PositionModel.net_position_key()] = 1
data[PositionModel.buy_position_key()] = 1
elif before_net_position < 0 and data[PositionModel.net_position_key()] is 0:
data[PositionModel.net_position_key()] = -1
data[PositionModel.sell_position_key()] = 1
sum_content[vt_symbol] = data
return sum_content
@classmethod
def tqz_get_empty_position_jsonfile_data_buy_sell(cls, *jsonfile_list):
sum_content = cls.tqz_get_sum_position_jsonfile_data_buy_sell(*jsonfile_list)
for vt_symbol, data in sum_content.items():
data[PositionModel.buy_position_key()] = 0
data[PositionModel.sell_position_key()] = 0
data[PositionModel.net_position_key()] = 0
sum_content[vt_symbol] = data
return sum_content
@classmethod
def tqz_get_ER_position_format_data_buy_sell(cls, jsonfile):
er_content_data = cls.tqz_load_jsonfile(jsonfile=jsonfile)
empty_content_dictionary = {}
for main_data in er_content_data.values():
for target_position_key, position_datas in main_data.items():
if target_position_key == PositionModel.target_position_er_key():
for vt_symbol, position_data in position_datas.items():
if vt_symbol not in empty_content_dictionary.keys():
if position_data > 0:
buy_position, sell_position, net_position = abs(position_data), 0, position_data
else:
buy_position, sell_position, net_position = 0, abs(position_data), position_data
empty_content_dictionary[vt_symbol] = {
PositionModel.buy_position_key(): buy_position,
PositionModel.sell_position_key(): sell_position,
PositionModel.net_position_key(): net_position
else:
if position_data > 0:
buy_position, sell_position, net_position = abs(position_data), 0, position_data
else:
buy_position, sell_position, net_position = 0, abs(position_data), position_data
empty_content_dictionary[vt_symbol] = {
PositionModel.buy_position_key(): buy_position + empty_content_dictionary[vt_symbol][PositionModel.buy_position_key()],
PositionModel.sell_position_key(): sell_position + empty_content_dictionary[vt_symbol][PositionModel.sell_position_key()],
PositionModel.net_position_key(): net_position + empty_content_dictionary[vt_symbol][PositionModel.net_position_key()]
return empty_content_dictionary
@classmethod
def tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell(cls, *jsonfile_content_list):
temp_dic_list = []
for single_json_content in jsonfile_content_list:
temp_dic = {}
for key, value in single_json_content.items():
vt_symbol = key.split(".")[0] + "." + key.split(".")[1]
temp_dic[vt_symbol] = value
temp_dic_list.append(temp_dic)
dic_list = temp_dic_list
sum_content = {}
for jsonfile_content in dic_list:
for vt_symbol, data in jsonfile_content.items():
if vt_symbol in sum_content.keys():
sum_content[vt_symbol][PositionModel.buy_position_key()] += data[PositionModel.buy_position_key()]
sum_content[vt_symbol][PositionModel.sell_position_key()] += data[PositionModel.sell_position_key()]
sum_content[vt_symbol][PositionModel.net_position_key()] += data[PositionModel.net_position_key()]
else:
sum_content[vt_symbol] = {
PositionModel.buy_position_key(): data[PositionModel.buy_position_key()],
PositionModel.sell_position_key(): data[PositionModel.sell_position_key()],
PositionModel.net_position_key(): data[PositionModel.net_position_key()]
return sum_content
# ------------------ net position part ------------------
@classmethod
def __tqz_sum_position_all_jsonfile_net(cls, *jsonfile_list, target_jsonfile):
加总多个 json文件的 持仓, 并写入新的目标json文件中
:param jsonfile_list: 字符串数组
:param target_jsonfile: 要存入的 json文件名
jsonfile_content_list = []
[jsonfile_content_list.append(cls.tqz_load_jsonfile(jsonfile)) for jsonfile in jsonfile_list]
content_dic = cls.__sum_position_all_keyValueNotFound_except_operation(dic_list=jsonfile_content_list)
cls.tqz_write_jsonfile(content=content_dic, target_jsonfile=target_jsonfile)
@classmethod
def __tqz_get_sum_position_jsonfile_data_net(cls, *jsonfile_list):
加总多个 json文件的 持仓, 直接返回加总后的 数据;
:param jsonfile_list: 字符串数组
return: 加总后的 json文件数据
jsonfile_content_list = []
[jsonfile_content_list.append(cls.tqz_load_jsonfile(jsonfile)) for jsonfile in jsonfile_list]
return cls.__sum_position_all_keyValueNotFound_except_operation(
dic_list=jsonfile_content_list
@classmethod
def __tqz_get_sum_position_jsonfile_data_with_jsonfileContent_net(cls, *jsonfile_content_list):
return cls.__sum_position_all_keyValueNotFound_except_operation(
dic_list=jsonfile_content_list
@classmethod
def __tqz_get_multi_position_jsonfile_data_net(cls, *jsonfile_list, multi):
先加总 多个json文件的 持仓, 并返回按倍数调整过的 单个json文件数据;
:param jsonfile_list: 需要调整持仓的 json文件路径数组;
:param multi: 倍数;
return 返回加总后并按倍数调整过的 单个json文件的 持仓数据;
return cls.__multi_position_keyValueNotFound_except_operation(
source_content=cls.__tqz_get_sum_position_jsonfile_data_net(*jsonfile_list),
multi=multi
@classmethod
def __tqz_get_empty_position_jsonfile_data_net(cls, *jsonfile_list):
sum_dictionary = cls.__tqz_get_sum_position_jsonfile_data_net(*jsonfile_list)
empty_dictionary = {}
for symbol, data in sum_dictionary.items():
data[PositionModel.position_key()] = 0
empty_dictionary[symbol] = data
return empty_dictionary
@classmethod
def __tqz_get_ER_position_format_data_net(cls, jsonfile):
jsonfile_data = cls.tqz_load_jsonfile(jsonfile=jsonfile)
empty_dic = {}
for value in jsonfile_data.values():
for target_position_key, position_data in value.items():
if target_position_key == PositionModel.target_position_er_key():
for vt_symbol, position in position_data.items():
vt_symbol_type = vt_symbol + '.ER'
if vt_symbol_type not in empty_dic.keys():
empty_dic[vt_symbol_type] = {
PositionModel.position_key(): position,
PositionModel.target_position_key(): position
elif vt_symbol_type in empty_dic.keys():
empty_dic[vt_symbol_type][PositionModel.position_key()] += position
empty_dic[vt_symbol_type][PositionModel.target_position_key()] += position
return empty_dic
# ------------------ private part ------------------
@classmethod
def __tqz_multi_position_all_net(cls, *jsonfile_list, multi):
按倍数调整 多个json文件的 持仓
:param jsonfile_list: 需要调整持仓的 json文件数组
:param multi: 倍数
[cls.__multi_position(source_jsonfile=jsonfile, multi=multi) for jsonfile in jsonfile_list]
@classmethod
def __tqz_empty_position_all_net(cls, *jsonfile_list):
清空 多个json文件的 持仓
:param jsonfile_list: 需要清空的 json文件数组
cls.__tqz_multi_position_all_net(*jsonfile_list, multi=0)
@classmethod
def __multi_position(cls, source_jsonfile, multi):
cls.tqz_write_jsonfile(
content=cls.__get_multi_position(source_jsonfile=source_jsonfile, multi=multi),
target_jsonfile=source_jsonfile
@classmethod
def __get_multi_position(cls, source_jsonfile, multi):
return cls.__multi_position_keyValueNotFound_except_operation(
source_content=cls.tqz_load_jsonfile(jsonfile=source_jsonfile),
multi=multi
@classmethod
def __empty_position(cls, source_jsonfile):
cls.__multi_position(source_jsonfile=source_jsonfile, multi=0)
# ------------------ except operation part ------------------
@classmethod
@DecorateExcept
def __writeReadFile_except_operation(cls, jsonfile=None, content=None, operation_type="r"):
if cls.__read_type() is operation_type:
with open(jsonfile, operation_type, encoding=cls.__utf_8_encoding()) as fp:
return json.load(fp=fp)
elif cls.__write_type() is operation_type:
with open(jsonfile, operation_type, encoding=cls.__utf_8_encoding()) as fp:
json.dump(content, fp=fp, ensure_ascii=False, indent=4) # 参数indent: json文件按格式写入, 距行首空4格;
@classmethod
@DecorateExcept
def __sum_position_all_keyValueNotFound_except_operation(cls, dic_list=None):
new_json_contract_models = []
new_json_contract_names = []
temp_dic_list = []
for single_json_content in dic_list:
temp_dic = {}
for key, value in single_json_content.items():
vt_symbol = key.split(".")[0] + "." + key.split(".")[1]
temp_dic[vt_symbol] = value
temp_dic_list.append(temp_dic)
dic_list = temp_dic_list
all_json_contract_models = []
if dic_list is not None:
[all_json_contract_models.append(ContractModel.dictionary_to_models(data_dictionary=dic)) for dic in dic_list]
for contract_models in all_json_contract_models:
for contract_model in contract_models:
if contract_model.name not in new_json_contract_names:
new_json_contract_models.append(ContractModel(contract_name=contract_model.name, position_model=contract_model.position_model))
new_json_contract_names.append(contract_model.name)
else:
find_contract_model = cls.__find_contract_model(*new_json_contract_models, model_name=contract_model.name)
find_contract_model.position_model.lots += contract_model.position_model.lots
return ContractModel.models_to_dictionary(*new_json_contract_models)
@classmethod
def __find_contract_model(cls, *contract_models, model_name):
for contract_model in contract_models:
if contract_model.name == model_name:
return contract_model
return None
@classmethod
@DecorateExcept
def __multi_position_keyValueNotFound_except_operation(cls, source_content=None, multi=1):
all_json_contract_models = []
if source_content is not None:
[all_json_contract_models.append(ContractModel(contract_name=contract_name, position_model=PositionModel(position_dictionary=position_dictionary))) for contract_name, position_dictionary in source_content.items()]
for contract_model in all_json_contract_models:
before_multi_lots = contract_model.position_model.lots
# multi_lots: 按倍数调整完后的仓位
if before_multi_lots > 0:
multi_lots = math.floor(contract_model.position_model.lots * multi)
else:
multi_lots = math.floor(-1 * contract_model.position_model.lots * multi) * -1
# 按倍数调整完, 确保持仓至少保留1手;
if (before_multi_lots > 0) and (multi_lots is 0): # 原先持有多单, 仓位调整完后为 0;
multi_lots = 1
elif (before_multi_lots < 0) and (multi_lots is 0): # 原先持有空单, 仓位调整完后为 0;
multi_lots = -1
contract_model.position_model.lots = multi_lots
return ContractModel.models_to_dictionary(*all_json_contract_models)
def __main_engine():
sum_data = PositionJsonOperator.tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell(
PositionJsonOperator.tqz_get_ER_position_format_data_buy_sell("portfolio_strategy_data.json"),
PositionJsonOperator.tqz_get_sum_position_jsonfile_data_buy_sell("hla.json", "hsr.json")
target_jsonfile = "hls_hsr_er.json"
PositionJsonOperator.tqz_write_jsonfile(
content=sum_data,
target_jsonfile=target_jsonfile
list = [
"symbol_2.json",
"symbol_3.json",
"symbol_4.json",
"symbol_5.json"
print(PositionJsonOperator.tqz_get_sum_position_jsonfile_data_buy_sell(*list))
print(PositionJsonOperator.tqz_get_multi_position_jsonfile_data_buy_sell(*list, multi=0.5))
print(PositionJsonOperator.tqz_get_empty_position_jsonfile_data_buy_sell(*list))
""" net position part
list = [
"symbol_2.json",
"symbol_3.json",
"symbol_4.json",
"symbol_5.json"
target_jsonfile = "test.json"
PositionJsonOperator.tqz_sum_position_all_jsonfile_net(*list, target_jsonfile=target_jsonfile)
print(PositionJsonOperator.tqz_get_multi_position_jsonfile_data_net(target_jsonfile, multi=1))
print(PositionJsonOperator.tqz_get_empty_position_jsonfile_data_net(target_jsonfile))
PositionJsonOperator.tqz_multi_position_all_net(target_jsonfile, multi=0.5)
PositionJsonOperator.tqz_empty_position_all_net(target_jsonfile)
if __name__ == '__main__':
__main_engine()
import jsonimport math""" 处理异常的装饰器类"""class DecorateExcept: def __init__(self, function): self.function = function def __call__(self, *args, **kwargs): try: function_result = self.function(*args, **kwargs) .
vn
.
py
是基于
Py
thon语言的
量化交易
系统,是目前国内最好的开源
量化交易
平台之一。
vn
.
py
是机构级别的
量化交易
软件,掌握
vn
.
py
框架原理并且熟练使用,有利于快速入门
量化交易
,搭建自己的
量化交易
系统,也可以在机构中找到与量化岗位相关的工作。
本文是原创作品,针对
vn
.
py
的最新版本2.2.0(2021年3月26日发布),从源码的下载、安装、主程序入口、主窗口入手,先跟您一起把源码运行起来。再聚焦于
vn
.
py
的一个重要应用“CTA回测”,从各个层次上分析其源码,包括相关的数据库操作、多线程机制、事件引擎机制等,把这个应用从顶到底,再从底到顶贯通起来。明白了这些内容,再分析其它的功能就会事半功倍。
本文适合想要分析
vn
.
py
源码的人,还要看清楚软件版本,否则请不要下载,免得浪费积分。
for market_vt_symbol, bar in bars.items():
# strategy position
strategy_position = TQZSymbolOperator.tqz_get_strategy_position(
market_vt_symbol=market_vt_symbol,
for trade in self.tradeDict.values():
# 复制成交对象,因为下面的开平
仓
交易配对涉及到对成交数量的修改
# 若不进行复制直接操作,则计算完后所有成交的数量会变成0
from
vn
py
.app.position_manager.tools.position_operator.position_operator import PositionModel
class SymbolOperator:
@classmethod
def tqz_get_strategy_position(cls, market_vt_symbol, strategy_data):
if market_vt_symbol in cls.__get_str..
事情起因:融行虽然有规避自成交的功能模块;但还有很多情况,程序同
时
发相同合约
多空
两向的单子,融行并不会做处理,估计这是融行系统的一个bug。
但现在的MOM账户大都用的是融行系统,所以自成交风险的避免还是要我们自己程序这边来处理。
from
vn
py
.app.position_manager import StrategyTemplate
from
vn
py
.trader.utility import BarGenerator
import math
from time impor.
Vn
Trader简介
目前
Vn
Trader基本实现了国内外全品种的交易,包括期货、股票、贵金属、外盘交易等。对于
vn
.
py
的初学者而言,建议从项目的图形交易程序
Vn
Trader开始学习,先了解如何通过界面来监控行情与交易信息,并进行手工交易。
Vn
Trader对接了目前
vn
.
py
项目中所有的交易API,包括期货、股票、贵金属、外盘、比特币等。下文以最常用的CTP交易接口为例,讲解如何配置和使用...
""" 发单逻辑部分 """
def tqz_synchronization_position_cffex_lock_mode(self, market_vt_symbol, now_price, offset_price, strategy_position_net, real_position_net):
synchronization position with lock mode(cffex mode)
1、发现我的
vn
py
无法保存
仓位
,出现了一个unicode的报错。结果竟然是,新
策略
保存
仓位
和老
策略
有冲突。在用
策略
前,最好删除mongodb数据库里的
仓位
表。
2、初始化
策略
的
时
候发现一个
策略
类的两个不同品种,无法一样的初始化值。主要是由于初始化inti()函数执行的
时
候是要整个跑玩onbar()的,加入这句话就好了。引入一个trading标志。
# 全部指标初始化完成并且开始交易,才开始接
KD指标是广泛应用的技术指标之一,其作用是用以测量股价的动量,并提供买入卖出的信号。在
Py
thon中,KD的应用可以通过简单的代码实现,以此来进行
量化交易
。
首先,我们
需要
导入程序中
需要
使用的库,包括pandas、num
py
、matplotlib和talib。在导入库之后,我们
需要
进行数据的读取和处理。我们可以通过使用pandas DataFrame读取我们
需要
的历史股票数据,并将其转换为num
py
数组进行处理。
接下来,我们将使用talib库中提供的KD指标进行计算。我们
需要
在程序中定义一个函数来调用talib库的指标计算功能,并将其应用于我们的数据中。具体实现可以使用以下代码:
import talib
def kd_indicator(high_prices, low_prices, close_prices):
k, d = talib.STOCH(high_prices, low_prices, close_prices, 14, 3, 3)
return k, d
在运行以上代码后,我们就能获得K值和D值,以此来判断此
时
的市场趋势,从而对股票进行买卖的决策。
进行
量化交易
的核心是制定交易
策略
,并根据特定的参数进行买卖操作。我们可以在程序中定义一个
策略
函数,来确定买卖的条件。以下是一个简单的
策略
函数示例,实现了基本的KD交叉买卖法:
def trading_strategy(k, d):
if k[-1] > d[-1]: # K值大于D值,买入
return 'buy'
elif k[-1] < d[-1]: # K值小于D值,卖出
return 'sell'
else:
return 'hold' # K值等于D值,持有
我们还可以添加更多的条件变量,如移动平均线指标、RSI指标等,以增加算法的精度和可靠性。
最后,我们
需要
将我们的
策略
应用到实际交易中。在程序中,我们可以使用模拟交易的方法,使用历史数据进行回测,以评估我们的
策略
的效果。在这个过程中我们
需要
记录每一次的买入、卖出,并且根据这些交易记录来计算总收益、年化收益率、夏普比率等指标。
综上所述,KD指标的
量化交易
策略
代码是一个相对简单的代码实现,可以通过
Py
thon语言的优势实现。但是,
需要
注意的是,对于指标的选择和参数的调整,
需要
应根据股票市场的不同而灵活调整,以达到最好的效果。