DolphinDB流计算引擎实现传感器数据异常检测
页面介绍如何使用 DolphinDB 的流数据表与流计算引擎进行实时传感器异常检测,并在逻辑复杂时使用自定义消息处理函数实现告警。
Source: https://dolphindb.cn/blogs/36
What this page covers
- 传感器异常检测的整体主题与方案概述。
- 监控场景定义与两类告警需求(温度与离线/无数据)。
- 流计算引擎选型:异常检测引擎与自定义消息处理函数的适用性。
- 实现步骤:输入/输出流表、异常检测引擎、离线告警处理函数与订阅配置。
- 模拟数据写入与通过查询告警表验证结果。
- 附录:教程测试代码链接。
技能认证特训营第二期正式开启(限时报名)
页面包含一个限时报名的技能认证特训营第二期相关宣传入口。
- 提供一个限时报名链接入口。
- 内容为促销/活动导向,而非教程主体。
DolphinDB流计算引擎实现传感器数据异常检测
文章以传感器异常检测为例,说明如何利用 DolphinDB 的流数据表与流计算引擎进行实时处理,并在标准引擎不足时使用自定义消息处理函数。
- 文章发布日期为 2021-05-20。
- DolphinDB 提供流数据表与流计算引擎用于实时数据处理。
- 页面讨论了用于复杂逻辑的自定义消息处理函数(message handler)。
应用需求
本节定义监控采集背景,并给出两条明确的告警规则与告警输出方式。
- 监控系统按“一秒一次”的频率采集数据。
- 需求1:3分钟内温度>40°C 超过2次且温度>30°C 超过3次则报警。
- 需求2:传感器网络断开或5分钟内无数据则报警。
- 告警输出定义为:检测到异常时向流数据表写入一条记录。
设计思路
本节介绍 DolphinDB 流计算框架中的引擎类型,并说明为何需求1适合异常检测引擎,而需求2需要自定义消息处理函数来处理“无新数据不触发计算”的情况。
- 流计算框架支持时序聚合引擎(Time-Series Aggregator)。
- 流计算框架支持横截面聚合引擎(Cross Sectional Aggregator)。
- 流计算框架支持异常检测引擎(Anomaly Detection Engine)。
- 适配结论:需求1适用异常检测引擎,需求2不适用。
- 限制:按设备分组处理时,没有新数据流入将无法触发计算。
详细实现步骤
本节给出实现流程:创建输入流表与告警输出表;用异常检测引擎实现温度告警;用 keyed 内存表与自定义 message handler 实现离线/无数据告警,并通过订阅参数进行性能处理。
- 示例输入流表 sensor 使用 streamTable 创建,列为 deviceID、ts、temperature。
- 输入流表可通过 enableTableShareAndPersistence 共享并持久化。
- 示例告警输出流表 warningTable 列为 time、deviceID、anomalyType、anomalyString。
- createAnomalyDetectionEngine 的输出表对列顺序与类型有要求。
- 温度告警指标表达式为:sum(temperature > 40) > 2 && sum(temperature > 30) > 3。
- 温度告警示例引擎使用 windowSize=180秒,step=30秒。
- 温度告警示例引擎使用 keyColumn=deviceID 与 timeColumn=ts。
- 温度告警订阅将消息通过 append!{engine} 送入异常检测引擎。
- 离线/无数据告警使用 keyedTable 记录每个设备最新采集时间。
- append! 更新 keyedTable:主键不存在则新增,重复则更新。
- checkNoData:当时间早于 now()-5分钟时生成告警并写入 warningTable。
- 离线告警可通过 throttle 与 batchSize 做批量处理与节流。
模拟写入与验证
本节提供模拟脚本生成传感器数据,并通过查询告警输出表 warningTable 验证告警结果,页面包含示例输出截图。
- 模拟场景假设共有3个传感器按每秒采集。
- 模拟场景中前1分钟所有设备有数据。
- 模拟场景中1分钟后第3个设备无数据。
- 示例写入脚本 writeData 分两个循环阶段写入数据。
- 脚本每次写入后 sleep(1000) 以模拟每秒写入。
- 示例使用 submitJob("simulateData", "simulate sensor data", writeData) 提交任务。
- 验证方式为运行后查询 warningTable 查看告警结果示例。
附录
本节提供教程相关测试代码链接。
- 测试代码链接指向 gitee 上的 alarm.txt。
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB流计算引擎实现传感器数据异常检测(文章) | published_date | 2021-05-20 | high |
| DolphinDB | provides | 流数据表(stream table)和流计算引擎用于实时数据处理(包括传感器数据异常检测) | high |
| DolphinDB 内置异常检测引擎(Anomaly Detection Engine) | suitability_claim | 能满足大部分异常检测场景的需求 | low |
| 自定义消息处理函数(message handler) | use_case_claim | 当异常检测逻辑复杂且较为特殊、标准化异常检测引擎不能满足要求时,可用自定义消息处理函数实现 | medium |
| 监控系统(采集频率) | sampling_rate | 一秒钟采集一次数据 | high |
| 异常检测需求1(温度计数+窗口) | alarm_condition | 每3分钟内:温度>40°C出现2次以上且温度>30°C出现3次以上则报警 | high |
| 异常检测需求2(离线/断网) | alarm_condition | 传感器网络断开/5分钟内无数据则报警 | high |
| 报警输出(定义) | alarm_action | 侦测到异常时,向一个流数据表写入一条记录 | high |
| DolphinDB 流计算框架 | supports | 时序聚合引擎(Time-Series Aggregator) | high |
| DolphinDB 流计算框架 | supports | 横截面聚合引擎(Cross Sectional Aggregator) | high |
| DolphinDB 流计算框架 | supports | 异常检测引擎(Anomaly Detection Engine) | high |
| DolphinDB 流计算框架 | supports | 自定义流计算引擎(通过脚本或API自定义消息处理函数) | high |
| 时序聚合引擎(Time-Series Aggregator) | capability | 可对设备状态进行纵向(按时间序列)聚合计算,或将多个设备状态横向聚合后再按时间聚合;支持滑动窗口流式计算 | high |
| 时序聚合引擎(Time-Series Aggregator) | performance_claim | 单核CPU每秒可完成近百万状态的时序聚合 | medium |
| 横截面聚合引擎(Cross Sectional Aggregator) | capability | 快照引擎的扩展,可对设备状态进行横向聚合计算(例如计算一批设备的温度均值) | high |
| 异常检测引擎(Anomaly Detection Engine) | capability | 可实时检测数据是否符合用户自定义的警报指标;发现异常数据可输出到表中,用于物联网实时监控和预警 | high |
| 自定义流计算引擎 | trigger_condition | 当以上三种引擎都不能满足需求时可使用脚本或API自定义消息处理函数 | high |
| 异常检测引擎 vs 需求适配 | fit_assessment | 需求1适用异常检测引擎;需求2不适用 | high |
| 异常检测引擎(按设备分组处理) | limitation | 每次有新数据流入才触发计算,或按固定时间间隔在移动窗口内聚合;传感器若没有新数据无法触发计算 | high |
| 需求2解决方案(自定义message handler) | approach | 使用键值内存表记录每个传感器最新采集时间;消息以固定间隔进入处理函数,先更新表再检查是否超过5分钟无数据,若是则报警 | high |
| 输入流数据表 sensor(示例) | definition | 使用streamTable创建,包含列deviceID、ts、temperature,类型分别为INT、DATETIME、FLOAT | high |
| enableTableShareAndPersistence(对输入流表) | configuration | 将流数据表共享并持久化到硬盘;示例中cacheSize=1000000(内存最多保留100万行),compress=true,asynWrite=false | high |
| 报警输出流数据表 warningTable(示例) | schema | 列为time、deviceID、anomalyType、anomalyString;类型为DATETIME、INT、INT、SYMBOL | high |
| createAnomalyDetectionEngine 输出表格式要求(文中描述) | constraint | 输出表第一列必须为时间类型用于存放异常时间戳,且类型需与输入表时间列一致;若keyColumn不为空则第二列为keyColumn;之后两列分别为int与string/symbol用于记录异常类型(metrics下标)与异常内容 | high |
| 异常检测引擎(engine1)指标表达式 | metrics | sum(temperature > 40) > 2 && sum(temperature > 30) > 3 | high |
| 异常检测引擎(engine1) | keyColumn | deviceID | high |
| 异常检测引擎(engine1) | timeColumn | ts | high |
| 异常检测引擎(engine1) | windowSize | 180秒 | high |
| 异常检测引擎(engine1) | step | 30秒 | high |
| subscribeTable(用于温度异常检测) | handler | append!{engine}(将订阅消息送入异常检测引擎) | high |
| subscribeTable(用于温度异常检测) | msgAsTable | true | high |
| 离线报警实现(键值内存表) | data_structure | 采用keyedTable保存每个设备最新状态/采集时间,以deviceID为主键 | high |
| append!(用于键值表更新) | behavior | 若主键不存在则添加记录;若主键重复则更新对应主键记录 | high |
| 离线报警(anomalyType取值) | encoding | 因上节异常检测引擎已用0,此处离线报警的anomalyType设为1,anomalyString设为空 | high |
| subscribeTable(用于离线报警) | performance_tuning | 可通过throttle与batchSize实现批量处理提升性能;示例throttle=1(每秒处理一次),batchSize设为较大值(示例为1000000) | high |
| 示例传感器数量(离线报警示例代码) | deviceNum_assumption | 3 | high |
| checkNoData(示例自定义函数) | offline_rule | 当keyedTable中time < datetimeAdd(now().datetime(), -5, "m") 时生成报警记录并append到warningTable | high |
| 模拟数据场景 | scenario | 假设3个传感器每秒采集;前1分钟所有设备有数据;1分钟后第3个设备无数据 | high |
| 模拟脚本 writeData | behavior | 前0..60循环写入3个设备数据;随后0..600循环写入2个设备数据;每次写入后sleep(1000) | high |
| submitJob(模拟任务) | job_metadata | submitJob("simulateData", "simulate sensor data", writeData) | high |
| 验证方式 | verification_step | 运行后查询warningTable查看报警结果示例(页面包含结果截图) | medium |
| 附录链接 | resource | 测试代码:https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/alarm.txt | high |
| 限时报名链接(技能认证特训营第二期) | url | https://www.qingsuyun.com/h5/e/217471/5/ | high |