Computing K-Lines with DolphinDB (使用DolphinDB计算K线)

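This page outlines the scope of a tutorial on computing K-lines (OHLC bars) in DolphinDB in two ways: batch processing of historical data and real-time stream processing.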

Source: https://dolphindb.cn/blogs/23

What this page covers

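Computing K-Lines with DolphinDB (article information and overview)

Gives the article information and states that the tutorial covers both batch and streaming K-line computation.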

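Scenarios covered: historical batch processing and real-time streaming

Outlines the two scenarios, historical batch K-lines and real-time streaming K-lines, and lists the related options and components.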

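Historical K-line computation: available functions and usage

Introduces historical K-line computation and lists the functions used for it: bar, dailyAlignedBar, and wj (window join).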

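Without a specified start time: grouping K-line windows with bar (Example 1)

Explains how bar(X,Y) groups data: it returns X minus the remainder of X divided by Y. The section then uses simulated trade data to generate windows with bar and compute 5-minute OHLC bars. It also notes a precision requirement: the time column is in milliseconds, and for any other precision the factor in barMinutes*60*1000 must be adjusted.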
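
The Python emulation below makes the grouping concrete. It covers bar(X, Y) and the shape of the Example 1 query: group by symbol and bar start, then take first, max, min, and last price plus summed volume. It is a sketch, not DolphinDB code. Times are plain minute integers, so the bar width is simply bar_minutes. With the article's millisecond time column, the width would be barMinutes*60*1000. The helper names (to_minutes, fmt, ohlc_by_bar) and the sample trades are hypothetical.

```python
def bar(x, y):
    """Emulates DolphinDB bar(X, Y) for integers: X minus (X mod Y)."""
    return x - x % y


def to_minutes(hhmm):
    """'09:32' -> 572 minutes since midnight (like a DolphinDB MINUTE value)."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


def fmt(minutes):
    """572 -> '09:32'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


def ohlc_by_bar(trades, bar_minutes):
    """Group (symbol, 'HH:MM', price, volume) rows, sorted by time, by symbol
    and bar start; return {(symbol, barStart): (open, high, low, close, volume)}."""
    groups = {}
    for symbol, t, price, volume in trades:
        key = (symbol, fmt(bar(to_minutes(t), bar_minutes)))
        groups.setdefault(key, []).append((price, volume))
    bars = {}
    for key, rows in groups.items():
        prices = [p for p, _ in rows]
        bars[key] = (prices[0], max(prices), min(prices), prices[-1],
                     sum(v for _, v in rows))
    return bars


# Reproduces the bar(date, 5) example from the article.
dates = ["09:32", "09:33", "09:45", "09:49", "09:56", "09:56"]
print([fmt(bar(to_minutes(d), 5)) for d in dates])
# ['09:30', '09:30', '09:45', '09:45', '09:55', '09:55']

trades = [("AAPL", "09:31", 10.0, 100), ("AAPL", "09:33", 10.5, 200),
          ("AAPL", "09:34", 9.8, 50), ("AAPL", "09:36", 10.1, 300)]
print(ohlc_by_bar(trades, 5))
# {('AAPL', '09:30'): (10.0, 10.5, 9.8, 9.8, 350), ('AAPL', '09:35'): (10.1, 10.1, 10.1, 10.1, 300)}
```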

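With a specified start time: dailyAlignedBar (Examples 2-4: different trading sessions and overnight sessions)

Explains what dailyAlignedBar is for and its type requirements. The time column must be DATETIME, TIMESTAMP, or NANOTIMESTAMP, and the session start times must be SECOND, TIME, or NANOTIME to match. The section then shows how to generate aligned windows for a single session, two sessions per day, and an overnight session.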
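
The session logic can be approximated in Python as shown below. This sketch follows the behaviour described above and is not DolphinDB's implementation. Each timestamp is assigned to the latest session start at or before its time of day. A time earlier than the first session start is treated as part of the previous day's last session, which covers the overnight case. Bars are then counted from that session start. Session end times and gaps between sessions are not validated. The function name daily_aligned_bar is hypothetical.

```python
from datetime import datetime, timedelta


def daily_aligned_bar(ts, session_starts, bar_minutes):
    """Approximates the bar start that dailyAlignedBar would assign to ts.

    session_starts: sorted 'HH:MM' strings, one per trading session.
    A time of day earlier than the first session start is treated as part
    of the previous day's last session (the overnight case). Session end
    times are not checked.
    """
    starts = [datetime.strptime(s, "%H:%M").time() for s in session_starts]
    opened = [s for s in starts if s <= ts.time()]
    if opened:
        session_start = datetime.combine(ts.date(), opened[-1])
    else:
        session_start = datetime.combine(ts.date() - timedelta(days=1), starts[-1])
    width = timedelta(minutes=bar_minutes)
    # Whole bars elapsed since the session started, converted back to a timestamp.
    return session_start + ((ts - session_start) // width) * width


# Example 2: US market, one session from 09:30, 7-minute bars
print(daily_aligned_bar(datetime(2019, 11, 7, 9, 40), ["09:30"], 7))
# 2019-11-07 09:37:00
# Example 3: China A-shares, sessions starting at 09:30 and 13:00
print(daily_aligned_bar(datetime(2019, 11, 7, 13, 5), ["09:30", "13:00"], 7))
# 2019-11-07 13:00:00
# Example 4: futures with a night session 15:00-05:00 (next day)
print(daily_aligned_bar(datetime(2019, 11, 8, 2, 10), ["08:45", "15:00"], 7))
# 2019-11-08 02:05:00
```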

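Overlapping K-line windows: wj (Example 5)

Generates time windows and uses wj to aggregate the trades that fall within a time range relative to each window. The result is a set of overlapping K-line windows: 30-minute bars computed every 5 minutes.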
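
A rough Python equivalent of the wj step follows. It assumes the window w1:w2 is closed at both ends, so a trade at t is included when start + w1 <= t <= start + w2. That is our understanding of wj's window, so check the wj reference for edge cases. Times are minute integers, so a 30-minute window every 5 minutes is w1=0, w2=30 (0:(30*60*1000) with millisecond times). itertools.product plays the role of cj. The function name and data are hypothetical.

```python
from itertools import product


def window_join_ohlc(windows, trades, w1, w2):
    """wj-style aggregation: for each (symbol, start) in windows, aggregate the
    same symbol's trades with start + w1 <= t <= start + w2 (both ends
    included). trades are (symbol, t, price, volume) sorted by t."""
    out = []
    for symbol, start in windows:
        rows = [(p, v) for s, t, p, v in trades
                if s == symbol and start + w1 <= t <= start + w2]
        if not rows:
            # Empty window: every aggregate is null (None here)
            out.append((symbol, start, None, None, None, None, None))
            continue
        prices = [p for p, _ in rows]
        out.append((symbol, start, prices[0], max(prices), min(prices),
                    prices[-1], sum(v for _, v in rows)))
    return out


# Times in minutes since midnight; 570 = 09:30.
trades = [("600519", 570, 10.0, 100), ("600519", 575, 11.0, 200),
          ("600519", 590, 9.0, 300), ("600519", 600, 10.5, 400),
          ("600519", 610, 12.0, 500)]
# Cross join of symbols and window starts every 5 minutes (the cj step).
windows = list(product(["600519"], range(570, 590, 5)))
for row in window_join_ohlc(windows, trades, 0, 30):
    print(row)
```

The trade at minute 600 contributes to the windows starting at 570, 575, 580, and 585. That shared contribution is what makes the bars overlap.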

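Partitioning K-line windows by trading volume (Example 6)

Shows that K-line windows can be partitioned by dimensions other than time, such as trading volume. The example computes OHLC bars with a new bar each time cumulative volume grows by a threshold (10000).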
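
The volume-bar idea translates directly: keep a running cumulative volume per symbol and group by bar(cumvol, volThreshold). The Python sketch below mirrors that shape. The function name, the sample data, and the choice of the first trade time as barStart are assumptions for illustration.

```python
from itertools import accumulate


def volume_bars(trades, vol_threshold):
    """trades: (symbol, time, price, volume) sorted by symbol and time.
    Follows the shape of Example 6: cumvol = cumsum(volume) per symbol, then
    group by symbol and bar(cumvol, vol_threshold). Returns
    (symbol, barStart, open, high, low, close, volume), with barStart taken as
    the first trade time in each group (an assumption of this sketch)."""
    by_symbol = {}
    for s, t, p, v in trades:
        by_symbol.setdefault(s, []).append((t, p, v))
    out = []
    for s, rows in by_symbol.items():
        cumvol = accumulate(v for _, _, v in rows)
        groups = {}
        for (t, p, v), cv in zip(rows, cumvol):
            # bar(cumvol, threshold) = cumvol - cumvol mod threshold
            groups.setdefault(cv - cv % vol_threshold, []).append((t, p, v))
        for g in groups.values():
            prices = [p for _, p, _ in g]
            out.append((s, g[0][0], prices[0], max(prices), min(prices),
                        prices[-1], sum(v for _, _, v in g)))
    return out


trades = [("A", 1, 10.0, 4000), ("A", 2, 10.2, 5000),
          ("A", 3, 9.9, 3000), ("A", 4, 10.5, 9000)]
for row in volume_bars(trades, 10000):
    print(row)
```

Because bar() floors the cumulative volume, the trade that crosses a multiple of the threshold opens the next bar. A bar's total volume is therefore generally not exactly volThreshold.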

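Speeding up historical K-line computation with MapReduce (mr) and writing the results to a database

Uses mr and sqlDS to read and compute K-lines in parallel over partitioned data. The results are written back to a DFS table, and the reduce step sums the per-partition row counts.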
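
The map/reduce split can be illustrated without a database. Each map call computes the bars of one partition, writes them to a shared sink, and returns a row count. The reduce step adds the counts, as mr(ds, calcOHLC, +) does. In this Python sketch a thread pool stands in for DolphinDB's parallel execution and a list stands in for the OHLC DFS table. calc_ohlc, map_reduce, and the data are hypothetical.

```python
import operator
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def calc_ohlc(partition, bar_minutes, sink, lock):
    """Map step, a hypothetical stand-in for calcOHLC: compute bars for one
    partition, append them to sink (standing in for the OHLC DFS table),
    and return the number of rows written."""
    groups = {}
    for symbol, minute, price, volume in partition:
        groups.setdefault((symbol, minute - minute % bar_minutes), []).append((price, volume))
    rows = []
    for (symbol, start), g in groups.items():
        prices = [p for p, _ in g]
        rows.append((symbol, start, prices[0], max(prices), min(prices),
                     prices[-1], sum(v for _, v in g)))
    with lock:  # the sink is shared by all worker threads
        sink.extend(rows)
    return len(rows)


def map_reduce(partitions, bar_minutes):
    """Run calc_ohlc on every partition in parallel, then reduce with +."""
    sink, lock = [], threading.Lock()
    with ThreadPoolExecutor() as pool:
        counts = pool.map(lambda part: calc_ohlc(part, bar_minutes, sink, lock),
                          partitions)
        total = reduce(operator.add, counts, 0)
    return total, sink


# Two partitions, e.g. one per (Date, Symbol range); times in minutes.
partitions = [
    [("AAPL", 570, 10.0, 100), ("AAPL", 572, 10.5, 200)],
    [("MSFT", 571, 20.0, 50), ("MSFT", 576, 21.0, 60)],
]
total, ohlc = map_reduce(partitions, 5)
print(total)  # 3
```

Computing each partition independently gives correct bars only because no bar spans two partitions. With partitions by Date and Symbol, every 5-minute bar of a symbol on a given day falls within a single partition.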

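Real-time K-line computation: overview and data-source format

Outlines the real-time K-line pipeline and describes the fields of the simulated data source trades.csv: Symbol, Datetime, Price, and Volume.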

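Real-time step 1: Python receives the data and writes it to the DolphinDB stream table Trade

Creates the Trade stream table in DolphinDB. Python then reads the CSV, converts the time column type, and writes the rows into the stream table.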
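
The conversion step can be sketched with the standard library alone. Each CSV row is parsed and coerced to the types of the Trade schema, and timestamps are truncated to whole seconds because DATETIME has second precision. The article itself uses pandas, whose datetime64[ns] columns need the same adjustment. The sample rows and the datetime format below are assumptions. The actual write through the DolphinDB Python API is not shown.

```python
import csv
import io
from datetime import datetime

# Hypothetical sample in the 4-column layout of trades.csv; the real file's
# datetime format may differ.
SAMPLE = """Symbol,Datetime,Price,Volume
000001,2019-11-07 09:30:01,10.5,200
600519,2019-11-07 09:30:02.750,1100.0,100
"""


def parse_trades(text):
    """Turn CSV text into (Symbol, Datetime, Price, Volume) tuples that match
    the Trade schema [SYMBOL, DATETIME, DOUBLE, INT]. DATETIME holds whole
    seconds, so any sub-second part is truncated (not rounded)."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text)):
        dt = datetime.fromisoformat(rec["Datetime"]).replace(microsecond=0)
        rows.append((rec["Symbol"], dt, float(rec["Price"]), int(rec["Volume"])))
    return rows


for row in parse_trades(SAMPLE):
    print(row)
```

Symbol is kept as a string so that codes with leading zeros, such as 000001, survive the conversion.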

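Real-time step 2: computing K-lines with createTimeSeriesAggregator (two triggering modes)

Explains how to configure the window parameters of the TimeSeriesAggregator and how to subscribe the Trade stream table to the aggregation engine. It then discusses the updateTime mode, including why that mode requires a keyedTable as the output table.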
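
The Python sketch below emulates the window arithmetic in batch to illustrate windowSize, step, and the output-table question. Its assumptions are not taken from engine internals. Windows are left-closed and right-open, and each is labelled with its end time, which is the default when useWindowStartTime is not set. A window is emitted only once a later trade arrives. The last lines contrast two ways of storing repeated updateTime results. Appending them, as a table or streamTable would, produces duplicates, while a keyed upsert behaves like a keyedTable. All names are hypothetical.

```python
def ts_aggregate(trades, window_size, step):
    """Batch emulation of time-series window aggregation per symbol.

    trades: (symbol, t, price, volume) sorted by t, with t in seconds.
    A window ending at e (a multiple of step) covers e - window_size <= t < e
    and is labelled e. A window is emitted only once some trade has t >= e,
    roughly mirroring how a window closes when later data arrives.
    """
    if window_size % step:
        raise ValueError("windowSize must be an integer multiple of step")
    if not trades:
        return []
    out = []
    end = (trades[0][1] // step + 1) * step
    while end <= trades[-1][1]:
        groups = {}
        for s, t, p, v in trades:
            if end - window_size <= t < end:
                groups.setdefault(s, []).append((p, v))
        for s, g in groups.items():
            prices = [p for p, _ in g]
            out.append((end, s, prices[0], max(prices), min(prices),
                        prices[-1], sum(v for _, v in g)))
        end += step
    return out


def upsert(keyed, row):
    """keyedTable-style write keyed on (datetime, symbol): a row with an
    existing key replaces the old one instead of adding a duplicate."""
    keyed[(row[0], row[1])] = row


trades = [("A", 5, 10.0, 100), ("A", 65, 11.0, 200),
          ("A", 125, 9.0, 300), ("A", 185, 10.0, 50)]
print(ts_aggregate(trades, 60, 60))   # windowSize == step, as in scenario 1
print(ts_aggregate(trades, 120, 60))  # windowSize > step: overlapping windows

# With updateTime, the same window is recomputed before it closes:
appended, keyed = [], {}
for partial in [(60, "A", 10.0, 10.0, 10.0, 10.0, 100),
                (60, "A", 10.0, 10.4, 10.0, 10.4, 150)]:
    appended.append(partial)   # table/streamTable: duplicate times pile up
    upsert(keyed, partial)     # keyedTable: one row per (time, symbol)
print(len(appended), len(keyed))  # 2 1
```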

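Real-time step 3: Python subscribes to the output table OHLC and displays it; visualization options

Gives an example of subscribing to the OHLC output from Python and mentions that Grafana can be used to query and visualize the results.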
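
On the display side, the handler passed to the Python subscription only needs to unpack each row. The sketch below assumes every message arrives as one OHLC row in column order (datetime, symbol, open, high, low, close, volume). It keeps the latest bar per symbol for a display. The subscription itself is left out; in the article it enables streaming on local port 20001 and subscribes to OHLC on 127.0.0.1:8848. Check the DolphinDB Python API documentation for the exact call. handler and latest are hypothetical names.

```python
latest = {}  # symbol -> most recent bar, e.g. for refreshing a display


def handler(msg):
    """Hypothetical subscription callback for one OHLC row, assuming the row
    arrives as a list in the OHLC column order:
    datetime, symbol, open, high, low, close, volume."""
    dt, symbol, o, h, lo, c, v = msg
    latest[symbol] = (dt, o, h, lo, c, v)
    print(f"{dt} {symbol} open={o} high={h} low={lo} close={c} volume={v}")


handler(["2019.11.07T09:35:00", "600519", 1100.0, 1102.5, 1099.0, 1101.0, 1200])
```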

Facts Index

Entity | Attribute | Value | Confidence
Computing K-Lines with DolphinDB (blog post) | publication_date | 2021-05-18 | high
DolphinDB | capabilities_claimed | Provides a powerful in-memory compute engine with built-in time-series functions, distributed computing, and a streaming data processing engine, enabling efficient K-line computation in many scenarios. | medium
Tutorial scope | covers | How DolphinDB computes K-lines via batch processing and stream processing. | high
Historical batch K-line computation | supports | Specifying the start time of K-line windows. | high
Historical batch K-line computation | supports | Days with multiple trading sessions, including overnight sessions. | high
Historical batch K-line computation | supports | Overlapping K-line windows. | high
Historical batch K-line computation | supports | Using trading volume as the dimension for partitioning K-line windows. | high
DolphinDB Map-Reduce function | use_case | When reading very large datasets and writing results to a database, the built-in Map-Reduce function can parallelize K-line computation. | high
Streaming K-line computation | method | Receive market data in real time via the API and compute K-lines in real time with TimeSeriesAggregator. | high
DolphinDB built-in functions for historical K-lines | functions_listed | bar, dailyAlignedBar, wj (window join). | high
bar(X,Y) | definition | Returns X minus the remainder of X divided by Y; generally used to group data. | high
bar(date, 5) example | input | date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m; bar(date, 5); | high
bar(date, 5) example | output | [09:30m,09:30m,09:45m,09:45m,09:55m,09:55m] | high
Example 1 (US stock market simulation) | data_characteristics | Simulates a trade table with columns symbol, date, time, timestamp, price, volume, sorted by symbol and timestamp; uses n=1000000 rows and symbols AAPL/FB/AMZN/MSFT. | high
5-minute OHLC computation (Example 1) | query_pattern | select first(price) open, max(price) high, min(price) low, last(price) close, sum(volume) volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart (barMinutes=5) | high
Example 1 time column | precision_note | The time column has millisecond precision; if it does not, adjust the numeric factor in barMinutes*60*1000 accordingly. | high
dailyAlignedBar | purpose | Used when the K-line window start time must be specified; handles multiple daily sessions and overnight sessions. | high
dailyAlignedBar | time_column_type_requirement | The time column must include date information and be of type DATETIME, TIMESTAMP, or NANOTIMESTAMP. | high
dailyAlignedBar | timeOffset_type_requirement | timeOffset must be SECOND, TIME, or NANOTIME (the time column's type with the date removed), matching the time column type. | high
Example 2 (US market, single session) | kline_interval | 7-minute K-lines on the trade table from Example 1 with dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000). | high
Example 3 (China stock market, two sessions) | trading_sessions | Morning 9:30-11:30 and afternoon 13:00-15:00. | high
Example 3 (China market simulation) | data_characteristics | Simulates n=1000000 rows over dates 2019.11.07 and 2019.11.08 with symbols 600519/000001/600000/601766 and timestamp/price/volume columns, sorted by symbol and timestamp. | high
Example 3 dailyAlignedBar usage | sessionsStart_parameter | sessionsStart=09:30:00.000 13:00:00.000; dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart (barMinutes=7) | high
Example 4 (futures with overnight session) | sessions | Day session 08:45-13:45; night session 15:00-05:00 (next day). | high
Example 4 dailyAlignedBar usage | sessionsStart_parameter | sessionsStart = [daySession[0], nightSession[0]]; dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart (barMinutes=7) | high
wj function | purpose | Computes overlapping K-line windows by specifying a time range relative to the left table's time column and aggregating over the right table. | high
Example 5 (overlapping windows) | scenario | China stock simulation; compute 30-minute K-lines every 5 minutes. | high
Example 5 window generation | method | Generate windows by time and use cross join (cj) to combine symbols and trading windows into barWindows. | high
Example 5 OHLC calculation | wj_call | OHLC = wj(barWindows, trade, 0:(30*60*1000), <[first(price) open, max(price) high, min(price) low, last(price) close, sum(volume) volume]>, `symbol`time) | high
K-line window partition dimension | alternatives | Besides time, other dimensions can be used; the example given is cumulative trading volume. | medium
Example 6 (volume bars) | scenario | China stock simulation; compute a bar each time cumulative volume increases by 10000 (volThreshold=10000). | high
Example 6 implementation | method | A nested query computes cumvol=cumsum(volume) per symbol, then groups by symbol and bar(cumvol, volThreshold) to compute OHLC and the barStart time. | high
mr (Map-Reduce) function | benefit_claim | Parallel reading and computation when extracting large historical datasets, computing K-lines, and writing results back to the database; can significantly improve speed. | medium
Example MapReduce dataset | data_source | US stock market trade data with nanosecond precision, stored in the trades table of the dfs://TAQ database. | high
dfs://TAQ database | partitioning | Composite partitioning: value partition by Date and range partition by Symbol. | high
Example MapReduce step (1) | login_and_load | login(`admin, `123456); db = database("dfs://TAQ"); trades = db.loadTable("trades") | high
Example MapReduce step (2) | create_output_table | Creates a template model from trades and creates the partitioned table OHLC in dfs://TAQ, partitioned by Date and Symbol (dropping any existing OHLC first). | high
Example MapReduce step (3) | map_reduce_logic | Defines the map function calcOHLC, which computes 5-minute bars from 09:30 to 15:59:59 grouped by Symbol, Date, and bar, appends them to OHLC in dfs://TAQ, and returns the row count. Uses ds=sqlDS(select ... Date between 2007.08.01 : 2019.08.01) and mr(ds, calcOHLC, +) to sum the counts. | high
Real-time K-line computation in DolphinDB | pipeline | A Python program subscribes to real-time market data and writes it via the Python API into a DolphinDB stream table; the DolphinDB aggregation engine computes K-lines dynamically and outputs them to a K-line table for querying and display. | medium
TimeSeriesAggregator | capability | Computes K-lines from real-time data at a specified frequency and with moving windows. | high
Simulated real-time data source | file | trades.csv (linked). | high
trades.csv | columns | 4 columns: Symbol, Datetime, Price, Volume. | high
DolphinDB stream table Trade | schema | share streamTable(100:0, `Symbol`Datetime`Price`Volume, [SYMBOL, DATETIME, DOUBLE, INT]) as Trade | high
Python ingestion example | datetime_precision_and_conversion | Incoming Datetime values have second precision; because pandas stores datetimes as datetime64[ns] (nanosecond precision), the code converts Datetime before inserting it into DolphinDB. | high
Python connection example | connect_params | s.connect("127.0.0.1", 8848, "admin", "123456") | high
Real-time aggregation output table (non-updateTime mode) | schema | share streamTable(100:0, `datetime`symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC | high
createTimeSeriesAggregator scenario 1 | configuration | Every 5 minutes, compute over the past 5 minutes: windowSize=300, step=300; metrics=[first(Price), max(Price), min(Price), last(Price), sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high
createTimeSeriesAggregator scenario 2 | configuration | Every 1 minute, compute over the past 5 minutes: windowSize=300, step=60; metrics=[first(Price), max(Price), min(Price), last(Price), sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high
Streaming subscription to inject Trade into aggregator | script | subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true) | high
updateTime parameter (createTimeSeriesAggregator) | meaning | Defines the computation interval. If not set, the engine computes only at the end of each window. If set, computation is triggered at the end of each window, every updateTime time units, and whenever data has been in the engine for more than 2*updateTime time units (minimum 2 seconds) without being computed. | high
TimeSeriesAggregator with updateTime | output_table_requirement | A keyedTable must be used as the output table when updateTime is set. | high
Using table or streamTable as output with updateTime | issue | Duplicate time records are allowed, so each updateTime trigger before the step boundary adds another row with the same time; the results become meaningless. | high
Using keyedStreamTable as output with updateTime | issue | Updates and duplicate keys are not allowed; when updateTime triggers for an existing time key, the new record is rejected, so updateTime has no effect. | high
Using keyedTable as output with updateTime | behavior | Updates are allowed: if the time key exists, the existing record is updated, so the result for the same time keeps being refreshed until the step boundary adds a record with a new time. | high
keyedTable output with keyColumn set | primary_key_requirement | If keyColumn is specified, the keyedTable must use both the time column and keyColumn as primary keys. | high
keyedTable OHLC schema (updateTime mode) | schema | share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC | high
createTimeSeriesAggregator with updateTime example | configuration | windowSize=60, step=60, updateTime=1, useWindowStartTime=true; metrics=[first(Price),max(Price),min(Price),last(Price),sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high
TimeSeriesAggregator parameter constraints | constraints | windowSize must be an integer multiple of step, and step must be an integer multiple of updateTime. | high
Streaming subscription (updateTime mode also shown) | script | subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true) | high
Python subscription for displaying OHLC | script_behavior | Enables streaming on local port 20001 and subscribes to the OHLC stream table on 127.0.0.1:8848 with a handler that prints the received data. | high
Grafana | integration_claim | Grafana can connect to DolphinDB to query the output table and visualize the results as charts (link provided). | medium