DolphinDB流数据聚合引擎教程
该页面包含一篇教程文章的标题信息与发布日期等元信息。
Source: https://dolphindb.cn/blogs/46
What this page covers
- 流数据特点与聚合引擎的概述主题。
- 聚合引擎与订阅机制的应用框架与相关概念。
- 窗口长度、计算周期与时间对齐规则相关说明。
- 聚合表达式示例与限制(如嵌套限制)。
- 订阅前预处理/过滤无效数据的示例主题。
- 输出目标类型与二次聚合(多级聚合)的示例主题。
- createStreamAggregator 的语法、参数与优化函数列表相关内容。
技能认证特训营第二期报名促销
页面顶部包含报名活动提示与限时报名链接相关内容。
- 该部分为活动报名提示信息。
- 该部分包含与报名相关的链接入口。
DolphinDB流数据聚合引擎教程(作者与日期)
该部分展示文章标题、作者名与发布日期信息。
- 页面包含文章标题。
- 页面展示作者信息。
- 页面展示发布日期。
流数据与DolphinDB聚合引擎概述
该部分解释流数据特点及为何需要专门引擎,并介绍通过 createStreamAggregator 持续聚合并输出结果的能力。
- 该部分讨论流数据的特点与处理需求。
- 该部分说明需要专门的聚合引擎来处理流数据。
- 该部分提到可持续聚合流数据并持续输出结果。
聚合引擎应用框架
该部分说明聚合引擎与 subscribeTable 配合的使用方式,并列出流数据表、数据源、聚合表达式、窗口与计算周期等概念。
- 该部分描述 subscribeTable 与聚合引擎的配合方式。
- 该部分列出流数据表(streamTable)的概念。
- 该部分说明聚合引擎数据源为 createStreamAggregator 返回对象。
- 该部分描述聚合表达式(aggregators)的表达方式与示例形式。
- 该部分引入数据窗口(windowSize)与计算周期(step)的概念。
数据窗口(windowSize/step、时间对齐与示例)
该部分讲解窗口长度与计算间隔的定义、useSystemTime 的单位含义、对齐规则(alignmentSize)以及示例输出与窗口推导。
- windowSize 用于指定每次计算时截取的流数据窗口长度。
- step 用于指定触发计算的时间间隔。
- useSystemTime 决定 windowSize 与 step 的单位口径。
- 系统会根据 step 规整第一个数据窗口边界并确定 alignmentSize。
- 第一个窗口左边界规整公式为 firstDataTime/alignmentSize*alignmentSize(取整除法)。
聚合表达式(复杂表达式与限制)
该部分展示纵向/横向聚合、多结果输出、多参数函数与自定义函数的表达式示例,并说明不支持聚合函数嵌套。
- 该部分给出纵向聚合的表达式示例(如 sum(ofr))。
- 该部分给出横向聚合的表达式示例(如 max(ofr)-min(ofr))。
- 该部分展示一次输出多个聚合结果的表达式示例。
- 该部分展示多参数聚合函数调用示例(如 corr)。
- 该部分说明不支持聚合函数嵌套调用。
流数据源(订阅前预处理/清洗示例)
该部分说明 streamTable 作为输入数据源,并给出在 subscribeTable 中用自定义处理函数过滤无效数据的示例与结果。
- streamTable 是用于流数据发布的特定表对象。
- subscribeTable 可在数据进入聚合引擎前执行自定义预处理函数。
- 示例过滤条件包含 voltage<=0.02 或 electric==NULL。
- 订阅在每次新数据进入时触发指定规则以持续输入聚合引擎。
聚合引擎输出(输出到内存表或流表与二次聚合)
该部分说明输出可写入内存表或流数据表,并示例将聚合结果输出为流表后再订阅进行二次聚合计算。
- 聚合结果可输出到内存表或流数据表。
- 内存表输出可更新或删除,流数据表输出不可变更但可再次发布。
- 可将第一个聚合引擎输出为流数据表后进行二次聚合。
- 示例二次聚合包含对上一级结果求 max(avgElectric)。
createStreamAggregator 函数介绍及语法
该部分描述 createStreamAggregator 关联的输入源、聚合表达式与输出表,并给出语法、返回对象、参数说明与优化聚合函数列表。
- createStreamAggregator 语法包含 windowTime、rollingTime、aggregators、dummyTable、outputTable、timeColumn 等参数。
- createStreamAggregator 返回一个抽象表,用于将数据写入聚合引擎。
- useSystemTime 可用于选择时间驱动或数据驱动方式,默认值为 false。
- outputTable 的列结构包含时间列、可选分组列与聚合结果列。
- 部分聚合函数经过优化以减少重复计算。
示例:dummyTable 与分组聚合(keyColumn/garbageSize)
该部分通过 dummyTable 示例说明样本表作用,并通过分组聚合示例说明 keyColumn 分组、窗口触发与无数据窗口不输出。
- dummyTable 用于提供样本表对象,其表结构需与输入流表相同。
- 示例说明 dummyTable 是否包含数据不影响输出结果。
- keyColumn 可用于按分组列对输入流数据进行分组聚合。
- garbageSize 用于控制历史数据缓存清理阈值。
- 窗口内无数据时系统不会计算也不会产生结果。
总结
该部分概括 streamAggregator 与 streamTable 配合实现实时流数据计算,并列举支持的能力点。
- 该部分将 streamAggregator 描述为轻量且使用方便的流数据聚合引擎。
- 该部分提到支持纵向与横向的聚合计算形式。
- 该部分提到支持自定义函数与分组聚合。
- 该部分提到支持无效数据预清洗与多级计算。
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB流数据聚合引擎教程 | 发布日期 | 2021-08-05 | high |
| DolphinDB | 提供的能力 | 提供灵活的面向流数据的聚合引擎,可通过 createStreamAggregator 创建并持续对流数据做聚合计算,将结果持续输出到指定数据表 | medium |
| createStreamAggregator | 创建对象类型/行为 | 返回一个抽象表;向该抽象表写入数据表示数据进入聚合引擎进行计算 | high |
| subscribeTable | 作用于流数据订阅 | 订阅流数据表;每次有新数据进入会触发指定规则(示例中触发 append!{tradesAggregator})将流数据持续输入聚合引擎 | high |
| streamTable(DolphinDB) | 对象/用途 | 为流数据提供的特定表对象,提供流数据发布功能;其他节点或APP可通过 subscribeTable 订阅/消费流数据 | high |
| 聚合引擎数据源 | 含义 | createStreamAggregator 返回的抽象表;向其写入数据意味着数据进入聚合引擎进行计算 | high |
| 聚合表达式(createStreamAggregator aggregators) | 支持形式 | 以元数据格式提供聚合函数集合,如 <[sum(qty)]>、<[sum(qty),max(qty),avg(price)]>;支持系统内所有聚合函数与表达式组合(如 <[avg(price1)-avg(price2)]>、<[std(price1-price2)]>) | high |
| 数据窗口(windowSize) | 定义 | 指定每次计算时截取的流数据窗口长度 | high |
| 计算周期(step) | 定义 | 指定进行计算的间隔/触发计算的时间间隔 | high |
| useSystemTime 参数 | 决定单位 | 决定 windowSize 与 step 的单位:useSystemTime=true 时以系统时间精度(毫秒)为单位;否则以数据生成时间精度(timeColumn 的精度)为单位 | high |
| 系统时间(聚合引擎) | 时间戳来源与精度 | 数据进入聚合引擎的时间由聚合引擎所在服务器系统时间给出,精度为毫秒 | high |
| 窗口起始时间对齐(alignmentSize) | 目的 | 系统会根据 step 对第一个数据窗口边界进行规整处理并确定 alignmentSize,以避免各组窗口边界不规整导致无法在相同窗口对比 | high |
| alignmentSize(秒精度类型) | 取值规则(step区间映射) | 当时间精度为秒(如 DATETIME、SECOND):step 0~2→2;3~5→5;6~10→10;11~15→15;16~20→20;21~30→30;31~60→60 | high |
| alignmentSize(毫秒精度类型) | 取值规则(step区间映射) | 当时间精度为毫秒(如 TIMESTAMP、TIME):step 0~2→2;3~5→5;6~10→10;11~20→20;21~25→25;26~50→50;51~100→100;101~200→200;201~250→250;251~500→500;501~1000→1000 | high |
| 第一个窗口左边界规整公式 | 计算方式 | 第一个数据窗口左边界最小精度规整后为 firstDataTime/alignmentSize*alignmentSize(/ 为取整除法) | high |
| 规整示例(firstDataTime=365, step=100) | 规整结果 | firstDataTime=365 且 step=100 时 alignmentSize=100,规整后最小精度为 300,因此窗口左边界为 2018.10.08T01:01:01.300 | high |
| 窗口计算规则(示例推导) | 窗口移动方式 | 窗口起始时间以第一条数据时间规整后为准;窗口大小为 windowSize,按 step 步长移动 | high |
| 聚合表达式示例(纵向聚合) | 示例代码 | createStreamAggregator(6, 3, <sum(ofr)>, trades, outputTable, `time) | high |
| 聚合表达式示例(横向聚合) | 示例代码 | createStreamAggregator(6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time);createStreamAggregator(6, 3, <max(ofr-bid)>, trades, outputTable, `time) | high |
| 聚合表达式示例(输出多个结果) | 示例代码 | createStreamAggregator(6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time) | high |
| 多参数聚合函数调用 | 示例 | corr(ofr,bid);percentile(ofr-bid,99)/sum(ofr) | high |
| 自定义函数用于聚合表达式 | 示例 | 定义 spread(x,y)=abs(x-y)/(x+y)*2,并在 createStreamAggregator 中使用 <spread(ofr, bid)> | high |
| DolphinDB 流数据引擎聚合函数 | 嵌套调用限制 | 不支持聚合函数嵌套调用;例如 sum(spread(ofr,bid)) 会报错提示 “Nested aggregated function is not allowed” | high |
| subscribeTable + 预处理 | 能力 | 在数据进入聚合引擎前可通过 subscribeTable 的自定义处理函数对流数据做初步清洗/过滤 | high |
| 过滤规则示例(传感器数据) | 无效数据条件 | 电压 voltage<=0.02 或 电流 electric==NULL 的数据在进入聚合引擎前需要过滤 | high |
| 聚合结果输出目标 | 支持类型 | 可输出到新建或已存在的内存表,也可输出到流数据表;内存表可更新/删除,流数据表输出不可变更但可再次发布 | high |
| 多级/二次聚合 | 实现方式 | 将第一个聚合引擎结果输出为流数据表 aggrOutput,再订阅该表并用第二个聚合引擎对结果进行二次聚合(示例求 max(avgElectric)) | high |
| createStreamAggregator 语法 | 函数签名 | createStreamAggregator(windowTime, rollingTime, aggregators, dummyTable, outputTable, timeColumn[,useSystemTime, keyColumn, garbageSize]) | high |
| useSystemTime 参数(驱动方式) | 含义与默认值 | true 表示时间驱动(到达预定时间点激活,毫秒精度并为每条数据添加毫秒时间戳);false 表示数据驱动(数据进入才激活,使用 timeColumn 作为窗口依据);可选参数默认 false | high |
| windowSize 参数 | 约束/边界包含规则 | 正整数;数据窗口只包含下边界不包含上边界 | high |
| aggregators 参数 | 支持范围 | 支持系统内所有聚合函数(如 <sum(qty)>、<sum(qty),max(qty),avg(price)>)以及对聚合结果使用表达式组合(如 <[avg(price1)-avg(price2)]>、<[std(price1-price2)]>) | high |
| 聚合函数性能优化 | 优化方式 | 对部分聚合函数进行优化,每次计算充分利用上一个窗口计算结果以降低重复计算 | medium |
| 经过优化的聚合函数列表 | 函数名 | corr, covar, first, last, max, med, min, percentile, std, sum, sum2, var, wavg, wsum | high |
| dummyTable 参数 | 要求 | 提供样本表对象;不需要有数据,但表结构必须与输入流数据表相同 | high |
| outputTable 参数(输出表结构) | 列结构规则 | 输出表第一列为时间类型存放计算时间点;若 keyColumn 不为空则第二列为分组列;从第三列起为聚合结果列 | high |
| keyColumn 参数 | 作用 | 可选参数;按 keyColumn 分组,对输入流数据进行分组聚合计算 | high |
| garbageSize 参数 | 作用 | 正整数;当内存缓存的历史数据记录条数超过 garbageSize 时系统清理缓存;分组计算时每组独立统计与清理 | high |
| dummyTable 示例结论 | 对结果影响 | dummyTable 作为样本表,其是否包含数据对输出结果没有影响(示例中仍输出结果) | high |
| 分组聚合示例 | 设置 | 输入表增加分组列 sym,createStreamAggregator 设置 keyColumn 为 `sym(示例:createStreamAggregator(3,3,<[sum(qty)]>,trades,outputTable,`time,false,`sym,50)) | high |
| 无数据窗口行为(分组示例说明) | 输出规则 | 窗口内若没有任何数据,系统不会计算也不会产生结果(示例中 B 组第一个窗口无结果) | high |
| streamAggregator(DolphinDB database) | 定位与能力概述 | 被描述为轻量、使用方便的流数据聚合引擎;支持纵向/横向/组合计算、自定义函数、分组聚合、无效数据预清洗、多级计算等 | low |