新闻

金融数据导入手册之:逐笔数据篇

2022.12.29

本文将概述数据导入前的准备工作和导入的核心步骤。完整教程已发布在官方知乎。

准备工作示例

数据导入的准备工作主要包含三个方面:

(1)数据源分析:从 DolphinDB 数据类型兼容性的角度分析数据源,选择满足建库建表要求的方案。

(2)规划存储方案:分析表连接需求,选择适合的存储方案。当没有表连接分析的需求时,推荐单库单表存储数据;当有表连接需求时,推荐一库多表存储数据。

(3)规划分区:对于 level2 逐笔数据的场景,我们推荐复合分区,先按日期做值分区,再按股票代码做 HASH 分区。

我们以上市委托数据 CSV 文件为例,使用 Linux 系统的 head 命令打开该文件:

可以看到,这个 CSV 文件有如下特点:

  • 第一行是文件说明,后续各种读取都需要跳过这一行;
  • 从第二行开始是数据,没有列名,在建表时需要根据数据的说明文档定义字段名称和字段类型。
  • 从左至右的字段名根据上市的说明文档定义为:SecurityID, TransactTime, valOrderNoue, Price, Balance, OrderBSFlag, OrdType, OrderIndex, ChannelNo, BizIndex。

其中 SecurityID, OrderBSFlag 和 OrdType 为重复较多的有限数量的字符串,使用 SYMBOL 类型;TransactTime 为从年到毫秒的日期,使用 TIMESTAMP 数据类型;其它的字段没有特殊之处,整数用 INT,浮点数用 DOUBLE。

所以,从左至右存储字段的数据类型定义为:SYMBOL, TIMESTAMP, INT, DOUBLE, INT, SYMBOL, SYMBOL, INT, INT, INT。

本教程推荐选用 TSDB 引擎。上市每天逐笔委托数据大小在 3GB 左右,先按日期做值分区,再用股票代码做7个 HASH 分区。按日期值分区时,VALUE 的初始值写两三天的初始值即可,实际分区值会根据数据的实际日期自动扩展。

数据导入与清洗转换

DolphinDB 导入数据的核心函数是 loadTextEx,可用于 CSV 文件读取、数据清洗和入库一体化操作。导入数据核心代码如下:

db = database("dfs://sh_entrust")def transType(mutable memTable){  return memTable.replaceColumn!(`col0,string(memTable.col0)).replaceColumn!(`col1,datetimeParse(string(memTable.col1),"yyyyMMddHHmmssSSS")).replaceColumn!(`col5,string(memTable.col5)).replaceColumn!(`col6,string(memTable.col6))}filePath = "/home/ychan/data/loadForPoc/SH/Order/20210104/Entrust.csv"loadTextEx(dbHandle = db, tableName = `entrust, partitionColumns = `col1`col0, filename = filePath, skipRows = 1,transform = transType)

核心代码中,使用了 loadTextEx 函数,其中 transform 参数引用了 transType 函数定义,其作用是数据清洗和类型转换。

transform 能够非常方便地完成但不限于如下需求:

(1)转换数据类型

例如,当传入的 SecurityID 数据类型为整型,不符合 SYMBOL 的要求而报错时,可以使用 transType 函数自定义转换数据类型,赋给 transform 参数后再执行导入语句。相关代码如下:

def transType(mutable memTable){   return memTable.replaceColumn!(`col0,string(memTable.col0)).replaceColumn!(`col1,datetimeParse(string(memTable.col1),"yyyyMMddHHmmssSSS")).replaceColumn!(`col5,string(memTable.col5)).replaceColumn!(`col6,string(memTable.col6))}

(2)在 CSV 文件的基础上增加列

有时 CSV 文件会缺少某些列,比如缺少日期,但通过文件名给出了日期信息。这种情况下我们可以通过 transform 参数引用的函数,增加列并赋值。代码如下:

def addCol(mutable memTable,datePara){    update memTable set date = datePara    return memTable}

(3)过滤无效数据

当需要把 CSV 文件中的无效数据过滤掉再写入分布式表时,可以在 transform 参数引用的函数中使用 select 语句筛选出符合条件的数据。例如只写入价格大于0的数据,函数定义的代码为:

def fliterData(mutable memTable){    return select * from memTable where price > 0}

(4)转换字符编码

为了显示正常,有时候需要把 GBK 编码的列转成 UTF-8。transform 参数引用的函数的代码为:

def addCol(mutable memTable){    return mutable.replaceColumn!(`custname,toUTF8(mutable.custname,`gbk))}

(5)导入部分列

可通过在 transform 参数引用的函数中筛选出所需列,实现导入部分列,代码如下:

def partCol(mutable memTable){    return select [需要的部分列名] from memTable}


常见问题

问:只提交了一个文件的导入,长时间执行不完,硬盘也没有写入,这是什么原因?


答:因为单个 CSV 文件太大,缓存不够用。先把 OLAPCacheEngineSize 和 TSDBCacheEngineSize 两个参数的值修改为大于 CSV 文件的大小,再重启系统即可。


问:执行过程中,报 out of memory 错误,怎么处理?


答:1)如果使用的是社区版本 license,请联系负责支持的销售人员,获取试用版本 license。2)查看 maxMemSize 参数的配置是否远小于系统内存,建议配置为系统内存的80%。3)检查 workerNum 和 localExecutors 配置,合理配置值的计算方法为:可用内存除以单个文件大小向下取整得到 workerNum 的值,localExecutors 的值为 workerNum 减 1。


问:nsf 系统导入时,报 Bad file descriptor 错误,怎么解决?


答:nfs 文件需要用 v3 版本,并设置 local_lock 参数为 all 的方式进行挂载。


问:数据如何去重?


答:建表时指定 keepDuplicates 参数的值可以去重,提供以下选项:

  • ALL:保留所有数据
  • LAST:仅保留最新数据
  • FIRST:仅保留第一条数据