use std::collections::{BTreeMap, HashMap}; use std::fs; use std::path::Path; use chrono::{NaiveDate, NaiveDateTime}; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::calendar::TradingCalendar; use crate::futures::{FuturesCommissionType, FuturesTradingParameter}; use crate::instrument::Instrument; mod date_format { use chrono::NaiveDate; use serde::{self, Deserialize, Deserializer, Serializer}; const FORMAT: &str = "%Y-%m-%d"; pub fn serialize(date: &NaiveDate, serializer: S) -> Result where S: Serializer, { serializer.serialize_str(&date.format(FORMAT).to_string()) } pub fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { let text = String::deserialize(deserializer)?; NaiveDate::parse_from_str(&text, FORMAT).map_err(serde::de::Error::custom) } } mod datetime_format { use chrono::NaiveDateTime; use serde::{self, Deserialize, Deserializer, Serializer}; const FORMAT: &str = "%Y-%m-%d %H:%M:%S"; pub fn serialize(date: &NaiveDateTime, serializer: S) -> Result where S: Serializer, { serializer.serialize_str(&date.format(FORMAT).to_string()) } pub fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { let text = String::deserialize(deserializer)?; NaiveDateTime::parse_from_str(&text, FORMAT).map_err(serde::de::Error::custom) } } #[derive(Debug, Error)] pub enum DataSetError { #[error("failed to read file {path}: {source}")] Io { path: String, #[source] source: std::io::Error, }, #[error("invalid csv row in {path} at line {line}: {message}")] InvalidRow { path: String, line: usize, message: String, }, #[error("benchmark file contains multiple benchmark codes")] MultipleBenchmarks, #[error("missing data for {kind} on {date} / {symbol}")] MissingSnapshot { kind: &'static str, date: NaiveDate, symbol: String, }, #[error("benchmark snapshot missing for {date}")] MissingBenchmark { date: NaiveDate }, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PriceField { DayOpen, Open, Close, Last, } 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DailyMarketSnapshot { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub timestamp: Option, pub day_open: f64, pub open: f64, pub high: f64, pub low: f64, pub close: f64, pub last_price: f64, pub bid1: f64, pub ask1: f64, pub prev_close: f64, pub volume: u64, pub tick_volume: u64, pub bid1_volume: u64, pub ask1_volume: u64, pub trading_phase: Option, pub paused: bool, pub upper_limit: f64, pub lower_limit: f64, pub price_tick: f64, } impl DailyMarketSnapshot { pub fn price(&self, field: PriceField) -> f64 { match field { PriceField::DayOpen => self.day_open, PriceField::Open => self.open, PriceField::Close => self.close, PriceField::Last => self.last_price, } } pub fn buy_price(&self, field: PriceField) -> f64 { match field { PriceField::Last if self.ask1.is_finite() && self.ask1 > 0.0 => self.ask1, _ => self.price(field), } } pub fn sell_price(&self, field: PriceField) -> f64 { match field { PriceField::Last if self.bid1.is_finite() && self.bid1 > 0.0 => self.bid1, _ => self.price(field), } } pub fn liquidity_for_buy(&self) -> u64 { self.ask1_volume } pub fn liquidity_for_sell(&self) -> u64 { self.bid1_volume } pub fn effective_price_tick(&self) -> f64 { if self.price_tick.is_finite() && self.price_tick > 0.0 { self.price_tick } else { 0.01 } } pub fn is_at_upper_limit_price(&self, price: f64) -> bool { if !self.upper_limit.is_finite() || self.upper_limit <= 0.0 { return false; } price >= self.upper_limit - self.effective_price_tick() + 1e-6 } pub fn is_at_lower_limit_price(&self, price: f64) -> bool { if !self.lower_limit.is_finite() || self.lower_limit <= 0.0 { return false; } price <= self.lower_limit + self.effective_price_tick() - 1e-6 } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DailyFactorSnapshot { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub market_cap_bn: f64, pub free_float_cap_bn: f64, pub pe_ttm: f64, pub turnover_ratio: 
Option, pub effective_turnover_ratio: Option, #[serde(default)] pub extra_factors: BTreeMap, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BenchmarkSnapshot { #[serde(with = "date_format")] pub date: NaiveDate, pub benchmark: String, pub open: f64, pub close: f64, pub prev_close: f64, pub volume: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CandidateEligibility { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub is_st: bool, pub is_new_listing: bool, pub is_paused: bool, pub allow_buy: bool, pub allow_sell: bool, pub is_kcb: bool, pub is_one_yuan: bool, } impl CandidateEligibility { pub fn eligible_for_selection(&self) -> bool { !self.is_st && !self.is_new_listing && !self.is_paused && !self.is_kcb && !self.is_one_yuan && self.allow_buy && self.allow_sell } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CorporateAction { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, #[serde(default, with = "optional_date_format")] pub payable_date: Option, pub share_cash: f64, pub share_bonus: f64, pub share_gift: f64, pub issue_quantity: f64, pub issue_price: f64, pub reform: bool, pub adjust_factor: Option, #[serde(default)] pub successor_symbol: Option, #[serde(default)] pub successor_ratio: Option, #[serde(default)] pub successor_cash: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IntradayExecutionQuote { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, #[serde(with = "datetime_format")] pub timestamp: NaiveDateTime, pub last_price: f64, pub bid1: f64, pub ask1: f64, pub bid1_volume: u64, pub ask1_volume: u64, #[serde(default)] pub volume_delta: u64, #[serde(default)] pub amount_delta: f64, pub trading_phase: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IntradayOrderBookDepthLevel { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, #[serde(with = "datetime_format")] pub timestamp: 
NaiveDateTime, pub level: u8, pub bid_price: f64, pub bid_volume: u64, pub ask_price: f64, pub ask_volume: u64, } impl IntradayOrderBookDepthLevel { pub fn executable_price(&self, side: crate::events::OrderSide) -> Option { match side { crate::events::OrderSide::Buy if self.ask_price.is_finite() && self.ask_price > 0.0 => { Some(self.ask_price) } crate::events::OrderSide::Sell if self.bid_price.is_finite() && self.bid_price > 0.0 => { Some(self.bid_price) } _ => None, } } pub fn executable_volume(&self, side: crate::events::OrderSide) -> u64 { match side { crate::events::OrderSide::Buy => self.ask_volume, crate::events::OrderSide::Sell => self.bid_volume, } } } impl IntradayExecutionQuote { pub fn buy_price(&self) -> Option { if self.ask1.is_finite() && self.ask1 > 0.0 { Some(self.ask1) } else if self.last_price.is_finite() && self.last_price > 0.0 { Some(self.last_price) } else { None } } pub fn sell_price(&self) -> Option { if self.bid1.is_finite() && self.bid1 > 0.0 { Some(self.bid1) } else if self.last_price.is_finite() && self.last_price > 0.0 { Some(self.last_price) } else { None } } } impl CorporateAction { pub fn split_ratio(&self) -> f64 { 1.0 + self.share_bonus.max(0.0) + self.share_gift.max(0.0) } pub fn has_effect(&self) -> bool { self.share_cash.abs() > f64::EPSILON || (self.split_ratio() - 1.0).abs() > f64::EPSILON || self.issue_quantity.abs() > f64::EPSILON || self.reform || self.has_successor_conversion() } pub fn has_successor_conversion(&self) -> bool { self.successor_symbol .as_ref() .is_some_and(|symbol| !symbol.trim().is_empty()) && self.successor_ratio_value() > 0.0 } pub fn successor_ratio_value(&self) -> f64 { self.successor_ratio .filter(|ratio| ratio.is_finite() && *ratio > 0.0) .unwrap_or(1.0) } pub fn successor_cash_value(&self) -> f64 { self.successor_cash .filter(|cash| cash.is_finite()) .unwrap_or(0.0) } } #[derive(Debug, Clone)] pub struct DailySnapshotBundle { pub date: NaiveDate, pub benchmark: BenchmarkSnapshot, pub market: Vec, 
pub factors: Vec, pub candidates: Vec, pub corporate_actions: Vec, } #[derive(Debug, Clone, Serialize)] pub struct PriceBar { #[serde(with = "date_format")] pub date: NaiveDate, pub timestamp: Option, pub symbol: String, pub frequency: String, pub open: f64, pub high: f64, pub low: f64, pub close: f64, pub last_price: f64, pub volume: u64, pub amount: f64, pub bid1: f64, pub ask1: f64, pub bid1_volume: u64, pub ask1_volume: u64, } #[derive(Debug, Clone, Serialize)] pub struct DividendRecord { #[serde(with = "date_format")] pub ex_dividend_date: NaiveDate, #[serde(with = "date_format")] pub payable_date: NaiveDate, pub symbol: String, pub dividend_cash_before_tax: f64, pub round_lot: u32, } #[derive(Debug, Clone, Serialize)] pub struct SplitRecord { #[serde(with = "date_format")] pub ex_dividend_date: NaiveDate, pub symbol: String, pub split_ratio: f64, } #[derive(Debug, Clone, Serialize)] pub struct FactorValue { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub field: String, pub value: f64, } #[derive(Debug, Clone, Serialize)] pub struct FactorTextValue { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub field: String, pub value: String, } #[derive(Debug, Clone, Serialize)] pub struct SecuritiesMarginRecord { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub field: String, pub value: f64, } #[derive(Debug, Clone, Serialize)] pub struct YieldCurvePoint { #[serde(with = "date_format")] pub date: NaiveDate, pub tenor: String, pub value: f64, } #[derive(Debug, Clone)] pub struct EligibleUniverseSnapshot { pub symbol: String, pub market_cap_bn: f64, pub free_float_cap_bn: f64, } #[derive(Debug, Clone)] struct SymbolPriceSeries { snapshots: Vec, dates: Vec, opens: Vec, closes: Vec, prev_closes: Vec, last_prices: Vec, open_prefix: Vec, close_prefix: Vec, prev_close_prefix: Vec, last_prefix: Vec, volume_prefix: Vec, } impl SymbolPriceSeries { fn new(rows: &[DailyMarketSnapshot]) -> Self { let 
mut sorted = rows.to_vec(); sorted.sort_by_key(|row| row.date); let dates = sorted.iter().map(|row| row.date).collect::>(); let opens = sorted.iter().map(|row| row.open).collect::>(); let closes = sorted.iter().map(|row| row.close).collect::>(); let prev_closes = sorted.iter().map(|row| row.prev_close).collect::>(); let last_prices = sorted.iter().map(|row| row.last_price).collect::>(); let volumes = sorted .iter() .map(|row| row.volume as f64) .collect::>(); let open_prefix = prefix_sums(&opens); let close_prefix = prefix_sums(&closes); let prev_close_prefix = prefix_sums(&prev_closes); let last_prefix = prefix_sums(&last_prices); let volume_prefix = prefix_sums(&volumes); Self { snapshots: sorted, dates, opens, closes, prev_closes, last_prices, open_prefix, close_prefix, prev_close_prefix, last_prefix, volume_prefix, } } fn moving_average(&self, date: NaiveDate, lookback: usize, field: PriceField) -> Option { if lookback == 0 { return None; } let end = self.end_index(date)?; if end < lookback { return None; } let start = end - lookback; let prefix = self.prefix_for(field); let sum = prefix[end] - prefix[start]; Some(sum / lookback as f64) } fn trailing_values(&self, date: NaiveDate, lookback: usize, field: PriceField) -> Vec { let Some(end) = self.end_index(date) else { return Vec::new(); }; let start = end.saturating_sub(lookback); self.values_for(field)[start..end].to_vec() } fn trailing_snapshots( &self, date: NaiveDate, lookback: usize, include_now: bool, ) -> Vec { if lookback == 0 { return Vec::new(); } let end = if include_now { self.end_index(date) } else { self.previous_completed_end_index(date) }; let Some(end) = end else { return Vec::new(); }; let start = end.saturating_sub(lookback); self.snapshots[start..end].to_vec() } fn decision_price_on_or_before(&self, date: NaiveDate) -> Option { let end = self.decision_end_index(date)?; if end == 0 { return None; } self.prev_closes.get(end - 1).copied() } fn decision_end_index(&self, date: NaiveDate) -> 
Option { match self.dates.binary_search(&date) { Ok(idx) => Some(idx + 1), Err(0) => None, Err(idx) => Some(idx), } } fn previous_completed_end_index(&self, date: NaiveDate) -> Option { match self.dates.binary_search(&date) { Ok(idx) => Some(idx), Err(0) => None, Err(idx) => Some(idx), } } fn decision_close_moving_average(&self, date: NaiveDate, lookback: usize) -> Option { if lookback == 0 { return None; } let end = self.decision_end_index(date)?; if end < lookback { return None; } let start = end - lookback; let sum = self.prev_close_prefix[end] - self.prev_close_prefix[start]; Some(sum / lookback as f64) } fn decision_volume_moving_average(&self, date: NaiveDate, lookback: usize) -> Option { if lookback == 0 { return None; } let end = self.previous_completed_end_index(date)?; if end < lookback { return None; } let start = end - lookback; let sum = self.volume_prefix[end] - self.volume_prefix[start]; Some(sum / lookback as f64) } fn end_index(&self, date: NaiveDate) -> Option { match self.dates.binary_search(&date) { Ok(idx) => Some(idx + 1), Err(0) => None, Err(idx) => Some(idx), } } fn values_for(&self, field: PriceField) -> &[f64] { match field { PriceField::DayOpen => &self.opens, PriceField::Open => &self.opens, PriceField::Close => &self.closes, PriceField::Last => &self.last_prices, } } fn price_on_or_before(&self, date: NaiveDate, field: PriceField) -> Option { let end = self.end_index(date)?; if end == 0 { return None; } self.values_for(field).get(end - 1).copied() } fn prefix_for(&self, field: PriceField) -> &[f64] { match field { PriceField::DayOpen => &self.open_prefix, PriceField::Open => &self.open_prefix, PriceField::Close => &self.close_prefix, PriceField::Last => &self.last_prefix, } } } #[derive(Debug, Clone)] struct BenchmarkPriceSeries { dates: Vec, closes: Vec, open_prefix: Vec, close_prefix: Vec, } impl BenchmarkPriceSeries { fn new(rows: &[BenchmarkSnapshot]) -> Self { let mut sorted = rows.to_vec(); sorted.sort_by_key(|row| row.date); let 
dates = sorted.iter().map(|row| row.date).collect::>(); let opens = sorted.iter().map(|row| row.open).collect::>(); let closes = sorted.iter().map(|row| row.close).collect::>(); let open_prefix = prefix_sums(&opens); let close_prefix = prefix_sums(&closes); Self { dates, closes, open_prefix, close_prefix, } } fn moving_average(&self, date: NaiveDate, lookback: usize) -> Option { self.moving_average_for(date, lookback, PriceField::Close) } fn moving_average_for( &self, date: NaiveDate, lookback: usize, field: PriceField, ) -> Option { if lookback == 0 { return None; } let end = match self.dates.binary_search(&date) { Ok(idx) => idx + 1, Err(0) => return None, Err(idx) => idx, }; if end < lookback { return None; } let start = end - lookback; let prefix = match field { PriceField::DayOpen | PriceField::Open => &self.open_prefix, PriceField::Close | PriceField::Last => &self.close_prefix, }; let sum = prefix[end] - prefix[start]; Some(sum / lookback as f64) } fn trailing_values(&self, date: NaiveDate, lookback: usize) -> Vec { let end = match self.dates.binary_search(&date) { Ok(idx) => idx + 1, Err(0) => return Vec::new(), Err(idx) => idx, }; let start = end.saturating_sub(lookback); self.closes[start..end].to_vec() } } #[derive(Debug, Clone)] pub struct DataSet { instruments: HashMap, calendar: TradingCalendar, market_by_date: BTreeMap>, market_index: HashMap<(NaiveDate, String), DailyMarketSnapshot>, factor_by_date: BTreeMap>, factor_index: HashMap<(NaiveDate, String), DailyFactorSnapshot>, factor_text_by_date: BTreeMap>, factor_text_index: HashMap<(NaiveDate, String, String), FactorTextValue>, candidate_by_date: BTreeMap>, candidate_index: HashMap<(NaiveDate, String), CandidateEligibility>, corporate_actions_by_date: BTreeMap>, execution_quotes_index: HashMap<(NaiveDate, String), Vec>, order_book_depth_index: HashMap<(NaiveDate, String), Vec>, benchmark_by_date: BTreeMap, market_series_by_symbol: HashMap, benchmark_series_cache: BenchmarkPriceSeries, 
eligible_universe_by_date: BTreeMap>, benchmark_code: String, futures_params_by_symbol: HashMap>, } impl DataSet { pub fn from_csv_dir(path: &Path) -> Result { let instruments = read_instruments(&path.join("instruments.csv"))?; let market = read_market(&path.join("market.csv"))?; let factors = read_factors(&path.join("factors.csv"))?; let factor_texts = read_factor_texts(&path.join("factors.csv"))?; let candidates = read_candidates(&path.join("candidate_flags.csv"))?; let benchmarks = read_benchmarks(&path.join("benchmark.csv"))?; let corporate_actions_path = path.join("corporate_actions.csv"); let corporate_actions = if corporate_actions_path.exists() { read_corporate_actions(&corporate_actions_path)? } else { Vec::new() }; let execution_quotes_path = path.join("execution_quotes.csv"); let execution_quotes = if execution_quotes_path.exists() { read_execution_quotes(&execution_quotes_path)? } else { Vec::new() }; let futures_params_path = path.join("futures_trading_parameters.csv"); let futures_params = if futures_params_path.exists() { read_futures_trading_parameters(&futures_params_path)? } else { Vec::new() }; let order_book_depth_path = path.join("order_book_depth.csv"); let order_book_depth = if order_book_depth_path.exists() { read_order_book_depth(&order_book_depth_path)? 
} else { Vec::new() }; Self::from_components_with_actions_quotes_futures_depth_and_factor_texts( instruments, market, factors, candidates, benchmarks, corporate_actions, execution_quotes, futures_params, order_book_depth, factor_texts, ) } pub fn from_partitioned_dir(path: &Path) -> Result { let instruments = read_instruments(&path.join("instruments.csv"))?; let benchmarks = read_partitioned_dir(&path.join("benchmark"), read_benchmarks)?; let market = read_partitioned_dir(&path.join("market"), read_market)?; let factors = read_partitioned_dir(&path.join("factors"), read_factors)?; let factor_texts = read_partitioned_dir(&path.join("factors"), read_factor_texts)?; let candidates = read_partitioned_dir(&path.join("candidates"), read_candidates)?; let corporate_actions_dir = path.join("corporate_actions"); let corporate_actions = if corporate_actions_dir.exists() { read_partitioned_dir(&corporate_actions_dir, read_corporate_actions)? } else { Vec::new() }; let execution_quotes_dir = path.join("execution_quotes"); let execution_quotes = if execution_quotes_dir.exists() { read_partitioned_dir(&execution_quotes_dir, read_execution_quotes)? } else { Vec::new() }; let futures_params_dir = path.join("futures_trading_parameters"); let futures_params = if futures_params_dir.exists() { read_partitioned_dir(&futures_params_dir, read_futures_trading_parameters)? } else { Vec::new() }; let order_book_depth_dir = path.join("order_book_depth"); let order_book_depth = if order_book_depth_dir.exists() { read_partitioned_dir(&order_book_depth_dir, read_order_book_depth)? 
} else { Vec::new() }; Self::from_components_with_actions_quotes_futures_depth_and_factor_texts( instruments, market, factors, candidates, benchmarks, corporate_actions, execution_quotes, futures_params, order_book_depth, factor_texts, ) } pub fn from_components( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, ) -> Result { Self::from_components_with_actions_and_quotes( instruments, market, factors, candidates, benchmarks, Vec::new(), Vec::new(), ) } pub fn from_components_with_actions( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, corporate_actions: Vec, ) -> Result { Self::from_components_with_actions_and_quotes( instruments, market, factors, candidates, benchmarks, corporate_actions, Vec::new(), ) } pub fn from_components_with_actions_and_quotes( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, corporate_actions: Vec, execution_quotes: Vec, ) -> Result { Self::from_components_with_actions_quotes_and_futures( instruments, market, factors, candidates, benchmarks, corporate_actions, execution_quotes, Vec::new(), ) } pub fn from_components_with_actions_quotes_and_futures( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, corporate_actions: Vec, execution_quotes: Vec, futures_params: Vec, ) -> Result { Self::from_components_with_actions_quotes_futures_and_depth( instruments, market, factors, candidates, benchmarks, corporate_actions, execution_quotes, futures_params, Vec::new(), ) } pub fn from_components_with_actions_quotes_futures_and_depth( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, corporate_actions: Vec, execution_quotes: Vec, futures_params: Vec, order_book_depth: Vec, ) -> Result { Self::from_components_with_actions_quotes_futures_depth_and_factor_texts( instruments, market, factors, candidates, benchmarks, corporate_actions, execution_quotes, futures_params, order_book_depth, Vec::new(), ) } pub fn 
from_components_with_factor_texts( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, factor_texts: Vec, ) -> Result { Self::from_components_with_actions_quotes_futures_depth_and_factor_texts( instruments, market, factors, candidates, benchmarks, Vec::new(), Vec::new(), Vec::new(), Vec::new(), factor_texts, ) } pub fn from_components_with_actions_quotes_futures_depth_and_factor_texts( instruments: Vec, market: Vec, factors: Vec, candidates: Vec, benchmarks: Vec, corporate_actions: Vec, execution_quotes: Vec, futures_params: Vec, order_book_depth: Vec, factor_texts: Vec, ) -> Result { let benchmark_code = collect_benchmark_code(&benchmarks)?; let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect()); let instruments = instruments .into_iter() .map(|instrument| (instrument.symbol.clone(), instrument)) .collect::>(); let market_by_date = group_by_date(market.clone(), |item| item.date); let market_index = market .into_iter() .map(|item| ((item.date, item.symbol.clone()), item)) .collect::>(); let factor_by_date = group_by_date(factors.clone(), |item| item.date); let factor_index = factors .into_iter() .map(|item| ((item.date, item.symbol.clone()), item)) .collect::>(); let factor_texts = factor_texts .into_iter() .filter_map(|mut item| { item.field = normalize_field(&item.field); if item.field.is_empty() { None } else { Some(item) } }) .collect::>(); let factor_text_by_date = group_by_date(factor_texts.clone(), |item| item.date); let factor_text_index = factor_texts .into_iter() .map(|item| ((item.date, item.symbol.clone(), item.field.clone()), item)) .collect::>(); let candidate_by_date = group_by_date(candidates.clone(), |item| item.date); let candidate_index = candidates .into_iter() .map(|item| ((item.date, item.symbol.clone()), item)) .collect::>(); let corporate_actions_by_date = group_by_date(corporate_actions, |item| item.date); let execution_quotes_index = build_execution_quote_index(execution_quotes); 
let order_book_depth_index = build_order_book_depth_index(order_book_depth); let benchmark_by_date = benchmarks .into_iter() .map(|item| (item.date, item)) .collect::>(); let market_series_by_symbol = build_market_series(&market_by_date); let benchmark_series_cache = BenchmarkPriceSeries::new(&benchmark_by_date.values().cloned().collect::>()); let eligible_universe_by_date = build_eligible_universe(&factor_by_date, &candidate_index, &market_index); let futures_params_by_symbol = build_futures_params_index(futures_params); Ok(Self { instruments, calendar, market_by_date, market_index, factor_by_date, factor_index, factor_text_by_date, factor_text_index, candidate_by_date, candidate_index, corporate_actions_by_date, execution_quotes_index, order_book_depth_index, benchmark_by_date, market_series_by_symbol, benchmark_series_cache, eligible_universe_by_date, benchmark_code, futures_params_by_symbol, }) } pub fn calendar(&self) -> &TradingCalendar { &self.calendar } pub fn benchmark_code(&self) -> &str { &self.benchmark_code } pub fn instruments(&self) -> &HashMap { &self.instruments } pub fn all_instruments(&self) -> Vec<&Instrument> { let mut instruments = self.instruments.values().collect::>(); instruments.sort_by(|left, right| left.symbol.cmp(&right.symbol)); instruments } pub fn instruments_history(&self, symbols: &[&str]) -> Vec<&Instrument> { symbols .iter() .filter_map(|symbol| self.instruments.get(*symbol)) .collect() } pub fn active_instruments(&self, date: NaiveDate, symbols: &[&str]) -> Vec<&Instrument> { symbols .iter() .filter_map(|symbol| self.instruments.get(*symbol)) .filter(|instrument| instrument.is_active_on(date)) .collect() } pub fn instrument(&self, symbol: &str) -> Option<&Instrument> { self.instruments.get(symbol) } pub fn market(&self, date: NaiveDate, symbol: &str) -> Option<&DailyMarketSnapshot> { self.market_index.get(&(date, symbol.to_string())) } pub fn factor(&self, date: NaiveDate, symbol: &str) -> Option<&DailyFactorSnapshot> { 
self.factor_index.get(&(date, symbol.to_string())) } pub fn candidate(&self, date: NaiveDate, symbol: &str) -> Option<&CandidateEligibility> { self.candidate_index.get(&(date, symbol.to_string())) } pub fn benchmark(&self, date: NaiveDate) -> Option<&BenchmarkSnapshot> { self.benchmark_by_date.get(&date) } pub fn corporate_actions_on(&self, date: NaiveDate) -> &[CorporateAction] { self.corporate_actions_by_date .get(&date) .map(Vec::as_slice) .unwrap_or(&[]) } pub fn execution_quotes_on(&self, date: NaiveDate, symbol: &str) -> &[IntradayExecutionQuote] { self.execution_quotes_index .get(&(date, symbol.to_string())) .map(Vec::as_slice) .unwrap_or(&[]) } pub fn order_book_depth_on( &self, date: NaiveDate, symbol: &str, ) -> &[IntradayOrderBookDepthLevel] { self.order_book_depth_index .get(&(date, symbol.to_string())) .map(Vec::as_slice) .unwrap_or(&[]) } pub fn execution_quotes_on_date(&self, date: NaiveDate) -> Vec { let mut quotes = self .execution_quotes_index .iter() .filter(|((quote_date, _), _)| *quote_date == date) .flat_map(|(_, rows)| rows.iter().cloned()) .collect::>(); quotes.sort_by(|left, right| { left.timestamp .cmp(&right.timestamp) .then_with(|| left.symbol.cmp(&right.symbol)) }); quotes } pub fn benchmark_series(&self) -> Vec { self.benchmark_by_date.values().cloned().collect() } pub fn futures_trading_parameter( &self, date: NaiveDate, symbol: &str, ) -> Option<&FuturesTradingParameter> { self.futures_params_by_symbol.get(symbol).and_then(|rows| { rows.iter() .rev() .find(|row| row.effective_date.is_none_or(|effective| effective <= date)) }) } pub fn futures_settlement_price( &self, date: NaiveDate, symbol: &str, mode: &str, ) -> Option { let snapshot = self.market(date, symbol)?; match normalize_field(mode).as_str() { "settlement" | "settle" => self .factor_numeric_value(date, symbol, "settlement") .or_else(|| self.factor_numeric_value(date, symbol, "settle")) .or(Some(snapshot.close)), "prev_settlement" | "pre_settlement" => self 
.factor_numeric_value(date, symbol, "prev_settlement") .or_else(|| self.factor_numeric_value(date, symbol, "pre_settlement")) .or(Some(snapshot.prev_close)), _ => Some(snapshot.close), } } pub fn history_bars( &self, date: NaiveDate, symbol: &str, bar_count: usize, frequency: &str, field: &str, include_now: bool, ) -> Vec { self.history_bars_at(date, None, symbol, bar_count, frequency, field, include_now) } pub fn history_bars_at( &self, date: NaiveDate, active_datetime: Option, symbol: &str, bar_count: usize, frequency: &str, field: &str, include_now: bool, ) -> Vec { if bar_count == 0 { return Vec::new(); } match normalize_history_frequency(frequency).as_deref() { Some("1d") => self.history_daily_values(date, symbol, bar_count, field, include_now), Some("1m") | Some("tick") => self.history_intraday_values( date, active_datetime, symbol, bar_count, field, include_now, ), _ => Vec::new(), } } pub fn history_daily_snapshots( &self, date: NaiveDate, symbol: &str, bar_count: usize, include_now: bool, ) -> Vec { self.market_series_by_symbol .get(symbol) .map(|series| series.trailing_snapshots(date, bar_count, include_now)) .unwrap_or_default() } pub fn history_intraday_quotes( &self, date: NaiveDate, symbol: &str, bar_count: usize, include_now: bool, ) -> Vec { self.history_intraday_quotes_at(date, None, symbol, bar_count, include_now) } pub fn history_intraday_quotes_at( &self, date: NaiveDate, active_datetime: Option, symbol: &str, bar_count: usize, include_now: bool, ) -> Vec { if bar_count == 0 { return Vec::new(); } let mut quotes = self .execution_quotes_index .iter() .filter(|((_, quote_symbol), _)| quote_symbol == symbol) .flat_map(|(_, rows)| rows.iter()) .filter(|quote| intraday_quote_visible(quote, date, active_datetime, include_now)) .cloned() .collect::>(); quotes.sort_by_key(|quote| quote.timestamp); take_last(quotes, bar_count) } pub fn trading_dates(&self, start: NaiveDate, end: NaiveDate) -> Vec { self.calendar.trading_dates(start, end) } pub fn 
previous_trading_date(&self, date: NaiveDate, n: usize) -> Option<NaiveDate> {
        self.calendar.previous_trading_date(date, n)
    }

    /// Trading date `n` sessions after `date`, delegated to the trading calendar.
    pub fn next_trading_date(&self, date: NaiveDate, n: usize) -> Option<NaiveDate> {
        self.calendar.next_trading_date(date, n)
    }

    /// Suspension flags for `symbol` over the last `count` calendar sessions up to and
    /// including `date`, in calendar order.
    ///
    /// A session is flagged when the candidate row says `is_paused` or the market row says
    /// `paused`; sessions with neither row are not flagged.
    pub fn is_suspended_flags(&self, date: NaiveDate, symbol: &str, count: usize) -> Vec<bool> {
        self.historical_daily_flags(date, symbol, count, |candidate, market| {
            candidate.is_some_and(|row| row.is_paused) || market.is_some_and(|row| row.paused)
        })
    }

    /// ST flags for `symbol` over the last `count` calendar sessions up to and including
    /// `date`, in calendar order; sessions without a candidate row are not flagged.
    pub fn is_st_stock_flags(&self, date: NaiveDate, symbol: &str, count: usize) -> Vec<bool> {
        self.historical_daily_flags(date, symbol, count, |candidate, _| {
            candidate.is_some_and(|row| row.is_st)
        })
    }

    /// Cash dividends of `symbol` whose ex-dividend date lies in `start..=end`, sorted by
    /// ex-dividend date.
    ///
    /// Actions with a (near) zero `share_cash` are skipped. A missing payable date falls back
    /// to the ex-dividend date, and the round lot falls back to 100 for unknown instruments.
    /// An inverted range (`start > end`) yields an empty result.
    pub fn get_dividend(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
    ) -> Vec<DividendRecord> {
        // `BTreeMap::range` panics on an inverted range; match the sibling queries instead.
        if start > end {
            return Vec::new();
        }
        // The round lot depends only on `symbol`, so resolve it once rather than per action.
        let round_lot = self
            .instrument(symbol)
            .map(Instrument::effective_round_lot)
            .unwrap_or(100);
        let mut rows = self
            .corporate_actions_by_date
            .range(start..=end)
            .flat_map(|(_, actions)| actions.iter())
            .filter(|action| action.symbol == symbol && action.share_cash.abs() > f64::EPSILON)
            .map(|action| DividendRecord {
                ex_dividend_date: action.date,
                payable_date: action.payable_date.unwrap_or(action.date),
                symbol: action.symbol.clone(),
                dividend_cash_before_tax: action.share_cash,
                round_lot,
            })
            .collect::<Vec<_>>();
        rows.sort_by_key(|row| row.ex_dividend_date);
        rows
    }

    /// Splits of `symbol` (actions whose split ratio differs from 1) with an ex-date in
    /// `start..=end`, sorted by ex-date. An inverted range yields an empty result.
    pub fn get_split(&self, symbol: &str, start: NaiveDate, end: NaiveDate) -> Vec<SplitRecord> {
        // `BTreeMap::range` panics on an inverted range; match the sibling queries instead.
        if start > end {
            return Vec::new();
        }
        let mut rows = self
            .corporate_actions_by_date
            .range(start..=end)
            .flat_map(|(_, actions)| actions.iter())
            .filter(|action| action.symbol == symbol && (action.split_ratio() - 1.0).abs() > 1e-12)
            .map(|action| SplitRecord {
                ex_dividend_date: action.date,
                symbol: action.symbol.clone(),
                split_ratio: action.split_ratio(),
            })
            .collect::<Vec<_>>();
        rows.sort_by_key(|row| row.ex_dividend_date);
        rows
    }

    /// Daily numeric values of factor `field` (normalized) for `symbol` in `start..=end`,
    /// sorted by date. Days where the factor is absent are skipped.
    pub fn get_factor(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        if start > end {
            return Vec::new();
        }
        let field = normalize_field(field);
        let mut rows = self
            .factor_by_date
            .range(start..=end)
            .flat_map(|(_, snapshots)| snapshots.iter())
            .filter(|snapshot| snapshot.symbol == symbol)
            .filter_map(|snapshot| {
                factor_numeric_value(snapshot, &field).map(|value| FactorValue {
                    date: snapshot.date,
                    symbol: snapshot.symbol.clone(),
                    field: field.clone(),
                    value,
                })
            })
            .collect::<Vec<_>>();
        rows.sort_by_key(|row| row.date);
        rows
    }

    /// Text factor rows for `symbol` whose normalized field name equals the normalized
    /// `field`, within `start..=end`, sorted by date.
    pub fn get_factor_text(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorTextValue> {
        if start > end {
            return Vec::new();
        }
        let field = normalize_field(field);
        let mut rows = self
            .factor_text_by_date
            .range(start..=end)
            .flat_map(|(_, snapshots)| snapshots.iter())
            .filter(|snapshot| {
                snapshot.symbol == symbol && normalize_field(&snapshot.field) == field
            })
            .cloned()
            .collect::<Vec<_>>();
        rows.sort_by_key(|row| row.date);
        rows
    }

    /// Yield-curve points taken from extra factors named `yield_curve_<tenor>` or
    /// `yc_<tenor>` in `start..=end`, optionally restricted to one (normalized) tenor.
    /// Sorted by date, then tenor.
    pub fn get_yield_curve(
        &self,
        start: NaiveDate,
        end: NaiveDate,
        tenor: Option<&str>,
    ) -> Vec<YieldCurvePoint> {
        if start > end {
            return Vec::new();
        }
        let tenor_filter = tenor.map(normalize_field);
        let mut rows = Vec::new();
        for (date, snapshots) in self.factor_by_date.range(start..=end) {
            for snapshot in snapshots {
                for (field, value) in &snapshot.extra_factors {
                    let normalized = normalize_field(field);
                    let Some(raw_tenor) = normalized
                        .strip_prefix("yield_curve_")
                        .or_else(|| normalized.strip_prefix("yc_"))
                    else {
                        continue;
                    };
                    if tenor_filter
                        .as_ref()
                        .is_some_and(|expected| expected != raw_tenor)
                    {
                        continue;
                    }
                    rows.push(YieldCurvePoint {
                        date: *date,
                        tenor: raw_tenor.to_string(),
                        value: *value,
                    });
                }
            }
        }
        rows.sort_by(|left, right| {
            left.date
                .cmp(&right.date)
                .then_with(|| left.tenor.cmp(&right.tenor))
        });
        rows
    }

    /// Sorted, de-duplicated symbols eligible for margin trading on `date`.
    ///
    /// `margin_type` selects the `margin_stock` / `margin_cash` / `margin_all` extra factor
    /// (falling back to `margin_all`); a symbol qualifies when that value is positive. When
    /// no symbol qualifies, every active instrument whose board is not `FUTURE` is returned.
    pub fn get_margin_stocks(&self, date: NaiveDate, margin_type: &str) -> Vec<String> {
        let field = match normalize_field(margin_type).as_str() {
            "stock" => "margin_stock",
            "cash" => "margin_cash",
            _ => "margin_all",
        };
        let mut symbols = self
            .factor_by_date
            .get(&date)
            .map(|rows| {
                rows.iter()
                    .filter(|row| {
                        row.extra_factors
                            .get(field)
                            .or_else(|| row.extra_factors.get("margin_all"))
                            .is_some_and(|value| *value > 0.0)
                    })
                    .map(|row| row.symbol.clone())
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        if symbols.is_empty() {
            let all_symbols = self
                .instruments
                .keys()
                .map(String::as_str)
                .collect::<Vec<_>>();
            symbols = self
                .active_instruments(date, &all_symbols)
                .into_iter()
                .filter(|instrument| !instrument.board.eq_ignore_ascii_case("FUTURE"))
                .map(|instrument| instrument.symbol.clone())
                .collect();
        }
        symbols.sort();
        symbols.dedup();
        symbols
    }

    /// Securities-margin series for `symbol`: the factor `field` re-labelled as
    /// [`SecuritiesMarginRecord`]s.
    pub fn get_securities_margin(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<SecuritiesMarginRecord> {
        self.get_factor(symbol, start, end, field)
            .into_iter()
            .map(|row| SecuritiesMarginRecord {
                date: row.date,
                symbol: row.symbol,
                field: row.field,
                value: row.value,
            })
            .collect()
    }

    /// Share-count series for `symbol`, using the first alias of `share_type` present on each
    /// day; rows are labelled `shares_<share_type>`.
    pub fn get_shares(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        share_type: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &shares_factor_aliases(share_type),
            &format!("shares_{}", normalize_field(share_type)),
        )
    }

    /// Turnover-rate series for `symbol`, using the first alias of `field` present on each
    /// day; rows are labelled `turnover_rate_<field>`.
    pub fn get_turnover_rate(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &turnover_rate_factor_aliases(field),
            &format!("turnover_rate_{}", normalize_field(field)),
        )
    }

    /// Daily `close / prev_close - 1` for `symbol` in `start..=end`, sorted by date.
    ///
    /// Days with a non-finite or non-positive previous close are skipped. If market data
    /// yields nothing, the `price_change_rate` / `change_rate` / `pct_change` factors are
    /// used instead.
    pub fn get_price_change_rate(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
    ) -> Vec<FactorValue> {
        if start > end {
            return Vec::new();
        }
        let mut rows = self
            .market_by_date
            .range(start..=end)
            .flat_map(|(_, snapshots)| snapshots.iter())
            .filter(|snapshot| snapshot.symbol == symbol)
            .filter_map(|snapshot| {
                if snapshot.prev_close.is_finite() && snapshot.prev_close > 0.0 {
                    Some(FactorValue {
                        date: snapshot.date,
                        symbol: snapshot.symbol.clone(),
                        field: "price_change_rate".to_string(),
                        value: snapshot.close / snapshot.prev_close - 1.0,
                    })
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        if rows.is_empty() {
            rows = self.get_first_available_factor_series(
                symbol,
                start,
                end,
                &[
                    "price_change_rate".to_string(),
                    "change_rate".to_string(),
                    "pct_change".to_string(),
                ],
                "price_change_rate",
            );
        }
        rows.sort_by_key(|row| row.date);
        rows
    }

    /// Stock-connect series for `symbol`, using the first alias of `field` present on each
    /// day; rows are labelled `stock_connect_<field>`.
    pub fn get_stock_connect(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &stock_connect_factor_aliases(field),
            &format!("stock_connect_{}", normalize_field(field)),
        )
    }

    /// `field` looked up as `current_performance_<field>`, `current_performances_<field>`
    /// or the bare field name.
    pub fn current_performance(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &prefixed_factor_aliases("current_performance", field),
            field,
        )
    }

    /// `field` looked up as `fundamental_<field>`, `fundamentals_<field>` or the bare name.
    pub fn get_fundamentals(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &prefixed_factor_aliases("fundamental", field),
            field,
        )
    }

    /// `field` looked up as `financial_<field>`, `financials_<field>` or the bare name.
    pub fn get_financials(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &prefixed_factor_aliases("financial", field),
            field,
        )
    }

    /// `field` looked up as `pit_financial_<field>`, `pit_financials_<field>` or the bare name.
    pub fn get_pit_financials(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        field: &str,
    ) -> Vec<FactorValue> {
        self.get_first_available_factor_series(
            symbol,
            start,
            end,
            &prefixed_factor_aliases("pit_financial", field),
            field,
        )
    }

    /// Most recent numeric industry code for `symbol` on or before `date`.
    ///
    /// Walks factor dates backwards; on each date with a row for `symbol`, the aliases from
    /// `industry_factor_aliases(source, level)` are tried in order.
    pub fn get_industry(
        &self,
        symbol: &str,
        date: NaiveDate,
        source: &str,
        level: usize,
    ) -> Option<FactorValue> {
        let fields = industry_factor_aliases(source, level);
        for (factor_date, snapshots) in self.factor_by_date.range(..=date).rev() {
            let Some(snapshot) = snapshots.iter().find(|row| row.symbol == symbol) else {
                continue;
            };
            for field in &fields {
                if let Some(value) = factor_numeric_value(snapshot, field) {
                    return Some(FactorValue {
                        date: *factor_date,
                        symbol: snapshot.symbol.clone(),
                        field: field.clone(),
                        value,
                    });
                }
            }
        }
        None
    }

    /// Most recent industry name for `symbol` on or before `date`: the first text factor
    /// (walking dates backwards) whose normalized field is one of the name aliases.
    pub fn get_industry_name(
        &self,
        symbol: &str,
        date: NaiveDate,
        source: &str,
        level: usize,
    ) -> Option<FactorTextValue> {
        let fields = industry_name_factor_aliases(source, level);
        for (factor_date, snapshots) in self.factor_text_by_date.range(..=date).rev() {
            for snapshot in snapshots {
                if snapshot.symbol != symbol {
                    continue;
                }
                let normalized = normalize_field(&snapshot.field);
                if fields.iter().any(|field| field == &normalized) {
                    return Some(FactorTextValue {
                        date: *factor_date,
                        symbol: snapshot.symbol.clone(),
                        field: snapshot.field.clone(),
                        value: snapshot.value.clone(),
                    });
                }
            }
        }
        None
    }

    /// Contract chosen as "dominant" for `underlying_symbol` on `date`.
    ///
    /// Candidates are symbols with a futures trading parameter valid on `date`; if there are
    /// none, active instruments on the `FUTURE` board are used. A symbol belongs to the
    /// underlying when its normalized form starts with the normalized underlying and the next
    /// character is not a letter. The lexicographically smallest candidate is returned.
    /// NOTE(review): selection is by name only, not by volume or open interest.
    pub fn get_dominant_future(&self, underlying_symbol: &str, date: NaiveDate) -> Option<String> {
        let underlying = normalize_field(underlying_symbol);
        if underlying.is_empty() {
            return None;
        }
        // Require the product code to end right after the prefix, so that "i" matches
        // "i2409" but not "ic2409".
        let belongs_to_underlying = |symbol: &str| {
            normalize_field(symbol)
                .strip_prefix(underlying.as_str())
                .is_some_and(|rest| !rest.starts_with(|ch: char| ch.is_ascii_alphabetic()))
        };
        let mut candidates = self
            .futures_params_by_symbol
            .keys()
            .filter(|symbol| belongs_to_underlying(symbol.as_str()))
            .filter(|symbol| {
                self.futures_trading_parameter(date, symbol.as_str())
                    .is_some()
            })
            .cloned()
            .collect::<Vec<_>>();
        if candidates.is_empty() {
            candidates = self
                .instruments
                .values()
                .filter(|instrument| instrument.board.eq_ignore_ascii_case("FUTURE"))
                .filter(|instrument| belongs_to_underlying(instrument.symbol.as_str()))
                .filter(|instrument| instrument.is_active_on(date))
                .map(|instrument| instrument.symbol.clone())
                .collect();
        }
        candidates.sort();
        candidates.into_iter().next()
    }

    /// Price bars of the contract that [`Self::get_dominant_future`] selects on `end`, over
    /// `start..=end`; empty when no contract is found.
    pub fn get_dominant_future_price(
        &self,
        underlying_symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        frequency: &str,
    ) -> Vec<PriceBar> {
        let Some(symbol) = self.get_dominant_future(underlying_symbol, end) else {
            return Vec::new();
        };
        self.get_price(&symbol, start, end, frequency)
    }

    /// Price bars for `symbol` in `start..=end`.
    ///
    /// Daily frequencies come from market snapshots. Minute and tick frequencies both return
    /// one bar per intraday execution quote (labelled `"tick"`), sorted by date then
    /// timestamp. NOTE(review): `"1m"` is not aggregated into minute bars. Unknown
    /// frequencies yield an empty result.
    pub fn get_price(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        frequency: &str,
    ) -> Vec<PriceBar> {
        if start > end {
            return Vec::new();
        }
        match normalize_history_frequency(frequency).as_deref() {
            Some("1d") => self
                .market_by_date
                .range(start..=end)
                .flat_map(|(_, rows)| rows.iter())
                .filter(|row| row.symbol == symbol)
                .map(daily_market_price_bar)
                .collect(),
            Some("1m") | Some("tick") => {
                let mut bars = self
                    .execution_quotes_index
                    .iter()
                    .filter(|((date, quote_symbol), _)| {
                        quote_symbol == symbol && *date >= start && *date <= end
                    })
                    .flat_map(|(_, rows)| rows.iter())
                    .map(intraday_quote_price_bar)
                    .collect::<Vec<_>>();
                bars.sort_by(|left, right| {
                    left.date
                        .cmp(&right.date)
                        .then_with(|| left.timestamp.cmp(&right.timestamp))
                });
                bars
            }
            _ => Vec::new(),
        }
    }

    /// Price `field` of the market snapshot for (`date`, `symbol`), if one exists.
    pub fn price(&self, date: NaiveDate, symbol: &str, field: PriceField) -> Option<f64> {
        let snapshot = self.market(date, symbol)?;
        Some(snapshot.price(field))
    }

    /// Price `field` of `symbol` on `date` or the closest earlier date in its price series.
    pub fn price_on_or_before(
        &self,
        date: NaiveDate,
        symbol: &str,
        field: PriceField,
    ) -> Option<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .and_then(|series| series.price_on_or_before(date, field))
    }

    /// All factor snapshots recorded on `date` (empty when none).
    pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> {
        self.factor_by_date
            .get(&date)
            .map(|rows| rows.iter().collect())
            .unwrap_or_default()
    }

    /// All text factor values recorded on `date` (empty when none).
    pub fn factor_text_snapshots_on(&self, date: NaiveDate) -> Vec<&FactorTextValue> {
        self.factor_text_by_date
            .get(&date)
            .map(|rows| rows.iter().collect())
            .unwrap_or_default()
    }

    /// All market snapshots recorded on `date` (empty when none).
    pub fn market_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyMarketSnapshot> {
        self.market_by_date
            .get(&date)
            .map(|rows| rows.iter().collect())
            .unwrap_or_default()
    }

    /// All candidate-eligibility rows recorded on `date` (empty when none).
    pub fn candidate_snapshots_on(&self, date: NaiveDate) -> Vec<&CandidateEligibility> {
        self.candidate_by_date
            .get(&date)
            .map(|rows| rows.iter().collect())
            .unwrap_or_default()
    }

    /// Owned bundle of every per-date table for `date`.
    ///
    /// # Errors
    /// [`DataSetError::MissingBenchmark`] when no benchmark snapshot exists for `date`;
    /// the other tables default to empty.
    pub fn bundle_on(&self, date: NaiveDate) -> Result<DailySnapshotBundle, DataSetError> {
        let benchmark = self
            .benchmark(date)
            .cloned()
            .ok_or(DataSetError::MissingBenchmark { date })?;
        Ok(DailySnapshotBundle {
            date,
            benchmark,
            market: self.market_by_date.get(&date).cloned().unwrap_or_default(),
            factors: self.factor_by_date.get(&date).cloned().unwrap_or_default(),
            candidates: self
                .candidate_by_date
                .get(&date)
                .cloned()
                .unwrap_or_default(),
            corporate_actions: self
                .corporate_actions_by_date
                .get(&date)
                .cloned()
                .unwrap_or_default(),
        })
    }

    /// Up to `lookback` trailing benchmark values ending at `date`, from the benchmark cache.
    pub fn benchmark_closes_up_to(&self, date: NaiveDate, lookback: usize) -> Vec<f64> {
        self.benchmark_series_cache.trailing_values(date, lookback)
    }

    /// Up to `lookback` trailing closes of `symbol` ending at `date`; empty for unknown symbols.
    pub fn market_closes_up_to(&self, date: NaiveDate, symbol: &str, lookback: usize) -> Vec<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .map(|series| series.trailing_values(date, lookback, PriceField::Close))
            .unwrap_or_default()
    }

    /// Numeric `field` of each historical daily snapshot returned by
    /// `history_daily_snapshots`; snapshots lacking the field are skipped.
    fn history_daily_values(
        &self,
        date: NaiveDate,
        symbol: &str,
        bar_count: usize,
        field: &str,
        include_now: bool,
    ) -> Vec<f64> {
        self.history_daily_snapshots(date, symbol, bar_count, include_now)
            .into_iter()
            .filter_map(|row| daily_market_numeric_value(&row, field))
            .collect()
    }

    /// Numeric `field` of each intraday quote returned by `history_intraday_quotes_at`;
    /// quotes lacking the field are skipped.
    fn history_intraday_values(
        &self,
        date: NaiveDate,
        active_datetime: Option<NaiveDateTime>,
        symbol: &str,
        bar_count: usize,
        field: &str,
        include_now: bool,
    ) -> Vec<f64> {
        self.history_intraday_quotes_at(date, active_datetime, symbol, bar_count, include_now)
            .into_iter()
            .filter_map(|row| intraday_quote_numeric_value(&row, field))
            .collect()
    }

    /// Applies `evaluator` to the candidate and market rows of `symbol` for each of the last
    /// `count` calendar days on or before `date`, in calendar order.
    fn historical_daily_flags<F>(
        &self,
        date: NaiveDate,
        symbol: &str,
        count: usize,
        evaluator: F,
    ) -> Vec<bool>
    where
        F: Fn(Option<&CandidateEligibility>, Option<&DailyMarketSnapshot>) -> bool,
    {
        if count == 0 {
            return Vec::new();
        }
        let days = self
            .calendar
            .iter()
            .filter(|day| *day <= date)
            .collect::<Vec<_>>();
        take_last(days, count)
            .into_iter()
            .map(|day| {
                // Both indexes use the same (date, symbol) key; build it once per day.
                let key = (day, symbol.to_string());
                evaluator(self.candidate_index.get(&key), self.market_index.get(&key))
            })
            .collect()
    }

    /// Decision price of `symbol` on or before `date`, from its price series.
    pub fn market_decision_close(&self, date: NaiveDate, symbol: &str) -> Option<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .and_then(|series| series.decision_price_on_or_before(date))
    }

    /// Decision-close moving average of `symbol` over `lookback`, from its price series.
    pub fn market_decision_close_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        lookback: usize,
    ) -> Option<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .and_then(|series| series.decision_close_moving_average(date, lookback))
    }

    /// Decision-volume moving average of `symbol` over `lookback`, from its price series.
    pub fn market_decision_volume_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        lookback: usize,
    ) -> Option<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .and_then(|series| series.decision_volume_moving_average(date, lookback))
    }

    /// Numeric factor `field` of `symbol` on `date`. `field` is matched as given (not
    /// normalized here).
    pub fn factor_numeric_value(&self, date: NaiveDate, symbol: &str, field: &str) -> Option<f64> {
        self.factor(date, symbol)
            .and_then(|snapshot| factor_numeric_value(snapshot, field))
    }

    /// Text factor `field` (normalized) of `symbol` on `date`.
    pub fn factor_text_value(&self, date: NaiveDate, symbol: &str, field: &str) -> Option<String> {
        self.factor_text_index
            .get(&(date, symbol.to_string(), normalize_field(field)))
            .map(|row| row.value.clone())
    }

    /// For each factor date in `start..=end`, takes the first row of `symbol` and emits the
    /// first of `fields` it has, labelled with the normalized `output_field`. Sorted by date.
    fn get_first_available_factor_series(
        &self,
        symbol: &str,
        start: NaiveDate,
        end: NaiveDate,
        fields: &[String],
        output_field: &str,
    ) -> Vec<FactorValue> {
        if start > end {
            return Vec::new();
        }
        let output_field = normalize_field(output_field);
        let mut rows = Vec::new();
        for (_, snapshots) in self.factor_by_date.range(start..=end) {
            let Some(snapshot) = snapshots.iter().find(|row| row.symbol == symbol) else {
                continue;
            };
            if let Some(value) = fields
                .iter()
                .find_map(|field| factor_numeric_value(snapshot, field))
            {
                rows.push(FactorValue {
                    date: snapshot.date,
                    symbol: snapshot.symbol.clone(),
                    field: output_field.clone(),
                    value,
                });
            }
        }
        rows.sort_by_key(|row| row.date);
        rows
    }

    /// Mean of factor `field` over the calendar's trailing `lookback` days ending at `date`.
    ///
    /// Returns `None` when `lookback` is 0, when no trailing days exist, or when any of those
    /// days lacks a factor row or the field.
    pub fn factor_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        field: &str,
        lookback: usize,
    ) -> Option<f64> {
        if lookback == 0 {
            return None;
        }
        let dates = self.calendar.trailing_days(date, lookback);
        if dates.is_empty() {
            return None;
        }
        let mut sum = 0.0_f64;
        let mut count = 0usize;
        for trading_day in dates {
            let snapshot = self.factor(trading_day, symbol)?;
            sum += factor_numeric_value(snapshot, field)?;
            count += 1;
        }
        // `dates` is non-empty and every iteration either returns early or counts, so
        // `count >= 1` here.
        Some(sum / count as f64)
    }

    /// Moving average of a market or factor field, dispatched on the (un-normalized) name.
    ///
    /// Close-like names use the decision-close average, volume names the decision-volume
    /// average, `day_open` / `open` / `last` the price series, and anything else is treated
    /// as a factor field.
    pub fn market_decision_numeric_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        field: &str,
        lookback: usize,
    ) -> Option<f64> {
        match field {
            "close" | "prev_close" | "stock_close" | "price" => self
                .market_series_by_symbol
                .get(symbol)
                .and_then(|series| series.decision_close_moving_average(date, lookback)),
            "volume" | "stock_volume" => self
                .market_series_by_symbol
                .get(symbol)
                .and_then(|series| series.decision_volume_moving_average(date, lookback)),
            "day_open" | "dayopen" => {
                self.market_moving_average(date, symbol, lookback, PriceField::DayOpen)
            }
            "open" => self.market_moving_average(date, symbol, lookback, PriceField::Open),
            "last" | "last_price" => {
                self.market_moving_average(date, symbol, lookback, PriceField::Last)
            }
            other => self.factor_moving_average(date, symbol, other, lookback),
        }
    }

    /// Moving average of price `field` of `symbol`, from its price series.
    pub fn market_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        lookback: usize,
        field: PriceField,
    ) -> Option<f64> {
        self.market_series_by_symbol
            .get(symbol)
            .and_then(|series| series.moving_average(date, lookback, field))
    }

    /// Benchmark moving average over `lookback`, from the benchmark cache.
    pub fn benchmark_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        self.benchmark_series_cache.moving_average(date, lookback)
    }

    /// Benchmark open-price moving average over `lookback`, from the benchmark cache.
    pub fn benchmark_open_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        self.benchmark_series_cache
            .moving_average_for(date, lookback, PriceField::Open)
    }

    /// Open-price moving average of `symbol` over `lookback`.
    pub fn market_open_moving_average(
        &self,
        date: NaiveDate,
        symbol: &str,
        lookback: usize,
    ) -> Option<f64> {
        self.market_moving_average(date, symbol, lookback, PriceField::Open)
    }

    /// Eligible-universe rows for `date` (empty slice when none).
    pub fn eligible_universe_on(&self, date: NaiveDate) -> &[EligibleUniverseSnapshot] {
        self.eligible_universe_by_date
            .get(&date)
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }

    /// Market snapshot for (`date`, `symbol`).
    ///
    /// # Errors
    /// [`DataSetError::MissingSnapshot`] with kind `"market"` when absent.
    pub fn require_market(
        &self,
        date: NaiveDate,
        symbol: &str,
    ) -> Result<&DailyMarketSnapshot, DataSetError> {
        self.market(date, symbol)
            .ok_or_else(|| DataSetError::MissingSnapshot {
                kind: "market",
                date,
                symbol: symbol.to_string(),
            })
    }

    /// Candidate-eligibility row for (`date`, `symbol`).
    ///
    /// # Errors
    /// [`DataSetError::MissingSnapshot`] with kind `"candidate"` when absent.
    pub fn require_candidate(
        &self,
        date: NaiveDate,
        symbol: &str,
    ) -> Result<&CandidateEligibility, DataSetError> {
        self.candidate(date, symbol)
            .ok_or_else(|| DataSetError::MissingSnapshot {
                kind: "candidate",
                date,
                symbol: symbol.to_string(),
            })
    }

    /// Factor snapshot for (`date`, `symbol`).
    ///
    /// # Errors
    /// [`DataSetError::MissingSnapshot`] with kind `"factor"` when absent.
    pub fn require_factor(
        &self,
        date: NaiveDate,
        symbol: &str,
    ) -> Result<&DailyFactorSnapshot, DataSetError> {
        self.factor(date, symbol)
            .ok_or_else(|| DataSetError::MissingSnapshot {
                kind: "factor",
                date,
                symbol: symbol.to_string(),
            })
    }
}

