DolphinDB历史数据回放教程
本页说明量化策略回测与实盘在事件驱动差异下可能需要两套代码,并提出通过将历史数据按“实时数据”方式回放到流表来复用同一套代码。
Source: https://dolphindb.cn/blogs/18
What this page covers
- 背景:回测与实盘代码复用的动机与问题描述。
- 流数据处理框架:发布-订阅-消费模式与 API 概览。
- replay:函数用途与关键参数说明。
- replayDS:将 SQL 转为数据源并按时间拆分的机制。
- 案例:回放金融数据并计算 ETF 价值的流程展示。
- 性能测试:服务器配置、脚本与回放耗时描述。
技能认证特训营第二期正式开启(限时报名)
页面顶部包含活动报名与优惠提示,并提供报名链接。
- 该活动提供报名入口链接。
- 活动信息以限时报名提示呈现。
DolphinDB历史数据回放教程(作者与日期)
页面展示文章标题、作者署名与发布日期信息。
- 作者署名为 Junxi。
- 发布日期为 2021-05-18。
背景:用历史数据以“实时数据”方式回测以复用代码
说明量化策略回测与实盘在事件驱动差异导致两套代码问题,并提出通过流表回放历史数据以复用同一套代码。
- 实盘处理实时数据通常是事件驱动。
- 回测使用历史数据时程序通常不是事件驱动。
- 同一策略可能需要两套代码,耗时且易出错。
- 可将历史数据按时间顺序以“实时数据”方式导入流表。
- 目标是用同一套代码进行回测与实盘交易。
DolphinDB 流数据处理框架与 API 概览
介绍发布-订阅-消费模型、订阅侧处理方式(自定义函数或聚合引擎)以及多语言流数据 API,并给出流数据教程链接。
- 流数据处理框架采用发布-订阅-消费模式。
- 订阅者可用自定义函数处理消息。
- 订阅者可用内置聚合引擎处理消息。
- 流数据接口支持 C++、C#、Java、Python 等语言。
- 页面提供流数据教程链接。
本文范围:介绍 replay / replayDS 并展示回放过程与应用场景
声明本文将介绍 replay 与 replayDS 两个函数并用金融数据展示数据回放。
- 本文将介绍 replay 函数。
- 本文将介绍 replayDS 函数。
- 本文包含金融数据回放示例。
函数介绍:replay
给出 replay 函数签名、用途与参数说明(输入/输出表、日期时间列、回放速率、并行度等)。
- 函数签名:replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])。
- replay 可将若干表或数据源回放到相应输出表。
- inputTables 可为单表或包含若干表/数据源的元组。
- 输入表与输出表数量需一致并一一对应。
- 每对输入/输出表需要相同结构。
- 不指定 dateColumn 时默认第一列为日期列。
- 若时间列同时包含日期与时间,可将 dateColumn 与 timeColumn 设为同一列。
- 系统根据日期/时间列设定决定回放最小时间精度与批次输出。
- replayRate 表示每秒回放条数,但批次机制可能导致实际速率偏离。
- parallelLevel 表示读取数据的并行度。
函数介绍:replayDS
给出 replayDS 函数签名、用途与参数说明(SQL 元代码、日期/时间列、时间再分区 schema 等)。
- 函数签名:replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])。
- replayDS 可将 SQL 查询转化为数据源,并与 replay 结合使用。
- replayDS 可按时间顺序将原始 SQL 拆分为若干小 SQL 查询。
- sqlObj 为 SQL 元代码,表示要回放的数据。
- 不指定 dateColumn 时默认第一列为日期列。
- 默认假设日期列是数据源的一个分区列,并据此拆分查询。
- timeRepartitionSchema 可为时间类型向量,用于时间维度拆分。
回放用法示例:单个内存表回放
说明单内存表回放所需参数并提供调用示例。
- 示例调用:replay(inputTable, outputTable, `date, `time, 10)。
- 示例中显式指定 dateColumn 与 timeColumn。
回放用法示例:使用 data source 的单表回放(replayDS + 并行度)
说明大表通过 replayDS 生成数据源后回放,并描述 pipeline 实现与并行读取优势,给出示例代码。
- 可先用 replayDS 将 SQL 转为数据源,再交由 replay 回放。
- replay 内部使用 pipeline 框架,将取数与输出分开执行。
- 当输入为 data source 时,多块数据可并行读取以减少输出等待。
- 示例并行度为 2 表示两个线程取数据。
- 示例代码包含 replayDS 构造 inputDS,并调用 replay(inputDS, outputTable, `date, `time, 1000, 2)。
回放用法示例:使用 data source 的多表回放
说明 replay 支持多表同时回放(元组输入/输出、结构一致、列存在要求)并给出示例。
- 多张输入表可用元组形式传给 replay。
- 需要分别指定与输入一一对应的输出表。
- 每对输入/输出表必须具有相同表结构。
- 若指定日期列或时间列,则所有表都应存在相应列。
- 示例代码展示对多个数据源 ds1/ds2/ds3 与多个输出表 out1/out2/out3 的 replay 调用。
取消回放
介绍通过 submitJob 的任务取消方式与直接调用时通过控制台任务取消方式,并给出相关函数调用示例。
- submitJob 提交的回放可用 getRecentJob(s) 获取 jobId。
- 获取 jobId 后可用 cancelJob 取消回放。
- 直接调用回放时,可在另一个 GUI session 用 getConsoleJobs 获取 jobId。
- 获取 jobId 后可用 cancelConsoleJob 取消回放任务。
如何使用回放数据(订阅与消费方式)
列出回放数据作为流数据后的三种订阅消费方式(脚本回调、内置流计算引擎、第三方客户端 API)并引用手册链接。
- 方式 1:在 DolphinDB 中订阅,并用脚本自定义回调函数消费流数据。
- 方式 2:在 DolphinDB 中订阅,并用内置流计算引擎处理数据。
- 方式 3:第三方客户端通过流数据 API 订阅与消费数据。
- 页面引用 createCrossSectionalAggregator 用户手册链接。
金融示例:回放美国股市一天 level1 数据并计算 ETF 价值
以 2007-08-17 美国股市 level1 数据为例,展示 quotes 表结构预览、数据划分回放、输出流表、权重与聚合函数、创建横截面聚合引擎与订阅、启动回放与查看输出。
- 示例回放 2007 年 8 月 17 日美国股市一天的 level1 交易数据。
- 示例数据存放在 dfs://TAQ 的 quotes 表。
- 示例展示 quotes 表的列名与类型(按页面展示顺序)。
- 示例提到一天 quotes 数据量为 336,305,414 条。
- 示例使用 replayDS 将数据按时间戳分为 62 个部分。
- 示例给出 cutPoints 与 replayDS 构造 rds 的代码。
- 示例创建输出流表 outQuotes。
- 示例定义 ETF 价值聚合函数 etfVal。
- 示例给出 ETF 权重字典 weights 的成分与权重。
- 订阅 outQuotes 时设置发布表过滤条件以减少网络开销与数据传输。
- 示例调用 setStreamTableFilterColumn(outQuotes, `symbol)。
- 示例创建横截面聚合引擎 createCrossSectionalAggregator。
- 示例订阅 outQuotes 并指定 symbol 列表。
- 示例通过 submitJob 启动回放并设置回放速率与并行度。
- 示例 outputTable 展示字段包含 time 与 etf。
性能测试
给出测试服务器配置、测试脚本,并报告在最快速率且无订阅时回放特定数据量的耗时范围。
- 测试服务器型号为 DELL PowerEdge R730xd。
- CPU 为 Intel Xeon(R) CPU E5-2650 v4(24核 48线程 2.20GHz)。
- 内存为 512 GB(32GB × 16, 2666 MHz)。
- 硬盘容量与构成为 17T HDD(1.7T × 10)。
- 硬盘读取速度为 222 MB/s。
- 硬盘写入速度为 210 MB/s。
- 网络为万兆以太网。
- 测试脚本中 submitJob 调用的 replayRate 为空,并行度为 4。
- 在最快速率且输出表无订阅时,回放 336,305,414 条数据耗时为 90~110 秒。
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| 技能认证特训营第二期 | 报名链接 | https://www.qingsuyun.com/h5/e/217471/5/ | high |
| DolphinDB历史数据回放教程 | 发布日期 | 2021-05-18 | high |
| DolphinDB历史数据回放教程 | 作者署名 | Junxi | high |
| 量化策略回测与实盘代码复用 | 问题描述 | 实盘处理实时数据通常为事件驱动;回测使用历史数据时程序通常不是事件驱动;同一策略需要编写两套代码,耗时且易出错。 | medium |
| DolphinDB database 历史数据回放能力 | 实现方式 | 可将历史数据按时间顺序以“实时数据”的方式导入流数据表,从而使用同一套代码进行回测与实盘交易。 | high |
| DolphinDB 流数据处理框架 | 模式 | 发布-订阅-消费(publish-subscribe-consume)模式。 | high |
| DolphinDB 流数据处理(订阅者侧) | 处理方式 | 订阅者收到消息后可使用自定义函数或 DolphinDB 内置的聚合引擎处理消息。 | high |
| DolphinDB 流数据接口 | 支持语言 API | C++, C#, Java, Python 等。 | high |
| DolphinDB 流数据教程 | 链接 | https://github.com/dolphindb/Tutorials_CN/blob/master/streaming_tutorial.md | high |
| replay 函数 | 函数签名 | replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1]) | high |
| replay 函数 | 作用 | 将若干表或数据源同时回放到相应的输出表中,需要指定输入表/数据源、输出表、日期列、时间列、回放速度以及并行度。 | high |
| replay 参数 inputTables | 取值类型 | 单个表或包含若干表或数据源(见 replayDS)的元组。 | high |
| replay 参数 outputTables | 约束 | 单个表或包含若干个表的元组(通常为流数据表);输入表和输出表个数一致且一一对应;每对输入/输出表结构相同。 | high |
| replay 参数 dateColumn/timeColumn | 默认与特殊设置 | 若不指定则默认第一列为日期列;若时间列同时包含日期和时间,需要将 dateColumn 和 timeColumn 设为同一列。 | high |
| replay 回放批次机制 | 最小时间精度与批次输出 | 系统根据 dateColumn/timeColumn 的设定决定回放的最小时间精度;在该精度下同一时刻的数据在相同批次输出;例如表有日期列和时间列但只设置 dateColumn,则同一天数据会在一个批次输出。 | high |
| replay 参数 replayRate | 速率偏差说明 | replayRate 为每秒回放条数;因同一时刻数据同批次输出,当 replayRate 小于一个批次行数时,实际输出速率会大于 replayRate。 | high |
| replay 参数 parallelLevel | 含义与场景 | 表示读取数据的并行度;当源数据超过内存时需用 replayDS 将源数据划分为小数据源依次从磁盘读取并回放;多线程读取可提升读取速度。 | high |
| replayDS 函数 | 函数签名 | replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema]) | high |
| replayDS 函数 | 作用 | 将输入的 SQL 查询转化为数据源,与 replay 结合使用;根据输入表分区及 timeRepartitionSchema 将原始 SQL 按时间顺序拆分为若干小 SQL 查询。 | high |
| replayDS 参数 sqlObj | 含义 | SQL 元代码,表示回放的数据(例如 <select * from sourceTable>)。 | high |
| replayDS 参数 dateColumn | 默认与分区假设 | 若不指定默认第一列为日期列;replayDS 默认日期列是数据源的一个分区列,并根据分区信息将原始 SQL 拆分为多个查询。 | high |
| replayDS 参数 timeRepartitionSchema | 类型与拆分规则 | 时间类型向量(如 08:00:00 .. 18:00:00);若同时指定 timeColumn,则对 SQL 查询在时间维度上进一步拆分。 | high |
| 单个内存表回放 | 示例调用 | replay(inputTable, outputTable, `date, `time, 10) | high |
| 使用 data source 的单表回放 | 实现说明 | replay 内部使用 pipeline 框架,取数据与输出分开执行;当输入为 data source 时,多块数据可并行读取以避免输出线程等待;示例并行度为 2 表示两个线程取数据。 | medium |
| 使用 data source 的单表回放 | 示例代码 | inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay(inputDS, outputTable, `date, `time, 1000, 2) | high |
| 使用 data source 的多表回放 | 结构与列要求 | 多张输入表以元组传给 replay,并分别指定输出表;输出表与输入表一一对应;每对必须有相同表结构;若指定日期列或时间列,则所有表都应存在相应列。 | high |
| 使用 data source 的多表回放 | 示例代码 | ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2) | high |
| 取消回放(submitJob 提交) | 步骤 | 使用 getRecentJob(s) 获取 jobId,然后用 cancelJob 取消回放。 | high |
| 取消回放(直接调用) | 步骤 | 在另一个 GUI session 使用 getConsoleJobs 获取 jobId,然后使用 cancelConsoleJob 取消回放任务。 | high |
| 回放数据消费方式 | 方式 1 | 在 DolphinDB 中订阅,使用 DolphinDB 脚本自定义回调函数消费流数据。 | high |
| 回放数据消费方式 | 方式 2 | 在 DolphinDB 中订阅,使用内置流计算引擎处理(如时间序列聚合引擎、横截面聚合引擎、异常检测引擎等),并提到 3.2 中使用横截面聚合引擎计算 ETF 内在价值。 | medium |
| createCrossSectionalAggregator 用户手册 | 链接 | https://www.dolphindb.cn/cn/help/createCrossSectionalAggregator.html | high |
| 回放数据消费方式 | 方式 3 | 第三方客户端通过 DolphinDB 的流数据 API 订阅和消费数据。 | high |
| 金融示例 | 数据范围与类型 | 回放美国股市 2007 年 8 月 17 日的一天 level1 交易数据,并计算 ETF 价值。 | high |
| 示例数据位置 | 数据库与表 | 数据存放在分布式数据库 dfs://TAQ 的 quotes 表。 | high |
| quotes 表结构(列名与类型) | schema 示例 | name/typeString/typeInt 列表中包含:time SECOND;symbol SYMBOL;ofrsiz INT;ofr DOUBLE;mode INT;mmid SYMBOL;ex CHAR;date DATE;bidsize INT;bid DOUBLE(按页面展示顺序)。 | high |
| 示例数据量(一天 quotes 数据) | 行数 | 336,305,414 条。 | high |
| 示例:replayDS 时间切分 | 切分结果 | 使用 replayDS 并指定 timeRepartitionSchema 将数据按时间戳分为 62 个部分。 | high |
| 示例:数据划分代码 | replayDS 构造 | trs = cutPoints(09:30:00.001..18:00:00.001, 60);rds = replayDS(<select * from quotes>, `date, `time, trs) | high |
| 示例:输出流表 outQuotes | 创建方式 | share streamTable(100:0, sch.name, sch.type) as outQuotes | high |
| 示例:ETF 价值聚合函数 | 函数定义 | defg etfVal(weights, sym, price) { return wsum(price, weights[sym]) } | high |
| 示例:ETF 权重字典 weights | 成分与权重 | AAPL=0.1,IBM=0.1,MSFT=0.1,NTES=0.1,AMZN=0.1,GOOG=0.5。 | high |
| 示例订阅过滤 | 过滤目的 | 订阅 outQuotes 时设置发布表过滤条件,仅发布指定 symbol(AAPL、IBM、MSFT、NTES、AMZN、GOOG)到横截面聚合引擎,以减少不必要的网络开销和数据传输。 | medium |
| 示例:过滤列设置 | 调用 | setStreamTableFilterColumn(outQuotes, `symbol) | high |
| 示例:横截面聚合引擎创建 | 调用 | tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch) | high |
| 示例:订阅 outQuotes | 调用与订阅 symbol 列表 | subscribeTable(,"outQuotes","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true,,,,,`AAPL`IBM`MSFT`NTES`AMZN`GOOG) | high |
| 示例:回放任务提交 | 速率与并行度 | submitJob(... replay, [rds], [`outQuotes], `date, `time, 100000, 4):每秒回放 10 万条数据,并行度 4。 | high |
| 示例 outputTable 结果展示 | 样例输出字段 | outputTable 展示字段为 time 与 etf(示例查询 select top 15 * from outputTable)。 | high |
| 性能测试服务器 | 主机型号 | DELL PowerEdge R730xd | high |
| 性能测试服务器 | CPU | Intel Xeon(R) CPU E5-2650 v4(24核 48线程 2.20GHz) | high |
| 性能测试服务器 | 内存 | 512 GB(32GB × 16, 2666 MHz) | high |
| 性能测试服务器 | 硬盘容量与构成 | 17T HDD(1.7T × 10) | high |
| 性能测试服务器硬盘 | 读取速度 | 222 MB/s | high |
| 性能测试服务器硬盘 | 写入速度 | 210 MB/s | high |
| 性能测试服务器 | 网络 | 万兆以太网 | high |
| 性能测试脚本 | submitJob 调用参数特点 | submitJob(... replay, [rds], [`outQuotes], `date, `time, , 4):replayRate 为空(不设定回放速率),并行度 4。 | high |
| 性能测试结果 | 回放耗时 | 在不设定回放速率(最快速率)且输出表无任何订阅时,回放 336,305,414 条数据耗时仅需 90~110 秒。 | high |