成功的量化交易 PART 6——自动化交易(3)


成功的量化交易 PART 6——自动化交易(3)

数据处理器


事件驱动的交易系统的目标之一是最小化回溯测试元素和实时执行元素之间的代码重复。理想情况下,最好使用相同的信号生成方法和投资组合管理组件来进行历史测试和实时交易。为了使产生信号的策略对象和提供基于信号的订单的组合对象能够工作,必须使用与市场提要相同的接口来进行历史运行和实时运行。


这激发了基于数据处理器对象的类层次结构的概念,它为所有子类提供一个接口,以便为系统内的其余组件提供市场数据。通过这种方式,任何子类的数据处理程序都可以“换出”,而不会影响策略或投资组合的计算。


具体的例子子类可以包括HistoricCSVDataHandler, QuandlDataHandler, SecuritiesMasterDataHandler, InteractiveBrokersMarketFeedDataHandler等。在本章中,我们只考虑创建一个历史性的CSV数据处理程序,它将在日内加载股票的开-高-低-收-量-空盘量的bars的CSV数据。然后,可以使用它来逐个bar地将数据“滴灌”到系统的每个心跳上的策略和投资组合类中,从而避免前向偏差。


第一个任务是导入必要的库。具体来说,需要导入pandas和抽象基类工具。由于数据处理器生成市场事件,因此还需要上面描述的event.py。

#!/usr/bin/python

# -*- coding: utf-8 -*-

# data.py

from __future__ import print_function

from abc import ABCMeta, abstractmethod

import datetime

import os, os.path import numpy as np

import pandas as pd

from event import MarketEvent


数据处理器是一个抽象基类(ABC),这意味着不可能直接实例化一个实例。只能实例化子类。这样做的基本原理是,ABC提供了一个所有后续数据处理器子类都必须遵守的接口,从而确保与与之通信的其他类的兼容性。


我们使用了_metaclass__属性让Python知道这是一个ABC。此外,我们使用@abstractmethod装饰器让Python知道方法将在子类中被覆盖(这与c++中的纯虚方法相同)。


这个类列出了6个方法。前两个方法get_latest_bar和get_latest_bars用于从存储的交易bar列表中检索历史交易bar的最近子集。由于需要不断了解当前的市场价格和交易量,这些方法在策略和投资组合类中非常方便。


下面的方法get_latest_bar_datetime简单地返回一个Python datetime对象,该对象表示bar的时间戳(例如,日bar的日期或分钟条的分钟分辨率对象)。


以下两个方法get_latest_bar_value和get_latest_bar_values是用于从特定bar或bar列表中检索单个值的便利方法。例如,一个策略通常只对收盘价感兴趣。在本例中,我们可以使用这些方法来返回一个浮点值列表,这些浮点值表示以前的bar的收盘价,而不必从bar对象列表中获取它。这通常会提高利用“回溯窗口”的策略的效率,例如那些涉及回归的策略。


最后一个方法update_bars提供了一种“点滴输入”机制,用于将bar信息放置在严格禁止前向偏差的新数据结构上。这是事件驱动的回溯测试系统与基于矢量化的系统之间的关键区别之一。注意,如果试图实例化类,将会引发异常:

