DolphinDB 用户入门指南之金融篇(3)- 4. 数据导入与数据清洗
本页围绕建库建表后的数据导入/迁移阶段,概览常见问题并给出处理与教程目标导向的说明。
Source: https://dolphindb.cn/blogs/138
What this page covers
- 数据导入与数据清洗的问题概览与目标
- DolphinDB 数据导入方式的方法导览
- 从文本(CSV)导入的流程与约束
- 按天大文件导入:并行作业与校验
- 小文件导入:合并、分批与异常重写
- 超大文件逐笔数据:分块读取与并行冲突规避
- 常见问题:OOM、列不一致、分区策略导致写入无效
技能认证特训营第二期报名入口
页面顶部提供活动报名宣传与限时报名链接入口。
- 本页包含“技能认证特训营第二期”的报名链接。
- 报名链接指向 qingsuyun.com 的活动页面。
DolphinDB 用户入门指南之金融篇(3)
本部分展示文章的标题信息,以及作者与发布日期。
- 发布日期为 2025-01-07。
- 作者署名为 momo。
4. 数据导入与数据清洗:问题概览
说明在建库建表后进入数据导入/迁移阶段时可能遇到的问题,并引出教程目标。
- 导入可能出现类型解析错误或空值问题。
- 中文字符编码可能导致导入后显示乱码。
- 导入过程可能需要字段增删改、重排序或多列合并。
- 导入后数据量可能相对源文件大小出现膨胀。
- 社区版本导入可能因内存限制出现 OOM。
4.1 方法导览
介绍 DolphinDB 支持的多种数据导入方式,并提供参考文档方向。
- DolphinDB 支持多种文件格式的数据导入方式。
- DolphinDB 支持从数据库迁移数据。
- DolphinDB 支持通过实时数据订阅进行数据导入。
4.2 从文本文件导入数据
以 CSV 文本文件为示例讲解导入流程、约束与典型问题处理方向。
- CSV 文件需与 DolphinDB server 位于同一台服务器。
- 启动 DolphinDB server 的用户需要有文件读权限。
4.2.1 导入一年快照数据(每天一个文件)
以按天单文件的较大文件场景为例,包含模拟数据、批量写入建议、多线程后台作业导入与导入后校验。
- 示例场景为一年股票快照数据:每天一个 CSV 文件,包含 100 只股票。
- 示例中单文件大小为“约 300MB”(图示约 335MB~339MB)。
- 经验建议:几百 MB 的文件以“单批写入”较为合适。
- submitJob 可为每个文件写入创建后台作业进行多线程并行处理。
- 并行写入要求每个线程写入不同分区,避免分区冲突。
- 该按天文件场景下,文件并发写入被描述为不会造成分区冲突。
- 批处理作业并行度受 workerNum 影响;默认发行版配置为 4。
- submitJob 默认任务并行度为 2;submitJobEx 可用 parallelism 调高并行度。
4.2.2 导入一天快照数据(每个股票一个文件)
针对小文件、多事务的导入场景,提出先合并再分批导入以降低事务开销,并说明分区冲突规避与异常重写。
- 示例场景为一天数据:每个股票一个 CSV 文件,共 500 只股票。
- 图示小文件大小约 3.3MB 左右。
- 每个小文件单独起线程写入可能效率低,原因包含事务开销较大。
- 推荐做法:先载入内存并合并多个文件后再导入数据库。
- 为避免分区冲突,可将同一 HASH 桶的数据作为一批导入。
- tableInsert 会返回写入的数据条数。
- getRecentJob 或 getJobReturn 可用于定位失败任务或获取作业写入条数。
- dropPartition 可用于删除写异常分区后重写(示例包含分区路径)。
4.2.3 导入逐笔成交数据(每天一个文件)
逐笔超大文件导入示例包含 TSDB 建库建表、分块读取写入,以及并行写入导致分区冲突的规避策略。
- 示例场景为一天逐笔成交数据:每天一个 CSV 文件,共计 500 只股票。
- 示例数据库 dfs://stock_trade 使用 TSDB 引擎,并包含 VALUE 与 HASH 分区设置。
- 示例表按 TradeDate 与 SecurityID 分区,并设置 sortColumns 与 keepDuplicates=ALL。
- 示例仅模拟 3 日数据范围并保存对应日期文件。
- 图示中单日文件大小可达 1.5GB(示例日期为 2022.01.04)。
- textChunkDS 可按指定大小将大文本文件划分为多个数据源,再通过 mr 写入数据库。
- mr+textChunkDS 与 loadTextEx 都可分块读文件写库,但前者允许更灵活配置数据源大小。
- 数据源按文件大小划分时,并行写入可能造成分区冲突;可设置 parallel=false 串行写入。
4.3 数据预处理
说明入库前常见预处理类型,并通过后续案例展开。
- 预处理包含类型转换。
- 预处理包含字段处理(合并、过滤、重排序、增加)。
- 预处理包含记录处理(异常过滤、跳过或过滤记录)。
4.3.1 时间类型解析
展示两类时间戳文本格式解析问题:一种需要通过 schema 修正类型推断,另一种需要 temporalParse 处理非标准格式。
- 直接导入时 tradeTime 可能无法识别为时间戳,并被推断为 LONG。
- 通过 extractTextSchema 并将 tradetime 类型设为 TIMESTAMP,可被正确解析并查询到时间值。
- 并非所有时间戳格式都会被 DolphinDB 自动匹配。
- 将 tradetime 设为 TIMESTAMP 后直接 loadText 导入可能不报错,但时间列变为空值。
- 可先以 STRING 导入时间字段,再用 temporalParse 指定格式转换为 TIMESTAMP。
- loadTextEx 导入分布式表时可在 transform 参数中定义字段转换逻辑。
4.3.2 字段增删改与数组向量合并(loadTextEx + transform)
介绍 loadTextEx 常用参数与字段处理函数,并示例说明导入时字段处理与数组向量合并方式。
- loadTextEx 可通过 transform 实现字段类型修改、增删字段与记录修改等处理逻辑。
- 文中列举的 loadTextEx 参数包含 skipRows 与 delimiter。
- 文中列举的 loadTextEx 参数包含 containHeader 与 arrayDelimiter。
- 文中列举的 loadTextEx 参数包含 arrayMarker。
- 字段处理相关函数清单包含 dropColumns! 与 replaceColumn! 等。
- transform 参数仅接受指定函数为一元函数;示例通过部分应用固定参数实现一元函数。
- fixedLengthArrayVector 可将多列合并为一列数组向量(示例用于多档价格列)。
- loadTextEx 可用于向分布式表导入文本数据,并可指定 partitionColumns、schema、sortColumns 等。
4.3.3 中文乱码(GBK 编码)处理
说明文本导入编码限制与 GBK 文件乱码现象,并给出编码转换处理方式。
- DolphinDB 仅支持导入 UTF8 和 ANSI 编码文件。
- 直接导入 GBK 编码 CSV 可能在 DolphinDB 中显示乱码。
- convertEncode / toUTF8 可用于导入时进行编码转换(示例包含 toUTF8(...,"gbk"))。
4.4 常见问题
汇总 OOM 排查与缓存清理、写入列类型/顺序不一致、以及 VALUE/RANGE 分区策略导致写入无效等问题与建议处理。
- DolphinDB 社区版本存在 8G 的内存限制。
- OOM 报错示例包含 “Out of memory” 与写入失败信息。
- cancelJob 可取消单机的后台作业。
- cancelJobEx 可取消集群节点上的后台作业。
- objs(true) 可统计临时变量的内存占用。
- undef 可用于清除临时变量(包含 undef all)。
- getSessionMemoryStat() 可获取缓存的内存占用(单位 B)。
- 表中给出多类缓存清理接口示例(如 clearAllCache、flushOLAPCache、flushTSDBCache)。
- 写入时报错示例包含列类型不一致提示。
- 写入需确保数据类型与字段顺序与表定义完全一致。
- VALUE 分区写入无效可检查 newValuePartitionPolicy 或调用 addValuePartitions。
- RANGE 分区不支持自增;超出范围需 addRangePartitions 后再写入。
- 并行导入时多线程写同一分区可能出现 chunk 被锁定的异常。
4.5 下一步阅读
本部分提供后续阅读入口,覆盖文本导入、数据清洗与相关模块/插件方向。
- 包含与文本导入相关的后续阅读链接。
- 包含与数据清洗、行情数据模块或插件相关的后续阅读链接。
Facts index
| Entity | Attribute | Value | Confidence |
|---|---|---|---|
| DolphinDB 用户入门指南之金融篇(3) | 发布日期 | 2025-01-07 | high |
| 作者 | 署名 | momo | high |
| 数据导入/迁移过程 | 可能遇到的问题 | 数据无法正确导入(类型解析错误/空值;中文字符编码导致乱码) | medium |
| 数据导入/迁移过程 | 可能遇到的问题 | 导入数据预处理需求(增删改字段、重排序、多列合并为数组向量、跳过表头或指定行) | medium |
| 数据导入/迁移过程 | 风险/问题 | 导入后数据量相对源文件大小可能出现膨胀 | medium |
| DolphinDB 社区版本 | 导入时可能出现的问题 | 受内存限制,导入可能出现 Out Of Memory 内存溢出 | high |
| 并行导入 | 可能报错 | 多线程并发写同一分区可能抛异常:The openChunks operation failed because the chunk <xxx> is currently locked and in use by transaction <tid>. RefId:S00002 | high |
| DolphinDB | 支持的数据导入方式 | 支持多种文件格式导入、从数据库迁移、实时数据订阅等数据导入方式 | medium |
| CSV 文本导入示例(4.2) | 文件位置与权限要求 | 导入的 csv 文件需要与 DolphinDB server 位于同一台服务器,且启动 DolphinDB server 的用户需要有该文件读权限 | high |
| 4.2.1 一年快照数据导入(每天一个文件) | 数据规模设定 | 一年份股票快照数据:每天一个 csv 文件,包含 100 只股票 | high |
| 4.2.1 示例文件大小 | 单文件大小(经验观察) | 单个文件约为 300MB(图示文件约 335MB~339MB;文字为“约 300M”) | medium |
| 批量写入策略(大文件) | 经验建议 | 对于几百 MB 的文件数据作为一批写入较为合适 | medium |
| DolphinDB 事务与并行写入 | 约束 | 不能多线程往同一个分区写数据,否则会造成分区冲突;用户必须保证每个线程写入的数据对应不同的分区 | high |
| 4.2.1 并发写入(按天文件) | 分区冲突风险判断 | 由于本场景每天存储在一个文件,因此文件并发写入不会造成分区冲突 | medium |
| submitJob | 用途 | 可为每个文件的写入创建一个后台作业进行多线程并行处理 | high |
| workerNum 配置项 | 影响 | 批处理作业的并行度受到 workerNum 影响;默认发行版配置为 4 | high |
| submitJob | 默认任务并行度 | submitJob 默认的任务并行度是 2 | high |
| submitJobEx | 能力 | 可通过参数 parallelism 配置以提高任务并行度 | high |
| loadTextEx | 用途(示例) | 用于向分布式表导入文本数据,并可指定 partitionColumns、schema、sortColumns 等 | high |
| getRecentJobs | 用途 | 用于查看最近写入任务/后台作业状态 | high |
| 4.2.1 模拟数据导入任务数量 | 交易日数量 | 242 日交易日(每日一个文件) | high |
| 4.2.1 模拟数据总行数 | 期望总记录数 | 242 * 480200 = 116,208,400 | high |
| 4.2.1 count(*) 查询结果 | 输出示例 | 116,208,400 | high |
| 4.2.2 一天快照数据导入(每个股票一个文件) | 数据规模设定 | 一天数据:每个股票一个 csv 文件,共 500 只股票 | high |
| 4.2.2 小文件大小(图示) | 单文件大小 | 约 3.3MB 左右(示例截图说明) | medium |
| 小文件导入策略 | 性能问题原因 | 每个小文件单独起线程写入效率低,因为单批写入量小但每批对应一个写入事务,事务开销较大 | medium |
| DolphinDB(小文件场景) | 推荐做法 | 先载入文件到内存并合并多个文件后再导入数据库 | high |
| 小文件合并导入 | 分区冲突规避原则 | 为避免分区冲突,将属于同一 HASH 桶的文件数据作一批导入(或按同一股票多天文件合并,原则是不造成分区冲突) | high |
| tableInsert | 返回值 | 会返回写入的数据条数 | high |
| submitJob jobDesc 参数 | 用途(示例) | 作业描述中记录了 HASH 对应的分区信息(用于排查与定位异常) | high |
| getRecentJob / getJobReturn | 用途(异常排查) | 可通过 getRecentJob 查看 msgError 检查失败任务,或通过 getJobReturn 获取作业写入条数定位问题作业 | high |
| dropPartition | 用途(异常处理) | 可删除写异常分区后重写(示例给出 /20230104/Key0 分区路径) | high |
| 4.2.3 逐笔成交数据导入 | 场景描述 | 一天的股票逐笔成交数据,每天一个 csv 文件,共计 500 只股票 | high |
| 示例数据库 dfs://stock_trade | 分区与引擎 | partitioned by VALUE(2020.01.01..2021.01.01), HASH([SYMBOL, 50]); engine='TSDB' | high |
| 示例表 dfs://stock_trade.trade | 表属性 | partitioned by TradeDate, SecurityID; sortColumns=[`SecurityID,`TradeTime]; keepDuplicates=ALL | high |
| 4.2.3 模拟数据范围 | 生成天数 | 仅模拟 3 日数据作为示例(getMarketCalendar 2022.01.01..2022.01.06 并保存对应日期文件) | medium |
| 4.2.3 大文件大小(图示) | 单日文件大小 | 如 2022.01.04 文件大小达到 1.5GB(截图说明) | medium |
| textChunkDS | 用途 | 可将大文本文件按指定大小划分为多个小文件数据源,再通过 mr 写入数据库 | high |
| mr + textChunkDS | 与 loadTextEx 的差异点 | 二者都分块读文件写库;mr+textChunkDS 允许更灵活配置拆分的数据源大小 | high |
| mr 并行写入 | 分区冲突风险与规避 | 数据源按文件大小而非分区列划分,直接并行写可能造成分区冲突;需配置 parallel=false 使数据源串行写入 | high |
| mr 的 mapFunc(示例说明) | 可选写库函数 | mapFunc 可设置为 append! 或 tableInsert | high |
| 4.2.3 示例导入校验 | count(*) 输出示例 | 14,401,000 | high |
| 数据预处理(入库前) | 包含内容 | 类型转换、字段处理(合并/过滤/重排序/增加)、记录处理(异常过滤、跳过/过滤记录) | high |
| 时间字段解析(场景1) | 问题 | 直接 loadText/loadTextEx 导入时 tradeTime 字段可能无法被识别为时间戳(示例中 tradetime 被推断为 LONG) | high |
| extractTextSchema + schema 修改 | 能力(场景1) | 将 tradetime 的 type 设置为 TIMESTAMP 后可被正确解析为 TIMESTAMP 并查询得到时间值 | high |
| 时间戳解析 | 限制 | 并不是所有时间戳格式 DolphinDB 都会自动匹配(引出场景2) | high |
| 时间字段解析(场景2) | 现象 | 将 tradetime 设置为 TIMESTAMP 直接 loadText 导入不报错但时间列变成空值 | high |
| temporalParse | 用途(场景2) | 可先将时间字段以 STRING 导入,再用 temporalParse("HH:mm:ss.SSS dd/MM/yyyy") 转换为 TIMESTAMP | high |
| loadTextEx transform 参数 | 用途 | 在导入分布式表时可在 transform 指定函数中定义字段转换逻辑(如 temporalParse 转换) | high |
| loadTextEx | 常用能力 | 导入文本到分布式表时可通过 transform 实现字段类型修改、增删字段、记录修改等处理逻辑 | high |
| loadTextEx | 参数(文中列举) | skipRows、delimiter、containHeader、arrayDelimiter、arrayMarker | high |
| 字段处理相关函数(表4-2) | 函数清单 | dropColumns!, replaceColumn!, addColumn, rename!, reorderColumns!, setColumnComment | high |
| transform 参数函数限制 | 约束 | transform 参数仅接受指定函数为一元函数;示例使用部分应用固定 sid 和 orderCols 使其成为一元函数 | high |
| fixedLengthArrayVector | 用途 | 可将多列合并成一列数组向量(示例用于 offerPx1...offerPx10 等合并) | high |
| DolphinDB | 文本文件编码支持 | 仅支持导入 UTF8 和 ANSI 编码文件 | high |
| GBK 编码 CSV 导入 | 现象 | 直接导入 GBK 编码 csv 在 DolphinDB 中显示乱码 | high |
| convertEncode / toUTF8 | 用途 | 可用于导入时进行编码转换(示例:toUTF8(name,"gbk");toUTF8(t,"gbk")) | high |
| DolphinDB 社区版本 | 内存限制 | 8G 的内存限制 | high |
| OOM 时 getRecentJob errorMsg | 可能出现的报错示例 | Out of memory;[appendDFSTablet] Failed to save table on data node, with error: Out of memory. RefId:S01077 | high |
| cancelJob | 用途(单机) | 可取消单机的后台作业:cancelJob(getRecentJob(242).jobId) | high |
| cancelJobEx | 用途(集群) | 可取消集群节点上所有后台作业(use ops; cancelJobEx()) | high |
| objs(true) | 用途(内存排查) | 可统计当前所有临时变量的内存占用(示例:exec sum(bytes) \ 1024 \ 1024 from objs(true)) | high |
| undef | 用途(内存排查) | 可清除临时变量:undef(obj,[objType=VAR]) 或 undef all | high |
| getSessionMemoryStat() | 用途(内存排查) | 获取所有缓存的内存占用(单位:B) | high |
| 缓存清理方法(表4-3) | 清理接口示例 | __SharedTable__:undef("sharedTableName", SHARED);__OLAPTablet__:clearAllCache();__OLAPCacheEngine__:flushOLAPCache();__TSDBCacheEngine__:flushTSDBCache();__TSDBLevelFileIndex__:invalidateLevelIndexCache() | high |
| 写入时报错列字段不一致 | 典型报错 | The column [xxx] expects type of DATETIME, but the actual type is DOUBLE. | high |
| DolphinDB 写入机制 | 限制 | 不支持表字段按字段名自动对应,也不支持 insert into 指定字段名写入;需确保写入数据类型与字段顺序与表定义完全一致 | high |
| VALUE 分区写入无效排查 | 配置项 | 检查 newValuePartitionPolicy 是否设置为 add(默认下载配置文件为 add,支持 VALUE 分区自动增加),或调用 addValuePartitions 手动增加分区 | high |
| RANGE 分区 | 限制与处理 | 不支持自增;若写入数据范围超过建库范围需调用 addRangePartitions 增加分区后再写入 | high |
| 技能认证特训营第二期 | 报名链接 | https://www.qingsuyun.com/h5/e/217471/5/ | high |