/// Reads the instruments CSV.
///
/// Columns: symbol, name, board, round_lot (optional, default 100), listed_at (optional),
/// delisted_at (optional), status (optional, default `"active"`).
fn read_instruments(path: &Path) -> Result<Vec<Instrument>, DataSetError> {
    let rows = read_rows(path)?;
    let mut instruments = Vec::with_capacity(rows.len());
    for row in rows {
        instruments.push(Instrument {
            symbol: row.get(0)?.to_string(),
            name: row.get(1)?.to_string(),
            board: row.get(2)?.to_string(),
            round_lot: row.parse_optional_u32(3).unwrap_or(100),
            listed_at: row.parse_optional_date(4)?,
            delisted_at: row.parse_optional_date(5)?,
            status: row
                .fields
                .get(6)
                .map(|value| value.trim())
                .filter(|value| !value.is_empty())
                .unwrap_or("active")
                .to_string(),
        });
    }
    Ok(instruments)
}

/// Reads the daily market CSV.
///
/// Required columns: 0 date, 1 symbol, 2 open, 3 high, 4 low, 5 close, 6 prev_close,
/// 7 volume, 8 paused. Optional: 9 upper_limit, 10 lower_limit (default: prev_close ±10%
/// rounded to the tick), 11 day_open (default open), 12 last_price, 13 bid1, 14 ask1
/// (default close), 15 price_tick (default 0.01), 16 timestamp, 17 tick_volume,
/// 18 bid1_volume, 19 ask1_volume (default 0), 20 trading_phase.
fn read_market(path: &Path) -> Result<Vec<DailyMarketSnapshot>, DataSetError> {
    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        let open = row.parse_f64(2)?;
        let close = row.parse_f64(5)?;
        let prev_close = row.parse_f64(6)?;
        let price_tick = row.parse_optional_f64(15).unwrap_or(0.01);
        let derived_upper_limit = round_price_to_tick(prev_close * 1.10, price_tick);
        let derived_lower_limit = round_price_to_tick(prev_close * 0.90, price_tick);
        snapshots.push(DailyMarketSnapshot {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            timestamp: row
                .fields
                .get(16)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty()),
            day_open: row.parse_optional_f64(11).unwrap_or(open),
            open,
            high: row.parse_f64(3)?,
            low: row.parse_f64(4)?,
            close,
            last_price: row.parse_optional_f64(12).unwrap_or(close),
            bid1: row.parse_optional_f64(13).unwrap_or(close),
            ask1: row.parse_optional_f64(14).unwrap_or(close),
            prev_close,
            volume: row.parse_u64(7)?,
            tick_volume: row.parse_optional_u64(17).unwrap_or_default(),
            bid1_volume: row.parse_optional_u64(18).unwrap_or_default(),
            ask1_volume: row.parse_optional_u64(19).unwrap_or_default(),
            trading_phase: row
                .fields
                .get(20)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty()),
            paused: row.parse_bool(8)?,
            upper_limit: row.parse_optional_f64(9).unwrap_or(derived_upper_limit),
            lower_limit: row.parse_optional_f64(10).unwrap_or(derived_lower_limit),
            price_tick,
        });
    }
    Ok(snapshots)
}

