Close RQAlpha P0-P2 parity gaps

This commit is contained in:
boris
2026-04-23 21:07:59 -07:00
parent 6be87c9982
commit beb9c7a7ae
8 changed files with 1830 additions and 86 deletions

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::calendar::TradingCalendar;
use crate::futures::{FuturesCommissionType, FuturesTradingParameter};
use crate::instrument::Instrument;
mod date_format {
@@ -345,6 +346,51 @@ pub struct PriceBar {
pub ask1_volume: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct DividendRecord {
#[serde(with = "date_format")]
pub ex_dividend_date: NaiveDate,
#[serde(with = "date_format")]
pub payable_date: NaiveDate,
pub symbol: String,
pub dividend_cash_before_tax: f64,
pub round_lot: u32,
}
#[derive(Debug, Clone, Serialize)]
pub struct SplitRecord {
#[serde(with = "date_format")]
pub ex_dividend_date: NaiveDate,
pub symbol: String,
pub split_ratio: f64,
}
#[derive(Debug, Clone, Serialize)]
pub struct FactorValue {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
pub field: String,
pub value: f64,
}
#[derive(Debug, Clone, Serialize)]
pub struct SecuritiesMarginRecord {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
pub field: String,
pub value: f64,
}
#[derive(Debug, Clone, Serialize)]
pub struct YieldCurvePoint {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub tenor: String,
pub value: f64,
}
#[derive(Debug, Clone)]
pub struct EligibleUniverseSnapshot {
pub symbol: String,
@@ -620,6 +666,7 @@ pub struct DataSet {
benchmark_series_cache: BenchmarkPriceSeries,
eligible_universe_by_date: BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>>,
benchmark_code: String,
futures_params_by_symbol: HashMap<String, Vec<FuturesTradingParameter>>,
}
impl DataSet {
@@ -641,7 +688,13 @@ impl DataSet {
} else {
Vec::new()
};
Self::from_components_with_actions_and_quotes(
let futures_params_path = path.join("futures_trading_parameters.csv");
let futures_params = if futures_params_path.exists() {
read_futures_trading_parameters(&futures_params_path)?
} else {
Vec::new()
};
Self::from_components_with_actions_quotes_and_futures(
instruments,
market,
factors,
@@ -649,6 +702,7 @@ impl DataSet {
benchmarks,
corporate_actions,
execution_quotes,
futures_params,
)
}
@@ -670,7 +724,13 @@ impl DataSet {
} else {
Vec::new()
};
Self::from_components_with_actions_and_quotes(
let futures_params_dir = path.join("futures_trading_parameters");
let futures_params = if futures_params_dir.exists() {
read_partitioned_dir(&futures_params_dir, read_futures_trading_parameters)?
} else {
Vec::new()
};
Self::from_components_with_actions_quotes_and_futures(
instruments,
market,
factors,
@@ -678,6 +738,7 @@ impl DataSet {
benchmarks,
corporate_actions,
execution_quotes,
futures_params,
)
}
@@ -726,6 +787,28 @@ impl DataSet {
benchmarks: Vec<BenchmarkSnapshot>,
corporate_actions: Vec<CorporateAction>,
execution_quotes: Vec<IntradayExecutionQuote>,
) -> Result<Self, DataSetError> {
Self::from_components_with_actions_quotes_and_futures(
instruments,
market,
factors,
candidates,
benchmarks,
corporate_actions,
execution_quotes,
Vec::new(),
)
}
pub fn from_components_with_actions_quotes_and_futures(
instruments: Vec<Instrument>,
market: Vec<DailyMarketSnapshot>,
factors: Vec<DailyFactorSnapshot>,
candidates: Vec<CandidateEligibility>,
benchmarks: Vec<BenchmarkSnapshot>,
corporate_actions: Vec<CorporateAction>,
execution_quotes: Vec<IntradayExecutionQuote>,
futures_params: Vec<FuturesTradingParameter>,
) -> Result<Self, DataSetError> {
let benchmark_code = collect_benchmark_code(&benchmarks)?;
let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect());
@@ -764,6 +847,7 @@ impl DataSet {
BenchmarkPriceSeries::new(&benchmark_by_date.values().cloned().collect::<Vec<_>>());
let eligible_universe_by_date =
build_eligible_universe(&factor_by_date, &candidate_index, &market_index);
let futures_params_by_symbol = build_futures_params_index(futures_params);
Ok(Self {
instruments,
@@ -781,6 +865,7 @@ impl DataSet {
benchmark_series_cache,
eligible_universe_by_date,
benchmark_code,
futures_params_by_symbol,
})
}
@@ -870,6 +955,38 @@ impl DataSet {
self.benchmark_by_date.values().cloned().collect()
}
pub fn futures_trading_parameter(
&self,
date: NaiveDate,
symbol: &str,
) -> Option<&FuturesTradingParameter> {
self.futures_params_by_symbol.get(symbol).and_then(|rows| {
rows.iter()
.rev()
.find(|row| row.effective_date.is_none_or(|effective| effective <= date))
})
}
pub fn futures_settlement_price(
&self,
date: NaiveDate,
symbol: &str,
mode: &str,
) -> Option<f64> {
let snapshot = self.market(date, symbol)?;
match normalize_field(mode).as_str() {
"settlement" | "settle" => self
.factor_numeric_value(date, symbol, "settlement")
.or_else(|| self.factor_numeric_value(date, symbol, "settle"))
.or(Some(snapshot.close)),
"prev_settlement" | "pre_settlement" => self
.factor_numeric_value(date, symbol, "prev_settlement")
.or_else(|| self.factor_numeric_value(date, symbol, "pre_settlement"))
.or(Some(snapshot.prev_close)),
_ => Some(snapshot.close),
}
}
pub fn history_bars(
&self,
date: NaiveDate,
@@ -994,6 +1111,218 @@ impl DataSet {
})
}
pub fn get_dividend(
&self,
symbol: &str,
start: NaiveDate,
end: NaiveDate,
) -> Vec<DividendRecord> {
let mut rows = self
.corporate_actions_by_date
.range(start..=end)
.flat_map(|(_, actions)| actions.iter())
.filter(|action| action.symbol == symbol && action.share_cash.abs() > f64::EPSILON)
.map(|action| DividendRecord {
ex_dividend_date: action.date,
payable_date: action.payable_date.unwrap_or(action.date),
symbol: action.symbol.clone(),
dividend_cash_before_tax: action.share_cash,
round_lot: self
.instrument(symbol)
.map(Instrument::effective_round_lot)
.unwrap_or(100),
})
.collect::<Vec<_>>();
rows.sort_by_key(|row| row.ex_dividend_date);
rows
}
pub fn get_split(&self, symbol: &str, start: NaiveDate, end: NaiveDate) -> Vec<SplitRecord> {
let mut rows = self
.corporate_actions_by_date
.range(start..=end)
.flat_map(|(_, actions)| actions.iter())
.filter(|action| action.symbol == symbol && (action.split_ratio() - 1.0).abs() > 1e-12)
.map(|action| SplitRecord {
ex_dividend_date: action.date,
symbol: action.symbol.clone(),
split_ratio: action.split_ratio(),
})
.collect::<Vec<_>>();
rows.sort_by_key(|row| row.ex_dividend_date);
rows
}
pub fn get_factor(
&self,
symbol: &str,
start: NaiveDate,
end: NaiveDate,
field: &str,
) -> Vec<FactorValue> {
if start > end {
return Vec::new();
}
let field = normalize_field(field);
let mut rows = self
.factor_by_date
.range(start..=end)
.flat_map(|(_, snapshots)| snapshots.iter())
.filter(|snapshot| snapshot.symbol == symbol)
.filter_map(|snapshot| {
factor_numeric_value(snapshot, &field).map(|value| FactorValue {
date: snapshot.date,
symbol: snapshot.symbol.clone(),
field: field.clone(),
value,
})
})
.collect::<Vec<_>>();
rows.sort_by_key(|row| row.date);
rows
}
pub fn get_yield_curve(
&self,
start: NaiveDate,
end: NaiveDate,
tenor: Option<&str>,
) -> Vec<YieldCurvePoint> {
if start > end {
return Vec::new();
}
let tenor_filter = tenor.map(normalize_field);
let mut rows = Vec::new();
for (date, snapshots) in self.factor_by_date.range(start..=end) {
for snapshot in snapshots {
for (field, value) in &snapshot.extra_factors {
let normalized = normalize_field(field);
let Some(raw_tenor) = normalized
.strip_prefix("yield_curve_")
.or_else(|| normalized.strip_prefix("yc_"))
else {
continue;
};
if tenor_filter
.as_ref()
.is_some_and(|expected| expected != raw_tenor)
{
continue;
}
rows.push(YieldCurvePoint {
date: *date,
tenor: raw_tenor.to_string(),
value: *value,
});
}
}
}
rows.sort_by(|left, right| {
left.date
.cmp(&right.date)
.then(left.tenor.cmp(&right.tenor))
});
rows
}
pub fn get_margin_stocks(&self, date: NaiveDate, margin_type: &str) -> Vec<String> {
let field = match normalize_field(margin_type).as_str() {
"stock" => "margin_stock",
"cash" => "margin_cash",
_ => "margin_all",
};
let mut symbols = self
.factor_by_date
.get(&date)
.map(|rows| {
rows.iter()
.filter(|row| {
row.extra_factors
.get(field)
.or_else(|| row.extra_factors.get("margin_all"))
.is_some_and(|value| *value > 0.0)
})
.map(|row| row.symbol.clone())
.collect::<Vec<_>>()
})
.unwrap_or_default();
if symbols.is_empty() {
symbols = self
.active_instruments(
date,
&self
.instruments
.keys()
.map(String::as_str)
.collect::<Vec<_>>(),
)
.into_iter()
.filter(|instrument| !instrument.board.eq_ignore_ascii_case("FUTURE"))
.map(|instrument| instrument.symbol.clone())
.collect();
}
symbols.sort();
symbols.dedup();
symbols
}
pub fn get_securities_margin(
&self,
symbol: &str,
start: NaiveDate,
end: NaiveDate,
field: &str,
) -> Vec<SecuritiesMarginRecord> {
self.get_factor(symbol, start, end, field)
.into_iter()
.map(|row| SecuritiesMarginRecord {
date: row.date,
symbol: row.symbol,
field: row.field,
value: row.value,
})
.collect()
}
pub fn get_dominant_future(&self, underlying_symbol: &str, date: NaiveDate) -> Option<String> {
let underlying = normalize_field(underlying_symbol);
let mut candidates = self
.futures_params_by_symbol
.keys()
.filter(|symbol| normalize_field(symbol).starts_with(&underlying))
.filter(|symbol| {
self.futures_trading_parameter(date, symbol.as_str())
.is_some()
})
.cloned()
.collect::<Vec<_>>();
if candidates.is_empty() {
candidates = self
.instruments
.values()
.filter(|instrument| instrument.board.eq_ignore_ascii_case("FUTURE"))
.filter(|instrument| normalize_field(&instrument.symbol).starts_with(&underlying))
.filter(|instrument| instrument.is_active_on(date))
.map(|instrument| instrument.symbol.clone())
.collect();
}
candidates.sort();
candidates.into_iter().next()
}
pub fn get_dominant_future_price(
&self,
underlying_symbol: &str,
start: NaiveDate,
end: NaiveDate,
frequency: &str,
) -> Vec<PriceBar> {
let Some(symbol) = self.get_dominant_future(underlying_symbol, end) else {
return Vec::new();
};
self.get_price(&symbol, start, end, frequency)
}
pub fn get_price(
&self,
symbol: &str,
@@ -1649,6 +1978,41 @@ fn read_execution_quotes(path: &Path) -> Result<Vec<IntradayExecutionQuote>, Dat
Ok(quotes)
}
fn read_futures_trading_parameters(
path: &Path,
) -> Result<Vec<FuturesTradingParameter>, DataSetError> {
let rows = read_rows(path)?;
let mut params = Vec::new();
for row in rows {
let first = row.get(0)?.trim();
let (effective_date, symbol_index) = if NaiveDate::parse_from_str(first, "%Y-%m-%d").is_ok()
{
(row.parse_optional_date(0)?, 1)
} else {
(None, 0)
};
params.push(FuturesTradingParameter {
effective_date,
symbol: row.get(symbol_index)?.to_string(),
contract_multiplier: row.parse_optional_f64(symbol_index + 1).unwrap_or(1.0),
long_margin_rate: row.parse_optional_f64(symbol_index + 2).unwrap_or(0.0),
short_margin_rate: row.parse_optional_f64(symbol_index + 3).unwrap_or(0.0),
commission_type: row
.fields
.get(symbol_index + 4)
.map(|value| FuturesCommissionType::parse(value))
.unwrap_or(FuturesCommissionType::ByMoney),
open_commission_ratio: row.parse_optional_f64(symbol_index + 5).unwrap_or(0.0),
close_commission_ratio: row.parse_optional_f64(symbol_index + 6).unwrap_or(0.0),
close_today_commission_ratio: row
.parse_optional_f64(symbol_index + 7)
.unwrap_or_else(|| row.parse_optional_f64(symbol_index + 6).unwrap_or(0.0)),
price_tick: row.parse_optional_f64(symbol_index + 8).unwrap_or(1.0),
});
}
Ok(params)
}
struct CsvRow {
path: String,
line: usize,
@@ -1934,6 +2298,19 @@ fn build_market_series(
.collect()
}
fn build_futures_params_index(
rows: Vec<FuturesTradingParameter>,
) -> HashMap<String, Vec<FuturesTradingParameter>> {
let mut grouped = HashMap::<String, Vec<FuturesTradingParameter>>::new();
for row in rows {
grouped.entry(row.symbol.clone()).or_default().push(row);
}
for rows in grouped.values_mut() {
rows.sort_by_key(|row| row.effective_date);
}
grouped
}
fn build_execution_quote_index(
execution_quotes: Vec<IntradayExecutionQuote>,
) -> HashMap<(NaiveDate, String), Vec<IntradayExecutionQuote>> {