CTP插件
端口号需要注意,我在使用中发现,
1. 查询合约需要交易端口号;
2. 订阅行情需要行情端口号;
这里提供一个python调用的代码仅供参考
from typing import Dict, List, Tuple, Union
import numpy as np
import pandas as pd
from loguru import logger
from DolphindbClient import DolphinDB
class CTPDataService:
def __init__(
self,
ip: str,
trade_port: int,
market_port: int,
username: str,
password: str,
borker_id: str,
app_id: str,
auth_code: str,
) -> None:
self.ip = ip
# 行情需要行情端口,查询合约需要交易端口
# trade_port:交易端口,market_port:行情端口
self.port = list(map(int,[trade_port, market_port]))
self.username = username
self.password = password
self.borker_id = borker_id
self.app_id = app_id
self.auth_code = auth_code
self.table_name = "ctpMarketData"
self.client = DolphinDB()
# 连接ctp
self._ctp_connection()
def _remove_code(self, code: str) -> None:
"""从订阅列表中移除指定的代码"""
if hasattr(self, "subscribe_codes") and code in self.subscribe_codes:
self.subscribe_codes.remove(code)
def _exist_pub_table(self) -> bool:
"""检查发布表是否存在"""
if not hasattr(self, "handle_session"):
# 报错handle_session不存在
raise NameError("handle_session不错在,先执行ctp连接")
if not hasattr(self, "table_name"):
raise NameError("table_name不存在,先执行create_ctp_publish_table")
return self.handle_session.run(f"existsStreamTable(`{self.table_name});")
def _ctp_connection(self) -> None:
"""创建ctp连接"""
expr: str = f"""
ctp_handle = ctp::connect("{self.ip}",{self.port[1]},dict(["ReceivedTime", "ConcatTime", "OutputElapsed"], [true, true, true]));
"""
# ctp_handle会在此节点上
self.handle_session = self.client.get_session()
self.handle_session.run(expr)
logger.success(f"ctp接口连接成功!")
def get_all_securities_info(self, exchange: str = None) -> pd.DataFrame:
"""
获取所有证券信息。
:param exchange: 交易所代码,可选。如果未提供,则查询所有交易所的证券信息。
:type exchange: str, optional
:return: 包含所有证券信息的DataFrame。
:rtype: pandas.DataFrame
"""
if exchange is None:
expr: str = f"""
ctp::queryInstrument("{self.ip}", {self.port[0]}, "{self.username}", "{self.password}", "{self.borker_id}", "{self.app_id}", "{self.auth_code}");
"""
else:
expr: str = f"""
ctp::queryInstrument("{self.ip}", {self.port[0]}, "{self.username}", "{self.password}", "{self.borker_id}", "{self.app_id}", "{self.auth_code}","{exchange}");
"""
session = self.client.get_session()
return session.run(expr)
@logger.catch
def create_ctp_publish_table(
self, table_name: str = None, capacity: int = 1500000
) -> None:
"""
创建一个CTP发布表。
此方法创建一个CTP发布表,并启用表共享和持久化功能。表的模式从CTP句柄中获取,并根据指定的容量创建流数据表。
:param table_name: 发布表的名称,默认为 "ctpMarketData"。
:type table_name: str
:param capacity: 发布表的容量,默认为1500000。
:type capacity: int
:return: 无返回值。
:rtype: None
"""
logger.info(f"创建{table_name}发布表...")
if table_name is None:
table_name = self.table_name
else:
self.table_name = table_name
expr: str = f"""
def createPubTable(handle)
{{
schema = ctp::getSchema(handle, `marketData);
pubTable = streamTable({capacity}:0,schema.name, schema.typeInt);
// 表示当流数据表数据量达到100万行时启用持久化,将其中50%的数据采用异步方式压缩保存到磁盘
enableTableShareAndPersistence(table=pubTable, tableName=`{table_name}, cacheSize=1000000, preCache=500000);
}}
"""
self.handle_session.run(expr)
self.handle_session.run("createPubTable(ctp_handle)")
logger.success(f"{table_name}发布表,创建成功!")
@logger.catch
def subscribe_ctp_data(self, codes: Union[str, List] = None) -> None:
"""
订阅CTP行情数据。
:param codes: 合约代码,可以是字符串或字符串列表。如果为None,则订阅所有合约。
:type codes: Union[str, List], optional
:raises ValueError: 如果发布表不存在或指定的合约代码不存在,则抛出异常。
:return: None
"""
# 订阅发布表
if not self._exist_pub_table():
raise ValueError("发布表不存在,先执行create_ctp_publish_table")
logger.info("开始订阅行情...")
securities_frame: pd.DataFrame = self.get_all_securities_info()
if codes is None:
codes: List = securities_frame["InstrumentID"].tolist()
elif isinstance(codes, str):
df: pd.DataFrame = securities_frame.query("InstrumentID==@codes")
if df.empty:
raise ValueError(f"{codes}不存在")
codes: List[str] = [codes]
elif isinstance(codes, list):
df: pd.DataFrame = securities_frame.query("InstrumentID in @codes")
if df.empty:
raise ValueError(f"{codes}不存在")
self.handle_session.run(
f"ctp::subscribe(ctp_handle, `marketData, objByName(`{self.table_name}), {codes});"
)
self.subscribe_codes: List[str] = codes
logger.success("订阅成功")
@logger.catch
def unsubscribe(self, codes: Union[str, List[str]] = None) -> None:
"""
取消订阅指定的代码。
:param codes: 要取消订阅的代码,可以是字符串或字符串列表。如果未提供,则默认取消订阅所有已订阅的代码。
:type codes: Union[str, List[str]], optional
:raises ValueError: 如果codes既不是字符串也不是字符串列表,则抛出此异常。
"""
if codes is None:
if not hasattr(self, "subscribe_codes"):
raise ValueError("为查询到subscribe_codes,需要手动指定取消订阅的codes!")
codes: List[str] = self.subscribe_codes
if isinstance(codes, str):
codes: List[str] = [codes]
if not isinstance(codes, (str, list)):
raise ValueError("codes must be either a string or a list of strings")
self.handle_session.run(f"ctp::unsubscribe(ctp_handle, `marketData, {codes})")
for code in codes:
self._remove_code(code)
logger.success("订阅取消成功!")
def get_market_data_schema(self) -> pd.DataFrame:
"""查询ctp marketData表结构"""
return self.handle_session.run("ctp::getSchema(ctp_handle, `marketData);")
def get_subscrib_status(self) -> pd.DataFrame:
"""获取订阅状态信息。"""
return self.handle_session.run("ctp::getStatus(ctp_handle)")
def close(self) -> None:
"""关闭连接"""
self.handle_session.run("ctp::close(ctp_handle)")
logger.success("关闭CTP连接!")
def dropPubTable(self, table_name: str = None) -> None:
if table_name is None:
table_name = self.table_name
self.handle_session.run(f"dropStreamTable(`{table_name})")
logger.success(f"丢弃{table_name}发布表,内存中和磁盘上的流数据均会被清除!")使用说明
CTPDataService用于连接期货CTP接口
参数说明:
- ip:为string,CTP交易服务器IP;
- trade_port: CTP 交易服务器的端口;
- market_port:CTP行情端口号;
- username:资金账户;
- password:交易密码;
- borker_id:经纪公司代码;
- app_id:客户端认证的 App 代码;
- auth_code:客户端认证请求的认证码(授权码)
注:trade_port为交易端口用于查询合约列表,market_port为行情端口用于订阅行情。
# ctp配置
config: Dict = {
"ip": "180.xxx.xxx.52",
"trade_port": "交易端口",
"market_port":"行情端口",
"username": "资金密码",
"password": "交易密码",
"borker_id": "1x8x",
"app_id": "client_xxxxxx_0.9.1",
"auth_code": "授权码",
}
# 连接CTP接口
ctp_service: CTPDataService = CTPDataService(**config)
# 查询合约列表,列名:InstrumentID
securities_info: pd.DataFrame = ctp_service.get_all_securities_info()
# 生成发布表,参数table_name可以指定发布表名,默认为:ctpMarketData
# ctp_service.create_ctp_publish_table(table_name="ctpPubTable"),生成一个名为ctpPubTable的发布表
ctp_service.create_ctp_publish_table()
# 订阅合约,参数coeds默认全部订阅,也可以根据查询后的合约指定订阅
# ctp_service.subscribe_ctp_data(["T2503","IC2410"]),订阅两个合约
ctp_service.subscribe_ctp_data()
# 取消订阅,code默认为之前订阅的code,也可以指定传入
ctp_service.unsubscribe()
# 删除发布表,table_name默认为:ctpMarketData
ctp_service.dropPubTable()