/// Reads the daily factor CSV.
///
/// Columns: date, symbol, market_cap_bn, free_float_cap_bn, pe_ttm, turnover_ratio
/// (optional), effective_turnover_ratio (optional); columns 7 and 8 may hold JSON objects
/// whose numeric entries become `extra_factors`.
fn read_factors(path: &Path) -> Result<Vec<DailyFactorSnapshot>, DataSetError> {
    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        let (extra_factors, _) = parse_extra_factor_maps(&row);
        snapshots.push(DailyFactorSnapshot {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            market_cap_bn: row.parse_f64(2)?,
            free_float_cap_bn: row.parse_f64(3)?,
            pe_ttm: row.parse_f64(4)?,
            turnover_ratio: row.parse_optional_f64(5),
            effective_turnover_ratio: row.parse_optional_f64(6),
            extra_factors,
        });
    }
    Ok(snapshots)
}

/// Reads the text entries (strings and booleans) of the JSON columns of the factor CSV as
/// one [`FactorTextValue`] per (row, field).
fn read_factor_texts(path: &Path) -> Result<Vec<FactorTextValue>, DataSetError> {
    let rows = read_rows(path)?;
    let mut text_values = Vec::new();
    for row in rows {
        let date = row.parse_date(0)?;
        let symbol = row.get(1)?.to_string();
        let (_, extra_text_factors) = parse_extra_factor_maps(&row);
        text_values.extend(
            extra_text_factors
                .into_iter()
                .map(|(field, value)| FactorTextValue {
                    date,
                    symbol: symbol.clone(),
                    field,
                    value,
                }),
        );
    }
    Ok(text_values)
}

