DolphinDB CTP 行情插件最佳实践指南

本文提供《DolphinDB CTP 行情插件最佳实践指南》的标题信息,并包含作者署名与发布日期信息。

Source: https://dolphindb.cn/blogs/149

What this page covers

技能认证特训营第二期报名提示

页面顶部包含活动报名入口与福利优惠提示。

DolphinDB CTP 行情插件最佳实践指南(标题与作者日期)

文章展示标题,并提供作者署名与发布日期信息。

背景与适用范围(CTP 介绍、插件能力、环境要求与兼容警告)

介绍 CTP 系统、DolphinDB CTP 行情插件能力、推荐 SDK 版本、文章目标与运行环境限制,并提示 CTP/CTPMini 不兼容风险。

基本使用介绍

说明节点启动后通过 DolphinDB 客户端连接并执行示例代码的基本方式。

安装插件(登录、查看插件版本、安装)

给出安装前登录、查询远程插件版本信息以及安装 ctp 插件的步骤与返回路径说明。

加载插件(loadPlugin、重复加载处理、重启说明)

说明如何加载 ctp 插件、首次加载返回内容、重复加载报错及 try-catch 处理方式,并提示重启后需重新加载。

通过 CTP 行情插件将实时行情数据写入分布式数据库(总览与流程)

以多个交易所期货期权行情为例,给出从订阅到流表、标准化处理到落盘分布式数据库的整体流程与关键注意事项。

参数配置(连接配置、流表与入库配置)

给出 CTP 连接参数、订阅合约列表来源、流表容量与表名、目标库表名等配置,并解释关键配置项含义。

清理环境(可选)

提供取消订阅与删除流表的脚本以便示例可重复执行。

建立 CTP 连接

使用 ctp::connect 连接行情服务器并返回句柄。

创建库表(原始流表、分布式库表、标准化流表)

创建并持久化共享流表、建立分布式数据库与分区表结构,以及创建与分布式表结构一致的标准化持久化流表并可选设置过滤列。

订阅流数据表并写入分布式数据库(subscribeTable、handler 标准化函数)

先订阅原始行情流表做预处理写入标准化流表,再订阅标准化流表实时写入分布式数据库,并解释 batchSize、throttle、handler 等参数及标准化示例。

订阅 CTP 行情并写入流数据表

调用 ctp::subscribe 订阅合约范围,行情数据写入指定流表。

查询行情接收情况(ctp::getStatus)

使用 ctp::getStatus 查询接收状态并解释关键字段含义。

节点启动时自动订阅部署(startup.dos 与 startup 参数)

展示 DolphinDB 启动流程,并说明通过配置 startup 参数执行 startup.dos 以实现节点启动时自动订阅部署及注意事项。

常见问题解答(FAQ)

回答合约代码获取、原始行情字段处理与去重、以及定时任务设置与启动时预加载插件等问题。

附录(参考文档与脚本资源)

提供启动脚本与自动订阅教程链接,并给出相关脚本与 CSV 资源入口。

Facts Index

