Orca 声明式 DStream API 应用:账户持仓损益实时监控
本页介绍使用 Orca 声明式 DStream API 进行双流关联,以实现账户持仓损益的实时监控,并提供相关说明与附录资源。
Source: https://dolphindb.cn/blogs/252
What this page covers
- 文章信息与引言(标题、日期与主题概述)。
- 应用场景与挑战点,以及 Orca/DStream API 的适用性说明。
- 两阶段实现方案、结果输出方式与可视化展示思路。
- 数据结构与监控指标规则(含示例代码指向)。
- DStream API 构建流图、关联与指标计算、回放模拟数据写入步骤。
- 性能测试:时延定义、压测结果与测试环境信息。
- 附录:示例代码、样例数据与面板配置链接。
技能认证特训营第二期报名提示
页面顶部包含限时报名活动提示,并提供报名链接。
- 提供“技能认证特训营第二期”的报名入口链接。
- 该提示位于页面顶部区域。
Orca 声明式 DStream API 应用:账户持仓损益实时监控(文章信息与引言)
本节给出文章基本信息与引言,介绍使用 Orca 声明式 DStream API 进行双流关联以实现持仓损益实时监控,并说明前置阅读与附录代码资源。
- 文章发布日期为 2025-11-17。
- 主题聚焦账户持仓损益的实时监控实现。
- 内容提到使用 Orca 声明式 DStream API 进行双流关联。
- 文中提及前置阅读与附录代码说明。
应用场景描述
本节阐述多资产实时损益监控的业务必要性与挑战,并说明 Orca 平台及其声明式 DStream API 的适用性与能力点。
- 场景涉及多资产的实时损益监控需求。
- 指出实现实时监控存在挑战点。
- Orca 声明式 DStream API 强调声明式编程范式来定义流处理逻辑。
- Orca 提供任务自动调度与计算高可用等能力用于稳定任务运行。
- 文中将 DolphinDB 对应的平台/产品描述为“企业级实时计算平台 Orca”。
实现方案
本节描述两阶段的数据处理方案、结果输出方式与 Dashboard 可视化展示,并配合流程图进行说明。
- 方案分为两个阶段:数据预处理与监控指标计算。
- 数据预处理阶段包含关联生成宽表。
- 指标计算阶段使用响应式状态引擎计算十余个指标。
- 计算结果可由外部 API 订阅或查询。
- 计算结果可通过消息中间件插件等方式发送到外部。
- 本文使用 DolphinDB 自带 Dashboard 进行可视化展示。
数据与指标说明(数据结构与指标规则)
本节说明数据来源与回放模拟方式,列出三类数据表结构,并给出多项监控指标的计算规则与示例代码实现方式。
- 本文实现目标是账户层级下各标的持仓损益的实时监控。
- 样例数据与导入代码在附录中提供下载。
- 委托成交样例数据覆盖半个交易日的模拟数据。
- 委托成交样例数据包含 5 个账户与 20 支标的的流水记录。
- 持仓信息样例数据模拟开盘时 5 个账户持仓信息。
- 行情快照样例数据包含 20 支标的的 L2 快照数据。
- 持仓信息表列包含 AccountID、SecurityID、Date、SecurityName 等字段。
- 委托和成交数据表包含 AccountID、Type、OrderNo、SecurityID 等字段。
- 行情快照数据表列包含 SecurityID、Date、Time、LastPx。
- 自定义函数计算指标时,需要在定义前添加声明 @state。
- CanceledVolume 定义为撤单的委托数量,并给出 calCanceledVolume 实现。
- PositionVolume 定义为实时持仓数量,并给出 calPositionVolume 实现。
- ThresholdDeviation 定义为阈值偏离度,并给出 calThresholdDeviation 实现。
- PositionDeviation 定义为持仓偏离度,并给出 calPositionDeviation 实现。
- BuyVolume 定义为当日买入数量,并给出 calBuyVolume 实现。
- BuyPrice 定义为当日买入均价,并给出 calBuyPrice 实现。
- SellVolume 定义为当日卖出数量,并给出 calSellVolume 实现。
- SellPrice 定义为当日卖出均价,并给出 calSellPrice 实现。
- NetBuyVolume 定义为当日净买入数量,并给出 calNetBuyVolume 实现。
- FreezeVolume 定义为冻结持仓,并给出 calFreezeVolume 实现说明。
- AvailableVolume 段落首次出现时,公式与代码实现重复为 FreezeVolume。
- AvailableVolume 给出 calAvailableVolume 的实现表达式。
- AvailableVolumeRatio 给出 calAvailableVolumeRatio 的实现表达式。
- Profit 给出 calProfit 的实现说明,并提到结果 round 到 6 位。
持仓监控代码实现(构建流图与数据回放)
本节分步展示如何创建目录与流图、定义流数据表、进行两次 lookupJoin 关联、通过 reactiveStateEngine 计算指标并输出,以及提交流图并回放模拟数据写入。
- 数据目录为 positionMonitorDemo,流图名称为 positionMonitor。
- MarketDataStream 使用 parallelize("AccountID", 2) 按 AccountID 哈希分区并行分支。
- 委托成交与持仓基础信息使用 lookupJoinEngine 关联,matchingColumn 为 AccountID 与 SecurityID。
- 与行情快照通过 lookupJoinEngine 获取 LastPx,matchingColumn 为 SecurityID。
- reactiveStateEngine 输出使用 keyColumn 为 AccountID 与 SecurityID,并在 sync() 后 sink 到 PositionMonitorStream。
- 需要调用 StreamGraph::submit 才能启动流图。
- 开盘前调用 appendOrcaStreamTable 批量插入静态数据到 PositionInfo。
- 回放模拟使用 replayDS/replay,并通过 submitJob 提交任务。
结果展示(状态查看、结果查询与可视化)
本节展示查看流图状态的方式、计算结果查询方式,以及使用数据面板进行可视化展示的方法线索。
- 提供流图运维函数:getStreamGraphInfo。
- 提供元数据查看函数:getOrcaStreamTableMeta。
- 提供引擎元数据查看函数:getOrcaStreamEngineMeta。
- 结果查询 SQL:select * from positionMonitorDemo.orca_table.PositionMonitorStream。
性能测试(数据量、结果、环境)
本节给出时延定义、不同并发与回放速度下的时延结果,并说明单节点测试环境的软件版本与硬件/系统配置。
- 计算时延定义为 UpdateTime 与 ReceiveTime 的差值。
- 时延统计包含平均值与 99 分位。
- 五倍速回放用于模拟高负载场景。
- 文中提到委托成交流水最高达 5 万笔每秒。
- 文中结论:提升并发数可进一步降低计算时延。
- 测试环境为 DolphinDB server 单节点模式。
- 测试环境软件版本为 3.00.4 2025.09.09 LINUX_JIT x86_64。
- 环境包含 CentOS Linux 7 (Core) 与 Intel Xeon Gold 5220R @ 2.20GHz 配置描述。
总结与展望
本节总结 Orca 声明式 DStream API 在降低流计算开发难度与支撑低时延持仓监控方案方面的价值,并列出后续计划方向。
- 声明式 DStream API 以“描述做什么”来定义流处理逻辑,从而降低开发门槛。
- 后续规划包含低延时引擎、流式 SQL 引擎与流高可用等方向。
- 后续规划包含即时编译与开发/调试增强。
附录(示例代码/数据/面板配置链接)
本节提供示例代码、样例数据与数据面板配置文件的下载或引用链接。
- 提供 PositionMonitor.dos 的引用链接。
- 提供 importData.dos 的引用链接。
- 提供 SampleData.zip 的引用链接。
- 提供数据面板配置文件的引用链接。
Facts index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| 技能认证特训营第二期 | 报名链接 | https://www.qingsuyun.com/h5/e/217471/5/ | high |
| Orca 声明式 DStream API 应用:账户持仓损益实时监控 | 发布日期 | 2025-11-17 | high |
| DolphinDB | 平台/产品 | 企业级实时计算平台 Orca | high |
| Orca 声明式 DStream API | 编程范式/接口特点 | 以声明式编程范式为核心,允许通过描述“做什么”而非“如何做”定义流数据处理逻辑,从而降低开发门槛 | medium |
| Orca | 能力 | 提供任务自动调度与计算高可用等能力,用于保证任务稳定与降低开发运维成本 | medium |
| 本文实现目标 | 监控对象 | 账户层级下各标的持仓损益的实时监控 | high |
| 数据处理方案 | 阶段划分 | 两个阶段:数据预处理(关联生成宽表)与持仓监控指标计算(响应式状态引擎计算十余个指标) | high |
| 计算结果输出方式 | 方式 | 可由外部 API 订阅或查询,或通过消息中间件插件等发送到外部;本文使用 DolphinDB 自带 Dashboard 可视化展示 | high |
| 样例数据与导入代码 | 获取位置 | 在附录中提供下载 | high |
| 样例数据(委托成交数据表) | 覆盖范围 | 半个交易日的模拟数据;5 个账户、20 支标的的委托成交流水记录 | high |
| 样例数据(持仓信息表) | 覆盖范围 | 模拟当日开盘时 5 个账户所持的每支标的的持仓信息 | high |
| 样例数据(行情快照数据) | 覆盖范围 | 包括 20 支标的的 L2 快照数据 | high |
| 流计算引擎自定义函数 | 声明要求 | 使用自定义函数计算指标时,需在定义前添加声明 @state | high |
| 持仓信息表 | 字段结构 | 列:AccountID(SYMBOL), SecurityID(SYMBOL), Date(DATE), SecurityName(STRING), Threshold(INT), OpenVolume(INT), PreVolume(INT), PreClose(DOUBLE) | high |
| 委托和成交数据表 | 字段结构 | 列包括 AccountID, Type(委托1/成交2), OrderNo, SecurityID, Date, Time, BSFlag, Price, Volume, TradeNo, State, Mark, NetVolume, CumSellVol, CumBuyVol, SellPrice, BuyPrice, ReceivedTime(NANOTIMESTAMP) 等 | high |
| 行情快照数据表 | 字段结构 | 列:SecurityID(SYMBOL), Date(DATE), Time(TIME), LastPx(DOUBLE) | high |
| 指标 CanceledVolume | 定义 | 撤单的委托数量;文中给出计算公式与 @state 函数 calCanceledVolume | high |
| 指标 PositionVolume | 定义 | 实时持仓数量;文中给出计算公式与 @state 函数 calPositionVolume | high |
| 指标 ThresholdDeviation | 定义 | 阈值偏离度;文中给出计算公式与 @state 函数 calThresholdDeviation(结果 round 到 6 位) | high |
| 指标 PositionDeviation | 定义 | 持仓偏离度;文中给出计算公式与 @state 函数 calPositionDeviation(结果 round 到 6 位) | high |
| 指标 BuyVolume | 定义 | 当日买入数量;@state 函数 calBuyVolume(空值返回0) | high |
| 指标 BuyPrice | 定义 | 当日买入均价;文中给出计算说明与 @state 函数 calBuyPrice(round到6位) | high |
| 指标 SellVolume | 定义 | 当日卖出数量;@state 函数 calSellVolume(空值返回0) | high |
| 指标 SellPrice | 定义 | 当日卖出均价;文中给出计算说明与 @state 函数 calSellPrice(round到6位) | high |
| 指标 NetBuyVolume | 定义 | 当日净买入数量;@state 函数 calNetBuyVolume(空值返回0) | high |
| 指标 FreezeVolume | 定义 | 冻结持仓;文中给出计算说明与 @state 函数 calFreezeVolume(使用 cumsum 与撤单量/成交量逻辑) | high |
| AvailableVolume 指标段落 | 内容问题 | 在首次出现“AvailableVolume:可用持仓”处,公式与代码实现重复为 FreezeVolume(calFreezeVolume),随后再次出现 AvailableVolume 并给出正确公式与实现 | high |
| 指标 AvailableVolume | 实现 | @state 函数 calAvailableVolume:availableVolume = PreVolume - calSellVolume(Type, CumSellVol) - calFreezeVolume(VOLUME, BSFlag, Mark, Type) | high |
| 指标 AvailableVolumeRatio | 实现 | @state 函数 calAvailableVolumeRatio:availableVolumeRatio = calAvailableVolume(...) \ PreVolume,并 round 到 6 位(空值返回0) | high |
| 指标 Profit | 实现 | @state 函数 calProfit:结合昨持仓浮动盈亏、已卖出实现盈亏与新买入浮动盈亏;结果 round 到 6 位 | high |
| StreamGraph 数据目录与流图 | 名称 | 数据目录 positionMonitorDemo;流图名称 positionMonitor | high |
| MarketDataStream(委托成交流表) | 并行设置 | 使用 parallelize("AccountID", 2) 按 AccountID 哈希分区生成并行分支 | high |
| DStream 关联引擎(委托成交与持仓基础信息) | 连接列与时间列 | matchingColumn=`AccountID`SecurityID;rightTimeColumn=`Date;使用 DStream::lookupJoinEngine | high |
| DStream 关联引擎(与行情快照) | 连接列与时间列 | matchingColumn=`SecurityID;rightTimeColumn=`Date;通过 lookupJoinEngine 获取 LastPx | high |
| reactiveStateEngine 输出 | 键列与输出表 | keyColumn=`AccountID`SecurityID;keepOrder=true;sync() 后 sink("PositionMonitorStream") | high |
| 流图启动 | 必要操作 | 需要调用 StreamGraph::submit(positionMonitorGraph.submit())流图才能启动 | high |
| 静态持仓基础信息写入 | 方法 | 开盘前调用 appendOrcaStreamTable 批量插入静态数据到 PositionInfo | high |
| 模拟实时数据传入 | 方法与回放参数 | 使用 useOrcaStreamTable 与 replayDS/replay;timeRepartitionSchema=cutPoints(09:30:00.000..15:00:00.000, 50);replayRate=1, absoluteRate=false, preciseRate=true;通过 submitJob 提交任务 | high |
| 流图运维函数 | 函数列表 | getStreamGraphInfo、getOrcaStreamTableMeta、getOrcaStreamEngineMeta | high |
| 计算结果查询 | SQL | select * from positionMonitorDemo.orca_table.PositionMonitorStream | high |
| 计算时延定义 | 定义方式 | UpdateTime(响应计算完成时刻)与 ReceiveTime(记录到达系统时刻)的差值;统计平均值与 99 分位 | high |
| 高负载模拟流量 | 回放速度与规模 | 五倍速回放模拟高负载场景;委托成交流水高达 5 万笔每秒 | high |
| 测试数据量(委托成交流水) | 规模 | 每秒 1 万条;3,000 支标的;800 个账户(每个账户均有 2,300+ 标的);总 72,026,955 条记录(09:30-11:30 半个交易日) | high |
| 测试数据量(行情快照) | 描述 | 3 秒频 Level 2 股票行情快照 | high |
| 性能测试结果(并发8,1X) | 全链路总时延(平均/99分位,微秒) | 743 / 1622(TPS=1w) | high |
| 性能测试结果(并发16,1X) | 全链路总时延(平均/99分位,微秒) | 697 / 1391(TPS=1w) | high |
| 性能测试结果(并发8,5X) | 全链路总时延(平均/99分位,微秒) | 1045 / 8928(TPS=5w) | high |
| 性能测试结果(并发16,5X) | 全链路总时延(平均/99分位,微秒) | 877 / 4155(TPS=5w) | high |
| 并发度与时延关系(文中结论) | 描述 | 提升并发数可进一步降低计算时延;在 5w TPS 场景并发由 8 到 16,99 分位时延可降低一倍多 | medium |
| 测试环境 | 部署模式 | DolphinDB server 单节点模式 | high |
| 测试环境软件版本 | DolphinDB 版本 | 3.00.4 2025.09.09 LINUX_JIT x86_64 | high |
| 测试环境硬件/系统 | 配置 | CentOS Linux 7 (Core);内核 3.10.0-1160.el7.x86_64;CPU Intel Xeon Gold 5220R @ 2.20GHz;license 限制 8 核 256 GB;最大可用内存 256 GB | high |
| Orca 后续功能与优化 | 规划列表 | 低延时引擎(十微秒级延时)、流式 SQL 引擎、流高可用、即时编译、开发与调试增强 | medium |
| Orca 定位 | 目标 | 为金融机构构建内部统一平台技术基座,将反应速度从 T+1 提升至 T+0,甚至微秒级 | low |
| 附录资源 | PositionMonitor.dos 链接 | https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhc | high |
| 附录资源 | importData.dos 链接 | https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhc | high |
| 附录资源 | SampleData.zip 链接 | https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhc | high |
| 附录资源 | 数据面板配置文件链接 | https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhc | high |