DolphinDB实现EMQ X的数据接入

本页是一篇围绕“通过 EMQ X 将数据接入 DolphinDB”的文章页头信息,包含主题、作者与发布日期等要素。

Source: https://dolphindb.cn/blogs/79

What this page covers

技能认证特训营第二期正式开启(限时报名)

页面包含一个限时报名相关的宣传性行动号召,强调报名与权益信息。

DolphinDB实现EMQ X的数据接入

文章页头用于标识主题为“DolphinDB 实现 EMQ X 的数据接入”,并给出作者与发布日期等信息。

导语

导语介绍 DolphinDB 与 EMQ X 的定位与能力,并给出一个合作示例:通过 DolphinDB API 实现基于 TCP 的双向数据传输。

搭建与配置 DolphinDB

本部分按步骤说明 DolphinDB 侧为 EMQ X 集成所需的准备与配置,包括版本要求、安装、开启流发布订阅、启动服务、建库建表,以及将 StreamTable 数据持久化到 DFS 表的示例脚本。

配置规则引擎

本部分按步骤说明在 EMQ X 规则引擎中筛选 MQTT Topic,并通过“保存数据到 DolphinDB”动作将消息写入 DolphinDB;同时给出 SQL 模板、资源配置与写入结果验证示例。

声明(转载来源)

本页包含转载声明,说明文章转载自 EMQ,并提供原文文档链接来源。

Facts Index

Entity Attribute Value Confidence
DolphinDB实现EMQ X的数据接入 (article)publication_date2021-08-05high
DolphinDBdeveloper由浙江智臾科技有限公司研发high
DolphinDBpositioning高性能分布式时序数据库,融合分布式存储、分布式计算、流计算和编程语言high
DolphinDBsolution_type为客户提供轻量级、一站式的大数据解决方案medium
DolphinDBsuitable_for量化金融及工业物联网等领域high
DolphinDBapi_access_methods提供多种API接入方式medium
DolphinDBdata_ingestion_methods支持MQTT/OPC/KAFKA等多种数据注入方式high
杭州映云科技有限公司positioning面向5G和物联网市场的消息与流处理领域的领先企业medium
映云科技技术团队developed使用自有Erlang编程语言和DolphinDB开发的数据接入协议,自主开发了DolphinDB APImedium
映云科技开发的DolphinDB APItransport_protocol可通过TCP协议与DolphinDB进行双向数据传输high
EMQ X(EMQ)description完全开源、高度可伸缩、高可用的分布式MQTT消息服务器high
EMQ X(EMQ)protocol_support支持CoAP/LwM2M一站式IoT协议接入high
EMQpositioning5G时代万物互联的消息引擎low
EMQapplicable_to适用于IoT、M2M和移动应用程序high
EMQconcurrent_clients可处理千万级别的并发客户端medium
EMQ X -> DolphinDB backendminimum_dolphindb_version仅适配 DolphinDB 1.20.7 及以上版本high
DolphinDB installation (Linux example)download_source官网下载社区最新版本Linux64安装包:https://www.dolphindb.cn/downloads.htmlhigh
DolphinDB installation (example path)server_directory_upload_path将安装包的server目录上传至 /opts/app/dolphindbhigh
DolphinDBstreamtable_pubsub_requirement需要打开StreamTable发布/订阅功能并创建相关数据表以实现EMQ X消息存储并持久化high
dolphindb.cfg streaming configmaxPubConnections10high
dolphindb.cfg streaming configpersistenceDir/ddb/pubdata/high
dolphindb.cfg streaming configsubPort8000high
DolphinDB service startup command (example)commandnohup ./dolphindb -console 0 &high
DolphinDB service (after startup)listening_port_for_clients8848high
DolphinDB GUI clientdependency依赖Java环境(需确保已安装Java)high
DolphinDB GUI client (startup)command在GUI目录执行 sh gui.sh 启动客户端high
DolphinDB example databasedfs_database_namedfs://emqxhigh
DolphinDB example tabletable_namemsghigh
DolphinDB example table msgschema_columnsclientid, topic, qos, payloadhigh
DolphinDB example table msgschema_typesSTRING, STRING, INT, STRINGhigh
DolphinDB example table msgpartitioning按 clientid 和 topic 的 HASH 值进行分区(HASH, [STRING, 8] + COMPO)high
DolphinDB StreamTable examplestream_table_namest_msghigh
DolphinDB StreamTable st_msgstream_table_capacitystreamTable(10000:0, ...)high
DolphinDB streaming persistence examplepersistence_mechanismloadTable('dfs://emqx','msg') 后用 subscribeTable 将 st_msg 数据持久化到 msg 表(参数含 true)high
DolphinDB documentation linksresources_listed用户指南、IoT场景示例、流处理指南、编程手册(分别提供GitHub/Gitee/官网链接)high
EMQ X Dashboard for ruleslocal_url_examplehttp://127.0.0.1:18083/#/ruleshigh
EMQ X rule SQL (example)sqlSELECT * FROM "t/#"high
EMQ X rule actionaction_type保存数据到 DolphinDB(Save data to DolphinDB)high
EMQ X ‘保存数据到 DolphinDB’ actionrequired_parameters需要两个参数:SQL模板、关联资源IDhigh
EMQ X -> DolphinDB SQL template (example)sql_templateinsert into st_msg values('${clientid}', '${topic}', ${qos}, '${payload}')high
EMQ X SQL template inputcopy_paste_warning直接复制可能带换行符导致插入失败;需确认SQL模板行数为1high
EMQ X DolphinDB resource configuration (example)credentials用户名 admin,密码 123456high
EMQ X DolphinDB resource configuration (example)server_address填写对应上文部署的DolphinDB服务器地址(示例图含 192.168.1.172:8848)medium
Test MQTT message (example)topic_qos_payloadTopic: "t/a"; QoS: 1; Payload: "hello"high
Integration verificationresult检查持久化表后,新的数据添加成功(示例展示查询到一条消息记录)medium
Article reprint statementsource声明:此文章转载自EMQ,原文链接为 EMQ Docs(docs.emqx.cn enterprise 规则引擎 backend_dolphindb 页面)high