/// Merges the JSON objects in columns 7 and 8 of `row` into (numeric, text) factor maps;
/// column 8 entries overwrite column 7 entries with the same key.
fn parse_extra_factor_maps(row: &CsvRow) -> (BTreeMap<String, f64>, BTreeMap<String, String>) {
    let mut numeric = BTreeMap::new();
    let mut text = BTreeMap::new();
    for value in row.fields.get(7).into_iter().chain(row.fields.get(8)) {
        merge_extra_factor_json(value, &mut numeric, &mut text);
    }
    (numeric, text)
}

/// Parses `raw` as a JSON object and merges its entries under normalized keys.
///
/// Finite numbers go to `numeric`, strings to `text`, booleans to both (as 1.0/0.0 and
/// `"true"`/`"false"`). Blank input, invalid JSON, non-object JSON, empty keys and other
/// value kinds are ignored.
fn merge_extra_factor_json(
    raw: &str,
    numeric: &mut BTreeMap<String, f64>,
    text: &mut BTreeMap<String, String>,
) {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return;
    }
    let Ok(serde_json::Value::Object(map)) = serde_json::from_str::<serde_json::Value>(trimmed)
    else {
        return;
    };
    for (key, value) in map {
        let key = normalize_field(&key);
        if key.is_empty() {
            continue;
        }
        match value {
            serde_json::Value::Number(number) => {
                if let Some(value) = number.as_f64().filter(|value| value.is_finite()) {
                    numeric.insert(key, value);
                }
            }
            serde_json::Value::String(value) => {
                text.insert(key, value);
            }
            serde_json::Value::Bool(value) => {
                numeric.insert(key.clone(), if value { 1.0 } else { 0.0 });
                text.insert(key, value.to_string());
            }
            _ => {}
        }
    }
}

/// Normalizes every value and drops duplicates, keeping first-seen order.
fn normalized_aliases(values: &[String]) -> Vec<String> {
    let mut aliases = Vec::with_capacity(values.len());
    for value in values {
        let normalized = normalize_field(value);
        if !aliases.contains(&normalized) {
            aliases.push(normalized);
        }
    }
    aliases
}

/// Factor names tried, in order, for a share-count query of `share_type`.
fn shares_factor_aliases(share_type: &str) -> Vec<String> {
    let field = normalize_field(share_type);
    let values = match field.as_str() {
        "" | "all" | "total" => vec![
            "total_shares",
            "shares_total",
            "total_share",
            "total_share_capital",
            "capitalization",
            "shares",
        ],
        "float" | "free_float" | "circulating" | "circulation" => vec![
            "free_float_shares",
            "float_shares",
            "circulating_shares",
            "circulation_shares",
            "float_a_shares",
        ],
        "a" | "a_share" | "a_shares" => vec!["a_shares", "shares_a", "a_share_capital"],
        other => {
            return normalized_aliases(&[
                other.to_string(),
                format!("shares_{other}"),
                format!("{other}_shares"),
            ]);
        }
    };
    normalized_aliases(
        &values
            .iter()
            .map(|value| value.to_string())
            .collect::<Vec<_>>(),
    )
}

