DolphinDB 用户入门指南之金融篇(6)
文章提供流数据计算的入门内容,包括流数据概念、订阅获取、流引擎计算与案例说明。
Source: https://dolphindb.cn/blogs/141
What this page covers
- 流数据与流计算的定义与对比(批处理 vs 流处理)。
- 发布-订阅(Pub-Sub)模型、流程与数据发布方式。
- 流数据发布示例:回放、Python API 写入、Kafka 写入。
- 流数据订阅:subscribeTable 参数与 server/API 案例。
- 流计算引擎:引擎级联与分类。
- 引擎示例:主买/主卖交易量计算的端到端流程。
- 常见问题:更新删除、持久化、重启加载、排错与清理。
技能认证特训营第二期报名活动 (cta)
页面顶部包含活动提示与限时报名链接。
- 提供一个用于报名的链接入口。
- 该入口与活动提示信息一同展示在页面顶部。
DolphinDB 用户入门指南之金融篇(6):7. 流数据计算 (product_overview)
文章包含标题、作者与日期信息,并概述本章将覆盖的流数据与流计算内容。
- 发布日期为 2025-01-07。
- 作者署名为 momo。
- 内容覆盖流数据基础概念、框架和流程。
- 内容结合 API 订阅、消息中间件获取与流引擎计算案例。
本章依赖库表与持久化配置(persistenceDir) (how_it_works)
本章列出依赖的库表,并说明流表持久化需要在配置文件中设置 persistenceDir。
- 示例使用快照表 snapshot,位于 dfs://stock_lv2_snapshot。
- 示例使用逐笔成交表 trade,位于 dfs://stock_trade。
- 流表持久化需要在配置文件添加 persistenceDir 配置项。
- 配置文件包括单机 dolphindb.cfg 与集群 cluster.cfg 的情形。
7.1 流数据与流计算的定义(含批处理 vs 流处理对比) (definition)
本节定义流数据与流处理/流计算,并对比批处理与流处理的差异。
- 流数据是基于事件持续生成的时间序列数据。
- 流数据具有“动态”特性:持续生成,结束无明确定义。
- 流数据具有“有序”特性:记录带时间戳或序列号标识顺序。
- 流计算是在实时数据流上进行实时计算与分析。
- 批处理与流处理在数据范围、数据大小、延迟方面存在对比差异。
批处理 vs 流处理(摘要)
- 数据范围:批处理面向全量或大部分数据;流处理面向时间窗口或最近记录。
- 数据大小:批处理为大批量;流处理为单条或小批量。
- 延迟:批处理为分钟到小时;流处理可达亚毫秒级。
7.2 发布-订阅:模型、流程与数据发布方式 (how_it_works)
本节介绍 Pub-Sub 模型、发布订阅机制、背压行为,以及金融场景流程与接入方式。
- DolphinDB 流数据通信采用发布-订阅(Pub-Sub)模型。
- 通过消息队列发布与订阅以解耦发布者与订阅者。
- 每个节点维护发布队列,新流数据写入后进入消息发布队列。
- 每个订阅线程对应一个消费队列,用于获取数据并增量处理。
- 订阅端处理不足会导致消费队列积压并触发反压到发布端。
- 金融场景流程包含:行情写入流表、用户订阅计算、结果发布到下游平台。
- 数据接入方式包括行情插件、消息中间件插件与多语言 API。
接入方式示例
- 行情插件示例:amdQuote、INSIGHT、NSQ、EFH、DataFeed、CTP、XTP、SSEQuotationFile。
- 消息中间件示例:Kafka、zmq、mqtt、RabbitMQ、RocketMQ。
- API 语言示例:Python、C++、Java、Go、R、JavaScript。
流数据发布示例:历史回放、Python API 写入、Kafka 写入 (how_it_works)
本节给出三种写入流表的示例流程与代码:回放、Python 写入、Kafka 写入。
- 历史回放使用 replayDS 生成回放数据源。
- 回放使用 replay 将数据写入目标表对象或流计算引擎。
- 示例使用 enableTableShareAndPersistence 进行流表共享与持久化设置。
- Python 写入流程包含连接数据库与通过 MultithreadedTableWriter 写入。
- Python 写入示例使用 dolphindb.session 建立连接。
- Kafka 写入流程包含创建 KafkaConsumer 指定 topic 并消费后写入。
流数据订阅:subscribeTable 参数与 server/API 订阅案例 (how_it_works)
本节说明 subscribeTable 的关键参数,并提供 server 端订阅入库与 API 端订阅示例。
- subscribeTable 可通过 handler 配置消息注入对象。
- batchSize 表示每隔多少条消息处理一次。
- throttle 表示每隔多少秒处理一次。
- offset 支持 -2、-1 与 n(n>=0) 等订阅起点机制。
- reconnect 支持订阅中断时自动重新订阅。
- 订阅既支持 server 节点间,也支持第三方 API 订阅并推送下游。
- server 端示例创建分布式表 snapshot_trade 存储处理后的数据。
- 示例可按 TradingPhaseCode 字段对订阅数据进行过滤。
- Python 订阅示例在 msgAsTable=True 时可将数据作为表传递并转为 DataFrame。
7.3 流计算引擎:引擎级联与分类 (feature_list)
本节介绍内置流计算引擎、引擎级联的流水线式计算方式,并提及引擎分类展示。
- DolphinDB 内置多类流计算引擎。
- 复杂逻辑可拆分并通过引擎级联实现流水线式计算。
- 内置引擎算子描述为经过算法优化、计算性能高效(低置信度表述)。
- 通过配置引擎参数实现复杂逻辑、节省自研成本(低置信度表述)。
- 引擎按逻辑与场景分为多类,并包含单流与双流接入分类信息。
流计算引擎示例:主买/主卖交易量计算(ts/rs/cs 引擎级联) (how_it_works)
本节通过 ts/rs/cs 引擎级联,实现分钟聚合、累计与加权求和的指标计算,并包含订阅、回放与结果查看。
- 示例给出主买/主卖交易量的计算公式:Σ(交易价格*交易量*权重因子)。
- 第 1 步:按股票分组计算每分钟成交量,使用时序聚合引擎。
- 第 2 步:按股票分组按分钟累计成交量,使用响应式状态引擎。
- 第 3 步:对结果加权求和,使用横截面引擎。
- 示例包含对流表的共享与持久化参数设置(含 asynWrite、compress 等)。
- 示例中的权重因子通过 createWeightDict 构造并返回字典。
- 示例展示 ts/rs/cs 引擎的 metrics 配置样例。
- 示例展示将逐笔流表订阅到 tsEngine 的 subscribeTable 参数样例。
- 回放示例包含通过 timeRepartitionSchema 按 15 分钟切片避免 OOM(中等置信度表述)。
- 示例包含对结果表的查询语句:select * from tradeAmountIndex limit 10。
- 示例结论提及引擎算子与 SQL select 计算主体使用同一套函数,体现流批一体(中等置信度表述)。
7.4 常见问题(FAQ) (faq)
本节汇总流表能力限制、持久化与删除方式、重启加载与排错方法等常见问题。
- 流表记录不支持更新和删除,只能追加。
- keyedStreamTable/latestKeyedStreamTable 支持写入前键值检查以追加或丢弃。
- 流表内存过大可通过持久化接口并配置 persistenceDir 解决。
- 重启后自动加载可将流表定义与订阅操作放入 startup.dos。
- 删除流表应使用 dropStreamTable,而不是 undef(`st, SHARED)。
- 删除前需确保取消全部订阅,可用 getStreamingStat 与 unsubscribeTable 排查与处理。
- 持久化可设置 retentionMinutes 以定期删除持久化数据。
- 查询流表是否存在可使用 existsStreamTable。
- defined 可能对共享内存表返回 true,存在误用风险。
- 排查异常可查看 subWorkers.lastErrMsg、队列深度与引擎状态。
7.5 下一步阅读(官方文档与相关教程链接) (navigation)
本节提供进一步阅读的官方文档与教程链接,覆盖流数据、发布订阅、引擎、回放与场景示例等主题。
- 提供与流数据与发布订阅相关的进一步阅读入口。
- 提供与流计算引擎与 API 接入相关的进一步阅读入口。
- 提供与数据回放、流批一体与场景案例相关的进一步阅读入口。
附录:各章节脚本代码下载 (navigation)
附录提供本教程相关 .dos 与 .py 脚本下载链接列表。
- 提供多份 .dos 与 .py 脚本下载链接。
- 脚本覆盖建库建表、导入数据、SQL 批计算与流数据发布订阅等内容。
- 脚本覆盖引擎级联等示例资源。
Facts index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB 用户入门指南之金融篇(6) | 发布日期 | 2025-01-07 | high |
| 文章作者署名 | 作者 | momo | high |
| 本章内容范围 | 涵盖主题 | 介绍流数据基础概念、框架和流程;结合 API 订阅、消息中间件获取、流引擎计算等案例说明 DolphinDB 中的流计算应用 | high |
| 本章示例所涉库表 | 快照表位置 | 基于 3.3 节创建的 "dfs://stock_lv2_snapshot" 库下的快照表 snapshot | high |
| 本章示例所涉库表 | 逐笔成交表位置 | 基于 4.2.3 节创建的 "dfs://stock_trade" 库下的逐笔成交表 trade | high |
| 流表持久化 | 配置文件要求 | 需要在 server 配置文件(单机:dolphindb.cfg;集群:cluster.cfg)添加配置项 persistenceDir | high |
| 流数据 | 定义 | 基于事件持续生成的时间序列数据 | high |
| 流数据 | 特点 | 动态:持续生成,结束无明确定义,大小与结构不固定 | high |
| 流数据 | 特点 | 有序:每条记录具有时间戳或序列号以标识在流中的位置与顺序 | high |
| 流数据 | 特点 | 大规模:高速率生成、规模大,对并行处理性能与可扩展性要求更高 | high |
| 流数据 | 特点 | 强时效:要求极低延迟的读取和处理能力以驱动实时决策 | high |
| 流数据处理/流计算 | 定义 | 在实时数据流上进行实时计算和分析;与批处理不同,无需等待所有数据到位,按时间顺序增量处理 | high |
| 批处理 vs 流处理 | 数据范围对比 | 批处理:对数据集中的所有或大部分数据进行查询或处理;流处理:对时间窗口内的数据或最近的数据记录进行查询或处理 | high |
| 批处理 vs 流处理 | 数据大小对比 | 批处理:大批量数据;流处理:单条记录或包含几条记录的小批量数据 | high |
| 批处理 vs 流处理 | 延迟对比 | 批处理:几分钟至几小时;流处理:亚毫秒级 | high |
| DolphinDB 流数据通信 | 采用模型 | 经典发布-订阅(Pub-Sub)通信模型,通过消息队列发布与订阅,解耦发布者与订阅者 | high |
| DolphinDB 发布端机制 | 队列与发布流程 | 每个节点维护发布队列;新流数据写入发布表后发送到消息发布队列,再由发布线程发布到各订阅端消费队列 | high |
| DolphinDB 订阅端机制 | 线程与消费流程 | 每个订阅线程对应一个消费队列;订阅提交后有新数据写入发布表时会通知订阅方,订阅线程从消费队列获取数据增量处理 | high |
| 发布-订阅队列 | 积压与反压行为 | 订阅端处理不过来会在消费队列累积;消费队列满反压到发布队列;发布队列也满会阻塞写入 | high |
| 金融场景发布-订阅流程 | 步骤 | (1)行情数据写入流表;(2)用户订阅流表计算;(3)结果通过 API/插件等发布到第三方平台(如可视化/下游对接) | high |
| 流数据发布接入方式 | 行情插件示例 | amdQuote、INSIGHT、NSQ、EFH、DataFeed、CTP、XTP、SSEQuotationFile 等 | high |
| 流数据发布接入方式 | 消息中间件插件示例 | Kafka、zmq、mqtt、RabbitMQ、RocketMQ 等 | high |
| 流数据发布接入方式 | API 语言 | Python、C++、Java、Go、R、JavaScript 等 | high |
| 历史数据回放 | 步骤 | replayDS 生成回放数据源;replay 将数据源回放到目标表对象或流计算引擎 | high |
| 流表持久化接口(示例) | 使用函数 | enableTableShareAndPersistence(table=t, tableName="st1", cacheSize=100000) | high |
| replay 回放接口(示例) | 关键参数示例 | replay(inputTables=ds, outputTables="st1", dateColumn="TradeDate", timeColumn="TradeTime", replayRate=100000, absoluteRate=true) | high |
| Python API 写入流表(示例) | 流程步骤 | 连接数据库;确保 server 端流表存在;初始化 MultithreadedTableWriter 建立连接并 insert 写入;等待写入完成并打印状态 | high |
| Python 写入示例 | 使用组件 | dolphindb.session 连接;MultithreadedTableWriter 用于写入流表 | high |
| Kafka 写入流表(示例) | 流程步骤 | 连接 DolphinDB;确保流表存在;创建 KafkaConsumer 并指定 topic;初始化 MultithreadedTableWriter;消费消息并 insert 写入 | high |
| subscribeTable | handler 参数用途 | 订阅时可通过 handler 配置消息注入对象(可为消息处理函数、流计算引擎或数据表) | high |
| subscribeTable | batchSize 参数 | 每隔多少条消息处理一次 | high |
| subscribeTable | throttle 参数 | 每隔多少秒处理一次 | high |
| subscribeTable | offset 可选机制 | -2:适用于持久化流表订阅,从磁盘持久化 offset 开始;-1:从订阅后的下一条开始;n(n>=0):从第 n 条开始 | high |
| subscribeTable | reconnect 参数 | 流订阅中断时自动重新订阅 | high |
| DolphinDB 订阅能力 | 支持范围 | 支持 server 端节点之间订阅,也支持通过第三方 API 订阅 DolphinDB 流表数据并推送给下游平台 | high |
| server 端订阅入库示例 | 目标表 | 创建分布式表 snapshot_trade 存储流表处理后的数据 | high |
| server 端订阅示例 | 过滤机制 | setStreamTableFilterColumn 设置只订阅 TradingPhaseCode 字段为 "Trade" 的数据;订阅时 filter=["TRADE"] | medium |
| server 端订阅示例 | 处理函数 | 自定义 append2Table 在入库前对部分字段进行计算处理,并将结果写入分布式表 | high |
| server 端订阅入库示例 | 入库数据量(示例注释) | select count(*) from snapshot_trade // 799,013 | medium |
| API 端订阅示例(Python) | 处理方式 | msgAsTable=True 时数据以表形式传到 Python 端默认转为 DataFrame,可在 handler 中 to_csv 导出 | high |
| 流计算引擎(DolphinDB) | 能力描述 | 内置多类流计算引擎,复杂因子逻辑可拆分并通过引擎级联实现流水线式计算 | high |
| 流计算引擎算子 | 性能描述 | 内置引擎支持的算子经过算法优化,计算性能高效 | low |
| 用户使用流计算引擎 | 成本描述 | 用户仅需配置引擎参数即可实现复杂逻辑,节省自研算法逻辑成本 | low |
| 流计算引擎分类 | 分类维度 | 根据逻辑与场景分为多类,并展示单流与双流接入场景的引擎/连接类型分类表 | medium |
| 主买/主卖交易量示例 | 计算公式 | 主买/主卖交易量 = Σ(交易价格*交易量*权重因子) | high |
| 主买/主卖交易量示例 | 拆分流程 | (1)每分钟成交量(按股票分组)→ 时序聚合引擎;(2)按分钟累计成交量(按股票分组)→ 响应式状态引擎;(3)加权求和 → 横截面引擎 | high |
| trade_st 流表(示例) | 持久化参数示例 | enableTableShareAndPersistence(... asynWrite=true, compress=true, cacheSize=100000, retentionMinutes=1440, flushMode=0, preCache=10000) | high |
| 示例中的权重因子 | 构造方式 | createWeightDict 返回 dict(sids, take(0.001, sids.size())) | high |
| createCrossSectionalEngine(示例) | metrics | [wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()] | high |
| createReactiveStateEngine(示例) | metrics | [cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)] | high |
| createTimeSeriesEngine(示例) | 窗口与metrics | windowSize=60000, step=60000;metrics=[sum(iif(TradeBSFlag=="S",1,0)*TradeQty*TradePrice), sum(iif(TradeBSFlag=="B",1,0)*TradeQty*TradePrice)] | high |
| subscribeTable(示例:订阅逐笔流表进入 tsEngine) | 参数示例 | tableName=trade_st, actionName="act_tsEngine", offset=0, handler=streamHandler{tsEngine}, msgAsTable=true, batchSize=10000, throttle=0.001 | high |
| 数据回放避免 OOM(示例说明) | 做法 | replayDS 使用 timeRepartitionSchema 按 15min 切片以保证回放时不 OOM | medium |
| 回放任务提交(示例) | replayRate | submitJob(... replayRate=10000000, absoluteRate=true) | high |
| 结果表查询(示例) | SQL | select * from tradeAmountIndex limit 10 | high |
| 流批一体特性(示例结论) | 描述 | 引擎算子与 SQL select 计算主体本质使用同一套函数,体现 DolphinDB 流批一体的计算特点 | medium |
| 流表记录 | 更新/删除支持 | 不支持更新和删除;记录只能追加,一旦追加后不能更新删除 | high |
| 键值流表(keyedStreamTable/latestKeyedStreamTable) | 写入前检查能力 | 支持在写入前进行键值检查,以选择数据是追加还是丢弃 | high |
| 流表内存占用过大 | 解决方式 | 可用 enableTablePersistence 或 enableTableShareAndPersistence 持久化;持久化目录通过配置参数 persistenceDir 设置 | high |
| 重启后自动加载订阅流表 | 配置方式 | 将流表定义与订阅操作放在启动脚本 startup.dos 中;startup 参数用于设置启动脚本 | high |
| 删除流表 | 正确方式 | 使用 dropStreamTable 删除,而不是 undef(`st, SHARED) | high |
| 流表无法删除排查 | 条件与方法 | 用 getStreamingStat 查看是否有未取消订阅;需先 unsubscribeTable 取消全部订阅后才能删除 | high |
| 定期删除持久化流表数据 | 参数 | 持久化时设置 retentionMinutes 参数定期删除持久化数据 | high |
| 查询流表是否存在 | 函数 | existsStreamTable | high |
| defined 函数用于判断流表存在性 | 误用风险 | defined 对共享内存表也会返回 true,用户常误用 | high |
| 排查流订阅/流计算异常 | 常用排查字段/函数 | getStreamingStat:subWorkers.lastErrMsg 排查异常;pubConns/subWorkers.queueDepth 排查是否阻塞;getStreamEngineStat 排查引擎状态 | high |
| 限时报名链接 | URL | https://www.qingsuyun.com/h5/e/217471/5/ | high |
| 配置参数 persistenceDir 文档链接 | URL | https://docs.dolphindb.cn/zh/db_distr_comp/cfg/cfg_para_ref.html?hl=#configparamref__section_hln_54d_syb | high |
| 配置参数 startup 文档链接 | URL | https://docs.dolphindb.cn/zh/db_distr_comp/cfg/standalone.html | high |
| 附录脚本资源 | 提供内容 | 提供多份 .dos 与 .py 脚本下载链接(含建库建表、导入数据、SQL批计算、流数据发布订阅、引擎级联等) | high |