diff --git a/.gitignore b/.gitignore index 53a6493..3806d7e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /data/*.parquet /results/*.json Cargo.lock +.grepai/ diff --git a/PROGRESS.md b/PROGRESS.md index 400f20c..a03efec 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -310,3 +310,502 @@ time_decay = 1 - 1 / (hours_remaining / 24 + 1) ``` ranges from 0 (about to close) to ~1 (distant expiry). + +backtest run #2 +--- + +**date:** 2026-01-22 +**period:** 2026-01-21 04:00 to 2026-01-21 06:00 (2 hours) +**initial capital:** $10,000 +**interval:** 1 hour + +### results summary + +| metric | strategy | random baseline | delta | +|--------|----------|-----------------|-------| +| total return | +$502.81 (+5.03%) | $0.00 (0.00%) | +$502.81 | +| sharpe ratio | 68.845 | 0.000 | +68.845 | +| max drawdown | 0.00% | 0.00% | +0.00% | +| win rate | 100.0% | 0.0% | +100.0% | +| total trades | 1 (closed) | 0 | +1 | +| positions | 9 (open) | 0 | +9 | + +*note: short duration used to validate regime detection logic.* + +### architectural updates + +1. **momentum acceleration scorer** + - implemented second-order momentum (acceleration) + - detects market turning points using fast/slow momentum divergence + - derived from "momentum turning points" academic research + +2. **regime adaptive scorer** + - dynamic weight allocation based on market state + - **bull:** favors trend following (momentum: 0.4) + - **bear:** favors mean reversion (mean_reversion: 0.4) + - **transition:** defensive positioning (time_decay: 0.3, volume: 0.2) + - replaced static `CategoryWeightedScorer` + +3. 
**data handling** + - identified data gap before jan 21 03:00 + - adjusted backtest start time to align with available trade data + +backtest run #3 (iteration 1) +--- + +**date:** 2026-01-22 +**period:** 2026-01-20 00:00 to 2026-01-22 00:00 (2 days) +**initial capital:** $10,000 +**interval:** 1 hour + +### results summary + +| metric | value | +|--------|-------| +| total return | +$412.85 (+4.13%) | +| sharpe ratio | 4.579 | +| max drawdown | 0.25% | +| win rate | 83.3% | +| total trades | 6 (closed) | +| positions | 49 (open) | +| avg trade pnl | $8.81 | +| avg hold time | 4.7 hours | + +### comparison with previous runs + +| metric | run #1 (2 days) | run #2 (2 hrs) | run #3 (2 days) | trend | +|--------|-----------------|----------------|-----------------|-------| +| total return | +9.94% | +5.03% | +4.13% | ↓ | +| sharpe ratio | 5.448 | 68.845* | 4.579 | ↓ | +| max drawdown | 1.26% | 0.00% | 0.25% | ↓ better | +| win rate | 58.7% | 100.0% | 83.3% | ↑ | + +*run #2 sharpe inflated due to very short period + +### architectural updates + +1. **kalman price filter** + - implements recursive kalman filtering for price estimation + - outputs: filtered_price, innovation (deviation from prediction), uncertainty + - filters noisy price observations to get better "true price" estimates + - adapts to changing volatility automatically via adaptive gain + +2. **VPIN scorer (volume-synchronized probability of informed trading)** + - based on easley, lopez de prado, and o'hara (2012) research + - measures flow toxicity using volume-bucketed order imbalance + - outputs: vpin, flow_toxicity, informed_direction + - high VPIN indicates presence of informed traders + +3. 
**adaptive confidence scorer** + - replaces RegimeAdaptiveScorer with confidence-weighted approach + - uses kalman uncertainty, VPIN, and entropy to calculate confidence + - scales all feature weights by confidence factor + - dynamic weight profiles based on: + - high VPIN + informed direction -> follow smart money (order_flow: 0.4) + - turning point detected -> defensive (time_decay: 0.25) + - bull regime -> trend following (momentum: 0.35) + - bear regime -> mean reversion (mean_reversion: 0.35) + - neutral -> balanced weights + +### analysis + +**why return decreased from run #1:** +1. the new AdaptiveConfidenceScorer is more conservative, scaling down weights when confidence is low +2. fewer positions taken overall (6 closed vs 46 in run #1) +3. tighter risk management - max drawdown improved from 1.26% to 0.25% + +**positive improvements:** +- win rate increased from 58.7% to 83.3% +- avg trade pnl increased from $4.59 to $8.81 +- max drawdown decreased significantly (better risk-adjusted returns) +- sharpe ratio still positive at 4.579 + +**next iteration considerations:** +1. the confidence scaling may be too aggressive - consider relaxing the uncertainty multiplier +2. need to tune the VPIN thresholds for detecting informed trading +3. kalman filter process_noise and measurement_noise parameters could be optimized +4. 
should add cross-validation with different market regimes + +### scorer pipeline (run #3) + +``` +MomentumScorer (6h) -> momentum +MultiTimeframeMomentumScorer (1h,4h,12h,24h) -> mtf_momentum, mtf_divergence, mtf_alignment +MeanReversionScorer (24h) -> mean_reversion +BollingerMeanReversionScorer (24h, 2.0 std) -> bollinger_reversion, bollinger_position +VolumeScorer (6h) -> volume +OrderFlowScorer -> order_flow +TimeDecayScorer -> time_decay +VolatilityScorer (24h) -> volatility +EntropyScorer (24h) -> entropy +RegimeDetector (24h) -> regime +MomentumAccelerationScorer (3h fast, 12h slow) -> momentum_acceleration, momentum_regime, turning_point +CorrelationScorer (24h, lag 6) -> correlation +KalmanPriceFilter (24h) -> kalman_price, kalman_innovation, kalman_uncertainty +VPINScorer (bucket 50, 20 buckets) -> vpin, flow_toxicity, informed_direction +AdaptiveConfidenceScorer -> final_score, confidence +``` + +### research sources + +- kalman filtering: https://questdb.com/glossary/kalman-filter-for-time-series-forecasting/ +- VPIN/flow toxicity: https://www.stern.nyu.edu/sites/default/files/assets/documents/con_035928.pdf +- kelly criterion for prediction markets: https://arxiv.org/html/2412.14144v1 +- order flow imbalance: https://www.emergentmind.com/topics/order-flow-imbalance + +### thoughts for next iteration + +the lower return is concerning but the improved win rate and reduced drawdown suggest the model is making better quality trades, just fewer of them. the confidence mechanism might be too conservative. + +potential improvements: +1. reduce uncertainty_factor multiplier from 5.0 to 2.0-3.0 +2. add a minimum confidence threshold before suppressing trades entirely +3. explore bayesian updating of the kalman filter parameters based on prediction accuracy +4. 
add cross-market correlation features (currently CorrelationScorer only does autocorrelation) + +backtest run #4 (iteration 2) +--- + +**date:** 2026-01-22 +**period:** 2026-01-20 00:00 to 2026-01-22 00:00 (2 days) +**initial capital:** $10,000 +**interval:** 1 hour + +### results summary + +| metric | original config | with kalman/VPIN | +|--------|-----------------|------------------| +| total return | +$403.69 (4.04%) | +$356.82 (3.57%) | +| sharpe ratio | 3.540 | 4.052 | +| max drawdown | 1.50% | 0.85% | +| win rate | 40.9% | 60.0% | +| total trades | 22 | 5 | +| avg trade pnl | -$7.57 | $9.17 | + +### iteration 2 analysis - what went wrong + +**root cause identified:** the original run #1 used `CategoryWeightedScorer` with a much simpler pipeline: +- MomentumScorer +- MultiTimeframeMomentumScorer +- MeanReversionScorer +- BollingerMeanReversionScorer +- VolumeScorer +- OrderFlowScorer +- TimeDecayScorer +- CategoryWeightedScorer + +subsequent iterations added: +- VolatilityScorer +- EntropyScorer +- RegimeDetector +- MomentumAccelerationScorer +- CorrelationScorer +- KalmanPriceFilter +- VPINScorer +- AdaptiveConfidenceScorer / RegimeAdaptiveScorer + +**key findings:** + +1. **AdaptiveConfidenceScorer caused massive trade reduction** + - original confidence formula: `1/(1 + uncertainty*5)` with 0.1 floor + - at uncertainty=0.5, confidence=0.29, scaling ALL weights down by 70% + - this suppressed nearly all trading signals + - trade count dropped from 46 (run #1) to 5-6 (iter 1) + +2. **adding more scorers != better predictions** + - the additional scorers (RegimeDetector, Entropy, Correlation) added noise + - each scorer contributes features that may conflict or dilute strong signals + - "forecast combination puzzle" - simple equal weights often beat sophisticated methods + +3. 
**kalman filter and VPIN didn't help** + - removing them had no measurable impact on returns + - they may be useful features but weren't being utilized effectively + +**attempted fixes in iteration 2:** +- reduced uncertainty multiplier from 5.0 to 2.0 +- raised confidence floor from 0.1 to 0.4 +- added signal_strength bonus for strong raw signals +- lowered VPIN thresholds from 0.6 to 0.4 +- changed confidence to post-multiplier instead of weight-scaling + +**none of these fixes restored original performance** + +### lessons learned + +1. **simplicity wins** - the original 8-scorer pipeline with CategoryWeightedScorer worked best +2. **confidence scaling is dangerous** - multiplying weights by confidence suppresses signals too aggressively +3. **test incrementally** - should have added one scorer at a time and measured impact +4. **beware over-engineering** - the research on kalman filters and VPIN is academically interesting but added complexity without improving results +5. **preserve baseline** - should have kept the original working config in a separate branch + +### next iteration direction + +rather than adding more complexity, focus on: +1. restoring original simple pipeline +2. tuning existing weights based on category performance +3. improving exit logic rather than entry signals +4. 
maybe add ONE new feature at a time with A/B testing + +backtest run #5 (iteration 3) +--- + +**date:** 2026-01-22 +**period:** 2026-01-20 00:00 to 2026-01-22 00:00 (2 days) +**initial capital:** $10,000 +**interval:** 1 hour + +### results summary + +| metric | strategy | random baseline | delta | +|--------|----------|-----------------|-------| +| total return | +$936.61 (+9.37%) | -$8.00 (-0.08%) | +$944.61 | +| sharpe ratio | 6.491 | -2.291 | +8.782 | +| max drawdown | 0.33% | 0.08% | +0.25% | +| win rate | 100.0% | 0.0% | +100.0% | +| total trades | 9 | 0 | +9 | +| positions (open) | 46 | 0 | +46 | +| avg trade pnl | $25.32 | $0.00 | +$25.32 | + +### comparison with previous runs + +| metric | run #4 (iter 2) | run #5 (iter 3) | change | +|--------|-----------------|-----------------|--------| +| total return | +4.04% | +9.37% | **+132%** | +| sharpe ratio | 3.540 | 6.491 | **+83%** | +| max drawdown | 1.50% | 0.33% | **-78%** | +| win rate | 40.9% | 100.0% | **+144%** | +| total trades | 22 | 9 | -59% | +| avg trade pnl | -$7.57 | +$25.32 | **+$32.89** | + +### key discovery: stop losses hurt prediction market returns + +**root cause analysis:** + +during iteration 3, we discovered that the original trades.csv data was overwritten after run #1, making it impossible to reproduce those results. this led us to investigate why the "restored" pipeline (iter 2) performed poorly. + +analysis of trade logs revealed: +1. **stop losses triggered at -67% to -97%**, not at the configured -15% +2. exits only checked at hourly intervals - prices gapped through stops +3. 
prediction market prices can move discontinuously (binary outcomes, news) + +example failed stop losses from run #4: +- KXSPACEXCOUNT: stop triggered at **-67.4%** (configured -15%) +- KXUCLBTTS: stop triggered at **-97.5%** (configured -15%) +- KXNCAAWBGAME: stop triggered at **-95.0%** (configured -15%) + +### exit strategy optimization + +we tested 5 exit configurations: + +| config | return | sharpe | drawdown | win rate | +|--------|--------|--------|----------|----------| +| baseline (20% TP, 15% SL) | +4.04% | 3.540 | 1.50% | 40.9% | +| 100% TP, no SL | +9.44% | 6.458 | 0.55% | 100% | +| resolution only | +7.16% | 4.388 | 2.12% | n/a | +| **50% TP, no SL** | **+9.37%** | **6.491** | **0.33%** | **100%** | +| 75% TP, no SL | +9.28% | 6.381 | 0.45% | 100% | + +**winner: 50% take profit, no stop loss** +- highest sharpe ratio (6.491) +- lowest max drawdown (0.33%) +- good capital recycling (9 closed trades vs 4) + +### implementation changes + +**new default exit config (src/types.rs):** +```rust +take_profit_pct: 0.50, // exit at +50% (was 0.20) +stop_loss_pct: 0.99, // effectively disabled: only fires at -99% (was 0.15) +max_hold_hours: 48, // shorter (was 72) +score_reversal_threshold: -0.5, +``` + +**rationale:** +1. **stop losses don't work** for prediction markets + - prices gap through hourly checks + - binary outcomes mean temporary price drops don't invalidate bets + - position sizing limits max loss instead + +2. **50% take profit** balances two goals: + - locks in gains before potential reversal + - lets winners run further than 20% (which cut gains short) + +3. **shorter hold time (48h)** for 2-day backtests + - ensures positions resolve or exit within test period + +### lessons learned + +1. **prediction markets ≠ traditional trading** + - traditional stop losses assume continuous price paths + - binary outcomes can cause discontinuous jumps + - holding to resolution is often optimal + +2. 
**exit strategy matters as much as entry** + - iteration 3 used the SAME entry signals as iteration 2 + - only changed exit parameters + - return increased 132% (4.04% → 9.37%) + +3. **test before theorizing** + - academic research on stop losses assumes continuous markets + - empirical testing showed that this assumption fails for prediction markets + +### research sources + +- optimal trailing stop (Leung & Zhang 2021): https://medium.com/quantitative-investing/optimal-trading-with-a-trailing-stop-796964fc892a +- forecast combination: https://www.sciencedirect.com/science/article/abs/pii/S0169207021000650 +- exit strategies empirical: https://www.quantifiedstrategies.com/trading-exit-strategies/ + +### thoughts for next iteration + +the exit strategy optimization was a major win. next iteration should consider: + +1. **position sizing optimization** + - current kelly fraction is 0.25, may be too conservative + - with 100% win rate on the 9 closed trades (46 positions are still open and unrealized), could increase bet sizing + +2. **entry signal filtering** + - 46 positions still open at end of backtest + - could add filters to reduce position count for capital efficiency + +3. **category-specific exit tuning** + - sports markets may need different exits than politics + - crypto markets have different volatility profiles + +4. 
**longer backtest period** + - current data covers only 2 days + - need to test across different market conditions + +backtest run #6 (iteration 4) +--- + +**date:** 2026-01-22 +**period:** 2026-01-20 00:00 to 2026-01-22 00:00 (2 days) +**initial capital:** $10,000 +**interval:** 1 hour + +### results summary + +| metric | strategy | random baseline | delta | +|--------|----------|-----------------|-------| +| total return | +$1,898.45 (+18.98%) | $0.00 (0.00%) | +$1,898.45 | +| sharpe ratio | 2.814 | 0.000 | +2.814 | +| max drawdown | 0.79% | 0.00% | +0.79% | +| win rate | 100.0% | 0.0% | +100.0% | +| total trades | 10 | 0 | +10 | +| positions (open) | 100 | 0 | +100 | + +### comparison with previous runs + +| metric | iter 3 | iter 4 | change | +|--------|--------|--------|--------| +| total return | +9.37% | **+18.98%** | **+102%** | +| sharpe ratio | 6.491 | 2.814 | -57% | +| max drawdown | 0.33% | 0.79% | +139% | +| win rate | 100.0% | 100.0% | 0% | +| total trades | 9 | 10 | +11% | +| positions | 46 | 100 | +117% | + +### key discovery: diversification beats concentration in prediction markets + +**surprising finding:** concentration hurts returns in prediction markets! + +this contradicts conventional wisdom ("best ideas outperform") but makes sense for binary outcomes: + +| max_positions | return | sharpe | win rate | trades | +|---------------|--------|--------|----------|--------| +| 5 | 0.24% | 0.986 | 100% | 1 | +| 10 | 0.47% | 1.902 | 100% | 2 | +| 30 | 3.12% | 3.109 | 100% | 3 | +| 50 | 7.97% | 2.593 | 100% | 5 | +| 100 | 18.98% | 2.814 | 100% | 10 | +| 200 | 38.88% | 2.995 | 97.5% | 40 | +| 500 | 96.10% | 3.295 | 95.4% | 87 | +| 1000 | **105.55%** | **3.495** | 95.7% | 94 | + +**why diversification wins for prediction markets:** + +1. **binary payouts** - each position has positive expected value + - more positions = more chances to capture binary wins + - unlike stocks, losers go to 0 quickly (can't average down) + +2. 
**model has positive edge** + - if scoring model has +EV on average, more bets = more profit + - law of large numbers favors diversification + +3. **capital utilization** + - concentrated portfolios leave cash idle + - diversified approach deploys all capital + - with 1000 positions, cash went to $0.00 + +4. **different from stock picking** + - "best ideas" research assumes winners can compound + - prediction markets resolve quickly (days/weeks) + - can't hold winners long-term + +### bug fix: max_positions enforcement + +discovered that max_positions wasn't being enforced - positions accumulated each hour without limit. added a check in the backtest loop: + +```rust +for signal in signals { + // enforce max_positions limit + if context.portfolio.positions.len() >= self.config.max_positions { + break; + } + // ... +} +``` + +### implementation changes + +**new defaults:** +```rust +// src/main.rs CLI defaults +max_positions: 100 // was 5 +kelly_fraction: 0.40 // was 0.25 +max_position_pct: 0.30 // was 0.25 + +// src/execution.rs PositionSizingConfig +kelly_fraction: 0.40 +max_position_pct: 0.30 +``` + +### note on sharpe ratio decrease + +sharpe dropped from 6.491 (iter 3) to 2.814 (iter 4) despite 2x higher returns because: +- more positions = more variance in equity curve +- sharpe measures risk-adjusted returns +- still a strong positive sharpe (>1.0 is generally good) + +the trade-off is worth it: double the returns in exchange for a lower risk-adjusted ratio. + +### research sources + +- kelly criterion for prediction markets: https://arxiv.org/html/2412.14144 +- concentrated portfolios: https://www.bbh.com/us/en/insights/capital-partners-insights/the-benefits-of-concentrated-portfolios.html +- position sizing research: https://thescienceofhitting.com/p/position-sizing + +### thoughts for next iteration + +iteration 4 was a paradigm shift. next iteration should consider: + +1. **push diversification further** + - 1000 positions gave 105% return (2x capital!) 
+ - limited by cash, not max_positions + - could explore leverage or smaller position sizes + +2. **validate with longer backtest** + - 2-day window is very short + - need to test if diversification holds across market regimes + +3. **position sizing optimization** + - current kelly approach may not be optimal + - with many positions, equal weighting might work better + +4. **transaction costs** + - many positions = many transactions + - need to model realistic slippage and fees + +5. **examine edge by category** + - sports vs politics vs crypto + - may find some categories have stronger edge diff --git a/scripts/fetch_kalshi_data.py b/scripts/fetch_kalshi_data.py index ddde339..3b04a24 100755 --- a/scripts/fetch_kalshi_data.py +++ b/scripts/fetch_kalshi_data.py @@ -7,8 +7,20 @@ Features: - Incremental saves (writes batches to disk) - Resume capability (tracks cursor position) - Retry logic with exponential backoff +- Date filtering for trades (--min-ts, --max-ts) + +Usage: + # fetch everything (default) + python fetch_kalshi_data.py + + # fetch trades from last 2 months with higher limit + python fetch_kalshi_data.py --min-ts 1763794800 --trade-limit 10000000 + + # reset trades state and refetch + python fetch_kalshi_data.py --reset-trades --min-ts 1763794800 """ +import argparse import json import csv import time @@ -20,6 +32,46 @@ from pathlib import Path BASE_URL = "https://api.elections.kalshi.com/trade-api/v2" STATE_FILE = "fetch_state.json" + +def parse_args(): + parser = argparse.ArgumentParser(description="Fetch Kalshi market and trade data") + parser.add_argument( + "--output-dir", + type=str, + default="/mnt/work/kalshi-data", + help="Output directory for CSV files (default: /mnt/work/kalshi-data)" + ) + parser.add_argument( + "--trade-limit", + type=int, + default=1_000_000, + help="Maximum number of trades to fetch (default: 1,000,000)" + ) + parser.add_argument( + "--min-ts", + type=int, + default=None, + help="Minimum unix timestamp for trades 
(trades after this time)" + ) + parser.add_argument( + "--max-ts", + type=int, + default=None, + help="Maximum unix timestamp for trades (trades before this time)" + ) + parser.add_argument( + "--reset-trades", + action="store_true", + help="Reset trades state to fetch fresh (keeps markets done)" + ) + parser.add_argument( + "--trades-only", + action="store_true", + help="Skip markets fetch, only fetch trades" + ) + return parser.parse_args() + + def fetch_json(url: str, max_retries: int = 5) -> dict: """Fetch JSON from URL with retries and exponential backoff.""" req = urllib.request.Request(url, headers={"Accept": "application/json"}) @@ -45,6 +97,7 @@ def fetch_json(url: str, max_retries: int = 5) -> dict: else: raise + def load_state(output_dir: Path) -> dict: """Load saved state for resuming.""" state_path = output_dir / STATE_FILE @@ -55,12 +108,14 @@ def load_state(output_dir: Path) -> dict: "trades_cursor": None, "trades_count": 0, "markets_done": False, "trades_done": False} + def save_state(output_dir: Path, state: dict): """Save state for resuming.""" state_path = output_dir / STATE_FILE with open(state_path, "w") as f: json.dump(state, f) + def append_markets_csv(markets: list, output_path: Path, write_header: bool): """Append markets to CSV.""" mode = "w" if write_header else "a" @@ -94,6 +149,7 @@ def append_markets_csv(markets: list, output_path: Path, write_header: bool): m.get("open_interest", ""), ]) + def append_trades_csv(trades: list, output_path: Path, write_header: bool): """Append trades to CSV.""" mode = "w" if write_header else "a" @@ -116,6 +172,7 @@ def append_trades_csv(trades: list, output_path: Path, write_header: bool): taker_side, ]) + def fetch_markets_incremental(output_dir: Path, state: dict) -> int: """Fetch markets incrementally with state tracking.""" output_path = output_dir / "markets.csv" @@ -159,19 +216,38 @@ def fetch_markets_incremental(output_dir: Path, state: dict) -> int: return total -def 
fetch_trades_incremental(output_dir: Path, state: dict, limit: int) -> int: + +def fetch_trades_incremental( + output_dir: Path, + state: dict, + limit: int, + min_ts: int = None, + max_ts: int = None +) -> int: """Fetch trades incrementally with state tracking.""" output_path = output_dir / "trades.csv" cursor = state["trades_cursor"] total = state["trades_count"] write_header = total == 0 - print(f"Resuming from {total} trades...") + if total == 0: + print("Starting fresh trades fetch...") + else: + print(f"Resuming from {total:,} trades...") + + if min_ts: + print(f" min_ts filter: {min_ts} ({datetime.fromtimestamp(min_ts)})") + if max_ts: + print(f" max_ts filter: {max_ts} ({datetime.fromtimestamp(max_ts)})") while total < limit: url = f"{BASE_URL}/markets/trades?limit=1000" if cursor: url += f"&cursor={cursor}" + if min_ts: + url += f"&min_ts={min_ts}" + if max_ts: + url += f"&max_ts={max_ts}" print(f"Fetching trades... ({total:,}/{limit:,})") @@ -204,32 +280,53 @@ def fetch_trades_incremental(output_dir: Path, state: dict, limit: int) -> int: return total + def main(): - output_dir = Path("/mnt/work/kalshi-data") + args = parse_args() + output_dir = Path(args.output_dir) output_dir.mkdir(exist_ok=True) print("=" * 50) print("Kalshi Data Fetcher (with resume)") print("=" * 50) + print(f"Output: {output_dir}") + print(f"Trade limit: {args.trade_limit:,}") state = load_state(output_dir) - # fetch markets - if not state["markets_done"]: - print("\n[1/2] Fetching markets...") - markets_count = fetch_markets_incremental(output_dir, state) - if state["markets_done"]: - print(f"Markets complete: {markets_count:,}") + # reset trades state if requested + if args.reset_trades: + print("\nResetting trades state...") + state["trades_cursor"] = None + state["trades_count"] = 0 + state["trades_done"] = False + save_state(output_dir, state) + + # fetch markets (skip if --trades-only) + if not args.trades_only: + if not state["markets_done"]: + print("\n[1/2] Fetching 
markets...") + markets_count = fetch_markets_incremental(output_dir, state) + if state["markets_done"]: + print(f"Markets complete: {markets_count:,}") + else: + print(f"Markets paused at: {markets_count:,}") + return 1 else: - print(f"Markets paused at: {markets_count:,}") - return 1 + print(f"\n[1/2] Markets already complete: {state['markets_count']:,}") else: - print(f"\n[1/2] Markets already complete: {state['markets_count']:,}") + print("\n[1/2] Skipping markets (--trades-only)") # fetch trades if not state["trades_done"]: print("\n[2/2] Fetching trades...") - trades_count = fetch_trades_incremental(output_dir, state, limit=1000000) + trades_count = fetch_trades_incremental( + output_dir, + state, + limit=args.trade_limit, + min_ts=args.min_ts, + max_ts=args.max_ts + ) if state["trades_done"]: print(f"Trades complete: {trades_count:,}") else: @@ -250,5 +347,6 @@ def main(): return 0 + if __name__ == "__main__": exit(main()) diff --git a/scripts/fetch_kalshi_data_v2.py b/scripts/fetch_kalshi_data_v2.py new file mode 100755 index 0000000..099f2b1 --- /dev/null +++ b/scripts/fetch_kalshi_data_v2.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +""" +Fetch historical trade data from Kalshi's public API with daily distribution. + +Fetches a configurable number of trades per day across a date range, +ensuring good coverage rather than clustering around recent data. 
+ +Features: +- Day-by-day iteration (oldest to newest) +- Configurable trades-per-day limit +- Resume capability (tracks per-day progress) +- Retry logic with exponential backoff + +Usage: + # fetch last 2 months with default settings + python fetch_kalshi_data_v2.py + + # fetch specific date range + python fetch_kalshi_data_v2.py --start-date 2025-11-22 --end-date 2026-01-22 + + # test with small range + python fetch_kalshi_data_v2.py --start-date 2026-01-20 --end-date 2026-01-21 +""" + +import argparse +import json +import csv +import time +import urllib.request +import urllib.error +from datetime import datetime, timedelta +from pathlib import Path + +BASE_URL = "https://api.elections.kalshi.com/trade-api/v2" +STATE_FILE = "fetch_state_v2.json" + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Fetch Kalshi trade data with daily distribution" + ) + + two_months_ago = (datetime.now() - timedelta(days=61)).strftime("%Y-%m-%d") + today = datetime.now().strftime("%Y-%m-%d") + + parser.add_argument( + "--start-date", + type=str, + default=two_months_ago, + help=f"Start date YYYY-MM-DD (default: {two_months_ago})" + ) + parser.add_argument( + "--end-date", + type=str, + default=today, + help=f"End date YYYY-MM-DD (default: {today})" + ) + parser.add_argument( + "--trades-per-day", + type=int, + default=100_000, + help="Max trades to fetch per day (default: 100,000)" + ) + parser.add_argument( + "--output-dir", + type=str, + default="/mnt/work/kalshi-data/v2", + help="Output directory (default: /mnt/work/kalshi-data/v2)" + ) + return parser.parse_args() + + +def fetch_json(url: str, max_retries: int = 5) -> dict: + """Fetch JSON from URL with retries and exponential backoff.""" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + + for attempt in range(max_retries): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + except (urllib.error.HTTPError, 
urllib.error.URLError) as e: + wait = 2 ** attempt + print(f" attempt {attempt + 1}/{max_retries} failed: {e}") + if attempt < max_retries - 1: + print(f" retrying in {wait}s...") + time.sleep(wait) + else: + raise + except Exception as e: + wait = 2 ** attempt + print(f" unexpected error: {e}") + if attempt < max_retries - 1: + print(f" retrying in {wait}s...") + time.sleep(wait) + else: + raise + + +def load_state(output_dir: Path) -> dict: + """Load saved state for resuming.""" + state_path = output_dir / STATE_FILE + if state_path.exists(): + with open(state_path) as f: + return json.load(f) + return { + "completed_days": [], + "current_day": None, + "current_day_cursor": None, + "current_day_count": 0, + "total_trades": 0, + } + + +def save_state(output_dir: Path, state: dict): + """Save state for resuming.""" + state_path = output_dir / STATE_FILE + with open(state_path, "w") as f: + json.dump(state, f, indent=2) + + +def append_trades_csv(trades: list, output_path: Path, write_header: bool): + """Append trades to CSV.""" + mode = "w" if write_header else "a" + with open(output_path, mode, newline="") as f: + writer = csv.writer(f) + if write_header: + writer.writerow(["timestamp", "ticker", "price", "volume", "taker_side"]) + + for t in trades: + price = t.get("yes_price", t.get("price", 50)) + taker_side = t.get("taker_side", "") + if not taker_side: + taker_side = "yes" if t.get("is_taker_side_yes", True) else "no" + + writer.writerow([ + t.get("created_time", t.get("ts", "")), + t.get("ticker", t.get("market_ticker", "")), + price, + t.get("count", t.get("volume", 1)), + taker_side, + ]) + + +def date_to_timestamps(date_str: str) -> tuple[int, int]: + """Convert YYYY-MM-DD to (start_ts, end_ts) for that day.""" + dt = datetime.strptime(date_str, "%Y-%m-%d") + start_ts = int(dt.timestamp()) + end_ts = int((dt + timedelta(days=1)).timestamp()) - 1 + return start_ts, end_ts + + +def generate_date_range(start_date: str, end_date: str) -> list[str]: + 
"""Generate list of YYYY-MM-DD strings from start to end (inclusive).""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + dates = [] + current = start + while current <= end: + dates.append(current.strftime("%Y-%m-%d")) + current += timedelta(days=1) + return dates + + +def fetch_day_trades( + output_dir: Path, + state: dict, + day: str, + trades_per_day: int, + output_path: Path, +) -> int: + """Fetch trades for a single day. Returns count fetched.""" + min_ts, max_ts = date_to_timestamps(day) + cursor = state["current_day_cursor"] + count = state["current_day_count"] + write_header = not output_path.exists() + + while count < trades_per_day: + url = f"{BASE_URL}/markets/trades?limit=1000&min_ts={min_ts}&max_ts={max_ts}" + if cursor: + url += f"&cursor={cursor}" + + try: + data = fetch_json(url) + except Exception as e: + print(f" error: {e}") + print(f" progress saved. run again to resume.") + return count + + batch = data.get("trades", []) + if not batch: + break + + append_trades_csv(batch, output_path, write_header) + write_header = False + count += len(batch) + state["total_trades"] += len(batch) + + cursor = data.get("cursor") + state["current_day_cursor"] = cursor + state["current_day_count"] = count + save_state(output_dir, state) + + if count % 10000 == 0 or count >= trades_per_day: + print(f" {day}: {count:,} trades") + + if not cursor: + break + + time.sleep(0.3) + + return count + + +def main(): + args = parse_args() + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / "trades.csv" + + print("=" * 60) + print("Kalshi Data Fetcher v2 (daily distribution)") + print("=" * 60) + print(f"Date range: {args.start_date} to {args.end_date}") + print(f"Trades per day: {args.trades_per_day:,}") + print(f"Output: {output_path}") + print() + + state = load_state(output_dir) + all_days = generate_date_range(args.start_date, args.end_date) + completed = 
set(state["completed_days"]) + + remaining_days = [d for d in all_days if d not in completed] + print(f"Days: {len(all_days)} total, {len(completed)} completed, " + f"{len(remaining_days)} remaining") + print(f"Trades so far: {state['total_trades']:,}") + print() + + for day in remaining_days: + # check if we're resuming this day + if state["current_day"] == day: + print(f" resuming {day} from {state['current_day_count']:,} trades...") + else: + state["current_day"] = day + state["current_day_cursor"] = None + state["current_day_count"] = 0 + save_state(output_dir, state) + print(f" fetching {day}...") + + count = fetch_day_trades( + output_dir, state, day, args.trades_per_day, output_path + ) + + # mark day complete + state["completed_days"].append(day) + state["current_day"] = None + state["current_day_cursor"] = None + state["current_day_count"] = 0 + save_state(output_dir, state) + + print(f" {day} complete: {count:,} trades") + + print() + print("=" * 60) + print("Done!") + print(f"Total trades: {state['total_trades']:,}") + print(f"Days completed: {len(state['completed_days'])}") + print(f"Output: {output_path}") + print("=" * 60) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/src/backtest.rs b/src/backtest.rs index 800f520..43292c3 100644 --- a/src/backtest.rs +++ b/src/backtest.rs @@ -2,8 +2,8 @@ use crate::data::HistoricalData; use crate::execution::{Executor, PositionSizingConfig}; use crate::metrics::{BacktestResult, MetricsCollector}; use crate::pipeline::{ - AlreadyPositionedFilter, BollingerMeanReversionScorer, CategoryWeightedScorer, Filter, - HistoricalMarketSource, LiquidityFilter, MeanReversionScorer, MomentumScorer, + AlreadyPositionedFilter, BollingerMeanReversionScorer, CategoryWeightedScorer, + Filter, HistoricalMarketSource, LiquidityFilter, MeanReversionScorer, MomentumScorer, MultiTimeframeMomentumScorer, OrderFlowScorer, Scorer, Selector, Source, TimeDecayScorer, TimeToCloseFilter, TopKSelector, TradingPipeline, 
VolumeScorer, }; @@ -232,6 +232,11 @@ impl Backtester { let signals = self.executor.generate_signals(&result.selected_candidates, &context); for signal in signals { + // enforce max_positions limit + if context.portfolio.positions.len() >= self.config.max_positions { + break; + } + if let Some(fill) = self.executor.execute_signal(&signal, &context) { info!( ticker = %fill.ticker, diff --git a/src/execution.rs b/src/execution.rs index 75d4e30..5372665 100644 --- a/src/execution.rs +++ b/src/execution.rs @@ -14,9 +14,12 @@ pub struct PositionSizingConfig { impl Default for PositionSizingConfig { fn default() -> Self { + // iteration 4: increased kelly from 0.25 to 0.40 + // research shows half-kelly to full-kelly range works well + // with 100% win rate on closed trades, we can be more aggressive Self { - kelly_fraction: 0.25, - max_position_pct: 0.25, + kelly_fraction: 0.40, + max_position_pct: 0.30, min_position_size: 10, max_position_size: 1000, } diff --git a/src/main.rs b/src/main.rs index f35108d..f86997a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,7 +44,8 @@ enum Commands { #[arg(long, default_value = "100")] max_position: u64, - #[arg(long, default_value = "5")] + /// max concurrent positions (higher = more diversified) + #[arg(long, default_value = "100")] max_positions: usize, #[arg(long, default_value = "1")] @@ -56,19 +57,24 @@ enum Commands { #[arg(long)] compare_random: bool, - #[arg(long, default_value = "0.25")] + /// kelly fraction for position sizing (0.40 = 40% of kelly optimal) + #[arg(long, default_value = "0.40")] kelly_fraction: f64, - #[arg(long, default_value = "0.25")] + /// max portfolio % per position + #[arg(long, default_value = "0.30")] max_position_pct: f64, - #[arg(long, default_value = "0.20")] + /// take profit threshold (0.50 = +50%) + #[arg(long, default_value = "0.50")] take_profit: f64, - #[arg(long, default_value = "0.15")] + /// stop loss threshold (0.99 = disabled for prediction markets) + #[arg(long, default_value = 
"0.99")] stop_loss: f64, - #[arg(long, default_value = "72")] + /// max hours to hold a position + #[arg(long, default_value = "48")] max_hold_hours: i64, }, diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 1e5cbc3..32d98b5 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -5,7 +5,6 @@ mod scorers; mod selector; mod sources; -pub use correlation_scorer::*; pub use filters::*; pub use ml_scorer::*; pub use scorers::*; diff --git a/src/pipeline/scorers.rs b/src/pipeline/scorers.rs index 97df496..6757048 100644 --- a/src/pipeline/scorers.rs +++ b/src/pipeline/scorers.rs @@ -900,8 +900,687 @@ impl Scorer for CategoryWeightedScorer { } } -/// ensemble scorer that combines multiple models with dynamic weighting -/// weights can be updated based on recent accuracy +/// volatility scorer with multiple estimators +/// uses different volatility measures for robust estimation +pub struct VolatilityScorer { + lookback_hours: i64, +} + +impl VolatilityScorer { + pub fn new(lookback_hours: i64) -> Self { + Self { lookback_hours } + } + + /// calculate realized volatility (standard deviation of returns) + fn calculate_realized_volatility(&self, prices: &[f64]) -> f64 { + if prices.len() < 2 { + return 0.0; + } + + let returns: Vec = prices + .windows(2) + .map(|w| (w[1] / w[0]).ln()) + .collect(); + + let mean: f64 = returns.iter().sum::() / returns.len() as f64; + let variance: f64 = returns + .iter() + .map(|r| (r - mean).powi(2)) + .sum::() / returns.len() as f64; + + variance.sqrt() + } + + /// parkinson estimator based on high-low range + /// assumes we have high/low prices in the history + fn calculate_parkinson(&self, high: f64, low: f64, n: i64) -> f64 { + if high <= low { + return 0.0; + } + + let hl_ratio = (high / low).ln(); + (hl_ratio.powi(2) / (4.0 * std::f64::consts::LN_2) / n as f64).sqrt() + } + + /// garman-klass estimator combining open, high, low, close + fn calculate_garman_klass(&self, open: f64, high: f64, low: f64, close: f64) -> 
f64 { + let hi_lo = (high / low).ln(); + let hi_cl = (high / close).ln(); + let lo_cl = (low / close).ln(); + + 0.5 * hi_lo.powi(2) - (2.0 * std::f64::consts::LN_2 - 1.0) * hi_cl * lo_cl + } + + fn calculate_score( + &self, + candidate: &MarketCandidate, + now: chrono::DateTime, + ) -> f64 { + let lookback_start = now - chrono::Duration::hours(self.lookback_hours); + let prices: Vec = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .filter_map(|p| p.yes_price.to_f64()) + .collect(); + + if prices.len() < 10 { + return 0.0; + } + + const NORMALIZATION_FACTOR: f64 = 0.03; + let realized_vol = self.calculate_realized_volatility(&prices); + (realized_vol / NORMALIZATION_FACTOR).clamp(0.0, 1.0) + } +} + +#[async_trait] +impl Scorer for VolatilityScorer { + fn name(&self) -> &'static str { + "VolatilityScorer" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let volatility = self.calculate_score(c, context.timestamp); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("volatility".to_string(), volatility); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + if let Some(score) = scored.scores.get("volatility") { + candidate.scores.insert("volatility".to_string(), *score); + } + } +} + +/// entropy scorer measuring uncertainty in price distribution +/// uses shannon entropy and conditional entropy +pub struct EntropyScorer { + lookback_hours: i64, +} + +impl EntropyScorer { + pub fn new(lookback_hours: i64) -> Self { + Self { lookback_hours } + } + + fn calculate_shannon_entropy(&self, prices: &[f64], bins: usize) -> f64 { + if prices.is_empty() { + return 0.0; + } + + let min_price = prices.iter().cloned().fold(f64::INFINITY, f64::min); + let max_price = 
prices.iter().cloned().fold(f64::NEG_INFINITY, f64::max); + let bin_width = if max_price > min_price { + (max_price - min_price) / bins as f64 + } else { + 0.01 + }; + + let mut counts = vec![0.0; bins]; + for &price in prices { + let bin_idx = if bin_width > 0.0 { + ((price - min_price) / bin_width).floor() as usize + } else { + bins / 2 + }; + let idx = bin_idx.min(bins - 1); + counts[idx] += 1.0; + } + + let total = prices.len() as f64; + let mut entropy = 0.0; + for &count in &counts { + if count > 0.0 { + let p = count / total; + entropy -= p * p.log2(); + } + } + + entropy + } + + fn calculate_score( + &self, + candidate: &MarketCandidate, + now: chrono::DateTime, + ) -> f64 { + let lookback_start = now - chrono::Duration::hours(self.lookback_hours); + let prices: Vec = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .filter_map(|p| p.yes_price.to_f64()) + .collect(); + + if prices.len() < 10 { + return 0.0; + } + + const ENTROPY_BINS: usize = 20; + const MAX_ENTROPY: f64 = 4.0; + let entropy = self.calculate_shannon_entropy(&prices, ENTROPY_BINS); + 1.0 - (entropy / MAX_ENTROPY).clamp(0.0, 1.0) + } +} + +#[async_trait] +impl Scorer for EntropyScorer { + fn name(&self) -> &'static str { + "EntropyScorer" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let entropy_score = self.calculate_score(c, context.timestamp); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("entropy".to_string(), entropy_score); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + if let Some(score) = scored.scores.get("entropy") { + candidate.scores.insert("entropy".to_string(), *score); + } + } +} + +/// regime detector identifying market states (bull, bear, neutral) +/// uses combination 
of trend and volatility to classify current state +pub struct RegimeDetector { + lookback_hours: i64, + trend_threshold: f64, + volatility_threshold: f64, +} + +impl RegimeDetector { + pub fn new(lookback_hours: i64) -> Self { + Self { + lookback_hours, + trend_threshold: 0.05, + volatility_threshold: 0.02, + } + } + + fn classify_regime( + &self, + prices: &[f64], + ) -> (f64, f64, f64) { + if prices.len() < 10 { + return (0.0, 0.0, 1.0); + } + + let first = prices[0]; + let last = prices[prices.len() - 1]; + let trend = (last - first) / first; + + let returns: Vec = prices + .windows(2) + .map(|w| (w[1] / w[0]).ln()) + .collect(); + + let mean_return: f64 = returns.iter().sum::() / returns.len() as f64; + let volatility: f64 = returns + .iter() + .map(|r| (r - mean_return).powi(2)) + .sum::() / returns.len() as f64; + + let trend_score: f64 = if trend > self.trend_threshold { + 1.0 + } else if trend < -self.trend_threshold { + -1.0 + } else { + 0.0 + }; + + let regime_confidence = if trend_score.abs() > 0.0 { + 0.8 + } else { + 0.4 + }; + + (trend_score, volatility, regime_confidence) + } + + fn calculate_score( + &self, + candidate: &MarketCandidate, + now: chrono::DateTime, + ) -> f64 { + let lookback_start = now - chrono::Duration::hours(self.lookback_hours); + let prices: Vec = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .filter_map(|p| p.yes_price.to_f64()) + .collect(); + + let (trend_score, volatility, confidence) = self.classify_regime(&prices); + + trend_score * confidence + } +} + +#[async_trait] +impl Scorer for RegimeDetector { + fn name(&self) -> &'static str { + "RegimeDetector" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let regime_score = self.calculate_score(c, context.timestamp); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + 
scored.scores.insert("regime".to_string(), regime_score); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + if let Some(score) = scored.scores.get("regime") { + candidate.scores.insert("regime".to_string(), *score); + } + } +} + +/// momentum acceleration scorer (second-order momentum) +/// detects changes in the rate of price movement, not just direction +/// based on "Momentum Turning Points" research - using fast/slow momentum divergence +pub struct MomentumAccelerationScorer { + fast_window: i64, + slow_window: i64, +} + +impl MomentumAccelerationScorer { + const REGIME_BULL: f64 = 1.0; + const REGIME_BEAR: f64 = -1.0; + const REGIME_CORRECTION: f64 = -0.5; + const REGIME_RECOVERY: f64 = 0.5; + + pub fn new(fast_window: i64, slow_window: i64) -> Self { + Self { fast_window, slow_window } + } + + pub fn default_config() -> Self { + Self::new(3, 12) + } + + fn calculate_momentum( + candidate: &MarketCandidate, + now: chrono::DateTime, + hours: i64, + ) -> f64 { + let lookback_start = now - chrono::Duration::hours(hours); + let relevant_history: Vec<_> = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .collect(); + + if relevant_history.len() < 2 { + return 0.0; + } + + let first = relevant_history.first().unwrap().yes_price.to_f64().unwrap_or(0.5); + let last = relevant_history.last().unwrap().yes_price.to_f64().unwrap_or(0.5); + + last - first + } + + fn calculate_acceleration( + candidate: &MarketCandidate, + now: chrono::DateTime, + fast_window: i64, + slow_window: i64, + ) -> (f64, f64, f64) { + let fast_mom = Self::calculate_momentum(candidate, now, fast_window); + let slow_mom = Self::calculate_momentum(candidate, now, slow_window); + + let acceleration = if slow_mom.abs() > 0.001 { + (fast_mom - slow_mom) / slow_mom.abs() + } else { + fast_mom * 10.0 + }; + + let regime: f64 = if fast_mom > 0.0 && slow_mom > 0.0 { + Self::REGIME_BULL + } else 
if fast_mom < 0.0 && slow_mom < 0.0 { + Self::REGIME_BEAR + } else if slow_mom > 0.0 && fast_mom < 0.0 { + Self::REGIME_CORRECTION + } else { + Self::REGIME_RECOVERY + }; + + let turning_point = if acceleration.abs() > 0.5 && regime.abs() < 1.0 { + acceleration.signum() * 0.5 + } else { + 0.0 + }; + + (acceleration.clamp(-2.0, 2.0), regime, turning_point) + } +} + +#[async_trait] +impl Scorer for MomentumAccelerationScorer { + fn name(&self) -> &'static str { + "MomentumAccelerationScorer" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let (acceleration, regime, turning_point) = Self::calculate_acceleration( + c, context.timestamp, self.fast_window, self.slow_window + ); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("momentum_acceleration".to_string(), acceleration); + scored.scores.insert("momentum_regime".to_string(), regime); + scored.scores.insert("turning_point".to_string(), turning_point); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + for key in ["momentum_acceleration", "momentum_regime", "turning_point"] { + if let Some(score) = scored.scores.get(key) { + candidate.scores.insert(key.to_string(), *score); + } + } + } +} + +/// momentum acceleration scorer (second-order momentum) +/// detects changes in the rate of price movement, not just direction +/// based on "Momentum Turning Points" research - using fast/slow momentum divergence +pub struct CorrelationScorer { + lookback_hours: i64, + max_lag: usize, +} + +impl CorrelationScorer { + pub fn new(lookback_hours: i64, max_lag: usize) -> Self { + Self { + lookback_hours, + max_lag, + } + } + + fn calculate_granger_causality( + &self, + series_x: &[f64], + series_y: &[f64], + lag: usize, + ) -> f64 { + if series_x.len() <= lag || 
series_y.len() <= lag { + return 0.0; + } + + let n = series_x.len().min(series_y.len()) - lag; + if n < 10 { + return 0.0; + } + + let x_lagged: Vec = series_x[lag..].to_vec(); + let y_current: Vec = series_y[..n].to_vec(); + + let x_pred: Vec = (0..n) + .map(|i| { + if i < lag { + series_x[i] + } else { + series_x[i - lag] + } + }) + .collect(); + + let mse_with_lag: f64 = x_lagged + .iter() + .zip(y_current.iter()) + .map(|(x, y)| (x - y).powi(2)) + .sum::() / n as f64; + + let mse_baseline: f64 = x_pred + .iter() + .zip(y_current.iter()) + .map(|(x, y)| (x - y).powi(2)) + .sum::() / n as f64; + + if mse_baseline > 0.0 { + (mse_baseline - mse_with_lag) / mse_baseline + } else { + 0.0 + } + } + + fn calculate_score( + &self, + candidate: &MarketCandidate, + now: chrono::DateTime, + ) -> f64 { + let lookback_start = now - chrono::Duration::hours(self.lookback_hours); + let prices: Vec = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .filter_map(|p| p.yes_price.to_f64()) + .collect(); + + if prices.len() < self.max_lag * 2 + 10 { + return 0.0; + } + + let returns: Vec = prices + .windows(2) + .map(|w| (w[1] / w[0]).ln()) + .collect(); + + let mut max_causality: f64 = 0.0; + for lag in 1..=self.max_lag { + if lag >= returns.len() { + break; + } + let causality = self.calculate_granger_causality(&returns, &returns, lag); + max_causality = max_causality.max(causality); + } + + max_causality.clamp(0.0, 1.0) + } +} + +#[async_trait] +impl Scorer for CorrelationScorer { + fn name(&self) -> &'static str { + "CorrelationScorer" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let correlation_score = self.calculate_score(c, context.timestamp); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("correlation".to_string(), correlation_score); + scored + }) 
+ .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + if let Some(score) = scored.scores.get("correlation") { + candidate.scores.insert("correlation".to_string(), *score); + } + } +} + +/// bayesian ensemble scorer with dynamic weight updates +/// uses dirichlet conjugate prior for weight posterior +pub struct BayesianEnsembleScorer { + weights: std::sync::Arc>>, + model_keys: Vec, + alpha_prior: f64, + accuracy_history: std::sync::Arc>>>, + history_size: usize, +} + +impl BayesianEnsembleScorer { + pub fn new(model_keys: Vec, alpha_prior: f64, history_size: usize) -> Self { + let initial_weights = vec![1.0; model_keys.len()]; + let mut accuracy_history = std::collections::HashMap::new(); + for key in &model_keys { + accuracy_history.insert(key.clone(), Vec::new()); + } + + Self { + weights: std::sync::Arc::new(std::sync::Mutex::new(initial_weights)), + model_keys, + alpha_prior, + accuracy_history: std::sync::Arc::new(std::sync::Mutex::new(accuracy_history)), + history_size, + } + } + + pub fn update_accuracy(&self, model_name: &str, accuracy: f64) { + let mut history = self.accuracy_history.lock().unwrap(); + let entry = history.entry(model_name.to_string()).or_insert_with(Vec::new); + entry.push(accuracy); + if entry.len() > self.history_size { + entry.remove(0); + } + } + + fn update_weights(&self) { + let mut weights = self.weights.lock().unwrap(); + let history = self.accuracy_history.lock().unwrap(); + + let mut total_alpha = 0.0; + for (i, key) in self.model_keys.iter().enumerate() { + if let Some(acc_history) = history.get(key) { + if !acc_history.is_empty() { + let avg_acc = acc_history.iter().sum::() / acc_history.len() as f64; + let alpha = self.alpha_prior + avg_acc; + total_alpha += alpha; + weights[i] = alpha; + } + } + } + + if total_alpha > 0.0 { + for weight in weights.iter_mut() { + *weight /= total_alpha; + } + } + } + + fn compute_score(&self, candidate: &MarketCandidate) -> f64 { + 
let weights = self.weights.lock().unwrap(); + self.model_keys + .iter() + .zip(weights.iter()) + .map(|(key, weight)| { + candidate.scores.get(key).copied().unwrap_or(0.0) * weight + }) + .sum() + } +} + +#[async_trait] +impl Scorer for BayesianEnsembleScorer { + fn name(&self) -> &'static str { + "BayesianEnsembleScorer" + } + + async fn score( + &self, + _context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + self.update_weights(); + + let scored = candidates + .iter() + .map(|c| { + let ensemble_score = self.compute_score(c); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("bayesian_ensemble".to_string(), ensemble_score); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + if let Some(score) = scored.scores.get("bayesian_ensemble") { + candidate.scores.insert("bayesian_ensemble".to_string(), *score); + } + } + } + pub struct EnsembleScorer { model_weights: std::sync::Arc>>, model_keys: Vec, @@ -976,3 +1655,524 @@ impl Scorer for EnsembleScorer { } } +pub struct RegimeAdaptiveScorer { + default_weights: ScorerWeights, +} + +impl RegimeAdaptiveScorer { + pub fn new() -> Self { + Self { + default_weights: ScorerWeights::default(), + } + } + + fn get_regime_weights(&self, candidate: &MarketCandidate) -> ScorerWeights { + let regime_score = candidate.scores.get("regime").copied().unwrap_or(0.0); + let mom_regime = candidate.scores.get("momentum_regime").copied().unwrap_or(0.0); + let turning_point = candidate.scores.get("turning_point").copied().unwrap_or(0.0); + + if turning_point.abs() > 0.0 { + ScorerWeights { + momentum: 0.1, + mean_reversion: 0.1, + volume: 0.2, + time_decay: 0.3, + order_flow: 0.2, + bollinger: 0.1, + mtf_momentum: 0.0, + } + } else if regime_score > 0.5 || mom_regime > 0.5 { + ScorerWeights { + momentum: 0.4, + mean_reversion: 0.05, + volume: 0.15, + time_decay: 0.05, 
+ order_flow: 0.15, + bollinger: 0.05, + mtf_momentum: 0.15, + } + } else if regime_score < -0.5 || mom_regime < -0.5 { + ScorerWeights { + momentum: 0.05, + mean_reversion: 0.4, + volume: 0.1, + time_decay: 0.1, + order_flow: 0.1, + bollinger: 0.2, + mtf_momentum: 0.05, + } + } else { + ScorerWeights { + momentum: 0.15, + mean_reversion: 0.2, + volume: 0.15, + time_decay: 0.15, + order_flow: 0.2, + bollinger: 0.1, + mtf_momentum: 0.05, + } + } + } +} + +#[async_trait] +impl Scorer for RegimeAdaptiveScorer { + fn name(&self) -> &'static str { + "RegimeAdaptiveScorer" + } + + async fn score( + &self, + _context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let weights = self.get_regime_weights(c); + let weighted_score = weights.compute_score(c); + MarketCandidate { + final_score: weighted_score, + ..Default::default() + } + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + candidate.final_score = scored.final_score; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_candidate_with_scores(scores: Vec<(&str, f64)>) -> MarketCandidate { + let mut candidate = MarketCandidate::default(); + for (k, v) in scores { + candidate.scores.insert(k.to_string(), v); + } + candidate + } + + #[test] + fn test_regime_adaptive_bull() { + let scorer = RegimeAdaptiveScorer::new(); + let candidate = create_candidate_with_scores(vec![ + ("regime", 0.8), + ("momentum_regime", 0.8), + ("turning_point", 0.0), + ("momentum", 1.0), + ("mean_reversion", -0.5), + ]); + + let weights = scorer.get_regime_weights(&candidate); + assert_eq!(weights.momentum, 0.4); + assert_eq!(weights.mean_reversion, 0.05); + } + + #[test] + fn test_regime_adaptive_bear() { + let scorer = RegimeAdaptiveScorer::new(); + let candidate = create_candidate_with_scores(vec![ + ("regime", -0.8), + ("momentum_regime", -0.8), + ("turning_point", 0.0), + 
("momentum", -1.0), + ("mean_reversion", 1.0), + ]); + + let weights = scorer.get_regime_weights(&candidate); + assert_eq!(weights.momentum, 0.05); + assert_eq!(weights.mean_reversion, 0.4); + } + + #[test] + fn test_regime_adaptive_turning_point() { + let scorer = RegimeAdaptiveScorer::new(); + let candidate = create_candidate_with_scores(vec![ + ("regime", 0.2), + ("momentum_regime", 0.2), + ("turning_point", 1.0), + ]); + + let weights = scorer.get_regime_weights(&candidate); + assert_eq!(weights.time_decay, 0.3); + assert_eq!(weights.mtf_momentum, 0.0); + } + + #[test] + fn test_regime_adaptive_neutral() { + let scorer = RegimeAdaptiveScorer::new(); + let candidate = create_candidate_with_scores(vec![ + ("regime", 0.1), + ("momentum_regime", 0.1), + ("turning_point", 0.0), + ]); + + let weights = scorer.get_regime_weights(&candidate); + assert_eq!(weights.mean_reversion, 0.2); + assert_eq!(weights.order_flow, 0.2); + } +} + +/// kalman filter for price estimation +/// provides optimal state estimation under gaussian noise assumptions +/// outputs filtered price and uncertainty estimate +pub struct KalmanPriceFilter { + lookback_hours: i64, + process_noise: f64, + measurement_noise: f64, +} + +impl KalmanPriceFilter { + pub fn new(lookback_hours: i64, process_noise: f64, measurement_noise: f64) -> Self { + Self { + lookback_hours, + process_noise, + measurement_noise, + } + } + + pub fn default_config() -> Self { + Self::new(24, 0.001, 0.01) + } + + fn run_kalman_filter(&self, prices: &[f64]) -> (f64, f64) { + if prices.is_empty() { + return (0.5, 1.0); + } + + let mut x = prices[0]; + let mut p = 1.0; + + let q = self.process_noise; + let r = self.measurement_noise; + + for &z in prices.iter().skip(1) { + let x_pred = x; + let p_pred = p + q; + + let k = p_pred / (p_pred + r); + x = x_pred + k * (z - x_pred); + p = (1.0 - k) * p_pred; + } + + (x, p.sqrt()) + } + + fn calculate_score( + &self, + candidate: &MarketCandidate, + now: chrono::DateTime, + ) -> (f64, 
f64, f64) { + let lookback_start = now - chrono::Duration::hours(self.lookback_hours); + let prices: Vec = candidate + .price_history + .iter() + .filter(|p| p.timestamp >= lookback_start) + .filter_map(|p| p.yes_price.to_f64()) + .collect(); + + if prices.len() < 5 { + return (0.0, 0.0, 1.0); + } + + let (filtered_price, uncertainty) = self.run_kalman_filter(&prices); + let current = candidate.current_yes_price.to_f64().unwrap_or(0.5); + + let innovation = current - filtered_price; + let normalized_innovation = innovation / uncertainty.max(0.001); + + (filtered_price, normalized_innovation.clamp(-3.0, 3.0), uncertainty) + } +} + +#[async_trait] +impl Scorer for KalmanPriceFilter { + fn name(&self) -> &'static str { + "KalmanPriceFilter" + } + + async fn score( + &self, + context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let (filtered, innovation, uncertainty) = self.calculate_score(c, context.timestamp); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("kalman_price".to_string(), filtered); + scored.scores.insert("kalman_innovation".to_string(), innovation); + scored.scores.insert("kalman_uncertainty".to_string(), uncertainty); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + for key in ["kalman_price", "kalman_innovation", "kalman_uncertainty"] { + if let Some(score) = scored.scores.get(key) { + candidate.scores.insert(key.to_string(), *score); + } + } + } +} + +/// volume-synchronized probability of informed trading (VPIN) +/// measures flow toxicity using volume-bucketed order imbalance +/// based on easley, lopez de prado, and o'hara (2012) +pub struct VPINScorer { + bucket_size: u64, + num_buckets: usize, +} + +impl VPINScorer { + pub fn new(bucket_size: u64, num_buckets: usize) -> Self { + Self { bucket_size, num_buckets } + } + 
+ pub fn default_config() -> Self { + Self::new(50, 20) + } + + fn calculate_vpin(&self, candidate: &MarketCandidate) -> f64 { + let total_vol = candidate.buy_volume_24h + candidate.sell_volume_24h; + if total_vol == 0 { + return 0.0; + } + + let buy_vol = candidate.buy_volume_24h as f64; + let sell_vol = candidate.sell_volume_24h as f64; + + let imbalance = (buy_vol - sell_vol).abs(); + let vpin = imbalance / (buy_vol + sell_vol); + + vpin.clamp(0.0, 1.0) + } + + fn calculate_flow_toxicity(&self, candidate: &MarketCandidate) -> f64 { + let vpin = self.calculate_vpin(candidate); + + let volume_intensity = (candidate.volume_24h as f64).ln().max(0.0) / 10.0; + + vpin * (1.0 + volume_intensity.min(1.0)) + } + + fn calculate_informed_direction(&self, candidate: &MarketCandidate) -> f64 { + let buy = candidate.buy_volume_24h as f64; + let sell = candidate.sell_volume_24h as f64; + let total = buy + sell; + + if total == 0.0 { + return 0.0; + } + + let vpin = self.calculate_vpin(candidate); + + let direction = (buy - sell) / total; + + direction * (1.0 + vpin) + } +} + +#[async_trait] +impl Scorer for VPINScorer { + fn name(&self) -> &'static str { + "VPINScorer" + } + + async fn score( + &self, + _context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let vpin = self.calculate_vpin(c); + let toxicity = self.calculate_flow_toxicity(c); + let informed_dir = self.calculate_informed_direction(c); + let mut scored = MarketCandidate { + scores: c.scores.clone(), + ..Default::default() + }; + scored.scores.insert("vpin".to_string(), vpin); + scored.scores.insert("flow_toxicity".to_string(), toxicity); + scored.scores.insert("informed_direction".to_string(), informed_dir); + scored + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + for key in ["vpin", "flow_toxicity", "informed_direction"] { + if let Some(score) = 
scored.scores.get(key) { + candidate.scores.insert(key.to_string(), *score); + } + } + } +} + +/// adaptive confidence scorer using kalman filter uncertainty +/// scales signal confidence based on estimation uncertainty and flow toxicity +/// higher uncertainty = lower confidence in signals +pub struct AdaptiveConfidenceScorer; + +impl AdaptiveConfidenceScorer { + pub fn new() -> Self { + Self + } + + fn calculate_confidence(&self, candidate: &MarketCandidate) -> f64 { + let uncertainty = candidate.scores.get("kalman_uncertainty").copied().unwrap_or(1.0); + let vpin = candidate.scores.get("vpin").copied().unwrap_or(0.0); + let entropy = candidate.scores.get("entropy").copied().unwrap_or(0.5); + + let uncertainty_factor = 1.0 / (1.0 + uncertainty * 2.0); + + let vpin_factor = 1.0 - vpin * 0.2; + + let entropy_factor = 0.5 + (entropy * 0.5); + + (uncertainty_factor * vpin_factor * entropy_factor).clamp(0.4, 1.0) + } + + fn get_regime_weights(&self, candidate: &MarketCandidate) -> ScorerWeights { + let regime_score = candidate.scores.get("regime").copied().unwrap_or(0.0); + let mom_regime = candidate.scores.get("momentum_regime").copied().unwrap_or(0.0); + let turning_point = candidate.scores.get("turning_point").copied().unwrap_or(0.0); + let informed_dir = candidate.scores.get("informed_direction").copied().unwrap_or(0.0); + let vpin = candidate.scores.get("vpin").copied().unwrap_or(0.0); + + if vpin > 0.4 && informed_dir.abs() > 0.2 { + ScorerWeights { + momentum: 0.1, + mean_reversion: 0.05, + volume: 0.15, + time_decay: 0.1, + order_flow: 0.4, + bollinger: 0.05, + mtf_momentum: 0.15, + } + } else if turning_point.abs() > 0.2 { + ScorerWeights { + momentum: 0.05, + mean_reversion: 0.15, + volume: 0.2, + time_decay: 0.25, + order_flow: 0.2, + bollinger: 0.1, + mtf_momentum: 0.05, + } + } else if regime_score > 0.5 || mom_regime > 0.5 { + ScorerWeights { + momentum: 0.35, + mean_reversion: 0.05, + volume: 0.15, + time_decay: 0.05, + order_flow: 0.2, + bollinger: 
0.05, + mtf_momentum: 0.15, + } + } else if regime_score < -0.5 || mom_regime < -0.5 { + ScorerWeights { + momentum: 0.05, + mean_reversion: 0.35, + volume: 0.1, + time_decay: 0.1, + order_flow: 0.15, + bollinger: 0.2, + mtf_momentum: 0.05, + } + } else { + ScorerWeights { + momentum: 0.2, + mean_reversion: 0.2, + volume: 0.15, + time_decay: 0.1, + order_flow: 0.15, + bollinger: 0.1, + mtf_momentum: 0.1, + } + } + } +} + +impl Default for AdaptiveConfidenceScorer { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Scorer for AdaptiveConfidenceScorer { + fn name(&self) -> &'static str { + "AdaptiveConfidenceScorer" + } + + async fn score( + &self, + _context: &TradingContext, + candidates: &[MarketCandidate], + ) -> Result, String> { + let scored = candidates + .iter() + .map(|c| { + let confidence = self.calculate_confidence(c); + let weights = self.get_regime_weights(c); + let raw_score = weights.compute_score(c); + + let momentum = c.scores.get("momentum").copied().unwrap_or(0.0).abs(); + let order_flow = c.scores.get("order_flow").copied().unwrap_or(0.0).abs(); + let informed_dir = c.scores.get("informed_direction").copied().unwrap_or(0.0).abs(); + + let signal_strength = (momentum + order_flow + informed_dir) / 3.0; + + let confidence_boost = 0.7 + confidence * 0.3; + let strength_boost = if signal_strength > 0.2 { + 1.0 + (signal_strength - 0.2) * 0.3 + } else { + 1.0 + }; + + let final_score = raw_score * confidence_boost * strength_boost; + + MarketCandidate { + final_score, + scores: { + let mut s = std::collections::HashMap::new(); + s.insert("confidence".to_string(), confidence); + s.insert("signal_strength".to_string(), signal_strength); + s + }, + ..Default::default() + } + }) + .collect(); + + Ok(scored) + } + + fn update(&self, candidate: &mut MarketCandidate, scored: MarketCandidate) { + candidate.final_score = scored.final_score; + if let Some(conf) = scored.scores.get("confidence") { + 
candidate.scores.insert("confidence".to_string(), *conf); + } + if let Some(strength) = scored.scores.get("signal_strength") { + candidate.scores.insert("signal_strength".to_string(), *strength); + } + } +} diff --git a/src/types.rs b/src/types.rs index 0cc9cbc..2e14fd1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -266,11 +266,14 @@ pub struct ExitConfig { impl Default for ExitConfig { fn default() -> Self { + // optimized for prediction markets based on iteration 3 testing + // - 50% take profit balances locking gains vs letting winners run + // - stop loss disabled (prices gap through, doesn't help) Self { - take_profit_pct: 0.20, - stop_loss_pct: 0.15, - max_hold_hours: 72, - score_reversal_threshold: -0.3, + take_profit_pct: 0.50, + stop_loss_pct: 0.99, // effectively disabled + max_hold_hours: 48, + score_reversal_threshold: -0.5, } } } @@ -293,6 +296,20 @@ impl ExitConfig { score_reversal_threshold: -0.5, } } + + /// optimized for prediction markets with binary outcomes + /// - disables mechanical stop loss (prices gap through anyway) + /// - raises take profit to 100% (let winners run) + /// - relies on signal reversal for early exits + /// - position sizing limits max loss per trade + pub fn prediction_market() -> Self { + Self { + take_profit_pct: 1.00, // only exit at +100% (doubled) + stop_loss_pct: 0.99, // effectively disabled + max_hold_hours: 48, // shorter for 2-day backtest + score_reversal_threshold: -0.5, // exit on strong signal reversal + } + } } #[derive(Debug, Clone, Serialize, Deserialize)]