CTP插件

hugo
2024-09-20

端口号需要注意,我在使用中发现,

1. 查询合约需要交易端口号;

2. 订阅行情需要行情端口号;

这里提供一个python调用的代码仅供参考

from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd
from loguru import logger

from DolphindbClient import DolphinDB


class CTPDataService:

    def __init__(
        self,
        ip: str,
        trade_port: int,
        market_port: int,
        username: str,
        password: str,
        borker_id: str,
        app_id: str,
        auth_code: str,
    ) -> None:
        
        self.ip = ip
        # 行情需要行情端口,查询合约需要交易端口
        # trade_port:交易端口,market_port:行情端口
        self.port = list(map(int,[trade_port, market_port]))
        self.username = username
        self.password = password
        self.borker_id = borker_id
        self.app_id = app_id
        self.auth_code = auth_code

        self.table_name = "ctpMarketData"
        self.client = DolphinDB()
        # 连接ctp
        self._ctp_connection()

    def _remove_code(self, code: str) -> None:
        """从订阅列表中移除指定的代码"""
        if hasattr(self, "subscribe_codes") and code in self.subscribe_codes:
            self.subscribe_codes.remove(code)

    def _exist_pub_table(self) -> bool:
        """检查发布表是否存在"""
        if not hasattr(self, "handle_session"):
            # 报错handle_session不存在
            raise NameError("handle_session不错在,先执行ctp连接")

        if not hasattr(self, "table_name"):

            raise NameError("table_name不存在,先执行create_ctp_publish_table")

        return self.handle_session.run(f"existsStreamTable(`{self.table_name});")

    def _ctp_connection(self) -> None:
        """创建ctp连接"""
        expr: str = f"""
        ctp_handle = ctp::connect("{self.ip}",{self.port[1]},dict(["ReceivedTime", "ConcatTime", "OutputElapsed"], [true, true, true]));
        """
        # ctp_handle会在此节点上
        self.handle_session = self.client.get_session()

        self.handle_session.run(expr)
        logger.success(f"ctp接口连接成功!")

    def get_all_securities_info(self, exchange: str = None) -> pd.DataFrame:
        """
        获取所有证券信息。
        :param exchange: 交易所代码,可选。如果未提供,则查询所有交易所的证券信息。
        :type exchange: str, optional
        :return: 包含所有证券信息的DataFrame。
        :rtype: pandas.DataFrame
        """
        if exchange is None:
            expr: str = f"""
            ctp::queryInstrument("{self.ip}", {self.port[0]}, "{self.username}", "{self.password}", "{self.borker_id}", "{self.app_id}", "{self.auth_code}");
            """
        else:
            expr: str = f"""
            ctp::queryInstrument("{self.ip}", {self.port[0]}, "{self.username}", "{self.password}", "{self.borker_id}", "{self.app_id}", "{self.auth_code}","{exchange}");
            """
        session = self.client.get_session()
        return session.run(expr)

    @logger.catch
    def create_ctp_publish_table(
        self, table_name: str = None, capacity: int = 1500000
    ) -> None:
        """
        创建一个CTP发布表。
        此方法创建一个CTP发布表,并启用表共享和持久化功能。表的模式从CTP句柄中获取,并根据指定的容量创建流数据表。
        :param table_name: 发布表的名称,默认为 "ctpMarketData"。
        :type table_name: str
        :param capacity: 发布表的容量,默认为1500000。
        :type capacity: int
        :return: 无返回值。
        :rtype: None
        """

        logger.info(f"创建{table_name}发布表...")

        if table_name is None:
            table_name = self.table_name
        else:
            self.table_name = table_name

        expr: str = f"""
        def createPubTable(handle)
            {{
                schema = ctp::getSchema(handle, `marketData);
                pubTable = streamTable({capacity}:0,schema.name, schema.typeInt);
                // 表示当流数据表数据量达到100万行时启用持久化,将其中50%的数据采用异步方式压缩保存到磁盘
                enableTableShareAndPersistence(table=pubTable, tableName=`{table_name}, cacheSize=1000000, preCache=500000);
            }}
        """
        self.handle_session.run(expr)
        self.handle_session.run("createPubTable(ctp_handle)")
        logger.success(f"{table_name}发布表,创建成功!")

    @logger.catch
    def subscribe_ctp_data(self, codes: Union[str, List] = None) -> None:
        """
        订阅CTP行情数据。

        :param codes: 合约代码,可以是字符串或字符串列表。如果为None,则订阅所有合约。
        :type codes: Union[str, List], optional
        :raises ValueError: 如果发布表不存在或指定的合约代码不存在,则抛出异常。
        :return: None
        """

        # 订阅发布表
        if not self._exist_pub_table():
            raise ValueError("发布表不存在,先执行create_ctp_publish_table")

        logger.info("开始订阅行情...")

        securities_frame: pd.DataFrame = self.get_all_securities_info()

        if codes is None:
            codes: List = securities_frame["InstrumentID"].tolist()
        elif isinstance(codes, str):
            df: pd.DataFrame = securities_frame.query("InstrumentID==@codes")
            if df.empty:
                raise ValueError(f"{codes}不存在")

            codes: List[str] = [codes]

        elif isinstance(codes, list):
            df: pd.DataFrame = securities_frame.query("InstrumentID in @codes")
            if df.empty:
                raise ValueError(f"{codes}不存在")

        self.handle_session.run(
            f"ctp::subscribe(ctp_handle, `marketData, objByName(`{self.table_name}), {codes});"
        )
        self.subscribe_codes: List[str] = codes
        logger.success("订阅成功")

    @logger.catch
    def unsubscribe(self, codes: Union[str, List[str]] = None) -> None:
        """
        取消订阅指定的代码。
        :param codes: 要取消订阅的代码,可以是字符串或字符串列表。如果未提供,则默认取消订阅所有已订阅的代码。
        :type codes: Union[str, List[str]], optional
        :raises ValueError: 如果codes既不是字符串也不是字符串列表,则抛出此异常。
        """

        if codes is None:
            if not hasattr(self, "subscribe_codes"):
                raise ValueError("为查询到subscribe_codes,需要手动指定取消订阅的codes!")

            codes: List[str] = self.subscribe_codes

        if isinstance(codes, str):
            codes: List[str] = [codes]

        if not isinstance(codes, (str, list)):
            raise ValueError("codes must be either a string or a list of strings")

        self.handle_session.run(f"ctp::unsubscribe(ctp_handle, `marketData, {codes})")
        for code in codes:
            self._remove_code(code)

        logger.success("订阅取消成功!")

    def get_market_data_schema(self) -> pd.DataFrame:
        """查询ctp marketData表结构"""
        return self.handle_session.run("ctp::getSchema(ctp_handle, `marketData);")

    def get_subscrib_status(self) -> pd.DataFrame:
        """获取订阅状态信息。"""
        return self.handle_session.run("ctp::getStatus(ctp_handle)")

    def close(self) -> None:
        """关闭连接"""

        self.handle_session.run("ctp::close(ctp_handle)")
        logger.success("关闭CTP连接!")

    def dropPubTable(self, table_name: str = None) -> None:

        if table_name is None:
            table_name = self.table_name

        self.handle_session.run(f"dropStreamTable(`{table_name})")
        logger.success(f"丢弃{table_name}发布表,内存中和磁盘上的流数据均会被清除!")

