Add dynamic universe and subscription controls

This commit is contained in:
boris
2026-04-23 07:12:56 -07:00
parent 5265f82fef
commit 152b5c3141
12 changed files with 963 additions and 24 deletions

View File

@@ -1,3 +1,5 @@
use std::collections::BTreeSet;
use chrono::NaiveDate;
use serde::Serialize;
use thiserror::Error;
@@ -97,6 +99,8 @@ pub struct BacktestEngine<S, C, R> {
config: BacktestConfig,
dividend_reinvestment: bool,
process_event_bus: ProcessEventBus,
dynamic_universe: Option<BTreeSet<String>>,
subscriptions: BTreeSet<String>,
}
impl<S, C, R> BacktestEngine<S, C, R> {
@@ -113,6 +117,8 @@ impl<S, C, R> BacktestEngine<S, C, R> {
config,
dividend_reinvestment: false,
process_event_bus: ProcessEventBus::new(),
dynamic_universe: None,
subscriptions: BTreeSet::new(),
}
}
@@ -146,6 +152,143 @@ where
C: CostModel,
R: EquityRuleHooks,
{
fn apply_strategy_directives(
&mut self,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
process_events: &mut Vec<ProcessEvent>,
decision: &mut crate::strategy::StrategyDecision,
) -> Result<(), BacktestError> {
if decision.order_intents.is_empty() {
return Ok(());
}
let mut retained = Vec::with_capacity(decision.order_intents.len());
for intent in decision.order_intents.drain(..) {
match intent {
crate::strategy::OrderIntent::UpdateUniverse { symbols, reason } => {
let symbol_count = symbols.len();
self.dynamic_universe = Some(symbols.clone());
decision
.diagnostics
.push(format!("dynamic_universe_updated count={symbol_count}"));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseUpdated,
order_id: None,
symbol: (symbol_count == 1)
.then(|| symbols.iter().next().cloned())
.flatten(),
side: None,
detail: format!(
"reason={reason} count={symbol_count} symbols={}",
symbols.iter().cloned().collect::<Vec<_>>().join(",")
),
},
)?;
}
crate::strategy::OrderIntent::Subscribe { symbols, reason } => {
let mut added = Vec::new();
for symbol in symbols {
if self.subscriptions.insert(symbol.clone()) {
added.push(symbol);
}
}
if !added.is_empty() {
decision.diagnostics.push(format!(
"subscriptions_added count={} total={}",
added.len(),
self.subscriptions.len()
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseSubscribed,
order_id: None,
symbol: (added.len() == 1).then(|| added[0].clone()),
side: None,
detail: format!(
"reason={reason} count={} symbols={}",
added.len(),
added.join(",")
),
},
)?;
}
}
crate::strategy::OrderIntent::Unsubscribe { symbols, reason } => {
let mut removed = Vec::new();
for symbol in symbols {
if self.subscriptions.remove(&symbol) {
removed.push(symbol);
}
}
if !removed.is_empty() {
decision.diagnostics.push(format!(
"subscriptions_removed count={} total={}",
removed.len(),
self.subscriptions.len()
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseUnsubscribed,
order_id: None,
symbol: (removed.len() == 1).then(|| removed[0].clone()),
side: None,
detail: format!(
"reason={reason} count={} symbols={}",
removed.len(),
removed.join(",")
),
},
)?;
}
}
other => retained.push(other),
}
}
decision.order_intents = retained;
Ok(())
}
pub fn run(&mut self) -> Result<BacktestResult, BacktestError> {
self.run_with_progress(|_| {})
}
@@ -245,6 +388,8 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreBeforeTrading,
@@ -257,6 +402,8 @@ where
data: &self.data,
portfolio: &portfolio,
open_orders: &pre_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})?;
@@ -269,12 +416,14 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::BeforeTrading,
"before_trading",
)?;
let _ = collect_scheduled_decisions(
let mut before_trading_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
@@ -285,10 +434,21 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::BeforeTrading),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&pre_open_orders,
&mut process_events,
&mut before_trading_decision,
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
@@ -298,6 +458,8 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostBeforeTrading,
@@ -312,6 +474,8 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreOpenAuction,
@@ -328,6 +492,8 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::OpenAuction),
@@ -339,6 +505,8 @@ where
data: &self.data,
portfolio: &portfolio,
open_orders: &pre_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})?);
@@ -351,11 +519,22 @@ where
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::OpenAuction,
"open_auction",
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&pre_open_orders,
&mut process_events,
&mut auction_decision,
)?;
let mut report = self.broker.execute(
execution_date,
&mut portfolio,
@@ -372,6 +551,8 @@ where
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut report.process_events,
)?;
@@ -384,6 +565,8 @@ where
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostOpenAuction,
@@ -399,6 +582,8 @@ where
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreOnDay,
@@ -414,6 +599,8 @@ where
data: &self.data,
portfolio: &portfolio,
open_orders: &on_day_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})
@@ -431,6 +618,8 @@ where
&self.data,
&portfolio,
&on_day_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::OnDay),
@@ -444,11 +633,22 @@ where
&self.data,
&portfolio,
&on_day_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::OnDay,
"on_day",
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&on_day_open_orders,
&mut process_events,
&mut decision,
)?;
let mut intraday_report =
self.broker
@@ -463,6 +663,8 @@ where
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut intraday_report.process_events,
)?;
@@ -482,6 +684,8 @@ where
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostOnDay,
@@ -500,6 +704,8 @@ where
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreAfterTrading,
@@ -512,6 +718,8 @@ where
data: &self.data,
portfolio: &portfolio,
open_orders: &post_trade_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})?;
@@ -524,12 +732,14 @@ where
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::AfterTrading,
"after_trading",
)?;
let _ = collect_scheduled_decisions(
let mut after_trading_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
@@ -540,10 +750,21 @@ where
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::AfterTrading),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&post_trade_open_orders,
&mut process_events,
&mut after_trading_decision,
)?;
let mut close_report = self.broker.after_trading(execution_date);
publish_process_events(
&mut self.strategy,
@@ -554,6 +775,8 @@ where
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut close_report.process_events,
)?;
@@ -572,6 +795,8 @@ where
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostAfterTrading,
@@ -586,6 +811,8 @@ where
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreSettlement,
@@ -598,6 +825,8 @@ where
data: &self.data,
portfolio: &portfolio,
open_orders: &post_close_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})?;
@@ -610,12 +839,14 @@ where
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Settlement,
"settlement",
)?;
let _ = collect_scheduled_decisions(
let mut settlement_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
@@ -626,10 +857,21 @@ where
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::Settlement),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&post_close_open_orders,
&mut process_events,
&mut settlement_decision,
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
@@ -639,6 +881,8 @@ where
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostSettlement,
@@ -1139,6 +1383,8 @@ fn collect_scheduled_decisions<S: Strategy>(
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
process_events: &mut Vec<ProcessEvent>,
process_event_bus: &mut ProcessEventBus,
current_time: Option<chrono::NaiveTime>,
@@ -1154,6 +1400,8 @@ fn collect_scheduled_decisions<S: Strategy>(
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
execution_date,
ProcessEventKind::PreScheduled,
@@ -1167,6 +1415,8 @@ fn collect_scheduled_decisions<S: Strategy>(
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events: process_events.as_slice(),
active_process_event: None,
},
@@ -1181,6 +1431,8 @@ fn collect_scheduled_decisions<S: Strategy>(
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
execution_date,
ProcessEventKind::PostScheduled,
@@ -1199,6 +1451,8 @@ fn publish_phase_event<S: Strategy>(
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
events: &mut Vec<ProcessEvent>,
date: NaiveDate,
kind: ProcessEventKind,
@@ -1221,6 +1475,8 @@ fn publish_phase_event<S: Strategy>(
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
};
@@ -1238,6 +1494,8 @@ fn publish_process_events<S: Strategy>(
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
target: &mut Vec<ProcessEvent>,
incoming: &mut Vec<ProcessEvent>,
) -> Result<(), BacktestError> {
@@ -1251,6 +1509,8 @@ fn publish_process_events<S: Strategy>(
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
};
@@ -1260,6 +1520,39 @@ fn publish_process_events<S: Strategy>(
Ok(())
}
fn publish_custom_process_event<S: Strategy>(
strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
target: &mut Vec<ProcessEvent>,
event: ProcessEvent,
) -> Result<(), BacktestError> {
process_event_bus.publish(&event);
let process_events = target.as_slice();
let event_ctx = StrategyContext {
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
};
strategy.on_process_event(&event_ctx, &event)?;
target.push(event);
Ok(())
}
fn stage_label(stage: ScheduleStage) -> &'static str {
match stage {
ScheduleStage::BeforeTrading => "before_trading",