金融行业是数据资源最丰富、对数据使用最深入的行业之一。海量的行情数据为金融机构的投资决策和风险管理提供了有力支持。万得宏汇行情系统,作为一款多市场整合的实时行情平台,提供了低延时、高质量的行情数据服务,并通过统一的数据接口标准帮助用户获取数据。
通常,数据库和这一行情数据系统对接时,都需要通过 API 接口来完成。用户需要自行编写代码进行数据接入、序列化和存储,这不仅需要较强的技术支持能力,还会增加开发和维护的成本。
为简化这一过程,DolphinDB 开发了 WindTDF 行情插件。该插件通过简洁的脚本调用,让用户能够快速将行情数据接入 DolphinDB,免去繁琐的编码步骤,从而降低技术门槛,提升效率。
WindTDF 插件基于万得宏汇提供的 TDF_C++ SDK 实现,通过行情回调函数将数据写入指定的 DolphinDB 共享流表中。目前,该插件已支持多种数据类型,包括上交所、深交所、中金所的股票、基金快照、逐笔交易数据及期货数据。
接下来,我们将以订阅沪深两市的全市场股票快照和逐笔数据为例,展示如何使用 WindTDF 插件将行情时据实时接入并存储至 DolphinDB 分布式数据库。
(完整案例可前往此链接查看:https://zhuanlan.zhihu.com/p/12893455530)
DolphinDB WindTDF 插件实践案例
首先,我们通过 WindTDF 插件订阅上海深圳市场的成交数据、委托数据和快照数据,并将这些数据写入 DolphinDB 的持久化流数据表中。接着,我们订阅这些流表中的数据,并将其写入 DolphinDB 的分布式数据库中进行持久存储。具体方案如下图所示:
以深交所为例
实现步骤
调用 WindTDF 插件将数据接入 DolphinDB 主要分为三步:安装并加载插件、创建表、订阅行情。在创建表时,需要涉及持久化流数据表和分布式库表的创建。有关流表的性能优化和分布式库表的分区方式如何设置可前往 DolphinDB 文档中心查看。(https://docs.dolphindb.cn/zh/index.html)
1. 安装并加载 WindTDF 插件
启动节点后,首先连接客户端到目标节点,并在 DolphinDB 客户端中执行 installPlugin
函数安装与当前 DolphinDB 服务器版本适配的 WindTDF 插件文件。插件安装完毕后,在脚本中调用 loadPlugin("WindTDF")
加载插件。
2. 创建流表和分布式表
由于万得 TD 系统在连接行情系统后无法修改订阅,所以需要在连接前预先设置所有数据的订阅。
首先,我们创建连接 handle:
handle = WindTDF::createHandle([HOST], [PORT], [USERNAME], [PASSWORD])
接着,我们分别调用以下函数获取快照、逐笔委托、逐笔成交三种表结构:
得到行情数据的表结构后,我们创建三种数据品类的持久化流数据表(以快照为例):
cacheSize = 1000000
snapshot_sh = streamTable(10000:0, snapshotSchema[`name], snapshotSchema[`typeString]);
enableTableShareAndPersistence(table=snapshot_sh,tableName=`snapshot_sh_s,cacheSize=cacheSize, preCache=cacheSize)
snapshot_sz = streamTable(10000:0, snapshotSchema[`name], snapshotSchema[`typeString]);
enableTableShareAndPersistence(table=snapshot_sz,tableName=`snapshot_sz_s,cacheSize=cacheSize, preCache=cacheSize)
以上代码中的 cacheSize
作为变量设置建表时预分配内存的大小以及流数据表可占用的最大内存,具体值可以根据实际的可使用内存大小决定,较大的 cacheSize
可以降低出现峰值时延的频率。
为将行情数据存入分布式数据库,我们需要根据行情数据表结构创建分布式库表。本例中,我们按天值分区+按股票代码 HASH 分区的方式创建。
3. 订阅 WindTDF 行情
建表后,我们使用 WindTDF::subscribe
函数订阅 WindTDF 行情(以快照为例):
WindTDF::subscribe(handle, snapshot_sh_s, "SH-2-0", "snapshot");
WindTDF::subscribe(handle, snapshot_sz_s, "SZ-2-0", "snapshot");
再订阅之前创建的持久化流数据表,将增量数据实时写入分布式数据库(以快照为例):
def handleInsert(tb, mutable msg) {
tableInsert(tb, msg)
}
subscribeTable(tableName="snapshot_sh_s", actionName="snapshotTableInsert_sh", offset=-1, handler=handleInsert{shSnapshot}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
subscribeTable(tableName="snapshot_sz_s", actionName="snapshotTableInsert_sz", offset=-1, handler=handleInsert{szSnapshot}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
通过调整 subscribeTable
函数中的 batchSize 和 throttle 参数可以灵活控制写入分布式数据库的频率。batchSize 表示未处理消息的数量,throttle 表示时间间隔,当达到其中任何一个条件时,数据将被写入分布式数据库。
完成订阅后,使用函数 WindTDF::connect
连接 WindTDF,行情数据便开始进入流数据表。
4. WindTDF 运行状态监控
运行过程中,我们可以随时查看 WindTDF 行情的接收情况:
由于 WindTDF 插件中使用了第三方 SDK,可能会出现内存耗尽(Out of Memory)的情况。我们可以通过合理分配流数据的容量、及时管理 session 的变量等方式避免该情况的发生。
除了 WindTDF 插件,DolphinDB 还提供了通联、华锐 AMD、华泰 Insight 等主流平台的行情接入插件。同时,DolphinDB 也提供数据存取、消息队列、机器学习等多种功能插件,为用户带来高效便捷的开发体验。
欢迎前往 DolphinDB 插件市场浏览完整清单~