Calculating K-Lines with DolphinDB
This article outlines a tutorial on computing K-lines (OHLC bars) in DolphinDB in two ways: batch processing of historical data and real-time stream processing.
Source: https://dolphindb.cn/blogs/23
What this page covers
- Article metadata and tutorial overview (batch and streaming computation).
- Common scenarios and options for historical batch computation (trading sessions, overlapping windows, volume-based bars, MapReduce).
- Overview of the functions used for historical K-line computation (bar, dailyAlignedBar, wj).
- bar grouping with an example: generating K-line windows without a specified start time.
- dailyAlignedBar: specified start times, with multi-session and overnight-session examples.
- wj: computing overlapping K-line windows.
- The real-time K-line pipeline and the data source format.
Calculating K-Lines with DolphinDB (article info and overview)
This part gives the article metadata and states that the tutorial covers both batch and streaming K-line computation.
- The article was published on 2021-05-18.
- The tutorial covers computing K-lines via batch processing.
- The tutorial covers computing K-lines via stream processing.
- The article gives a general description of DolphinDB's capabilities.
Two scenario classes covered: historical batch and real-time streaming
Outlines the historical batch and real-time streaming scenarios and lists the relevant options and components.
- Historical batch computation can specify the start time of K-line windows.
- Historical batch computation can handle trading days that include overnight sessions.
- Historical batch computation supports overlapping K-line windows.
- K-line windows can be partitioned by dimensions other than time, such as trading volume.
- When reading very large datasets and writing results to a database, the built-in Map-Reduce can parallelize K-line computation.
- Streaming computation can receive real-time market data via an API and compute K-lines in real time with TimeSeriesAggregator.
Historical K-line computation: available functions and usage overview
This part introduces historical K-line computation and lists the functions it uses.
- The functions listed for historical K-line computation include bar.
- The functions listed for historical K-line computation include dailyAlignedBar.
- The functions listed for historical K-line computation include wj (window join).
No specified start time: grouping with bar to generate K-line windows (Example 1)
Explains the grouping behavior of bar(X,Y), then uses simulated trade data to generate windows with bar and compute 5-minute OHLC bars, noting the required time precision.
- bar(X,Y) returns X minus the remainder of X divided by Y.
- bar is typically used to group data.
- The example shows the input of bar(date, 5) and the corresponding time-aligned output.
- Example 1 uses a simulated trade table sorted by symbol and timestamp.
- Example 1 computes open/high/low/close and the volume sum via grouped aggregation.
- The example notes that the time column has millisecond precision; for other precisions, the conversion factor in barMinutes*60*1000 must be adjusted.
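The bar grouping above can be sketched in plain Python. This is a minimal illustration of the semantics, not DolphinDB code; the symbols, prices, and millisecond timestamps are made up:

```python
from collections import defaultdict

def bar(x, y):
    # DolphinDB's bar(X, Y) semantics: X minus the remainder of X divided by Y
    return x - x % y

BAR_MS = 5 * 60 * 1000  # barMinutes*60*1000 for a millisecond time column

# Hypothetical trades: (symbol, time in ms since midnight, price, volume),
# already sorted by time
trades = [
    ("AAPL", 34_320_000, 100.0, 10),  # 09:32:00.000
    ("AAPL", 34_380_000, 101.5, 20),  # 09:33:00.000
    ("AAPL", 35_100_000,  99.8, 15),  # 09:45:00.000
]

def ohlc(trades, bar_ms=BAR_MS):
    groups = defaultdict(list)
    for sym, t, price, vol in trades:
        groups[(sym, bar(t, bar_ms))].append((price, vol))
    return {
        key: (rows[0][0],                 # open   = first(price)
              max(p for p, _ in rows),    # high   = max(price)
              min(p for p, _ in rows),    # low    = min(price)
              rows[-1][0],                # close  = last(price)
              sum(v for _, v in rows))    # volume = sum(volume)
        for key, rows in groups.items()
    }

bars = ohlc(trades)
```

With a different time precision, only BAR_MS changes, which mirrors the note about adjusting the barMinutes factor.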
Specified start time: dailyAlignedBar (Examples 2-4, different trading sessions and overnight)
Describes the purpose and type requirements of dailyAlignedBar, and shows aligned window generation for a single session, two sessions, and an overnight session.
- dailyAlignedBar is for scenarios that must specify the start time of K-line windows.
- dailyAlignedBar can handle multiple daily sessions and overnight sessions.
- The time column must include date information and be of type DATETIME, TIMESTAMP, or NANOTIMESTAMP.
- timeOffset must be SECOND, TIME, or NANOTIME, matching the time column type.
- Example 2 generates 7-minute K-line windows with dailyAlignedBar under a single trading session.
- Example 3 uses two trading sessions (morning and afternoon) and generates windows with sessionsStart.
- Example 4 includes a day session and an overnight night session, generating windows with sessionsStart.
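The session-aligned windowing can be approximated in Python for same-day sessions. This sketch only handles sessions within one day; the real dailyAlignedBar is date-aware and also covers the overnight case of Example 4. Times are hypothetical milliseconds since midnight:

```python
def daily_aligned_bar(t_ms, session_starts_ms, bar_ms):
    # Align t to the most recent session start at or before it, then snap
    # down to a bar boundary measured from that session start.
    starts = [s for s in session_starts_ms if s <= t_ms]
    if not starts:
        return None  # before the first session of the day
    start = max(starts)
    return start + (t_ms - start) // bar_ms * bar_ms

SESSIONS = [34_200_000, 46_800_000]  # 09:30:00.000 and 13:00:00.000
BAR_MS = 7 * 60 * 1000               # 7-minute bars, as in Examples 2-4
```

With a 7-minute bar anchored at 09:30, a trade at 09:40 falls in the window starting 09:37, and a trade at 13:05 falls in the afternoon window starting 13:00.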
Overlapping K-line windows: wj (Example 5)
This part computes overlapping K-line windows by generating time windows and using wj to aggregate over a relative time range.
- wj computes overlapping windows by specifying a relative time range on the left table's time column.
- The Example 5 scenario computes a 30-minute K-line every 5 minutes.
- Example 5 generates windows by time and uses a cross join (cj) to build barWindows.
- Example 5 uses wj to compute OHLC and the volume sum over each window.
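The effect of the wj call can be sketched in Python: each left-table window start aggregates trades in a relative range [start, start+span). A simple sketch with made-up data, not the actual window-join implementation:

```python
def overlapping_ohlc(trades, window_starts, span_ms):
    # trades: (time_ms, price, volume), sorted by time. Mirrors
    # wj(barWindows, trade, 0:span, <[first, max, min, last, sum]>, `time)
    out = {}
    for start in window_starts:
        rows = [(p, v) for t, p, v in trades if start <= t < start + span_ms]
        if rows:
            prices = [p for p, _ in rows]
            out[start] = (prices[0], max(prices), min(prices), prices[-1],
                          sum(v for _, v in rows))
    return out

# 30-minute bars computed every 5 minutes, as in Example 5
STEP_MS, SPAN_MS = 5 * 60 * 1000, 30 * 60 * 1000
trades = [(0, 10.0, 1), (600_000, 11.0, 2), (1_500_000, 9.0, 3)]
bars = overlapping_ohlc(trades, [0, STEP_MS], SPAN_MS)
```

Because the span (30 minutes) exceeds the step (5 minutes), consecutive windows overlap and each trade contributes to several bars.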
Partitioning K-line windows by trading volume (Example 6)
This part shows that K-line windows can be partitioned by dimensions other than time, such as cumulative volume, and computes OHLC over cumulative-volume thresholds.
- K-line windows need not be partitioned by time.
- The alternative dimension in the example is cumulative trading volume.
- Example 6 produces one bar for every 10000 units of traded volume.
- Example 6 first computes the cumulative volume per symbol, then groups by bar(cumvol, threshold) to compute OHLC.
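The volume-bar logic translates directly to Python: accumulate volume, bucket the running total with the same bar arithmetic, and aggregate prices per bucket. A minimal single-symbol sketch with invented trades:

```python
def volume_bars(trades, vol_threshold=10_000):
    # trades: (price, volume) in time order for one symbol. Mirrors the
    # nested query: cumvol = cumsum(volume), then group by
    # bar(cumvol, volThreshold) to compute OHLC per volume bucket.
    cumvol, groups = 0, {}
    for price, vol in trades:
        cumvol += vol
        key = cumvol - cumvol % vol_threshold  # bar(cumvol, volThreshold)
        groups.setdefault(key, []).append(price)
    return {k: (p[0], max(p), min(p), p[-1]) for k, p in groups.items()}

bars = volume_bars([(10.0, 4000), (10.5, 4000), (9.5, 4000), (11.0, 9000)])
```

Unlike time bars, a single large trade can jump the cumulative total past a threshold, so buckets close on volume rather than on the clock.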
Using MapReduce (mr) to accelerate historical K-line computation and database writes
This part uses mr with sqlDS to read partitioned data in parallel, compute K-lines, write the results back to a DFS table, and reduce the per-partition row counts.
- mr can parallelize the extraction, computation, and write-back pipeline for large-scale historical data.
- The example data comes from the trades table of the dfs://TAQ database, with nanosecond precision.
- dfs://TAQ uses composite partitioning: a value partition by Date combined with a range partition by Symbol.
- The example steps include logging in and loading the dfs://TAQ table.
- The example creates (or recreates) an OHLC result table partitioned by Date and Symbol.
- The example defines the map logic to compute 5-minute K-lines and append the results to the OHLC table.
- The example runs mr with a reduce step that sums the returned row counts.
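The mr pattern itself can be sketched in Python: a map function runs per data-source partition and a reduce function folds the results. Everything here is a stand-in (the partitions, the sink, and the placeholder bar computation are invented); mr additionally runs the map calls in parallel:

```python
from functools import reduce

def mr_sketch(partitions, map_fn, reduce_fn):
    # Minimal stand-in for mr(ds, mapFunc, reduceFunc): apply the map
    # function to every data-source partition (sequentially here), then
    # fold the per-partition results with the reduce function.
    return reduce(reduce_fn, (map_fn(p) for p in partitions))

ohlc_sink = []  # stands in for the dfs://TAQ OHLC partitioned table

def calc_ohlc(rows):
    # In the tutorial, the map step computes 5-minute bars for one
    # partition, appends them to OHLC, and returns the row count.
    bars = rows  # placeholder for the actual bar computation
    ohlc_sink.extend(bars)
    return len(bars)

total = mr_sketch([[1, 2], [3], [4, 5, 6]], calc_ohlc, lambda a, b: a + b)
```

Using `+` as the reduce function, as the tutorial does, yields the total number of rows written across all partitions.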
Real-time K-line computation overview and data source format
This part outlines the real-time K-line pipeline and describes the field format of the simulated data source trades.csv.
- The real-time pipeline subscribes to live market data, writes it to a stream table, and dynamically aggregates into an output K-line table.
- DolphinDB's aggregation engine computes and outputs the K-lines.
- TimeSeriesAggregator computes K-lines from real-time data at a specified frequency over moving windows.
- The simulated real-time data source is trades.csv (a link is provided).
- trades.csv has four columns: Symbol, Datetime, Price, Volume.
- The output K-line table can be queried and displayed.
Real-time step 1: Python receives data and writes to the DolphinDB stream table Trade
This part creates the Trade stream table in DolphinDB, then uses Python to read the CSV, convert the time type, and write to the stream table.
- The Trade stream table columns are Symbol, Datetime, Price, Volume.
- The Trade stream table types are SYMBOL, DATETIME, DOUBLE, INT.
- The example Python connection parameters are 127.0.0.1:8848 with admin/123456.
- The example notes that the ingested Datetime precision is seconds, so a type conversion is needed before insertion.
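The conversion requirement comes from the precision mismatch: DolphinDB's DATETIME has second precision, while pandas parses timestamps as datetime64[ns]. A minimal sketch of the truncation step, with a made-up row matching the Trade schema:

```python
from datetime import datetime

def to_second_precision(ts: datetime) -> datetime:
    # DolphinDB's DATETIME type holds second precision; drop any
    # sub-second part before inserting into the Trade stream table.
    return ts.replace(microsecond=0)

row = ("AAPL",
       to_second_precision(datetime(2021, 5, 18, 9, 30, 0, 123_456)),
       127.35, 200)  # Symbol, Datetime, Price, Volume
```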
Real-time step 2: computing K-lines with createTimeSeriesAggregator (two trigger modes)
This part covers the window-parameter configuration of TimeSeriesAggregator, how to subscribe the Trade stream table into the aggregation engine, and the updateTime mode with its output-table requirement (keyedTable) and the reasoning behind it.
- In non-updateTime mode, the output table OHLC is a streamTable with columns datetime, symbol, open, high, low, close, volume.
- Scenario 1 computes the past 5 minutes every 5 minutes (windowSize=300, step=300).
- Scenario 2 computes the past 5 minutes every 1 minute (windowSize=300, step=60).
- The example metrics combine first/max/min/last on Price with sum(volume) for OHLC and volume.
- subscribeTable feeds Trade data into the aggregation engine's handler.
- updateTime defines the computation trigger interval; when set, computation fires at window end and at periodic triggers.
- With updateTime, the output table must be a keyedTable.
- With a table or streamTable output, duplicate records for the same time can appear, making the results ambiguous.
- With a keyedStreamTable output, updates to an existing key are rejected, so updateTime has no effect.
- With a keyedTable output, the record for a given time key is updated in place until the step boundary produces a new record.
- When keyColumn is specified, the keyedTable's primary key must include both the time column and keyColumn.
- The updateTime example uses a keyedTable OHLC keyed on datetime and Symbol.
- The updateTime example uses windowSize=60, step=60, updateTime=1, and useWindowStartTime=true.
- Constraints: windowSize must be an integer multiple of step, and step must be an integer multiple of updateTime.
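The keyedTable requirement can be illustrated with a tiny Python sketch: intra-window updateTime triggers rewrite the row for the same (datetime, Symbol) key instead of appending a duplicate (streamTable) or being rejected (keyedStreamTable). The class and the row values are invented for illustration:

```python
class KeyedTableSketch:
    # Upsert-by-key behavior, analogous to a DolphinDB keyedTable keyed
    # on (datetime, Symbol).
    def __init__(self):
        self.rows = {}

    def upsert(self, key, values):
        self.rows[key] = values  # same key -> update in place

ohlc = KeyedTableSketch()
key = ("2021.05.18T09:30:00", "AAPL")
ohlc.upsert(key, (127.0, 127.5, 126.9, 127.2, 1500))  # first trigger
ohlc.upsert(key, (127.0, 127.8, 126.9, 127.6, 2100))  # updateTime refresh
```

After both triggers the table still holds one row per key, carrying the latest OHLC values, which is exactly the refresh semantics updateTime relies on.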
Real-time step 3: Python subscribes to the output table OHLC; visualization options
This part gives a Python example that subscribes to the OHLC output and notes that Grafana can be used for querying and visualization.
- The example enables streaming on a local port and subscribes to the OHLC table, printing received data.
- Grafana can connect to DolphinDB to query the output table and render charts.
- The part includes an example subscription script.
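The subscription side reduces to a callback invoked once per incoming row. A pure-Python stand-in for that handler pattern (the rows are invented; the actual example wires the handler through the Python API's streaming subscription on 127.0.0.1:8848):

```python
received = []

def print_handler(msg):
    # Stand-in for the streaming callback: each new OHLC row delivered by
    # the subscription is passed to a function like this one.
    received.append(msg)
    print(msg)

# Simulate two K-line rows arriving from the OHLC output table
print_handler(("2021.05.18T09:30:00", "AAPL", 127.0, 127.8, 126.9, 127.6, 2100))
print_handler(("2021.05.18T09:31:00", "AAPL", 127.6, 127.9, 127.4, 127.7, 1800))
```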
Facts Index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| 使用DolphinDB计算K线 (blog post) | publication_date | 2021-05-18 | high |
| DolphinDB | capabilities_claimed | Provides a powerful in-memory compute engine with built-in time-series functions, distributed computing, and a streaming data processing engine for efficient K-line computation in many scenarios. | medium |
| Tutorial scope | covers | How DolphinDB computes K-line via batch processing and streaming processing. | high |
| Historical batch K-line computation | supports | Can specify K-line window start time. | high |
| Historical batch K-line computation | supports | A day can contain multiple trading sessions including overnight sessions. | high |
| Historical batch K-line computation | supports | Overlapping K-line windows. | high |
| Historical batch K-line computation | supports | Using trading volume as a dimension to partition K-line windows. | high |
| DolphinDB Map-Reduce function | use_case | When reading very large datasets and writing results to a database, built-in Map-Reduce can parallelize K-line computation. | high |
| Streaming K-line computation | method | Receive market data in real time via API and compute K-line in real time using TimeSeriesAggregator. | high |
| DolphinDB built-in functions for historical K-line | functions_listed | bar, dailyAlignedBar, wj (window join). | high |
| bar(X,Y) | definition | Returns X minus the remainder of X divided by Y; generally used to group data. | high |
| bar(date, 5) example | input | date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m; bar(date, 5); | high |
| bar(date, 5) example | output | [09:30m,09:30m,09:45m,09:45m,09:55m,09:55m] | high |
| Example 1 (US stock market simulation) | data_characteristics | Simulates a trade table with columns symbol, date, time, timestamp, price, volume and sorts by symbol and timestamp; uses n=1000000 rows and symbols AAPL/FB/AMZN/MSFT. | high |
| 5-minute OHLC computation (Example 1) | query_pattern | select first(price) open, max(price) high, min(price) low, last(price) close, sum(volume) volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart (barMinutes=5). | high |
| Example 1 time column | precision_note | time column precision is milliseconds; if not milliseconds, adjust the numeric factor in barMinutes*60*1000 accordingly. | high |
| dailyAlignedBar | purpose | Used when a specific K-line window start time must be specified; can handle multiple daily sessions and overnight sessions. | high |
| dailyAlignedBar | time_column_type_requirement | Time column must include date information and be DATETIME, TIMESTAMP, or NANOTIMESTAMP. | high |
| dailyAlignedBar | timeOffset_type_requirement | timeOffset must be SECOND, TIME, or NANOTIME (date removed) corresponding to the time column type. | high |
| Example 2 (US market, single session) | kline_interval | 7-minute K-line using trade table from Example 1 with dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000). | high |
| Example 3 (China stock market, two sessions) | trading_sessions | Morning 9:30-11:30 and afternoon 13:00-15:00. | high |
| Example 3 (China market simulation) | data_characteristics | Simulates n=1000000 rows over dates 2019.11.07 and 2019.11.08 with symbols 600519/000001/600000/601766, timestamp/price/volume, sorted by symbol and timestamp. | high |
| Example 3 dailyAlignedBar usage | sessionsStart_parameter | sessionsStart=09:30:00.000 13:00:00.000; dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart (barMinutes=7). | high |
| Example 4 (futures with overnight session) | sessions | Day session 08:45-13:45; night session 15:00-05:00 (next day). | high |
| Example 4 dailyAlignedBar usage | sessionsStart_parameter | sessionsStart = [daySession[0], nightSession[0]]; dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart (barMinutes=7). | high |
| wj function | purpose | Used to compute overlapping K-line windows by specifying a relative time range on the left table's time column and computing over the right table. | high |
| Example 5 (overlapping windows) | scenario | China stock simulation; compute 30-minute K-line every 5 minutes. | high |
| Example 5 window generation | method | Generate windows by time and use cross join (cj) to combine symbols and trading windows into barWindows. | high |
| Example 5 OHLC calculation | wj_call | OHLC = wj(barWindows, trade, 0:(30*60*1000), <[first(price) open, max(price) high, min(price) low, last(price) close, sum(volume) volume]>, `symbol`time). | high |
| K-line window partition dimension | alternatives | Besides time, other dimensions can be used; example given is cumulative trading volume. | medium |
| Example 6 (volume bars) | scenario | China stock simulation; compute a bar each time volume increases by 10000 (volThreshold=10000). | high |
| Example 6 implementation | method | Nested query computes cumvol=cumsum(volume) per symbol, then groups by symbol and bar(cumvol, volThreshold) to compute OHLC and barStart time. | high |
| mr (Map-Reduce) function | benefit_claim | Parallel reading and computation for large historical data extraction + K-line computation + writing back to DB; can significantly improve speed. | medium |
| Example MapReduce dataset | data_source | US stock market trade data with nanosecond precision stored in dfs://TAQ database trades table. | high |
| dfs://TAQ database | partitioning | Composite partitioning: value partition by Date and range partition by Symbol. | high |
| Example MapReduce step (1) | login_and_load | Uses login(`admin, `123456); db = database("dfs://TAQ"); trades = db.loadTable("trades"). | high |
| Example MapReduce step (2) | create_output_table | Creates template model from trades and creates partitioned table OHLC in dfs://TAQ partitioned by Date and Symbol (dropping existing OHLC if present). | high |
| Example MapReduce step (3) | map_reduce_logic | Defines calcOHLC map: compute 5-minute bars from 09:30 to 15:59:59 grouped by Symbol, Date, and bar; append to dfs://TAQ OHLC; returns row count; uses ds=sqlDS(select ... Date between 2007.08.01 : 2019.08.01); mr(ds, calcOHLC, +) to sum counts. | high |
| Real-time K-line computation in DolphinDB | pipeline | Real-time market data is subscribed by a Python program, written via Python API into a DolphinDB streaming table; DolphinDB aggregation engine computes K-line dynamically and outputs to a K-line table for querying/display. | medium |
| TimeSeriesAggregator | capability | Can compute K-line from real-time data by specified frequency and moving windows. | high |
| Simulated real-time data source | file | trades.csv (linked). | high |
| trades.csv | columns | Contains 4 columns: Symbol, Datetime, Price, Volume. | high |
| DolphinDB stream table Trade | schema | share streamTable(100:0, `Symbol`Datetime`Price`Volume, [SYMBOL, DATETIME, DOUBLE, INT]) as Trade. | high |
| Python ingestion example | datetime_precision_and_conversion | Incoming Datetime precision is seconds; because pandas uses DateTime[64] (nanotimestamp), code converts Datetime before inserting into DolphinDB. | high |
| Python connection example | connect_params | s.connect("127.0.0.1", 8848, "admin", "123456"). | high |
| Real-time aggregation output table (non-updateTime mode) | schema | share streamTable(100:0, `datetime`symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC. | high |
| createTimeSeriesAggregator scenario 1 | configuration | Every 5 minutes compute past 5 minutes: windowSize=300, step=300; metrics=[first(Price), max(Price), min(Price), last(Price), sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high |
| createTimeSeriesAggregator scenario 2 | configuration | Every 1 minute compute past 5 minutes: windowSize=300, step=60; metrics=[first(Price), max(Price), min(Price), last(Price), sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high |
| Streaming subscription to inject Trade into aggregator | script | subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true). | high |
| updateTime parameter (createTimeSeriesAggregator) | meaning | Defines computation interval; if not set, engine computes only at window end; if set, triggers compute at window end, every updateTime units, and when data has been in for >2*updateTime with uncomputed data (minimum 2 seconds). | high |
| TimeSeriesAggregator with updateTime | output_table_requirement | Requires using keyedTable as output table when updateTime is used. | high |
| Using table or streamTable as output with updateTime | issue | Allows duplicate time records, causing many rows with the same time when updateTime triggers before step; results become meaningless. | high |
| Using keyedStreamTable as output with updateTime | issue | Does not allow updates or duplicate keys; when updateTime triggers for the same time key, new record is rejected, making updateTime ineffective. | high |
| Using keyedTable as output with updateTime | behavior | Allows updates: if the time key exists, updates the existing record so the same time's result can be refreshed until step boundary adds a new time record. | high |
| keyedTable output with keyColumn set | primary_key_requirement | If keyColumn is specified, keyedTable must include both the time-related column and keyColumn as primary keys. | high |
| keyedTable OHLC schema (updateTime mode) | schema | share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC. | high |
| createTimeSeriesAggregator with updateTime example | configuration | windowSize=60, step=60, updateTime=1, useWindowStartTime=true; metrics=[first(Price),max(Price),min(Price),last(Price),sum(volume)]; dummyTable=Trade; outputTable=OHLC; timeColumn=`Datetime; keyColumn=`Symbol. | high |
| TimeSeriesAggregator parameter constraints | constraints | windowSize must be an integer multiple of step, and step must be an integer multiple of updateTime. | high |
| Streaming subscription (updateTime mode also shown) | script | subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true). | high |
| Python subscription for displaying OHLC | script_behavior | Enable streaming on local port 20001 and subscribe to OHLC stream table on 127.0.0.1:8848 with a handler that prints received data. | high |
| Grafana | integration_claim | Grafana can connect to DolphinDB to query the output table and visualize results as charts (link provided). | medium |