/// Factor names tried, in order, for a turnover-rate query of `field`.
fn turnover_rate_factor_aliases(field: &str) -> Vec<String> {
    let field = normalize_field(field);
    let values = match field.as_str() {
        "" | "all" | "rate" | "turnover" | "turnover_rate" | "turnover_ratio" => {
            vec!["turnover_rate", "turnover_ratio"]
        }
        "effective" | "effective_turnover" | "effective_turnover_rate" => {
            vec!["effective_turnover_rate", "effective_turnover_ratio"]
        }
        other => {
            return normalized_aliases(&[
                other.to_string(),
                format!("turnover_rate_{other}"),
                format!("{other}_turnover_rate"),
                format!("turnover_ratio_{other}"),
                format!("{other}_turnover_ratio"),
            ]);
        }
    };
    normalized_aliases(
        &values
            .iter()
            .map(|value| value.to_string())
            .collect::<Vec<_>>(),
    )
}

/// Factor names tried, in order, for a stock-connect query of `field`.
fn stock_connect_factor_aliases(field: &str) -> Vec<String> {
    let field = normalize_field(field);
    let values = match field.as_str() {
        "" | "all" | "connect" | "stock_connect" => vec![
            "stock_connect",
            "stock_connect_all",
            "connect_all",
            "north_bound",
        ],
        "north" | "north_bound" | "northbound" => vec![
            "stock_connect_north_bound",
            "stock_connect_northbound",
            "connect_north_bound",
            "north_bound",
            "northbound",
        ],
        "south" | "south_bound" | "southbound" => vec![
            "stock_connect_south_bound",
            "stock_connect_southbound",
            "connect_south_bound",
            "south_bound",
            "southbound",
        ],
        other => {
            return normalized_aliases(&[
                other.to_string(),
                format!("stock_connect_{other}"),
                format!("connect_{other}"),
            ]);
        }
    };
    normalized_aliases(
        &values
            .iter()
            .map(|value| value.to_string())
            .collect::<Vec<_>>(),
    )
}

/// `<prefix>_<field>`, `<prefix>s_<field>`, then the bare `field` (all normalized).
fn prefixed_factor_aliases(prefix: &str, field: &str) -> Vec<String> {
    let prefix = normalize_field(prefix);
    let field = normalize_field(field);
    let plural_prefix = format!("{prefix}s");
    normalized_aliases(&[
        format!("{prefix}_{field}"),
        format!("{plural_prefix}_{field}"),
        field,
    ])
}

/// Factor names tried, in order, for a numeric industry code of `source` at `level`,
/// ending with the generic `industry_code`.
fn industry_factor_aliases(source: &str, level: usize) -> Vec<String> {
    let source = normalize_field(source);
    normalized_aliases(&[
        format!("industry_{source}_l{level}"),
        format!("industry_{source}_{level}"),
        format!("{source}_industry_l{level}"),
        format!("{source}_industry_{level}"),
        format!("industry_l{level}"),
        format!("industry_{level}"),
        "industry_code".to_string(),
    ])
}

/// Text-factor names accepted for an industry name of `source` at `level`, ending with the
/// generic `industry_name`.
fn industry_name_factor_aliases(source: &str, level: usize) -> Vec<String> {
    let source = normalize_field(source);
    normalized_aliases(&[
        format!("industry_{source}_l{level}_name"),
        format!("industry_{source}_{level}_name"),
        format!("industry_{source}_name_l{level}"),
        format!("{source}_industry_l{level}_name"),
        format!("{source}_industry_{level}_name"),
        format!("{source}_industry_name_l{level}"),
        format!("industry_l{level}_name"),
        format!("industry_{level}_name"),
        "industry_name".to_string(),
    ])
}

/// Numeric factor `field` of `snapshot`: built-in columns by name/alias, otherwise the
/// extra factor with exactly that key. `field` is expected to be normalized already.
fn factor_numeric_value(snapshot: &DailyFactorSnapshot, field: &str) -> Option<f64> {
    match field {
        "market_cap" | "market_cap_bn" => Some(snapshot.market_cap_bn),
        "free_float_cap" | "free_float_market_cap" | "free_float_cap_bn" => {
            Some(snapshot.free_float_cap_bn)
        }
        "pe_ttm" => Some(snapshot.pe_ttm),
        "turnover_ratio" => snapshot.turnover_ratio,
        "effective_turnover_ratio" => snapshot.effective_turnover_ratio,
        other => snapshot.extra_factors.get(other).copied(),
    }
}

/// Numeric market field of a daily snapshot by (normalized) name; unknown names yield `None`.
fn daily_market_numeric_value(snapshot: &DailyMarketSnapshot, field: &str) -> Option<f64> {
    match normalize_field(field).as_str() {
        "day_open" | "dayopen" => Some(snapshot.day_open),
        "open" => Some(snapshot.open),
        "high" => Some(snapshot.high),
        "low" => Some(snapshot.low),
        "close" | "price" => Some(snapshot.close),
        "last" | "last_price" => Some(snapshot.last_price),
        "prev_close" | "pre_close" => Some(snapshot.prev_close),
        "volume" => Some(snapshot.volume as f64),
        "tick_volume" => Some(snapshot.tick_volume as f64),
        "bid1" => Some(snapshot.bid1),
        "ask1" => Some(snapshot.ask1),
        "bid1_volume" => Some(snapshot.bid1_volume as f64),
        "ask1_volume" => Some(snapshot.ask1_volume as f64),
        "upper_limit" => Some(snapshot.upper_limit),
        "lower_limit" => Some(snapshot.lower_limit),
        "price_tick" => Some(snapshot.price_tick),
        _ => None,
    }
}

/// Numeric field of an intraday quote by (normalized) name; unknown names yield `None`.
fn intraday_quote_numeric_value(snapshot: &IntradayExecutionQuote, field: &str) -> Option<f64> {
    match normalize_field(field).as_str() {
        "last" | "last_price" | "close" | "price" => Some(snapshot.last_price),
        "bid1" => Some(snapshot.bid1),
        "ask1" => Some(snapshot.ask1),
        "bid1_volume" => Some(snapshot.bid1_volume as f64),
        "ask1_volume" => Some(snapshot.ask1_volume as f64),
        "volume" | "volume_delta" => Some(snapshot.volume_delta as f64),
        "amount" | "amount_delta" | "total_turnover" => Some(snapshot.amount_delta),
        _ => None,
    }
}

/// Whether `quote` is visible when looking back from `date`.
///
/// Quotes from earlier dates are always visible and later dates never. For same-date quotes:
/// without an `active_datetime` on `date`, visibility is `include_now`; otherwise the quote
/// must be at (`include_now`) or strictly before that instant.
fn intraday_quote_visible(
    quote: &IntradayExecutionQuote,
    date: NaiveDate,
    active_datetime: Option<NaiveDateTime>,
    include_now: bool,
) -> bool {
    if quote.date < date {
        return true;
    }
    if quote.date > date {
        return false;
    }
    let Some(active_datetime) = active_datetime.filter(|value| value.date() == date) else {
        return include_now;
    };
    if include_now {
        quote.timestamp <= active_datetime
    } else {
        quote.timestamp < active_datetime
    }
}

/// Converts a daily snapshot into a `"1d"` price bar (amount is not tracked and set to 0).
fn daily_market_price_bar(snapshot: &DailyMarketSnapshot) -> PriceBar {
    PriceBar {
        date: snapshot.date,
        timestamp: snapshot.timestamp.clone(),
        symbol: snapshot.symbol.clone(),
        frequency: "1d".to_string(),
        open: snapshot.open,
        high: snapshot.high,
        low: snapshot.low,
        close: snapshot.close,
        last_price: snapshot.last_price,
        volume: snapshot.volume,
        amount: 0.0,
        bid1: snapshot.bid1,
        ask1: snapshot.ask1,
        bid1_volume: snapshot.bid1_volume,
        ask1_volume: snapshot.ask1_volume,
    }
}

/// Converts one intraday quote into a `"tick"` bar whose OHLC all equal the last price.
fn intraday_quote_price_bar(snapshot: &IntradayExecutionQuote) -> PriceBar {
    PriceBar {
        date: snapshot.date,
        timestamp: Some(snapshot.timestamp.format("%Y-%m-%d %H:%M:%S").to_string()),
        symbol: snapshot.symbol.clone(),
        frequency: "tick".to_string(),
        open: snapshot.last_price,
        high: snapshot.last_price,
        low: snapshot.last_price,
        close: snapshot.last_price,
        last_price: snapshot.last_price,
        volume: snapshot.volume_delta,
        amount: snapshot.amount_delta,
        bid1: snapshot.bid1,
        ask1: snapshot.ask1,
        bid1_volume: snapshot.bid1_volume,
        ask1_volume: snapshot.ask1_volume,
    }
}

/// Canonical field key: trimmed, surrounding quotes stripped, ASCII-lowercased.
fn normalize_field(field: &str) -> String {
    field
        .trim()
        .trim_matches('"')
        .trim_matches('\'')
        .to_ascii_lowercase()
}

/// Maps user frequency spellings to `"1d"`, `"1m"` or `"tick"`; `None` when unrecognized.
fn normalize_history_frequency(frequency: &str) -> Option<String> {
    let normalized = normalize_field(frequency);
    match normalized.as_str() {
        "1d" | "d" | "day" | "daily" => Some("1d".to_string()),
        "1m" | "m" | "minute" | "min" => Some("1m".to_string()),
        "tick" | "t" => Some("tick".to_string()),
        _ => None,
    }
}

/// Keeps only the last `count` elements of `rows` (all of them when there are fewer).
fn take_last<T>(mut rows: Vec<T>, count: usize) -> Vec<T> {
    if rows.len() <= count {
        return rows;
    }
    rows.split_off(rows.len() - count)
}

/// Reads the candidate-eligibility CSV.
///
/// Columns: date, symbol, is_st, is_new_listing, is_paused, allow_buy, allow_sell, then
/// optional is_kcb and is_one_yuan (default false).
fn read_candidates(path: &Path) -> Result<Vec<CandidateEligibility>, DataSetError> {
    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        snapshots.push(CandidateEligibility {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            is_st: row.parse_bool(2)?,
            is_new_listing: row.parse_bool(3)?,
            is_paused: row.parse_bool(4)?,
            allow_buy: row.parse_bool(5)?,
            allow_sell: row.parse_bool(6)?,
            is_kcb: row.parse_optional_bool(7).unwrap_or(false),
            is_one_yuan: row.parse_optional_bool(8).unwrap_or(false),
        });
    }
    Ok(snapshots)
}

/// Reads the benchmark CSV. Columns: date, benchmark, open, close, prev_close, volume.
fn read_benchmarks(path: &Path) -> Result<Vec<BenchmarkSnapshot>, DataSetError> {
    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        snapshots.push(BenchmarkSnapshot {
            date: row.parse_date(0)?,
            benchmark: row.get(1)?.to_string(),
            open: row.parse_f64(2)?,
            close: row.parse_f64(3)?,
            prev_close: row.parse_f64(4)?,
            volume: row.parse_u64(5)?,
        });
    }
    Ok(snapshots)
}

/// Reads the corporate-actions CSV.
///
/// Columns: date, symbol, [payable_date], share_cash, share_bonus, share_gift,
/// issue_quantity, issue_price, reform, adjust_factor, successor_symbol, successor_ratio,
/// successor_cash. Numeric fields default to 0 and `reform` to false.
///
/// NOTE(review): the payable-date column is assumed present whenever a row has at least 10
/// fields; an old-layout row that fills the successor columns also reaches 10+ fields and
/// would be misread. Confirm the files never mix layouts.
fn read_corporate_actions(path: &Path) -> Result<Vec<CorporateAction>, DataSetError> {
    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        let has_payable_date = row.fields.len() >= 10;
        let payable_date = if has_payable_date {
            row.parse_optional_date(2)?
        } else {
            None
        };
        // Every column after `symbol` shifts right by one when payable_date is present.
        let offset = usize::from(has_payable_date);
        snapshots.push(CorporateAction {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            payable_date,
            share_cash: row.parse_optional_f64(2 + offset).unwrap_or(0.0),
            share_bonus: row.parse_optional_f64(3 + offset).unwrap_or(0.0),
            share_gift: row.parse_optional_f64(4 + offset).unwrap_or(0.0),
            issue_quantity: row.parse_optional_f64(5 + offset).unwrap_or(0.0),
            issue_price: row.parse_optional_f64(6 + offset).unwrap_or(0.0),
            reform: row.parse_optional_bool(7 + offset).unwrap_or(false),
            adjust_factor: row.parse_optional_f64(8 + offset),
            successor_symbol: row
                .fields
                .get(9 + offset)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty()),
            successor_ratio: row.parse_optional_f64(10 + offset),
            successor_cash: row.parse_optional_f64(11 + offset),
        });
    }
    Ok(snapshots)
}

/// Reads the intraday execution-quote CSV.
///
/// Columns: date, symbol, timestamp (`%Y-%m-%d %H:%M:%S`), last_price, bid1, ask1,
/// bid1_volume, ask1_volume, volume_delta, amount_delta (numeric columns default to 0),
/// trading_phase (optional).
fn read_execution_quotes(path: &Path) -> Result<Vec<IntradayExecutionQuote>, DataSetError> {
    let rows = read_rows(path)?;
    let mut quotes = Vec::with_capacity(rows.len());
    for row in rows {
        quotes.push(IntradayExecutionQuote {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            timestamp: row.parse_datetime(2)?,
            last_price: row.parse_optional_f64(3).unwrap_or_default(),
            bid1: row.parse_optional_f64(4).unwrap_or_default(),
            ask1: row.parse_optional_f64(5).unwrap_or_default(),
            bid1_volume: row.parse_optional_u64(6).unwrap_or_default(),
            ask1_volume: row.parse_optional_u64(7).unwrap_or_default(),
            volume_delta: row.parse_optional_u64(8).unwrap_or_default(),
            amount_delta: row.parse_optional_f64(9).unwrap_or_default(),
            trading_phase: row
                .fields
                .get(10)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty()),
        });
    }
    Ok(quotes)
}

/// Reads the order-book depth CSV.
///
/// Columns: date, symbol, timestamp, level (default 1, clamped to 1..=255), bid_price,
/// bid_volume, ask_price, ask_volume (numeric columns default to 0).
fn read_order_book_depth(path: &Path) -> Result<Vec<IntradayOrderBookDepthLevel>, DataSetError> {
    let rows = read_rows(path)?;
    let mut levels = Vec::with_capacity(rows.len());
    for row in rows {
        levels.push(IntradayOrderBookDepthLevel {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            timestamp: row.parse_datetime(2)?,
            // Clamped into the u8 range first, so the narrowing cast cannot truncate.
            level: row
                .parse_optional_u32(3)
                .unwrap_or(1)
                .clamp(1, u32::from(u8::MAX)) as u8,
            bid_price: row.parse_optional_f64(4).unwrap_or_default(),
            bid_volume: row.parse_optional_u64(5).unwrap_or_default(),
            ask_price: row.parse_optional_f64(6).unwrap_or_default(),
            ask_volume: row.parse_optional_u64(7).unwrap_or_default(),
        });
    }
    Ok(levels)
}

/// Reads the futures trading-parameter CSV.
///
/// When the first column parses as a `%Y-%m-%d` date it is the effective date and the
/// symbol follows; otherwise the symbol is first. After the symbol: contract_multiplier
/// (default 1), long/short margin rate (default 0), commission type (default by-money),
/// open/close commission ratios (default 0), close-today ratio (defaults to the close
/// ratio), price_tick (default 1).
fn read_futures_trading_parameters(
    path: &Path,
) -> Result<Vec<FuturesTradingParameter>, DataSetError> {
    let rows = read_rows(path)?;
    let mut params = Vec::with_capacity(rows.len());
    for row in rows {
        let first = row.get(0)?.trim();
        let (effective_date, symbol_index) =
            if NaiveDate::parse_from_str(first, "%Y-%m-%d").is_ok() {
                (row.parse_optional_date(0)?, 1)
            } else {
                (None, 0)
            };
        params.push(FuturesTradingParameter {
            effective_date,
            symbol: row.get(symbol_index)?.to_string(),
            contract_multiplier: row.parse_optional_f64(symbol_index + 1).unwrap_or(1.0),
            long_margin_rate: row.parse_optional_f64(symbol_index + 2).unwrap_or(0.0),
            short_margin_rate: row.parse_optional_f64(symbol_index + 3).unwrap_or(0.0),
            commission_type: row
                .fields
                .get(symbol_index + 4)
                .map(|value| FuturesCommissionType::parse(value))
                .unwrap_or(FuturesCommissionType::ByMoney),
            open_commission_ratio: row.parse_optional_f64(symbol_index + 5).unwrap_or(0.0),
            close_commission_ratio: row.parse_optional_f64(symbol_index + 6).unwrap_or(0.0),
            close_today_commission_ratio: row
                .parse_optional_f64(symbol_index + 7)
                .unwrap_or_else(|| row.parse_optional_f64(symbol_index + 6).unwrap_or(0.0)),
            price_tick: row.parse_optional_f64(symbol_index + 8).unwrap_or(1.0),
        });
    }
    Ok(params)
}

/// One data line of a CSV file plus its location, used to build parse errors.
struct CsvRow {
    path: String,
    line: usize,
    fields: Vec<String>,
}

impl CsvRow {
    /// [`DataSetError::InvalidRow`] pointing at this row with `message`.
    fn invalid_row(&self, message: String) -> DataSetError {
        DataSetError::InvalidRow {
            path: self.path.clone(),
            line: self.line,
            message,
        }
    }

    /// Trimmed field at `index`; `None` when the column is missing or blank.
    fn optional_str(&self, index: usize) -> Option<&str> {
        self.fields
            .get(index)
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
    }

    /// Raw field at `index`, or an error naming the missing column.
    fn get(&self, index: usize) -> Result<&str, DataSetError> {
        self.fields
            .get(index)
            .map(String::as_str)
            .ok_or_else(|| self.invalid_row(format!("missing column {index}")))
    }

    /// Required `%Y-%m-%d` date at `index`.
    fn parse_date(&self, index: usize) -> Result<NaiveDate, DataSetError> {
        NaiveDate::parse_from_str(self.get(index)?, "%Y-%m-%d")
            .map_err(|err| self.invalid_row(format!("invalid date: {err}")))
    }

    /// Required `f64` at `index`.
    fn parse_f64(&self, index: usize) -> Result<f64, DataSetError> {
        self.get(index)?
            .parse::<f64>()
            .map_err(|err| self.invalid_row(format!("invalid f64: {err}")))
    }

    /// Required `u64` at `index`.
    fn parse_u64(&self, index: usize) -> Result<u64, DataSetError> {
        self.get(index)?
            .parse::<u64>()
            .map_err(|err| self.invalid_row(format!("invalid u64: {err}")))
    }

    /// Optional `f64`; missing, blank or unparsable values yield `None`.
    fn parse_optional_f64(&self, index: usize) -> Option<f64> {
        self.optional_str(index)
            .and_then(|value| value.parse::<f64>().ok())
    }

    /// Required `bool` (`true` / `false`) at `index`.
    fn parse_bool(&self, index: usize) -> Result<bool, DataSetError> {
        self.get(index)?
            .parse::<bool>()
            .map_err(|err| self.invalid_row(format!("invalid bool: {err}")))
    }

    /// Optional `bool`; missing, blank or unparsable values yield `None`.
    fn parse_optional_bool(&self, index: usize) -> Option<bool> {
        self.optional_str(index)
            .and_then(|value| value.parse::<bool>().ok())
    }

    /// Optional `%Y-%m-%d` date: missing or blank is `Ok(None)`, malformed is an error.
    fn parse_optional_date(&self, index: usize) -> Result<Option<NaiveDate>, DataSetError> {
        self.optional_str(index)
            .map(|value| {
                NaiveDate::parse_from_str(value, "%Y-%m-%d")
                    .map_err(|err| self.invalid_row(format!("invalid optional date: {err}")))
            })
            .transpose()
    }

    /// Required `%Y-%m-%d %H:%M:%S` datetime at `index`.
    fn parse_datetime(&self, index: usize) -> Result<NaiveDateTime, DataSetError> {
        NaiveDateTime::parse_from_str(self.get(index)?, "%Y-%m-%d %H:%M:%S")
            .map_err(|err| self.invalid_row(format!("invalid datetime: {err}")))
    }

    /// Optional `u32`; missing, blank or unparsable values yield `None`.
    fn parse_optional_u32(&self, index: usize) -> Option<u32> {
        self.optional_str(index)
            .and_then(|value| value.parse::<u32>().ok())
    }

    /// Optional `u64`; missing, blank or unparsable values yield `None`.
    fn parse_optional_u64(&self, index: usize) -> Option<u64> {
        self.optional_str(index)
            .and_then(|value| value.parse::<u64>().ok())
    }
}

/// Recursively loads every `.csv` file under `dir` with `loader` and concatenates the rows.
///
/// Within each directory, files are loaded in ascending path order before any
/// subdirectory; subdirectories are then visited in ascending order.
///
/// # Errors
/// [`DataSetError::Io`] when a directory cannot be listed, or whatever `loader` returns.
fn read_partitioned_dir<T, F>(dir: &Path, mut loader: F) -> Result<Vec<T>, DataSetError>
where
    F: FnMut(&Path) -> Result<Vec<T>, DataSetError>,
{
    let mut rows = Vec::new();
    let mut stack = vec![dir.to_path_buf()];
    while let Some(current_dir) = stack.pop() {
        let mut entries = fs::read_dir(&current_dir)
            .map_err(|source| DataSetError::Io {
                path: current_dir.display().to_string(),
                source,
            })?
            .collect::<Result<Vec<_>, _>>()
            .map_err(|source| DataSetError::Io {
                path: current_dir.display().to_string(),
                source,
            })?;
        // `read_dir` order is platform-dependent; sort for deterministic loading.
        entries.sort_by_key(|entry| entry.path());
        let mut subdirs = Vec::new();
        for entry in entries {
            let path = entry.path();
            if path.is_dir() {
                subdirs.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("csv") {
                rows.extend(loader(&path)?);
            }
        }
        // Push in reverse so the LIFO stack pops subdirectories in ascending order.
        stack.extend(subdirs.into_iter().rev());
    }
    Ok(rows)
}

/// Reads `path` and splits it into data rows, skipping the header (line 1) and blank lines.
/// NOTE(review): quoted fields spanning multiple lines are not supported.
fn read_rows(path: &Path) -> Result<Vec<CsvRow>, DataSetError> {
    let display_path = path.display().to_string();
    let content = fs::read_to_string(path).map_err(|source| DataSetError::Io {
        path: display_path.clone(),
        source,
    })?;
    let rows = content
        .lines()
        .enumerate()
        .map(|(line_idx, line)| (line_idx + 1, line))
        .filter(|(line_no, line)| *line_no != 1 && !line.trim().is_empty())
        .map(|(line_no, line)| CsvRow {
            path: display_path.clone(),
            line: line_no,
            fields: split_csv_line(line),
        })
        .collect();
    Ok(rows)
}

/// Splits one CSV line on commas outside double quotes.
///
/// A leading BOM is dropped, `""` inside quotes is an escaped quote, and every field is
/// trimmed (including quoted content).
fn split_csv_line(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut chars = line.trim_start_matches('\u{feff}').chars().peekable();
    let mut in_quotes = false;
    while let Some(ch) = chars.next() {
        match ch {
            '"' if in_quotes && chars.peek() == Some(&'"') => {
                field.push('"');
                chars.next();
            }
            '"' => {
                in_quotes = !in_quotes;
            }
            ',' if !in_quotes => {
                fields.push(field.trim().to_string());
                field.clear();
            }
            _ => field.push(ch),
        }
    }
    fields.push(field.trim().to_string());
    fields
}

/// Groups `rows` by the date `date_of` returns, keeping input order within each date.
fn group_by_date<T, F>(rows: Vec<T>, mut date_of: F) -> BTreeMap<NaiveDate, Vec<T>>
where
    F: FnMut(&T) -> NaiveDate,
{
    let mut grouped = BTreeMap::<NaiveDate, Vec<T>>::new();
    for row in rows {
        grouped.entry(date_of(&row)).or_default().push(row);
    }
    grouped
}

/// The single benchmark code used across `benchmarks`.
///
/// # Errors
/// [`DataSetError::MultipleBenchmarks`] unless exactly one distinct code is present.
/// NOTE(review): an empty input also reports `MultipleBenchmarks`.
fn collect_benchmark_code(benchmarks: &[BenchmarkSnapshot]) -> Result<String, DataSetError> {
    let mut codes = benchmarks
        .iter()
        .map(|row| row.benchmark.clone())
        .collect::<Vec<_>>();
    codes.sort_unstable();
    codes.dedup();
    if codes.len() == 1 {
        Ok(codes.remove(0))
    } else {
        Err(DataSetError::MultipleBenchmarks)
    }
}

/// Rounds `value` to the nearest multiple of `tick` (0.01 when `tick` is not a positive
/// finite number), then to 4 decimals to scrub floating-point noise.
fn round_price_to_tick(value: f64, tick: f64) -> f64 {
    let effective_tick = if tick.is_finite() && tick > 0.0 {
        tick
    } else {
        0.01
    };
    ((value / effective_tick).round() * effective_tick * 10000.0).round() / 10000.0
}

/// Running sums with a leading 0: `result[i]` is the sum of `values[..i]`.
fn prefix_sums(values: &[f64]) -> Vec<f64> {
    let mut prefix = Vec::with_capacity(values.len() + 1);
    prefix.push(0.0);
    let mut running = 0.0;
    for value in values {
        running += *value;
        prefix.push(running);
    }
    prefix
}

/// Serde adapter for `Option<NaiveDate>` as an optional `%Y-%m-%d` string; blank strings
/// deserialize to `None`.
mod optional_date_format {
    use chrono::NaiveDate;
    use serde::{self, Deserialize, Deserializer, Serializer};

    const FORMAT: &str = "%Y-%m-%d";

    pub fn serialize<S>(date: &Option<NaiveDate>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match date {
            Some(date) => serializer.serialize_some(&date.format(FORMAT).to_string()),
            None => serializer.serialize_none(),
        }
    }

    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let text = Option::<String>::deserialize(deserializer)?;
        match text
            .as_deref()
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            Some(text) => NaiveDate::parse_from_str(text, FORMAT)
                .map(Some)
                .map_err(serde::de::Error::custom),
            None => Ok(None),
        }
    }
}

/// Builds one price series per symbol from the date-ordered market snapshots.
fn build_market_series(
    market_by_date: &BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>,
) -> HashMap<String, SymbolPriceSeries> {
    let mut grouped = HashMap::<String, Vec<DailyMarketSnapshot>>::new();
    for row in market_by_date.values().flatten() {
        grouped
            .entry(row.symbol.clone())
            .or_default()
            .push(row.clone());
    }
    grouped
        .into_iter()
        .map(|(symbol, rows)| (symbol, SymbolPriceSeries::new(&rows)))
        .collect()
}

/// Groups futures trading parameters by symbol and sorts each symbol's rows.
fn build_futures_params_index(
    rows: Vec<FuturesTradingParameter>,
) -> HashMap<String, Vec<FuturesTradingParameter>> {
    let mut grouped = HashMap::<String, Vec<FuturesTradingParameter>>::new();
    for row in rows {
        grouped.entry(row.symbol.clone()).or_default().push(row);
    }
    for rows in grouped.values_mut() {
        rows.sort_by_key(|row|
row.effective_date); } grouped } fn build_execution_quote_index( execution_quotes: Vec, ) -> HashMap<(NaiveDate, String), Vec> { let mut grouped = HashMap::<(NaiveDate, String), Vec>::new(); for quote in execution_quotes { grouped .entry((quote.date, quote.symbol.clone())) .or_default() .push(quote); } for quotes in grouped.values_mut() { quotes.sort_by_key(|quote| quote.timestamp); } grouped } fn build_order_book_depth_index( order_book_depth: Vec, ) -> HashMap<(NaiveDate, String), Vec> { let mut grouped = HashMap::<(NaiveDate, String), Vec>::new(); for level in order_book_depth { grouped .entry((level.date, level.symbol.clone())) .or_default() .push(level); } for levels in grouped.values_mut() { levels.sort_by(|left, right| { left.timestamp .cmp(&right.timestamp) .then(left.level.cmp(&right.level)) }); } grouped } fn build_eligible_universe( factor_by_date: &BTreeMap>, candidate_index: &HashMap<(NaiveDate, String), CandidateEligibility>, market_index: &HashMap<(NaiveDate, String), DailyMarketSnapshot>, ) -> BTreeMap> { let mut per_date = BTreeMap::>::new(); for (date, factors) in factor_by_date { let mut rows = Vec::new(); for factor in factors { if factor.market_cap_bn <= 0.0 || !factor.market_cap_bn.is_finite() { continue; } let key = (*date, factor.symbol.clone()); let Some(candidate) = candidate_index.get(&key) else { continue; }; let Some(market) = market_index.get(&key) else { continue; }; if !candidate.eligible_for_selection() || market.paused { continue; } rows.push(EligibleUniverseSnapshot { symbol: factor.symbol.clone(), market_cap_bn: factor.market_cap_bn, free_float_cap_bn: factor.free_float_cap_bn, }); } rows.sort_by(|left, right| { left.market_cap_bn .partial_cmp(&right.market_cap_bn) .unwrap_or(std::cmp::Ordering::Equal) .then_with(|| left.symbol.cmp(&right.symbol)) }); per_date.insert(*date, rows); } per_date } #[cfg(test)] mod tests { use super::*; use std::time::{SystemTime, UNIX_EPOCH}; fn temp_csv_path(name: &str) -> std::path::PathBuf { let 
nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_nanos(); std::env::temp_dir().join(format!("{}_{}_{}.csv", name, std::process::id(), nanos)) } fn market_row(date: &str, prev_close: f64, volume: u64) -> DailyMarketSnapshot { DailyMarketSnapshot { date: NaiveDate::parse_from_str(date, "%Y-%m-%d").unwrap(), symbol: "000001.SZ".to_string(), timestamp: None, day_open: prev_close, open: prev_close, high: prev_close, low: prev_close, close: prev_close, last_price: prev_close, bid1: prev_close, ask1: prev_close, prev_close, volume, tick_volume: 0, bid1_volume: 0, ask1_volume: 0, trading_phase: None, paused: false, upper_limit: prev_close * 1.1, lower_limit: prev_close * 0.9, price_tick: 0.01, } } #[test] fn decision_volume_average_uses_previous_completed_days_only() { let series = SymbolPriceSeries::new(&[ market_row("2025-01-02", 10.0, 100), market_row("2025-01-03", 11.0, 200), market_row("2025-01-06", 12.0, 10_000), ]); assert_eq!( series.decision_close_moving_average( NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap(), 2 ), Some(11.5) ); assert_eq!( series.decision_volume_moving_average( NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap(), 2 ), Some(150.0) ); assert_eq!( series.decision_volume_moving_average( NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap(), 3 ), None ); } #[test] fn reads_mixed_numeric_and_text_extra_factors_from_quoted_csv_json() { let path = temp_csv_path("mixed_factor_maps"); fs::write( &path, concat!( "date,symbol,market_cap_bn,free_float_cap_bn,pe_ttm,turnover_ratio,effective_turnover_ratio,extra_factors\n", "2025-01-02,000001.SZ,12,10,8,1,1,\"{\"\"custom_alpha\"\":7,\"\"industry_name\"\":\"\"electronics,hardware\"\",\"\"flag\"\":true}\"\n" ), ) .unwrap(); let factors = read_factors(&path).unwrap(); let text_factors = read_factor_texts(&path).unwrap(); fs::remove_file(&path).ok(); assert_eq!(factors.len(), 1); assert_eq!( factors[0].extra_factors.get("custom_alpha").copied(), Some(7.0) 
); assert_eq!(factors[0].extra_factors.get("flag").copied(), Some(1.0)); assert_eq!(text_factors.len(), 2); assert!( text_factors .iter() .any(|row| row.field == "industry_name" && row.value == "electronics,hardware") ); assert!( text_factors .iter() .any(|row| row.field == "flag" && row.value == "true") ); } }