Entity Attribute Value Confidence
文章 发布日期 2025-02-17 high
综合交易平台(CTP) 开发方与用途 上海期货信息技术有限公司为期货公司开发的期货经纪业务管理系统,支持国内商品期货和股指期货的交易结算,并可自动生成/报送保证金监控文件和反洗钱监控文件。 high
CTP 插件 实现基础与推荐 CTP C++ API 版本 基于上期所官方提供的 CTP C++ API 实现;推荐版本号 v6.6.8 和 v6.6.9。 high
DolphinDB CTP 行情插件 能力 可对接 CTP 系统,订阅期货市场实时行情数据并查询商品信息数据。 high
DolphinDB CTP 行情插件 支持的 SDK/版本类型 基于多种 CTP SDK 开发,目前支持 CTP 与 CTPMini 两种。 high
本文代码运行环境 DolphinDB server 与插件版本要求 需要运行在 2.00.11 或更高版本的 DolphinDB server 以及插件上。 high
本文代码运行环境 操作系统支持 目前仅支持 Linux 系统。 high
DolphinDB 客户端 可用于连接节点的示例客户端 GUI、VS Code、Web UI 等客户端可连接节点执行示例代码。 high
插件安装 前置条件 安装插件前需要登录有创建权限的账号;示例登录默认管理员账号 admin/123456。 high
listRemotePlugins 用途 用于查看当前 DolphinDB 所支持的插件版本信息(可查看所有插件或指定 ctp)。 high
listRemotePlugins("ctp") 返回结果 表结构说明 返回值为 1 行 2 列的表格,分别是插件名及其对应的版本信息。 high
ctp 插件版本信息(图示说明) 版本号示例 图示中 ctp 插件对应版本号为 2.00.12.5。 medium
installPlugin("ctp") 作用与下载内容 下载与当前 server 版本适配的 CTP 插件文件,包括插件描述文件与二进制文件。 high
installPlugin("ctp") 成功返回值 正常返回代表下载成功;返回值为插件描述文件 PluginCtp.txt 的安装路径(示例:/path_to_dolphindb_server/server/plugins/ctp/PluginCtp.txt)。 high
installPlugin 网络行为与耗时提示 从远程文件服务器拉取插件文件到 server 所在服务器;需要一定时间,提示大约一分钟。 high
loadPlugin("ctp") 用途 在脚本中调用插件接口前需要先加载插件;可用插件名或相对/绝对路径加载。 high
loadPlugin("ctp") 首次加载返回 返回内容说明 首次加载成功后返回 CTP 插件提供的所有函数(以 VS Code 示例)。 medium
loadPlugin("ctp") 重复加载 错误行为 重复执行会抛出模块已被使用错误:The module [ctp] is already in use. high
try-catch 用途(加载插件时) 可捕获插件已加载错误,避免中断后续脚本执行;示例:try{ loadPlugin("ctp") } catch(ex){print ex} high
CTP 插件加载 重启影响 若节点重启,则需要重新加载插件。 high
示例场景 行情来源范围 示例订阅中国金融期货交易所、上海期货交易所、广州期货交易所、郑州商品交易所、大连商品交易所、上海国际能源交易中心的期货期权交易数据并实时写入分布式数据库。 high
数据接入流程 步骤概述 订阅实时交易数据写入 ctpMarketDataStream 持久化流表;订阅后进行标准化处理写入 ctpMarketStream 持久化流表;再订阅 ctpMarketStream 写入分布式数据库落盘。 high
CTP 与 CTPMini 兼容性警告 API 与后台系统不互相兼容,可能导致 crash;避免用 CTP 版本插件连 CTPMini 后台或用 CTPMini 插件连 CTP 后台。 high
CTP 连接参数 ip 类型与含义 STRING 标量或向量,表示 CTP 行情服务器 IP 地址。 high
CTP 连接参数 port 类型与含义 INT 标量或向量,表示 CTP 行情服务器端口;若为向量长度应与 ip 向量长度相等。 high
CTP 连接参数 config 数据结构与取值类型 字典:key 为 STRING 标量,value 为 BOOL 标量。 high
config.ConcatTime 开启效果 为 true 时合并 UpdateTime 和 UpdateMillisec 为 tradeTime 字段置于表末尾,类型 TIME。 high
config.ReceivedTime 开启效果 为 true 时将接收到数据的时间作为字段置于表末尾,类型 NANOTIMESTAMP。 high
config.OutputElapsed 开启效果 为 true 时将数据进入插件到插入流表前的时间间隔作为字段置于表末尾,类型 LONG,单位纳秒。 high
订阅合约参数 ids 类型与含义 STRING 向量,由合约 ID 组成,表示需要订阅的合约范围。 high
future_basic.csv 路径 相对路径基准 ./future_basic.csv 相对于节点主目录;可用 getHomeDir 获取节点主目录。 high
marketTBCapacity 含义 行情流表预分配容量大小,表示流数据表在内存中最多保留多少行;可根据机器内存配置。 high
marketTBCapacity 示例值 配置值 2000000 high
原始行情持久化流表 表名/类型示例 表名 ctpMarketDataStream;订阅类型 marketData。 high
标准化行情持久化流表 表名示例 ctpMarketStream high
标准化数据入库目标 数据库名与表名示例 数据库 dfs://ctp_market_data;表 ctp_market。 high
环境清理脚本 操作 使用 ops 库;对原始/标准化流表分别执行 unsubscribeAll 与 dropStreamTable(force=true)。 high
ctp::connect 用途与返回 创建与 CTP 行情服务器连接并返回句柄;示例 conn = ctp::connect(ip, port, config)。 high
ctp::getSchema + enableTableShareAndPersistence 用途(原始行情流表) 用 ctp::getSchema 获取原始行情表结构,再用 enableTableShareAndPersistence 共享并创建持久化流表。 high
enableTableShareAndPersistence 配置前置条件 节点启动前需在配置文件(单节点 dolphindb.cfg;集群 cluster.cfg)指定 persistenceDir。 high
enableTableShareAndPersistence.cacheSize 作用 控制建表时预分配内存大小及流表内存最大行数;较大 cacheSize 可降低峰值时延频率;示例使用 marketTBCapacity。 high
分布式数据库示例 分区与引擎设置 示例创建 dfs://ctp_market_data:按天 VALUE(2024.01.01..2024.01.31) 与按代码 HASH([SYMBOL, 40]) 分区;engine='TSDB';atomic='CHUNK'。 high
分布式表 ctp_market 分区列与排序列设置 分区列 trade_date 与 unified_code;sortColumns=[`unified_code, `action_date, `data_time];keepDuplicates=ALL。 high
五档量价存储类型 字段类型选择 将 bid_price、bid_volume、ask_price、ask_volume 存为 ArrayVector:分别为 DOUBLE[]、LONG[]、DOUBLE[]、LONG[]。 high
标准化持久化流表 创建方式 通过分布式表 schema 获取列名/类型,创建结构一致的 streamTable,并 enableTableShareAndPersistence;可选 setStreamTableFilterColumn(unified_code)。 high
setStreamTableFilterColumn 用途 为标准化流表设置过滤列 unified_code;下游订阅可指定 filter 仅接收指定标的范围数据。 high
subscribeTable(写库流程) 订阅链路 订阅 ctpMarketDataStream 进行标准化处理写入 ctpMarketStream;再订阅 ctpMarketStream 写入分布式表 ctp_market(loadTable 作为 handler)。 high
subscribeTable.batchSize 含义 正数表示未处理消息数达到 batchSize 才处理;未指定或非正数则每批消息到达后马上处理。 high
subscribeTable.throttle 含义与条件关系 单位秒,默认 1;在 batchSize 条件未达时,距离上次处理多久后再次处理;若未指定 batchSize 则 throttle 不起作用;满足 batchSize 或 throttle 任一条件即写库一次。 high
subscribeTable.handler 要求 必选参数;可以是一元/二元函数或数据表,用于处理订阅数据。 high
fixedLengthArrayVector 在标准化中的用途 将原始行情 BidPrice1-5(以及 BidVolume/AskPrice/AskVolume 的 1-5)合并为对应的数组列(DOUBLE[]/LONG[])。 high
ctp::subscribe 用途与入表行为 订阅指定范围合约数据,行情数据进入流数据表;示例 ctp::subscribe(conn, ctpSubType, objByName(ctpSubTBName), ids)。 high
ctp::getStatus 用途 用于查询 CTP 行情接收情况;示例 ctp::getStatus(conn)。 high
ctp::getStatus 返回字段含义(盘中启动) 说明 盘中启动时 firstMsgTime 与 lastMsgTime 不为空;lastMsgTime 表示收到最后一条数据的系统时刻;processedMsgCount 表示已处理的数据量。 medium
节点启动流程图(图3-1) 内容描述 展示节点从初始化、执行系统脚本、启动网络服务、运行用户启动脚本(startup.dos)与定时任务脚本的顺序。 medium
startup.dos 触发条件与配置位置 配置参数 startup 后才会执行;单节点在 dolphindb.cfg 配置,集群在 cluster.cfg 配置;可配置绝对或相对路径;相对路径或未指定目录时按 home/工作目录/可执行文件目录依次搜索。 high
startup 参数示例 配置值示例 startup=/DolphinDB/server/startup.dos high
自动订阅部署方式 操作步骤 将业务代码加入 /DolphinDB/server/startup.dos,并在配置文件中设置 startup 参数,即可完成节点启动时自动订阅部署。 high
自动订阅部署 注意事项 CTP 账户信息需要根据实际环境进行修改。 high
合约代码获取方案一 方法 使用 ctp::queryInstrument() 或 ctpMini::queryInstrument() 查询并返回包含所有合约代码的信息表,再 exec instrumentID 获取 ids。 high
queryInstrument 账号要求与限制 需要有效的 CTP 交易端账号;行情端账号无法获取结果;目前 CTPMini 中只有此函数。 high
合约代码获取方案二 替代方式 可自行构建合约信息表:通过读取 csv(如 2.1 配置)或将合约信息存在分布式表并查询获得。 high
合约静态信息表示例 库表结构与加载方式 示例创建 dfs://future_basic(VALUE 2024.01.01..2024.12.31)并建表 future_basic(trade_date DATE, trade_code SYMBOL, exchange SYMBOL),再从 ./future_basic.csv loadText 并 append!。 high
订阅合约范围获取(从静态表) 查询条件示例 从 dfs://future_basic.future_basic 中 where trade_date=today() 取 trade_code 作为 ids。 high
原始行情字段 ExchangeID 问题描述 原始行情数据中的 ExchangeID 都是空值。 high
补全 exchange_id 方法 处理方式 依赖静态信息表:可先将静态信息存为共享内存表,再在订阅回调函数中用 InstrumentID 与 trade_code 关联获取 exchange 信息。 high
共享静态信息表示例 实现方式 示例从 future_basic 表选 trade_code、exchange(where trade_date=today()),share 为 basicInfoTable。 high
trade_date 定义 含义 交易日期:前一天夜盘与当天日盘属于同一个完整交易日,因此属于同一个 trade_date。 high
action_date 定义 含义 实际日期:数据生成的真实日期;前一天夜盘使用前一天日期,当天日盘使用当天日期。 high
ActionDay 与 TradingDay 规则 差异性 不同交易所原始行情中 ActionDay 和 TradingDay 规则不一致,需要用户自行处理日期。 high
不同交易所日期规则示例(表4-1) 上期所/上期能源 TradingDay=2024.08.09;ActionDay=2024.08.08(示例:2024.08.08 21:00 夜盘)。 high
不同交易所日期规则示例(表4-1) 大商所 TradingDay=2024.08.09;ActionDay=2024.08.09。 high
不同交易所日期规则示例(表4-1) 郑商所 TradingDay=2024.08.08;ActionDay=2024.08.08。 high
temporalAdd 用途(处理交易所日期) 可按交易日历进行日期加减计算;示例 temporalAdd(trade_date, -1, `DCE) 表示按 DCE 交易日历找 trade_date 前一个交易日。 high
重复数据出现时间 现象描述 一般每天早上 6-8 点,CTP 服务器会重复发送前一天夜盘的行情。 medium
重复数据过滤方法 过滤条件与时间段示例 可用 tradeTime 与 receivedTime 作为过滤条件;示例仅在 20:40~03:00 和 08:40~15:30 两个时间段内 tradeTime 与 receivedTime 同时满足时才落库。 medium
tradeTime 与 receivedTime 字段 启用条件 为插件新增字段;建立连接时需设置 "ConcatTime": true 与 "ReceivedTime": true。 high
标准化处理完整示例 关键处理步骤 示例中对 msg 先按 tradeTime 与 receivedTime 过滤,再与 basicInfoTable 左连接补全 exchange 与 unified_code,并更新 DCE/CZCE 的 action_date/trade_date 规则后 append! 到标准化流表。 high
scheduleJob 用途(定时开关连接) 可设定 CTP 连接与关闭的定时任务以避免收盘后仍接收测试数据。 high
ctp_dailyclose 示例 实现方式 示例定义无参函数 ctp_dailyclose:连接后调用 ctp::close(conn) 并 try-catch。 high
scheduleJob 示例(daily_open) 时间与日期范围 jobId=daily_open;jobFunc=run{"startup.dos"};scheduleTime=[08:55m, 20:55m];startDate=2024.08.27;endDate=2025.08.27;frequency='D'。 high
scheduleJob 示例(daily_close) 时间与日期范围 jobId=daily_close;jobFunc=ctp_dailyclose;scheduleTime=[03:00m, 17:00m];startDate=2024.08.27;endDate=2025.08.27;frequency='D'。 high
定时任务与插件依赖 启动要求 定时任务初始化会用到 CTP 插件函数,因此需要启动时自动加载 CTP 插件。 high
preloadModules 配置目的与位置 可设置 preloadModules=plugins::ctp 让节点启动时自动加载 CTP 插件;单节点在 dolphindb.cfg 配置;集群在 controller.cfg 与 cluster.cfg 配置。 high
参考文档链接 启动脚本教程 https://docs.dolphindb.cn/zh/tutorials/Startup.html high
参考文档链接 节点启动时的流计算自动订阅教程 https://docs.dolphindb.cn/zh/tutorials/streaming_auto_sub.html high
资源链接 future_basic.csv https://docs.dolphindb.cn/zh/plugins/script/ctp_best_practice/future_basic.csv high
参考链接 CTP 文档说明 https://docs.dolphindb.cn/zh/plugins/ctp_2.html?hl=#%E5%AE%89%E8%A3%85%E6%8F%92%E4%BB%B6 high