Add subscription guards for platform trading actions

This commit is contained in:
boris
2026-04-23 19:02:25 -07:00
parent 58836a1c37
commit 86e4db6272
3 changed files with 143 additions and 17 deletions

View File

@@ -186,6 +186,7 @@ pub struct PlatformExprStrategyConfig {
pub rotation_enabled: bool, pub rotation_enabled: bool,
pub explicit_action_stage: PlatformExplicitActionStage, pub explicit_action_stage: PlatformExplicitActionStage,
pub explicit_action_schedule: Option<PlatformRebalanceSchedule>, pub explicit_action_schedule: Option<PlatformRebalanceSchedule>,
pub subscription_guard_required: bool,
pub explicit_actions: Vec<PlatformTradeAction>, pub explicit_actions: Vec<PlatformTradeAction>,
} }
@@ -235,6 +236,7 @@ fn band_low(index_close) {
rotation_enabled: true, rotation_enabled: true,
explicit_action_stage: PlatformExplicitActionStage::OnDay, explicit_action_stage: PlatformExplicitActionStage::OnDay,
explicit_action_schedule: None, explicit_action_schedule: None,
subscription_guard_required: false,
explicit_actions: Vec::new(), explicit_actions: Vec::new(),
} }
} }
@@ -1215,6 +1217,10 @@ impl PlatformExprStrategy {
); );
scope.push("has_subscriptions", ctx.has_subscriptions()); scope.push("has_subscriptions", ctx.has_subscriptions());
scope.push("subscription_count", ctx.subscription_count() as i64); scope.push("subscription_count", ctx.subscription_count() as i64);
scope.push(
"subscription_guard_required",
self.config.subscription_guard_required,
);
scope.push("has_process_events", ctx.has_process_events()); scope.push("has_process_events", ctx.has_process_events());
scope.push("process_event_count", ctx.process_event_count() as i64); scope.push("process_event_count", ctx.process_event_count() as i64);
scope.push( scope.push(
@@ -1346,6 +1352,10 @@ impl PlatformExprStrategy {
"subscription_count".into(), "subscription_count".into(),
Dynamic::from(ctx.subscription_count() as i64), Dynamic::from(ctx.subscription_count() as i64),
); );
day_factors.insert(
"subscription_guard_required".into(),
Dynamic::from(self.config.subscription_guard_required),
);
day_factors.insert( day_factors.insert(
"has_process_events".into(), "has_process_events".into(),
Dynamic::from(ctx.has_process_events()), Dynamic::from(ctx.has_process_events()),
@@ -2517,8 +2527,9 @@ impl PlatformExprStrategy {
ctx: &StrategyContext<'_>, ctx: &StrategyContext<'_>,
date: NaiveDate, date: NaiveDate,
day: &DayExpressionState, day: &DayExpressionState,
) -> Result<Vec<OrderIntent>, BacktestError> { ) -> Result<(Vec<OrderIntent>, Vec<String>), BacktestError> {
let mut intents = Vec::new(); let mut intents = Vec::new();
let mut diagnostics = Vec::new();
for action in &self.config.explicit_actions { for action in &self.config.explicit_actions {
match action { match action {
PlatformTradeAction::Order { PlatformTradeAction::Order {
@@ -2540,6 +2551,13 @@ impl PlatformExprStrategy {
)? { )? {
continue; continue;
} }
if self.config.subscription_guard_required && !ctx.is_subscribed(symbol) {
diagnostics.push(format!(
"subscription_guard_denied symbol={} action={:?}",
symbol, kind
));
continue;
}
match kind { match kind {
PlatformExplicitOrderKind::Shares => { PlatformExplicitOrderKind::Shares => {
let quantity = let quantity =
@@ -2900,11 +2918,26 @@ impl PlatformExprStrategy {
if !self.action_when_matches(ctx, day, None, when_expr.as_deref())? { if !self.action_when_matches(ctx, day, None, when_expr.as_deref())? {
continue; continue;
} }
let target_weights = let mut target_weights =
self.eval_float_map_expr(ctx, target_weights_expr, day, None, None)?; self.eval_float_map_expr(ctx, target_weights_expr, day, None, None)?;
if target_weights.is_empty() { if target_weights.is_empty() {
continue; continue;
} }
if self.config.subscription_guard_required {
let before = target_weights.len();
target_weights.retain(|symbol, _| ctx.is_subscribed(symbol));
let removed = before.saturating_sub(target_weights.len());
if removed > 0 {
diagnostics.push(format!(
"subscription_guard_filtered target_portfolio_smart removed={} remaining={}",
removed,
target_weights.len()
));
}
if target_weights.is_empty() {
continue;
}
}
let order_prices = order_prices_expr let order_prices = order_prices_expr
.as_deref() .as_deref()
.map(|expr| { .map(|expr| {
@@ -2926,7 +2959,7 @@ impl PlatformExprStrategy {
} }
} }
} }
Ok(intents) Ok((intents, diagnostics))
} }
fn explicit_action_decision( fn explicit_action_decision(
@@ -2934,8 +2967,9 @@ impl PlatformExprStrategy {
ctx: &StrategyContext<'_>, ctx: &StrategyContext<'_>,
) -> Result<StrategyDecision, BacktestError> { ) -> Result<StrategyDecision, BacktestError> {
let day = self.day_state(ctx, ctx.execution_date)?; let day = self.day_state(ctx, ctx.execution_date)?;
let order_intents = self.explicit_action_intents(ctx, ctx.execution_date, &day)?; let (order_intents, action_diagnostics) =
let diagnostics = vec![format!( self.explicit_action_intents(ctx, ctx.execution_date, &day)?;
let mut diagnostics = vec![format!(
"platform_expr signal={} last={:.2} explicit_actions={} stage={}", "platform_expr signal={} last={:.2} explicit_actions={} stage={}",
self.config.signal_symbol, self.config.signal_symbol,
day.signal_close, day.signal_close,
@@ -2945,6 +2979,7 @@ impl PlatformExprStrategy {
PlatformExplicitActionStage::OnDay => "on_day", PlatformExplicitActionStage::OnDay => "on_day",
} }
)]; )];
diagnostics.extend(action_diagnostics);
Ok(StrategyDecision { Ok(StrategyDecision {
rebalance: false, rebalance: false,
target_weights: BTreeMap::new(), target_weights: BTreeMap::new(),
@@ -3327,13 +3362,13 @@ impl Strategy for PlatformExprStrategy {
} }
let day = self.day_state(ctx, date)?; let day = self.day_state(ctx, date)?;
let explicit_action_intents = if self.config.explicit_action_stage let (explicit_action_intents, explicit_action_diagnostics) =
== PlatformExplicitActionStage::OnDay if self.config.explicit_action_stage == PlatformExplicitActionStage::OnDay
&& self.explicit_actions_active(ctx.data.calendar(), date) && self.explicit_actions_active(ctx.data.calendar(), date)
{ {
self.explicit_action_intents(ctx, date, &day)? self.explicit_action_intents(ctx, date, &day)?
} else { } else {
Vec::new() (Vec::new(), Vec::new())
}; };
let mut selection_notes = Vec::new(); let mut selection_notes = Vec::new();
let trading_ratio = if self.config.rotation_enabled { let trading_ratio = if self.config.rotation_enabled {
@@ -3557,6 +3592,7 @@ impl Strategy for PlatformExprStrategy {
"platform strategy script executed through expression runtime + bid1/ask1 snapshot execution".to_string(), "platform strategy script executed through expression runtime + bid1/ask1 snapshot execution".to_string(),
]; ];
diagnostics.extend(selection_notes); diagnostics.extend(selection_notes);
diagnostics.extend(explicit_action_diagnostics);
let notes = vec![ let notes = vec![
format!("stock_list={}", stock_list.len()), format!("stock_list={}", stock_list.len()),
@@ -4275,6 +4311,95 @@ mod tests {
} }
} }
#[test]
fn platform_strategy_filters_target_portfolio_smart_by_subscription_guard() {
let date = d(2025, 2, 3);
let data = DataSet::from_components(
vec![],
vec![DailyMarketSnapshot {
date,
symbol: "000001.SH".to_string(),
timestamp: Some("2025-02-03 10:18:00".to_string()),
day_open: 1000.0,
open: 1000.0,
high: 1002.0,
low: 998.0,
close: 1001.0,
last_price: 1001.0,
bid1: 1000.5,
ask1: 1001.5,
prev_close: 999.0,
volume: 100_000,
tick_volume: 5_000,
bid1_volume: 2_500,
ask1_volume: 2_500,
trading_phase: Some("continuous".to_string()),
paused: false,
upper_limit: 1098.9,
lower_limit: 899.1,
price_tick: 0.01,
}],
vec![],
vec![],
vec![BenchmarkSnapshot {
date,
benchmark: "000852.SH".to_string(),
open: 1000.0,
close: 1001.0,
prev_close: 999.0,
volume: 100_000,
}],
)
.expect("dataset");
let portfolio = PortfolioState::new(1_000_000.0);
let subscriptions = BTreeSet::from(["000001.SZ".to_string()]);
let ctx = StrategyContext {
execution_date: date,
decision_date: date,
decision_index: 0,
data: &data,
portfolio: &portfolio,
open_orders: &[],
dynamic_universe: None,
subscriptions: &subscriptions,
process_events: &[],
active_process_event: None,
};
let mut cfg = PlatformExprStrategyConfig::microcap_rotation();
cfg.signal_symbol = "000001.SH".to_string();
cfg.rotation_enabled = false;
cfg.subscription_guard_required = true;
cfg.benchmark_short_ma_days = 1;
cfg.benchmark_long_ma_days = 1;
cfg.explicit_actions = vec![PlatformTradeAction::TargetPortfolioSmart {
target_weights_expr: "{\"000001.SZ\": 0.30, \"000002.SZ\": 0.20}".to_string(),
order_prices_expr: Some("TWAPOrder(1000, 1030)".to_string()),
valuation_prices_expr: Some(
"{\"000001.SZ\": signal_close, \"000002.SZ\": signal_close}".to_string(),
),
when_expr: Some("subscription_guard_required".to_string()),
reason: "guarded_target_portfolio_smart".to_string(),
}];
let mut strategy = PlatformExprStrategy::new(cfg);
let decision = strategy.on_day(&ctx).expect("platform decision");
assert!(
decision
.diagnostics
.iter()
.any(|item| item.contains("subscription_guard_filtered target_portfolio_smart"))
);
assert_eq!(decision.order_intents.len(), 1);
match &decision.order_intents[0] {
crate::strategy::OrderIntent::TargetPortfolioSmart { target_weights, .. } => {
assert_eq!(target_weights.len(), 1);
assert_eq!(target_weights.get("000001.SZ").copied(), Some(0.30));
}
other => panic!("unexpected guarded target portfolio intent: {other:?}"),
}
}
#[test] #[test]
fn platform_strategy_emits_explicit_actions_in_open_auction_stage() { fn platform_strategy_emits_explicit_actions_in_open_auction_stage() {
let date = d(2025, 2, 3); let date = d(2025, 2, 3);

View File

@@ -120,7 +120,7 @@ pub fn built_in_strategy_manual() -> StrategyAiManual {
}, },
ManualSection { ManualSection {
title: "trading.rotation / order.* / cancel.* / update_universe / subscribe".to_string(), title: "trading.rotation / order.* / cancel.* / update_universe / subscribe".to_string(),
detail: "支持显式下单、撤单、AlgoOrder 和动态 universe 管理。可以用 trading.rotation(false) 关闭默认轮动链路,再用 trading.stage(\"open_auction\" | \"on_day\") 指定执行阶段用 trading.schedule.daily().at([\"10:18\"]) / trading.schedule.weekly(weekday=5).at([\"10:18\"]) / trading.schedule.weekly(tradingday=-1).at([\"10:18\"]) / trading.schedule.monthly(tradingday=1).at([\"10:18\"]) 指定触发频率和分钟级 time_rule然后写 order.shares(\"600000.SH\", 1000)、order.target_shares(\"600000.SH\", 2000)、order.value(\"600000.SH\", cash * 0.25)、order.target_percent(\"600000.SH\", 0.05)、order.limit_value(\"600000.SH\", cash * 0.25, open * 0.99)、order.vwap_value(\"600000.SH\", cash * 0.25, \"09:31\", \"09:40\")、order.twap_percent(\"600000.SH\", 0.05, \"10:00\", \"10:30\")、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices=VWAPOrder(930, 940), valuation_prices={\"600000.SH\": prev_close})、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices={\"600000.SH\": open * 0.99}, valuation_prices={\"600000.SH\": prev_close})、cancel.order(12345)、cancel.symbol(\"600000.SH\")、cancel.all()、update_universe([\"600000.SH\", \"000001.SZ\"])、subscribe([\"000001.SZ\"])、unsubscribe([\"000001.SZ\"])。其中 order.target_shares(...) 对应 rqalpha 的 order_toorder.target_portfolio_smart(...) 对应 rqalpha 的 order_target_portfolio_smart 批量目标权重语义order_prices 既可以是逐标的限价映射,也可以是 VWAPOrder/TWAPOrder 这类全局 AlgoOrderorder.vwap_* / order.twap_* 对应 rqalpha 的 AlgoOrder 时间窗订单风格,而 update_universe/subscribe/unsubscribe 对应 rqalpha 的动态 universe 与订阅接口。symbol 使用标准证券代码数量、金额、仓位、时间窗、限价、order_id 和 symbol 列表都支持表达式;这些语句也支持放进 when/unless 条件块。".to_string(), detail: "支持显式下单、撤单、AlgoOrder 和动态 universe 管理。可以用 trading.rotation(false) 关闭默认轮动链路,再用 trading.stage(\"open_auction\" | \"on_day\") 指定执行阶段;需要模拟 rqalpha 的 tick 订阅保护时,可写 trading.subscription_guard(true),未订阅 symbol 的显式订单会被拦截TargetPortfolioSmart + AlgoOrder 会过滤未订阅标的。用 trading.schedule.daily().at([\"10:18\"]) / trading.schedule.weekly(weekday=5).at([\"10:18\"]) / trading.schedule.weekly(tradingday=-1).at([\"10:18\"]) / trading.schedule.monthly(tradingday=1).at([\"10:18\"]) 指定触发频率和分钟级 time_rule然后写 order.shares(\"600000.SH\", 1000)、order.target_shares(\"600000.SH\", 2000)、order.value(\"600000.SH\", cash * 0.25)、order.target_percent(\"600000.SH\", 0.05)、order.limit_value(\"600000.SH\", cash * 0.25, open * 0.99)、order.vwap_value(\"600000.SH\", cash * 0.25, \"09:31\", \"09:40\")、order.twap_percent(\"600000.SH\", 0.05, \"10:00\", \"10:30\")、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices=VWAPOrder(930, 940), valuation_prices={\"600000.SH\": prev_close})、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices={\"600000.SH\": open * 0.99}, valuation_prices={\"600000.SH\": prev_close})、cancel.order(12345)、cancel.symbol(\"600000.SH\")、cancel.all()、update_universe([\"600000.SH\", \"000001.SZ\"])、subscribe([\"000001.SZ\"])、unsubscribe([\"000001.SZ\"])。其中 order.target_shares(...) 对应 rqalpha 的 order_toorder.target_portfolio_smart(...) 对应 rqalpha 的 order_target_portfolio_smart 批量目标权重语义order_prices 既可以是逐标的限价映射,也可以是 VWAPOrder/TWAPOrder 这类全局 AlgoOrderorder.vwap_* / order.twap_* 对应 rqalpha 的 AlgoOrder 时间窗订单风格,而 update_universe/subscribe/unsubscribe 对应 rqalpha 的动态 universe 与订阅接口。symbol 使用标准证券代码数量、金额、仓位、时间窗、限价、order_id 和 symbol 列表都支持表达式;这些语句也支持放进 when/unless 条件块。".to_string(),
}, },
ManualSection { ManualSection {
title: "when / unless / else".to_string(), title: "when / unless / else".to_string(),
@@ -142,6 +142,7 @@ pub fn built_in_strategy_manual() -> StrategyAiManual {
ManualField { name: "open_buy_qty/open_sell_qty/latest_open_order_id".to_string(), field_type: "int".to_string(), detail: "当前阶段未成交买卖挂单的剩余数量汇总,以及最近一笔挂单 id。".to_string() }, ManualField { name: "open_buy_qty/open_sell_qty/latest_open_order_id".to_string(), field_type: "int".to_string(), detail: "当前阶段未成交买卖挂单的剩余数量汇总,以及最近一笔挂单 id。".to_string() },
ManualField { name: "has_dynamic_universe/dynamic_universe_count".to_string(), field_type: "bool/int".to_string(), detail: "当前策略上下文是否存在动态 universe以及动态 universe 内证券数量。".to_string() }, ManualField { name: "has_dynamic_universe/dynamic_universe_count".to_string(), field_type: "bool/int".to_string(), detail: "当前策略上下文是否存在动态 universe以及动态 universe 内证券数量。".to_string() },
ManualField { name: "has_subscriptions/subscription_count".to_string(), field_type: "bool/int".to_string(), detail: "当前订阅集合是否为空,以及订阅证券数量。".to_string() }, ManualField { name: "has_subscriptions/subscription_count".to_string(), field_type: "bool/int".to_string(), detail: "当前订阅集合是否为空,以及订阅证券数量。".to_string() },
ManualField { name: "subscription_guard_required".to_string(), field_type: "bool".to_string(), detail: "当前显式交易是否启用订阅保护;启用后未订阅标的的显式订单会被拒绝生成。".to_string() },
ManualField { name: "has_process_events/process_event_count/process_event_counts".to_string(), field_type: "bool/int/map".to_string(), detail: "当前阶段可见的过程事件摘要process_event_counts[\"trade\"] 这类写法可直接读取当天事件计数。".to_string() }, ManualField { name: "has_process_events/process_event_count/process_event_counts".to_string(), field_type: "bool/int/map".to_string(), detail: "当前阶段可见的过程事件摘要process_event_counts[\"trade\"] 这类写法可直接读取当天事件计数。".to_string() },
ManualField { name: "current_process_kind/current_process_order_id/current_process_symbol/current_process_side/current_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前正在回调的过程事件上下文;没有活动事件时为空字符串或 0。".to_string() }, ManualField { name: "current_process_kind/current_process_order_id/current_process_symbol/current_process_side/current_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前正在回调的过程事件上下文;没有活动事件时为空字符串或 0。".to_string() },
ManualField { name: "latest_process_kind/latest_process_order_id/latest_process_symbol/latest_process_side/latest_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前阶段最近一条过程事件的摘要,可用于让 on_day/open_auction 逻辑响应 earlier lifecycle 或订单事件。".to_string() }, ManualField { name: "latest_process_kind/latest_process_order_id/latest_process_symbol/latest_process_side/latest_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前阶段最近一条过程事件的摘要,可用于让 on_day/open_auction 逻辑响应 earlier lifecycle 或订单事件。".to_string() },

View File

@@ -31,7 +31,7 @@ current alignment pass.
- [x] `update_universe` - [x] `update_universe`
- [x] `subscribe` - [x] `subscribe`
- [x] `unsubscribe` - [x] `unsubscribe`
- [ ] tick-frequency subscription guards exposed at strategy API level - [x] tick-frequency subscription guards exposed at strategy API level
### Phase 4: Algo order parity ### Phase 4: Algo order parity
@@ -57,5 +57,5 @@ current alignment pass.
## Current Step ## Current Step
Active implementation target: Phase 2/3 follow-up, finer `1m`/`tick` Active implementation target: Phase 2 follow-up, finer `1m`/`tick`
strategy execution entrypoints and subscription guards. strategy execution entrypoints beyond the current explicit intraday schedules.