用DolphinScript实现趋势跟踪与ATR止损的交易策略
目录
一、数据分析的准备工作
1.1 建库建表
1.2 stockTB表数据示例
二、趋势跟踪交易策略
2.1 策略介绍
2.2 Kelly公式
三、融入ATR跟踪止损策略
3.1 平均真实波幅(ATR)止盈止损信号
3.2 仓位管理
3.3 交易周期中各指令的触发条件
3.4 实现代码
四、度量策略表现
五、完整脚本代码
一、数据分析的准备工作
1.1 建库建表
数据库按日期值分区,每天一个分区。选用专为时间序列数据优化的TSDB存储引擎,支持高效的时间范围查询和聚合操作。使用loadTextEx函数从CSV文件导入数据,导入时会自动按照定义的分区规则将数据分布到不同分区。
// 创建一个名为 "dfs://stockDB" 的数据库
// 按日期范围进行值分区:从2024年4月1日到2025年4月1日
// 使用TSDB(时间序列数据库)引擎
create database "dfs://stockDB"
partitioned by VALUE(2024.04.01..2025.04.01) // 按日期值分区
engine='TSDB' // 使用时间序列数据库引擎
// 在stockDB数据库中创建一个名为stockTB的表
// 表结构包含股票交易的基本字段
create table "dfs://stockDB"."stockTB"(
date DATE // 日期字段
Open DOUBLE // 开盘价
High DOUBLE // 最高价
Low DOUBLE // 最低价
Close DOUBLE // 收盘价
Volume INT // 成交量
)
partitioned by date, // 按日期字段分区
sortColumns=[`date], // 按日期排序
keepDuplicates=ALL // 保留所有重复记录(根据需求可能需要改为LAST或FIRST)
// 获取stockTB表的列定义信息
tmp = loadTable("dfs://stockDB", "stockTB").schema().colDefs
// 创建一个包含列名和类型的表,用于后续数据导入
schemaTB = table(tmp.name as name, tmp.typeString as type)
// 从CSV文件导入数据到stockTB表
// 指定数据库句柄、表名、分区列、文件路径和表结构
loadTextEx(
dbHandle=database("dfs://stockDB"), // 数据库句柄
tableName="stockTB", // 目标表名
partitionColumns=`date, // 分区列
filename="C:/Users/Downloads/DolphinDB_Win64_V3.00.2.4_JIT/server/stockTB.csv", // 数据文件路径
schema=schemaTB // 表结构定义
)
// 从数据库中加载stockTB表的所有数据到变量stockTB中
stockTB = select * from loadTable("dfs://stockDB", "stockTB")1.2 stockTB表数据示例
本文选用浦发银行"600000"的2024-04-01 至 2025-04-01的日线数据,并且前复权处理避免分红配股导致的股价变化。导入到stockTB表后的部分数据如下:

二、趋势跟踪交易策略
2.1 策略介绍
这个策略是一个基于价格突破的趋势跟踪交易策略,结合了凯利公式进行仓位管理。当收盘价低于过去15日的最低价(不包括当日)时,触发买入信号;当收盘价高于过去5日的最高价(不包括当日)时,触发卖出信号。
另外,在策略设计中,我们特别设置了从2024年8月才开始产生交易信号。这一设计决策主要出于Kelly公式计算需要足够的历史数据来可靠估计。
// 定义趋势交易信号生成函数
def trendSignal(stockTB){
// 返回表新增Signal列,包含交易信号
return select date, Open, High, Low, Close, case
// 卖出信号(-1):当收盘价突破前一日5日最高价 且 日期在2024年8月及之后
when Close > High.prev().mmax(5).bfill!() && month(date) >= 2024.08M then -1
// 买入信号(1):当收盘价跌破前一日15日最低价 且 日期在2024年8月及之后
when Close < Low.prev().mmin(15).bfill!() && month(date) >= 2024.08M then 1
end as Signal
from stockTB
}
// 对股票数据应用趋势信号函数
trendSignalTB = trendSignal(stockTB)单次投资的盈利或亏损的计算公式为(当前价格 - 平均成本价格)x sgn(仓位)
2.2 Kelly公式
2.2.1 公式简介
Kelly公式是一种用于确定最优投注或投资比例的数学公式,旨在最大化长期资本增长,同时避免破产风险。它最初用于赌博和通信理论,后来被广泛应用于金融交易、投资组合管理和体育博彩等领域。Kelly公式的形式为:

其中:
f=投入资产占当前总资产的最优比例。正值代表多头头寸,负值代表空头头寸。假设不允许施加杠杆,其绝对值应小于1
Pwin=做多胜率
Ploss=做多败率,也是做空胜率
rw=每次做多胜利时的平均盈利
rl=每次做空胜利时的平均盈利
2.2.2 Kelly值的计算思路和实现
1) 计算历史胜率Pwin、Ploss
做多/空胜利总次数cum_long_count/cum_short_count:计算每天价格在历史序列中的排名(升序),其排名值刚好等于这天价格高于历史序列中的价格的次数,看作是这天的做多胜利次数(或者这样说,在过去的每一天做一笔买入交易,而在这天全部卖出,其中获利的交易单数)。累加排名值,得到以这天为最后期限,能够做多胜利的总次数。例如,[10, 20, 15, 20, 30] 的排名是 [0, 1, 1, 2, 4],累加得 [0, 1, 2, 4, 8]。做空胜利总次数的计算思路与前述类似。
然后历史胜率Pwin、Ploss的计算公式:


其中:
Nlong=做多胜利总次数
Nshort=做多胜利总次数
2) 计算历史平均盈利rw、rl
long_profit(v)/ short_profit(v):计算每个时刻的潜在做多/空盈利。对于每个价格 v[i],检查之前所有低于 v[i] 的价格,并计算差值总和。例如 v = [10, 20, 15, 20, 30],long_profit 计算:[0, 10, 5, 15, 55]
i=0(v[0]=10):前面无数据 → 0
i=1(v[1]=20):10 < 20 → 20-10 = 10
i=2(v[2]=15):10 < 15 → 15-10 = 5
i=3(v[3]=20):10 < 20, 15 < 20 → 20-10+20-15 = 15
i=4(v[4]=30):10 < 30, 20 < 30, 15 < 30, 20 < 30 → 30-10+30-20+30-15+30-20 = 55
short_profit(v) 的计算思路与前述类似。例如 v = [10, 20, 15, 20, 30],short_profit 计算得[0, 0, 5, 0, 0]
做多/空胜利总盈利cum_long_profit/cum_short_profit:累加long_profit(v)/ short_profit(v),得到以这天为最后期限,与做多/空胜利总次数Nlong、Nshort对应的做多/空胜利总盈利。
然后历史平均盈利rw、rl的计算公式:


其中:
Glong=做多胜利总盈利
Gshort=做多胜利总盈利
3) 实现代码
// 计算多头利润函数
// 对于每个位置i,计算前面i-1个价格中低于当前价格的差值总和
def long_profit(v){
return each(i -> sum(iif(v[0:i-1] < v[i-1], v[i-1] - v[0:i-1], 0)), 1..size(v))
}
// 计算空头利润函数
// 对于每个位置i,计算前面i-1个价格中高于当前价格的差值总和
def short_profit(v){
return each(i -> sum(iif(v[0:i-1] > v[i-1], v[0:i-1] - v[i-1], 0)), 1..size(v))
}
// Kelly公式计算主函数
def Kelly(v){
// 计算累计多头排名和空头排名
// cumrank(): 计算每个价格在历史序列中的排名(从小到大)
cum_long_count = v.cumrank().cumsum().double()
// cumrank(ascending=false): 计算每个价格在历史序列中的排名(从大到小)
cum_short_count = v.cumrank(ascending=false, tiesMethod='max').cumsum().double()
// 计算多头和空头的胜率
long_rate = cum_long_count / (cum_long_count + cum_short_count)
short_rate = 1 - long_rate
// 计算累计多头和空头利润
cum_long_profit = v.long_profit().cumsum()
cum_short_profit = v.short_profit().cumsum()
// 计算平均多头和空头利润
avg_long_profit = cum_long_profit / cum_long_count
avg_short_profit = cum_short_profit / cum_short_count
// Kelly公式计算
// 公式: (胜率*平均盈利 - 败率*平均亏损)/(平均盈利*平均亏损)
return (long_rate * avg_long_profit - short_rate * avg_short_profit) / (avg_long_profit * avg_short_profit)
}
2.2.3 Kelly值计算结果
如下图部分结果,从2024年8月开始,基于历史数据计算的Kelly值稳定在0.7~0.8之间。因为Kelly公式在实际交易中偏激进,在策略的仓位管理中使用半Kelly值。

三、融入ATR跟踪止损策略
3.1 平均真实波幅(ATR)止盈止损信号
平均真实波幅ATR值是以下三个值的最大值取15日移动平均:
当日最高价-最低价、
当日最高价-次日收盘价的绝对值、
次日收盘价-当日最低价的绝对值
止盈倍数 × ATR值 > 单次投资的盈利时,触发止盈信号;止损倍数 × ATR值 > 单次投资的亏损时,触发止损信号。策略选取的止盈倍数为3.5,止损倍数为1.8。
ATR = matrix(
trendSignalTB[`High] - trendSignalTB[`Low],
abs(trendSignalTB[`High] - trendSignalTB[`Close].next()),
abs(trendSignalTB[`Close].next() - trendSignalTB[`Low])
).rowMax().mavg(15)
3.2 仓位管理
本策略采用凯利公式进行动态仓位管理,基本公式为:
仓位比例 = (趋势信号方向 × Kelly值) / 2
持仓量 =仓位比例 × 本金 / 当日股价
其中趋势信号方向为1或-1,1代表买入信号,-1代表卖出信号;Kelly值按照2.2节计算得到。注意按照2.2节计算的Kelly值可正可负,如果趋势信号为卖出信号为负且Kelly值为负,那么计算的实际仓位为正,代表多头头寸。当日的股票交易价格取当日的收盘价。
3.3 交易周期中各指令的触发条件
策略的一个交易周期包括以下4种指令,分别说明它们的触发条件,若是其他情况,则执行保持不变的指令
3.3.1 开仓的触发条件
前一日无持仓Position[i-1]==0.0
且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0
且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()
之后按3.2节说明的规则计算新的仓位。
3.3.2 调仓的触发条件
当前持仓方向与前一日信号方向一致Position[i-1] * trendSignalTB[Signal][i-1] > 0
且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0
且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()
之后按3.2节说明的规则计算新的仓位。
3.3.3 多空转换的触发条件
当前持仓方向与前一日信号方向相反Position[i-1] * trendSignalTB[Signal][i-1] < 0
且 过去5天没有ATR信号atrSignal[(i-5):i].isValid().sum() == 0
且 前一日有有效的趋势信号trendSignalTB[Signal][i-1].isValid()
之后按3.2节说明的规则计算新的仓位。
3.3.4 平仓的触发条件
前一日产生了ATR信号atrSignal[i-1].isValid()
之后清空持仓。另外设计冷静期机制,即接下来5天内忽略趋势信号,保持空仓。
3.4 实现代码
定义一个名为tradeRecord、输入参数为trendSignalTB的函数来实现策略,该函数的主要内容是遍历每个交易日的for循环。
主循环内包括确定交易指令order、计算持仓数量Position、计算平均成本价costPrice、更新资产价值capital、生成ATR信号,以下逐一说明。
3.4.1 初始化变量
order = take("hold", size(trendSignalTB[`date])) // 交易指令数组
order[0] = "hold" // 初始指令为"hold"
Position = take(0.0, size(trendSignalTB[`date])) // 持仓数量数组
Position[0] = 0.0 // 初始持仓为0
costPrice = take(0.0, size(trendSignalTB[`date])) // 平均成本价格数组
costPrice[0] = 0.0 // 初始平均成本价格为0
atrSignal = take(0.0, size(trendSignalTB[`date])) // ATR信号数组
atrSignal[0] = NULL // 初始ATR信号为空
cash = take(0.0, size(trendSignalTB[`date])) // 资金数组
cash[0] = 100000.0 // 初始资金为100000.0
capital = take(0.0, size(trendSignalTB[`date])) // 资产价值数组
capital[0] = cash[0] // 初始资产价值为100000.03.4.2 确定交易指令order
按照3.3节说明的规则:
order[i] = case
when Position[i-1]==0.0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "open"
when Position[i-1] * trendSignalTB[`Signal][i-1] > 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "adjust"
when Position[i-1] * trendSignalTB[`Signal][i-1] < 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "reverse"
when atrSignal[i-1].isValid() then "close"
else "hold"
end3.4.3 计算持仓数量Position
按照3.2节说明:
Position[i] = case
when order[i] == "open" || order[i] == "adjust" || order[i] == "reverse" then (capital[i-1] * trendSignalTB[`Signal][i-1] * Kelly(trendSignalTB[`Close])[i] / 2) / trendSignalTB[`Close][i] // 根据凯利公式计算仓位
when order[i] == "close" then 0.0 // 平仓时持仓为0
else Position[i-1] // 保持原仓位
end3.4.4 计算平均成本价格costPrice
costPrice[i] = case
when order[i] == "open" || order[i] == "reverse" then trendSignalTB[`Close][i] // 新开仓或反向操作时使用当前收盘价
when order[i] == "adjust" then trendSignalTB[`Close][i] + (costPrice[i-1] - trendSignalTB[`Close][i]) * Position[i-1] / Position[i] // 调整仓位时计算加权平均价格
when order[i] == "close" then 0.0 // 平仓时买入价格归零
else costPrice[i-1] // 保持原买入价格
end3.4.5 更新资产价值capital
cash[i] = cash[i-1] + (Position[i-1] - Position[i]) * trendSignalTB[`Close][i]
capital[i] = cash[i] + Position[i] * trendSignalTB[`Close][i]3.4.6 生成ATR信号
按照3.1节说明:
atrSignal[i] = case
when costPrice[i]==0.0 then NULL // 无持仓时无信号
when 3.5*ATR[i] < (trendSignalTB[`Close][i] - costPrice[i]) * signum(Position[i]) then 1 // 止盈信号
when -1.8*ATR[i] > (trendSignalTB[`Close][i] - costPrice[i]) * signum(Position[i]) then 1 // 止损信号
else NULL // 无信号
end3.4.7 返回交易记录表tradeRecordTB
函数返回一个表,表包含价格、仓位、资金、交易信号等信息。部分结果如下图:

四、度量策略表现
用收益率和最大回撤来度量策略表现,年化收益率的数学计算公式为:

其中:
Ct=当前资产价值
C0=初始资产价值
t=投资天数
风险回撤的数学计算公式为:

其中:
Ct=当前资产价值
Cpeak=当前时间点之前的资产价值峰值
// 定义计算交易指标的函数
def performMetrics(tradeRecordTB){
// 从交易记录表中选择数据并计算指标
return select date,
// 计算年化收益率:
log(capital / capital[0]) * 365/(date - date[0]).double() as returnRate,
// 计算累计最大回撤 (使用cummdd函数)
capital.cummdd() as drawdown,
from tradeRecordTB
// 筛选2024年8月及以后的数据
where month(date) >= 2024.08M
}
// 调用函数计算指标,结果存入performMetricsTB表
performMetricsTB = performMetrics(tradeRecordTB)
// 计算平均年化收益率
avgreturnRate = avg(performMetricsTB[`returnRate])
// 将平均年化收益率格式化为百分比,保留两位小数
avgreturnRate.format("0.00%")
// 计算最大回撤值
drawdownMax = max(performMetricsTB[`drawdown])
// 将最大回撤格式化为百分比,保留两位小数
drawdownMax.format("0.00%")计算结果年化收益率6.48%,最大回撤1.56%
五、完整脚本代码
create database "dfs://stockDB"
partitioned by VALUE(2024.04.01..2025.04.01)
engine='TSDB'
create table "dfs://stockDB"."stockTB"(
date DATE
Open DOUBLE
High DOUBLE
Low DOUBLE
Close DOUBLE
Volume INT
)
partitioned by date,
sortColumns=[`date],
keepDuplicates=ALL
tmp = loadTable("dfs://stockDB", "stockTB").schema().colDefs
schemaTB = table(tmp.name as name, tmp.typeString as type)
loadTextEx(dbHandle=database("dfs://stockDB"), tableName="stockTB", partitionColumns=`date, filename="C:/Users/Downloads/DolphinDB_Win64_V3.00.2.4_JIT/server/stockTB.csv", schema=schemaTB)
stockTB = select * from loadTable("dfs://stockDB", "stockTB")
share stockTB as sharedTB
def long_profit(v){
return each(i -> sum(iif(v[0:i-1] < v[i-1], v[i-1] - v[0:i-1], 0)), 1..size(v))
}
def short_profit(v){
return each(i -> sum(iif(v[0:i-1] > v[i-1], v[0:i-1] - v[i-1], 0)), 1..size(v))
}
def Kelly(v){
cum_long_count = v.cumrank().cumsum().double()
cum_short_count = v.cumrank(ascending=false, tiesMethod='max').cumsum().double()
long_rate = cum_long_count / (cum_long_count + cum_short_count)
short_rate = 1 - long_rate
cum_long_profit = v.long_profit().cumsum()
cum_short_profit = v.short_profit().cumsum()
avg_long_profit = cum_long_profit / cum_long_count
avg_short_profit = cum_short_profit / cum_short_count
return (long_rate * avg_long_profit - short_rate * avg_short_profit) / ( avg_long_profit * avg_short_profit)
}
def trendSignal(stockTB){
return select date, Open, High, Low, Close, case
when Close > High.prev().mmax(5).bfill!() && month(date) >= 2024.08M then -1
when Close < Low.prev().mmin(15).bfill!() && month(date) >= 2024.08M then 1
end as Signal
from stockTB
}
trendSignalTB = trendSignal(stockTB);
def tradeRecord(trendSignalTB){
ATR = matrix(
trendSignalTB[`High] - trendSignalTB[`Low],
abs(trendSignalTB[`High] - trendSignalTB[`Close].next()),
abs(trendSignalTB[`Close].next() - trendSignalTB[`Low])
).rowMax().mavg(15);
order = take("hold", size(trendSignalTB[`date]))
order[0] = "hold"
costPrice = take(0.0, size(trendSignalTB[`date]))
costPrice[0] = 0.0
Position = take(0.0, size(trendSignalTB[`date]))
Position[0] = 0.0
atrSignal = take(0.0, size(trendSignalTB[`date]))
atrSignal[0] = NULL
cash = take(0.0, size(trendSignalTB[`date]))
cash[0] = 100000.0
capital = take(0.0, size(trendSignalTB[`date]))
capital[0] = cash[0]
for(i in 1..(size(trendSignalTB[`date])-1)){
order[i] = case
when Position[i-1]==0.0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "open"
when Position[i-1] * trendSignalTB[`Signal][i-1] > 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "adjust"
when Position[i-1] * trendSignalTB[`Signal][i-1] < 0 && atrSignal[(i-5):i].isValid().sum() == 0 && trendSignalTB[`Signal][i-1].isValid() then "reverse"
when atrSignal[i-1].isValid() then "close"
else "hold"
end
Position[i] = case
when order[i] == "open" || order[i] == "adjust" || order[i] == "reverse" then (capital[i-1] * trendSignalTB[`Signal][i-1] * Kelly(trendSignalTB[`Close])[i] / 2) / trendSignalTB[`Close][i]
when order[i] == "close" then 0.0
else Position[i-1]
end
costPrice[i] = case
when order[i] == "open" || order[i] == "reverse" then trendSignalTB[`Close][i]
when order[i] == "adjust" then trendSignalTB[`Close][i] + (costPrice[i-1] - trendSignalTB[`Close][i]) * Position[i-1] / Position[i]
when order[i] == "close" then 0.0
else costPrice[i-1]
end
cash[i] = cash[i-1] + (Position[i-1] - Position[i]) * trendSignalTB[`Close][i]
capital[i] = cash[i] + Position[i] * trendSignalTB[`Close][i]
atrSignal[i] = case
when costPrice[i]==0.0 then NULL
when 3.5*ATR[i] < (trendSignalTB[`Close][i]-costPrice[i])*signum(Position[i]) then 1
when -1.8*ATR[i] > (trendSignalTB[`Close][i]-costPrice[i])*signum(Position[i]) then 1
else NULL
end
}
return select date, Close as tradePrice, costPrice, Position, cash, capital, order, Signal as trendSignal, atrSignal
from trendSignalTB
}
tradeRecordTB = tradeRecord(trendSignalTB);
def performMetrics(tradeRecordTB){
return select date, log(capital / capital[0]) * 365/(date - date[0]).double() as returnRate, capital.cummdd() as drawdown
from tradeRecordTB
where month(date) >= 2024.08M
}
performMetricsTB = performMetrics(tradeRecordTB)
avgreturnRate = avg(performMetricsTB[`returnRate])
avgreturnRate.format("0.00%")
drawdownMax = max(performMetricsTB[`drawdown])
drawdownMax.format("0.00%")