DolphinDB 用户入门指南之金融篇(5)
6. 基于 SQL 的批计算
本章将讲解如何使用 SQL 进行常见的数据处理,包括查询、聚合、时序分析、行列转换、关联查询、更新删除等操作,并基于 DolphinDB 数据表,从金融场景涉及的通用表操作出发,结合案例进行演示。 注: 本章所有的例子都基于 3.3 节创建的 "dfs://stock_lv2_snapshot" 库下的快照表 snapshot。
6.1 库表加载
对于已经创建的库表,DolphinDB 可以借助 database 函数进行加载,参考脚本如下:
dbName = "dfs://stock_lv2_snapshot"
snapdb = database(dbName)
schema(snapdb)如果是在对应数据目录(cataLog)下创建的库表,则可以用 “use catalog xxx” 的方式加载数据目录,例如:
use catalog trading
select count(*) from stock_lv2_snapshot.snapshot详情请参考 数据目录 。
同样,分布式表的加载也可以借助 loadTable 函数进行,参考脚本如下:
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
snaptb = loadTable(dbName, tbName)
select count(*) from snaptb
// 或者 select count(*) from loadTable(dbName, tbName)除了借助函数,还可以通过 “数据库句柄”.”数据库表名” 访问分布式表:
dbName = "dfs://stock_lv2_snapshot"
snapdb = database(dbName)
select count(*) from snapdb.snapshot注: loadTable 加载的只是分布式表的句柄,即元数据信息。只有对句柄进行查询时,才会真正的将数据加载到内存。
6.2 分区剪枝
DolphinDB 在 SQL 查询、更新和删除数据时,支持根据 where 条件对查询进行优化。最常用的优化之一就是如果过滤条件包含分区字段,则查询、更新和删除时,系统能够支持按照分区信息进行剪枝处理,减少扫描的分区数。
注: 为了更好地说明该场景,这里利用 [HINT_EXPLAIN] 关键字打印执行计划,观察是否进行剪枝。
例 1. 查询 2022.12.01 ~ 2022.12.03 日期范围内的数据
触发分区剪枝的场景:使用 between 语句、非链式的过滤条件
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where TradeDate between 2022.12.01 and 2022.12.03
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where TradeDate >= 2022.12.01 and TradeDate <= 2022.12.03
// output:
// 可以看到查询遍历的分区数为 90
"map": {\n' +
' "partitions": {\n' +
' "local": 90,\n' +
' "remote": 0\n' +
' },\n' + "sql": ...
' }无法触发分区剪枝的场景:链式条件过滤
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where 2022.12.01 <= TradeDate <= 2022.12.03
// output:
// 可以看到查询遍历的分区数为 10891
"map": {\n' +
' "partitions": {\n' +
' "local": 10891,\n' +
' "remote": 0\n' +
' }例 2. 查询某只、某几只股票的数据
使用等值查询,in 查询都可以触发分区剪枝。
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where SecurityID = "000001"
// output:
// 可以看到查询遍历的分区数为 243
"map": {\n' +
' "partitions": {\n' +
' "local": 243,\n' +
' "remote": 0\n' +
' }
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where SecurityID in ["000001", "000002"]
// output:
// 可以看到查询遍历的分区数为 485
"map": {\n' +
' "partitions": {\n' +
' "local": 485,\n' +
' "remote": 0\n' +
' }例 3. 查询股票 ”000001“ 在 2022 年 2 月的快照数据。
对于时间类型的分区列应用时间精度更低的函数,也可以进行分区剪枝,缩小分区范围。
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where month(TradeDate) = 2022.02M and SecurityID = "000001"
// output:
// 可以看到查询遍历的分区数为 16
"map": {\n' +
' "partitions": {\n' +
' "local": 16,\n' +
' "remote": 0\n' +
' }6.3 单点查询
与 OLAP 引擎不同,TSDB 引擎额外维护了一个键值索引(通过配置参数 sortColumns 指定),因此除了分区剪枝外,在过滤条件中指定键值字段,也可以加速查询。索引的组织形式详见 TSDB 存储引擎详解 3.3.1 节对索引键的说明。
可以通过 schema 函数查看 sortColumns 配置的值:
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
loadTable(dbName, tbName).schema().sortColumns
// output: ["SecurityID","TradeTime"]该sortColumns 属性显示索引键是 securityID(除最后一个时间列外的其他字段构成索引键),且每个索引对应的数据按照 TradeTime 顺序存储在磁盘上。SecurityID 字段既是分区列又是索引键,如果查询条件包含 securityID 不仅可以进行分区剪枝,还可以根据内存中维护的索引信息快速定位到数据块。
以查询某天某只股票的数据为例,通过 [HINT_EXPLAIN] 获取 SQL 执行计划,观察是否命中索引:
select [HINT_EXPLAIN] * from loadTable(dbName, tbName) where TradeDate = 2022.12.01 and SecurityID = "000002"
// output:
// 打印信息的 where 部分包含查询涉及扫描的数据块个数和命中的索引条件数
"where": {\n' +
' "TSDBIndexPrefiltering": {\n' +
' "blocksToBeScanned": 62,\n' +
' "matchedWhereConditions": 1\n' +
' }6.4 聚合计算
在金融场景下,聚合指标是策略不可或缺的一部分,有助于帮助交易员、分析员快速识别市场趋势,做出基于数据的策略调整。以股票数据为例,常用的聚合方式有以下两种:
- 按照交易时间聚合
- 按股票标的聚合
下面综合上述场景,结合具体例子来进行说明。
例1:基于历史数据计算 5 分钟 K 线高开低收(OHLC)和总量。
// 模拟数据
n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!()
timestamp = concatDateTime(date, time)
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`AAPL`FB`AMZN`MSFT, n)
trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)
// 计算 OHLC 和 Volume
barMinutes = 5
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart这里用到了四个函数 max, first, min, low 分别用于计算窗口内数据的高开低收聚合值,sum 用于计算总量。
K 线窗口借助 group by 分组语句进行划分:脚本 group by symbol, date, bar(time, barMinutes*60*1000) as barStart 表示按照股票代码、日期、5分钟时间窗口划分,其中 bar 函数用于按照时间将每 5 分钟的数据划分为一组。
例 1 只是计算 K 线最精简的一个例子,在实际场景中可能还会涉及非常复杂的数据处理过程,本节将在例 2 进行具体的讲解。
例2:基于历史快照数据合成分钟 K 线。如果某只股票在某一分钟无数据,则填充为 0。
注: 本案例的示例代码基于教程 基于快照行情的股票和基金 K 线合成 进行改写,此处仅展示关键代码,以方便新手快速理解。
Step1. 变量函数定义
- 自定义最高价最低价计算函数
// 自定义计算最高最低点的聚合函数
// defg 用于声明聚合函数
defg high(DeltasHighPrice, HighPrice, LastPrice){
if(sum(DeltasHighPrice)>0.000001){
return max(HighPrice)
}
else{
return max(LastPrice)
}
}
defg low(DeltasLowPrice, LowPrice, LastPrice){
sumDeltas = sum(DeltasLowPrice)
if(sumDeltas<-0.000001 and sumDeltas!=NULL){
return min(iif(LowPrice==0.0, NULL, LowPrice))
}
else{
return min(LastPrice)
}
}Step2. 数据预处理
- 把09:25:00-09:30:00的数据归入第一根 K 线:[09:30:00, 09:31:00]
- 计算同一个股票或者基金的两笔相邻快照的最高价和最低价的变化幅度
- 计算同一个股票或者基金的两笔相邻快照的成交量、成交金额和成交笔数的增量
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
tempTB1 = select TradeDate,
iif(TradeTime <=09:30:00.000, 09:30:00.000, TradeTime) as TradeTime,
SecurityID,
OpenPrice,
PreCloPrice,
HighPrice,
LowPrice,
LastPrice,
PreCloseIOPV,
IOPV,
UpLimitPx,
DownLimitPx,
iif(deltas(HighPrice)>0.000001, 1, 0) as DeltasHighPrice,
iif(abs(deltas(LowPrice))>0.000001, -1, 0) as DeltasLowPrice,
iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade)) as DeltasVolume,
iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade)) as DeltasTurnover,
iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades)) as DeltasTradesCount
from loadTable(dbName, tbName)
where TradeTime >=09:25:00.000
context by SecurityIDStep3. K 线计算
针对处理后的快照行情进行窗口为1分钟、步长为1分钟的滚动窗口计算。
re = select firstNot(LastPrice, 0) as OpenPrice,
high(DeltasHighPrice, HighPrice, LastPrice) as HighPrice,
low(DeltasLowPrice, LowPrice, LastPrice) as LowPrice,
last(LastPrice) as ClosePrice,
sum(DeltasVolume) as Volume,
sum(DeltasTurnover) as Turnover,
sum(DeltasTradesCount) as TradesCount,
last(PreCloPrice) as PreClosePrice,
last(PreCloseIOPV) as PreCloseIOPV,
last(IOPV) as IOPV,
last(UpLimitPx) as UpLimitPx,
last(DownLimitPx) as DownLimitPx,
lastNot(LastPrice, 0)\firstNot(LastPrice, 0)-1 as FirstBarChangeRate
from tempTB1
group by SecurityID, TradeDate, interval(X=TradeTime, duration=60s, label='left', fill=0) as TradeTime该部分使用了一个关键的 SQL 语句 group by 进行分组聚合计算,group by 只能够搭配聚合指标使用,上述代码中的 firstNot, high, low, last, sum, lastNot 等等都是聚合函数。
group by 指定的分组指标是 SecurityID, TradeDate, interval(X=TradeTime, duration=60s, label='left', fill=0),即股票字段、日期、分钟时间。interval 函数必须与 group by 语句搭配使用,通常用于时间精度的降维场景,此处 interval 函数将毫秒精度的时间字段,按照分钟窗口进行划分,缺失值填充为 0。
部分计算结果示意如下:
SecurityID TradeDate TradeTime OpenPrice HighPrice LowPrice ClosePrice Volume Turnover TradesCount ...
---------- ---------- ------------ --------- --------- -------- ---------- ------ --------------------- ----------- ---
000002 2022.01.04 09:30:00.000 1.6 1.779 1.003 1.3 100 10022.200000000000727 94 ...
000002 2022.01.04 09:31:00.000 1.5 1.756 1.003 1.4 0 0 0 ...
000002 2022.01.04 09:32:00.000 1.6 1.777 1.011 1.5 0 0 0 ...
000002 2022.01.04 09:33:00.000 1.5 1.791 1.01 1.5 0 0 0 ...
000002 2022.01.04 09:34:00.000 1.4 1.791 1.042 1.5 0 1.5 -39 ...
000002 2022.01.04 09:35:00.000 1.5 1.794 1.003 1.5 0 0 0 ...
...注: 由于快照表 snapshot 的数据量很大,如果直接执行上述语句,社区版的 server 执行时可能会报错 Out of memory。一个替代方案是按分区串行计算然后合并计算结果。
先将上述 K 线的计算逻辑封装在一个函数内。
def calcKLine(snapshot){
tempTB1 = select TradeDate,
iif(TradeTime <=09:30:00.000, 09:30:00.000, TradeTime) as TradeTime,
SecurityID,
OpenPrice,
PreCloPrice,
HighPrice,
LowPrice,
LastPrice,
PreCloseIOPV,
IOPV,
UpLimitPx,
DownLimitPx,
iif(deltas(HighPrice)>0.000001, 1, 0) as DeltasHighPrice,
iif(abs(deltas(LowPrice))>0.000001, -1, 0) as DeltasLowPrice,
iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade)) as DeltasVolume,
iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade)) as DeltasTurnover,
iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades)) as DeltasTradesCount
from snapshot
where TradeTime >= 09:25:00.000
context by SecurityID
re = select firstNot(LastPrice, 0) as OpenPrice,
high(DeltasHighPrice, HighPrice, LastPrice) as HighPrice,
low(DeltasLowPrice, LowPrice, LastPrice) as LowPrice,
last(LastPrice) as ClosePrice,
sum(DeltasVolume) as Volume,
sum(DeltasTurnover) as Turnover,
sum(DeltasTradesCount) as TradesCount,
last(PreCloPrice) as PreClosePrice,
last(PreCloseIOPV) as PreCloseIOPV,
last(IOPV) as IOPV,
last(UpLimitPx) as UpLimitPx,
last(DownLimitPx) as DownLimitPx,
lastNot(LastPrice, 0)\firstNot(LastPrice, 0)-1 as FirstBarChangeRate
from tempTB1
group by SecurityID, TradeDate, interval(X=TradeTime, duration=60s, label='left', fill=0) as TradeTime
return re
}然后利用 mr 函数指定 parallel 参数为 false 进行分区串行计算。此处选取了一个月的数据进行演示,其中 sqlDS 函数可以根据指定的 SQL 查询筛选数据,并按分区生成数据源列表。
beginDate = 2022.01.01
endDate = 2022.01.31
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
ds = sqlDS(<select * from loadTable(dbName, tbName) where TradeDate between beginDate and endDate>)
re = mr(ds, calcKLine, , unionAll, false)
select top 10 * from re order by SecurityId
// output:
SecurityID TradeDate TradeTime OpenPrice HighPrice LowPrice ClosePrice Volume Turnover TradesCount ...
---------- ---------- ------------ --------- --------- -------- ---------- ------ --------------------- ----------- ---
000001 2022.01.04 09:30:00.000 1.6 1.794 1.064 1.3 0 10000.200000000000727 53 ...
000001 2022.01.04 09:31:00.000 1.4 1.767 1.011 1.3 0 0 0 ...
000001 2022.01.04 09:32:00.000 1.6 1.8 1.046 1.4 0 0 0 ...
000001 2022.01.04 09:33:00.000 1.6 1.736 1.033 1.4 0 0 0 ...
000001 2022.01.04 09:34:00.000 1.6 1.772 1.063 1.3 0 0 0 ...
000001 2022.01.04 09:35:00.000 1.3 1.758 1.035 1.6 0 0.799999999999272 -41 ...
000001 2022.01.04 09:36:00.000 1.4 1.784 1.013 1.5 0 0 0 ...
000001 2022.01.04 09:37:00.000 1.4 1.79 1.006 1.5 0 0 0 ...
000001 2022.01.04 09:38:00.000 1.6 1.781 1.024 1.5 0 0 0 ...
000001 2022.01.04 09:39:00.000 1.5 1.697 1.024 1.6 0 0 0 ...6.5 时序分析
在金融领域,时间序列分析是一种重要的工具,它可以帮助理解数据的趋势、季节性、随机性等特征,并进行准确的预测。例如,在股票价格预测、货币汇率预测、通胀率预测等方面,时间序列分析被广泛应用于揭示金融数据随时间变化的规律。为应对此类场景,DolphinDB 提供了滑动窗口、累积窗口、序列计算等内置函数,以及 Talib、MyTT 算子指标库和 Alpha 因子库等模块,用户可以利用这些函数的组合和嵌套构建复杂的时序指标算子。
例:基于快照数据计算订单失衡率(SOIR)因子
该因子用于衡量买卖委托量在总量中的不均衡程度。t 时刻第 i 档的订单失衡率因子定义如下,具体参数说明参考处理 Level-2 行情数据实例 3.1.2 节。