# data.py
class DataHandler(object):
"""
数据处理器是一个抽象基类,
为所有后续的(继承的)数据处理程序(实时的和历史的)提供接口。
(派生的)数据处理器对象的目标是为请求的每个品种输出生成的一组bar (OHLCVI)。
这将复制实时策略的运作方式,即将当前市场数据“发送到管道”。
因此,一个历史的和实时的系统将被其余的回溯测试套件以相同的方式对待。
"""
__metaclass__ = ABCMeta
@abstractmethod
def get_latest_bar(self, symbol):
"""
返回最后更新的bar。
"""
raise NotImplementedError("Should implement get_latest_bar()")
@abstractmethod
def get_latest_bars(self, symbol, N=1):
"""
返回最后更新的N个bar。
"""
raise NotImplementedError("Should implement get_latest_bars()")
@abstractmethod
def get_latest_bar_datetime(self, symbol):
"""
为最后一个bar返回一个Python datetime对象。
"""
raise NotImplementedError("Should implement
get_latest_bar_datetime()")
@abstractmethod
def get_latest_bar_value(self, symbol, val_type):
"""
从最后一个横条返回开、高、低、收、量或OI中的一个。


"""
raise NotImplementedError("Should implement
get_latest_bar_value()")
@abstractmethod
def get_latest_bars_values(self, symbol, val_type, N=1):
"""
从latest_symbol列表中返回最后的N个bar值,如果可用性较低,则返回N-k。
"""
raise NotImplementedError("Should implement
get_latest_bars_values()")
@abstractmethod
def update_bars(self):
"""
以开-高-低-收-量-的元组各式将每个品种的最新bar推送到bars_queue: (datetime、open、high、low、close、volume、open interest)。
"""
raise NotImplementedError("Should implement update_bars()")


为了创建基于历史数据的回溯测试系统,我们需要考虑一种通过公共源导入数据的机制。我们在前几章已经讨论了Securities Master的好处。因此,创建数据处理器类的一个很好的候选方法是将它与这样的数据库结合起来。


然而,为了清楚起见,在本章中,我想讨论一种更简单的机制,即导入(可能很大的)逗号分隔变量(CSV)文件。这将允许我们关注创建数据处理器的机制,而不是关注连接到数据库并使用SQL查询获取数据的“样板”代码。


因此,我们将定义HistoricCSVDataHandler子类,它被设计用来处理多个CSV文件(每个已交易的品种对应一个),并将它们转换成一个pandas数据流字典,可以通过前面提到的bar方法访问。


数据处理程序需要几个参数,即将市场事件信息推送到的事件队列、CSV文件的绝对路径和符号列表。下面是初始化的类:

# data.py
class HistoricCSVDataHandler(DataHandler):
"""
HistoricCSVDataHandler被设计为从磁盘为每个请求的品种读取CSV文件,
并提供一个接口,以与实时交易接口相同的方式获取“最新”栏。
"""
def __init__(self, events, csv_dir, symbol_list):
"""
通过请求CSV文件的位置和品种列表来初始化历史数据处理程序。
它将假定所有文件的形式'symbol。csv ',其中symbol是列表中的字符串。
Parameters:
events - 事件队列。
csv_dir - CSV文件的绝对目录路径。
symbol_list - 品种字符串的列表。
"""
self.events = events


self.csv_dir = csv_dir
self.symbol_list = symbol_list
self.symbol_data = {}
self.latest_symbol_data = {}
self.continue_backtest = True
self._open_convert_csv_files()


处理程序将在绝对目录csv_dir中查找文件,并尝试以“SYMBOL”格式打开它们。其中的品种是股票代码(如GOOG或AAPL)。这些文件的格式与Yahoo Finance提供的格式相匹配,但是很容易修改以处理其他数据格式,比如Quandl或DTN IQFeed提供的格式。文件的打开由下面的_open_convert_csv_files方法处理。


在HistoricCSVDataHandler内部使用pandas作为数据存储的好处之一是,可以将所跟踪的所有品种的索引合并在一起。这允许将缺失的数据点向前填充、向后填充或在这些间隙内插入,以便在条形图的基础上比较报价器。例如,这对于均值回归策略是必要的。注意union和reindex方法在为所有符号组合索引时的使用:

# data.py
def _open_convert_csv_files(self):
"""
打开数据目录中的CSV文件,将它们转换为符号字典中的panda数据流。
对于这个处理程序,将假定数据来自Yahoo。因此,它的格式将得到尊重。
"""


comb_index = None
for s in self.symbol_list:
# 加载没有标头信息的CSV文件,按日期索引
self.symbol_data[s] = pd.io.parsers.read_csv(
os.path.join(self.csv_dir, ’%s.csv’ % s),
header=0, index_col=0, parse_dates=True,
names=[
’datetime’, ’open’, ’high’,
’low’, ’close’, ’volume’, ’adj_close’
]
).sort()
# 组合索引以填充前向值
if comb_index is None:
comb_index = self.symbol_data[s].index
else:
comb_index.union(self.symbol_data[s].index)
# 将最新的symbol_data设置为None
self.latest_symbol_data[s] = []
# 重建索引的dataframes
for s in self.symbol_list:
self.symbol_data[s] = self.symbol_data[s].\\
reindex(index=comb_index, method=’pad’).iterrows()


_get_new_bar方法创建一个生成器来提供一个新的bar。这意味着对该方法的后续调用将产生一个新的bar,直到到达品种数据的结尾:


# data.py
def _get_new_bar(self, symbol):
"""
返回数据提要中的最新bar。
"""
for b in self.symbol_data[symbol]:yield b


要实现的数据处理器中的第一个抽象方法是get_latest_bar和get_latest_bars。这些方法简单地提供了一个bar或latest_symbol_data结构的最后N个bar的列表:

# data.py
def get_latest_bar(self, symbol):
"""
返回latest_symbol列表中的最新bar。
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
else:
return bars_list[-1]
def get_latest_bars(self, symbol, N=1):
"""
返回latest_symbol列表的最后N个条,
如果可用性较低,则返回N-k。
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
else:
return bars_list[-N:]


下一个方法是get_latest_bar_datetime,它在最新的bar中查询一个表示“最后市场价格”的datetime对象:

def get_latest_bar_datetime(self, symbol):

"""
为最后一个bar返回一个Python datetime对象。
"""
try:


bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
else:
return bars_list[-1][0]


接下来实现的两个方法是get_latest_bar_value和get_latest_bar_values。这两种方法都使用Python的getattr函数,该函数查询对象以查看对象上是否存在特定的属性。因此,我们可以将诸如“open”或“close”之类的字符串传递给getattr,并直接从bar获取值,从而使方法更加灵活。这使我们不必编写get_latest_bar_close类型的方法,例如:

def get_latest_bar_value(self, symbol, val_type):
"""
从pandas Bar序列对象中返回一个开、高、低、收、量或空盘值。"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
else:
return getattr(bars_list[-1][1], val_type)
def get_latest_bars_values(self, symbol, val_type, N=1):
"""
从latest_symbol列表中返回最后的N个bar值,
如果可用性较低,则返回N-k。
"""
try:
bars_list = self.get_latest_bars(symbol, N)
except KeyError:
print("That symbol is not available in the historical data set.")
raise
else:
return np.array([getattr(b[1], val_type) for b in bars_list])


最后一个方法update_bars是数据处理器中的第二个抽象方法。它只是生成一个市场事件,当它将最新的bar添加到latest_symbol_data字典中时,该事件被添加到队列中:

# data.py
def update_bars(self):
"""
将符号列表中所有品种的最新bar推到latest_symbol_data结构。
"""
for s in self.symbol_list:
try:
bar = next(self._get_new_bar(s))
except StopIteration:
self.continue_backtest = False
else:
if bar is not None:
self.latest_symbol_data[s].append(bar)
self.events.put(MarketEvent())


分享到:


相關文章: