Add rqalpha-style scheduler primitives

This commit is contained in:
boris
2026-04-23 01:49:24 -07:00
parent 452eb3324d
commit e5fe1f0432
5 changed files with 533 additions and 3 deletions

View File

@@ -9,6 +9,7 @@ use crate::events::{AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus,
use crate::metrics::{BacktestMetrics, compute_backtest_metrics};
use crate::portfolio::{CashReceivable, HoldingSummary, PortfolioState};
use crate::rules::EquityRuleHooks;
use crate::scheduler::{ScheduleRule, ScheduleStage, Scheduler};
use crate::strategy::{Strategy, StrategyContext};
#[derive(Debug, Error)]
@@ -124,6 +125,8 @@ where
F: FnMut(&BacktestDayProgress),
{
let mut portfolio = PortfolioState::new(self.config.initial_cash);
let scheduler_calendar = self.data.calendar().clone();
let scheduler = Scheduler::new(&scheduler_calendar);
let execution_dates = self
.data
.calendar()
@@ -202,8 +205,17 @@ where
data: &self.data,
portfolio: &portfolio,
};
let schedule_rules = self.strategy.schedule_rules();
self.strategy.before_trading(&daily_context)?;
let auction_decision = self.strategy.open_auction(&daily_context)?;
let mut auction_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::OpenAuction,
&schedule_rules,
&daily_context,
)?;
auction_decision.merge_from(self.strategy.open_auction(&daily_context)?);
let mut report = self.broker.execute(
execution_date,
&mut portfolio,
@@ -211,7 +223,7 @@ where
&auction_decision,
)?;
let decision = decision_slot
let mut decision = decision_slot
.map(|(decision_idx, decision_date)| {
self.strategy.on_day(&StrategyContext {
execution_date,
@@ -223,6 +235,20 @@ where
})
.transpose()?
.unwrap_or_default();
decision.merge_from(collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::OnDay,
&schedule_rules,
&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
},
)?);
let intraday_report =
self.broker
@@ -635,6 +661,21 @@ where
}
}
fn collect_scheduled_decisions<S: Strategy>(
strategy: &mut S,
scheduler: &Scheduler<'_>,
execution_date: NaiveDate,
stage: ScheduleStage,
rules: &[ScheduleRule],
ctx: &StrategyContext<'_>,
) -> Result<crate::strategy::StrategyDecision, BacktestError> {
let mut combined = crate::strategy::StrategyDecision::default();
for rule in scheduler.triggered_rules(execution_date, stage, rules) {
combined.merge_from(strategy.on_scheduled(ctx, rule)?);
}
Ok(combined)
}
mod date_format {
use chrono::NaiveDate;
use serde::Serializer;

View File

@@ -9,6 +9,7 @@ pub mod metrics;
pub mod platform_expr_strategy;
pub mod portfolio;
pub mod rules;
pub mod scheduler;
pub mod strategy;
pub mod strategy_ai;
pub mod universe;
@@ -31,6 +32,7 @@ pub use metrics::{BacktestMetrics, compute_backtest_metrics};
pub use platform_expr_strategy::{PlatformExprStrategy, PlatformExprStrategyConfig};
pub use portfolio::{CashReceivable, HoldingSummary, PortfolioState, Position};
pub use rules::{ChinaEquityRuleHooks, EquityRuleHooks, RuleCheck};
pub use scheduler::{ScheduleFrequency, ScheduleRule, ScheduleStage, Scheduler};
pub use strategy::{
CnSmallCapRotationConfig, CnSmallCapRotationStrategy, JqMicroCapConfig, JqMicroCapStrategy,
OrderIntent, Strategy, StrategyContext, StrategyDecision,

View File

@@ -0,0 +1,201 @@
use chrono::{Datelike, NaiveDate};
use crate::calendar::TradingCalendar;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleStage {
OpenAuction,
OnDay,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleFrequency {
Daily,
Weekly {
weekday: Option<u32>,
tradingday: Option<i32>,
},
Monthly {
tradingday: i32,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleRule {
pub name: String,
pub stage: ScheduleStage,
pub frequency: ScheduleFrequency,
}
impl ScheduleRule {
pub fn daily(name: impl Into<String>, stage: ScheduleStage) -> Self {
Self {
name: name.into(),
stage,
frequency: ScheduleFrequency::Daily,
}
}
pub fn weekly_by_weekday(name: impl Into<String>, weekday: u32, stage: ScheduleStage) -> Self {
Self {
name: name.into(),
stage,
frequency: ScheduleFrequency::Weekly {
weekday: Some(weekday),
tradingday: None,
},
}
}
pub fn weekly_by_tradingday(
name: impl Into<String>,
tradingday: i32,
stage: ScheduleStage,
) -> Self {
Self {
name: name.into(),
stage,
frequency: ScheduleFrequency::Weekly {
weekday: None,
tradingday: Some(tradingday),
},
}
}
pub fn monthly(name: impl Into<String>, tradingday: i32, stage: ScheduleStage) -> Self {
Self {
name: name.into(),
stage,
frequency: ScheduleFrequency::Monthly { tradingday },
}
}
}
pub struct Scheduler<'a> {
calendar: &'a TradingCalendar,
}
impl<'a> Scheduler<'a> {
pub fn new(calendar: &'a TradingCalendar) -> Self {
Self { calendar }
}
pub fn triggered_rules<'r>(
&self,
date: NaiveDate,
stage: ScheduleStage,
rules: &'r [ScheduleRule],
) -> Vec<&'r ScheduleRule> {
rules
.iter()
.filter(|rule| rule.stage == stage && self.matches(date, rule))
.collect()
}
fn matches(&self, date: NaiveDate, rule: &ScheduleRule) -> bool {
match &rule.frequency {
ScheduleFrequency::Daily => true,
ScheduleFrequency::Weekly {
weekday,
tradingday,
} => {
if let Some(weekday) = weekday {
return date.weekday().number_from_monday() == *weekday;
}
tradingday
.and_then(|nth| nth_date_in_period(&self.week_dates(date), nth))
.is_some_and(|matched| matched == date)
}
ScheduleFrequency::Monthly { tradingday } => {
nth_date_in_period(&self.month_dates(date), *tradingday)
.is_some_and(|matched| matched == date)
}
}
}
fn week_dates(&self, date: NaiveDate) -> Vec<NaiveDate> {
let iso = date.iso_week();
self.calendar
.iter()
.filter(|candidate| candidate.iso_week() == iso)
.collect()
}
fn month_dates(&self, date: NaiveDate) -> Vec<NaiveDate> {
self.calendar
.iter()
.filter(|candidate| {
candidate.year() == date.year() && candidate.month() == date.month()
})
.collect()
}
}
fn nth_date_in_period(period: &[NaiveDate], nth: i32) -> Option<NaiveDate> {
if nth == 0 || period.is_empty() {
return None;
}
if nth > 0 {
period.get((nth - 1) as usize).copied()
} else {
let idx = period.len().checked_sub((-nth) as usize)?;
period.get(idx).copied()
}
}
#[cfg(test)]
mod tests {
use chrono::NaiveDate;
use super::{ScheduleRule, ScheduleStage, Scheduler};
use crate::calendar::TradingCalendar;
fn d(year: i32, month: u32, day: u32) -> NaiveDate {
NaiveDate::from_ymd_opt(year, month, day).expect("valid date")
}
fn sample_calendar() -> TradingCalendar {
TradingCalendar::new(vec![
d(2025, 1, 30),
d(2025, 1, 31),
d(2025, 2, 3),
d(2025, 2, 4),
d(2025, 2, 7),
])
}
#[test]
fn scheduler_matches_daily_weekly_and_monthly_rules() {
let calendar = sample_calendar();
let scheduler = Scheduler::new(&calendar);
let rules = vec![
ScheduleRule::daily("daily", ScheduleStage::OnDay),
ScheduleRule::weekly_by_weekday("friday", 5, ScheduleStage::OnDay),
ScheduleRule::weekly_by_tradingday(
"last_trading_day_of_week",
-1,
ScheduleStage::OnDay,
),
ScheduleRule::monthly("first_trading_day_of_month", 1, ScheduleStage::OnDay),
];
let jan_31 = scheduler
.triggered_rules(d(2025, 1, 31), ScheduleStage::OnDay, &rules)
.into_iter()
.map(|rule| rule.name.as_str())
.collect::<Vec<_>>();
assert!(jan_31.contains(&"daily"));
assert!(jan_31.contains(&"friday"));
assert!(jan_31.contains(&"last_trading_day_of_week"));
assert!(!jan_31.contains(&"first_trading_day_of_month"));
let feb_3 = scheduler
.triggered_rules(d(2025, 2, 3), ScheduleStage::OnDay, &rules)
.into_iter()
.map(|rule| rule.name.as_str())
.collect::<Vec<_>>();
assert!(feb_3.contains(&"daily"));
assert!(feb_3.contains(&"first_trading_day_of_month"));
assert!(!feb_3.contains(&"friday"));
}
}

View File

@@ -11,10 +11,21 @@ use crate::data::{DataSet, PriceField};
use crate::engine::BacktestError;
use crate::events::OrderSide;
use crate::portfolio::PortfolioState;
use crate::scheduler::ScheduleRule;
use crate::universe::{DynamicMarketCapBandSelector, SelectionContext, UniverseSelector};
pub trait Strategy {
fn name(&self) -> &str;
fn schedule_rules(&self) -> Vec<ScheduleRule> {
Vec::new()
}
fn on_scheduled(
&mut self,
_ctx: &StrategyContext<'_>,
_rule: &ScheduleRule,
) -> Result<StrategyDecision, BacktestError> {
Ok(StrategyDecision::default())
}
fn before_trading(&mut self, _ctx: &StrategyContext<'_>) -> Result<(), BacktestError> {
Ok(())
}
@@ -51,6 +62,26 @@ pub struct StrategyDecision {
pub diagnostics: Vec<String>,
}
impl StrategyDecision {
pub fn merge_from(&mut self, mut other: StrategyDecision) {
self.rebalance |= other.rebalance;
self.target_weights.append(&mut other.target_weights);
self.exit_symbols.append(&mut other.exit_symbols);
self.order_intents.append(&mut other.order_intents);
self.notes.append(&mut other.notes);
self.diagnostics.append(&mut other.diagnostics);
}
pub fn is_empty(&self) -> bool {
!self.rebalance
&& self.target_weights.is_empty()
&& self.exit_symbols.is_empty()
&& self.order_intents.is_empty()
&& self.notes.is_empty()
&& self.diagnostics.is_empty()
}
}
#[derive(Debug, Clone)]
pub enum OrderIntent {
TargetValue {