DolphinDB 用户入门指南之金融篇(3)
4. 数据导入与数据清洗
创建好库表后,下一步就需要进行数据导入或数据迁移,在此过程中可能会遇到以下问题:
- 数据无法正确导入:源数据无法直接解析成 DolphinDB 的数据类型,造成数据错误、数据空值等;中文数据由于字符编码导致导入后的数据呈现乱码
- 导入数据预处理:需要增加、修改、重排序字段;需要将多列数据存储成数组向量入库;导入时需要跳过表头、跳过指定行数
- 数据膨胀:导入后的数据量较源文件大小膨胀了很多
- 内存溢出:社区版本受到内存限制,导入时可能会出现 Out Of Memory 内存溢出的现象
- 并行导入分区冲突:多线程并发写数据时,可能会抛出异常 The openChunks operation failed because the chunk <xxx> is currently locked and in use by transaction <tid>. RefId:S00002
本教程将针对数据导入中的一系列问题,结合实际案例进行系统地讲解。
4.1 方法导览
DolphinDB 支持多种文件格式导入、从数据库迁移、实时数据订阅等多种数据导入方式。
下表整理列出了数据导入 DolphinDB 数据库的不同方式、对应方法以及相关参考文档的链接:数据迁移方法。
DolphinDB 中的数据导入方法详情见:新用户入门指南(金融篇)-4.1 方法导览
4.2 从文本文件导入数据
本教程作为入门教程,将以最基础的文本文件导入数据作为示例,介绍导入的详细过程,以及导入中可能碰到的问题。
本章节使用的历史数据通过模拟产生,数据类型为快照数据,存储方式为 csv 文件。注意,导入的 csv 文件需要和 DolphinDB server 位于同一台服务器,且启动 DolphinDB server 的用户需要有该文件的读权限。
4.2.1 导入一年快照数据(每天一个文件)
场景:一年份的股票快照数据,每天一个 csv 文件,包含 100 只股票。
模拟数据生成 csv 文件,脚本如下:
dates = getMarketCalendar('XSHE', 2022.01.01, 2023.01.01)
def mockDataByDay(path, date){
submitJob("mockCsv", "mock data by date", saveText, stockSnapshot(tradeDate=date, securityNumber=100), path + string(date) + ".csv")
}
path = "<YOUR_PATH>/snapData/"
each(mockDataByDay{path}, dates)
getRecentJobs(242)
图 4-1 磁盘上单个文件大小展示
可以观察到单个文件约为 300M,经验上,对于几百 MB 的文件数据作为一批写入较为合适。在此基础上,许多用户为了提高数据导入的性能,通常会尝试多线程进行数据写入。
注:DolphinDB 事务要求不能够多线程往同一个分区写数据,否则会造成分区冲突(S00002 ),因此,用户必须保证每个线程写入的数据对应不同的分区。由于本场景每天存储在一个文件,因此文件并发写入不会造成分区冲突。
多线程并行处理方法:利用函数 submitJob 对每个文件的写入都创建一个后台作业,提交的批处理作业的并行度受到配置项 workerNum 影响,默认发行版配置的是 4。
注: 区分作业并行度(系统同时处理的作业数)和任务并行度(系统同时处理的作业子任务数)的概念!提交作业后,系统会将作业拆分成多个子任务,子任务之间也可并行执行。submitJob 函数默认的任务并行度是 2,如果要提高该并行度,可以使用 submitJobEx 函数提交任务,并对其参数 parallelism 进行配置。
// 导入历史数据
dataPath = "<YOUR_PATH>/snapData/"
file_list = files(dataPath).filename
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
// 定义单日写入任务
def loadDataByDay(dbName, tbName, schema, fileName){
loadTextEx(dbHandle=database(dbName), tableName=tbName, partitionColumns=["TradeDate", "SecurityID"], filename=fileName, schema=schema, sortColumns=[`SecurityID,`TradeTime])
}
// 创建任务写入作业
def createDataLoadJob(dbName, tbName, schema, fileName){
submitJob("loadDataByFile", "load data to db", loadDataByDay, dbName, tbName, schema, fileName)
}
// 设置导入的表的结构
schema = select name, typeString as type from loadTable("dfs://stock_lv2_snapshot", "snapshot").schema().colDefs
// 按照文件循环提交导入任务
each(createDataLoadJob{dbName, tbName, schema}, dataPath + file_list)通过 getRecentJobs函数查看最近写入任务的状态,确保所有写入任务全部执行完成。
// 一年的模拟数据包含 242 日交易日,每日一个文件
// 此处可以替换为 file_list.size()
getRecentJobs(242)
图 4-2 getRecentJob 函数部分返回结果截图
getRencentJob 函数返回的结果是一张表,因此可以通过 SQL 对其进行信息的过滤和筛选。例如,过滤提交作业中未完成的作业总数:
exec count(*) from getRecentJobs(242) where endTime is null如果上述查询返回为 0,代表所有任务结束。任务结束后,对库表进行查询,确认数据量是否相同,模拟数据的总数据量应为 242 * 480200=116,208,400。
select count(*) from loadTable("dfs://stock_lv2_snapshot", "snapshot")
// output: 116,208,4004.2.2 导入一天快照数据(每个股票一个文件)
场景:一天的股票快照数据,每个股票一个 csv 文件,共计 500 只股票。
模拟数据生成 csv 文件,脚本如下:
use MockData
t = stockSnapshot(tradeDate=2023.01.04, securityNumber=500)
sids = exec distinct Securityid from t
def mockDataByStock(path, sid, t){
data = select * from t where Securityid = sid
submitJob("mockCsv", "mock data by stock", saveText, data, path + sid + ".csv")
}
path = "<YOUR_PATH>/snapDataPerStock/"
each(mockDataByStock{path,,t}, sids)
select count(*) from getRecentJobs(500) where endTime is null
图 4-3 磁盘上单个文件大小展示
对于此类只有几 MB 或者几十 MB 的小文件导入的场景,如果与 4.2.1 的导入那样,每个文件都起一个线程,那么写入的效率将会很低。这是因为单批次写入数据量很小,而每批写入都会对应一个写入事务,在该场景下,事务的开销是较大的。
对于此类场景,DolphinDB 推荐先载入文件到内存,在内存中合并多个文件的数据后,再导入数据库中。为了避免分区冲突,本节将属于同一 HASH 桶的文件数据作一批导入。
注: 实际场景下,如果导入多天的多只股票文件的数据,也可以按照同一只股票多天文件去进行合并,合并的总原则就是不造成分区冲突。
数据导入的脚本如下,具体逻辑为先将多个文件合成一个 bigTable 然后导入分布式表。
def loadDataByStock(dbName, tbName, schema, fileNames){
bigTable = loop(loadText{schema=schema}, fileNames).unionAll()
rows = loadTable(dbName, tbName).tableInsert(bigTable)
return rows
}通过下述脚本定义一个写入作业的函数,在作业描述中记录了写入数据对应的 HASH 分区信息。
def createDataLoadJob(dbName, tbName, schema, HashNo, fileNames){
submitJob("loadDataByFile", "load data to db "+ string(HashNo), loadDataByStock, dbName, tbName, schema, fileNames)
}执行导入任务,将历史文件按照 HASH 分桶,然后每个桶的数据单独作为一批,由同一个后台批处理任务进行写入。
// 参数定义,可以修改
dataPath = "<YOUR_PATH>/snapDataPerStock/"
file_list = files(dataPath).filename
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
// 获取分布式表的 schema 作为导入数据的 schema
schema = select name, typeString as type from loadTable(dbName, tbName).schema().colDefs
// 将 500 个股票代码文件按照 Hash 分区分桶
tmp = table(file_list as path, file_list.split(".")[0] as sid)
// 此处文件名是股票代码,所以可以字符串处理后得到对应存储的 SecurityId
// 计算 SecurityId 的 HASH 值
update tmp set hashNo = sid.hashBucket(50)
tmp1 = select toArray(path) as fileBucket from tmp group by hashNo
fileBucket = tmp1.fileBucket
HashNo = tmp1.hashNo
// 属于同一个桶的文件作为一批数据在同一个线程提交写入
each(createDataLoadJob{dbName, tbName, schema}, HashNo, dataPath + fileBucket)
// 一共 50 个 HASH 分区,因此查看最近 50 个提交的任务
getRecentJobs(50)
// 写入完成后检查写入记录数是否满足需求
select count(*) from loadTable(dbName, tbName) where TradeDate=2023.01.04
select distinct SecurityId from loadTable(dbName, tbName) where TradeDate=2023.01.04由于导入的接口是 tableInsert,它会返回写入的数据条数,且提交的 job 的描述(由 submitJob 函数的 jobDesc 参数设置)中记录了 HASH 对应的分区信息。如果发生异常,例如最后检查时发现最终写入的记录数和原文件记录数不一致,用户可以通过 getRecentJob 显示 msgError 检查是否有写入失败的任务,或者通过 getJobReturn去获取作业写入的条数以检查是哪个作业出现问题。对于出现问题的作业我们可以采用 dropPartition 删除掉写异常的分区然后重写。如 2023.01.04 属于 HASH 0 的数据写入异常,则可以通过下述脚本进行重写。
// 删除异常分区,重新提交写入任务
dropPartition(database(dbName),"/20230104/Key0", tableName="snapshot")
submitJob("loadDataByFile", "load data to db "+ string(HashNo[0]), loadDataByStock, dbName, tbName, schema, dataPath + fileBucket.row(0))4.2.3 导入逐笔成交数据(每天一个文件)
场景:一天的股票逐笔成交数据,每天一个 csv 文件,共计 500 只股票。
建库建表代码如下:
// 建库建表
create database "dfs://stock_trade"
partitioned by VALUE(2020.01.01..2021.01.01), HASH([SYMBOL, 50])
engine='TSDB'
create table "dfs://stock_trade"."trade"(
ChannelNo INT
ApplSeqNum LONG
MDStreamID SYMBOL
BidApplSeqNum LONG
OfferApplSeqNum LONG
SecurityID SYMBOL
SecurityIDSource SYMBOL
TradePrice DOUBLE
TradeQty LONG
ExecType SYMBOL
TradeDate DATE[comment="交易日期", compress="delta"]
TradeTime TIME[comment="交易时间", compress="delta"]
LocalTime TIME
SeqNo LONG
DataStatus INT
TradeMoney DOUBLE
TradeBSFlag SYMBOL
BizIndex LONG
OrderKind SYMBOL
)
partitioned by TradeDate, SecurityID,
sortColumns=[`SecurityID,`TradeTime],
keepDuplicates=ALL由于逐笔数据很大,本节仅模拟 3 日的数据作为示例。模拟数据生成 csv 文件,脚本如下:
dates = getMarketCalendar('XSHE', 2022.01.01, 2022.01.06)
path = "<YOUR_PATH>/tradeData/"
for(i in 0:dates.size()){
saveText(stockTrade(tradeDate=dates[i], securityNumber=500), path + string(dates[i]) + ".csv")
}
图 4-4 磁盘上单个文件大小展示
如果需要将此类大文本文件加载到分布式表, 除了使用 loadTextEx 函数进行加载外(见 4.2.1 节的例子),还可以使用 textChunkDS 函数将文本文件根据指定大小划分为多个小文件数据源,再通过 mr 函数写入到数据库中。
mr+textChunkDs 和 loadTextEx 都是分块读文件然后写入数据库,和 loadTextEx 不同的是,mr+textChunkDs 可以由用户更灵活的配置拆分的数据源大小。虽然 mr 函数支持按照数据源并发进行写入,但是由于数据源是按照文件大小划分而非分区列,如果直接并行执行写会造成分区冲突,所以需要配置 parallel 参数为 false,使数据源间串行执行写入,避免写入冲突。
以单个大文件导入为例,首先使用 textChunkDS 函数将大文件拆分成多个小的数据源,然后使用 mr 函数将数据源写入数据库,此处指定 mapFunc 设置为写数据库函数,可以是 append! 或者 tableInsert。
ds = textChunkDS(fileName=dataPath+file_list[0], chunkSize=512, schema=schema)
mr(ds, tableInsert{loadTable(dbName, tbName)}, parallel=false)最后验证写入的数据量是否和原数据量一致:
exec count(*) from loadTable(dbName, tbName)
// output: 14,401,0004.3 数据预处理
很多场景下,数据入库前需要进行预处理操作,包括:
- 类型转换:将时间类型转换成 DolphinDB 可以识别的格式;格式化小数数值等
- 字段处理:合并多个字段、过滤掉不必要的字段、将字段重排序、增加字段等
- 记录处理:对记录进行异常值过滤;跳过、过滤部分记录等
下文将场景结合具体的案例进行说明。
4.3.1 时间类型解析
场景 1:文本文件存储的时间戳格式是 “20230101090001300”。
模拟数据下载链接:MinuteKLine.csv
部分数据展示:
securityid,tradetime,open,close,high,low,vol,val,vwap
000001,20220104093000000,85.172409680671989,88.88322226820837,85.607211456168442,84.836089657153934,11823,1050866.336877027526497,88.88322226820837
000001,20220104093100000,85.172409680671989,88.88322226820837,85.607211456168442,84.836089657153934,11823,1050866.336877027526497,88.88322226820837
000001,20220104093200000,44.614620972424745,47.569264134165443,44.741714544594287,42.879966530017554,96059,4569455.943463798612356,47.569264134165443
000001,20220104093300000,44.614620972424745,47.569264134165443,44.741714544594287,42.879966530017554,96059,4569455.943463798612356,47.569264134165443
...如果直接使用 loadText 函数或者 loadTextEx 函数导入数据,则 tradeTime 字段无法被识别为时间戳。
dataPath = "<YOUR_PATH>/MinuteKLine.csv"
kline = loadText(dataPath)
kline.schema().colDefs
// output:
name typeString typeInt extra comment
---------- ---------- ------- ----- -------
securityid SYMBOL 17
tradetime LONG 5
open DOUBLE 16
close DOUBLE 16
high DOUBLE 16
low DOUBLE 16
vol INT 4
val DOUBLE 16
vwap DOUBLE 16我们可以参考前文的导入案例,尝试为导入设置 schema 参数,测试是否能正确解析。
schema = extractTextSchema(dataPath)
update schema set type = "TIMESTAMP" where name = "tradetime"
kline = loadText(dataPath, schema=schema)
// 查看表的类型
kline.schema().colDefs
// output:
name typeString typeInt extra comment
---------- ---------- ------- ----- -------
securityid SYMBOL 17
tradetime TIMESTAMP 12
open DOUBLE 16
close DOUBLE 16
high DOUBLE 16
low DOUBLE 16
vol INT 4
val DOUBLE 16
vwap DOUBLE 16
// 查询一条数据
select top 1 * from kline
// output:
securityid tradetime open close high low vol ...
---------- ----------------------- ------------------ ----------------- ------------------ ------------------ ----- ---
000001 2022.01.04T09:30:00.000 85.172409680672004 88.88322226820837 85.607211456168442 84.836089657153934 11823 ...可以发现通过修改 schema, 系统可以自动解析此类 LONG 类型的时间戳。但是并不是所有时间戳格式,DolphinDB 内部都做了自动匹配,例如场景 2。
场景 2:文本文件存储的时间戳格式是 “09:00:01:300 21/01/2023”。
模拟数据下载链接:MinuteKLine_1.csv
数据展示:
securityid,tradetime,open,close,high,low,vol,val,vwap
000001,09:30:00.000 04/01/2022,71.402596682310104,73.259566984725012,71.99277468281798,70.358094742754474,71231,5218352.215888947248458,73.259566984725012
000001,09:31:00.000 04/01/2022,71.402596682310104,73.259566984725012,71.99277468281798,70.358094742754474,71231,5218352.215888947248458,73.259566984725012
000001,09:32:00.000 04/01/2022,49.951133667491376,46.830399985240049,50.527059993706643,50.500124022830277,50228,2352197.330458637326955,46.830399985240049
000001,09:33:00.000 04/01/2022,49.951133667491376,46.830399985240049,50.527059993706643,50.500124022830277,50228,2352197.330458637326955,46.830399985240049
...直接使用 loadText 函数,并设置正确的 schema 参数进行数据导入。
dataPath = "<YOUR_PATH>/MinuteKLine_1.csv"
schema = extractTextSchema(dataPath)
update schema set type = "TIMESTAMP" where name = "tradetime"
kline=loadText(dataPath, schema=schema)
select top 5 * from kline
// output:
securityid tradetime open close high low vol ...
---------- --------- ------------------ ------------------ ------------------ ------------------ ----- ---
000001 71.40259668231009 73.259566984725012 71.99277468281798 70.358094742754474 71231 ...
000001 71.40259668231009 73.259566984725012 71.99277468281798 70.358094742754474 71231 ...
000001 49.951133667491376 46.830399985240049 50.527059993706636 50.500124022830277 50228 ...
000001 49.951133667491376 46.830399985240049 50.527059993706636 50.500124022830277 50228 ...
000001 66.079958924092352 64.834977746015866 66.389338146662339 65.51682344241999 8487 ...可以发现虽然导入没有报错,但是最终导入的时间列都变成了空值。为了处理这种情况,可以先将时间字段以字符串形式导入,再使用 temporalParse 函数对时间列进行预处理。这里按照内存表和分布式表分开讨论。
(1)对于内存表,先将数据导入内存,再修改时间字段即可。
schema = extractTextSchema(dataPath)
update schema set type = "STRING" where name = "tradetime"
kline=loadText(dataPath, schema=schema)
kline.replaceColumn!("tradetime", kline.tradetime.temporalParse("HH:mm:ss.SSS dd/MM/yyyy"))
kline.schema().colDefs
// output:
name typeString typeInt extra comment
---------- ---------- ------- ----- -------
securityid SYMBOL 17
tradetime TIMESTAMP 12
open DOUBLE 16
close DOUBLE 16
high DOUBLE 16
low DOUBLE 16
vol INT 4
val DOUBLE 16
vwap DOUBLE 16
select top 1 * from kline
// output:
securityid tradetime open close high low vol ...
---------- ----------------------- ----------------- ------------------ ----------------- ------------------ ----- ---
000001 2022.01.04T09:30:00.000 71.40259668231009 73.259566984725012 71.99277468281798 70.358094742754474 71231 ...(2)对于分布式表,使用 loadTextEx 进行数据导入,在参数 transform 指定的函数中定义字段转换逻辑即可。
建库建表脚本如下:
create database "dfs://k_minute_level"
partitioned by VALUE(2020.01.01..2021.01.01)
engine='OLAP'
create table "dfs://k_minute_level"."k_minute"(
securityid SYMBOL
tradetime TIMESTAMP
open DOUBLE
close DOUBLE
high DOUBLE
low DOUBLE
vol INT
val DOUBLE
vwap DOUBLE
)
partitioned by tradetime数据导入脚本如下:
schema = extractTextSchema(dataPath)
update schema set type = "STRING" where name = "tradetime"
// 时间字段转换函数定义
def transfunc(mutable msg){
msg.replaceColumn!("tradetime", msg.tradetime.temporalParse("HH:mm:ss.SSS dd/MM/yyyy"))
return msg
}
kline = loadTextEx(dbHandle=database("dfs://k_minute_level"), tableName="k_minute", partitionColumns="tradetime", filename=dataPath, schema=schema, transform=transfunc)
// 查询一条数据验证导入结果
select top 1 * from loadTable("dfs://k_minute_level", "k_minute")
// output:
securityid tradetime open close high low vol ...
---------- ----------------------- ----------------- ------------------ ----------------- ------------------ ----- ---
000001 2022.01.04T09:30:00.000 71.40259668231009 73.259566984725012 71.99277468281798 70.358094742754474 71231 ...4.3.2 增加、删除、重排序、合并字段后导入数据
数据导入函数 loadTextEx
loadTextEx 是数据导入场景最常用的函数。导入文本文件到分布式表时,不论是修改字段类型、增删字段、记录修改都可以利用 loadTextEx 函数搭配 transform 参数指定的函数来定义数据处理逻辑。此外, loadTextEx 函数还有其他一些功能参数配置可以辅助系统更好地解析和导入数据:
- skipRows 指定导入时从文件头开始忽略的行数
- delimiter 数据文件中各列的分隔符
- containHeader 表示数据文件是否包含标题行
- 数组向量解析相关配置:
- arrayDelimiter 数据文件中数组向量列的分隔符
- arrayMarker 表示数组向量左右边界的标识符
使用 loadTextEx 导入数据到分布式表的整体逻辑如下图所示:

