DolphinDB 用户入门指南之金融篇(6)
7. 流数据计算
为了便于用户快速入门和理解,本章主要介绍流数据的一些基础概念、框架和流程。结合 API 订阅流数据、消息中间件获取流数据、流引擎计算等具体案例,进一步了解流计算在 DolphinDB 中的应用。
本章涉及到的库表有:
- 基于 3.3 节创建的 "dfs://stock_lv2_snapshot" 库下的快照表 snapshot
- 基于 4.2.3 节创建的 "dfs://stock_trade" 库下的逐笔成交表 trade
本章涉及到流表持久化,需要在 server 配置文件(单机:dolphindb.cfg 集群:cluster.cfg)添加配置项 persistenceDir。

图 7-1 配置项示意图
7.1 流数据和流计算的定义
流数据是基于事件持续生成的时间序列数据。与静态有界的历史数据不同,流数据具有以下特点:
- 动态:数据流持续动态生成,流的结束没有明确定义,数据的大小与结构也没有固定限制。
- 有序:每条流数据记录都具有时间戳或者序列号,标识了数据在流中的位置与顺序。
- 大规模:流数据通常以高速率生成,数据规模大,对处理引擎的并行处理性能和可扩展性有更高要求。
- 强时效:流数据的强时效性要求极低延迟的读取和处理能力,以最大化数据价值,驱动实时业务决策。
流数据处理是指在实时数据流上进行实时计算和分析的过程,也称为流计算。与批处理不同,流处理无需等待所有数据全部到位,即可按照时间顺序对数据进行增量处理。这种实时处理方式能够高效利用存储与计算资源,适用于需要快速响应和及时决策的应用场景。
表 7-1 批计算和流计算对比
| 批处理 | 流处理 | |
|---|---|---|
| 数据范围 | 对数据集中的所有或大部分数据进行查询或处理 | 对时间窗口内的数据或对最近的数据记录进行查询或处理 |
| 数据大小 | 大批量数据 | 单条记录或包含几条记录的小批量数据 |
| 性能 | 几分钟至几小时的延迟 | 亚毫秒级延迟 |
7.2 发布-订阅
7.2.1 发布-订阅模型
DolphinDB 采用了经典的发布-订阅(Pub-Sub)通信模型,通过消息队列实现流数据的发布与订阅,将流数据生产者(发布者)与消费者(订阅者)解耦。这种模式促进了系统内不同组件之间的通信,提高可伸缩性,改善发送者的响应能力。

图 7-2 发布订阅框架
- 发布数据:发布端在每个节点上维护一个发布队列。当新的流数据注入到该节点的流数据发布表时,DolphinDB 会将这些数据发送到相应的消息发布队列,再由发布线程将数据发布到各个订阅端的消费队列。
- 订阅数据:每个订阅线程对应一个消费队列。订阅成功提交后,每当有新数据写入流数据发布表时,DolphinDB 会主动通知所有订阅方,订阅线程从消费队列中获取数据进行增量处理。
当发布端发布大量消息造成订阅端来不及处理时,消息会在消费队列进行累积,若消费队列满了,则会反压到发布队列,若发布队列也满了,则会阻塞写入。
7.2.2 发布-订阅流程

图 7-3 发布-订阅流程
在实际金融场景下,一个完整发布-订阅流程:(1)行情数据发布到(写入)流表;(2)用户订阅流表进行计算;(3)计算结果通过 API、插件等发布到其他第三方平台,例如可视化平台展示、下游数据对接等等。
流数据发布
在环节 (1)中,用户通常通过行情插件、消息中间件或 API 将外部订阅的流数据写入流表,或者也可以通过数据回放历史数据写入流表。前者通常应用在实时计算场景,而回放通常是在回测场景使用的。
- 行情插件:华锐 amdQuote, 华泰 INSIGHT, 恒生 NSQ, 盛立 EFH, 中金 DataFeed, CTP, XTP, SSEQuotationFile 等。
- 消息中间件插件:Kafka, zmq, mqtt, RabbitMQ, RocketMQ 等。
- API:Python, C++, Java, Go, R, JavaScript 等。

图 7-4 流数据发布的不同方式
例 1:回放历史快照表模拟实时数据写入场景。
历史数据回放主要分为2个步骤:
- replayDS 函数根据历史数据生成回放数据源。
- replay 函数回放历史数据源到目标表对象或流计算引擎。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
// 从历史数据生成回放数据源
beginDate = 2022.01.01
endDate = 2022.01.31
ds = replayDS(<select * from loadTable(dbName, tbName) where TradeDate between beginDate and endDate>, dateColumn="TradeDate", timeColumn="TradeTime")
// 定义流计算表
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
// 持久化流表
enableTableShareAndPersistence(table=t, tableName="st1", cacheSize=100000)
go
// 回放流表
replay(inputTables=ds, outputTables="st1", dateColumn="TradeDate", timeColumn="TradeTime", replayRate=100000, absoluteRate=true)例 2:通过 Python API 将上游流数据实时写入流表。
本脚本使用的测试数据文件如下,解压到自己的目录后,请替换脚本中的 data_path:
通过 Python API 将上游数据写入流表一般分为个步骤:
- 建立到 DolphinDB 数据库的连接。
- 确保 DolphinDB 端订阅的流表已经建立并且存在。
- 初始化 MultithreadedTableWriter(MTW)对象,建立 API 和 server 端流表的连接(dbName 指定为流表名称),并通过 MTW.insert 接口写入数据。
- 等待数据完全写入并打印写入结果。
本例中上游的历史数据是通过遍历 csv 数据模拟的,用户可以将该部分替换成其他数据源,例如 kafka 数据,参考例 3。
import dolphindb as ddb
import numpy as np
import pandas as pd
import dolphindb.settings as keys
# 建立 API 和 server 之间的连接
s = ddb.session()
conn = s.connect(host="ecosystem", port=8440, userid="admin", password="123456")
# 替换为自己的 ip/port
if conn:
print("successfully connected!")
# 定义用于写入的持久化流表
spt = """
dropStreamTable("st2")
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
enableTableShareAndPersistence(table=t, tableName="st2", cacheSize=100000)
"""
s.run(spt)
# 通过遍历 csv 数据模拟上层实时数据写入
# csv 文件和 python 客户端处于同一台服务器
data_path = "<YOUR_PATH>/2022.01.04.csv"
df = pd.read_csv(data_path)
# 数据预处理:字段类型转换
df["Securityid"] = df["Securityid"].astype('str').apply(lambda x: x.zfill(6))
df["TradeDate"] = pd.to_datetime(df["TradeDate"], format='%Y.%m.%d')
df["TradeTime"] = pd.to_datetime(df["TradeTime"] , format='%H:%M:%S.%f')
df["LocalTime"] = pd.to_datetime(df["LocalTime"] , format='%H:%M:%S.%f')
df["OfferPrice"] = df["OfferPrice"].apply(lambda str: [float(x) if x.strip() else np.NaN for x in str.split(',')])
df["BidPrice"] = df["BidPrice"].apply(lambda str: [float(x) if x.strip() else np.NaN for x in str.split(',')])
df["OfferOrderQty"] = df["OfferOrderQty"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
df["BidOrderQty"] = df["BidOrderQty"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
df["BidNumOrders"] = df["BidNumOrders"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
df["OfferNumOrders"] = df["OfferNumOrders"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
df["OfferOrder"] = df["OfferOrder"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
df["BidOrder"] = df["BidOrder"].apply(lambda str: [int(x) if x.strip() else None for x in str.split(',')])
writer = ddb.MultithreadedTableWriter(host="ecosystem", port=8440, userId="admin", password="123456", dbPath="", tableName="st2", threadCount=1)
# 数据写入:通过循环模拟实时数据逐条接入
for row in df.itertuples(index=False):
data = tuple(row)
tmp = writer.insert(*data)
# 如有写入错误打印错误代码和错误信息
if(tmp.hasError()):
print(tmp.errorCode + ": " + tmp.errorInfo)
# 等待写入完成
writer.waitForThreadCompletion()
# 打印写入整体状态
res = writer.getStatus()
print(res)
# 打印最终写入结果
if res.succeed():
print("Data successfully written.")例 3:在 Python 端通过 kafka 消息中间件将上游流数据实时写入流表。
本教程提取部分历史数据导入 kafka 中进行测试,测试数据文件如下:
与例 2 相比,本例只是将数据源替换成了 Kafka 插件端接收的数据,因此其大致流程和例 2 相同,也是分为下述步骤:
- 建立到 DolphinDB 数据库的连接。
- 确保 DolphinDB 端订阅的流表已经建立并且存在。
- 创建一个KafkaConsumer对象,并指定要消费的topic。
- 初始化 MultithreadedTableWriter(MTW)对象,建立 API 和 server 端流表的连接(dbName 指定为流表名称),逐条消费并处理 kafka 端的数据,并通过 MTW.insert 接口写入数据。
from kafka import KafkaConsumer, TopicPartition
import json
from json import JSONDecodeError
import dolphindb as ddb
import numpy as np
import pandas as pd
import dolphindb.settings as keys
# 建立 API 和 server 之间的连接
s = ddb.session()
conn = s.connect(host="ecosystem", port=8440, userid="admin", password="123456")
# 替换自己的 ip/port
if conn:
print("successfully connected!")
# 定义用于写入的持久化流表
spt = """
try{dropStreamTable("st3")}catch(ex){writeLog(ex)}
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
enableTableShareAndPersistence(table=t, tableName="st3", cacheSize=100000)
"""
s.run(spt)
##### 上述脚本可在 DolphinDB 端实现 #####
# 定义Kafka集群的地址和topic名称
bootstrap_servers = ['183.134.101.143:9092']
topic = 'snapshot'
# 创建一个KafkaConsumer对象,并指定要消费的topic
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m) if m else None
)
# 创建 MTW 对象用于实时写入流数据
writer = ddb.MultithreadedTableWriter(host="ecosystem", port=8440, userId="admin", password="123456", dbPath="", tableName="st3", threadCount=1, batchSize=100)
# 消费 kafka 中的数据写入流表
for message in consumer:
try:
msg = message.value
msg['TradeDate']=pd.to_datetime(msg["TradeDate"], format='%Y.%m.%d')
msg["TradeTime"] = pd.to_datetime(msg["TradeTime"] , format='%H:%M:%S.%f')
msg["LocalTime"] = pd.to_datetime(msg["LocalTime"] , format='%H:%M:%S.%f')
data = tuple(msg.values())
tmp = writer.insert(*data)
if(tmp.hasError()):
print(tmp.errorCode + ": " + tmp.errorInfo)
except Exception:
print("Received invalid JSON message:", message)流数据订阅
流数据发布之后就来到了环节(2)流数据订阅,用户可以通过函数 subscribeTable 订阅流数据表中的数据,并且在订阅时,可以通过设置 subscribeTable 函数的参数 handler 来配置消息处理机制。
关键参数配置:
- batchSize:每隔多少条消息处理一次
- throttle:每隔多少秒处理一次
- handler:设置订阅消息注入对象,可以是消息处理函数、流计算引擎、数据表
- offset:订阅消息的偏移量,可选机制:
- -2:适用于持久化流表订阅。获取持久化到磁盘上的 offset,并从该位置开始订阅
- -1:从发起订阅后的下一条数据开始订阅
- n(n >= 0):从第 n 条数据开始订阅
- reconnect:流订阅中断时,自动重新订阅
DolphinDB 不仅支持 server 端节点之间的订阅,而且支持通过第三方 API 订阅 DolphinDB 流表数据,推送给下游的其他平台。下面我们结合几个具体的案例进行说明。
例 1:在 server 端订阅流数据,进行数据处理后入库。
执行代码前,可以初始化定义一些变量并清理执行环境。
undef all
go
// 变量定义
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
tbName1 = "snapshot_trade"
// 清理环境
try{
dropTable(database(dbName), tbName1)
// 删除流表前需要取消该表所有的订阅
unsubscribeTable(tableName="snap1", actionName="subAndWrt2db")
dropStreamTable("snap1")
}catch(ex){print ex}创建一个分布式表 snapshot_trade 用于存储流表处理后的数据。
create table "dfs://stock_lv2_snapshot"."snapshot_trade"(
TradeDate DATE[comment="交易日期", compress="delta"]
TradeTime TIME[comment="交易时间", compress="delta"]
SecurityID SYMBOL
OpenPrice DOUBLE
PreClosePrice DOUBLE
HighPrice DOUBLE
LowPrice DOUBLE
LastPrice DOUBLE
PreCloseIOPV DOUBLE
IOPV DOUBLE
UpLimitPx DOUBLE
DownLimitPx DOUBLE
DeltasHighPrice INT
DeltasLowPrice INT
DeltasVolume LONG
DeltasTurnover DOUBLE
DeltasTradesCount INT
)
partitioned by TradeDate, SecurityID,
sortColumns=[`SecurityID,`TradeTime],
keepDuplicates=ALL定义流数据表并建立流订阅。
本例对流数据进行处理,包含 2 个方面:
- setStreamTableFilterColumn 函数:设置只订阅 TradingPhaseCode 字段为 ”Trade“ 值的数据
- 自定义函数 append2Table :在入库前对部分字段的数据进行计算和处理
// 定义流表
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
enableTableShareAndPersistence(table=t, tableName="snap1", cacheSize=100000)
go
// 定义过滤字段
setStreamTableFilterColumn(snap1, "TradingPhaseCode")
// 定义流数据处理函数:订阅处理后写入分布式表
def append2Table(dbName, tbName, mutable msg){
data = select TradeDate,
iif(TradeTime <=09:30:00.000, 09:30:00.000, TradeTime) as TradeTime,
SecurityID,
OpenPrice,
PreCloPrice,
HighPrice,
LowPrice,
LastPrice,
PreCloseIOPV,
IOPV,
UpLimitPx,
DownLimitPx,
iif(deltas(HighPrice)>0.000001, 1, 0) as DeltasHighPrice,
iif(abs(deltas(LowPrice))>0.000001, -1, 0) as DeltasLowPrice,
iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade)) as DeltasVolume,
iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade)) as DeltasTurnover,
iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades)) as DeltasTradesCount
from msg
where TradeTime >=09:25:00.000
context by SecurityID
loadTable(dbName, tbName).tableInsert(data)
}
// 订阅流表
subscribeTable(tableName="snap1", actionName="subAndWrt2db", offset=0, handler=append2Table{dbName, tbName1}, msgAsTable=true, batchSize=10000, filter=["TRADE"])使用 replay 函数回放历史存储的快照数据到流表,以模拟实时数据写入。
// 历史数据回放
beginDate = 2022.01.01
endDate = 2022.01.10
ds = replayDS(<select * from loadTable(dbName, tbName) where TradeDate between beginDate and endDate>, dateColumn="TradeDate", timeColumn="TradeTime")
replay(inputTables=ds, outputTables="snap1", dateColumn="TradeDate", timeColumn="TradeTime", replayRate=100000, absoluteRate=true)
// 查看实际入库的数据
select count(*) from loadTable(dbName, tbName1) // 799,013例 2:在 API 端订阅流数据表,并在 API 端进行计算。
在订阅流表前,现在 DolphinDB 端(Web / GUI / VSCode)执行下述脚本,定义一个持久化流表 snap2。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
beginDate = 2022.01.01
endDate = 2022.01.10
ds = replayDS(<select * from loadTable(dbName, tbName) where TradeDate between beginDate and endDate>, dateColumn="TradeDate", timeColumn="TradeTime")
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
streamTbName = "snap2"
// 请确保没有同名流表,如有请修改表名
// 若需要重复执行该脚本,可执行此步取消订阅删除流表
try{
actName = exec actions from getStreamingStat().pubTables where tableName = streamTbName
hp = exec subscriber from getStreamingStat().pubTables where tableName = streamTbName
host = hp[0].split(":")[0]
port = hp[0].split(":")[1].int()
stopPublishTable(host, port, streamTbName, actName[0])
dropStreamTable(streamTbName)
}catch(ex){
print ex
}
// 定义一个持久化流表 snap2 接收流数据
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
enableTableShareAndPersistence(table=t, tableName=streamTbName, cacheSize=100000)
go
// (非必要)设置过滤列,若不需要过滤可以删去该脚本
setStreamTableFilterColumn(snap2, 'SecurityID')在 Python 客户端发起订阅,并将订阅到的流数据存入本地的csv文件。导入 csv 文件的处理逻辑封装在 handler 函数内,由于设置 msgAsTable=True 后,导入的数据以表的形式传输到 Python 端,默认转化为 DataFrame 形式,因此可以直接用 to_csv 导出。 此处的 handler 函数也可以替换成其他数据处理计算逻辑。
import dolphindb as ddb
import numpy as np
import pandas as pd
import dolphindb.settings as keys
from threading import Event
# 建立 API 和 server 之间的连接
s = ddb.session()
conn = s.connect(host="ecosystem", port=8440, userid="admin", password="123456")
# 替换自己的 ip/port
# 确认连接是否成功
if conn:
print("successfully connected!")
# 开启流数据功能
s.enableStreaming(0)
# 定义流计算处理函数
def my_handler(msg):
optPath = "D:/File2024/入门教程/streamOutput.csv"
msg.to_csv(optPath, mode="a", header=False, index=False)
# 发起订阅
s.subscribe(host="ecosystem", port=8440, tableName="snap2", actionName="sub_snap2", offset=-1, handler=my_handler, msgAsTable=True, batchSize=10000, throttle=1, filter=np.array(["000001"]))
# 阻塞进程使订阅持续进行
Event().wait()流数据回放模拟:提交一个历史数据回放的后台任务模拟流数据回放
submitJob("replay", "mockRealTimeData", replay{inputTables=ds, outputTables=streamTbName, dateColumn="TradeDate", timeColumn="TradeTime", replayRate=100000, absoluteRate=true})
getRecentJobs(1)可以通过 getRecentJobs 函数查看当前回放任务是否结束,回放结束后,可以在指定目录查看文件是否成功导出。
7.3 流计算引擎
在 DolphinDB 处理流数据有天然优势,这是因为 DolphinDB 内置了处理不同逻辑场景的流计算引擎,即使是复杂的因子计算逻辑,也可以通过逻辑拆分交给不同的引擎去实现,通过引擎的级联实现流水线式的计算。且 DolphinDB 的内置引擎支持的算子经过了算法上的优化,计算性能高效。用户使用时仅需要对引擎参数进行配置,即可实现复杂逻辑,节省了用户自己开发算法逻辑的成本。

