From 152b5c314192a8df1af0d1739e4ce45909dee2ec Mon Sep 17 00:00:00 2001 From: boris Date: Thu, 23 Apr 2026 07:12:56 -0700 Subject: [PATCH] Add dynamic universe and subscription controls --- crates/bt-demo/src/main.rs | 4 + crates/fidc-core/src/broker.rs | 24 ++ crates/fidc-core/src/engine.rs | 299 +++++++++++++++++- crates/fidc-core/src/events.rs | 6 + crates/fidc-core/src/lib.rs | 2 +- .../fidc-core/src/platform_expr_strategy.rs | 276 +++++++++++++++- crates/fidc-core/src/strategy.rs | 71 ++++- crates/fidc-core/src/strategy_ai.rs | 11 +- crates/fidc-core/src/universe.rs | 27 +- crates/fidc-core/tests/engine_hooks.rs | 252 +++++++++++++++ crates/fidc-core/tests/strategy_selection.rs | 7 + docs/rqalpha-gap-roadmap.md | 8 +- 12 files changed, 963 insertions(+), 24 deletions(-) diff --git a/crates/bt-demo/src/main.rs b/crates/bt-demo/src/main.rs index 8e83328..b671c31 100644 --- a/crates/bt-demo/src/main.rs +++ b/crates/bt-demo/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::error::Error; use std::fs; use std::io::Write; @@ -117,6 +118,7 @@ fn main() -> Result<(), Box> { eprintln!(" {} {:.6}", row.symbol, row.market_cap_bn); } let mut debug_strategy = JqMicroCapStrategy::new(strategy_cfg.clone()); + let debug_subscriptions = BTreeSet::new(); let decision = debug_strategy.on_day(&StrategyContext { execution_date: date, decision_date: date, @@ -124,6 +126,8 @@ fn main() -> Result<(), Box> { data: &data, portfolio: &PortfolioState::new(10_000_000.0), open_orders: &[], + dynamic_universe: None, + subscriptions: &debug_subscriptions, process_events: &[], active_process_event: None, })?; diff --git a/crates/fidc-core/src/broker.rs b/crates/fidc-core/src/broker.rs index 84e786c..cdd203c 100644 --- a/crates/fidc-core/src/broker.rs +++ b/crates/fidc-core/src/broker.rs @@ -787,6 +787,30 @@ where self.cancel_all_open_orders(date, reason, report); Ok(()) } + OrderIntent::UpdateUniverse { symbols, reason } => { + report.diagnostics.push(format!( + "engine_control_intent_skipped kind=update_universe count={} reason={}", + symbols.len(), + reason + )); + Ok(()) + } + OrderIntent::Subscribe { symbols, reason } => { + report.diagnostics.push(format!( + "engine_control_intent_skipped kind=subscribe count={} reason={}", + symbols.len(), + reason + )); + Ok(()) + } + OrderIntent::Unsubscribe { symbols, reason } => { + report.diagnostics.push(format!( + "engine_control_intent_skipped kind=unsubscribe count={} reason={}", + symbols.len(), + reason + )); + Ok(()) + } } } diff --git a/crates/fidc-core/src/engine.rs b/crates/fidc-core/src/engine.rs index 6505a7e..49709fc 100644 --- a/crates/fidc-core/src/engine.rs +++ b/crates/fidc-core/src/engine.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use chrono::NaiveDate; use serde::Serialize; use thiserror::Error; @@ -97,6 +99,8 @@ pub struct BacktestEngine { config: BacktestConfig, dividend_reinvestment: bool, process_event_bus: ProcessEventBus, + dynamic_universe: Option>, + subscriptions: BTreeSet, } impl BacktestEngine { @@ -113,6 +117,8 @@ impl BacktestEngine { config, dividend_reinvestment: false, process_event_bus: ProcessEventBus::new(), + dynamic_universe: None, + subscriptions: BTreeSet::new(), } } @@ -146,6 +152,143 @@ where C: CostModel, R: EquityRuleHooks, { + fn apply_strategy_directives( + &mut self, + execution_date: NaiveDate, + decision_date: NaiveDate, + decision_index: usize, + portfolio: &PortfolioState, + open_orders: &[crate::strategy::OpenOrderView], + process_events: &mut Vec, + decision: &mut crate::strategy::StrategyDecision, + ) -> Result<(), BacktestError> { + if decision.order_intents.is_empty() { + return Ok(()); + } + + let mut retained = Vec::with_capacity(decision.order_intents.len()); + for intent in decision.order_intents.drain(..) { + match intent { + crate::strategy::OrderIntent::UpdateUniverse { symbols, reason } => { + let symbol_count = symbols.len(); + self.dynamic_universe = Some(symbols.clone()); + decision + .diagnostics + .push(format!("dynamic_universe_updated count={symbol_count}")); + publish_custom_process_event( + &mut self.strategy, + &mut self.process_event_bus, + execution_date, + decision_date, + decision_index, + &self.data, + portfolio, + open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, + process_events, + ProcessEvent { + date: execution_date, + kind: ProcessEventKind::UniverseUpdated, + order_id: None, + symbol: (symbol_count == 1) + .then(|| symbols.iter().next().cloned()) + .flatten(), + side: None, + detail: format!( + "reason={reason} count={symbol_count} symbols={}", + symbols.iter().cloned().collect::>().join(",") + ), + }, + )?; + } + crate::strategy::OrderIntent::Subscribe { symbols, reason } => { + let mut added = Vec::new(); + for symbol in symbols { + if self.subscriptions.insert(symbol.clone()) { + added.push(symbol); + } + } + if !added.is_empty() { + decision.diagnostics.push(format!( + "subscriptions_added count={} total={}", + added.len(), + self.subscriptions.len() + )); + publish_custom_process_event( + &mut self.strategy, + &mut self.process_event_bus, + execution_date, + decision_date, + decision_index, + &self.data, + portfolio, + open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, + process_events, + ProcessEvent { + date: execution_date, + kind: ProcessEventKind::UniverseSubscribed, + order_id: None, + symbol: (added.len() == 1).then(|| added[0].clone()), + side: None, + detail: format!( + "reason={reason} count={} symbols={}", + added.len(), + added.join(",") + ), + }, + )?; + } + } + crate::strategy::OrderIntent::Unsubscribe { symbols, reason } => { + let mut removed = Vec::new(); + for symbol in symbols { + if self.subscriptions.remove(&symbol) { + removed.push(symbol); + } + } + if !removed.is_empty() { + decision.diagnostics.push(format!( + "subscriptions_removed count={} total={}", + removed.len(), + self.subscriptions.len() + )); + publish_custom_process_event( + &mut self.strategy, + &mut self.process_event_bus, + execution_date, + decision_date, + decision_index, + &self.data, + portfolio, + open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, + process_events, + ProcessEvent { + date: execution_date, + kind: ProcessEventKind::UniverseUnsubscribed, + order_id: None, + symbol: (removed.len() == 1).then(|| removed[0].clone()), + side: None, + detail: format!( + "reason={reason} count={} symbols={}", + removed.len(), + removed.join(",") + ), + }, + )?; + } + } + other => retained.push(other), + } + } + decision.order_intents = retained; + Ok(()) + } + pub fn run(&mut self) -> Result { self.run_with_progress(|_| {}) } @@ -245,6 +388,8 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreBeforeTrading, @@ -257,6 +402,8 @@ where data: &self.data, portfolio: &portfolio, open_orders: &pre_open_orders, + dynamic_universe: self.dynamic_universe.as_ref(), + subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, })?; @@ -269,12 +416,14 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::BeforeTrading, "before_trading", )?; - let _ = collect_scheduled_decisions( + let mut before_trading_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, @@ -285,10 +434,21 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::BeforeTrading), )?; + self.apply_strategy_directives( + execution_date, + decision_date, + decision_index, + &portfolio, + &pre_open_orders, + &mut process_events, + &mut before_trading_decision, + )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, @@ -298,6 +458,8 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostBeforeTrading, @@ -312,6 +474,8 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreOpenAuction, @@ -328,6 +492,8 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::OpenAuction), @@ -339,6 +505,8 @@ where data: &self.data, portfolio: &portfolio, open_orders: &pre_open_orders, + dynamic_universe: self.dynamic_universe.as_ref(), + subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, })?); @@ -351,11 +519,22 @@ where &self.data, &portfolio, &pre_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::OpenAuction, "open_auction", )?; + self.apply_strategy_directives( + execution_date, + decision_date, + decision_index, + &portfolio, + &pre_open_orders, + &mut process_events, + &mut auction_decision, + )?; let mut report = self.broker.execute( execution_date, &mut portfolio, @@ -372,6 +551,8 @@ where &self.data, &portfolio, &post_auction_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut report.process_events, )?; @@ -384,6 +565,8 @@ where &self.data, &portfolio, &post_auction_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostOpenAuction, @@ -399,6 +582,8 @@ where &self.data, &portfolio, &post_auction_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreOnDay, @@ -414,6 +599,8 @@ where data: &self.data, portfolio: &portfolio, open_orders: &on_day_open_orders, + dynamic_universe: self.dynamic_universe.as_ref(), + subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, }) @@ -431,6 +618,8 @@ where &self.data, &portfolio, &on_day_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::OnDay), @@ -444,11 +633,22 @@ where &self.data, &portfolio, &on_day_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::OnDay, "on_day", )?; + self.apply_strategy_directives( + execution_date, + decision_date, + decision_index, + &portfolio, + &on_day_open_orders, + &mut process_events, + &mut decision, + )?; let mut intraday_report = self.broker @@ -463,6 +663,8 @@ where &self.data, &portfolio, &post_intraday_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut intraday_report.process_events, )?; @@ -482,6 +684,8 @@ where &self.data, &portfolio, &post_intraday_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostOnDay, @@ -500,6 +704,8 @@ where &self.data, &portfolio, &post_trade_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreAfterTrading, @@ -512,6 +718,8 @@ where data: &self.data, portfolio: &portfolio, open_orders: &post_trade_open_orders, + dynamic_universe: self.dynamic_universe.as_ref(), + subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, })?; @@ -524,12 +732,14 @@ where &self.data, &portfolio, &post_trade_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::AfterTrading, "after_trading", )?; - let _ = collect_scheduled_decisions( + let mut after_trading_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, @@ -540,10 +750,21 @@ where &self.data, &portfolio, &post_trade_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::AfterTrading), )?; + self.apply_strategy_directives( + execution_date, + decision_date, + decision_index, + &portfolio, + &post_trade_open_orders, + &mut process_events, + &mut after_trading_decision, + )?; let mut close_report = self.broker.after_trading(execution_date); publish_process_events( &mut self.strategy, @@ -554,6 +775,8 @@ where &self.data, &portfolio, &post_trade_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut close_report.process_events, )?; @@ -572,6 +795,8 @@ where &self.data, &portfolio, &post_close_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostAfterTrading, @@ -586,6 +811,8 @@ where &self.data, &portfolio, &post_close_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreSettlement, @@ -598,6 +825,8 @@ where data: &self.data, portfolio: &portfolio, open_orders: &post_close_open_orders, + dynamic_universe: self.dynamic_universe.as_ref(), + subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, })?; @@ -610,12 +839,14 @@ where &self.data, &portfolio, &post_close_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::Settlement, "settlement", )?; - let _ = collect_scheduled_decisions( + let mut settlement_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, @@ -626,10 +857,21 @@ where &self.data, &portfolio, &post_close_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::Settlement), )?; + self.apply_strategy_directives( + execution_date, + decision_date, + decision_index, + &portfolio, + &post_close_open_orders, + &mut process_events, + &mut settlement_decision, + )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, @@ -639,6 +881,8 @@ where &self.data, &portfolio, &post_close_open_orders, + self.dynamic_universe.as_ref(), + &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostSettlement, @@ -1139,6 +1383,8 @@ fn collect_scheduled_decisions( data: &crate::data::DataSet, portfolio: &PortfolioState, open_orders: &[crate::strategy::OpenOrderView], + dynamic_universe: Option<&BTreeSet>, + subscriptions: &BTreeSet, process_events: &mut Vec, process_event_bus: &mut ProcessEventBus, current_time: Option, @@ -1154,6 +1400,8 @@ fn collect_scheduled_decisions( data, portfolio, open_orders, + dynamic_universe, + subscriptions, process_events, execution_date, ProcessEventKind::PreScheduled, @@ -1167,6 +1415,8 @@ fn collect_scheduled_decisions( data, portfolio, open_orders, + dynamic_universe, + subscriptions, process_events: process_events.as_slice(), active_process_event: None, }, @@ -1181,6 +1431,8 @@ fn collect_scheduled_decisions( data, portfolio, open_orders, + dynamic_universe, + subscriptions, process_events, execution_date, ProcessEventKind::PostScheduled, @@ -1199,6 +1451,8 @@ fn publish_phase_event( data: &crate::data::DataSet, portfolio: &PortfolioState, open_orders: &[crate::strategy::OpenOrderView], + dynamic_universe: Option<&BTreeSet>, + subscriptions: &BTreeSet, events: &mut Vec, date: NaiveDate, kind: ProcessEventKind, @@ -1221,6 +1475,8 @@ fn publish_phase_event( data, portfolio, open_orders, + dynamic_universe, + subscriptions, process_events, active_process_event: Some(&event), }; @@ -1238,6 +1494,8 @@ fn publish_process_events( data: &crate::data::DataSet, portfolio: &PortfolioState, open_orders: &[crate::strategy::OpenOrderView], + dynamic_universe: Option<&BTreeSet>, + subscriptions: &BTreeSet, target: &mut Vec, incoming: &mut Vec, ) -> Result<(), BacktestError> { @@ -1251,6 +1509,8 @@ fn publish_process_events( data, portfolio, open_orders, + dynamic_universe, + subscriptions, process_events, active_process_event: Some(&event), }; @@ -1260,6 +1520,39 @@ fn publish_process_events( Ok(()) } +fn publish_custom_process_event( + strategy: &mut S, + process_event_bus: &mut ProcessEventBus, + execution_date: NaiveDate, + decision_date: NaiveDate, + decision_index: usize, + data: &crate::data::DataSet, + portfolio: &PortfolioState, + open_orders: &[crate::strategy::OpenOrderView], + dynamic_universe: Option<&BTreeSet>, + subscriptions: &BTreeSet, + target: &mut Vec, + event: ProcessEvent, +) -> Result<(), BacktestError> { + process_event_bus.publish(&event); + let process_events = target.as_slice(); + let event_ctx = StrategyContext { + execution_date, + decision_date, + decision_index, + data, + portfolio, + open_orders, + dynamic_universe, + subscriptions, + process_events, + active_process_event: Some(&event), + }; + strategy.on_process_event(&event_ctx, &event)?; + target.push(event); + Ok(()) +} + fn stage_label(stage: ScheduleStage) -> &'static str { match stage { ScheduleStage::BeforeTrading => "before_trading", diff --git a/crates/fidc-core/src/events.rs b/crates/fidc-core/src/events.rs index 6b3f062..f964c31 100644 --- a/crates/fidc-core/src/events.rs +++ b/crates/fidc-core/src/events.rs @@ -127,6 +127,9 @@ pub enum ProcessEventKind { OrderCancellationReject, OrderUnsolicitedUpdate, Trade, + UniverseUpdated, + UniverseSubscribed, + UniverseUnsubscribed, } impl ProcessEventKind { @@ -157,6 +160,9 @@ impl ProcessEventKind { Self::OrderCancellationReject => "order_cancellation_reject", Self::OrderUnsolicitedUpdate => "order_unsolicited_update", Self::Trade => "trade", + Self::UniverseUpdated => "universe_updated", + Self::UniverseSubscribed => "universe_subscribed", + Self::UniverseUnsubscribed => "universe_unsubscribed", } } } diff --git a/crates/fidc-core/src/lib.rs b/crates/fidc-core/src/lib.rs index e40b9e2..e15d1a3 100644 --- a/crates/fidc-core/src/lib.rs +++ b/crates/fidc-core/src/lib.rs @@ -37,7 +37,7 @@ pub use metrics::{BacktestMetrics, compute_backtest_metrics}; pub use platform_expr_strategy::{ PlatformExplicitActionStage, PlatformExplicitCancelKind, PlatformExplicitOrderKind, PlatformExprStrategy, PlatformExprStrategyConfig, PlatformRebalanceSchedule, - PlatformScheduleFrequency, PlatformTradeAction, + PlatformScheduleFrequency, PlatformTradeAction, PlatformUniverseActionKind, }; pub use portfolio::{CashReceivable, HoldingSummary, PortfolioState, Position}; pub use rules::{ChinaEquityRuleHooks, EquityRuleHooks, RuleCheck}; diff --git a/crates/fidc-core/src/platform_expr_strategy.rs b/crates/fidc-core/src/platform_expr_strategy.rs index 85f9e1e..81eac99 100644 --- a/crates/fidc-core/src/platform_expr_strategy.rs +++ b/crates/fidc-core/src/platform_expr_strategy.rs @@ -100,6 +100,13 @@ pub enum PlatformExplicitCancelKind { All, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PlatformUniverseActionKind { + UpdateUniverse, + Subscribe, + Unsubscribe, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum PlatformTradeAction { Order { @@ -117,6 +124,12 @@ pub enum PlatformTradeAction { when_expr: Option, reason: String, }, + Universe { + kind: PlatformUniverseActionKind, + symbols_expr: String, + when_expr: Option, + reason: String, + }, Cancel { kind: PlatformExplicitCancelKind, symbol: Option, @@ -1186,6 +1199,13 @@ impl PlatformExprStrategy { scope.push("open_buy_qty", ctx.open_buy_quantity() as i64); scope.push("open_sell_qty", ctx.open_sell_quantity() as i64); scope.push("latest_open_order_id", ctx.latest_open_order_id() as i64); + scope.push("has_dynamic_universe", ctx.has_dynamic_universe()); + scope.push( + "dynamic_universe_count", + ctx.dynamic_universe_count() as i64, + ); + scope.push("has_subscriptions", ctx.has_subscriptions()); + scope.push("subscription_count", ctx.subscription_count() as i64); scope.push("has_process_events", ctx.has_process_events()); scope.push("process_event_count", ctx.process_event_count() as i64); scope.push( @@ -1301,6 +1321,22 @@ impl PlatformExprStrategy { "latest_open_order_id".into(), Dynamic::from(ctx.latest_open_order_id() as i64), ); + day_factors.insert( + "has_dynamic_universe".into(), + Dynamic::from(ctx.has_dynamic_universe()), + ); + day_factors.insert( + "dynamic_universe_count".into(), + Dynamic::from(ctx.dynamic_universe_count() as i64), + ); + day_factors.insert( + "has_subscriptions".into(), + Dynamic::from(ctx.has_subscriptions()), + ); + day_factors.insert( + "subscription_count".into(), + Dynamic::from(ctx.subscription_count() as i64), + ); day_factors.insert( "has_process_events".into(), Dynamic::from(ctx.has_process_events()), @@ -1410,6 +1446,11 @@ impl PlatformExprStrategy { "latest_symbol_open_order_id", ctx.latest_symbol_open_order_id(&stock.symbol) as i64, ); + scope.push( + "in_dynamic_universe", + ctx.dynamic_universe_contains(&stock.symbol), + ); + scope.push("is_subscribed", ctx.is_subscribed(&stock.symbol)); scope.push("stock_ma_short", stock.stock_ma_short); scope.push("stock_ma_mid", stock.stock_ma_mid); scope.push("stock_ma_long", stock.stock_ma_long); @@ -1501,6 +1542,14 @@ impl PlatformExprStrategy { "latest_symbol_open_order_id".into(), Dynamic::from(ctx.latest_symbol_open_order_id(&stock.symbol) as i64), ); + factors.insert( + "in_dynamic_universe".into(), + Dynamic::from(ctx.dynamic_universe_contains(&stock.symbol)), + ); + factors.insert( + "is_subscribed".into(), + Dynamic::from(ctx.is_subscribed(&stock.symbol)), + ); factors.insert("stock_ma5".into(), Dynamic::from(stock.stock_ma5)); factors.insert("stock_ma10".into(), Dynamic::from(stock.stock_ma10)); factors.insert("stock_ma20".into(), Dynamic::from(stock.stock_ma20)); @@ -2193,6 +2242,50 @@ impl PlatformExprStrategy { Ok(output) } + fn eval_symbol_set_expr( + &self, + ctx: &StrategyContext<'_>, + expr: &str, + day: &DayExpressionState, + stock: Option<&StockExpressionState>, + position: Option<&PositionExpressionState>, + ) -> Result, BacktestError> { + let trimmed = expr.trim(); + if trimmed.is_empty() { + return Ok(BTreeSet::new()); + } + let inner = trimmed + .strip_prefix('[') + .and_then(|value| value.strip_suffix(']')) + .ok_or_else(|| { + BacktestError::Execution(format!( + "platform symbol list expr must use [...] array literal syntax: {trimmed}" + )) + })?; + let mut output = BTreeSet::new(); + for entry in Self::split_top_level_args(inner) { + let raw = entry.trim(); + if raw.is_empty() { + continue; + } + let symbol = if raw.starts_with('"') || raw.starts_with('\'') { + Self::parse_string_literal_key(raw)? + } else { + let value = self.eval_dynamic(ctx, raw, day, stock, position)?; + value.try_cast::().ok_or_else(|| { + BacktestError::Execution(format!( + "platform symbol list entry must evaluate to string: {raw}" + )) + })? + }; + let normalized = symbol.trim().to_ascii_uppercase(); + if !normalized.is_empty() { + output.insert(normalized); + } + } + Ok(output) + } + fn split_top_level_key_value(input: &str) -> Option<(&str, &str)> { let mut paren_depth = 0i32; let mut brace_depth = 0i32; @@ -2553,6 +2646,34 @@ impl PlatformExprStrategy { } } } + PlatformTradeAction::Universe { + kind, + symbols_expr, + when_expr, + reason, + } => { + if !self.action_when_matches(ctx, day, None, when_expr.as_deref())? { + continue; + } + let symbols = self.eval_symbol_set_expr(ctx, symbols_expr, day, None, None)?; + if symbols.is_empty() { + continue; + } + intents.push(match kind { + PlatformUniverseActionKind::UpdateUniverse => OrderIntent::UpdateUniverse { + symbols, + reason: reason.clone(), + }, + PlatformUniverseActionKind::Subscribe => OrderIntent::Subscribe { + symbols, + reason: reason.clone(), + }, + PlatformUniverseActionKind::Unsubscribe => OrderIntent::Unsubscribe { + symbols, + reason: reason.clone(), + }, + }); + } PlatformTradeAction::TargetPortfolioSmart { target_weights_expr, order_prices_expr, @@ -2805,10 +2926,10 @@ impl PlatformExprStrategy { band_high: f64, limit: usize, ) -> Result<(Vec, Vec), BacktestError> { - let universe = ctx.data.eligible_universe_on(date); + let universe = ctx.eligible_universe_on(date); let mut diagnostics = Vec::new(); let mut candidates = Vec::new(); - for candidate in universe.iter().cloned() { + for candidate in universe { let stock = self.stock_state(ctx, date, &candidate.symbol)?; let field_value = self.selection_field_value(&candidate, &stock); if field_value < band_low || field_value > band_high { @@ -3235,14 +3356,14 @@ impl Strategy for PlatformExprStrategy { #[cfg(test)] mod tests { - use std::collections::BTreeMap; + use std::collections::{BTreeMap, BTreeSet}; use chrono::{NaiveDate, NaiveTime}; use super::{ PlatformExplicitActionStage, PlatformExplicitCancelKind, PlatformExplicitOrderKind, PlatformExprStrategy, PlatformExprStrategyConfig, PlatformRebalanceSchedule, - PlatformScheduleFrequency, PlatformTradeAction, + PlatformScheduleFrequency, PlatformTradeAction, PlatformUniverseActionKind, }; use crate::{ BenchmarkSnapshot, CandidateEligibility, DailyFactorSnapshot, DailyMarketSnapshot, DataSet, @@ -3401,6 +3522,7 @@ mod tests { ) .expect("dataset"); let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3408,6 +3530,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -3533,6 +3657,7 @@ mod tests { ) .expect("dataset"); let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3540,6 +3665,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -3616,6 +3743,7 @@ mod tests { ) .expect("dataset"); let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3623,10 +3751,13 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; let mut cfg = PlatformExprStrategyConfig::microcap_rotation(); + cfg.signal_symbol = "000001.SH".to_string(); cfg.rotation_enabled = false; cfg.benchmark_short_ma_days = 1; cfg.benchmark_long_ma_days = 1; @@ -3742,6 +3873,7 @@ mod tests { ) .expect("dataset"); let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3749,6 +3881,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -3856,6 +3990,7 @@ mod tests { ) .expect("dataset"); let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3863,6 +3998,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -3971,6 +4108,7 @@ mod tests { limit_price: 10.2, reason: "pending_limit_sell".to_string(), }]; + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -3978,6 +4116,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &open_orders, + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -4092,6 +4232,7 @@ mod tests { reason: "pending_limit_sell".to_string(), }, ]; + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -4099,6 +4240,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &open_orders, + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }; @@ -4127,6 +4270,128 @@ mod tests { } } + #[test] + fn platform_strategy_emits_universe_management_actions() { + let date = d(2025, 2, 3); + let data = DataSet::from_components( + vec![Instrument { + symbol: "000001.SZ".to_string(), + name: "Ping An Bank".to_string(), + board: "SZSE".to_string(), + round_lot: 100, + listed_at: Some(d(2010, 1, 1)), + delisted_at: None, + status: "active".to_string(), + }], + vec![DailyMarketSnapshot { + date, + symbol: "000001.SZ".to_string(), + timestamp: Some("10:18:00".to_string()), + day_open: 10.0, + open: 10.0, + high: 10.2, + low: 9.9, + close: 10.1, + last_price: 10.05, + bid1: 10.04, + ask1: 10.05, + prev_close: 9.95, + volume: 1_000_000, + tick_volume: 5_000, + bid1_volume: 1_000, + ask1_volume: 1_000, + trading_phase: Some("continuous".to_string()), + paused: false, + upper_limit: 10.94, + lower_limit: 8.96, + price_tick: 0.01, + }], + vec![DailyFactorSnapshot { + date, + symbol: "000001.SZ".to_string(), + market_cap_bn: 12.0, + free_float_cap_bn: 10.0, + pe_ttm: 8.0, + turnover_ratio: Some(22.0), + effective_turnover_ratio: Some(18.0), + extra_factors: BTreeMap::new(), + }], + vec![CandidateEligibility { + date, + symbol: "000001.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }], + vec![BenchmarkSnapshot { + date, + benchmark: "000852.SH".to_string(), + open: 1000.0, + close: 1002.0, + prev_close: 998.0, + volume: 1_000_000, + }], + ) + .expect("dataset"); + let portfolio = PortfolioState::new(1_000_000.0); + let subscriptions = BTreeSet::from(["000001.SZ".to_string()]); + let ctx = StrategyContext { + execution_date: date, + decision_date: date, + decision_index: 0, + data: &data, + portfolio: &portfolio, + open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, + process_events: &[], + active_process_event: None, + }; + let mut cfg = PlatformExprStrategyConfig::microcap_rotation(); + cfg.signal_symbol = "000001.SZ".to_string(); + cfg.rotation_enabled = false; + cfg.benchmark_short_ma_days = 1; + cfg.benchmark_long_ma_days = 1; + cfg.explicit_actions = vec![ + PlatformTradeAction::Universe { + kind: PlatformUniverseActionKind::UpdateUniverse, + symbols_expr: "[\"000001.SZ\"]".to_string(), + when_expr: Some("subscription_count == 1 && has_subscriptions".to_string()), + reason: "dynamic_focus".to_string(), + }, + PlatformTradeAction::Universe { + kind: PlatformUniverseActionKind::Unsubscribe, + symbols_expr: "[\"000001.SZ\"]".to_string(), + when_expr: Some("has_subscriptions".to_string()), + reason: "drop_subscription".to_string(), + }, + ]; + let mut strategy = PlatformExprStrategy::new(cfg); + + let decision = strategy.on_day(&ctx).expect("platform decision"); + + assert_eq!(decision.order_intents.len(), 2); + match &decision.order_intents[0] { + crate::strategy::OrderIntent::UpdateUniverse { symbols, reason } => { + assert_eq!(reason, "dynamic_focus"); + assert!(symbols.contains("000001.SZ")); + } + other => panic!("unexpected universe update intent: {other:?}"), + } + match &decision.order_intents[1] { + crate::strategy::OrderIntent::Unsubscribe { symbols, reason } => { + assert_eq!(reason, "drop_subscription"); + assert_eq!(symbols.len(), 1); + assert!(symbols.contains("000001.SZ")); + } + other => panic!("unexpected unsubscribe intent: {other:?}"), + } + } + #[test] fn platform_strategy_exposes_process_event_runtime_fields() { let date = d(2025, 2, 3); @@ -4203,6 +4468,7 @@ mod tests { side: Some(crate::OrderSide::Buy), detail: "open at or above upper limit".to_string(), }]; + let subscriptions = BTreeSet::new(); let ctx = StrategyContext { execution_date: date, decision_date: date, @@ -4210,6 +4476,8 @@ mod tests { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &process_events, active_process_event: None, }; diff --git a/crates/fidc-core/src/strategy.rs b/crates/fidc-core/src/strategy.rs index edcb4f7..daabb12 100644 --- a/crates/fidc-core/src/strategy.rs +++ b/crates/fidc-core/src/strategy.rs @@ -70,6 +70,8 @@ pub struct StrategyContext<'a> { pub data: &'a DataSet, pub portfolio: &'a PortfolioState, pub open_orders: &'a [OpenOrderView], + pub dynamic_universe: Option<&'a BTreeSet>, + pub subscriptions: &'a BTreeSet, pub process_events: &'a [ProcessEvent], pub active_process_event: Option<&'a ProcessEvent>, } @@ -157,6 +159,47 @@ impl StrategyContext<'_> { raw_sellable_qty.saturating_sub(self.symbol_open_sell_quantity(symbol)) } + pub fn has_dynamic_universe(&self) -> bool { + self.dynamic_universe + .is_some_and(|symbols| !symbols.is_empty()) + } + + pub fn dynamic_universe_count(&self) -> usize { + self.dynamic_universe.map_or(0, BTreeSet::len) + } + + pub fn dynamic_universe_contains(&self, symbol: &str) -> bool { + self.dynamic_universe + .is_some_and(|symbols| symbols.contains(symbol)) + } + + pub fn eligible_universe_on( + &self, + date: NaiveDate, + ) -> Vec { + let eligible = self.data.eligible_universe_on(date); + match self.dynamic_universe { + Some(symbols) if !symbols.is_empty() => eligible + .iter() + .filter(|row| symbols.contains(&row.symbol)) + .cloned() + .collect(), + _ => eligible.to_vec(), + } + } + + pub fn has_subscriptions(&self) -> bool { + !self.subscriptions.is_empty() + } + + pub fn subscription_count(&self) -> usize { + self.subscriptions.len() + } + + pub fn is_subscribed(&self, symbol: &str) -> bool { + self.subscriptions.contains(symbol) + } + pub fn has_process_events(&self) -> bool { !self.process_events.is_empty() || self.active_process_event.is_some() } @@ -381,6 +424,18 @@ pub enum OrderIntent { CancelAll { reason: String, }, + UpdateUniverse { + symbols: BTreeSet, + reason: String, + }, + Subscribe { + symbols: BTreeSet, + reason: String, + }, + Unsubscribe { + symbols: BTreeSet, + reason: String, + }, } #[derive(Debug, Clone)] @@ -696,6 +751,7 @@ impl Strategy for CnSmallCapRotationStrategy { benchmark, reference_level: signal_level, data: ctx.data, + dynamic_universe: ctx.dynamic_universe, }); let before_ma_count = selected_before_ma.len(); let mut ma_rejects = Vec::new(); @@ -1576,6 +1632,13 @@ impl JqMicroCapStrategy { if !selected_set.insert(symbol.clone()) { continue; } + if ctx.has_dynamic_universe() && !ctx.dynamic_universe_contains(symbol) { + selected_set.remove(symbol); + if diagnostics.len() < 14 { + diagnostics.push(format!("truth {} rejected by dynamic_universe", symbol)); + } + continue; + } if let Some(reason) = self.buy_rejection_reason(ctx, date, symbol)? { selected_set.remove(symbol); if diagnostics.len() < 14 { @@ -1588,8 +1651,8 @@ impl JqMicroCapStrategy { } if selected.len() < self.config.stocknum { - let universe = ctx.data.eligible_universe_on(date); - let start = lower_bound_eligible(universe, band_low); + let universe = ctx.eligible_universe_on(date); + let start = lower_bound_eligible(&universe, band_low); for candidate in universe.iter().skip(start) { if candidate.market_cap_bn > band_high { break; @@ -1623,10 +1686,10 @@ impl JqMicroCapStrategy { return Ok((selected, diagnostics)); } - let universe = ctx.data.eligible_universe_on(date); + let universe = ctx.eligible_universe_on(date); let mut diagnostics = Vec::new(); let mut selected = Vec::new(); - let start = lower_bound_eligible(universe, band_low); + let start = lower_bound_eligible(&universe, band_low); for candidate in universe.iter().skip(start) { if candidate.market_cap_bn > band_high { diff --git a/crates/fidc-core/src/strategy_ai.rs b/crates/fidc-core/src/strategy_ai.rs index b3676f0..280d7a3 100644 --- a/crates/fidc-core/src/strategy_ai.rs +++ b/crates/fidc-core/src/strategy_ai.rs @@ -119,8 +119,8 @@ pub fn built_in_strategy_manual() -> StrategyAiManual { detail: "设置撮合模式和滑点。支持 execution.matching_type(\"next_tick_last\" | \"next_tick_best_own\" | \"next_tick_best_counterparty\" | \"counterparty_offer\" | \"vwap\" | \"current_bar_close\" | \"next_bar_open\" | \"open_auction\")。其中 next_tick_last 使用 tick 的 last_price;next_tick_best_own / next_tick_best_counterparty 会按 L1 买一卖一近似 rqalpha 的 tick 最优价语义,counterparty_offer 当前也按 L1 对手方报价近似实现;vwap 会在盘中执行价链路上聚合多笔成交为单条 VWAP 成交;open_auction 使用当日集合竞价开盘价 day_open 进行撮合,且不额外施加滑点,并按竞价成交量而不是盘口一档流动性限制成交;滑点支持 execution.slippage(\"none\") / execution.slippage(\"price_ratio\", 0.001) / execution.slippage(\"tick_size\", 1) / execution.slippage(\"limit_price\"),其中 limit_price 会在限价单成交时按挂单价模拟 rqalpha 的最坏成交价。".to_string(), }, ManualSection { - title: "trading.rotation / order.* / cancel.*".to_string(), - detail: "支持显式下单和撤单。可以用 trading.rotation(false) 关闭默认轮动链路,再用 trading.stage(\"open_auction\" | \"on_day\") 指定执行阶段,用 trading.schedule.daily().at([\"10:18\"]) / trading.schedule.weekly(weekday=5).at([\"10:18\"]) / trading.schedule.weekly(tradingday=-1).at([\"10:18\"]) / trading.schedule.monthly(tradingday=1).at([\"10:18\"]) 指定触发频率和分钟级 time_rule,然后写 order.shares(\"600000.SH\", 1000)、order.target_shares(\"600000.SH\", 2000)、order.value(\"600000.SH\", cash * 0.25)、order.target_percent(\"600000.SH\", 0.05)、order.limit_value(\"600000.SH\", cash * 0.25, open * 0.99)、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices={\"600000.SH\": open * 0.99}, valuation_prices={\"600000.SH\": prev_close})、cancel.order(12345)、cancel.symbol(\"600000.SH\")、cancel.all()。其中 order.target_shares(...) 对应 rqalpha 的 order_to,order.target_portfolio_smart(...) 对应 rqalpha 的 order_target_portfolio_smart 批量目标权重语义。symbol 使用标准证券代码;数量、金额、仓位、限价和 order_id 都支持表达式;这些语句也支持放进 when/unless 条件块。".to_string(), + title: "trading.rotation / order.* / cancel.* / update_universe / subscribe".to_string(), + detail: "支持显式下单、撤单和动态 universe 管理。可以用 trading.rotation(false) 关闭默认轮动链路,再用 trading.stage(\"open_auction\" | \"on_day\") 指定执行阶段,用 trading.schedule.daily().at([\"10:18\"]) / trading.schedule.weekly(weekday=5).at([\"10:18\"]) / trading.schedule.weekly(tradingday=-1).at([\"10:18\"]) / trading.schedule.monthly(tradingday=1).at([\"10:18\"]) 指定触发频率和分钟级 time_rule,然后写 order.shares(\"600000.SH\", 1000)、order.target_shares(\"600000.SH\", 2000)、order.value(\"600000.SH\", cash * 0.25)、order.target_percent(\"600000.SH\", 0.05)、order.limit_value(\"600000.SH\", cash * 0.25, open * 0.99)、order.target_portfolio_smart(weights={\"600000.SH\": 0.3, \"000001.SZ\": 0.2}, order_prices={\"600000.SH\": open * 0.99}, valuation_prices={\"600000.SH\": prev_close})、cancel.order(12345)、cancel.symbol(\"600000.SH\")、cancel.all()、update_universe([\"600000.SH\", \"000001.SZ\"])、subscribe([\"000001.SZ\"])、unsubscribe([\"000001.SZ\"])。其中 order.target_shares(...) 对应 rqalpha 的 order_to,order.target_portfolio_smart(...) 对应 rqalpha 的 order_target_portfolio_smart 批量目标权重语义,而 update_universe/subscribe/unsubscribe 对应 rqalpha 的动态 universe 与订阅接口。symbol 使用标准证券代码;数量、金额、仓位、限价、order_id 和 symbol 列表都支持表达式;这些语句也支持放进 when/unless 条件块。".to_string(), }, ManualSection { title: "when / unless / else".to_string(), @@ -140,6 +140,8 @@ pub fn built_in_strategy_manual() -> StrategyAiManual { ManualField { name: "position_count/max_positions/refresh_rate".to_string(), field_type: "int".to_string(), detail: "仓位计数与调仓周期。".to_string() }, ManualField { name: "has_open_orders/open_order_count/open_buy_order_count/open_sell_order_count".to_string(), field_type: "bool/int".to_string(), detail: "当前阶段挂单簿摘要。".to_string() }, ManualField { name: "open_buy_qty/open_sell_qty/latest_open_order_id".to_string(), field_type: "int".to_string(), detail: "当前阶段未成交买卖挂单的剩余数量汇总,以及最近一笔挂单 id。".to_string() }, + ManualField { name: "has_dynamic_universe/dynamic_universe_count".to_string(), field_type: "bool/int".to_string(), detail: "当前策略上下文是否存在动态 universe,以及动态 universe 内证券数量。".to_string() }, + ManualField { name: "has_subscriptions/subscription_count".to_string(), field_type: "bool/int".to_string(), detail: "当前订阅集合是否为空,以及订阅证券数量。".to_string() }, ManualField { name: "has_process_events/process_event_count/process_event_counts".to_string(), field_type: "bool/int/map".to_string(), detail: "当前阶段可见的过程事件摘要;process_event_counts[\"trade\"] 这类写法可直接读取当天事件计数。".to_string() }, ManualField { name: "current_process_kind/current_process_order_id/current_process_symbol/current_process_side/current_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前正在回调的过程事件上下文;没有活动事件时为空字符串或 0。".to_string() }, ManualField { name: "latest_process_kind/latest_process_order_id/latest_process_symbol/latest_process_side/latest_process_detail".to_string(), field_type: "string/int".to_string(), detail: "当前阶段最近一条过程事件的摘要,可用于让 on_day/open_auction 逻辑响应 earlier lifecycle 或订单事件。".to_string() }, @@ -160,6 +162,7 @@ pub fn built_in_strategy_manual() -> StrategyAiManual { ManualField { name: "allow_buy/allow_sell/at_upper_limit/at_lower_limit".to_string(), field_type: "bool".to_string(), detail: "盘中买卖与涨跌停状态。".to_string() }, ManualField { name: "touched_upper_limit/touched_lower_limit/hit_upper_limit/hit_lower_limit".to_string(), field_type: "bool".to_string(), detail: "当日 tick 曾经触达涨跌停。".to_string() }, ManualField { name: "symbol_open_order_count/symbol_open_buy_qty/symbol_open_sell_qty/latest_symbol_open_order_id".to_string(), field_type: "int".to_string(), detail: "当前证券在挂单簿中的未成交挂单摘要和最近挂单 id。".to_string() }, + ManualField { name: "in_dynamic_universe/is_subscribed".to_string(), field_type: "bool".to_string(), detail: "当前证券是否在动态 universe 内,以及是否仍在订阅集合中。".to_string() }, ManualField { name: "stock_ma5/stock_ma10/stock_ma20/stock_ma30".to_string(), field_type: "float".to_string(), detail: "个股价格均线内建别名。只内建这几个窗口;15 日、45 日等任意窗口请改用 sma(\"close\", n)。".to_string() }, ManualField { name: "stock_volume_ma5/stock_volume_ma10/stock_volume_ma20/stock_volume_ma60".to_string(), field_type: "float".to_string(), detail: "个股成交量均线内建别名。只内建这几个窗口;任意窗口请改用 rolling_mean(\"volume\", n)。".to_string() }, ManualField { name: "listed_days".to_string(), field_type: "int".to_string(), detail: "上市天数。".to_string() }, @@ -219,6 +222,10 @@ pub fn built_in_strategy_manual() -> StrategyAiManual { title: "next tick 撮合 + tick 滑点".to_string(), code: "execution.matching_type(\"next_tick_last\")\nexecution.slippage(\"tick_size\", 1)".to_string(), }, + ManualExample { + title: "动态 universe 和订阅".to_string(), + code: "when(!has_dynamic_universe) { update_universe([\"000001.SZ\", \"000002.SZ\"]) }\nwhen(subscription_count == 0) { subscribe([\"000001.SZ\"]) }".to_string(), + }, ManualExample { title: "显式下单并关闭默认轮动".to_string(), code: "trading.rotation(false)\norder.value(\"600000.SH\", cash * 0.25, \"manual_entry\")\ncancel.symbol(\"600000.SH\", \"manual_cancel\")".to_string(), diff --git a/crates/fidc-core/src/universe.rs b/crates/fidc-core/src/universe.rs index 3d7c5dd..ed19de0 100644 --- a/crates/fidc-core/src/universe.rs +++ b/crates/fidc-core/src/universe.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use chrono::NaiveDate; use serde::Serialize; @@ -44,6 +46,21 @@ pub struct SelectionContext<'a> { pub benchmark: &'a BenchmarkSnapshot, pub reference_level: f64, pub data: &'a DataSet, + pub dynamic_universe: Option<&'a BTreeSet>, +} + +impl SelectionContext<'_> { + fn eligible_universe(&self) -> Vec { + let eligible = self.data.eligible_universe_on(self.decision_date); + match self.dynamic_universe { + Some(symbols) if !symbols.is_empty() => eligible + .iter() + .filter(|row| symbols.contains(&row.symbol)) + .cloned() + .collect(), + _ => eligible.to_vec(), + } + } } pub trait UniverseSelector { @@ -132,12 +149,10 @@ impl UniverseSelector for DynamicMarketCapBandSelector { }; diagnostics.factor_total = ctx.data.factor_snapshots_on(ctx.decision_date).len(); - diagnostics.market_cap_missing_count = diagnostics - .factor_total - .saturating_sub(ctx.data.eligible_universe_on(ctx.decision_date).len()); - - let eligible = ctx.data.eligible_universe_on(ctx.decision_date); - let start_idx = lower_bound_by_market_cap(eligible, min_cap); + let eligible = ctx.eligible_universe(); + diagnostics.market_cap_missing_count = + diagnostics.factor_total.saturating_sub(eligible.len()); + let start_idx = lower_bound_by_market_cap(&eligible, min_cap); let mut selected = Vec::new(); for factor in eligible.iter().skip(start_idx) { diff --git a/crates/fidc-core/tests/engine_hooks.rs b/crates/fidc-core/tests/engine_hooks.rs index f635dc6..5d2d8e1 100644 --- a/crates/fidc-core/tests/engine_hooks.rs +++ b/crates/fidc-core/tests/engine_hooks.rs @@ -129,6 +129,10 @@ struct LimitCarryStrategy { issued: bool, } +struct UniverseDirectiveStrategy { + snapshots: Rc>>, +} + impl Strategy for ScheduledProbeStrategy { fn name(&self) -> &str { "scheduled-probe" @@ -233,6 +237,56 @@ impl Strategy for ProcessContextProbeStrategy { } } +impl Strategy for UniverseDirectiveStrategy { + fn name(&self) -> &str { + "universe-directive-probe" + } + + fn on_day( + &mut self, + ctx: &StrategyContext<'_>, + ) -> Result { + let eligible = ctx + .eligible_universe_on(ctx.execution_date) + .into_iter() + .map(|row| row.symbol) + .collect::>() + .join(","); + self.snapshots.borrow_mut().push(format!( + "{}:{}:{}:{}", + ctx.execution_date, + ctx.dynamic_universe_count(), + ctx.subscription_count(), + eligible + )); + let order_intents = match ctx.execution_date { + date if date == d(2025, 1, 2) => vec![ + OrderIntent::UpdateUniverse { + symbols: BTreeSet::from(["000002.SZ".to_string()]), + reason: "focus_single_symbol".to_string(), + }, + OrderIntent::Subscribe { + symbols: BTreeSet::from(["000001.SZ".to_string()]), + reason: "subscribe_probe".to_string(), + }, + ], + date if date == d(2025, 1, 3) => vec![OrderIntent::Unsubscribe { + symbols: BTreeSet::from(["000001.SZ".to_string()]), + reason: "unsubscribe_probe".to_string(), + }], + _ => Vec::new(), + }; + Ok(StrategyDecision { + rebalance: false, + target_weights: BTreeMap::new(), + exit_symbols: BTreeSet::new(), + order_intents, + notes: Vec::new(), + diagnostics: Vec::new(), + }) + } +} + #[test] fn engine_runs_strategy_hooks_in_daily_order() { let date1 = d(2025, 1, 2); @@ -1173,6 +1227,204 @@ fn engine_dispatches_process_events_to_external_bus_listeners() { ); } +#[test] +fn engine_applies_dynamic_universe_and_subscription_directives() { + let dates = [d(2025, 1, 2), d(2025, 1, 3), d(2025, 1, 6)]; + let snapshots = Rc::new(RefCell::new(Vec::new())); + let strategy = UniverseDirectiveStrategy { + snapshots: snapshots.clone(), + }; + let instruments = vec![ + Instrument { + symbol: "000001.SZ".to_string(), + name: "One".to_string(), + board: "SZ".to_string(), + round_lot: 100, + listed_at: Some(d(2020, 1, 1)), + delisted_at: None, + status: "active".to_string(), + }, + Instrument { + symbol: "000002.SZ".to_string(), + name: "Two".to_string(), + board: "SZ".to_string(), + round_lot: 100, + listed_at: Some(d(2020, 1, 1)), + delisted_at: None, + status: "active".to_string(), + }, + ]; + let markets = dates + .iter() + .flat_map(|date| { + [ + DailyMarketSnapshot { + date: *date, + symbol: "000001.SZ".to_string(), + timestamp: Some(format!("{date} 10:18:00")), + day_open: 10.0, + open: 10.0, + high: 10.1, + low: 9.9, + close: 10.0, + last_price: 10.0, + bid1: 9.99, + ask1: 10.01, + prev_close: 9.95, + volume: 100_000, + tick_volume: 5_000, + bid1_volume: 2_000, + ask1_volume: 2_000, + trading_phase: Some("continuous".to_string()), + paused: false, + upper_limit: 11.0, + lower_limit: 9.0, + price_tick: 0.01, + }, + DailyMarketSnapshot { + date: *date, + symbol: "000002.SZ".to_string(), + timestamp: Some(format!("{date} 10:18:00")), + day_open: 20.0, + open: 20.0, + high: 20.1, + low: 19.9, + close: 20.0, + last_price: 20.0, + bid1: 19.99, + ask1: 20.01, + prev_close: 19.95, + volume: 100_000, + tick_volume: 5_000, + bid1_volume: 2_000, + ask1_volume: 2_000, + trading_phase: Some("continuous".to_string()), + paused: false, + upper_limit: 22.0, + lower_limit: 18.0, + price_tick: 0.01, + }, + ] + }) + .collect::>(); + let factors = dates + .iter() + .flat_map(|date| { + [ + DailyFactorSnapshot { + date: *date, + symbol: "000001.SZ".to_string(), + market_cap_bn: 10.0, + free_float_cap_bn: 8.0, + pe_ttm: 10.0, + turnover_ratio: Some(1.0), + effective_turnover_ratio: Some(1.0), + extra_factors: BTreeMap::new(), + }, + DailyFactorSnapshot { + date: *date, + symbol: "000002.SZ".to_string(), + market_cap_bn: 12.0, + free_float_cap_bn: 10.0, + pe_ttm: 12.0, + turnover_ratio: Some(1.0), + effective_turnover_ratio: Some(1.0), + extra_factors: BTreeMap::new(), + }, + ] + }) + .collect::>(); + let candidates = dates + .iter() + .flat_map(|date| { + [ + CandidateEligibility { + date: *date, + symbol: "000001.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }, + CandidateEligibility { + date: *date, + symbol: "000002.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }, + ] + }) + .collect::>(); + let benchmarks = dates + .iter() + .map(|date| BenchmarkSnapshot { + date: *date, + benchmark: "000852.SH".to_string(), + open: 1000.0, + close: 1000.0, + prev_close: 999.0, + volume: 100_000, + }) + .collect::>(); + let data = DataSet::from_components(instruments, markets, factors, candidates, benchmarks) + .expect("dataset"); + let broker = BrokerSimulator::new_with_execution_price( + ChinaAShareCostModel::default(), + ChinaEquityRuleHooks::default(), + PriceField::Open, + ); + let mut engine = BacktestEngine::new( + data, + strategy, + broker, + BacktestConfig { + initial_cash: 1_000_000.0, + benchmark_code: "000852.SH".to_string(), + start_date: Some(dates[0]), + end_date: Some(dates[2]), + decision_lag_trading_days: 0, + execution_price_field: PriceField::Open, + }, + ); + + let result = engine.run().expect("backtest result"); + + assert_eq!( + snapshots.borrow().as_slice(), + &[ + "2025-01-02:0:0:000001.SZ,000002.SZ", + "2025-01-03:1:1:000002.SZ", + "2025-01-06:1:0:000002.SZ", + ] + ); + assert!( + result + .process_events + .iter() + .any(|event| event.kind == ProcessEventKind::UniverseUpdated) + ); + assert!( + result + .process_events + .iter() + .any(|event| event.kind == ProcessEventKind::UniverseSubscribed) + ); + assert!( + result + .process_events + .iter() + .any(|event| event.kind == ProcessEventKind::UniverseUnsubscribed) + ); +} + #[test] fn engine_exposes_current_process_context_to_strategies() { let date = d(2025, 1, 2); diff --git a/crates/fidc-core/tests/strategy_selection.rs b/crates/fidc-core/tests/strategy_selection.rs index 2fc2706..2e8730d 100644 --- a/crates/fidc-core/tests/strategy_selection.rs +++ b/crates/fidc-core/tests/strategy_selection.rs @@ -3,6 +3,7 @@ use fidc_core::{ CnSmallCapRotationConfig, CnSmallCapRotationStrategy, DataSet, JqMicroCapConfig, JqMicroCapStrategy, PortfolioState, Strategy, StrategyContext, }; +use std::collections::BTreeSet; use std::path::PathBuf; #[test] @@ -17,6 +18,7 @@ fn strategy_emits_target_weights_and_diagnostics() { cfg.short_ma_days = 3; cfg.long_ma_days = 5; let mut strategy = CnSmallCapRotationStrategy::new(cfg); + let subscriptions = BTreeSet::new(); let decision = strategy .on_day(&StrategyContext { @@ -26,6 +28,8 @@ fn strategy_emits_target_weights_and_diagnostics() { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }) @@ -57,6 +61,7 @@ fn jq_strategy_emits_same_day_decision() { cfg.stock_mid_ma_days = 4; cfg.stock_long_ma_days = 5; let mut strategy = JqMicroCapStrategy::new(cfg); + let subscriptions = BTreeSet::new(); let decision = strategy .on_day(&StrategyContext { @@ -66,6 +71,8 @@ fn jq_strategy_emits_same_day_decision() { data: &data, portfolio: &portfolio, open_orders: &[], + dynamic_universe: None, + subscriptions: &subscriptions, process_events: &[], active_process_event: None, }) diff --git a/docs/rqalpha-gap-roadmap.md b/docs/rqalpha-gap-roadmap.md index 0be3b9f..8a6a26c 100644 --- a/docs/rqalpha-gap-roadmap.md +++ b/docs/rqalpha-gap-roadmap.md @@ -28,9 +28,9 @@ current alignment pass. ### Phase 3: Universe and subscription model -- [ ] `update_universe` -- [ ] `subscribe` -- [ ] `unsubscribe` +- [x] `update_universe` +- [x] `subscribe` +- [x] `unsubscribe` - [ ] tick-frequency subscription guards exposed at strategy API level ### Phase 4: Algo order parity @@ -57,4 +57,4 @@ current alignment pass. ## Current Step -Active implementation target: Phase 3, dynamic universe and subscription model. +Active implementation target: Phase 4, algo-order styles.