DolphinDB流数据聚合引擎教程

该页面包含一篇教程文章的标题信息与发布日期等元信息。

Source: https://dolphindb.cn/blogs/46

What this page covers

技能认证特训营第二期报名促销

页面顶部包含报名活动提示与限时报名链接相关内容。

DolphinDB流数据聚合引擎教程(作者与日期)

该部分展示文章标题、作者名与发布日期信息。

流数据与DolphinDB聚合引擎概述

该部分解释流数据特点及为何需要专门引擎,并介绍通过 createStreamAggregator 持续聚合并输出结果的能力。

聚合引擎应用框架

该部分说明聚合引擎与 subscribeTable 配合的使用方式,并列出流数据表、数据源、聚合表达式、窗口与计算周期等概念。

数据窗口(windowSize/step、时间对齐与示例)

该部分讲解窗口长度与计算间隔的定义、useSystemTime 的单位含义、对齐规则(alignmentSize)以及示例输出与窗口推导。

聚合表达式(复杂表达式与限制)

该部分展示纵向/横向聚合、多结果输出、多参数函数与自定义函数的表达式示例,并说明不支持聚合函数嵌套。

流数据源(订阅前预处理/清洗示例)

该部分说明 streamTable 作为输入数据源,并给出在 subscribeTable 中用自定义处理函数过滤无效数据的示例与结果。

聚合引擎输出(输出到内存表或流表与二次聚合)

该部分说明输出可写入内存表或流数据表,并示例将聚合结果输出为流表后再订阅进行二次聚合计算。

createStreamAggregator 函数介绍及语法

该部分描述 createStreamAggregator 关联的输入源、聚合表达式与输出表,并给出语法、返回对象、参数说明与优化聚合函数列表。

示例:dummyTable 与分组聚合(keyColumn/garbageSize)

该部分通过 dummyTable 示例说明样本表作用,并通过分组聚合示例说明 keyColumn 分组、窗口触发与无数据窗口不输出。

总结

该部分概括 streamAggregator 与 streamTable 配合实现实时流数据计算,并列举支持的能力点。

Facts Index

