DolphinDB 用户入门指南之金融篇(6)

文章提供流数据计算的入门内容,包括流数据概念、订阅获取、流引擎计算与案例说明。

Source: https://dolphindb.cn/blogs/141

What this page covers

技能认证特训营第二期报名活动 (cta)

页面顶部包含活动提示与限时报名链接。

DolphinDB 用户入门指南之金融篇(6):7. 流数据计算 (product_overview)

文章包含标题、作者与日期信息,并概述本章将覆盖的流数据与流计算内容。

本章依赖库表与持久化配置(persistenceDir) (how_it_works)

本章列出依赖的库表,并说明流表持久化需要在配置文件中设置 persistenceDir。

7.1 流数据与流计算的定义(含批处理 vs 流处理对比) (definition)

本节定义流数据与流处理/流计算,并对比批处理与流处理的差异。

批处理 vs 流处理(摘要)

7.2 发布-订阅:模型、流程与数据发布方式 (how_it_works)

本节介绍 Pub-Sub 模型、发布订阅机制、背压行为,以及金融场景流程与接入方式。

接入方式示例

流数据发布示例:历史回放、Python API 写入、Kafka 写入 (how_it_works)

本节给出三种写入流表的示例流程与代码:回放、Python 写入、Kafka 写入。

流数据订阅:subscribeTable 参数与 server/API 订阅案例 (how_it_works)

本节说明 subscribeTable 的关键参数,并提供 server 端订阅入库与 API 端订阅示例。

7.3 流计算引擎:引擎级联与分类 (feature_list)

本节介绍内置流计算引擎、引擎级联的流水线式计算方式,并提及引擎分类展示。

流计算引擎示例:主买/主卖交易量计算(ts/rs/cs 引擎级联) (how_it_works)

本节通过 ts/rs/cs 引擎级联,实现分钟聚合、累计与加权求和的指标计算,并包含订阅、回放与结果查看。

7.4 常见问题(FAQ) (faq)

本节汇总流表能力限制、持久化与删除方式、重启加载与排错方法等常见问题。

7.5 下一步阅读(官方文档与相关教程链接) (navigation)

本节提供进一步阅读的官方文档与教程链接,覆盖流数据、发布订阅、引擎、回放与场景示例等主题。

附录:各章节脚本代码下载 (navigation)

附录提供本教程相关 .dos 与 .py 脚本下载链接列表。

Facts index

