新闻

如何用 DolphinDB + Kafka 实时计算K线

2022.11.17

DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka 订阅数据,并在 DolphinDB 中消费。本文将为大家展示如何通过该插件实时计算k线。DolphinDB Kafka 插件的完整实践指南已发布在官方知乎,可前往知乎查看。

DolphinDB Kafka 插件

Kafka 是一个高吞吐量的分布式消息中间件,可用于海量消息的发布和订阅。

当面对大量的数据写入时,以消息中间件接收数据,然后再批量写入到时序数据库中,这样可以将消息中间件的高并发能力时序数据库的高吞吐量联合起来,更好地解决海量数据的实时处理和存储问题

DolphinDB Kafka 插件目前支持以下数据类型的序列化和反序列化:

  • DolphinDB 标量
  • Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer
  • 以上数据类型所组成的向量

用户可以在 DolphinDB 中实例化 Producer 对象,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也可以在 DolphinDB 中实例化 Consumer 对象,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。

案例:如何实时计算K线

本节将展示如何使用“DolphinDB+Kafka”实时计算K线。

1. 环境准备

  • 部署 DolphinDB 集群,版本为v2.00.7。
  • 部署 Kafka 集群,版本为 2.13-3.1.0。

2. 生产数据

配置好相关环境后,我们首先通过 DolphinDB 的 replay 历史数据回放工具和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。主要包含三个步骤:

  • Kafka 创建 Topic
./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092Created topic topic-message.
  • 加载 Kafka 插件并创建 Kafka Producer
// 加载插件path = "/DolphinDB/server/plugins/kafka"loadPlugin(path + "/PluginKafka.txt")loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");// 定义创建 Kafka Producer 的函数def initKafkaProducerFunc(metadataBrokerList){  producerCfg = dict(STRING, ANY)  producerCfg["metadata.broker.list"] = metadataBrokerList  return kafka::producer(producerCfg)}// 创建 Kafka Producer 并返回句柄producer = initKafkaProducerFunc("192.193.168.5:8992")
  • 推送数据到 Kafka Topic
// 定义推送数据到 KafKa "topic-message" Topic 的函数def sendMsgToKafkaFunc(dataType, producer, msg){  startTime = now()  try {    for(i in msg){      kafka::produce(producer, "topic-message", 1, i, true)     }        cost = now() - startTime    writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")  }   catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}}// 创建 DolphinDB 流数据表 tickStreamcolName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndexcolType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]share(streamTable(35000000:0, colName, colType), `tickStream)// 订阅 tickStream,处理函数是 sendMsgToKafkaFuncsubscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)getHomeDir()// 控速回放 DolphinDB 分布式中的历史数据至 tickStreamdbName = "dfs://SH_TSDB_tick"tbName = "tick"replayDay =  2021.12.08testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityIDsubmitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)

可以使用 kafka-console-consumer 命令行工具消费 Topic 为 topic-message 中的数据,验证数据是否成功写入:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message

3. 消费数据

接下来为大家展示如何在 DolphinDB 中订阅消费 Kafka Topic 中的数据,以将其实时同步到流数据表中。

  • 创建消费者,订阅主题数据
// 创建 Kafka Consumer 并返回句柄consumerCfg = dict(STRING, ANY)consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"consumerCfg["group.id"] = "topic-message"consumer = kafka::consumer(consumerCfg)// 订阅 Kafka 中的 "topic-message" 主题的数据topics = ["topic-message"]kafka::subscribe(consumer, topics);
  • DolphinDB 订阅 Kafka 消息队列中的数据
// 订阅 Kafka 发布消息,写入流表 tickStream_kafkacolName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndexcolType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]share(streamTable(35000000:0, colName, colType), `tickStreamkafka)go// Kafka 消息解析函数def parse(mutable dictVar, mutable tickStreamkafka){  try{    t = dictVar    t.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))    tickStreamkafka.append!(t);  }catch(ex){    print("kafka errors : " + ex)  }}colType[1] = STRING;decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)// 创建 subjob 函数conn =  kafka::createSubJob(consumer, , decoder, "topic-message")

4. 流计算引擎实时计算K线

我们使用 DolphinDB 内置流计算引擎计算分钟 K 线,并将结果输出到名为 OHLCVwap的结果表中。DolphinDB GUI 中执行以下脚本:

// 创建接收实时计算结果的流数据表colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`VwapcolType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]share(streamTable(2000000:0, colName, colType), `OHLCStream)// K 线指标计算元表达式aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>// 创建引擎并将 kafka 中订阅的数据注入流计算引擎createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)
  • 设置参数 offset 为 - 1,订阅将会从提交订阅时流数据表的当前行开始。
  • 设置 useSystemTime=true,表示时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。

本文展示了如何用 DolphinDB Kafka Plugin 中常用的接口函数实时计算K线,更多函数支持可参考官网文档,完整案例教程和脚本可前往知乎查看。使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈。