用DolphinScript实现趋势跟踪与ATR止损的交易策略

海豚_325754868
2025-05-23

目录

一、数据分析的准备工作

1.1 建库建表

1.2 stockTB表数据示例

二、趋势跟踪交易策略

2.1 策略介绍

2.2 Kelly公式

三、融入ATR跟踪止损策略

3.1 平均真实波幅(ATR)止盈止损信号

3.2 仓位管理

3.3 交易周期中各指令的触发条件

3.4 实现代码

四、度量策略表现

五、完整脚本代码


一、数据分析的准备工作

1.1 建库建表

数据库按日期值分区,每天一个分区。选用专为时间序列数据优化的TSDB存储引擎,支持高效的时间范围查询和聚合操作。使用loadTextEx函数从CSV文件导入数据,导入时会自动按照定义的分区规则将数据分布到不同分区。


// 创建一个名为 "dfs://stockDB" 的数据库
// 按日期范围进行值分区:从2024年4月1日到2025年4月1日
// 使用TSDB(时间序列数据库)引擎
create database "dfs://stockDB"
partitioned by VALUE(2024.04.01..2025.04.01)  // 按日期值分区
engine='TSDB'  // 使用时间序列数据库引擎
// 在stockDB数据库中创建一个名为stockTB的表
// 表结构包含股票交易的基本字段
create table "dfs://stockDB"."stockTB"(
    date DATE       // 日期字段
    Open DOUBLE     // 开盘价
    High DOUBLE     // 最高价
    Low DOUBLE      // 最低价
    Close DOUBLE    // 收盘价
    Volume INT      // 成交量
)
partitioned by date,             // 按日期字段分区
sortColumns=[`date],             // 按日期排序
keepDuplicates=ALL               // 保留所有重复记录(根据需求可能需要改为LAST或FIRST)
// 获取stockTB表的列定义信息
tmp = loadTable("dfs://stockDB", "stockTB").schema().colDefs
// 创建一个包含列名和类型的表,用于后续数据导入
schemaTB = table(tmp.name as name, tmp.typeString as type)
// 从CSV文件导入数据到stockTB表
// 指定数据库句柄、表名、分区列、文件路径和表结构
loadTextEx(
    dbHandle=database("dfs://stockDB"),  // 数据库句柄
    tableName="stockTB",                 // 目标表名
    partitionColumns=`date,              // 分区列
    filename="C:/Users/Downloads/DolphinDB_Win64_V3.00.2.4_JIT/server/stockTB.csv",  // 数据文件路径
    schema=schemaTB                      // 表结构定义
)
// 从数据库中加载stockTB表的所有数据到变量stockTB中
stockTB = select * from loadTable("dfs://stockDB", "stockTB")

1.2 stockTB表数据示例

本文选用浦发银行"600000"的2024-04-01 至 2025-04-01的日线数据,并且前复权处理避免分红配股导致的股价变化。导入到stockTB表后的部分数据如下:

image.png


二、趋势跟踪交易策略

2.1 策略介绍

这个策略是一个基于价格突破的趋势跟踪交易策略,结合了凯利公式进行仓位管理。当收盘价低于过去15日的最低价(不包括当日)时,触发买入信号;当收盘价高于过去5日的最高价(不包括当日)时,触发卖出信号。

另外,在策略设计中,我们特别设置了从2024年8月才开始产生交易信号。这一设计决策主要出于Kelly公式计算需要足够的历史数据来可靠估计。

// 定义趋势交易信号生成函数
def trendSignal(stockTB){
    // 返回表新增Signal列,包含交易信号
    return select date, Open, High, Low, Close, case
                      // 卖出信号(-1):当收盘价突破前一日5日最高价 且 日期在2024年8月及之后
                      when Close > High.prev().mmax(5).bfill!() && month(date) >= 2024.08M then -1
                      // 买入信号(1):当收盘价跌破前一日15日最低价 且 日期在2024年8月及之后
                      when Close < Low.prev().mmin(15).bfill!() && month(date) >= 2024.08M then 1
                      end as Signal 
           from stockTB
}
// 对股票数据应用趋势信号函数
trendSignalTB = trendSignal(stockTB)

单次投资的盈利或亏损的计算公式为(当前价格 - 平均成本价格)x sgn(仓位)


2.2 Kelly公式

2.2.1 公式简介

Kelly公式是一种用于确定最优投注或投资比例的数学公式,旨在最大化长期资本增长,同时避免破产风险。它最初用于赌博和通信理论,后来被广泛应用于金融交易、投资组合管理和体育博彩等领域。Kelly公式的形式为:

image.png

其中:

f=投入资产占当前总资产的最优比例。正值代表多头头寸,负值代表空头头寸。假设不允许施加杠杆,其绝对值应小于1

Pwin=做多胜率

Ploss=做多败率,也是做空胜率

rw=每次做多胜利时的平均盈利

rl=每次做空胜利时的平均盈利


2.2.2 Kelly值的计算思路和实现

1) 计算历史胜率Pwin、Ploss

做多/空胜利总次数cum_long_count/cum_short_count:计算每天价格在历史序列中的排名(升序),其排名值刚好等于这天价格高于历史序列中的价格的次数,看作是这天的做多胜利次数(或者这样说,在过去的每一天做一笔买入交易,而在这天全部卖出,其中获利的交易单数)。累加排名值,得到以这天为最后期限,能够做多胜利的总次数。例如,[10, 20, 15, 20, 30] 的排名是 [0, 1, 1, 2, 4],累加得 [0, 1, 2, 4, 8]。做空胜利总次数的计算思路与前述类似。

然后历史胜率Pwin、Ploss的计算公式:

image.png

image.png

其中:

Nlong=做多胜利总次数

Nshort=做多胜利总次数

2) 计算历史平均盈利rw、rl

long_profit(v)/ short_profit(v):计算每个时刻的潜在做多/空盈利。对于每个价格 v[i],检查之前所有低于 v[i] 的价格,并计算差值总和。例如 v = [10, 20, 15, 20, 30],long_profit 计算:[0, 10, 5, 15, 55]

i=0(v[0]=10):前面无数据 → 0

i=1(v[1]=20):10 < 20 → 20-10 = 10

i=2(v[2]=15):10 < 15 → 15-10 = 5

i=3(v[3]=20):10 < 20, 15 < 20 → 20-10+20-15 = 15

i=4(v[4]=30):10 < 30, 20 < 30, 15 < 30, 20 < 30 → 30-10+30-20+30-15+30-20 = 55

short_profit(v) 的计算思路与前述类似。例如 v = [10, 20, 15, 20, 30],short_profit 计算得[0, 0, 5, 0, 0]

做多/空胜利总盈利cum_long_profit/cum_short_profit:累加long_profit(v)/ short_profit(v),得到以这天为最后期限,与做多/空胜利总次数Nlong、Nshort对应的做多/空胜利总盈利。

然后历史平均盈利rw、rl的计算公式:

image.png

image.png

其中:

Glong=做多胜利总盈利

Gshort=做多胜利总盈利


3) 实现代码

// 计算多头利润函数
// 对于每个位置i,计算前面i-1个价格中低于当前价格的差值总和
def long_profit(v){ 
    return each(i -> sum(iif(v[0:i-1] < v[i-1], v[i-1] - v[0:i-1], 0)), 1..size(v)) 
}
// 计算空头利润函数
// 对于每个位置i,计算前面i-1个价格中高于当前价格的差值总和
def short_profit(v){ 
    return each(i -> sum(iif(v[0:i-1] > v[i-1], v[0:i-1] - v[i-1], 0)), 1..size(v)) 
}
// Kelly公式计算主函数
def Kelly(v){
    // 计算累计多头排名和空头排名
    // cumrank(): 计算每个价格在历史序列中的排名(从小到大)
    cum_long_count = v.cumrank().cumsum().double()
    // cumrank(ascending=false): 计算每个价格在历史序列中的排名(从大到小)
    cum_short_count = v.cumrank(ascending=false, tiesMethod='max').cumsum().double()
    
    // 计算多头和空头的胜率
    long_rate = cum_long_count / (cum_long_count + cum_short_count)
    short_rate = 1 - long_rate
    // 计算累计多头和空头利润
    cum_long_profit = v.long_profit().cumsum()
    cum_short_profit = v.short_profit().cumsum()
    
    // 计算平均多头和空头利润
    avg_long_profit = cum_long_profit / cum_long_count
    avg_short_profit = cum_short_profit / cum_short_count
    // Kelly公式计算
    // 公式: (胜率*平均盈利 - 败率*平均亏损)/(平均盈利*平均亏损)
    return (long_rate * avg_long_profit - short_rate * avg_short_profit) / (avg_long_profit * avg_short_profit)
}


2.2.3 Kelly值计算结果

如下图部分结果,从2024年8月开始,基于历史数据计算的Kelly值稳定在0.7~0.8之间。因为Kelly公式在实际交易中偏激进,在策略的仓位管理中使用半Kelly值。

image.png


三、融入ATR跟踪止损策略

3.1 平均真实波幅(ATR)止盈止损信号

平均真实波幅ATR值是以下三个值的最大值取15日移动平均:

当日最高价-最低价、

当日最高价-次日收盘价的绝对值、

次日收盘价-当日最低价的绝对值

止盈倍数 × ATR值 > 单次投资的盈利时,触发止盈信号;止损倍数 × ATR值 > 单次投资的亏损时,触发止损信号。策略选取的止盈倍数为3.5,止损倍数为1.8。

ATR = matrix(
    trendSignalTB[`High] - trendSignalTB[`Low],
    abs(trendSignalTB[`High] - trendSignalTB[`Close].next()),
    abs(trendSignalTB[`Close].next() - trendSignalTB[`Low])
).rowMax().mavg(15)


3.2 仓位管理

本策略采用凯利公式进行动态仓位管理,基本公式为:

仓位比例 = (趋势信号方向 × Kelly值) / 2

持仓量 =仓位比例 × 本金 / 当日股价

其中趋势信号方向为1或-1,1代表买入信号,-1代表卖出信号;Kelly值按照2.2节计算得到。注意按照2.2节计算的Kelly值可正可负,如果趋势信号为卖出信号为负且Kelly值为负,那么计算的实际仓位为正,代表多头头寸。当日的股票交易价格取当日的收盘价。


3.3 交易周期中各指令的触发条件

策略的一个交易周期包括以下4种指令,分别说明它们的触发条件,若是其他情况,则执行保持不变的指令

3.3.1 开仓的触发条件

前一日无持仓Position[i-1]==0.0

且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0

且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()

之后按3.2节说明的规则计算新的仓位。

3.3.2 调仓的触发条件

当前持仓方向与前一日信号方向一致Position[i-1] * trendSignalTB[Signal][i-1] > 0

且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0

且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()

之后按3.2节说明的规则计算新的仓位。

3.3.3 多空转换的触发条件

当前持仓方向与前一日信号方向相反Position[i-1] * trendSignalTB[Signal][i-1] < 0

且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0

且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()

之后按3.2节说明的规则计算新的仓位。

3.3.4 平仓的触发条件

前一日产生了ATR信号atrSignal[i-1].isValid()

之后清空持仓。另外设计冷静期机制,即接下来5天内忽略趋势信号,保持空仓。


3.4 实现代码

定义一个名为tradeRecord、输入参数为trendSignalTB的函数来实现策略,该函数的主要内容是遍历每个交易日的for循环。

主循环内包括确定交易指令order、计算持仓数量Position、计算平均成本价costPrice、更新资产价值capital、生成ATR信号,以下逐一说明。

3.4.1 初始化变量


order = take("hold", size(trendSignalTB[`date]))  // 交易指令数组
order[0] = "hold"  // 初始指令为"hold"
Position = take(0.0, size(trendSignalTB[`date]))  // 持仓数量数组
Position[0] = 0.0  // 初始持仓为0
costPrice = take(0.0, size(trendSignalTB[`date]))  // 平均成本价格数组
costPrice[0] = 0.0  // 初始平均成本价格为0
atrSignal = take(0.0, size(trendSignalTB[`date]))  // ATR信号数组
atrSignal[0] = NULL  // 初始ATR信号为空
cash = take(0.0, size(trendSignalTB[`date]))  // 资金数组
cash[0] = 100000.0  // 初始资金为100000.0
capital = take(0.0, size(trendSignalTB[`date]))  // 资产价值数组
capital[0] = cash[0]  // 初始资产价值为100000.0

3.4.2 确定交易指令order

按照3.3节说明的规则:

order[i] = case
    when Position[i-1]==0.0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "open"
    when Position[i-1] * trendSignalTB[`Signal][i-1] > 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "adjust"
    when Position[i-1] * trendSignalTB[`Signal][i-1] < 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "reverse"
    when atrSignal[i-1].isValid() then "close"
    else "hold"
    end

3.4.3 计算持仓数量Position

按照3.2节说明:

Position[i] = case
    when order[i] == "open" || order[i] == "adjust" || order[i] == "reverse" then (capital[i-1] * trendSignalTB[`Signal][i-1] * Kelly(trendSignalTB[`Close])[i] / 2) / trendSignalTB[`Close][i]  // 根据凯利公式计算仓位
    when order[i] == "close" then 0.0  // 平仓时持仓为0
    else Position[i-1]  // 保持原仓位
    end

3.4.4 计算平均成本价格costPrice

costPrice[i] = case
    when order[i] == "open" || order[i] == "reverse" then trendSignalTB[`Close][i]  // 新开仓或反向操作时使用当前收盘价
    when order[i] == "adjust" then trendSignalTB[`Close][i] + (costPrice[i-1] - trendSignalTB[`Close][i]) * Position[i-1] / Position[i]  // 调整仓位时计算加权平均价格
    when order[i] == "close" then 0.0  // 平仓时买入价格归零
    else costPrice[i-1]  // 保持原买入价格
    end

3.4.5 更新资产价值capital

cash[i] = cash[i-1] + (Position[i-1] - Position[i]) * trendSignalTB[`Close][i]
capital[i] = cash[i] + Position[i] * trendSignalTB[`Close][i]

3.4.6 生成ATR信号

按照3.1节说明:

atrSignal[i] = case
    when costPrice[i]==0.0 then NULL  // 无持仓时无信号
    when 3.5*ATR[i] < (trendSignalTB[`Close][i] - costPrice[i]) * signum(Position[i]) then 1  // 止盈信号
    when -1.8*ATR[i] > (trendSignalTB[`Close][i] - costPrice[i]) * signum(Position[i]) then 1  // 止损信号
    else NULL  // 无信号
    end

3.4.7 返回交易记录表tradeRecordTB

函数返回一个表,表包含价格、仓位、资金、交易信号等信息。部分结果如下图:

image.png


四、度量策略表现

用收益率和最大回撤来度量策略表现,年化收益率的数学计算公式为:

image.png

其中:

Ct=当前资产价值

C0=初始资产价值

t=投资天数

风险回撤的数学计算公式为:

image.png

其中:

Ct=当前资产价值

Cpeak=当前时间点之前的资产价值峰值

// 定义计算交易指标的函数
def performMetrics(tradeRecordTB){
    // 从交易记录表中选择数据并计算指标
    return select date, 
                  // 计算年化收益率: 
                  log(capital / capital[0]) * 365/(date - date[0]).double() as returnRate,
                  // 计算累计最大回撤 (使用cummdd函数)
                  capital.cummdd() as drawdown,
           from tradeRecordTB
           // 筛选2024年8月及以后的数据
           where month(date) >= 2024.08M
}
// 调用函数计算指标,结果存入performMetricsTB表
performMetricsTB = performMetrics(tradeRecordTB)
// 计算平均年化收益率
avgreturnRate = avg(performMetricsTB[`returnRate])
// 将平均年化收益率格式化为百分比,保留两位小数
avgreturnRate.format("0.00%")
// 计算最大回撤值
drawdownMax = max(performMetricsTB[`drawdown])
// 将最大回撤格式化为百分比,保留两位小数
drawdownMax.format("0.00%")

计算结果年化收益率6.48%,最大回撤1.56%


五、完整脚本代码

create database "dfs://stockDB"
partitioned by VALUE(2024.04.01..2025.04.01)
engine='TSDB'

create table "dfs://stockDB"."stockTB"(
    date DATE
    Open DOUBLE
    High DOUBLE
    Low DOUBLE
    Close DOUBLE
    Volume INT
)
partitioned by date,
sortColumns=[`date],
keepDuplicates=ALL

tmp = loadTable("dfs://stockDB", "stockTB").schema().colDefs
schemaTB = table(tmp.name as name, tmp.typeString as type)
loadTextEx(dbHandle=database("dfs://stockDB"), tableName="stockTB", partitionColumns=`date, filename="C:/Users/Downloads/DolphinDB_Win64_V3.00.2.4_JIT/server/stockTB.csv", schema=schemaTB)

stockTB = select * from loadTable("dfs://stockDB", "stockTB")
share stockTB as sharedTB

def long_profit(v){ 
    return each(i -> sum(iif(v[0:i-1] < v[i-1], v[i-1] - v[0:i-1], 0)), 1..size(v)) 
}
def short_profit(v){ 
    return each(i -> sum(iif(v[0:i-1] > v[i-1], v[0:i-1] - v[i-1], 0)), 1..size(v)) 
}
def Kelly(v){
    cum_long_count = v.cumrank().cumsum().double()
    cum_short_count = v.cumrank(ascending=false, tiesMethod='max').cumsum().double()
    long_rate = cum_long_count / (cum_long_count + cum_short_count)
    short_rate = 1 - long_rate

    cum_long_profit = v.long_profit().cumsum()
    cum_short_profit = v.short_profit().cumsum()
    avg_long_profit = cum_long_profit / cum_long_count
    avg_short_profit = cum_short_profit / cum_short_count

    return (long_rate * avg_long_profit - short_rate * avg_short_profit) / ( avg_long_profit * avg_short_profit)
}

def trendSignal(stockTB){
    return select date, Open, High, Low, Close, case
                when Close > High.prev().mmax(5).bfill!() && month(date) >= 2024.08M then -1
                when Close < Low.prev().mmin(15).bfill!() && month(date) >= 2024.08M then 1
            end as Signal
            from stockTB
}
trendSignalTB = trendSignal(stockTB);

def tradeRecord(trendSignalTB){
 ATR = matrix(
    trendSignalTB[`High] - trendSignalTB[`Low],
    abs(trendSignalTB[`High] - trendSignalTB[`Close].next()),
    abs(trendSignalTB[`Close].next() - trendSignalTB[`Low])
).rowMax().mavg(15);
    
    order = take("hold", size(trendSignalTB[`date]))
    order[0] = "hold"
    costPrice = take(0.0, size(trendSignalTB[`date]))
    costPrice[0] = 0.0
    Position = take(0.0, size(trendSignalTB[`date]))
    Position[0] = 0.0
    atrSignal = take(0.0, size(trendSignalTB[`date]))
    atrSignal[0] = NULL
    cash = take(0.0, size(trendSignalTB[`date]))
    cash[0] = 100000.0
    capital = take(0.0, size(trendSignalTB[`date]))
    capital[0] = cash[0]
    
for(i in 1..(size(trendSignalTB[`date])-1)){
    order[i] = case
    when Position[i-1]==0.0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "open"
    when Position[i-1] * trendSignalTB[`Signal][i-1] > 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "adjust"
    when Position[i-1] * trendSignalTB[`Signal][i-1] < 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "reverse"
    when atrSignal[i-1].isValid() then "close"
    else "hold"
    end
    
    Position[i] = case
    when order[i] == "open" || order[i] == "adjust" || order[i] == "reverse" then (capital[i-1] * trendSignalTB[`Signal][i-1] * Kelly(trendSignalTB[`Close])[i] / 2) / trendSignalTB[`Close][i]
    when order[i] == "close" then 0.0
    else Position[i-1]
    end

    costPrice[i] = case
    when order[i] == "open" || order[i] == "reverse" then trendSignalTB[`Close][i]
    when order[i] == "adjust" then trendSignalTB[`Close][i] + (costPrice[i-1] - trendSignalTB[`Close][i]) * Position[i-1] / Position[i]
    when order[i] == "close" then 0.0
    else costPrice[i-1]
    end
    
    cash[i] = cash[i-1] + (Position[i-1] - Position[i]) * trendSignalTB[`Close][i]
    capital[i] = cash[i] + Position[i] * trendSignalTB[`Close][i]
    
    atrSignal[i] = case
    when costPrice[i]==0.0 then NULL
    when 3.5*ATR[i] < (trendSignalTB[`Close][i]-costPrice[i])*signum(Position[i]) then 1
    when -1.8*ATR[i] > (trendSignalTB[`Close][i]-costPrice[i])*signum(Position[i]) then 1
    else NULL
    end
    
}
return select date, Close as tradePrice, costPrice, Position, cash, capital, order, Signal as trendSignal, atrSignal
    from trendSignalTB

}
tradeRecordTB = tradeRecord(trendSignalTB);

def performMetrics(tradeRecordTB){
    return select date, log(capital / capital[0]) * 365/(date - date[0]).double() as returnRate, capital.cummdd() as drawdown
    from tradeRecordTB
    where month(date) >= 2024.08M
}
performMetricsTB = performMetrics(tradeRecordTB)

avgreturnRate = avg(performMetricsTB[`returnRate])
avgreturnRate.format("0.00%")
drawdownMax = max(performMetricsTB[`drawdown])
drawdownMax.format("0.00%")