使用 DolphinDB 的脚本实现上述因子的定义:此处使用 rowWavg 函数计算各档加权平均的买卖委托量不均衡程度因子,即订单失衡率因子。并使用 moving 系列函数对一段时间的指标进行移动标准化处理。
def wavgSOIR(bidQty,askQty,lag=20){
imbalance= rowWavg((bidQty - askQty)\(bidQty + askQty), 10 9 8 7 6 5 4 3 2 1).ffill().nullFill(0)
mean = mavg(prev(imbalance), (lag-1), 2)
std = mstdp(prev(imbalance) * 1000000, (lag-1), 2) \ 1000000
return iif(std >= 0.0000001,(imbalance - mean) \ std, NULL).ffill().nullFill(0)
}
re = select SecurityID, TradeDate, TradeTime, wavgSOIR(bidOrderQty, OfferOrderQty, lag=20) as HeightImbalance from loadTable(dbName, tbName) where TradeDate between beginDate and endDate context by SecurityID csort TradeTime
select top 10 * from re
// output:
SecurityID TradeDate TradeTime HeightImbalance
---------- ---------- ------------ ------------------
000001 2022.01.04 09:30:00.000 0
000001 2022.01.05 09:30:00.000 0
000001 2022.01.06 09:30:00.000 -4.710845145045806
000001 2022.01.07 09:30:00.000 10.571487776713668
000001 2022.01.10 09:30:00.000 1.688837114535889
000001 2022.01.04 09:30:03.000 -0.761301248682889
000001 2022.01.05 09:30:03.000 -0.494321354433684
000001 2022.01.06 09:30:03.000 -0.963245349720814
000001 2022.01.07 09:30:03.000 1.708213822850803
000001 2022.01.10 09:30:03.000 1.3875026058188546.6 截面计算
除了纵向的时序计算外,在金融场景下还需要横向对同一时间截面上不同股票的数据进行一些比较或计算。要对比同一截面不同股票的指标,就需要将原本的窄表进行一些行列转换,使之转成面板数据。为此,DolphinDB 在 SQL 语义上拓展提供了 pivot by 语句,并开发了用于内存计算的 pivot、panel 函数。
pivot 操作基于指定的维度(通常是时间和标的)将某一个属性值展开成面板数据,通常用于将一个窄表转换成宽表/矩阵,然后进行计算。

