DolphinDB 用户入门指南之金融篇(3)- 4. 数据导入与数据清洗

本页围绕建库建表后的数据导入/迁移阶段,概览常见问题并给出处理与教程目标导向的说明。

Source: https://dolphindb.cn/blogs/138

What this page covers

技能认证特训营第二期报名入口

页面顶部提供活动报名宣传与限时报名链接入口。

DolphinDB 用户入门指南之金融篇(3)

本部分展示文章的标题信息,以及作者与发布日期。

4. 数据导入与数据清洗:问题概览

说明在建库建表后进入数据导入/迁移阶段时可能遇到的问题,并引出教程目标。

4.1 方法导览

介绍 DolphinDB 支持的多种数据导入方式,并提供参考文档方向。

4.2 从文本文件导入数据

以 CSV 文本文件为示例讲解导入流程、约束与典型问题处理方向。

4.2.1 导入一年快照数据(每天一个文件)

以按天单文件的较大文件场景为例,包含模拟数据、批量写入建议、多线程后台作业导入与导入后校验。

4.2.2 导入一天快照数据(每个股票一个文件)

针对小文件、多事务的导入场景,提出先合并再分批导入以降低事务开销,并说明分区冲突规避与异常重写。

4.2.3 导入逐笔成交数据(每天一个文件)

逐笔超大文件导入示例包含 TSDB 建库建表、分块读取写入,以及并行写入导致分区冲突的规避策略。

4.3 数据预处理

说明入库前常见预处理类型,并通过后续案例展开。

4.3.1 时间类型解析

展示两类时间戳文本格式解析问题:一种需要通过 schema 修正类型推断,另一种需要 temporalParse 处理非标准格式。

4.3.2 字段增删改与数组向量合并(loadTextEx + transform)

介绍 loadTextEx 常用参数与字段处理函数,并示例说明导入时字段处理与数组向量合并方式。

4.3.3 中文乱码(GBK 编码)处理

说明文本导入编码限制与 GBK 文件乱码现象,并给出编码转换处理方式。

4.4 常见问题

汇总 OOM 排查与缓存清理、写入列类型/顺序不一致、以及 VALUE/RANGE 分区策略导致写入无效等问题与建议处理。

4.5 下一步阅读

本部分提供后续阅读入口,覆盖文本导入、数据清洗与相关模块/插件方向。

Facts index

