Orca 声明式 DStream API 应用:账户持仓损益实时监控

本页介绍使用 Orca 声明式 DStream API 进行双流关联,以实现账户持仓损益的实时监控,并提供相关说明与附录资源。

Source: https://dolphindb.cn/blogs/252

What this page covers

技能认证特训营第二期报名提示

页面顶部包含限时报名活动提示,并提供报名链接。

Orca 声明式 DStream API 应用:账户持仓损益实时监控(文章信息与引言)

本节给出文章基本信息与引言,介绍使用 Orca 声明式 DStream API 进行双流关联以实现持仓损益实时监控,并说明前置阅读与附录代码资源。

应用场景描述

本节阐述多资产实时损益监控的业务必要性与挑战,并说明 Orca 平台及其声明式 DStream API 的适用性与能力点。

实现方案

本节描述两阶段的数据处理方案、结果输出方式与 Dashboard 可视化展示,并配合流程图进行说明。

数据与指标说明(数据结构与指标规则)

本节说明数据来源与回放模拟方式,列出三类数据表结构,并给出多项监控指标的计算规则与示例代码实现方式。

持仓监控代码实现(构建流图与数据回放)

本节分步展示如何创建目录与流图、定义流数据表、进行两次 lookupJoin 关联、通过 reactiveStateEngine 计算指标并输出,以及提交流图并回放模拟数据写入。

结果展示(状态查看、结果查询与可视化)

本节展示查看流图状态的方式、计算结果查询方式,以及使用数据面板进行可视化展示的方法线索。

性能测试(数据量、结果、环境)

本节给出时延定义、不同并发与回放速度下的时延结果,并说明单节点测试环境的软件版本与硬件/系统配置。

总结与展望

本节总结 Orca 声明式 DStream API 在降低流计算开发难度与支撑低时延持仓监控方案方面的价值,并列出后续计划方向。

附录(示例代码/数据/面板配置链接)

本节提供示例代码、样例数据与数据面板配置文件的下载或引用链接。

Facts index

