DolphinDB实现EMQ X的数据接入
本页是一篇围绕“通过 EMQ X 将数据接入 DolphinDB”的文章页头信息,包含主题、作者与发布日期等要素。
Source: https://dolphindb.cn/blogs/79
What this page covers
- DolphinDB 与 EMQ X 的背景介绍与定位说明。
- DolphinDB 侧的部署、配置与流数据持久化示例。
- EMQ X 规则引擎配置为“保存数据到 DolphinDB”的操作步骤与验证。
- 转载声明与原文来源链接。
技能认证特训营第二期正式开启(限时报名)
页面包含一个限时报名相关的宣传性行动号召,强调报名与权益信息。
- 该部分属于活动报名导向的行动号召内容。
- 该部分强调“限时”与“报名”相关信息。
DolphinDB实现EMQ X的数据接入
文章页头用于标识主题为“DolphinDB 实现 EMQ X 的数据接入”,并给出作者与发布日期等信息。
- 文章主题是“DolphinDB 实现 EMQ X 的数据接入”。
- 文章发布日期为 2021-08-05。
- 页头包含作者与日期等文章元信息。
导语
导语介绍 DolphinDB 与 EMQ X 的定位与能力,并给出一个合作示例:通过 DolphinDB API 实现基于 TCP 的双向数据传输。
- DolphinDB 被描述为高性能分布式时序数据库,融合存储、计算、流计算与编程语言。
- DolphinDB 被描述为面向客户提供轻量级、一站式的大数据解决方案。
- DolphinDB 适用于量化金融及工业物联网等领域。
- DolphinDB 支持 MQTT/OPC/KAFKA 等数据注入方式。
- 合作示例中提到自主开发了面向 DolphinDB 的 API。
- 该 API 可通过 TCP 与 DolphinDB 进行双向数据传输。
- EMQ X 被描述为完全开源、可伸缩且高可用的分布式 MQTT 消息服务器。
- EMQ X 支持 CoAP/LwM2M 一站式 IoT 协议接入。
搭建与配置 DolphinDB
本部分按步骤说明 DolphinDB 侧为 EMQ X 集成所需的准备与配置,包括版本要求、安装、开启流发布订阅、启动服务、建库建表,以及将 StreamTable 数据持久化到 DFS 表的示例脚本。
- EMQ X 到 DolphinDB 的后端适配要求为 DolphinDB 1.20.7 及以上。
- 示例中从官网下载 DolphinDB 社区版 Linux64 安装包。
- 示例中将安装包的 server 目录上传至 /opts/app/dolphindb。
- 需要开启 StreamTable 发布/订阅并创建相关表以实现 EMQ X 消息存储与持久化。
- 示例 streaming 配置包含 maxPubConnections=10。
- 示例 streaming 配置包含 persistenceDir=/ddb/pubdata/。
- 示例 streaming 配置包含 subPort=8000。
- 示例使用命令 nohup ./dolphindb -console 0 & 启动服务。
- 示例中服务启动后监听 8848 端口供客户端连接。
- DolphinDB GUI 客户端依赖 Java 环境。
- 示例在 GUI 目录执行 sh gui.sh 启动客户端。
- 示例创建 DFS 数据库名为 dfs://emqx。
- 示例持久化表名为 msg。
- 示例表字段包含 clientid、topic、qos、payload。
- 示例字段类型为 STRING、STRING、INT、STRING。
- 示例分区按 clientid 与 topic 的 HASH 值进行组合分区。
- 示例 StreamTable 名为 st_msg。
- 示例 StreamTable 容量参数包含 streamTable(10000:0, ...)。
- 示例通过 subscribeTable 将 st_msg 数据持久化到 msg 表(参数包含 true)。
- 本部分提供用户指南、IoT 场景示例、流处理指南与编程手册等文档链接。
配置规则引擎
本部分按步骤说明在 EMQ X 规则引擎中筛选 MQTT Topic,并通过“保存数据到 DolphinDB”动作将消息写入 DolphinDB;同时给出 SQL 模板、资源配置与写入结果验证示例。
- 规则配置示例入口为 http://127.0.0.1:18083/#/rules。
- 示例规则 SQL 为 SELECT * FROM "t/#"。
- 示例动作类型为“保存数据到 DolphinDB(Save data to DolphinDB)”。
- 该动作需要两个参数:SQL 模板与关联资源 ID。
- 示例 SQL 模板为 insert into st_msg values('${clientid}', '${topic}', ${qos}, '${payload}')。
- 提示:直接复制可能带换行符导致插入失败,需确认 SQL 模板为单行。
- 示例资源配置包含用户名 admin 与密码 123456。
- 资源配置需填写 DolphinDB 服务器地址(示例图包含 192.168.1.172:8848)。
- 示例测试消息为 Topic "t/a"、QoS 1、Payload "hello"。
- 验证示例显示:检查持久化表后,新数据添加成功(示例查询到一条记录)。
声明(转载来源)
本页包含转载声明,说明文章转载自 EMQ,并提供原文文档链接来源。
- 声明指出文章转载自 EMQ。
- 声明提供原文链接为 EMQ Docs(规则引擎 backend_dolphindb 页面)。
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB实现EMQ X的数据接入 (article) | publication_date | 2021-08-05 | high |
| DolphinDB | developer | 由浙江智臾科技有限公司研发 | high |
| DolphinDB | positioning | 高性能分布式时序数据库,融合分布式存储、分布式计算、流计算和编程语言 | high |
| DolphinDB | solution_type | 为客户提供轻量级、一站式的大数据解决方案 | medium |
| DolphinDB | suitable_for | 量化金融及工业物联网等领域 | high |
| DolphinDB | api_access_methods | 提供多种API接入方式 | medium |
| DolphinDB | data_ingestion_methods | 支持MQTT/OPC/KAFKA等多种数据注入方式 | high |
| 杭州映云科技有限公司 | positioning | 面向5G和物联网市场的消息与流处理领域的领先企业 | medium |
| 映云科技技术团队 | developed | 使用自有Erlang编程语言和DolphinDB开发的数据接入协议,自主开发了DolphinDB API | medium |
| 映云科技开发的DolphinDB API | transport_protocol | 可通过TCP协议与DolphinDB进行双向数据传输 | high |
| EMQ X(EMQ) | description | 完全开源、高度可伸缩、高可用的分布式MQTT消息服务器 | high |
| EMQ X(EMQ) | protocol_support | 支持CoAP/LwM2M一站式IoT协议接入 | high |
| EMQ | positioning | 5G时代万物互联的消息引擎 | low |
| EMQ | applicable_to | 适用于IoT、M2M和移动应用程序 | high |
| EMQ | concurrent_clients | 可处理千万级别的并发客户端 | medium |
| EMQ X -> DolphinDB backend | minimum_dolphindb_version | 仅适配 DolphinDB 1.20.7 及以上版本 | high |
| DolphinDB installation (Linux example) | download_source | 官网下载社区最新版本Linux64安装包:https://www.dolphindb.cn/downloads.html | high |
| DolphinDB installation (example path) | server_directory_upload_path | 将安装包的server目录上传至 /opts/app/dolphindb | high |
| DolphinDB | streamtable_pubsub_requirement | 需要打开StreamTable发布/订阅功能并创建相关数据表以实现EMQ X消息存储并持久化 | high |
| dolphindb.cfg streaming config | maxPubConnections | 10 | high |
| dolphindb.cfg streaming config | persistenceDir | /ddb/pubdata/ | high |
| dolphindb.cfg streaming config | subPort | 8000 | high |
| DolphinDB service startup command (example) | command | nohup ./dolphindb -console 0 & | high |
| DolphinDB service (after startup) | listening_port_for_clients | 8848 | high |
| DolphinDB GUI client | dependency | 依赖Java环境(需确保已安装Java) | high |
| DolphinDB GUI client (startup) | command | 在GUI目录执行 sh gui.sh 启动客户端 | high |
| DolphinDB example database | dfs_database_name | dfs://emqx | high |
| DolphinDB example table | table_name | msg | high |
| DolphinDB example table msg | schema_columns | clientid, topic, qos, payload | high |
| DolphinDB example table msg | schema_types | STRING, STRING, INT, STRING | high |
| DolphinDB example table msg | partitioning | 按 clientid 和 topic 的 HASH 值进行分区(HASH, [STRING, 8] + COMPO) | high |
| DolphinDB StreamTable example | stream_table_name | st_msg | high |
| DolphinDB StreamTable st_msg | stream_table_capacity | streamTable(10000:0, ...) | high |
| DolphinDB streaming persistence example | persistence_mechanism | loadTable('dfs://emqx','msg') 后用 subscribeTable 将 st_msg 数据持久化到 msg 表(参数含 true) | high |
| DolphinDB documentation links | resources_listed | 用户指南、IoT场景示例、流处理指南、编程手册(分别提供GitHub/Gitee/官网链接) | high |
| EMQ X Dashboard for rules | local_url_example | http://127.0.0.1:18083/#/rules | high |
| EMQ X rule SQL (example) | sql | SELECT * FROM "t/#" | high |
| EMQ X rule action | action_type | 保存数据到 DolphinDB(Save data to DolphinDB) | high |
| EMQ X ‘保存数据到 DolphinDB’ action | required_parameters | 需要两个参数:SQL模板、关联资源ID | high |
| EMQ X -> DolphinDB SQL template (example) | sql_template | insert into st_msg values('${clientid}', '${topic}', ${qos}, '${payload}') | high |
| EMQ X SQL template input | copy_paste_warning | 直接复制可能带换行符导致插入失败;需确认SQL模板行数为1 | high |
| EMQ X DolphinDB resource configuration (example) | credentials | 用户名 admin,密码 123456 | high |
| EMQ X DolphinDB resource configuration (example) | server_address | 填写对应上文部署的DolphinDB服务器地址(示例图含 192.168.1.172:8848) | medium |
| Test MQTT message (example) | topic_qos_payload | Topic: "t/a"; QoS: 1; Payload: "hello" | high |
| Integration verification | result | 检查持久化表后,新的数据添加成功(示例展示查询到一条消息记录) | medium |
| Article reprint statement | source | 声明:此文章转载自EMQ,原文链接为 EMQ Docs(docs.emqx.cn enterprise 规则引擎 backend_dolphindb 页面) | high |