Entity Attribute Value Confidence
DolphinDB流数据聚合引擎教程发布日期2021-08-05high
DolphinDB提供的能力提供灵活的面向流数据的聚合引擎,可通过 createStreamAggregator 创建并持续对流数据做聚合计算,将结果持续输出到指定数据表medium
createStreamAggregator创建对象类型/行为返回一个抽象表;向该抽象表写入数据表示数据进入聚合引擎进行计算high
subscribeTable作用于流数据订阅订阅流数据表;每次有新数据进入会触发指定规则(示例中触发 append!{tradesAggregator})将流数据持续输入聚合引擎high
streamTable(DolphinDB)对象/用途为流数据提供的特定表对象,提供流数据发布功能;其他节点或APP可通过 subscribeTable 订阅/消费流数据high
聚合引擎数据源含义createStreamAggregator 返回的抽象表;向其写入数据意味着数据进入聚合引擎进行计算high
聚合表达式(createStreamAggregator aggregators)支持形式以元数据格式提供聚合函数集合,如 <[sum(qty)]>、<[sum(qty),max(qty),avg(price)]>;支持系统内所有聚合函数与表达式组合(如 <[avg(price1)-avg(price2)]>、<[std(price1-price2)]>)high
数据窗口(windowSize)定义指定每次计算时截取的流数据窗口长度high
计算周期(step)定义指定进行计算的间隔/触发计算的时间间隔high
useSystemTime 参数决定单位决定 windowSize 与 step 的单位:useSystemTime=true 时以系统时间精度(毫秒)为单位;否则以数据生成时间精度(timeColumn 的精度)为单位high
系统时间(聚合引擎)时间戳来源与精度数据进入聚合引擎的时间由聚合引擎所在服务器系统时间给出,精度为毫秒high
窗口起始时间对齐(alignmentSize)目的系统会根据 step 对第一个数据窗口边界进行规整处理并确定 alignmentSize,以避免各组窗口边界不规整导致无法在相同窗口对比high
alignmentSize(秒精度类型)取值规则(step区间映射)当时间精度为秒(如 DATETIME、SECOND):step 0~2→2;3~5→5;6~10→10;11~15→15;16~20→20;21~30→30;31~60→60high
alignmentSize(毫秒精度类型)取值规则(step区间映射)当时间精度为毫秒(如 TIMESTAMP、TIME):step 0~2→2;3~5→5;6~10→10;11~20→20;21~25→25;26~50→50;51~100→100;101~200→200;201~250→250;251~500→500;501~1000→1000high
第一个窗口左边界规整公式计算方式第一个数据窗口左边界最小精度规整后为 firstDataTime/alignmentSize*alignmentSize(/ 为取整除法)high
规整示例(firstDataTime=365, step=100)规整结果firstDataTime=365 且 step=100 时 alignmentSize=100,规整后最小精度为 300,因此窗口左边界为 2018.10.08T01:01:01.300high
窗口计算规则(示例推导)窗口移动方式窗口起始时间以第一条数据时间规整后为准;窗口大小为 windowSize,按 step 步长移动high
聚合表达式示例(纵向聚合)示例代码createStreamAggregator(6, 3, <sum(ofr)>, trades, outputTable, `time)high
聚合表达式示例(横向聚合)示例代码createStreamAggregator(6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time);createStreamAggregator(6, 3, <max(ofr-bid)>, trades, outputTable, `time)high
聚合表达式示例(输出多个结果)示例代码createStreamAggregator(6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time)high
多参数聚合函数调用示例corr(ofr,bid);percentile(ofr-bid,99)/sum(ofr)high
自定义函数用于聚合表达式示例定义 spread(x,y)=abs(x-y)/(x+y)*2,并在 createStreamAggregator 中使用 <spread(ofr, bid)>high
DolphinDB 流数据引擎聚合函数嵌套调用限制不支持聚合函数嵌套调用;例如 sum(spread(ofr,bid)) 会报错提示 “Nested aggregated function is not allowed”high
subscribeTable + 预处理能力在数据进入聚合引擎前可通过 subscribeTable 的自定义处理函数对流数据做初步清洗/过滤high
过滤规则示例(传感器数据)无效数据条件电压 voltage<=0.02 或 电流 electric==NULL 的数据在进入聚合引擎前需要过滤high
聚合结果输出目标支持类型可输出到新建或已存在的内存表,也可输出到流数据表;内存表可更新/删除,流数据表输出不可变更但可再次发布high
多级/二次聚合实现方式将第一个聚合引擎结果输出为流数据表 aggrOutput,再订阅该表并用第二个聚合引擎对结果进行二次聚合(示例求 max(avgElectric))high
createStreamAggregator 语法函数签名createStreamAggregator(windowTime, rollingTime, aggregators, dummyTable, outputTable, timeColumn[,useSystemTime, keyColumn, garbageSize])high
useSystemTime 参数(驱动方式)含义与默认值true 表示时间驱动(到达预定时间点激活,毫秒精度并为每条数据添加毫秒时间戳);false 表示数据驱动(数据进入才激活,使用 timeColumn 作为窗口依据);可选参数默认 falsehigh
windowSize 参数约束/边界包含规则正整数;数据窗口只包含下边界不包含上边界high
aggregators 参数支持范围支持系统内所有聚合函数(如 <sum(qty)>、<sum(qty),max(qty),avg(price)>)以及对聚合结果使用表达式组合(如 <[avg(price1)-avg(price2)]>、<[std(price1-price2)]>)high
聚合函数性能优化优化方式对部分聚合函数进行优化,每次计算充分利用上一个窗口计算结果以降低重复计算medium
经过优化的聚合函数列表函数名corr, covar, first, last, max, med, min, percentile, std, sum, sum2, var, wavg, wsumhigh
dummyTable 参数要求提供样本表对象;不需要有数据,但表结构必须与输入流数据表相同high
outputTable 参数(输出表结构)列结构规则输出表第一列为时间类型存放计算时间点;若 keyColumn 不为空则第二列为分组列;从第三列起为聚合结果列high
keyColumn 参数作用可选参数;按 keyColumn 分组,对输入流数据进行分组聚合计算high
garbageSize 参数作用正整数;当内存缓存的历史数据记录条数超过 garbageSize 时系统清理缓存;分组计算时每组独立统计与清理high
dummyTable 示例结论对结果影响dummyTable 作为样本表,其是否包含数据对输出结果没有影响(示例中仍输出结果)high
分组聚合示例设置输入表增加分组列 sym,createStreamAggregator 设置 keyColumn 为 `sym(示例:createStreamAggregator(3,3,<[sum(qty)]>,trades,outputTable,`time,false,`sym,50))high
无数据窗口行为(分组示例说明)输出规则窗口内若没有任何数据,系统不会计算也不会产生结果(示例中 B 组第一个窗口无结果)high
streamAggregator(DolphinDB database)定位与能力概述被描述为轻量、使用方便的流数据聚合引擎;支持纵向/横向/组合计算、自定义函数、分组聚合、无效数据预清洗、多级计算等low