获取binance合约的aggTrade, Markprice, Depth and forceOrder并存储到dolphindb中(1)

cloudQuant
2024-12-13

原先我有一个python版本的量化交易框架用于接收binance上的数据,但是占用的服务器资源比较多,所以重新用c++实现了一个版本, 使用websocket从binance上接收数据,然后存储到服务器dolphindb数据库,定时跑脚本, 同步到本地dolphindb数据库中,删除同步过后的数据,减少占用的服务器磁盘空间。

服务器配置

如果想要节省成本,1核2G的应该也能跑起来,带宽估计得比较高一些,每秒进来的数据量还是挺大的

  1. ubuntu 18.04 64位 server, 2c4g, 30M, 位置:日本东京
  2. dolphindb 3.0
  3. some packages needed to install
# Refresh package lists and upgrade existing packages before installing toolchain deps.
sudo apt update
sudo apt upgrade
sudo apt install build-essential  # C/C++ compiler toolchain (gcc/g++/make)
# OpenSSL development headers (TLS support for the websocket client)
sudo apt-get install libssl-dev
# libuuid development headers
sudo apt-get install uuid-dev
# zlib development headers (compression support)
sudo apt-get install zlib1g-dev

安装 DolphinDB

  1. 下载 wget https://www.dolphindb.cn/downloads/DolphinDB_Linux64_V3.00.2.1.zip -O dolphindb.zip
  2. 解压到文件夹中: unzip dolphindb.zip -d dolphindb
  3. 进入server文件夹: cd ./dolphindb/server
  4. 启动dolphindb:./dolphindb, 第一次需要修改文件权限:chmod +x dolphindb
  5. 查询是否启动: ps aux|grep dolphindb
  6. 检查端口是否监听:netstat -tuln | grep 8848
  7. 访问http://{ip}:8848, 注意如果是腾讯云服务器,需要在防火墙-管理规则中配置8848的端口
  8. 查询cpu的运行频率:watch -n 1 'cat /proc/cpuinfo | grep "cpu MHz"'

创建数据库

这个数据库是按照date_time和symbol进行分区的,仿照的是期货和股票的分区模式, 但是由于数字货币的数据量比较大,感觉可以按小时进行分区,这样性能应该能提高不少。

// Distributed database for Binance futures feeds, stored with the TSDB engine.
// Composite partitioning: VALUE on the calendar date (2024-2028) plus a HASH
// of the symbol into 25 buckets — mirrors a typical futures/equities layout.
// Fix: DolphinDB's `create database` syntax requires a comma before the
// `engine` option; the original omitted it.
create database "dfs://binance_data"
partitioned by VALUE(2024.01.01..2028.01.01), HASH([SYMBOL, 25]),
engine='TSDB';

// Order-book depth snapshots from the Binance Depth stream.
// Fix: DolphinDB's `create table` requires commas between column
// definitions; the original listed columns with no separators.
create table "dfs://binance_data"."orderbook"(
    date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
    server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
    local_update_time DOUBLE,  // local receive time
    symbol SYMBOL,             // HASH partition key
    ask_price_list DOUBLE[],   // array vectors: one price/volume ladder per row
    bid_price_list DOUBLE[],
    ask_volume_list DOUBLE[],
    bid_volume_list DOUBLE[]
)
partitioned by date_time, symbol,
sortColumns=[`symbol,`server_time],  // TSDB sort key: symbol then event time
keepDuplicates=ALL;                  // retain every snapshot, even same-key rows

// Best bid/ask (book ticker) quotes.
// Fix: DolphinDB's `create table` requires commas between column
// definitions; the original listed columns with no separators.
create table "dfs://binance_data"."ticker"(
    date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
    server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
    local_update_time DOUBLE,  // local receive time
    symbol SYMBOL,             // HASH partition key
    ask_price DOUBLE,
    bid_price DOUBLE,
    ask_volume DOUBLE,
    bid_volume DOUBLE
)
partitioned by date_time, symbol,
sortColumns=[`symbol,`server_time],
keepDuplicates=ALL;

 // Candlestick (kline) bars.
 // Fixes: commas added between column definitions (required by DolphinDB
 // `create table` syntax); stray blank line inside the column list removed.
 // NOTE(review): close_time is DOUBLE while open_time is LONG — likely both
 // are epoch ms; type left as-is to avoid breaking the existing writer.
 create table "dfs://binance_data"."kline"(
     date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
     open_time LONG,            // bar open timestamp
     local_update_time DOUBLE,  // local receive time
     close_time DOUBLE,         // bar close timestamp
     symbol SYMBOL,             // HASH partition key
     open_price DOUBLE,
     high_price DOUBLE,
     low_price DOUBLE,
     close_price DOUBLE,
     volume DOUBLE,
     amount DOUBLE,
     num_trades LONG,
     taker_buy_base_asset_volume DOUBLE,
     taker_buy_quote_asset_volume DOUBLE
 )
 partitioned by date_time, symbol,
 sortColumns=[`symbol,`open_time],  // klines sort on open_time, not server_time
 keepDuplicates=ALL;