图 7-5 引擎级联流水线示意图
根据各自适用的逻辑和场景,DolphinDB 内置的流计算引擎可以分为以下几类:
表 7-3-1 不同流计算场景下适用的引擎及其计算逻辑


例. 流式计算主买/主卖交易量(示例代码参考:流式计算中证 1000 指数主买/主卖交易量)
计算公式:主买/主卖交易量 = Σ(交易价格*交易量*权重因子)
拆分计算逻辑,得到的计算流程为:
(1)计算每分钟的成交量(按照股票分组)→ 时序聚合引擎
(2)按分钟累计成交量(按照股票分组) → 响应式状态引擎
(3)对股票成交量进行加权求和计算交易量 → 横截面引擎

图 7-6 主买/卖成交量计算示意图

图 7-7 主买/卖累积成交量、交易量(加权后)计算示意图
在运行脚本前,可以先清理环境中的变量,并初始化定义一些公共变量:
undef all
go
dbName = "dfs://stock_trade"
tbName = "trade"
streamTbName = "trade_st"
try{unsubscribeTable(tableName=streamTbName, actionName="act_tsEngine")}catch(ex){print(ex)}
try{dropStreamEngine("rsEngine")}catch(ex){print(ex)}
try{dropStreamEngine("tsEngine")}catch(ex){print(ex)}
try{dropStreamEngine("csEngine")}catch(ex){print(ex)}定义一个持久化流表 trade_st 用于接收实时的逐笔成交数据。
colName=loadTable(dbName, tbName).schema().colDefs.name
colType=loadTable(dbName, tbName).schema().colDefs.typeString
t = streamTable(100:0, colName, colType);
try{
enableTableShareAndPersistence(table=t, tableName=streamTbName, asynWrite=true, compress=true, cacheSize=100000, retentionMinutes=1440, flushMode=0, preCache=10000)
}
catch(ex){
print(ex)
}
undef("t")
go模拟创建成分股权重因子。
sids = exec distinct SecurityID from loadTable(dbName, tbName)
def createWeightDict(sids){
return dict(sids, take(0.001, sids.size()))
}
weightDict = createWeightDict(sids)模拟创建成分股权重因子。
// 输入表结构定义
tsEngineDummy = table(1:0, `SecurityID`TradeTime`TradePrice`TradeQty`TradeBSFlag, [SYMBOL, TIMESTAMP, DOUBLE, INT, SYMBOL])
rsEngineDummy = table(1:0, `TradeTime`SecurityID`SellTradeAmount`BuyTradeAmount, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
csEngineDummy = table(1:0, `SecurityID`TradeTime`SellTradeAmount`BuyTradeAmount, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE])
//创建存储结果的表
share(keyedTable(`Time, 2000:0, `Time`SellTradeAmount`BuyTradeAmount`UpdateTime, [TIMESTAMP, DOUBLE, DOUBLE, TIMESTAMP]), "tradeAmountIndex")
go定义流计算引擎。由于引擎是通过输出表设置为下一个引擎,来进行级联的,为保证脚本正常解析,引擎定义的顺序和业务数据流的逻辑是相反的。如本例中计算逻辑是 时序引擎 → 响应式状态引擎 → 横截面引擎,脚本中定义顺序则相反:
//创建横截面引擎
csEngine = createCrossSectionalEngine(name="csEngine", metrics=<[wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()]>, dummyTable=csEngineDummy, outputTable=objByName("tradeAmountIndex"), keyColumn=`SecurityID, triggeringPattern="keyCount", triggeringInterval=1000, useSystemTime=false, timeColumn=`TradeTime, lastBatchOnly=false)
//创建响应式状态引擎
rsEngine = createReactiveStateEngine(name="rsEngine", metrics=<[cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)]>, dummyTable=rsEngineDummy, outputTable=csEngine, keyColumn=`SecurityID)
//创建时间序列引擎
tsEngine = createTimeSeriesEngine(name="tsEngine", windowSize=60000, step=60000, metrics=<[sum(iif(TradeBSFlag=="S", 1, 0)*TradeQty*TradePrice), sum(iif(TradeBSFlag=="B", 1, 0)*TradeQty*TradePrice)]>, dummyTable=tsEngineDummy, outputTable=rsEngine, timeColumn=`TradeTime, keyColumn=`SecurityID, useWindowStartTime=true, fill=[0, 0], forceTriggerTime=100)订阅逐笔数据表,写入引擎流水线的第一个引擎:时序聚合引擎。
本例自定义了一个流数据处理函数 streamHandler,可以通过修改内部逻辑自定义数据处理方案。
// handler 数据处理函数
def streamHandler(mutable engine, msg){
data = select SecurityID, concatDateTime(TradeDate, TradeTime) as TradeTime, TradePrice, TradeQty, TradeBSFlag from msg
engine.append!(data)
}
// 订阅流数据表
subscribeTable(tableName=streamTbName, actionName="act_tsEngine", offset=0, handler=streamHandler{tsEngine}, msgAsTable=true, batchSize=10000, throttle=0.001)通过 replayDS 函数获取历史数据源,然后借助 replay 函数模拟流数据回放过程。
// 后台提交数据回放任务,模拟实时数据注入
// 本例设置 timeRepartitionSchema 将数据源按照 15min 进行切片,这是为了保证回放时数据不会 OOM
ds = replayDS(<select * from loadTable(dbName, tbName)>, dateColumn="TradeDate", timeColumn="TradeTime", timeRepartitionSchema=09:30:00.000 + 15*1000*60*(0..22))
submitJob("relay", "replay to trade", replay{inputTables=ds, outputTables=streamTbName, dateColumn="TradeDate", timeColumn="TradeTime", replayRate=10000000, absoluteRate=true})
getRecentJobs(1)回放进程可以通过函数 getRecentJobs 进程查询。
通过 SQL 查询输出的结果表:
select * from tradeAmountIndex limit 10
图 7-8 主买卖成交量流计算结果示意图
为了新用户能够更好理解引擎的计算,我们仔细观察三个计算引擎的流计算指标。
- 时序聚合引擎
<[sum(iif(TradeBSFlag=="S", 1, 0)*TradeQty*TradePrice),
sum(iif(TradeBSFlag=="B", 1, 0)*TradeQty*TradePrice)]>- 响应式状态引擎
<[cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)]>- 横截面引擎
<[wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()]>将上述流计算脚本转换为 SQL 批计算逻辑,可改写如下:
从分布式表中筛选一部分字段进行计算:
data = select SecurityID, concatDateTime(TradeDate, TradeTime) as TradeTime, TradePrice, TradeQty, TradeBSFlag from loadTable(dbName, tbName)注: 如果此脚本读取的数据量太大造成内存溢出,可以加上 limit 或 top 子句筛选一部分数据进行计算验证结果。
- 按时间进行分钟聚合(时序聚合)
t1 = select sum(iif(TradeBSFlag=="S", 1, 0)*TradeQty*TradePrice) as SellTradeAmount,
sum(iif(TradeBSFlag=="B", 1, 0)*TradeQty*TradePrice) as BuyTradeAmount
from data group by SecurityID, interval(TradeTime, 1m, 0) as TradeTime- 累积计算成交量(状态计算)
t2 = select SecurityID, cummax(TradeTime) as Time,
cumsum(SellTradeAmount) as SellTradeAmount,
cumsum(BuyTradeAmount) as BuyTradeAmount
from t1 context by SecurityID- 按股票因子加权计算成交量(截面计算)
t3 = select wsum(SellTradeAmount, weightDict[SecurityID]) as SellTradeAmount,
wsum(BuyTradeAmount, weightDict[SecurityID]) as BuyTradeAmount
from t2 group by Time对比批计算和流计算的脚本,可以发现不论是引擎的算子还是 SQL select 的计算主体,本质上都使用了一套函数去实现,这体现了 DolphinDB 流批一体的计算特点。
7.4 常见问题
1. 流表数据是否支持部分记录的更新和删除?
不支持。流表记录只能追加,一旦追加后就不能更新和删除。
特殊地,对于键值流表(keyedStreamTable 和 latestKeyedStreamTable),系统支持在数据写入流表前,进行键值检查,以选择数据是追加还是丢弃。
2. 流表数据量大,内存占用不够怎么解决?
可以将流表通过 enableTablePersistence 函数或者 enableTableShareAndPersistence 函数进行持久化,系统会将一部分数据持久化存储到磁盘指定目录(通过配置参数 persistenceDir 设置)。
3. 如何配置重启后自动加载订阅流表?
将流数据表定义和流表订阅操作放在启动脚本 startup.dos (通过配置参数 startup 设置)内,这样节点重启后可以自动加载订阅流表。详情参考教程 启动脚本 和 节点启动时的流计算自动订阅。
4. 流表如何进行删除?一直无法删除如何解决?
流表需要通过函数 dropStreamTable 进行删除,而不是通过 undef(`st, SHARED) 进行删除。 如果流表一直无法删除,请通过 getStreamingStat 函数查看当前流表上是否存在未取消的订阅,只有订阅全被取消后(取消订阅函数 unsubscribeTable),才能删除流表。
5. 如何配置定期删除持久化流表持久化的数据?
用户可以在通过 enableTablePersistence 函数或者 enableTableShareAndPersistence 函数持久化流表时,设置 retentionMinutes 参数,定期删除持久化的数据。
6. 如何查询当前流表是否存在?
可以通过函数 existsStreamTable 查看指定流数据表是否存在。 需要注意,用户常会误用 defined 函数进行判断,但是该函数对于共享内存表也会返回 true。
7. 流订阅或流计算异常如何排查? 函数 getStreamingStat 返回了流订阅发布的详细信息,其中比较常用的信息有:
- 通过 subWorkers 表的 lastErrMsg 字段可以排查订阅中是否存在异常;
- 通过 pubConns 表和 subWorkers 表中的 queueDepth 字段可以排查订阅是否阻塞;
通过函数 getStreamEngineStat 排查流计算引擎的状态。
7.5 下一步阅读
官方文档 流数据 整理了一些流计算相关的文档和教程说明,此处仅列举部分:
- 流数据架构:流数据功能简介流数据表发布与订阅
- 流计算引擎:内置流式计算引擎内置多数据源流式关联引擎
- 流数据接入:Python API 接入数据行情数据插件:NSQ
- 数据回放:股票行情回放
- 流批一体:流批一体快速搭建 Level-2 快照数据流批一体因子计算平台最佳实践
- 场景案例:金融实时实际波动率预测实时计算日累计逐单资金流实时计算分钟资金流实时选取外汇行情多价源最优价 金融因子流式实现
- 可视化展示:数据面板Altair 连接 DolphinDB 数据源DolphinDB Grafana DataSource Plugin
8. 附录
各章节对应的脚本代码如下: