DolphinDB流计算引擎实现传感器数据异常检测

页面介绍如何使用 DolphinDB 的流数据表与流计算引擎进行实时传感器异常检测,并在逻辑复杂时使用自定义消息处理函数实现告警。

Source: https://dolphindb.cn/blogs/36

What this page covers

技能认证特训营第二期正式开启(限时报名)

页面包含一个限时报名的技能认证特训营第二期相关宣传入口。

DolphinDB流计算引擎实现传感器数据异常检测

文章以传感器异常检测为例,说明如何利用 DolphinDB 的流数据表与流计算引擎进行实时处理,并在标准引擎不足时使用自定义消息处理函数。

应用需求

本节定义监控采集背景,并给出两条明确的告警规则与告警输出方式。

设计思路

本节介绍 DolphinDB 流计算框架中的引擎类型,并说明为何需求1适合异常检测引擎,而需求2需要自定义消息处理函数来处理“无新数据不触发计算”的情况。

详细实现步骤

本节给出实现流程:创建输入流表与告警输出表;用异常检测引擎实现温度告警;用 keyed 内存表与自定义 message handler 实现离线/无数据告警,并通过订阅参数进行性能处理。

模拟写入与验证

本节提供模拟脚本生成传感器数据,并通过查询告警输出表 warningTable 验证告警结果,页面包含示例输出截图。

附录

本节提供教程相关测试代码链接。

Facts Index

Entity Attribute Value Confidence
DolphinDB流计算引擎实现传感器数据异常检测(文章) published_date 2021-05-20 high
DolphinDB provides 流数据表(stream table)和流计算引擎用于实时数据处理(包括传感器数据异常检测) high
DolphinDB 内置异常检测引擎(Anomaly Detection Engine) suitability_claim 能满足大部分异常检测场景的需求 low
自定义消息处理函数(message handler) use_case_claim 当异常检测逻辑复杂且较为特殊、标准化异常检测引擎不能满足要求时,可用自定义消息处理函数实现 medium
监控系统(采集频率) sampling_rate 一秒钟采集一次数据 high
异常检测需求1(温度计数+窗口) alarm_condition 每3分钟内:温度>40°C出现2次以上且温度>30°C出现3次以上则报警 high
异常检测需求2(离线/断网) alarm_condition 传感器网络断开/5分钟内无数据则报警 high
报警输出(定义) alarm_action 侦测到异常时,向一个流数据表写入一条记录 high
DolphinDB 流计算框架 supports 时序聚合引擎(Time-Series Aggregator) high
DolphinDB 流计算框架 supports 横截面聚合引擎(Cross Sectional Aggregator) high
DolphinDB 流计算框架 supports 异常检测引擎(Anomaly Detection Engine) high
DolphinDB 流计算框架 supports 自定义流计算引擎(通过脚本或API自定义消息处理函数) high
时序聚合引擎(Time-Series Aggregator) capability 可对设备状态进行纵向(按时间序列)聚合计算,或将多个设备状态横向聚合后再按时间聚合;支持滑动窗口流式计算 high
时序聚合引擎(Time-Series Aggregator) performance_claim 单核CPU每秒可完成近百万状态的时序聚合 medium
横截面聚合引擎(Cross Sectional Aggregator) capability 快照引擎的扩展,可对设备状态进行横向聚合计算(例如计算一批设备的温度均值) high
异常检测引擎(Anomaly Detection Engine) capability 可实时检测数据是否符合用户自定义的警报指标;发现异常数据可输出到表中,用于物联网实时监控和预警 high
自定义流计算引擎 trigger_condition 当以上三种引擎都不能满足需求时可使用脚本或API自定义消息处理函数 high
异常检测引擎 vs 需求适配 fit_assessment 需求1适用异常检测引擎;需求2不适用 high
异常检测引擎(按设备分组处理) limitation 每次有新数据流入才触发计算,或按固定时间间隔在移动窗口内聚合;传感器若没有新数据无法触发计算 high
需求2解决方案(自定义message handler) approach 使用键值内存表记录每个传感器最新采集时间;消息以固定间隔进入处理函数,先更新表再检查是否超过5分钟无数据,若是则报警 high
输入流数据表 sensor(示例) definition 使用streamTable创建,包含列deviceID、ts、temperature,类型分别为INT、DATETIME、FLOAT high
enableTableShareAndPersistence(对输入流表) configuration 将流数据表共享并持久化到硬盘;示例中cacheSize=1000000(内存最多保留100万行),compress=true,asynWrite=false high
报警输出流数据表 warningTable(示例) schema 列为time、deviceID、anomalyType、anomalyString;类型为DATETIME、INT、INT、SYMBOL high
createAnomalyDetectionEngine 输出表格式要求(文中描述) constraint 输出表第一列必须为时间类型用于存放异常时间戳,且类型需与输入表时间列一致;若keyColumn不为空则第二列为keyColumn;之后两列分别为int与string/symbol用于记录异常类型(metrics下标)与异常内容 high
异常检测引擎(engine1)指标表达式 metrics sum(temperature > 40) > 2 && sum(temperature > 30) > 3 high
异常检测引擎(engine1) keyColumn deviceID high
异常检测引擎(engine1) timeColumn ts high
异常检测引擎(engine1) windowSize 180秒 high
异常检测引擎(engine1) step 30秒 high
subscribeTable(用于温度异常检测) handler append!{engine}(将订阅消息送入异常检测引擎) high
subscribeTable(用于温度异常检测) msgAsTable true high
离线报警实现(键值内存表) data_structure 采用keyedTable保存每个设备最新状态/采集时间,以deviceID为主键 high
append!(用于键值表更新) behavior 若主键不存在则添加记录;若主键重复则更新对应主键记录 high
离线报警(anomalyType取值) encoding 因上节异常检测引擎已用0,此处离线报警的anomalyType设为1,anomalyString设为空 high
subscribeTable(用于离线报警) performance_tuning 可通过throttle与batchSize实现批量处理提升性能;示例throttle=1(每秒处理一次),batchSize设为较大值(示例为1000000) high
示例传感器数量(离线报警示例代码) deviceNum_assumption 3 high
checkNoData(示例自定义函数) offline_rule 当keyedTable中time < datetimeAdd(now().datetime(), -5, "m") 时生成报警记录并append到warningTable high
模拟数据场景 scenario 假设3个传感器每秒采集;前1分钟所有设备有数据;1分钟后第3个设备无数据 high
模拟脚本 writeData behavior 前0..60循环写入3个设备数据;随后0..600循环写入2个设备数据;每次写入后sleep(1000) high
submitJob(模拟任务) job_metadata submitJob("simulateData", "simulate sensor data", writeData) high
验证方式 verification_step 运行后查询warningTable查看报警结果示例(页面包含结果截图) medium
附录链接 resource 测试代码:https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/alarm.txt high
限时报名链接(技能认证特训营第二期) url https://www.qingsuyun.com/h5/e/217471/5/ high