图 6-1 窄表转换成宽表示意图
在 DolphinDB 中使用 exec+pivot by 语句可以将一个表转换成一个面板矩阵,矩阵的行列标签记录了面板数据的维度信息。以一个简单的例子进行说明。
原表 t 定义如下:
sym = `C`MS`MS`MS`IBM`IBM`C`C`C
price= 49.6 29.46 29.52 30.02 174.97 175.23 50.76 50.32 51.29
qty = 2200 1900 2100 3200 6800 5400 1300 2500 8800
timestamp = [09:34:07,09:35:42,09:36:51,09:36:59,09:35:47,09:36:26,09:34:16,09:35:26,09:36:12]
t = table(timestamp, sym, qty, price);
// output:
timestamp sym qty price
--------- --- ---- -------------------
09:34:07 C 2200 49.600000000000001
09:35:42 MS 1900 29.46
09:36:51 MS 2100 29.519999999999999
09:36:59 MS 3200 30.019999999999999
09:35:47 IBM 6800 174.969999999999998
09:36:26 IBM 5400 175.229999999999989
09:34:16 C 1300 50.759999999999997
09:35:26 C 2500 50.32
09:36:12 C 8800 51.289999999999999利用 exec + pivot by 将表 t 转成面板矩阵:
m = exec price from t pivot by timestamp, sym;
m
// output:
C IBM MS
------------------ ------------------- ------------------
09:34:07|49.600000000000001
09:34:16|50.759999999999997
09:35:26|50.32
09:35:42| 29.46
09:35:47| 174.969999999999998
09:36:12|51.289999999999999
09:36:26| 175.229999999999989
09:36:51| 29.519999999999999
09:36:59| 30.019999999999999
typestr m
// output: 'FAST DOUBLE MATRIX'
m.colNames() // output: ["C","IBM","MS"]
m.rowNames() // output: [09:34:07,09:34:16,09:35:26,09:35:42,09:35:47,09:36:12,09:36:26,09:36:51,09:36:59]了解了 pivot by 的基本使用,下面结合具体的金融数据进行举例。
例:根据每天每只股票最后的 LastPrice 价格计算排名。
此处 pivot by 用于指定截面的维度信息,此处是日期和股票代码,由于同一个日期对应多个数据,因此计算时可以配合聚合指标使用。
m = exec last(LastPrice) from loadTable(dbName, tbName) pivot by TradeDate, SecurityID
rowRank(m)
// output:
000001 000002 000003 000004 000005 000006 000007 000008 000009 000010 000011 000012 000013 000014 000015 ...
------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ------ ---
2022.01.04|87 0 26 26 0 26 58 26 87 0 26 26 0 0 58 ...
2022.01.05|84 20 20 47 84 47 84 47 0 20 47 20 0 47 84 ...
2022.01.06|84 17 84 84 84 47 17 17 47 47 84 47 17 0 47 ...
2022.01.07|50 0 50 12 83 50 0 12 50 12 12 50 0 12 50 ...
2022.01.10|45 15 15 15 15 74 74 0 45 15 74 15 74 45 74 ...
...6.7 常见问题
1. 如何在写入分布式表时对数据进行去重?
- TSDB 引擎 sortColumns 去重:系统会根据索引配置字段 sortColumns 进行去重,但是该方式不适用于每个索引键(除最后一列时间列以外的 sortColumns 字段构成索引键)对应的数据量很少的场景,因为索引键的主要作用是数据索引,若设置为去重键会造成索引膨胀,影响查询效率。
- 使用 upsert! 方法写入数据:upsert! 支持设置数据键 keyColNames,使用 upsert! 写入新数据时,系统会去检查原有数据中该数据键是否已经存在,若存在则更新主键的数据,否则追加。通过该方式以达到数据去重的目的。
2. 如何写脚本可以优化查询性能,查询很慢的原因是什么?
一个查询是否可以优化,主要是看以下的方面:
- 是否能够触发分区剪枝,缩小系统遍历范围? → where 条件是否可以改写为触发剪枝的条件
- 是否走了分布式查询的优化? → 如果是自定义函数,无法进行分布式查询优化,是否可以通过 mr 函数进行逻辑改写。
查询慢的原因可以参考文档 查询/写入慢 进行排查。
3. 查询的字段调用了自定义的函数,会对性能有影响吗,会走分布式查询的优化吗?如果没有通过 mapr 关键字声明,则自定义函数无法进行分布式查询优化,此时有几种优化方案:
- 计算数据不跨分区:在查询语句后添加 map 关键字。例如:select first(id), count(*) from pt map。
- 计算数据跨分区:通过函数 mr 定义分布式计算逻辑。参数 mapFunc 指定函数会映射到各个分区进行执行,并将计算结果通过 reduceFunc 进行处理汇总后,通过 finalFunc 进行最后的计算。案例参考 通用计算 。
4. 查询字段多造成脚本复杂,可以如何优化代码?
可以通过元编程或者字段序列的方式去定义脚本。
例:查询字段 col000~col999 的数据
// 元编程
cols="col" + lpad(string(0..999), 3, "0")
<select _$$cols from t>.eval()
// 字段序列
select col000...col999 from t字段序列对于字段名和查询场景都有限制,与之相比元编程的使用范围更广。
例:计算多个字段的最小二乘回归。
colName = "y"
x = `SH000001`SH000003`SH000005
<select ols(_$colName, _$$x, 1, 2).Residual as residual from t>.eval()具体用法请参考 基于 SQL 的元编程 。
5. 如何进行表关联后只保留公共匹配列?
DolphinDB 中表关联的结果会将两边公共列拆成两列返回,例如:
t1= table(1 2 3 as id, 7.8 4.6 5.1 as value)
t2 = table(1 3 5 as id, 300 500 800 as qty)
fj(t1, t2, `id)
// output:
id value t2_id qty
-- ----- ----- ---
1 7.8 1 300
2 4.6
3 5.1 3 500
5 800如果需要将公共列 id 合成一列返回,可以参考下述脚本:
select t1.id.nullFill(t2.id) as id, value, qty from fj(t1, t2, `id)6. 共享表怎么删除,调用 dropTable 函数不起作用?
dropTable 用于删除分布式表,对于共享内存表可以用 undef("st", SHARED) 进行删除。
7. update 和 delete 的事务开销很大,这是为什么?
DolphinDB 的分布式数据是按照列式存储(OLAP)或者行列混存(TSDB)的,对于行级别的更新和删除操作,系统以分区为单位读出相关字段的数据到内存,然后进行更新和删除再写回磁盘。因此,如果事务涉及的分区多、分区数据量大,更新和删除的开销也会很大。
如果数据以 TSDB 引擎进行存储,则在某些特定场景和配置下,其更新和删除操作有优化:
- 设置去重策略 keepDuplicates=LAST 时,数据更新以数据追加方式进行。
- 设置去重策略 keepDuplicates=LAST 并开启软删除机制 softDelete=true,则删除操作时,系统取出待删除的数据打上删除标记(软删除),再以追加的方式写回数据库。
上述两种优化后的更新和删除方式效率更高。
8. 如何编写查询脚本,使之可以通过命中内部维护的元数据信息,从而加速查询性能?
DolphinDB 内部维护了分区信息、TSDB 引擎还额外维护了 zonemap、sortKey 等信息。理论上可以通过此类元数据信息加速相关的查询。下面以具体的例子进行说明:
例 1:查询分区字段的元数据信息。 场景:按照日期(天)和股票代码的 COMPO 分区,获取所有日期(天)值。
以 4.2.1 导入的快照数据表 snapshot 为例,该表的结构为:
engineType->TSDB
keepDuplicates->ALL
chunkGranularity->TABLE
sortColumns->["SecurityID","TradeTime"]
softDelete->0
tableOwner->admin
compressMethods->name compressMethods
------------------ ---------------
TradeDate delta
TradeTime delta
MDStreamID lz4
SecurityID lz4
...
tableComment->
colDefs->name typeString typeInt extra comment
------------------ ---------- ------- ----- ------------
TradeDate DATE 6 交易日期
TradeTime TIME 8 交易时间
MDStreamID SYMBOL 17
...
chunkPath->
partitionColumnIndex->[0,3]
partitionColumnName->["TradeDate","SecurityID"]
partitionColumnType->[6,17]
partitionType->[1,5]
partitionTypeName->["VALUE","HASH"]
partitionSchema->([2022.01.04,2022.01.05,2022.01.06,2022.01.07,2022.01.10,2022.01.11,2022.01.12,2022.01.13,2022.01.14,2022.01.17,2022.01.18,2022.01.19,2022.01.20,2022.01.21,2022.01.24,2022.01.25,2022.01.26,2022.01.27,2022.01.28,2022.02.07,2022.02.08,2022.02.09,2022.02.10,2022.02.11,2022.02.14,2022.02.15,2022.02.16,2022.02.17,2022.02.18,2022.02.21...],50)
partitionSites->分别利用三种方式求所有数据日期的唯一值:
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
pt = loadTable(dbName, tbName)
// (1)直接利用 distinct 函数求日期唯一值
timer dateList = exec distinct(TradeDate) from pt
// Time elapsed: 670.988 ms
// (2)利用 group by 的分布式计算求日期唯一值
timer dateList = exec TradeDate from select count(*) from pt group by TradeDate
// Time elapsed: 373.783 ms
// (3)利用元数据信息求日期的唯一值
timer{
result = exec dfsPath from getTabletsMeta( dbName[5:]+ "/%", tbName, false, -1) where rowNum != 0
dateList = substr(result, regexFind(result, "[0-9]{8}"), 8).distinct().temporalParse("yyyyMMdd").sort()
}
// Time elapsed: 35.623 ms比较查询耗时可以发现方案 3 性能最佳,因为它是通过直接解析分区元数据获取日期信息的。
如果创建数据库时,设置的 VALUE 分区方案包含在数据范围内,则利用 VALUE 分区自动拓展的特性,可以通过 schema 函数直接获取日期分区字段的值。
本例中 snapshot 表的分区信息可以通过schema(pt).partitionSchema[0]获取,获取结果是长度为 244 的向量,较真实的场景多了 2 天,因为 2024.01.01 和 2024.01.02 这两天也被包含了。这是由于在创建数据库时, snapshot 的建库语句为:create database "dfs://stock_lv2_snapshot" partitioned by VALUE(2024.01.01..2024.01.02), HASH([SYMBOL, 50]), engine='TSDB',其中指定的 2024.01.01..2024.01.02 也被纳入了元数据信息中,即使该分区并无数据。
利用该特性,我们在创建数据库时指定必然有数据的天作为分区 scheme,例如 create database "dfs://stock_lv2_snapshot" partitioned by VALUE(2022.01.04..2022.01.04), HASH([SYMBOL, 50]), engine='TSDB',这样就可以直接借助 schema 函数更方便地获取元数据信息。
例 2:限 TSDB 引擎:查询 count, min, max, sum 等 zonemap 包含的元数据信息。
zonemap 存储了数据的预聚合信息(每列每个 sortKey 对应数据的 min,max,sum,notnullcount)。如果查询语句包含 count, min, max, sum 等预聚合指标,且用户于设置去重策略(keepDuplicates=ALL),就可以利用 zonemap 进行提速。
不同去重策略下,查询的性能对比见TSDB 存储引擎详解5.2 节查询全表总记录数(命中元数据)部分。对于 ALL 去重策略,由于查询时无需去重,直接利用 zonemap 提速性能最高。

图 6-2 不同去重策略下 TSDB 查询全表记录数的性能对比
例 3:限 TSDB 引擎:查询条件命中 sortKey 索引数据。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
pt = loadTable(dbName, tbName)
select [HINT_EXPLAIN] * from pt where SecurityID in ["000076", "000024"]
// output:
...
"explain": {
"where": {
"TSDBIndexPrefiltering": {
"blocksToBeScanned": 60, // 扫描的 block 个数
"matchedWhereConditions": 1 // 命中 sortKey 的过滤条件
},
...可以通过 [HINT_EXPLAIN] 执行计划观察查询的过滤条件是否命中索引。
6.8 下一步阅读
- 分区剪枝:查询数据SQL 执行计划
- SQL 操作:量化金融范例深度不平衡、买卖压力指标、波动率计算
- 聚合计算:K 线计算基于快照行情的股票和基金 K 线合成
- 时序计算:窗口计算处理 Level-2 行情数据实例
- 截面计算:面板数据处理