// Mark-price stream (mark price, index price, estimated settlement price).
// Fix: DolphinDB's `create table` requires commas between column
// definitions; the original listed columns with no separators.
create table "dfs://binance_data"."mark_price"(
    date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
    server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
    local_update_time DOUBLE,  // local receive time
    symbol SYMBOL,             // HASH partition key
    mark_price DOUBLE,
    index_price DOUBLE,
    settlement_price DOUBLE
)
partitioned by date_time, symbol,
sortColumns=[`symbol,`server_time],
keepDuplicates=ALL;


 // Funding-rate stream (current rate, predicted next rate and its time).
 // Fix: DolphinDB's `create table` requires commas between column
 // definitions; the original listed columns with no separators.
 create table "dfs://binance_data"."funding_rate"(
     date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
     server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
     local_update_time DOUBLE,  // local receive time
     symbol SYMBOL,             // HASH partition key
     current_funding_rate DOUBLE,
     next_funding_rate DOUBLE,
     next_funding_rate_time DOUBLE  // NOTE(review): stored as DOUBLE — presumably epoch ms, confirm
 )
 partitioned by date_time, symbol,
 sortColumns=[`symbol,`server_time],
 keepDuplicates=ALL;

// Aggregated trades (aggTrade stream).
// Fix: DolphinDB's `create table` requires commas between column
// definitions; the original listed columns with no separators.
create table "dfs://binance_data"."agg_trade"(
    date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
    server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
    local_update_time DOUBLE,  // local receive time
    symbol SYMBOL,             // HASH partition key
    trade_id STRING,           // aggregate trade id
    first_trade_id STRING,     // first constituent trade id
    last_trade_id STRING,      // last constituent trade id
    trade_type SYMBOL,         // buy/sell side indicator
    trade_price DOUBLE,
    trade_volume DOUBLE,
    trade_time DOUBLE
)
partitioned by date_time, symbol,
sortColumns=[`symbol,`server_time],
keepDuplicates=ALL;

 // Liquidation orders (forceOrder stream).
 // Fix: DolphinDB's `create table` requires commas between column
 // definitions; the original listed columns with no separators.
 create table "dfs://binance_data"."force_order"(
     date_time DATETIME[comment="交易日期", compress="delta"],  // VALUE partition key
     server_time LONG,          // exchange timestamp — presumably epoch ms, confirm against the feed writer
     local_update_time DOUBLE,  // local receive time
     symbol SYMBOL,             // HASH partition key
     order_side SYMBOL,         // BUY / SELL
     order_type SYMBOL,
     order_time_in_force SYMBOL,
     trade_status SYMBOL,
     order_price DOUBLE,
     order_qty DOUBLE,
     order_avg_price DOUBLE,
     trade_time DOUBLE,
     last_trade_volume DOUBLE,
     total_trade_volume DOUBLE
 )
 partitioned by date_time, symbol,
 sortColumns=[`symbol,`server_time],
 keepDuplicates=ALL;

安装ccapi教程

原本的ccapi中没有获取forceOrder,markPrice,FundingRate的接口, 我增加了这几个接口,相比于原本版本做了一些改动。

# Clone the modified ccapi demo repository and build the DolphinDB API shared library.
git clone https://github.com/cloudQuant/dolphindb_demos.git
cd dolphindb_demos
# Run from the repository root: out-of-tree Release build without Aeron, with OpenSSL.
mkdir build && cd build && 
cmake .. -DCMAKE_BUILD_TYPE=Release -DUSE_AERON=OFF -DUSE_OPENSSL=ON && 
make -j4 &&
cp libDolphinDBAPI.so ../examples/lib
# Then build the examples (run this inside the examples directory).
cmake -B build -DABI=0 -DUSE_OPENSSL=1 && cmake --build build

运行

# Run from the directory that contains dolphindb_demos (typically $HOME).
# Fix: the original `mkdir logs` created ./logs relative to the current
# directory (and failed if it already existed), while nohup redirects to
# ~/logs/ — `mkdir -p ~/logs` targets the right path and is idempotent.
mkdir -p ~/logs
nohup ~/dolphindb_demos/examples/build/05_binance_public_to_dolphindb/binance_public_to_dolphindb dispatch_events_to_multiple_threads > ~/logs/binance_public_to_dolphindb.log 2>&1 &

在dolphindb终端中运行下面的代码,可以查询数据的量

// Open a handle to each distributed table in the Binance database,
// then report its row count — a quick sanity check that ingestion runs.
db_path = "dfs://binance_data"
t_agg_trade = loadTable(db_path, "agg_trade")
t_force_order = loadTable(db_path, "force_order")
t_funding_rate = loadTable(db_path, "funding_rate")
t_kline = loadTable(db_path, "kline")
t_mark_price = loadTable(db_path, "mark_price")
t_orderbook = loadTable(db_path, "orderbook")
t_ticker = loadTable(db_path, "ticker")
// Per-table row counts.
select count(*) from t_agg_trade
select count(*) from t_force_order
select count(*) from t_funding_rate
select count(*) from t_kline
select count(*) from t_mark_price
select count(*) from t_orderbook
select count(*) from t_ticker

注:windows下还没有编译成功,正在尝试中,等后续编译成功了,再分享一下