使用说明


CTPDataService用于连接期货CTP接口

参数说明:

- ip:为string,CTP交易服务器IP;

- trade_port: CTP 交易服务器的端口;

- market_port:CTP行情端口号;

- username:资金账户;

- password:交易密码;

- borker_id:经纪公司代码;

- app_id:客户端认证的 App 代码;

- auth_code:客户端认证请求的认证码(授权码)

注:trade_port为交易端口用于查询合约列表,market_port为行情端口用于订阅行情。

# ctp配置
    config: Dict = {
        "ip": "180.xxx.xxx.52",
        "trade_port": "交易端口",
        "market_port":"行情端口",
        "username": "资金密码",
        "password": "交易密码",
        "borker_id": "1x8x",
        "app_id": "client_xxxxxx_0.9.1",
        "auth_code": "授权码",
    }


    # 连接CTP接口
    ctp_service: CTPDataService = CTPDataService(**config)


    # 查询合约列表,列名:InstrumentID
    securities_info: pd.DataFrame = ctp_service.get_all_securities_info()


    # 生成发布表,参数table_name可以指定发布表名,默认为:ctpMarketData
    # ctp_service.create_ctp_publish_table(table_name="ctpPubTable"),生成一个名为ctpPubTable的发布表
    ctp_service.create_ctp_publish_table()


    # 订阅合约,参数coeds默认全部订阅,也可以根据查询后的合约指定订阅
    # ctp_service.subscribe_ctp_data(["T2503","IC2410"]),订阅两个合约
    ctp_service.subscribe_ctp_data()


    # 取消订阅,code默认为之前订阅的code,也可以指定传入
    ctp_service.unsubscribe()


    # 删除发布表,table_name默认为:ctpMarketData
    ctp_service.dropPubTable()