回测执行引擎BacktestingEngine
回顾前面的文章CTA:回测综述,那里提到,真正执行回测的逻辑,写在BacktestingEngine
中。
代码解读
BacktestingEngine
定义在vnpy_ctastrategy -> backtesting.py
中。
package
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Callable, List, Dict, Optional, Type
from functools import lru_cache, partial
import traceback
这一部分导入了数据结构之类的库。
import numpy as np
from pandas import DataFrame, Series
from pandas.core.window import ExponentialMovingWindow
import plotly.graph_objects as go
from plotly.subplots import make_subplots
这一部分导入了数据处理的库,以及交互式数据可视化库plotly
。
from vnpy.trader.constant import (Direction,Offset,Exchange,Interval,Status
)
from vnpy.trader.database import get_database, BaseDatabase
from vnpy.trader.object import OrderData, TradeData, BarData, TickData
from vnpy.trader.utility import round_to, extract_vt_symbol
from vnpy.trader.optimize import (OptimizationSetting,check_optimization_setting,run_bf_optimization,run_ga_optimization
)
这一部分导入了VNPY全局定义的一些函数、数据结构。
from .base import (BacktestingMode,EngineType,STOPORDER_PREFIX,StopOrder,StopOrderStatus,INTERVAL_DELTA_MAP
)
from .template import CtaTemplate
from .locale import _
从.base
当中导入了有关回测引擎的一些设置变量。
BacktestingMode
回测模式:
分为K线模式和Tick模式class BacktestingMode(Enum):BAR = 1TICK = 2
EngineType
引擎类型
分为实盘和回测两种类型class EngineType(Enum):LIVE = _("实盘")BACKTESTING = _("回测")
STOPORDER_PREFIX
,StopOrder
,StopOrderStatus
停止订单STOPORDER_PREFIX = "STOP"@dataclass class StopOrder:vt_symbol: strdirection: Directionoffset: Offsetprice: floatvolume: floatstop_orderid: strstrategy_name: strdatetime: datetimelock: bool = Falsenet: bool = Falsevt_orderids: list = field(default_factory=list)status: StopOrderStatus = StopOrderStatus.WAITINGclass StopOrderStatus(Enum):WAITING = _("等待中")CANCELLED = _("已撤销")TRIGGERED = _("已触发")
初始设置
回测设置
class BacktestingEngine:""""""engine_type: EngineType = EngineType.BACKTESTINGgateway_name: str = "BACKTESTING"def __init__(self) -> None:""""""self.vt_symbol: str = ""self.symbol: str = ""self.exchange: Exchange = Noneself.start: datetime = Noneself.end: datetime = Noneself.rate: float = 0self.slippage: float = 0self.size: float = 1self.pricetick: float = 0self.capital: int = 1_000_000self.risk_free: float = 0self.annual_days: int = 240self.half_life: int = 120self.mode: BacktestingMode = BacktestingMode.BAR
这一部分进行回测相关设置
start
,end
:回测起止日期rate
:手续费率slippage
:交易滑点,指下单的点位和最后成交的点位的差距size
:合约乘数pricetick
:最小价格单位capital
:初始资金risk_free
:无风险利率(用于计算绩效指标)annual_days
:每年交易日期(用于计算年化指标)mode
:回测交易频率模式,默认按K线频率回测
这里与策略的初始设置需要相区分。策略的初始设置是设置策略参数,如双均线的回看窗口大小。
策略相关设置
self.strategy_class: Type[CtaTemplate] = Noneself.strategy: CtaTemplate = Noneself.tick: TickDataself.bar: BarDataself.datetime: datetime = Noneself.interval: Interval = Noneself.days: int = 0self.callback: Callable = Noneself.history_data: list = []
strategy_class
:策略类别- 如双均线策略是
DoubleMaStrategy
;strategy
:策略实例
- 如双均线策略是
callback
:回调函数- 策略在调用
on_init
时,会调用load_bar
- 而在
load_bar
中,会传入callback
,一般是策略的回调函数on_bar
- 策略在调用
history_data
:历史数据- 回测使用行情数据的方法是,一次性将数据加载到内存当中
订单管理
self.stop_order_count: int = 0self.stop_orders: Dict[str, StopOrder] = {}self.active_stop_orders: Dict[str, StopOrder] = {}self.limit_order_count: int = 0self.limit_orders: Dict[str, OrderData] = {}self.active_limit_orders: Dict[str, OrderData] = {}self.trade_count: int = 0self.trades: Dict[str, TradeData] = {}
主要分为两种订单:
- 停止单
stop_order
stop_order_count
:停止单计数stop_orders
:停止单字典active_stop_orders
:活跃停止单字典
- 限价单
limit_order
以及跟踪成交情况的:
trade_count
:成交数量trades
:成交字典
停止单(Stop Order)是一种在金融交易中常用的委托类型。与市价单和限价单不同,停止单允许投资者设定一个触发价格,当市场价格达到或超过这个触发价格时,停止单才会变成市价单执行。这种委托类型主要用于控制风险,确保只有在市场条件达到预期时才进行交易。
绩效统计分析
self.logs: list = []self.daily_results: Dict[date, DailyResult] = {}self.daily_df: DataFrame = None
回测过程中,会计算逐日盯市绩效结果。具体逻辑见下一篇文章。
回测准备
清除上一次回测数据
def clear_data(self) -> None:"""Clear all data of last backtesting."""self.strategy = Noneself.tick = Noneself.bar = Noneself.datetime = Noneself.stop_order_count = 0self.stop_orders.clear()self.active_stop_orders.clear()self.limit_order_count = 0self.limit_orders.clear()self.active_limit_orders.clear()self.trade_count = 0self.trades.clear()self.logs.clear()self.daily_results.clear()
若要连续使用同一个回测引擎做回测,则需要清除上一次回测数据。
设定参数
def set_parameters(self,vt_symbol: str,interval: Interval,start: datetime,rate: float,slippage: float,size: float,pricetick: float,capital: int = 0,end: datetime = None,mode: BacktestingMode = BacktestingMode.BAR,risk_free: float = 0,annual_days: int = 240,half_life: int = 120
) -> None:""""""self.mode = modeself.vt_symbol = vt_symbolself.interval = Interval(interval)self.rate = rateself.slippage = slippageself.size = sizeself.pricetick = pricetickself.start = startself.symbol, exchange_str = self.vt_symbol.split(".")self.exchange = Exchange(exchange_str)self.capital = capitalif not end:end = datetime.now()self.end = end.replace(hour=23, minute=59, second=59)self.mode = modeself.risk_free = risk_freeself.annual_days = annual_daysself.half_life = half_life
通过专门的set_parameters
函数,将上文提到的策略相关设置参数都设置好。
加载策略
def add_strategy(self, strategy_class: Type[CtaTemplate], setting: dict) -> None:""""""self.strategy_class = strategy_classself.strategy = strategy_class(self, strategy_class.__name__, self.vt_symbol, setting)
策略类型strategy_class
是类,再将策略实例化保存至strategy
。
再回头查看策略的初始化函数:
class CtaTemplate(ABC):def __init__(self,cta_engine: Any,strategy_name: str,vt_symbol: str,setting: dict,) -> None:""""""self.cta_engine: Any = cta_engineself.strategy_name: str = strategy_nameself.vt_symbol: str = vt_symbol
可以看到,在引擎中加载策略,传给cta_engine
的是引擎实例本身,这样,策略实例和引擎实例就建立起了关系;strategy_name
是策略类的名称。
加载数据
def load_data(self) -> None:""""""self.output(_("开始加载历史数据"))if not self.end:self.end = datetime.now()if self.start >= self.end:self.output(_("起始日期必须小于结束日期"))returnself.history_data.clear() # Clear previously loaded history data# Load 30 days of data each time and allow for progress updatetotal_days: int = (self.end - self.start).daysprogress_days: int = max(int(total_days / 10), 1)progress_delta: timedelta = timedelta(days=progress_days)interval_delta: timedelta = INTERVAL_DELTA_MAP[self.interval]start: datetime = self.startend: datetime = self.start + progress_deltaprogress = 0while start < self.end:progress_bar: str = "#" * int(progress * 10 + 1)self.output(_("加载进度:{} [{:.0%}]").format(progress_bar, progress))end: datetime = min(end, self.end) # Make sure end time stays within set rangeif self.mode == BacktestingMode.BAR:data: List[BarData] = load_bar_data(self.symbol,self.exchange,self.interval,start,end)else:data: List[TickData] = load_tick_data(self.symbol,self.exchange,start,end)self.history_data.extend(data)progress += progress_days / total_daysprogress = min(progress, 1)start = end + interval_deltaend += progress_deltaself.output(_("历史数据加载完成,数据量:{}").format(len(self.history_data)))
除去进度条的包装,该函数干的事情如下:
- 分时间段,一段一段地取数
- 判断回测频率类型
- 如果是K线频率,则调用
load_bar_data
函数加载数据 - 如果是Tick频率,则调用
load_tick_data
函数加载数据
- 如果是K线频率,则调用
- 将数据
data
合并到历史数据history_data
中
@lru_cache(maxsize=999)
def load_bar_data(symbol: str,exchange: Exchange,interval: Interval,start: datetime,end: datetime
) -> List[BarData]:""""""database: BaseDatabase = get_database()return database.load_bar_data(symbol, exchange, interval, start, end)@lru_cache(maxsize=999)
def load_tick_data(symbol: str,exchange: Exchange,start: datetime,end: datetime
) -> List[TickData]:""""""database: BaseDatabase = get_database()return database.load_tick_data(symbol, exchange, start, end)
具体取数的函数如上,都是链接到数据库database
,从database
中取一段数据。
functools
模块中@lru_cache
是非常实用的装饰器,它实现了缓存,去重的功能。它将耗时的函数结果保存起来,避免传入相同的参数重复计算。LRU三个字母是 "Least Recently Used" 的缩写,表明缓存不会无限储存,一段时间不用,或者数量超出一定限制,旧缓存就会扔掉。
回测运行
主要逻辑函数
def run_backtesting(self) -> None:""""""if self.mode == BacktestingMode.BAR:func = self.new_barelse:func = self.new_tickself.strategy.on_init()self.strategy.inited = Trueself.output(_("策略初始化完成"))self.strategy.on_start()self.strategy.trading = Trueself.output(_("开始回放历史数据"))total_size: int = len(self.history_data)batch_size: int = max(int(total_size / 10), 1)for ix, i in enumerate(range(0, total_size, batch_size)):batch_data: list = self.history_data[i: i + batch_size]for data in batch_data:try:func(data)except Exception:self.output(_("触发异常,回测终止"))self.output(traceback.format_exc())returnprogress = min(ix / 10, 1)progress_bar: str = "=" * (ix + 1)self.output(_("回放进度:{} [{:.0%}]").format(progress_bar, progress))self.strategy.on_stop()self.output(_("历史数据回放结束"))
-
根据回测频率模式的不同,将回调函数设为
new_bar
和new_tick
,它们会不断地对给定的新数据做出反应。 -
在
self.strategy.on_init()
中,调用了策略的初始化函数,再回顾DoubleMaStrategy
中on_init
的定义:def on_init(self):"""Callback when strategy is inited."""self.write_log("策略初始化")self.load_bar(10)
发现
on_init
调用了load_bar
,再回顾CtaTemplate
中load_bar
的定义:def load_bar(self,days: int,interval: Interval = Interval.MINUTE,callback: Callable = None,use_database: bool = False ) -> None:"""Load historical bar data for initializing strategy."""if not callback:callback: Callable = self.on_barbars: List[BarData] = self.cta_engine.load_bar(self.vt_symbol,days,interval,callback,use_database)for bar in bars:callback(bar)
发现又调用回了
BacktesingEngine
中的load_bar
函数:def load_bar(self,vt_symbol: str,days: int,interval: Interval,callback: Callable,use_database: bool ) -> List[BarData]:""""""self.callback = callbackinit_end = self.start - INTERVAL_DELTA_MAP[interval]init_start = self.start - timedelta(days=days)symbol, exchange = extract_vt_symbol(vt_symbol)bars: List[BarData] = load_bar_data(symbol,exchange,interval,init_start,init_end)return bars
最后,归根结底,还是调用了前面提到的
load_bar_data
函数。不过值得注意的是,这里并不是将数据加载到
history_data
中,而是将数据预先加载到策略的ArrayManager
中,因为ArrayManager
需要预先填满,才能完成初始化。 -
在
self.strategy.on_start()
中,启动策略。 -
分批处理数据
for data in batch_data:try:func(data)
回调函数
具体来看回调函数new_bar
和new_tick
:
def new_bar(self, bar: BarData) -> None:""""""self.bar = barself.datetime = bar.datetimeself.cross_limit_order()self.cross_stop_order()self.strategy.on_bar(bar)self.update_daily_close(bar.close_price)def new_tick(self, tick: TickData) -> None:""""""self.tick = tickself.datetime = tick.datetimeself.cross_limit_order()self.cross_stop_order()self.strategy.on_tick(tick)self.update_daily_close(tick.last_price)
收到新Bar数据后:
- 调用
cross_limit_order()
函数,这是撮合限价订单 - 调用
cross_stop_order()
函数,这是撮合停止订单 - 再调用策略的回调函数
strategy.on_bar(bar)
,发出新的订单
限价单模拟撮合成交
一般来说,以K线频率交易的回测逻辑是前一根K线的收盘价发出交易信号,在下一根K线的时候以开盘价买入。
def cross_limit_order(self) -> None:"""Cross limit order with last bar/tick data."""if self.mode == BacktestingMode.BAR:long_cross_price = self.bar.low_priceshort_cross_price = self.bar.high_pricelong_best_price = self.bar.open_priceshort_best_price = self.bar.open_priceelse:long_cross_price = self.tick.ask_price_1short_cross_price = self.tick.bid_price_1long_best_price = long_cross_priceshort_best_price = short_cross_price
这一部分的定义反映了上面的逻辑。如果是以Tick频率回测,则最优卖价和买价就是订单簿当前的买一价和卖一价。
显然,以Tick频率回测的准确度更高,可以做日内择时,也可以实现T+0策略实现低买高卖。
接下来对活跃限价订单字典中的订单列表进行循环:
for order in list(self.active_limit_orders.values()):# Push order update with status "not traded" (pending).if order.status == Status.SUBMITTING:order.status = Status.NOTTRADEDself.strategy.on_order(order)
这一步,将上一时刻提交的订单的状态,从SUBMITTING
转换为NOTTRADED
。
# Check whether limit orders can be filled.long_cross: bool = (order.direction == Direction.LONGand order.price >= long_cross_priceand long_cross_price > 0)short_cross: bool = (order.direction == Direction.SHORTand order.price <= short_cross_priceand short_cross_price > 0)if not long_cross and not short_cross:continue
分买和卖两个方向,进行撮合:
- 买:只要订单的限价
order.price
大于等于K线的最低价,这笔订单就可以成交,long_cross=True
- 卖:只要订单的限价
order.price
小于等于K线的最高价,这笔订单就可以成交,short_cross=True
- 如果两个方向都无法实现成交,则跳过这个订单
# Push order udpate with status "all traded" (filled).order.traded = order.volumeorder.status = Status.ALLTRADEDself.strategy.on_order(order)if order.vt_orderid in self.active_limit_orders:self.active_limit_orders.pop(order.vt_orderid)# Push trade updateself.trade_count += 1
如果在上一步发现订单可以成交:
- 将订单的成交额设为订单额
order.traded = order.volume
- 将订单的状态设为全部成交
order.status = Status.ALLTRADED
- 在活跃限价订单簿中删除这个订单
active_limit_orders.pop(order.vt_orderid)
- 成交笔数加一
trade_count += 1
最后,将成交信息整理一下:
- 仓位的变化:
pos_change
- 成交数据保存:
trade
- 将成交数据添加进成交列表
trades
中
if long_cross:trade_price = min(order.price, long_best_price)pos_change = order.volumeelse:trade_price = max(order.price, short_best_price)pos_change = -order.volumetrade: TradeData = TradeData(symbol=order.symbol,exchange=order.exchange,orderid=order.orderid,tradeid=str(self.trade_count),direction=order.direction,offset=order.offset,price=trade_price,volume=order.volume,datetime=self.datetime,gateway_name=self.gateway_name,)self.strategy.pos += pos_changeself.strategy.on_trade(trade)self.trades[trade.vt_tradeid] = trade
这里的trades
实际上就是交割单。必须通过交割单才能生成绩效统计分析。
停止单模拟撮合成交
停止单有如下两种使用场景:
- 提前下停止单,一旦价格回落到预设价位以下,就买入
- 提前下停止单,一旦价格回升到预设价位以上,就卖出
据此逻辑,long_cross_price
和short_cross_price
跟上面的限价订单相反。此外,当满足下单逻辑时,停止订单就转换为限价订单,其余逻辑与限价订单类似。
def cross_stop_order(self) -> None:"""Cross stop order with last bar/tick data."""if self.mode == BacktestingMode.BAR:long_cross_price = self.bar.high_priceshort_cross_price = self.bar.low_pricelong_best_price = self.bar.open_priceshort_best_price = self.bar.open_priceelse:long_cross_price = self.tick.last_priceshort_cross_price = self.tick.last_pricelong_best_price = long_cross_priceshort_best_price = short_cross_pricefor stop_order in list(self.active_stop_orders.values()):# Check whether stop order can be triggered.long_cross: bool = (stop_order.direction == Direction.LONGand stop_order.price <= long_cross_price)short_cross: bool = (stop_order.direction == Direction.SHORTand stop_order.price >= short_cross_price)if not long_cross and not short_cross:continue# Create order data.self.limit_order_count += 1order: OrderData = OrderData(symbol=self.symbol,exchange=self.exchange,orderid=str(self.limit_order_count),direction=stop_order.direction,offset=stop_order.offset,price=stop_order.price,volume=stop_order.volume,traded=stop_order.volume,status=Status.ALLTRADED,gateway_name=self.gateway_name,datetime=self.datetime)self.limit_orders[order.vt_orderid] = order# Create trade data.if long_cross:trade_price = max(stop_order.price, long_best_price)pos_change = order.volumeelse:trade_price = min(stop_order.price, short_best_price)pos_change = -order.volumeself.trade_count += 1trade: TradeData = TradeData(symbol=order.symbol,exchange=order.exchange,orderid=order.orderid,tradeid=str(self.trade_count),direction=order.direction,offset=order.offset,price=trade_price,volume=order.volume,datetime=self.datetime,gateway_name=self.gateway_name,)self.trades[trade.vt_tradeid] = trade# Update stop order.stop_order.vt_orderids.append(order.vt_orderid)stop_order.status = StopOrderStatus.TRIGGEREDif stop_order.stop_orderid in self.active_stop_orders:self.active_stop_orders.pop(stop_order.stop_orderid)# Push update to strategy.self.strategy.on_stop_order(stop_order)self.strategy.on_order(order)self.strategy.pos += pos_changeself.strategy.on_trade(trade)
订单管理
提交订单
在策略中,若触发了交易信号,就会调用交易函数。如前面所见,有四种方向。再回顾CtaTemplate
中的buy
函数:
def buy(self,price: float,volume: float,stop: bool = False,lock: bool = False,net: bool = False
) -> list:"""Send buy order to open a long position."""return self.send_order(Direction.LONG,Offset.OPEN,price,volume,stop,lock,net)
这里将订单信息组织好了,并发送给了send_order
函数。再来看CtaTemplate
中的send_order
函数:
def send_order(self,direction: Direction,offset: Offset,price: float,volume: float,stop: bool = False,lock: bool = False,net: bool = False
) -> list:"""Send a new order."""if self.trading:vt_orderids: list = self.cta_engine.send_order(self, direction, offset, price, volume, stop, lock, net)return vt_orderidselse:return []
send_order
函数实际上又调用了BacktestingEngine
中的send_order
函数。再看BacktestingEngine
中的send_order
函数:
def send_order(self,strategy: CtaTemplate,direction: Direction,offset: Offset,price: float,volume: float,stop: bool,lock: bool,net: bool
) -> list:""""""price: float = round_to(price, self.pricetick)if stop:vt_orderid: str = self.send_stop_order(direction, offset, price, volume)else:vt_orderid: str = self.send_limit_order(direction, offset, price, volume)return [vt_orderid]
它又将订单按照停止单、限价单两种类型,分别调用send_stop_order
和send_limit_order
进行处理。这两种订单发送函数的逻辑是一样的:
- 先按订单信息组织成订单数据结构
- 再将订单分别加入至活跃订单字典和订单字典中。
- 最后,返回订单的
orderid
def send_stop_order(self,direction: Direction,offset: Offset,price: float,volume: float
) -> str:""""""self.stop_order_count += 1stop_order: StopOrder = StopOrder(vt_symbol=self.vt_symbol,direction=direction,offset=offset,price=price,volume=volume,datetime=self.datetime,stop_orderid=f"{STOPORDER_PREFIX}.{self.stop_order_count}",strategy_name=self.strategy.strategy_name,)self.active_stop_orders[stop_order.stop_orderid] = stop_orderself.stop_orders[stop_order.stop_orderid] = stop_orderreturn stop_order.stop_orderiddef send_limit_order(self,direction: Direction,offset: Offset,price: float,volume: float
) -> str:""""""self.limit_order_count += 1order: OrderData = OrderData(symbol=self.symbol,exchange=self.exchange,orderid=str(self.limit_order_count),direction=direction,offset=offset,price=price,volume=volume,status=Status.SUBMITTING,gateway_name=self.gateway_name,datetime=self.datetime)self.active_limit_orders[order.vt_orderid] = orderself.limit_orders[order.vt_orderid] = orderreturn order.vt_orderid
撤销订单
与提交订单类似地,撤销订单在策略中被调用,但是最终调用的是BacktesingEninge
中的相关函数,由BacktesingEninge
完成订单管理。
撤销订单的逻辑也很简单:
- 将订单从活跃订单字典中去掉
- 将订单的状态设为
CANCELLED
但是依然可以在订单字典stop_orders
和limit_orders
中获取到所有提交过的订单信息。
def cancel_order(self, strategy: CtaTemplate, vt_orderid: str) -> None:"""Cancel order by vt_orderid."""if vt_orderid.startswith(STOPORDER_PREFIX):self.cancel_stop_order(strategy, vt_orderid)else:self.cancel_limit_order(strategy, vt_orderid)def cancel_stop_order(self, strategy: CtaTemplate, vt_orderid: str) -> None:""""""if vt_orderid not in self.active_stop_orders:returnstop_order: StopOrder = self.active_stop_orders.pop(vt_orderid)stop_order.status = StopOrderStatus.CANCELLEDself.strategy.on_stop_order(stop_order)def cancel_limit_order(self, strategy: CtaTemplate, vt_orderid: str) -> None:""""""if vt_orderid not in self.active_limit_orders:returnorder: OrderData = self.active_limit_orders.pop(vt_orderid)order.status = Status.CANCELLEDself.strategy.on_order(order)def cancel_all(self, strategy: CtaTemplate) -> None:"""Cancel all orders, both limit and stop."""vt_orderids: list = list(self.active_limit_orders.keys())for vt_orderid in vt_orderids:self.cancel_limit_order(strategy, vt_orderid)stop_orderids: list = list(self.active_stop_orders.keys())for vt_orderid in stop_orderids:self.cancel_stop_order(strategy, vt_orderid)