图 4-5 loadTextEx 导入分布式表流程
表 4-2 字段处理相关函数
| 功能 | 函数 |
|---|---|
| 字段删除 | dropColumns! |
| 字段替换 | replaceColumn! |
| 字段增加 | addColumn |
| 字段重命名 | rename! |
| 字段重排序 | reorderColumns! |
| 字段注释 | setColumnComment |
下面结合具体的案例来对该函数的使用进行一个理解和巩固。
场景 1:导入时增加字段。每个股票每天一个文件,文件名为 “股票代码.csv”。但是每个股票的数据内没有存储股票字段,导入数据时希望能够通过读文件名,将股票代码的信息也补充进去。
模拟一天 10 个股票数据进行说明,模拟数据下载链接:snapForDataProcess.zip
本场景同样使用 loadTextEx 函数搭配 transform 转换函数即可实现。为了方便说明,本节不考虑小文件合并的问题(具体参考 4.2.2 的示例),而是直接遍历每个文件导入数据库。建库建表代码参考 3.3 节。
// 通过文件名提取股票代码
dataPath = "<YOUR_PATH>/snapPerStockWithoutSid/"
file_list = files(dataPath).filename
sids = file_list.split(".")[0]
// 设置导入数据的 schema
// 注意此处的 schema 是导入数据的格式,不等于导入的数据表的 schema
schema = select name, typeString as type from loadTable("dfs://stock_lv2_snapshot", "snapshot").schema().colDefs where name != "SecurityID"
// 转换函数定义
def transFunc(mutable msg, sid, orderCols){
// step1. 增加 securityId 字段
data = select sid as SecurityID, * from msg
// step2. 将表字段重排序和分布式表对齐
data.reorderColumns!(orderCols)
return data
}
// 字段排序的依据
orderCols = loadTable("dfs://stock_lv2_snapshot", "snapshot").schema().colDefs.name
// 循环导入文件
for(i in 0:sids.size()){
loadTextEx(dbHandle=database("dfs://stock_lv2_snapshot"), tableName="snapshot", partitionColumns=["TradeDate", "SecurityID"], filename=dataPath + file_list[i], schema=schema, sortColumns=["SecurityID", "TradeDate"], transform=transFunc{, sid[i], orderCols})
}
// 检验数据是否导入完成
exec count(*) from loadTable("dfs://stock_lv2_snapshot", "snapshot")
// output: 48,020
注: 由于 transForm 参数仅接受指定的函数为一元函数,本脚本 transform=transFunc{, sid[i], orderCols} 使用部分应用固定了 sid 和 orderCols 参数,使之成为一个一元函数。
场景 2:原快照数据中的多档价格存储在不同的字段,需要在导入时将其合并为数组向量。
模拟一天 10 个股票的快照数据进行说明,模拟数据下载链接:snapshot_array.csv
本场景还是沿用 3.3 节创建的快照库表作为导入的分布式表。在导入前需要根据预先设计的字段类型对 schema 进行调整。
dbName = "dfs://stock_lv2_snapshot"
tbName = "snapshot"
dataPath = "<YOUR_PATH>/snapshot_array.csv"
schema = extractTextSchema(dataPath)
update schema set type="LONG" where name in ["NumTrades", "TotalValueTrade", "TotalBidQty", "TotalOfferQty", "ETFBuyAmount", "ETFSellAmount", "WithdrawBuyAmount", "WithdrawSellAmount"] or regexFind(name, "[bid|offer]Order[0-9]{1,2}") >= 0 最方便灵活的方法是直接通过 select 对数据字段进行处理,此处使用了 fixedLengthArrayVector 函数搭配字段序列(…) 将多列合并成一列数组向量。
def transFunc(mutable msg){
return select TradeDate, TradeTime, MDStreamID, Securityid, SecurityIDSource, TradingPhaseCode, ImageStatus,
PreCloPrice, NumTrades,TotalVolumeTrade, TotalValueTrade, LastPrice, OpenPrice, HighPrice, LowPrice, ClosePrice,
DifPrice1, DifPrice2, PE1, PE2, PreCloseIOPV, IOPV, TotalBidQty, WeightedAvgBidPx, AltWAvgBidPri, TotalOfferQty,
WeightedAvgOfferPx, AltWAvgAskPri, UpLimitPx, DownLimitPx, OpenInt, OptPremiumRatio,
fixedLengthArrayVector(offerPx1...offerPx10) as offerPx, fixedLengthArrayVector(bidPx1...bidPx10) as bidPx,
fixedLengthArrayVector(offerOrderQty1...offerOrderQty10) as offerOrderQty,
fixedLengthArrayVector(bidOrderQty1...bidOrderQty10) as bidOrderQty,
fixedLengthArrayVector(bidNumOrders1...bidNumOrders10) as bidNumOrders,
fixedLengthArrayVector(offerNumOrders1...offerNumOrders10) as offerNumOrders,
ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney, YieldToMatu, TotWarExNum,
WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney,
TotalBidNumber, TotalOfferNumber, MaxBidDur, MaxSellDur, BidNum, SellNum, LocalTime, SeqNo,
fixedLengthArrayVector(offerOrder1...offerOrder10) as offerOrder,
fixedLengthArrayVector(bidOrder1...bidOrder10) as bidOrder
from msg
}
loadTextEx(dbHandle=database(dbName), tableName=tbName, partitionColumns=["TradeTime", "SecurityID"], filename=dataPath, schema=schema, sortColumns=["SecurityID", "TradeTime"], transform=transFunc)还有一个比较有技巧的方式是通过 update 先将多列合并后的字段并入表内,然后为了保持字段顺序不变,将原属性的第一列字段替换为同属性的数组向量字段,最后删除多余的字段。该逻辑对于的转换函数如下:
def transFunc1(mutable msg){
update msg set offerPx = fixedLengthArrayVector(offerPx1...offerPx10), bidPx = fixedLengthArrayVector(bidPx1...bidPx10),
offerOrderQty = fixedLengthArrayVector(offerOrderQty1...offerOrderQty10),
bidOrderQty = fixedLengthArrayVector(bidOrderQty1...bidOrderQty10),
bidNumOrders = fixedLengthArrayVector(bidNumOrders1...bidNumOrders10),
offerNumOrders = fixedLengthArrayVector(offerNumOrders1...offerNumOrders10),
offerOrder = fixedLengthArrayVector(offerOrder1...offerOrder10),
bidOrder = fixedLengthArrayVector(bidOrder1...bidOrder10)
arrayCols = ["offerPx", "bidPx", "offerOrderQty", "bidOrderQty", "bidNumOrders", "offerNumOrders", "offerOrder", "bidOrder"]
each(replaceColumn!{msg}, arrayCols + "1", msg[arrayCols])
dropCols = cross(+, arrayCols, NULL join string(2..10)).flatten().flatten()
msg.dropColumns!(dropCols)
return msg
}4.3.3 导入数据包含中文导致乱码
场景:导入包含中文字段的 csv 文件(采用 GBK 编码)。
模拟数据下载链接:GPLIST_gbk.csv(来源 股票与存托凭证 | 上海证券交易所)
DolphinDB 仅支持导入 UTF8 和 ANSI 编码的文件,如果直接导入 GBK 编码的 csv 文件,在 DolphinDB 中会显示为乱码数据。
dataPath = "<YOUR_PATH>/GPLIST_gbk.csv"
t = loadText(dataPath)
t
/* output:
A�ɴ��� B�ɴ��� ֤ȯ��� ��λ֤ȯ��� ��˾Ӣ��ȫ�� ��������
------- ------- -------- ------------ ------------------------- ----------
600000 �ַ����� �ַ����� Shanghai Pudong Develop...1999.11.10
600004 ���ƻ��� ���ƻ��� Guangzhou Baiyun Intern...2003.04.28
600006 ����ɷ� ����ɷ� DONGFENG AUTOMOBILE CO....1999.07.27
600007 �й���ó �й���ó China World Trade Cente...1999.03.12
600008 �״����� �״����� BEIJING CAPITAL ECO-ENV...2000.04.27
...
*/
此时就需要将数据进行一个编码转换,可以借助第三方平台将文件编码进行转换,或在导入时借助 DolphinDB 的内置函数 convertEncode 和 toUTF8 函数进行转换。
dataPath = "<YOUR_PATH>/GPLIST_gbk.csv"
schema = extractTextSchema(dataPath)
update schema set name = toUTF8(name, "gbk")
update schema set type = "SYMBOL" where name like "%代码%"
t = loadText(dataPath, schema=schema)
t = toUTF8(t, "gbk")
t
/* output:
A股代码 B股代码 证券简称 扩位证券简称 公司英文全称 上市日期
---------- ---------- ------------ ------------------ ------------------------- ------------
600000 - 浦发银行 浦发银行 Shanghai Pudong Develop...1999.11.10
600004 - 白云机场 白云机场 Guangzhou Baiyun Intern...2003.04.28
600006 - 东风股份 东风股份 DONGFENG AUTOMOBILE CO....1999.07.27
600007 - 中国国贸 中国国贸 China World Trade Cente...1999.03.12
600008 - 首创环保 首创环保 BEIJING CAPITAL ECO-ENV...2000.04.27
600009 - 上海机场 上海机场 Shanghai International ...1998.02.18
600010 - 包钢股份 包钢股份 Inner Mongolia Baotou S...2001.03.09
600011 - 华能国际 华能国际 Huaneng Power Internati...2001.12.06
*/
4.4 常见问题
1. 由于内存溢出无法写入/查询?
如果是社区版本,由于 8G 的内存限制,在导入时可能会出现内存溢出的情况。此时,getRecentJob 的 errorMsg 字段可能会出现以下报错:
loadDataByDay: loadTextEx(database(dbName), tbName, ["TradeDate","SecurityID"], fileName, , schema, , , ["SecurityID","TradeTime"]) => Out of memory
loadDataByDay: loadTextEx(database(dbName), tbName, ["TradeDate","SecurityID"], fileName, , schema, , , ["SecurityID","TradeTime"]) => [appendDFSTablet] Failed to save table on data node, with error: Out of memory. RefId:S01077若发生溢出情况,可以提前结束其他任务:
// 取消单机的后台作业
cancelJob(getRecentJob(242).jobId)
// 取消集群节点上所有后台作业
use ops
cancelJobEx()为了能弄清楚造成溢出的原因,我们逐步进行排查。本教程仅列出基础的排查方式,详细内容参考 Out of Memory 。
内存占用的对象包括:临时变量、读写缓存、元数据缓存等等。
- 临时变量占用内存量获取内存中当前所有临时变量的内存占用(单位:MB)。
tempVarSize = exec sum(bytes) \ 1024 \ 1024 from objs(true)清除不必要的临时变量,可以通过 undef 函数进行操作。
undef(obj, [objType=VAR]) // 清除某个变量
undef all // 清除所有变量- 缓存占用内存量
获取所有缓存的内存占用(单位:B)。
getSessionMemoryStat()
图 4-6 getSessionMemoryStat 返回结果示意
- 对缓存进行清理。每类缓存对应的清理方法如下表所示:
表 4-3 系统缓存类型及其对应的清理方式
| 缓存类型 | 清理方式 | 说明 |
|---|---|---|
| __DimensionalTable__ | 暂时没有接口 | |
| __SharedTable__ | undef("sharedTableName", SHARED) | 清理共享表变量 |
| __OLAPTablet__ | clearAllCache() | 清理查询数据缓存 |
| __OLAPCacheEngine __ | flushOLAPCache() | 将写入 OLAP Cache Engine 缓存的数据强行刷盘 |
| __OLAPCachedSymbolBase__ | 无需释放 | |
| __DFSMetadata__ | 无需释放 | |
| __TSDBCacheEngine__ | flushTSDBCache() | 将写入 TSDB Cache Engine 缓存的数据强行刷盘 |
| __TSDBLevelFileIndex__ | invalidateLevelIndexCache() | 将 TSDB 表缓存的索引强制清理 |
| __TSDBCachedSymbolBase__ | 无需释放 | |
| __StreamingPubQueue__ | 无需释放 | |
| __StreamingSubQueue__ | 管理订阅以及引擎 |
- 其他内存占用排查方式
如果清理缓存和清理临时变量后仍然无法解决问题,那么可以参照下述方法进行检查:
(1)内存、缓存相关的配置项是否设置合理,下面列举部分可能造成内存大量占用的场景。
内存相关配置请参照 功能配置 内存、系统内存、Cache Engine 小节。
表 4-4 TSDB 内存相关配置项
| 配置项 | 场景说明 | 解决方案 |
|---|---|---|
| TSDBCacheEngineSize | TSDB Cache Engine 写满刷盘后,系统会重新开辟一个新的 Cache Engine 内存供其他事务写入,从而造成内存膨胀 | 尝试调小 TSDBCacheEngineSize,或者调大 maxMemSize(社区版受到单节点 8G 限制) |
| TSDBLevelFileIndexCacheSize | TSDB 的 Level File 索引会在查询时载入内存进行缓存 | 尝试调小 TSDBLevelFileIndexCacheSize ,同时检查一下 sortColumns 字段是否设置合理,造成索引膨胀 |
(2)数据库分区或索引等配置不合理导致元数据爆炸。下表列举了几个可能的场景:
表 4-5 导致内存中元数据膨胀的场景
| 排查方向 | 具体说明 |
|---|---|
| TSDB 引擎场景下,sortColumns 设置是否合理 | 如果 sortKey 数量很大,会占用大量内存和磁盘空间。测试案例可参考 TSDB 存储引擎详解 5.1 小节。 |
| 创库时分区指定过多,如设置了数万个 HASH 分区,设置了几十年的 VALUE 天分区等 | VALUE 分区会自动拓展,不需要预设很多; HASH 分区主要是用在控制分区粒度的场景,一般和 VALUE 或者 RANGE 组合使用,如果设置过多会造成分区数据量很少; RANGE 分区可以先设置部分,如果超过范围可以通过函数 addRangePartitions 拓展分区。 |
2. 写入时报错列字段不一致,The column [xxx] expects type of DATETIME, but the actual type is DOUBLE.
解决方案:这通常是由于写入数据的字段顺序和表字段的顺序不一致造成的。DolphinDB 不支持表字段根据字段名自动对应,也不支持 insert into 指定字段名进行写入。需要在写入时确保写入数据的类型和字段顺序能够与表定义的顺序完全一致。
3. 写入函数执行没有报错,但是数据没有成功写进去。
解决方案:请检查一下建库时的分区的类型和分区模式。
如是 VALUE 分区,请检查配置项 newValuePartitionPolicy 是否设置为 add,默认下载的 DolphinDB 配置文件中会设置为 add 策略,即 VALUE 分区自动增加;或者调用 addValuePartitions 手动增加分区。
配置文件,如下图所示(以单机为例):

图 4-7 addValuePartitions 配置示意图
如是 RANGE 分区,不支持自增,请检查写入数据的范围是否超过了建库时设置的范围,如超过了需要手动调用函数 addRangePartitions 增加分区,然后再写入数据。