Entity Attribute Value Confidence
技能认证特训营第二期报名链接https://www.qingsuyun.com/h5/e/217471/5/high
Orca 声明式 DStream API 应用:账户持仓损益实时监控发布日期2025-11-17high
DolphinDB平台/产品企业级实时计算平台 Orcahigh
Orca 声明式 DStream API编程范式/接口特点以声明式编程范式为核心,允许通过描述“做什么”而非“如何做”定义流数据处理逻辑,从而降低开发门槛medium
Orca能力提供任务自动调度与计算高可用等能力,用于保证任务稳定与降低开发运维成本medium
本文实现目标监控对象账户层级下各标的持仓损益的实时监控high
数据处理方案阶段划分两个阶段:数据预处理(关联生成宽表)与持仓监控指标计算(响应式状态引擎计算十余个指标)high
计算结果输出方式方式可由外部 API 订阅或查询,或通过消息中间件插件等发送到外部;本文使用 DolphinDB 自带 Dashboard 可视化展示high
样例数据与导入代码获取位置在附录中提供下载high
样例数据(委托成交数据表)覆盖范围半个交易日的模拟数据;5 个账户、20 支标的的委托成交流水记录high
样例数据(持仓信息表)覆盖范围模拟当日开盘时 5 个账户所持的每支标的的持仓信息high
样例数据(行情快照数据)覆盖范围包括 20 支标的的 L2 快照数据high
流计算引擎自定义函数声明要求使用自定义函数计算指标时,需在定义前添加声明 @statehigh
持仓信息表字段结构列:AccountID(SYMBOL), SecurityID(SYMBOL), Date(DATE), SecurityName(STRING), Threshold(INT), OpenVolume(INT), PreVolume(INT), PreClose(DOUBLE)high
委托和成交数据表字段结构列包括 AccountID, Type(委托1/成交2), OrderNo, SecurityID, Date, Time, BSFlag, Price, Volume, TradeNo, State, Mark, NetVolume, CumSellVol, CumBuyVol, SellPrice, BuyPrice, ReceivedTime(NANOTIMESTAMP) 等high
行情快照数据表字段结构列:SecurityID(SYMBOL), Date(DATE), Time(TIME), LastPx(DOUBLE)high
指标 CanceledVolume定义撤单的委托数量;文中给出计算公式与 @state 函数 calCanceledVolumehigh
指标 PositionVolume定义实时持仓数量;文中给出计算公式与 @state 函数 calPositionVolumehigh
指标 ThresholdDeviation定义阈值偏离度;文中给出计算公式与 @state 函数 calThresholdDeviation(结果 round 到 6 位)high
指标 PositionDeviation定义持仓偏离度;文中给出计算公式与 @state 函数 calPositionDeviation(结果 round 到 6 位)high
指标 BuyVolume定义当日买入数量;@state 函数 calBuyVolume(空值返回0)high
指标 BuyPrice定义当日买入均价;文中给出计算说明与 @state 函数 calBuyPrice(round到6位)high
指标 SellVolume定义当日卖出数量;@state 函数 calSellVolume(空值返回0)high
指标 SellPrice定义当日卖出均价;文中给出计算说明与 @state 函数 calSellPrice(round到6位)high
指标 NetBuyVolume定义当日净买入数量;@state 函数 calNetBuyVolume(空值返回0)high
指标 FreezeVolume定义冻结持仓;文中给出计算说明与 @state 函数 calFreezeVolume(使用 cumsum 与撤单量/成交量逻辑)high
AvailableVolume 指标段落内容问题在首次出现“AvailableVolume:可用持仓”处,公式与代码实现重复为 FreezeVolume(calFreezeVolume),随后再次出现 AvailableVolume 并给出正确公式与实现high
指标 AvailableVolume实现@state 函数 calAvailableVolume:availableVolume = PreVolume - calSellVolume(Type, CumSellVol) - calFreezeVolume(VOLUME, BSFlag, Mark, Type)high
指标 AvailableVolumeRatio实现@state 函数 calAvailableVolumeRatio:availableVolumeRatio = calAvailableVolume(...) \ PreVolume,并 round 到 6 位(空值返回0)high
指标 Profit实现@state 函数 calProfit:结合昨持仓浮动盈亏、已卖出实现盈亏与新买入浮动盈亏;结果 round 到 6 位high
StreamGraph 数据目录与流图名称数据目录 positionMonitorDemo;流图名称 positionMonitorhigh
MarketDataStream(委托成交流表)并行设置使用 parallelize("AccountID", 2) 按 AccountID 哈希分区生成并行分支high
DStream 关联引擎(委托成交与持仓基础信息)连接列与时间列matchingColumn=`AccountID`SecurityID;rightTimeColumn=`Date;使用 DStream::lookupJoinEnginehigh
DStream 关联引擎(与行情快照)连接列与时间列matchingColumn=`SecurityID;rightTimeColumn=`Date;通过 lookupJoinEngine 获取 LastPxhigh
reactiveStateEngine 输出键列与输出表keyColumn=`AccountID`SecurityID;keepOrder=true;sync() 后 sink("PositionMonitorStream")high
流图启动必要操作需要调用 StreamGraph::submit(positionMonitorGraph.submit())流图才能启动high
静态持仓基础信息写入方法开盘前调用 appendOrcaStreamTable 批量插入静态数据到 PositionInfohigh
模拟实时数据传入方法与回放参数使用 useOrcaStreamTable 与 replayDS/replay;timeRepartitionSchema=cutPoints(09:30:00.000..15:00:00.000, 50);replayRate=1, absoluteRate=false, preciseRate=true;通过 submitJob 提交任务high
流图运维函数函数列表getStreamGraphInfo、getOrcaStreamTableMeta、getOrcaStreamEngineMetahigh
计算结果查询SQLselect * from positionMonitorDemo.orca_table.PositionMonitorStreamhigh
计算时延定义定义方式UpdateTime(响应计算完成时刻)与 ReceiveTime(记录到达系统时刻)的差值;统计平均值与 99 分位high
高负载模拟流量回放速度与规模五倍速回放模拟高负载场景;委托成交流水高达 5 万笔每秒high
测试数据量(委托成交流水)规模每秒 1 万条;3,000 支标的;800 个账户(每个账户均有 2,300+ 标的);总 72,026,955 条记录(09:30-11:30 半个交易日)high
测试数据量(行情快照)描述3 秒频 Level 2 股票行情快照high
性能测试结果(并发8,1X)全链路总时延(平均/99分位,微秒)743 / 1622(TPS=1w)high
性能测试结果(并发16,1X)全链路总时延(平均/99分位,微秒)697 / 1391(TPS=1w)high
性能测试结果(并发8,5X)全链路总时延(平均/99分位,微秒)1045 / 8928(TPS=5w)high
性能测试结果(并发16,5X)全链路总时延(平均/99分位,微秒)877 / 4155(TPS=5w)high
并发度与时延关系(文中结论)描述提升并发数可进一步降低计算时延;在 5w TPS 场景并发由 8 到 16,99 分位时延可降低一倍多medium
测试环境部署模式DolphinDB server 单节点模式high
测试环境软件版本DolphinDB 版本3.00.4 2025.09.09 LINUX_JIT x86_64high
测试环境硬件/系统配置CentOS Linux 7 (Core);内核 3.10.0-1160.el7.x86_64;CPU Intel Xeon Gold 5220R @ 2.20GHz;license 限制 8 核 256 GB;最大可用内存 256 GBhigh
Orca 后续功能与优化规划列表低延时引擎(十微秒级延时)、流式 SQL 引擎、流高可用、即时编译、开发与调试增强medium
Orca 定位目标为金融机构构建内部统一平台技术基座,将反应速度从 T+1 提升至 T+0,甚至微秒级low
附录资源PositionMonitor.dos 链接https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhchigh
附录资源importData.dos 链接https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhchigh
附录资源SampleData.zip 链接https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhchigh
附录资源数据面板配置文件链接https://docs.dolphindb.cn/zh/tutorials/orca_finance_position.html#topic_bvx_mmp_hhchigh