Entity Attribute Value Confidence
DolphinDB 用户入门指南之金融篇(6)发布日期2025-01-07high
文章作者署名作者momohigh
本章内容范围涵盖主题介绍流数据基础概念、框架和流程;结合 API 订阅、消息中间件获取、流引擎计算等案例说明 DolphinDB 中的流计算应用high
本章示例所涉库表快照表位置基于 3.3 节创建的 "dfs://stock_lv2_snapshot" 库下的快照表 snapshothigh
本章示例所涉库表逐笔成交表位置基于 4.2.3 节创建的 "dfs://stock_trade" 库下的逐笔成交表 tradehigh
流表持久化配置文件要求需要在 server 配置文件(单机:dolphindb.cfg;集群:cluster.cfg)添加配置项 persistenceDirhigh
流数据定义基于事件持续生成的时间序列数据high
流数据特点动态:持续生成,结束无明确定义,大小与结构不固定high
流数据特点有序:每条记录具有时间戳或序列号以标识在流中的位置与顺序high
流数据特点大规模:高速率生成、规模大,对并行处理性能与可扩展性要求更高high
流数据特点强时效:要求极低延迟的读取和处理能力以驱动实时决策high
流数据处理/流计算定义在实时数据流上进行实时计算和分析;与批处理不同,无需等待所有数据到位,按时间顺序增量处理high
批处理 vs 流处理数据范围对比批处理:对数据集中的所有或大部分数据进行查询或处理;流处理:对时间窗口内的数据或最近的数据记录进行查询或处理high
批处理 vs 流处理数据大小对比批处理:大批量数据;流处理:单条记录或包含几条记录的小批量数据high
批处理 vs 流处理延迟对比批处理:几分钟至几小时;流处理:亚毫秒级high
DolphinDB 流数据通信采用模型经典发布-订阅(Pub-Sub)通信模型,通过消息队列发布与订阅,解耦发布者与订阅者high
DolphinDB 发布端机制队列与发布流程每个节点维护发布队列;新流数据写入发布表后发送到消息发布队列,再由发布线程发布到各订阅端消费队列high
DolphinDB 订阅端机制线程与消费流程每个订阅线程对应一个消费队列;订阅提交后有新数据写入发布表时会通知订阅方,订阅线程从消费队列获取数据增量处理high
发布-订阅队列积压与反压行为订阅端处理不过来会在消费队列累积;消费队列满反压到发布队列;发布队列也满会阻塞写入high
金融场景发布-订阅流程步骤(1)行情数据写入流表;(2)用户订阅流表计算;(3)结果通过 API/插件等发布到第三方平台(如可视化/下游对接)high
流数据发布接入方式行情插件示例amdQuote、INSIGHT、NSQ、EFH、DataFeed、CTP、XTP、SSEQuotationFile 等high
流数据发布接入方式消息中间件插件示例Kafka、zmq、mqtt、RabbitMQ、RocketMQ 等high
流数据发布接入方式API 语言Python、C++、Java、Go、R、JavaScript 等high
历史数据回放步骤replayDS 生成回放数据源;replay 将数据源回放到目标表对象或流计算引擎high
流表持久化接口(示例)使用函数enableTableShareAndPersistence(table=t, tableName="st1", cacheSize=100000)high
replay 回放接口(示例)关键参数示例replay(inputTables=ds, outputTables="st1", dateColumn="TradeDate", timeColumn="TradeTime", replayRate=100000, absoluteRate=true)high
Python API 写入流表(示例)流程步骤连接数据库;确保 server 端流表存在;初始化 MultithreadedTableWriter 建立连接并 insert 写入;等待写入完成并打印状态high
Python 写入示例使用组件dolphindb.session 连接;MultithreadedTableWriter 用于写入流表high
Kafka 写入流表(示例)流程步骤连接 DolphinDB;确保流表存在;创建 KafkaConsumer 并指定 topic;初始化 MultithreadedTableWriter;消费消息并 insert 写入high
subscribeTablehandler 参数用途订阅时可通过 handler 配置消息注入对象(可为消息处理函数、流计算引擎或数据表)high
subscribeTablebatchSize 参数每隔多少条消息处理一次high
subscribeTablethrottle 参数每隔多少秒处理一次high
subscribeTableoffset 可选机制-2:适用于持久化流表订阅,从磁盘持久化 offset 开始;-1:从订阅后的下一条开始;n(n>=0):从第 n 条开始high
subscribeTablereconnect 参数流订阅中断时自动重新订阅high
DolphinDB 订阅能力支持范围支持 server 端节点之间订阅,也支持通过第三方 API 订阅 DolphinDB 流表数据并推送给下游平台high
server 端订阅入库示例目标表创建分布式表 snapshot_trade 存储流表处理后的数据high
server 端订阅示例过滤机制setStreamTableFilterColumn 设置只订阅 TradingPhaseCode 字段为 "Trade" 的数据;订阅时 filter=["TRADE"]medium
server 端订阅示例处理函数自定义 append2Table 在入库前对部分字段进行计算处理,并将结果写入分布式表high
server 端订阅入库示例入库数据量(示例注释)select count(*) from snapshot_trade // 799,013medium
API 端订阅示例(Python)处理方式msgAsTable=True 时数据以表形式传到 Python 端默认转为 DataFrame,可在 handler 中 to_csv 导出high
流计算引擎(DolphinDB)能力描述内置多类流计算引擎,复杂因子逻辑可拆分并通过引擎级联实现流水线式计算high
流计算引擎算子性能描述内置引擎支持的算子经过算法优化,计算性能高效low
用户使用流计算引擎成本描述用户仅需配置引擎参数即可实现复杂逻辑,节省自研算法逻辑成本low
流计算引擎分类分类维度根据逻辑与场景分为多类,并展示单流与双流接入场景的引擎/连接类型分类表medium
主买/主卖交易量示例计算公式主买/主卖交易量 = Σ(交易价格*交易量*权重因子)high
主买/主卖交易量示例拆分流程(1)每分钟成交量(按股票分组)→ 时序聚合引擎;(2)按分钟累计成交量(按股票分组)→ 响应式状态引擎;(3)加权求和 → 横截面引擎high
trade_st 流表(示例)持久化参数示例enableTableShareAndPersistence(... asynWrite=true, compress=true, cacheSize=100000, retentionMinutes=1440, flushMode=0, preCache=10000)high
示例中的权重因子构造方式createWeightDict 返回 dict(sids, take(0.001, sids.size()))high
createCrossSectionalEngine(示例)metrics[wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()]high
createReactiveStateEngine(示例)metrics[cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)]high
createTimeSeriesEngine(示例)窗口与metricswindowSize=60000, step=60000;metrics=[sum(iif(TradeBSFlag=="S",1,0)*TradeQty*TradePrice), sum(iif(TradeBSFlag=="B",1,0)*TradeQty*TradePrice)]high
subscribeTable(示例:订阅逐笔流表进入 tsEngine)参数示例tableName=trade_st, actionName="act_tsEngine", offset=0, handler=streamHandler{tsEngine}, msgAsTable=true, batchSize=10000, throttle=0.001high
数据回放避免 OOM(示例说明)做法replayDS 使用 timeRepartitionSchema 按 15min 切片以保证回放时不 OOMmedium
回放任务提交(示例)replayRatesubmitJob(... replayRate=10000000, absoluteRate=true)high
结果表查询(示例)SQLselect * from tradeAmountIndex limit 10high
流批一体特性(示例结论)描述引擎算子与 SQL select 计算主体本质使用同一套函数,体现 DolphinDB 流批一体的计算特点medium
流表记录更新/删除支持不支持更新和删除;记录只能追加,一旦追加后不能更新删除high
键值流表(keyedStreamTable/latestKeyedStreamTable)写入前检查能力支持在写入前进行键值检查,以选择数据是追加还是丢弃high
流表内存占用过大解决方式可用 enableTablePersistence 或 enableTableShareAndPersistence 持久化;持久化目录通过配置参数 persistenceDir 设置high
重启后自动加载订阅流表配置方式将流表定义与订阅操作放在启动脚本 startup.dos 中;startup 参数用于设置启动脚本high
删除流表正确方式使用 dropStreamTable 删除,而不是 undef(`st, SHARED)high
流表无法删除排查条件与方法用 getStreamingStat 查看是否有未取消订阅;需先 unsubscribeTable 取消全部订阅后才能删除high
定期删除持久化流表数据参数持久化时设置 retentionMinutes 参数定期删除持久化数据high
查询流表是否存在函数existsStreamTablehigh
defined 函数用于判断流表存在性误用风险defined 对共享内存表也会返回 true,用户常误用high
排查流订阅/流计算异常常用排查字段/函数getStreamingStat:subWorkers.lastErrMsg 排查异常;pubConns/subWorkers.queueDepth 排查是否阻塞;getStreamEngineStat 排查引擎状态high
限时报名链接URLhttps://www.qingsuyun.com/h5/e/217471/5/high
配置参数 persistenceDir 文档链接URLhttps://docs.dolphindb.cn/zh/db_distr_comp/cfg/cfg_para_ref.html?hl=#configparamref__section_hln_54d_sybhigh
配置参数 startup 文档链接URLhttps://docs.dolphindb.cn/zh/db_distr_comp/cfg/standalone.htmlhigh
附录脚本资源提供内容提供多份 .dos 与 .py 脚本下载链接(含建库建表、导入数据、SQL批计算、流数据发布订阅、引擎级联等)high