DolphinDB 用户入门指南之金融篇(4)
5. 数据增删改查
5.1 增加数据
在 DolphinDB 中可以通过三种方式写入数据,分别为:
- tableInsert 函数(推荐):支持任意表对象的写入,会返回写入的记录数。
- append! 函数:支持任意表对象的写入,会直接对原表进行修改。
- insert into 语句:适合于数据量较少的场景,支持内存表写入;从 3.00.1 版本开始,若配置 enableInsertStatementForDFSTable 配置项,则 insert into 也支持分布式表写入。
例:使用三种方式向快照表里追加股票的数据。
use MockData
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
// 模拟一只股票的数据
date = 2023.01.03
t = stockSnapshot(tradeDate=date, securityNumber=1)使用tableInsert写入:
re2 = loadTable(dbName, tbName).tableInsert(t)
re2 // output: 4802使用append!写入:
re1 = loadTable(dbName, tbName).append!(t)
select count(*) from re1使用 insert into 语句写入:
getConfig(`enableInsertStatementForDFSTable) // true
insert into snaptb values(2023.01.03,09:30:00.000,'',"000001",'',"OCALL",2,1.045,37,0,10001.799999999999272,
1.4,1.41,1.631,1.633,0,0,0,0,0,0,0,22300,7.002,0,12100,7.002,0,0,0,0,0,
array(DOUBLE[]).append!([[6.940000000000001,6.940000000000001,6.950000000000001,6.97,6.980000000000001,6.990000000000001,7.02,7.05,7.08,7.1]]),
array(DOUBLE[]).append!([[7.09,7.06,7.05,7.04,7.030000000000001,7.01,6.990000000000001,6.980000000000001,6.950000000000001,6.92]]),
array(INT[]).append!([[32400,32400,33300,36900,36900,36900,39600,41400,41400,44100]]),
array(INT[]).append!([[48600,48600,47700,46800,42300,41400,40500,40500,34200,31500]]),
array(INT[]).append!([[20,20,18,18,17,17,16,16,15,13]]),
array(INT[]).append!([[10,10,12,16,17,17,17,18,18,19]]),
0,0,0,0,0,0,0,0,1,1000,2454.300000000000181,30,45000,31392.200000000000727,237,237,204,210,236,226,10:57:45.000,1,
array(LONG[]).append!([[,0,0,0,3600,5200,5200,12400,15600,15600]]),
array(LONG[]).append!([[,0,100,3500,3600,3900,5900,12400,13500,15600]]))思考题: (1)对比上述三种写入数据的方式,对于分布式表而言,哪种写入方式是最方便的?
注: 阅读 插入数据
(2)使用 insert into 写入数据到分布式表,对写入性能有何影响?
注: 阅读 redo log 和 cache engine
5.2 查询数据
在 DolphinDB 中内存对象可以直接通过切片、索引下标取数,且支持应用于函数直接进行计算,不光是向量、矩阵支持这种特性,内存表也同样支持。但是对于分区表、分布式表仅支持使用 SQL 语句进行访问。
表 5-1 不同类型表的访问方式
| 操作方式 | 普通内存表 | 分区内存表、分布式表(dfs 分区表、维度表) |
|---|---|---|
| 索引、切片 t[1] t[0:3] | 支持 | 不支持 |
| 函数sum(t) cumsum(t) | 支持 | 不支持 |
| SQLselect * from t | 支持 | 支持 |
例 1:查询某日的所有股票数据。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
select * from loadTable(dbName, tbName) where TradeDate = 2023.01.03例 2:查询某日股票部分字段的数据。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
select concatDateTime(TradeDate, TradeTime) as TradeDateTime,
SecurityID, OfferPrice[0] as offerPx1,
BidPrice[0] as bidPx1
from loadTable(dbName, tbName)
where TradeDate = 2023.01.035.3 删除数据
在 DolphinDB 中,为了应对不同的删除场景,系统支持按行、按分区以及全表删除等多种方式。
5.3.1 硬删除&软删除
在具体分析删除场景前,我们先来了解一下 DolphinDB 内的删除机制。
在 DolphinDB 中,OLAP 引擎采取的是硬删除(物理删除)的方式,而 TSDB 支持软删除(逻辑删除)和硬删除两种方式。
表 5-2 软删除和硬删除的机制对比
| 分类 | 删除方式 | delete 删除机制 |
|---|---|---|
| 硬删除 | 直接从文件中删除数据。 | 1.分区剪枝:根据删除条件进行分区剪枝,确定删除涉及的分区。2. 查到内存删除:取出对应分区所有数据到内存后,根据条件删除数据。3. 写回删除后的分区数据到新目录:将删除后的数据重新写入数据库,系统会使用一个新的 CHUNK 目录(默认是“物理表名_cid”)来保存写入的数据,旧的文件将被定时回收(默认 30 min)。 |
| 软删除 | 追加写入带删除标记的数据。 | 1. 分区剪枝:根据查询语句进行分区剪枝,缩窄查询范围。2. 读到内存获取待删除数据:根据查询条件,查询出需要删除的数据。3. 追加写入待删除数据:给需要删除的数据打上删除标记(deletion flag),并将排序列(sort column)和分区列(partition column)外字段值设置为空值,以 append! 方式将数据追加写入。 |
从上述机制可以了解到执行硬删除时,即使是按条件删除几条数据,也需要将相关分区的所有数据读取到内存,删除后再写回去;而软删除仅需要追加数据即可。因此就删除性能而言,硬删除在时间和空间的开销上都更高。但在查询场景下,软删除需要额外对数据进行过滤,因此相比硬删除,其性能更差。详情请参考 软删除 。
5.3.2 删除方法
下表列出了 DolphinDB 支持的多种删除数据的方法,并给出了用法的示例、适用场景的说明。此外,在使用过程中不同的方法有一些使用的注意点和限制,详情参考对应链接的说明。
表 5-3 删除方法及其适用场景,详情参考 新用户入门指南(金融篇)-5.3.2删除方法
表 5-4 不同删除方法的特点比较

注:
- 多版本并发控制(MVCC):在写入、更新和删除数据时创建新的数据版本,而不是直接覆盖原有数据版本,从而实现读写分离。这种机制使得并发读操作能够继续访问旧版本的数据快照,不会受到写操作的影响,确保数据一致性和高效的并发性能。
- 事务(Transaction): 是数据库系统中一个完整的、不可分割的操作单元。事务的主要目标是确保数据库的一致性和可靠性,即使在并发操作或系统故障的情况下,数据库仍能保持正确的状态。
例 1:删除某日的所有股票的快照数据。
由于该表是按天进行分区的,因此直接调用 dropPartition函数删除整个分区即可。由于该快照数据库采用的是组合分区,可以通过指定条件的方式进行删除。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
db = database(dbName)
dropPartition(db, [2023.01.04], tableName=tbName) 例 2:删除某个 ID 的所有股票数据。
由于股票字段采用的是 HASH 分区,无法通过直接删除分区的方式删除某只股票的数据,此时可以借助 delete 语句按照条件进行删除。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
delete from loadTable(dbName, tbName) where SecurityId = "000001"5.3.3 场景性能分析
本节结合实际案例,对比说明不同场景下如何选择更优的删除方式。
示例库表创建脚本:
use MockData
drop database if exists "dfs://k_minute_level"
create database "dfs://k_minute_level"
partitioned by VALUE(2020.01.01..2021.01.01)
engine='OLAP'
create table "dfs://k_minute_level"."k_minute"(
securityid SYMBOL
tradetime TIMESTAMP
open DOUBLE
close DOUBLE
high DOUBLE
low DOUBLE
vol INT
val DOUBLE
vwap DOUBLE
)
partitioned by tradetime
go
dates = getMarketCalendar('XSHE', 2022.01.01, 2022.01.31)
t = stockMinuteKLine(dates[0], dates[dates.size()-1], 10)
loadTable("dfs://k_minute_level", "k_minute").append!(t)场景一:删除部分分区数据,推荐使用函数 dropPartition。
对比删除单个分区场景下,delete 和 dropPartition 函数的性能:
// delete 删除 2022.01.04 的数据
timer delete from loadTable("dfs://k_minute_level", "k_minute") where date(tradetime) = 2022.01.04 // Time elapsed: 4.099 ms
// Time elapsed: 20.049 ms
// dropPartition 删除 2022.01.05 的数据
db = database("dfs://k_minute_level")
timer dropPartition(db, [2022.01.05], tableName="k_minute")
// Time elapsed: 5.978 ms对比删除多个分区场景下,delete 和 dropPartition 函数的性能:
// delete 删除多个分区数据
timer delete from loadTable("dfs://k_minute_level", "k_minute") where date(tradetime) between 2022.01.01 and 2022.01.31 // Time elapsed: 4.099 ms
// Time elapsed: 469.155 ms
// 将 delete 删除的数据重写回去
loadTable("dfs://k_minute_level", "k_minute").append!(t)
// dropPartition 删除多个分区数据
db = database("dfs://k_minute_level")
timer dropPartition(db, 2022.01.01..2022.01.31, tableName="k_minute")
// Time elapsed: 35.645 ms对比发现 dropPartition 函数的删除性能明显优于 delete 语句。这是因为 delete 语句具有 SQL 解析的开销。 此外,需要注意的是由于 delete 支持 MVCC 机制,在删除时会产生多个版本的数据副本占用额外磁盘空间,且与 dropPartition 直接删除分区数据文件不同,delete 会将数据读取到内存后再进行删除。因此,在内存受限的场景下,delete 操作可能会导致内存溢出。推荐运维时使用 dropPartition 函数删除数据。
场景二:删除全表数据, 推荐使用 truncate、 dropTable 或 drop 语句。
对比 delete、truncate 以及 dropTable 的性能:
注:k_minute_1 和 k_minute_2 是与 k_minute 结构完全相同的表,且写入相同的数据。
// delete
timer delete from loadTable("dfs://k_minute_level", "k_minute")
// Time elapsed: 495.771 ms
// truncate
timer truncate("dfs://k_minute_level", "k_minute_1")
// Time elapsed: 70.856 ms
// dropTable
db = database("dfs://k_minute_level")
timer dropTable(db, "k_minute_2")
// Time elapsed: 71.254 ms删除后,再次查询表数据:
// delete
select count(*) from loadTable("dfs://k_minute_level", "k_minute") // output: 0
// truncate
select count(*) from loadTable("dfs://k_minute_level", "k_minute_1") // output: 0
// dropTable
select count(*) from loadTable("dfs://k_minute_level", "k_minute_2")
// error: getFileBlocksMeta on path '/k_minute_level/k_minute_2.tbl' failed, reason: path does not exist对比性能开销,可以发现在删除全表数据的场景下, truncate 和 dropTable 是比较推荐的方式。两者的区别在于,truncate 保留原表结构只是删除了数据,而 dropTable 会将表结构也删除。
场景三:数据导入分布式表时,写入了重复数据,希望对该表进行去重。
对比 delete 方法和 dropPartition 删除分区重写的方式:
t1 = stockMinuteKLine(2022.01.04, 2022.01.05, 10)
// 写入 2 天重复数据
loadTable("dfs://k_minute_level", "k_minute").append!(t1)
// delete statement
timer delete from loadTable("dfs://k_minute_level", "k_minute") where isDuplicated([securityid, tradetime], FIRST) = true map
// Time elapsed: 271.374 ms
// 写入 2 天重复数据
loadTable("dfs://k_minute_level", "k_minute").append!(t1)
// rewrite data
timer{
tmp = select * from loadTable("dfs://k_minute_level", "k_minute") where isDuplicated([securityid, tradetime], FIRST) = false map
truncate("dfs://k_minute_level", "k_minute")
loadTable("dfs://k_minute_level", "k_minute").append!(tmp)
}
// Time elapsed: 86.429 ms注: 由于 isDuplicated 是顺序敏感函数,无法直接在查询分布式表的条件语句中使用, 此处增加了分区限定关键字词 map,使语句不跨分区执行。
该场景下,比起使用 delete + isDuplicate 函数条件删除,查出去重后的数据、全表删除然后重新写入是性能更好的方式。
但是对于社区用户,由于内存资源有限,delete 去重或一次性查询重写去重都可能会导致内存 OOM,因此可以改写成遍历分区的方式执行。参考脚本如下:
dates = getTabletsMeta("%k_minute_level/%").dfsPath.split("/")[2].temporalParse("yyyyMMdd")
timer{
for(date in dates){
tmp = select * from loadTable("dfs://k_minute_level", "k_minute") where partition(tradetime, date), isDuplicated([securityid, tradetime], FIRST) = false
dropPartition(db, [date], tableName="k_minute")
loadTable("dfs://k_minute_level", "k_minute").append!(tmp)
}
}
// Time elapsed: 301.925 ms注: 本例对所有分区都进行了遍历,如果已知重复写的分区,可以进一步缩小遍历范围。
5.4 更新数据
DolphinDB 的数据更新和数据删除的机制类似,每次更新时,由于 MVCC 机制,系统会额外产生数据副本存储在磁盘上,由系统定时回收;对于 TSDB 引擎,在配置数据保留策略为保留最新一条的场景下,更新也和软删除类似,以追加的方式实现。
最基础的更新方式就是通过 update 语句实现,如果要基于某些字段组成的键进行更新(该键存在)或追加(该键不存在),则可以使用 upsert! 函数。
例 1. 向快照表追加并更新买卖 1 档报价的信息。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
// 分布式表无法通过 update 直接追加字段,需要通过 addColumn
loadTable(dbName, tbName).addColumn(["OfferPx1", "BidPrice1"], [DOUBLE, DOUBLE])
// 更新更新买卖 1 档报价的信息
update loadTable(dbName, tbName) set OfferPx1 = OfferPrice[0], BidPrice1 = BidPrice[0] where TradeDate == 2022.12.01例 2. 向快照表写入数据时,希望根据时间和股票代码进行去重写入。
// 模拟写入的数据
use MockData
date = 2023.01.03
t = stockSnapshot(tradeDate=date, securityNumber=1)
// 如果执行了增加列字段的代码,需要执行此步
update t set OfferPx1 = OfferPrice[0], BidPrice1 = BidPrice[0]
// 向分布式表进行去重写入
loadTable(dbName, tbName).upsert!(t, keyColNames=["TradeDate", "TradeTime", "SecurityID"])注: 对于已经存储在数据库中的键值重复的数据,upsert! 不会对其进行去重,且后续键值相同的数据通过 upsert! 函数追加时,只会更新重复数据中第一个命中键值的记录,其余相同键值的记录不做更新。
5.5 下一步阅读
- SQL 编程:SQL 标准化SQL 编写案例
- 增删改查:数据库操作分布式表数据更新原理和性能软删除