添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
function_result = self.function(*args, **kwargs) except FileNotFoundError as result: # filename文件 没找到 print("Error: file is not found %s" % result) except AttributeError as result: print("Error: %s" % result) except Exception as result: print("unkown Error: %s" % result) else: return function_result 合约模型类 class PositionModel: @classmethod def position_key(cls): return "pos" @classmethod def entryprice_key(cls): return "entryprice" @classmethod def target_position_key(cls): return "target_pos" @classmethod def target_position_er_key(cls): return "target_position" @classmethod def buy_position_key(cls): return "buy_position" @classmethod def sell_position_key(cls): return "sell_position" @classmethod def net_position_key(cls): return "net_position" def __init__(self, position_dictionary): if PositionModel.position_key() in position_dictionary.keys(): self.lots = position_dictionary[PositionModel.position_key()] else: self.lots = None if PositionModel.entryprice_key() in position_dictionary.keys(): self.entry_price = position_dictionary[PositionModel.entryprice_key()] else: self.entry_price = None if PositionModel.target_position_key() in position_dictionary.keys(): self.target_position = position_dictionary[PositionModel.target_position_key()] else: self.target_position = None def model_to_dictionary(self): empty_dictionary = {} if self.lots is not None: empty_dictionary[PositionModel.position_key()] = self.lots if self.entry_price is not None: empty_dictionary[PositionModel.entryprice_key()] = self.entry_price if self.target_position is not None: empty_dictionary[PositionModel.target_position_key()] = self.target_position return empty_dictionary class ContractModel: def __init__(self, contract_name, position_model): self.name = contract_name self.__position_model = position_model @property def position_model(self): return self.__position_model @position_model.setter def position_model(self, position_model): self.__position_model = position_model @position_model.getter def position_model(self): return self.__position_model @classmethod def dictionary_to_models(cls, data_dictionary): contract_models = [] for contract_name, position_dictionary in data_dictionary.items(): contract_model = ContractModel( contract_name=contract_name, position_model=PositionModel(position_dictionary=position_dictionary) contract_models.append(contract_model) return contract_models @classmethod def models_to_dictionary(cls, *models): empty_dictionary = {} for model in models: empty_dictionary[model.name] = model.position_model.model_to_dictionary() return empty_dictionary 持仓json文件操作类 class PositionJsonOperator: 操作 json文件 的 类 ps: 使用时, 需要导入 json、math库 @classmethod def __read_type(cls): return "r" @classmethod def __write_type(cls): return "w" @classmethod def __utf_8_encoding(cls): return "utf-8" # ------------------ public part ------------------ @classmethod def tqz_load_jsonfile(cls, jsonfile=None): if jsonfile is None: exception = Exception("Error: filename is None") raise exception else: return cls.__writeReadFile_except_operation(jsonfile=jsonfile, operation_type=cls.__read_type()) @classmethod def tqz_write_jsonfile(cls, content=None, target_jsonfile=None): if target_jsonfile is None: exception = Exception("Error: filename is None") raise exception else: cls.__writeReadFile_except_operation(jsonfile=target_jsonfile, content=content, operation_type=cls.__write_type()) # 用来转网页格式 @classmethod def tqz_get_single_jsonfile_buy_sell(cls, jsonfile=None): content_dictionary = cls.tqz_load_jsonfile(jsonfile=jsonfile) empty_content_dictionary = {} for vt_symbol_strategy, data in content_dictionary.items(): for position_key, position_data in data.items(): if position_key == PositionModel.position_key(): if position_data > 0: buy_position, sell_position, net_position = abs(position_data), 0, position_data else: buy_position, sell_position, net_position = 0, abs(position_data), position_data empty_content_dictionary[vt_symbol_strategy] = { PositionModel.buy_position_key(): buy_position, PositionModel.sell_position_key(): sell_position, PositionModel.entryprice_key(): data[PositionModel.entryprice_key()], PositionModel.net_position_key(): net_position return empty_content_dictionary @classmethod def tqz_get_sum_position_jsonfile_data_buy_sell(cls, *jsonfile_list): jsonfile_content_list = [] [jsonfile_content_list.append(cls.tqz_get_single_jsonfile_buy_sell(jsonfile)) for jsonfile in jsonfile_list] return cls.tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell( *jsonfile_content_list @classmethod def tqz_get_multi_position_jsonfile_data_buy_sell(cls, *jsonfile_list, multi): sum_content = cls.tqz_get_sum_position_jsonfile_data_buy_sell(*jsonfile_list) for vt_symbol, data in sum_content.items(): before_buy_position = data[PositionModel.buy_position_key()] before_sell_position = data[PositionModel.sell_position_key()] before_net_position = data[PositionModel.net_position_key()] data[PositionModel.buy_position_key()] = math.floor(before_buy_position * multi) data[PositionModel.sell_position_key()] = math.floor(before_sell_position * multi) if before_net_position > 0: data[PositionModel.net_position_key()] = math.floor(before_net_position * multi) else: data[PositionModel.net_position_key()] = math.ceil(before_net_position * multi) if before_net_position > 0 and data[PositionModel.net_position_key()] is 0: data[PositionModel.net_position_key()] = 1 data[PositionModel.buy_position_key()] = 1 elif before_net_position < 0 and data[PositionModel.net_position_key()] is 0: data[PositionModel.net_position_key()] = -1 data[PositionModel.sell_position_key()] = 1 sum_content[vt_symbol] = data return sum_content @classmethod def tqz_get_empty_position_jsonfile_data_buy_sell(cls, *jsonfile_list): sum_content = cls.tqz_get_sum_position_jsonfile_data_buy_sell(*jsonfile_list) for vt_symbol, data in sum_content.items(): data[PositionModel.buy_position_key()] = 0 data[PositionModel.sell_position_key()] = 0 data[PositionModel.net_position_key()] = 0 sum_content[vt_symbol] = data return sum_content @classmethod def tqz_get_ER_position_format_data_buy_sell(cls, jsonfile): er_content_data = cls.tqz_load_jsonfile(jsonfile=jsonfile) empty_content_dictionary = {} for main_data in er_content_data.values(): for target_position_key, position_datas in main_data.items(): if target_position_key == PositionModel.target_position_er_key(): for vt_symbol, position_data in position_datas.items(): if vt_symbol not in empty_content_dictionary.keys(): if position_data > 0: buy_position, sell_position, net_position = abs(position_data), 0, position_data else: buy_position, sell_position, net_position = 0, abs(position_data), position_data empty_content_dictionary[vt_symbol] = { PositionModel.buy_position_key(): buy_position, PositionModel.sell_position_key(): sell_position, PositionModel.net_position_key(): net_position else: if position_data > 0: buy_position, sell_position, net_position = abs(position_data), 0, position_data else: buy_position, sell_position, net_position = 0, abs(position_data), position_data empty_content_dictionary[vt_symbol] = { PositionModel.buy_position_key(): buy_position + empty_content_dictionary[vt_symbol][PositionModel.buy_position_key()], PositionModel.sell_position_key(): sell_position + empty_content_dictionary[vt_symbol][PositionModel.sell_position_key()], PositionModel.net_position_key(): net_position + empty_content_dictionary[vt_symbol][PositionModel.net_position_key()] return empty_content_dictionary @classmethod def tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell(cls, *jsonfile_content_list): temp_dic_list = [] for single_json_content in jsonfile_content_list: temp_dic = {} for key, value in single_json_content.items(): vt_symbol = key.split(".")[0] + "." + key.split(".")[1] temp_dic[vt_symbol] = value temp_dic_list.append(temp_dic) dic_list = temp_dic_list sum_content = {} for jsonfile_content in dic_list: for vt_symbol, data in jsonfile_content.items(): if vt_symbol in sum_content.keys(): sum_content[vt_symbol][PositionModel.buy_position_key()] += data[PositionModel.buy_position_key()] sum_content[vt_symbol][PositionModel.sell_position_key()] += data[PositionModel.sell_position_key()] sum_content[vt_symbol][PositionModel.net_position_key()] += data[PositionModel.net_position_key()] else: sum_content[vt_symbol] = { PositionModel.buy_position_key(): data[PositionModel.buy_position_key()], PositionModel.sell_position_key(): data[PositionModel.sell_position_key()], PositionModel.net_position_key(): data[PositionModel.net_position_key()] return sum_content # ------------------ net position part ------------------ @classmethod def __tqz_sum_position_all_jsonfile_net(cls, *jsonfile_list, target_jsonfile): 加总多个 json文件的 持仓, 并写入新的目标json文件中 :param jsonfile_list: 字符串数组 :param target_jsonfile: 要存入的 json文件名 jsonfile_content_list = [] [jsonfile_content_list.append(cls.tqz_load_jsonfile(jsonfile)) for jsonfile in jsonfile_list] content_dic = cls.__sum_position_all_keyValueNotFound_except_operation(dic_list=jsonfile_content_list) cls.tqz_write_jsonfile(content=content_dic, target_jsonfile=target_jsonfile) @classmethod def __tqz_get_sum_position_jsonfile_data_net(cls, *jsonfile_list): 加总多个 json文件的 持仓, 直接返回加总后的 数据; :param jsonfile_list: 字符串数组 return: 加总后的 json文件数据 jsonfile_content_list = [] [jsonfile_content_list.append(cls.tqz_load_jsonfile(jsonfile)) for jsonfile in jsonfile_list] return cls.__sum_position_all_keyValueNotFound_except_operation( dic_list=jsonfile_content_list @classmethod def __tqz_get_sum_position_jsonfile_data_with_jsonfileContent_net(cls, *jsonfile_content_list): return cls.__sum_position_all_keyValueNotFound_except_operation( dic_list=jsonfile_content_list @classmethod def __tqz_get_multi_position_jsonfile_data_net(cls, *jsonfile_list, multi): 先加总 多个json文件的 持仓, 并返回按倍数调整过的 单个json文件数据; :param jsonfile_list: 需要调整持仓的 json文件路径数组; :param multi: 倍数; return 返回加总后并按倍数调整过的 单个json文件的 持仓数据; return cls.__multi_position_keyValueNotFound_except_operation( source_content=cls.__tqz_get_sum_position_jsonfile_data_net(*jsonfile_list), multi=multi @classmethod def __tqz_get_empty_position_jsonfile_data_net(cls, *jsonfile_list): sum_dictionary = cls.__tqz_get_sum_position_jsonfile_data_net(*jsonfile_list) empty_dictionary = {} for symbol, data in sum_dictionary.items(): data[PositionModel.position_key()] = 0 empty_dictionary[symbol] = data return empty_dictionary @classmethod def __tqz_get_ER_position_format_data_net(cls, jsonfile): jsonfile_data = cls.tqz_load_jsonfile(jsonfile=jsonfile) empty_dic = {} for value in jsonfile_data.values(): for target_position_key, position_data in value.items(): if target_position_key == PositionModel.target_position_er_key(): for vt_symbol, position in position_data.items(): vt_symbol_type = vt_symbol + '.ER' if vt_symbol_type not in empty_dic.keys(): empty_dic[vt_symbol_type] = { PositionModel.position_key(): position, PositionModel.target_position_key(): position elif vt_symbol_type in empty_dic.keys(): empty_dic[vt_symbol_type][PositionModel.position_key()] += position empty_dic[vt_symbol_type][PositionModel.target_position_key()] += position return empty_dic # ------------------ private part ------------------ @classmethod def __tqz_multi_position_all_net(cls, *jsonfile_list, multi): 按倍数调整 多个json文件的 持仓 :param jsonfile_list: 需要调整持仓的 json文件数组 :param multi: 倍数 [cls.__multi_position(source_jsonfile=jsonfile, multi=multi) for jsonfile in jsonfile_list] @classmethod def __tqz_empty_position_all_net(cls, *jsonfile_list): 清空 多个json文件的 持仓 :param jsonfile_list: 需要清空的 json文件数组 cls.__tqz_multi_position_all_net(*jsonfile_list, multi=0) @classmethod def __multi_position(cls, source_jsonfile, multi): cls.tqz_write_jsonfile( content=cls.__get_multi_position(source_jsonfile=source_jsonfile, multi=multi), target_jsonfile=source_jsonfile @classmethod def __get_multi_position(cls, source_jsonfile, multi): return cls.__multi_position_keyValueNotFound_except_operation( source_content=cls.tqz_load_jsonfile(jsonfile=source_jsonfile), multi=multi @classmethod def __empty_position(cls, source_jsonfile): cls.__multi_position(source_jsonfile=source_jsonfile, multi=0) # ------------------ except operation part ------------------ @classmethod @DecorateExcept def __writeReadFile_except_operation(cls, jsonfile=None, content=None, operation_type="r"): if cls.__read_type() is operation_type: with open(jsonfile, operation_type, encoding=cls.__utf_8_encoding()) as fp: return json.load(fp=fp) elif cls.__write_type() is operation_type: with open(jsonfile, operation_type, encoding=cls.__utf_8_encoding()) as fp: json.dump(content, fp=fp, ensure_ascii=False, indent=4) # 参数indent: json文件按格式写入, 距行首空4格; @classmethod @DecorateExcept def __sum_position_all_keyValueNotFound_except_operation(cls, dic_list=None): new_json_contract_models = [] new_json_contract_names = [] temp_dic_list = [] for single_json_content in dic_list: temp_dic = {} for key, value in single_json_content.items(): vt_symbol = key.split(".")[0] + "." + key.split(".")[1] temp_dic[vt_symbol] = value temp_dic_list.append(temp_dic) dic_list = temp_dic_list all_json_contract_models = [] if dic_list is not None: [all_json_contract_models.append(ContractModel.dictionary_to_models(data_dictionary=dic)) for dic in dic_list] for contract_models in all_json_contract_models: for contract_model in contract_models: if contract_model.name not in new_json_contract_names: new_json_contract_models.append(ContractModel(contract_name=contract_model.name, position_model=contract_model.position_model)) new_json_contract_names.append(contract_model.name) else: find_contract_model = cls.__find_contract_model(*new_json_contract_models, model_name=contract_model.name) find_contract_model.position_model.lots += contract_model.position_model.lots return ContractModel.models_to_dictionary(*new_json_contract_models) @classmethod def __find_contract_model(cls, *contract_models, model_name): for contract_model in contract_models: if contract_model.name == model_name: return contract_model return None @classmethod @DecorateExcept def __multi_position_keyValueNotFound_except_operation(cls, source_content=None, multi=1): all_json_contract_models = [] if source_content is not None: [all_json_contract_models.append(ContractModel(contract_name=contract_name, position_model=PositionModel(position_dictionary=position_dictionary))) for contract_name, position_dictionary in source_content.items()] for contract_model in all_json_contract_models: before_multi_lots = contract_model.position_model.lots # multi_lots: 按倍数调整完后的仓位 if before_multi_lots > 0: multi_lots = math.floor(contract_model.position_model.lots * multi) else: multi_lots = math.floor(-1 * contract_model.position_model.lots * multi) * -1 # 按倍数调整完, 确保持仓至少保留1手; if (before_multi_lots > 0) and (multi_lots is 0): # 原先持有多单, 仓位调整完后为 0; multi_lots = 1 elif (before_multi_lots < 0) and (multi_lots is 0): # 原先持有空单, 仓位调整完后为 0; multi_lots = -1 contract_model.position_model.lots = multi_lots return ContractModel.models_to_dictionary(*all_json_contract_models) def __main_engine(): sum_data = PositionJsonOperator.tqz_get_sum_position_jsonfile_data_with_jsonfileContent_buy_sell( PositionJsonOperator.tqz_get_ER_position_format_data_buy_sell("portfolio_strategy_data.json"), PositionJsonOperator.tqz_get_sum_position_jsonfile_data_buy_sell("hla.json", "hsr.json") target_jsonfile = "hls_hsr_er.json" PositionJsonOperator.tqz_write_jsonfile( content=sum_data, target_jsonfile=target_jsonfile list = [ "symbol_2.json", "symbol_3.json", "symbol_4.json", "symbol_5.json" print(PositionJsonOperator.tqz_get_sum_position_jsonfile_data_buy_sell(*list)) print(PositionJsonOperator.tqz_get_multi_position_jsonfile_data_buy_sell(*list, multi=0.5)) print(PositionJsonOperator.tqz_get_empty_position_jsonfile_data_buy_sell(*list)) """ net position part list = [ "symbol_2.json", "symbol_3.json", "symbol_4.json", "symbol_5.json" target_jsonfile = "test.json" PositionJsonOperator.tqz_sum_position_all_jsonfile_net(*list, target_jsonfile=target_jsonfile) print(PositionJsonOperator.tqz_get_multi_position_jsonfile_data_net(target_jsonfile, multi=1)) print(PositionJsonOperator.tqz_get_empty_position_jsonfile_data_net(target_jsonfile)) PositionJsonOperator.tqz_multi_position_all_net(target_jsonfile, multi=0.5) PositionJsonOperator.tqz_empty_position_all_net(target_jsonfile) if __name__ == '__main__': __main_engine() import jsonimport math""" 处理异常的装饰器类"""class DecorateExcept: def __init__(self, function): self.function = function def __call__(self, *args, **kwargs): try: function_result = self.function(*args, **kwargs) .
vn . py 是基于 Py thon语言的 量化交易 系统,是目前国内最好的开源 量化交易 平台之一。 vn . py 是机构级别的 量化交易 软件,掌握 vn . py 框架原理并且熟练使用,有利于快速入门 量化交易 ,搭建自己的 量化交易 系统,也可以在机构中找到与量化岗位相关的工作。 本文是原创作品,针对 vn . py 的最新版本2.2.0(2021年3月26日发布),从源码的下载、安装、主程序入口、主窗口入手,先跟您一起把源码运行起来。再聚焦于 vn . py 的一个重要应用“CTA回测”,从各个层次上分析其源码,包括相关的数据库操作、多线程机制、事件引擎机制等,把这个应用从顶到底,再从底到顶贯通起来。明白了这些内容,再分析其它的功能就会事半功倍。 本文适合想要分析 vn . py 源码的人,还要看清楚软件版本,否则请不要下载,免得浪费积分。
for market_vt_symbol, bar in bars.items(): # strategy position strategy_position = TQZSymbolOperator.tqz_get_strategy_position( market_vt_symbol=market_vt_symbol, for trade in self.tradeDict.values(): # 复制成交对象,因为下面的开平 交易配对涉及到对成交数量的修改 # 若不进行复制直接操作,则计算完后所有成交的数量会变成0
from vn py .app.position_manager.tools.position_operator.position_operator import PositionModel class SymbolOperator: @classmethod def tqz_get_strategy_position(cls, market_vt_symbol, strategy_data): if market_vt_symbol in cls.__get_str..
事情起因:融行虽然有规避自成交的功能模块;但还有很多情况,程序同 发相同合约 多空 两向的单子,融行并不会做处理,估计这是融行系统的一个bug。 但现在的MOM账户大都用的是融行系统,所以自成交风险的避免还是要我们自己程序这边来处理。 from vn py .app.position_manager import StrategyTemplate from vn py .trader.utility import BarGenerator import math from time impor.
Vn Trader简介 目前 Vn Trader基本实现了国内外全品种的交易,包括期货、股票、贵金属、外盘交易等。对于 vn . py 的初学者而言,建议从项目的图形交易程序 Vn Trader开始学习,先了解如何通过界面来监控行情与交易信息,并进行手工交易。 Vn Trader对接了目前 vn . py 项目中所有的交易API,包括期货、股票、贵金属、外盘、比特币等。下文以最常用的CTP交易接口为例,讲解如何配置和使用...
""" 发单逻辑部分 """ def tqz_synchronization_position_cffex_lock_mode(self, market_vt_symbol, now_price, offset_price, strategy_position_net, real_position_net): synchronization position with lock mode(cffex mode)
1、发现我的 vn py 无法保存 仓位 ,出现了一个unicode的报错。结果竟然是,新 策略 保存 仓位 和老 策略 有冲突。在用 策略 前,最好删除mongodb数据库里的 仓位 表。 2、初始化 策略 候发现一个 策略 类的两个不同品种,无法一样的初始化值。主要是由于初始化inti()函数执行的 候是要整个跑玩onbar()的,加入这句话就好了。引入一个trading标志。 # 全部指标初始化完成并且开始交易,才开始接
KD指标是广泛应用的技术指标之一,其作用是用以测量股价的动量,并提供买入卖出的信号。在 Py thon中,KD的应用可以通过简单的代码实现,以此来进行 量化交易 。 首先,我们 需要 导入程序中 需要 使用的库,包括pandas、num py 、matplotlib和talib。在导入库之后,我们 需要 进行数据的读取和处理。我们可以通过使用pandas DataFrame读取我们 需要 的历史股票数据,并将其转换为num py 数组进行处理。 接下来,我们将使用talib库中提供的KD指标进行计算。我们 需要 在程序中定义一个函数来调用talib库的指标计算功能,并将其应用于我们的数据中。具体实现可以使用以下代码: import talib def kd_indicator(high_prices, low_prices, close_prices): k, d = talib.STOCH(high_prices, low_prices, close_prices, 14, 3, 3) return k, d 在运行以上代码后,我们就能获得K值和D值,以此来判断此 的市场趋势,从而对股票进行买卖的决策。 进行 量化交易 的核心是制定交易 策略 ,并根据特定的参数进行买卖操作。我们可以在程序中定义一个 策略 函数,来确定买卖的条件。以下是一个简单的 策略 函数示例,实现了基本的KD交叉买卖法: def trading_strategy(k, d): if k[-1] > d[-1]: # K值大于D值,买入 return 'buy' elif k[-1] < d[-1]: # K值小于D值,卖出 return 'sell' else: return 'hold' # K值等于D值,持有 我们还可以添加更多的条件变量,如移动平均线指标、RSI指标等,以增加算法的精度和可靠性。 最后,我们 需要 将我们的 策略 应用到实际交易中。在程序中,我们可以使用模拟交易的方法,使用历史数据进行回测,以评估我们的 策略 的效果。在这个过程中我们 需要 记录每一次的买入、卖出,并且根据这些交易记录来计算总收益、年化收益率、夏普比率等指标。 综上所述,KD指标的 量化交易 策略 代码是一个相对简单的代码实现,可以通过 Py thon语言的优势实现。但是, 需要 注意的是,对于指标的选择和参数的调整, 需要 应根据股票市场的不同而灵活调整,以达到最好的效果。