Files
fidc-backtest-engine/聚宽微盘股策略.py
2026-04-18 18:02:50 +08:00

655 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
设定的市值17—26亿 可以根据指数的变化来更改
比如3300点集中在15—25亿市值最小的四十只 3400点 集中在17—27亿
对应指数乘以一个系数 对应市值选出40只
'''
from jqdata import *
from datetime import datetime, timedelta
## 初始化函数,设定要操作的股票、基准等等
def initialize(context):
set_benchmark('000852.XSHG') #对标中证1000
# True为开启动态复权模式使用真实价格交易
set_option('use_real_price', True)
# 设定成交量比例
set_option('order_volume_ratio', 1)
# 股票类交易手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(open_tax=0, close_tax=0.001, \
open_commission=0.0003, close_commission=0.0003,\
close_today_commission=0, min_commission=5), type='stock')
# 交易日计时器
g.days = 0
# 分仓常量参数,无须人为修改
g.TR = 1;
# 区间最高价
g.summit = {}
g.muster = []
# 运行状态 1/运行; 0/停运
g.OpenYN = 1
# 开始范围
#g.mystart = 13
# 截至范围
#g.myend = 23
# 调仓频率
g.refresh_rate = 15
# 运行函数1
run_daily(trade, time='10:18')
# 运行函数2
run_daily(CPtrade, time='10:17')
# 持仓数量
g.stocknum = 40
# 上证指数对应系数
g.XS = 4/500
#止盈比率
g.CloseRate = 1.07
#止损比率
g.LossRate = 0.93
# 均线上涨比率
g.RSIRate = 1.0001
# 保证金仓位比率: 1/2为半仓
g.TradeRate = 0.5
# 调试日志展示数量
g.debug_log_limit = 60
g.debug_boundary_window = 5
g.market_cap_map = {}
def _fmt_num(value, digits=2):
try:
return ('%.' + str(digits) + 'f') % float(value)
except Exception:
return str(value)
def _safe_bar_values(bar_data, field):
try:
values = bar_data[field]
except Exception:
values = getattr(bar_data, field)
try:
return [float(x) for x in list(values)]
except Exception:
try:
return list(values)
except Exception:
return []
def _safe_bar_last(bar_data, field):
values = _safe_bar_values(bar_data, field)
if values:
return values[-1]
return None
def _market_cap_text(stock):
cap = getattr(g, 'market_cap_map', {}).get(stock)
if cap is None:
return 'None'
return _fmt_num(cap, 2) + '亿'
def _describe_stock(stock, extras=None):
parts = [stock, '市值=' + _market_cap_text(stock)]
if extras:
parts.extend(extras)
return ','.join(parts)
def _log_detail_items(label, details, limit=None, chunk=10):
total = len(details)
if total == 0:
log.info(label + ': 0')
return
if limit is None:
limit = total
shown = details[:limit]
log.info('%s: 总数=%d, 展示=%d' % (label, total, len(shown)))
for idx in range(0, len(shown), chunk):
part = shown[idx:idx + chunk]
log.info('%s[%d-%d]: %s' % (
label,
idx + 1,
idx + len(part),
' | '.join(part)
))
if total > limit:
log.info('%s: 其余%d项省略' % (label, total - limit))
def _log_market_cap_snapshot(df, caller):
if df is None or len(df) == 0:
log.info('市值筛选快照[%s]: 无数据' % caller)
return
top_n = min(len(df), g.debug_log_limit)
top_details = []
for i in range(top_n):
row = df.iloc[i]
top_details.append('%d.%s:%s亿' % (
i + 1,
row['code'],
_fmt_num(row['market_cap'], 2)
))
_log_detail_items('市值排序前段[%s]' % caller, top_details, limit=top_n, chunk=8)
start_idx = max(0, g.stocknum - g.debug_boundary_window - 1)
end_idx = min(len(df), g.stocknum + g.debug_boundary_window)
boundary_details = []
for i in range(start_idx, end_idx):
row = df.iloc[i]
boundary_details.append('%d.%s:%s亿' % (
i + 1,
row['code'],
_fmt_num(row['market_cap'], 2)
))
_log_detail_items('市值排序边界[%s]' % caller, boundary_details, limit=len(boundary_details), chunk=5)
## 选出小市值股票
def check_stocks(context, caller='unknown'):
g.today = context.current_dt
validate_date()
#不停运参数
g.OpenYN = 1
log.info('选股开始[%s]: 日期=%s, g.days=%d, refresh_rate=%d, 持仓数=%d' % (
caller,
context.current_dt.strftime('%Y-%m-%d %H:%M:%S'),
g.days,
g.refresh_rate,
len(context.portfolio.positions)
))
# 检查日期是否在范围内
if g.OpenYN == 0:
log.warn("该时段属于停运==================范围")
return []
else:
# g.OpenYN = 1
g.security = '000001.XSHG'
#000852.XSHG
close_data = get_bars(g.security, count=1, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close_data['close'].mean()
###################################################
close5_data = get_bars(g.security, count=5, unit='1d', fields=['close'])
# 获取股票的收盘价
close10_data = get_bars(g.security, count=10, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close5_data['close'].mean()
# 取得过去十天的平均价格
MA10 = close10_data['close'].mean()
#5日线下穿,则半仓交易
if MA5 < MA10*g.RSIRate:
g.TR = g.TradeRate
elif MA5 >= MA10*g.RSIRate:
g.TR = 1
###################################################
close5_list = _safe_bar_values(close5_data, 'close')
log.info('指数均线调试[%s][%s] close[-5:]=%s, ma5=%s, ma10=%s, ma5<ma10*RSIRate=%s, TR=%s' % (
caller,
context.current_dt.strftime('%Y-%m-%d'),
str([round(float(v), 2) for v in close5_list]),
_fmt_num(MA5, 4),
_fmt_num(MA10, 4),
str(MA5 < MA10 * g.RSIRate),
_fmt_num(g.TR, 4)
))
# 取得上一时间点价格
current_price = close_data['close'][-1]
log.info('中证指数(current_price)'+str(current_price))
if current_price == 2000:
g.mystart = 7
g.myend = 17
elif current_price > 0:
#Y = (current_price - 3000) *g.XS + 14
Y = (current_price - 2000) *g.XS + 7
g.mystart = Y
g.myend = Y + 10
# mystart = g.start
# myend = g.end
mystart = round(g.mystart)
myend = round(g.myend)
log.info('价格区间为:'+ str(mystart) + '~'+str(myend))
# 设定查询条件
q = query(
valuation.code,
valuation.market_cap
).filter(
valuation.market_cap.between(mystart,myend)
).order_by(
valuation.market_cap.asc()
)
# 选出低市值的股票构成buylist
df = get_fundamentals(q)
g.market_cap_map = {}
if df is not None and len(df) > 0:
for _, row in df.iterrows():
try:
g.market_cap_map[row['code']] = float(row['market_cap'])
except Exception:
g.market_cap_map[row['code']] = None
log.info('市值筛选结果[%s]: %d只股票' % (caller, 0 if df is None else len(df)))
_log_market_cap_snapshot(df, caller)
buylist =list(df['code'])
# 过滤停牌ST科创新股1元股
buylist = filter_paused_stock(buylist, caller=caller)
final_list = buylist[:g.stocknum]
g.muster = final_list
_log_detail_items(
'最终选股[%s]' % caller,
[_describe_stock(stock) for stock in final_list],
limit=len(final_list),
chunk=8
)
boundary_slice = buylist[max(0, g.stocknum - g.debug_boundary_window): min(len(buylist), g.stocknum + g.debug_boundary_window)]
_log_detail_items(
'最终选股边界[%s]' % caller,
[_describe_stock(stock) for stock in boundary_slice],
limit=len(boundary_slice),
chunk=5
)
return final_list
def before_trading_start(context):
# 取得当前日期
g.todayDT = context.current_dt
g.today = context.current_dt.strftime('%Y-%m-%d')
g.start = context.current_dt + timedelta(-2)
g.market_cap_map = {}
## 交易函数
def CPtrade(context):
## 选股
stock_list = check_stocks(context, caller='CPtrade')
if g.OpenYN == 0:
#log.warn("日期属于范围")
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
order_target_value(stock, 0)
return
## 交易函数
def trade(context):
## 选股
stock_list = check_stocks(context, caller='trade')
if g.OpenYN == 0:
#log.warn("日期属于范围")
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
order_target_value(stock, 0)
return
g.changeYN = 0
curr_data = get_current_data()
log.info('交易调试[trade]: 日期=%s, g.days=%d, refresh_hit=%s, TR=%s, cash=%s, total_value=%s, 持仓数=%d, 目标池数=%d' % (
context.current_dt.strftime('%Y-%m-%d %H:%M:%S'),
g.days,
str(g.days % g.refresh_rate == 0),
_fmt_num(g.TR, 4),
_fmt_num(context.portfolio.cash, 2),
_fmt_num(context.portfolio.total_value, 2),
len(context.portfolio.positions),
len(stock_list)
))
_log_detail_items(
'交易目标池[trade]',
[_describe_stock(stock) for stock in stock_list],
limit=len(stock_list),
chunk=8
)
# --------------------------------------------------------------------------
for stockPos in context.portfolio.positions:
# SS、记录股票峰值信息
# if g.summit.get(stock, 0)<data[stock].high: g.summit[stock]=data[stock].high
# S0、取得最近几天股票价格信息
grid = get_price(stockPos, start_date=g.start, end_date=g.today, fields=['open', 'high', 'low', 'close', 'high_limit', 'paused'])
# SP、跳过退市、停牌、无效数据
# if grid.paused[-1]: continue
# S1、目前持仓不在预选股票池中(g.muster)则清仓
# if stock not in g.muster:
# order_target(stock,0)
# log.info("市值清仓:%s" % (stock))
# S2、回撤10%则清仓
hold = context.portfolio.positions[stockPos]
current_price = curr_data[stockPos].last_price
avg_cost = hold.avg_cost
return_ratio = current_price / avg_cost - 1 if avg_cost else 0
grid_high_limit = _safe_bar_last(grid, 'high_limit')
grid_close = _safe_bar_last(grid, 'close')
grid_paused = _safe_bar_last(grid, 'paused')
stop_hit = (current_price / avg_cost < g.LossRate) if avg_cost else False
profit_hit = (current_price < grid_high_limit and current_price / avg_cost > g.CloseRate) if (avg_cost and grid_high_limit is not None) else False
log.info('止盈止损评估[%s] %s: qty=%s, avg_cost=%s, current=%s, return=%s%%, stop_threshold=%s, profit_threshold=%s, high_limit=%s, bar.close=%s, paused=%s, in_target=%s, stop_hit=%s, profit_hit=%s' % (
context.current_dt.strftime('%Y-%m-%d'),
stockPos,
str(getattr(hold, 'total_amount', getattr(hold, 'amount', 'None'))),
_fmt_num(avg_cost, 4),
_fmt_num(current_price, 4),
_fmt_num(return_ratio * 100, 2),
_fmt_num(g.LossRate, 4),
_fmt_num(g.CloseRate, 4),
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4),
str(grid_paused),
str(stockPos in stock_list),
str(stop_hit),
str(profit_hit)
))
if stop_hit:
order_target_value(stockPos, 0)
g.changeYN = 1
log.error('止损清仓:%s,当前价=%.2f,成本价=%.2f,收益率=%.2f%%,high_limit=%s,bar.close=%s' % (
stockPos,
current_price,
hold.avg_cost,
return_ratio * 100,
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4)
))
# S3、今日高开、今日未涨停则清仓
elif profit_hit:
order_target_value(stockPos, 0)
g.changeYN = 1
log.warn('止盈清仓:%s,当前价=%s,成本价=%s,收益率=%s%%,high_limit=%s,bar.close=%s' % (
stockPos,
_fmt_num(current_price, 4),
_fmt_num(hold.avg_cost, 4),
_fmt_num(return_ratio * 100, 2),
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4)
))
else:
g.changeYN = 0
if g.changeYN == 1:
sell_list = list(context.portfolio.positions.keys())
replacement_candidates = [stock for stock in stock_list if stockPos != stock and stock not in sell_list]
_log_detail_items(
'补仓候选[%s][%s]' % (context.current_dt.strftime('%Y-%m-%d'), stockPos),
[_describe_stock(stock) for stock in replacement_candidates],
limit=min(len(replacement_candidates), g.debug_log_limit),
chunk=6
)
# log.warn('portfoliocash' + str(context.portfolio.cash))
for stock in stock_list:
if len(context.portfolio.positions.keys()) < g.stocknum:
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
if stockPos != stock and stock not in sell_list :
## 分配资金
if len(context.portfolio.positions) < g.stocknum :
Num = g.stocknum - len(context.portfolio.positions)
Cash = context.portfolio.cash * g.TR / Num
#gridbuy = get_price(stock, start_date=g.start, end_date=g.today, fields=['open', 'high', 'low_limit', 'close', 'high_limit'])
#if gridbuy.open[-1] > gridbuy.low_limit[-1]:
log.info('补仓买入:卖出=%s,买入=%s,Cash=%s,Num=%d,TR=%s,持仓数=%d' % (
stockPos,
stock,
_fmt_num(Cash, 2),
Num,
_fmt_num(g.TR, 4),
len(context.portfolio.positions)
))
order_value(stock, Cash)
#else :
# log.warn("忽略跌停股票:" + stock)
break
else:
continue
if g.days%g.refresh_rate == 0:
log.info('定期调仓触发:日期=%s,g.days=%d,refresh_rate=%d,TR=%s' % (
context.current_dt.strftime('%Y-%m-%d'),
g.days,
g.refresh_rate,
_fmt_num(g.TR, 4)
))
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
rebalance_sell_list = [stock for stock in sell_list if stock not in stock_list]
_log_detail_items(
'定期调仓卖出名单',
[_describe_stock(stock) for stock in rebalance_sell_list],
limit=len(rebalance_sell_list),
chunk=8
)
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
if stock not in stock_list:
log.info('定期调仓卖出:%s' % stock)
order_target_value(stock, 0)
## 分配资金
if len(context.portfolio.positions) < g.stocknum :
Num = g.stocknum - len(context.portfolio.positions)
# Cash = context.portfolio.cash/Num * g.TR
Cash = context.portfolio.cash * g.TR / g.stocknum
else:
Cash = 0
log.info('定期调仓资金分配Cash=%s,Num=%s,cash=%s,total_value=%s' % (
_fmt_num(Cash, 2),
str(Num if len(context.portfolio.positions) < g.stocknum else 0),
_fmt_num(context.portfolio.cash, 2),
_fmt_num(context.portfolio.total_value, 2)
))
## 买入股票
for stock in stock_list:
if len(context.portfolio.positions.keys()) < g.stocknum:
if stock not in sell_list:
log.info('定期调仓买入:%s,Cash=%s' % (stock, _fmt_num(Cash, 2)))
order_value(stock, Cash)
# 天计数加一
g.days = 1
else:
g.days += 1
# 过滤日期
# 一月十号到三十一号
# 四月十号到四月二十九日 期间
# 八月十日到八月三十一日
# 十月二十日 到十月三十日
def validate_date():
# date = g.todayDT.strftime('%Y-%m-%d')
# date = datetime.strptime(g.todayDT, "%Y-%m-%d")
date = g.today
# 检查是否是4月10日到4月29日之间
if date.month == 1 and 15 <= date.day <= 30:
g.OpenYN = 0
elif date.month == 4 and 15 <= date.day <= 29:
g.OpenYN = 0
elif date.month == 8 and 15 <= date.day <= 31:
g.OpenYN = 0
elif date.month == 10 and 20 <= date.day <= 30:
g.OpenYN = 0
elif date.month == 12 and 20 <= date.day <= 30:
g.OpenYN = 0
else :
g.OpenYN = 1
# 过滤停牌股票
def filter_paused_stock(stock_list, caller='unknown'):
curr_data = get_current_data()
raw_count = len(stock_list)
risk_filtered = []
risk_removed = []
for stock in stock_list:
reasons = []
if curr_data[stock].day_open == curr_data[stock].high_limit:
reasons.append('涨停开盘')
if curr_data[stock].day_open == curr_data[stock].low_limit:
reasons.append('跌停开盘')
if curr_data[stock].last_price == curr_data[stock].high_limit:
reasons.append('当前涨停')
if curr_data[stock].last_price == curr_data[stock].low_limit:
reasons.append('当前跌停')
if curr_data[stock].paused:
reasons.append('停牌')
if curr_data[stock].is_st:
reasons.append('ST')
if 'ST' in curr_data[stock].name:
reasons.append('名称含ST')
if '*' in curr_data[stock].name:
reasons.append('名称含*')
if '退' in curr_data[stock].name:
reasons.append('名称含退')
if stock.startswith('688'):
reasons.append('科创板')
if reasons:
risk_removed.append(_describe_stock(stock, [
'原因=' + '/'.join(reasons),
'open=' + _fmt_num(curr_data[stock].day_open, 2),
'last=' + _fmt_num(curr_data[stock].last_price, 2),
'high_limit=' + _fmt_num(curr_data[stock].high_limit, 2),
'low_limit=' + _fmt_num(curr_data[stock].low_limit, 2),
'name=' + str(curr_data[stock].name)
]))
else:
risk_filtered.append(stock)
log.info('风险过滤[%s]: %d -> %d, 移除=%d' % (
caller,
raw_count,
len(risk_filtered),
len(risk_removed)
))
_log_detail_items(
'风险过滤移除[%s]' % caller,
risk_removed,
limit=min(len(risk_removed), g.debug_log_limit),
chunk=4
)
one_yuan_filtered = []
one_yuan_removed = []
for stock in risk_filtered:
if curr_data[stock].day_open > 1:
one_yuan_filtered.append(stock)
else:
one_yuan_removed.append(_describe_stock(stock, [
'原因=1元股过滤',
'open=' + _fmt_num(curr_data[stock].day_open, 2),
'last=' + _fmt_num(curr_data[stock].last_price, 2)
]))
log.info('1元股过滤[%s]: %d -> %d, 移除=%d' % (
caller,
len(risk_filtered),
len(one_yuan_filtered),
len(one_yuan_removed)
))
_log_detail_items(
'1元股过滤移除[%s]' % caller,
one_yuan_removed,
limit=min(len(one_yuan_removed), g.debug_log_limit),
chunk=4
)
new_list = []
ma_pass_details = {}
ma_removed_details = []
for stock in one_yuan_filtered:
# 获取股票的收盘价
close5_data = get_bars(stock, count=5, unit='1d', fields=['close'])
# 获取股票的收盘价
close10_data = get_bars(stock, count=10, unit='1d', fields=['close'])
# 获取股票的收盘价
close20_data = get_bars(stock, count=20, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close5_data['close'].mean()
# 取得过去十天的平均价格
MA10 = close10_data['close'].mean()
# 取得过去二十天的平均价格
MA20 = close20_data['close'].mean()
# 取得上一时间点价格
# current_price = close_data['close'][-1]
close5_list = [round(float(v), 2) for v in _safe_bar_values(close5_data, 'close')]
ma_condition_1 = MA5 > MA10 * g.RSIRate
ma_condition_2 = MA10 > MA20
detail = _describe_stock(stock, [
'close[-5:]=' + str(close5_list),
'ma5=' + _fmt_num(MA5, 4),
'ma10=' + _fmt_num(MA10, 4),
'ma20=' + _fmt_num(MA20, 4),
'ma5>ma10*RSIRate=' + str(ma_condition_1),
'ma10>ma20=' + str(ma_condition_2)
])
if MA5 > MA10*g.RSIRate> MA20*g.RSIRate:
new_list.append(stock)
ma_pass_details[stock] = detail
else:
ma_removed_details.append(detail)
log.info('均线过滤[%s]: %d -> %d, 移除=%d' % (
caller,
len(one_yuan_filtered),
len(new_list),
len(ma_removed_details)
))
_log_detail_items(
'均线过滤移除[%s]' % caller,
ma_removed_details,
limit=min(len(ma_removed_details), g.debug_log_limit),
chunk=3
)
ma_boundary_details = [ma_pass_details[stock] for stock in new_list[:min(len(new_list), g.stocknum + g.debug_boundary_window)]]
_log_detail_items(
'均线通过前段[%s]' % caller,
ma_boundary_details,
limit=len(ma_boundary_details),
chunk=3
)
return new_list