天弘基金:基于 DolphinDB 订单簿引擎的毫秒级决策体系
业务场景与需求
作为国内领先的基金管理公司,天弘基金量化团队在管理高频策略时面临核心挑战:交易所提供的3秒频率订单簿无法满足科创板及创业板标的的微观结构分析需求。尤其在价格笼子机制下,瞬时有效报价常在快照间隙消失,导致传统信号捕捉存在盲区。为此,天弘基金引入DolphinDB订单簿引擎,构建毫秒级高频订单簿实时合成体系。
技术方案设计
(一) 引擎核心能力适配
- 多市场规则支持通过exchange="XSHE"参数启用深交所股票处理逻辑,自动应用创业板价格笼子规则配置exchange="XSHG"处理上交所标的,兼容本方最优等特殊委托类型
- 乱序数据容错机制启用orderBySeq=true参数,依据ApplSeqNum字段重建数据时序实测可处理单通道15%数据乱序率,保障合成准确性
(二) 数据治理标准化
- 异构数据整合深交所逐笔委托补全BuyNo/SellNo为零值,匹配引擎输入规范上交所市价单类型映射(原始值49→限价单,50→本方最优)
- 实时流处理架构部署4个INSIGHT插件实例分通道订阅数据构建共享流表orderTrans,日均吞吐量达1.2亿条逐笔记录
示例:合成 1 秒频率订单簿
创建订单簿引擎
首先创建订单簿引擎,以下脚本中名为“demo”的订单簿引擎的功能是每1秒计算输出深交所股票10档订单簿。
// 定义引擎参数outputTable,即指定输出表
suffix = string(1..10)
colNames = `SecurityID`timestamp`lastAppSeqNum`tradingPhaseCode`modified`turnover`volume`tradeNum`totalTurnover`totalVolume`totalTradeNum`lastPx`highPx`lowPx`ask`bid`askVol`bidVol`preClosePx`abnormal join ("bids" + suffix) join ("bidVolumes" + suffix) join ("bidOrderNums" + suffix) join ("asks" + suffix) join ("askVolumes" + suffix) join ("askOrderNums" + suffix)
colTypes = [SYMBOL,TIMESTAMP,LONG,INT,BOOL,DOUBLE,LONG,INT,DOUBLE,LONG,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,LONG,DOUBLE,BOOL] join take(DOUBLE, 10) join take(LONG, 10) join take(INT, 10) join take(DOUBLE, 10) join take(LONG, 10) join take(INT, 10)
outTable = table(1:0, colNames, colTypes)
// 定义引擎参数dummyTable,即指定输入表的表结构
colNames = `SecurityID`Date`Time`SourceType`Type`Price`Qty`BSFlag`BuyNo`SellNo`ApplSeqNum`ChannelNo
colTypes = [SYMBOL, DATE, TIME, INT, INT, LONG, LONG, INT, LONG, LONG, LONG, INT]
dummyOrderTrans = table(1:0, colNames, colTypes)
// 定义引擎参数inputColMap,即指定输入表各字段的含义
inputColMap = dict(`codeColumn`timeColumn`typeColumn`priceColumn`qtyColumn`buyOrderColumn`sellOrderColumn`sideColumn`msgTypeColumn`seqColumn, `SecurityID`Time`Type`Price`Qty`BuyNo`SellNo`BSFlag`SourceType`ApplSeqNum)
// 定义引擎参数prevClose,即昨日收盘价,prevClose不影响最终的输出结果中除昨日收盘价以外的其他字段
prevClose = dict(`000400.SZ`300274.SZ`300288.SZ`300122.SZ`300918.SZ, [1.1, 2.2, 3.3, 4.4, 5.5])
// 创建引擎,每1s计算输出深交所股票10档订单簿
engine = createOrderBookSnapshotEngine(name="demo", exchange="XSHE", orderbookDepth=10, intervalInMilli=1000, date=2022.01.10, startTime=09:30:00.000, prevClose=prevClose, dummyTable=dummyOrderTrans, inputColMap=inputColMap, outputTable=outTable, orderBySeq=false)通过以下脚本可以释放掉引擎:
dropStreamEngine("demo")
历史批计算调用
DolphinDB 流计算引擎均实现了数据表(table),可以通过 tableInsert 或者 append! 函数,向订单簿引擎批量注入处理好的输入数据实现批计算。
(1)深交所股票
输入数据
t = select * from loadText("./orderTrans.csv") order by ApplSeqNum
getStreamEngine("demo").append!(t)输出结果表 outTable 如下:

(2)上交所股票
创建订单簿引擎时,指定 exchange="XSHG" 即可计算上交所股票的订单簿。
engine = createOrderBookSnapshotEngine(name="demo", exchange="XSHG", orderbookDepth=10, intervalInMilli=1000, date=2022.01.10, startTime=09:30:00.000, prevClose=prevClose, dummyTable=dummyOrderTrans, inputColMap=inputColMap, outputTable=outTable, orderBySeq=false)输入数据
实时流计算调用
通过 subscribeTable 函数订阅共享流数据表 orderTrans,handler 可以直接指定为订单簿引擎。
subscribeTable(tableName="orderTrans", actionName="orderbookDemo", handler=getStreamEngine("demo"), msgAsTable=True)
- 需要保证表 orderTrans 中最多仅包含一个通道的逐笔数据;
- 若需要合成沪深股票两种订单簿,则需要创建不同的订单簿引擎;
合成包含衍生指标(如逐笔成交明细)的订单簿
首先创建订单簿引擎,以下脚本中名为“demo”的订单簿引擎的功能是每1秒计算输出深交所股票10档订单簿,并且扩展了逐笔成交明细字段。
// 获取订单簿引擎的输出表结构
depth = 10
orderBookAsArray =true
outputColMap, outputTableSch = genOutputColumnsForOBSnapshotEngine(basic=true, time=true, depth=(depth, orderBookAsArray), tradeDetail=true, orderDetail=false, withdrawDetail=false, orderBookDetailDepth=0, prevDetail=false)
// 定义引擎参数outputTable,即指定输出表
outTable = table(1:0, outputTableSch.schema().colDefs.name, outputTableSch.schema().colDefs.typeString)
// 定义引擎参数dummyTable,即指定输入表的表结构
colNames = `SecurityID`Date`Time`SourceType`Type`Price`Qty`BSFlag`BuyNo`SellNo`ApplSeqNum`ChannelNo`ReceiveTime
colTypes = [SYMBOL, DATE, TIME, INT, INT, LONG, LONG, INT, LONG, LONG, LONG, INT, NANOTIMESTAMP]
dummyOrderTrans = table(1:0, colNames, colTypes)
// 定义引擎参数inputColMap,即指定输入表各字段的含义
inputColMap = dict(`codeColumn`timeColumn`typeColumn`priceColumn`qtyColumn`buyOrderColumn`sellOrderColumn`sideColumn`msgTypeColumn`seqColumn`receiveTime, `SecurityID`Time`Type`Price`Qty`BuyNo`SellNo`BSFlag`SourceType`ApplSeqNum`ReceiveTime)
// 定义引擎参数prevClose,即昨日收盘价,prevClose不影响最终的输出结果中除昨日收盘价以外的其他字段
prevClose = dict(`000400.SZ`300274.SZ`300288.SZ`300122.SZ`300918.SZ, [1.1, 2.2, 3.3, 4.4, 5.5])
// 创建引擎,每1s计算输出深交所股票10档订单簿
engine = createOrderBookSnapshotEngine(name="demo", exchange="XSHE", orderbookDepth=depth, intervalInMilli=1000, date=2022.01.10, startTime=09:30:00.000, prevClose=prevClose, dummyTable=dummyOrderTrans, inputColMap=inputColMap, outputTable=outTable, orderBySeq=false, outputColMap=outputColMap, orderBookAsArray=orderBookAsArray)接着,历史数据注入订单簿引擎与上一小节不同,本节的输入数据多了一列 ReceiveTime,表示每条逐笔数据的接收时间。
t = select * from loadText("./orderTrans.csv") order by ApplSeqNum update t set ReceiveTime = now(true) // 构造接收时间列 getStreamEngine("demo").append!(t)
输出结果表 outTable 如下:

合成包含用户自定义指标的订单簿
首先创建订单簿引擎,以下脚本中名为的订单簿引擎的功能是每 1 秒计算输出深交所股票 10 档订单簿,并且扩展了 4 个用户自定义指标,指标定义如下:
| 指标名称 | 含义 |
|---|---|
| AvgBuyDuration | 过去 1 秒内,成交中买方的平均挂单时长 |
| AvgSellDuration | 过去 1 秒内,成交中卖方的平均挂单时长 |
| BuyWithdrawQty | 过去 1 秒内,买方撤单的总量 |
| SellWithdrawQty | 过去 1 秒内,卖方撤单的总量 |
// 定义订单簿深度等
depth = 10
orderBookAsArray =true
outputColMap = genOutputColumnsForOBSnapshotEngine(basic=true, time=false, depth=(depth, orderBookAsArray), tradeDetail=true, orderDetail=false, withdrawDetail=true, orderBookDetailDepth=0, prevDetail=false)[0]
// 定义引擎参数 dummyTable,即指定输入表的表结构
colNames = `SecurityID`Date`Time`SourceType`Type`Price`Qty`BSFlag`BuyNo`SellNo`ApplSeqNum`ChannelNo
colTypes = [SYMBOL, DATE, TIME, INT, INT, LONG, LONG, INT, LONG, LONG, LONG, INT]
dummyOrderTrans = table(1:0, colNames, colTypes)
// 定义引擎参数 inputColMap,即指定输入表各字段的含义
inputColMap = dict(`codeColumn`timeColumn`typeColumn`priceColumn`qtyColumn`buyOrderColumn`sellOrderColumn`sideColumn`msgTypeColumn`seqColumn, `SecurityID`Time`Type`Price`Qty`BuyNo`SellNo`BSFlag`SourceType`ApplSeqNum)
// 定义引擎参数 prevClose,即昨日收盘价,prevClose不影响最终的输出结果中除昨日收盘价以外的其他字段
prevClose = dict(STRING, DOUBLE)
//// 定义用户自定义因子
def userDefinedFunc(t){
AvgBuyDuration = rowAvg(t.TradeMDTimeList-t.TradeOrderBuyNoTimeList).int()
AvgSellDuration = rowAvg(t.TradeMDTimeList-t.TradeOrderSellNoTimeList).int()
BuyWithdrawQty = rowSum(t.WithdrawBuyQtyList)
SellWithdrawQty = rowSum(t.WithdrawSellQtyList)
return (AvgBuyDuration, AvgSellDuration, BuyWithdrawQty, SellWithdrawQty)
}
// 定义订单簿引擎的输出表
outputTableSch = genOutputColumnsForOBSnapshotEngine(basic=true, time=false, depth=(depth, orderBookAsArray), tradeDetail=false, orderDetail=false, withdrawDetail=false, orderBookDetailDepth=0, prevDetail=false)[1]
colNames = outputTableSch.schema().colDefs.name join (`AvgBuyDuration`AvgSellDuration`BuyWithdrawQty`SellWithdrawQty)
colTypes = outputTableSch.schema().colDefs.typeString join (`INT`INT`INT`INT)
outTable = table(1:0, colNames, colTypes)
// 创建引擎,每1s计算输出深交所股票10档订单簿
try{dropStreamEngine(`demo)} catch(ex){}
engine = createOrderBookSnapshotEngine(name="demo", exchange="XSHE", orderbookDepth=depth, intervalInMilli=1000, date=2022.01.10, startTime=09:30:00.000, prevClose=prevClose, dummyTable=dummyOrderTrans, inputColMap=inputColMap, outputTable=outTable, outputColMap=outputColMap, orderBookAsArray=orderBookAsArray, userDefinedMetrics=userDefinedFunc)接着,用历史数据注入订单簿引擎。
t = select * from loadText("./orderTrans.csv") order by ApplSeqNum getStreamEngine("demo").append!(t)
输出结果表 outTable 如下,红框部分为用户自定义指标:

行业实践启示
通过DolphinDB订单簿引擎的深度应用,天弘基金实现三大突破:
- 技术自主性:构建全自研高频数据处理管道,摆脱第三方系统依赖
- 监管合规性:完整保留ApplSeqNum时序,满足穿透式监管审计要求
- 业务扩展性:同一引擎支持股票、可转债等多品种订单簿合成