Entity Attribute Value Confidence
DolphinDB 用户入门指南之金融篇(3)发布日期2025-01-07high
作者署名momohigh
数据导入/迁移过程可能遇到的问题数据无法正确导入(类型解析错误/空值;中文字符编码导致乱码)medium
数据导入/迁移过程可能遇到的问题导入数据预处理需求(增删改字段、重排序、多列合并为数组向量、跳过表头或指定行)medium
数据导入/迁移过程风险/问题导入后数据量相对源文件大小可能出现膨胀medium
DolphinDB 社区版本导入时可能出现的问题受内存限制,导入可能出现 Out Of Memory 内存溢出high
并行导入可能报错多线程并发写同一分区可能抛异常:The openChunks operation failed because the chunk <xxx> is currently locked and in use by transaction <tid>. RefId:S00002high
DolphinDB支持的数据导入方式支持多种文件格式导入、从数据库迁移、实时数据订阅等数据导入方式medium
CSV 文本导入示例(4.2)文件位置与权限要求导入的 csv 文件需要与 DolphinDB server 位于同一台服务器,且启动 DolphinDB server 的用户需要有该文件读权限high
4.2.1 一年快照数据导入(每天一个文件)数据规模设定一年份股票快照数据:每天一个 csv 文件,包含 100 只股票high
4.2.1 示例文件大小单文件大小(经验观察)单个文件约为 300MB(图示文件约 335MB~339MB;文字为“约 300M”)medium
批量写入策略(大文件)经验建议对于几百 MB 的文件数据作为一批写入较为合适medium
DolphinDB 事务与并行写入约束不能多线程往同一个分区写数据,否则会造成分区冲突;用户必须保证每个线程写入的数据对应不同的分区high
4.2.1 并发写入(按天文件)分区冲突风险判断由于本场景每天存储在一个文件,因此文件并发写入不会造成分区冲突medium
submitJob用途可为每个文件的写入创建一个后台作业进行多线程并行处理high
workerNum 配置项影响批处理作业的并行度受到 workerNum 影响;默认发行版配置为 4high
submitJob默认任务并行度submitJob 默认的任务并行度是 2high
submitJobEx能力可通过参数 parallelism 配置以提高任务并行度high
loadTextEx用途(示例)用于向分布式表导入文本数据,并可指定 partitionColumns、schema、sortColumns 等high
getRecentJobs用途用于查看最近写入任务/后台作业状态high
4.2.1 模拟数据导入任务数量交易日数量242 日交易日(每日一个文件)high
4.2.1 模拟数据总行数期望总记录数242 * 480200 = 116,208,400high
4.2.1 count(*) 查询结果输出示例116,208,400high
4.2.2 一天快照数据导入(每个股票一个文件)数据规模设定一天数据:每个股票一个 csv 文件,共 500 只股票high
4.2.2 小文件大小(图示)单文件大小约 3.3MB 左右(示例截图说明)medium
小文件导入策略性能问题原因每个小文件单独起线程写入效率低,因为单批写入量小但每批对应一个写入事务,事务开销较大medium
DolphinDB(小文件场景)推荐做法先载入文件到内存并合并多个文件后再导入数据库high
小文件合并导入分区冲突规避原则为避免分区冲突,将属于同一 HASH 桶的文件数据作一批导入(或按同一股票多天文件合并,原则是不造成分区冲突)high
tableInsert返回值会返回写入的数据条数high
submitJob jobDesc 参数用途(示例)作业描述中记录了 HASH 对应的分区信息(用于排查与定位异常)high
getRecentJob / getJobReturn用途(异常排查)可通过 getRecentJob 查看 msgError 检查失败任务,或通过 getJobReturn 获取作业写入条数定位问题作业high
dropPartition用途(异常处理)可删除写异常分区后重写(示例给出 /20230104/Key0 分区路径)high
4.2.3 逐笔成交数据导入场景描述一天的股票逐笔成交数据,每天一个 csv 文件,共计 500 只股票high
示例数据库 dfs://stock_trade分区与引擎partitioned by VALUE(2020.01.01..2021.01.01), HASH([SYMBOL, 50]); engine='TSDB'high
示例表 dfs://stock_trade.trade表属性partitioned by TradeDate, SecurityID; sortColumns=[`SecurityID,`TradeTime]; keepDuplicates=ALLhigh
4.2.3 模拟数据范围生成天数仅模拟 3 日数据作为示例(getMarketCalendar 2022.01.01..2022.01.06 并保存对应日期文件)medium
4.2.3 大文件大小(图示)单日文件大小如 2022.01.04 文件大小达到 1.5GB(截图说明)medium
textChunkDS用途可将大文本文件按指定大小划分为多个小文件数据源,再通过 mr 写入数据库high
mr + textChunkDS与 loadTextEx 的差异点二者都分块读文件写库;mr+textChunkDS 允许更灵活配置拆分的数据源大小high
mr 并行写入分区冲突风险与规避数据源按文件大小而非分区列划分,直接并行写可能造成分区冲突;需配置 parallel=false 使数据源串行写入high
mr 的 mapFunc(示例说明)可选写库函数mapFunc 可设置为 append! 或 tableInserthigh
4.2.3 示例导入校验count(*) 输出示例14,401,000high
数据预处理(入库前)包含内容类型转换、字段处理(合并/过滤/重排序/增加)、记录处理(异常过滤、跳过/过滤记录)high
时间字段解析(场景1)问题直接 loadText/loadTextEx 导入时 tradeTime 字段可能无法被识别为时间戳(示例中 tradetime 被推断为 LONG)high
extractTextSchema + schema 修改能力(场景1)将 tradetime 的 type 设置为 TIMESTAMP 后可被正确解析为 TIMESTAMP 并查询得到时间值high
时间戳解析限制并不是所有时间戳格式 DolphinDB 都会自动匹配(引出场景2)high
时间字段解析(场景2)现象将 tradetime 设置为 TIMESTAMP 直接 loadText 导入不报错但时间列变成空值high
temporalParse用途(场景2)可先将时间字段以 STRING 导入,再用 temporalParse("HH:mm:ss.SSS dd/MM/yyyy") 转换为 TIMESTAMPhigh
loadTextEx transform 参数用途在导入分布式表时可在 transform 指定函数中定义字段转换逻辑(如 temporalParse 转换)high
loadTextEx常用能力导入文本到分布式表时可通过 transform 实现字段类型修改、增删字段、记录修改等处理逻辑high
loadTextEx参数(文中列举)skipRows、delimiter、containHeader、arrayDelimiter、arrayMarkerhigh
字段处理相关函数(表4-2)函数清单dropColumns!, replaceColumn!, addColumn, rename!, reorderColumns!, setColumnCommenthigh
transform 参数函数限制约束transform 参数仅接受指定函数为一元函数;示例使用部分应用固定 sid 和 orderCols 使其成为一元函数high
fixedLengthArrayVector用途可将多列合并成一列数组向量(示例用于 offerPx1...offerPx10 等合并)high
DolphinDB文本文件编码支持仅支持导入 UTF8 和 ANSI 编码文件high
GBK 编码 CSV 导入现象直接导入 GBK 编码 csv 在 DolphinDB 中显示乱码high
convertEncode / toUTF8用途可用于导入时进行编码转换(示例:toUTF8(name,"gbk");toUTF8(t,"gbk"))high
DolphinDB 社区版本内存限制8G 的内存限制high
OOM 时 getRecentJob errorMsg可能出现的报错示例Out of memory;[appendDFSTablet] Failed to save table on data node, with error: Out of memory. RefId:S01077high
cancelJob用途(单机)可取消单机的后台作业:cancelJob(getRecentJob(242).jobId)high
cancelJobEx用途(集群)可取消集群节点上所有后台作业(use ops; cancelJobEx())high
objs(true)用途(内存排查)可统计当前所有临时变量的内存占用(示例:exec sum(bytes) \ 1024 \ 1024 from objs(true))high
undef用途(内存排查)可清除临时变量:undef(obj,[objType=VAR]) 或 undef allhigh
getSessionMemoryStat()用途(内存排查)获取所有缓存的内存占用(单位:B)high
缓存清理方法(表4-3)清理接口示例__SharedTable__:undef("sharedTableName", SHARED);__OLAPTablet__:clearAllCache();__OLAPCacheEngine__:flushOLAPCache();__TSDBCacheEngine__:flushTSDBCache();__TSDBLevelFileIndex__:invalidateLevelIndexCache()high
写入时报错列字段不一致典型报错The column [xxx] expects type of DATETIME, but the actual type is DOUBLE.high
DolphinDB 写入机制限制不支持表字段按字段名自动对应,也不支持 insert into 指定字段名写入;需确保写入数据类型与字段顺序与表定义完全一致high
VALUE 分区写入无效排查配置项检查 newValuePartitionPolicy 是否设置为 add(默认下载配置文件为 add,支持 VALUE 分区自动增加),或调用 addValuePartitions 手动增加分区high
RANGE 分区限制与处理不支持自增;若写入数据范围超过建库范围需调用 addRangePartitions 增加分区后再写入high
技能认证特训营第二期报名链接https://www.qingsuyun.com/h5/e/217471/5/high