#!/usr/bin/env python3 """ Fetch historical trade and market data from Kalshi's public API. No authentication required for public endpoints. Features: - Incremental saves (writes batches to disk) - Resume capability (tracks cursor position) - Retry logic with exponential backoff """ import json import csv import time import urllib.request import urllib.error from datetime import datetime from pathlib import Path BASE_URL = "https://api.elections.kalshi.com/trade-api/v2" STATE_FILE = "fetch_state.json" def fetch_json(url: str, max_retries: int = 5) -> dict: """Fetch JSON from URL with retries and exponential backoff.""" req = urllib.request.Request(url, headers={"Accept": "application/json"}) for attempt in range(max_retries): try: with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) except (urllib.error.HTTPError, urllib.error.URLError) as e: wait = 2 ** attempt print(f" attempt {attempt + 1}/{max_retries} failed: {e}") if attempt < max_retries - 1: print(f" retrying in {wait}s...") time.sleep(wait) else: raise except Exception as e: wait = 2 ** attempt print(f" unexpected error: {e}") if attempt < max_retries - 1: print(f" retrying in {wait}s...") time.sleep(wait) else: raise def load_state(output_dir: Path) -> dict: """Load saved state for resuming.""" state_path = output_dir / STATE_FILE if state_path.exists(): with open(state_path) as f: return json.load(f) return {"markets_cursor": None, "markets_count": 0, "trades_cursor": None, "trades_count": 0, "markets_done": False, "trades_done": False} def save_state(output_dir: Path, state: dict): """Save state for resuming.""" state_path = output_dir / STATE_FILE with open(state_path, "w") as f: json.dump(state, f) def append_markets_csv(markets: list, output_path: Path, write_header: bool): """Append markets to CSV.""" mode = "w" if write_header else "a" with open(output_path, mode, newline="") as f: writer = csv.writer(f) if write_header: writer.writerow(["ticker", "title", "category", "open_time", "close_time", "result", "status", "yes_bid", "yes_ask", "volume", "open_interest"]) for m in markets: result = "" if m.get("result") == "yes": result = "yes" elif m.get("result") == "no": result = "no" elif m.get("status") == "finalized" and m.get("result"): result = m.get("result") writer.writerow([ m.get("ticker", ""), m.get("title", ""), m.get("category", ""), m.get("open_time", ""), m.get("close_time", m.get("expiration_time", "")), result, m.get("status", ""), m.get("yes_bid", ""), m.get("yes_ask", ""), m.get("volume", ""), m.get("open_interest", ""), ]) def append_trades_csv(trades: list, output_path: Path, write_header: bool): """Append trades to CSV.""" mode = "w" if write_header else "a" with open(output_path, mode, newline="") as f: writer = csv.writer(f) if write_header: writer.writerow(["timestamp", "ticker", "price", "volume", "taker_side"]) for t in trades: price = t.get("yes_price", t.get("price", 50)) taker_side = t.get("taker_side", "") if not taker_side: taker_side = "yes" if t.get("is_taker_side_yes", True) else "no" writer.writerow([ t.get("created_time", t.get("ts", "")), t.get("ticker", t.get("market_ticker", "")), price, t.get("count", t.get("volume", 1)), taker_side, ]) def fetch_markets_incremental(output_dir: Path, state: dict) -> int: """Fetch markets incrementally with state tracking.""" output_path = output_dir / "markets.csv" cursor = state["markets_cursor"] total = state["markets_count"] write_header = total == 0 print(f"Resuming from {total} markets...") while True: url = f"{BASE_URL}/markets?limit=1000" if cursor: url += f"&cursor={cursor}" print(f"Fetching markets... ({total:,} so far)") try: data = fetch_json(url) except Exception as e: print(f"Error fetching markets: {e}") print(f"Progress saved. Run again to resume from {total:,} markets.") return total batch = data.get("markets", []) if batch: append_markets_csv(batch, output_path, write_header) write_header = False total += len(batch) cursor = data.get("cursor") state["markets_cursor"] = cursor state["markets_count"] = total save_state(output_dir, state) if not cursor: state["markets_done"] = True save_state(output_dir, state) break time.sleep(0.3) return total def fetch_trades_incremental(output_dir: Path, state: dict, limit: int) -> int: """Fetch trades incrementally with state tracking.""" output_path = output_dir / "trades.csv" cursor = state["trades_cursor"] total = state["trades_count"] write_header = total == 0 print(f"Resuming from {total} trades...") while total < limit: url = f"{BASE_URL}/markets/trades?limit=1000" if cursor: url += f"&cursor={cursor}" print(f"Fetching trades... ({total:,}/{limit:,})") try: data = fetch_json(url) except Exception as e: print(f"Error fetching trades: {e}") print(f"Progress saved. Run again to resume from {total:,} trades.") return total batch = data.get("trades", []) if not batch: break append_trades_csv(batch, output_path, write_header) write_header = False total += len(batch) cursor = data.get("cursor") state["trades_cursor"] = cursor state["trades_count"] = total save_state(output_dir, state) if not cursor: state["trades_done"] = True save_state(output_dir, state) break time.sleep(0.3) return total def main(): output_dir = Path("/mnt/work/kalshi-data") output_dir.mkdir(exist_ok=True) print("=" * 50) print("Kalshi Data Fetcher (with resume)") print("=" * 50) state = load_state(output_dir) # fetch markets if not state["markets_done"]: print("\n[1/2] Fetching markets...") markets_count = fetch_markets_incremental(output_dir, state) if state["markets_done"]: print(f"Markets complete: {markets_count:,}") else: print(f"Markets paused at: {markets_count:,}") return 1 else: print(f"\n[1/2] Markets already complete: {state['markets_count']:,}") # fetch trades if not state["trades_done"]: print("\n[2/2] Fetching trades...") trades_count = fetch_trades_incremental(output_dir, state, limit=1000000) if state["trades_done"]: print(f"Trades complete: {trades_count:,}") else: print(f"Trades paused at: {trades_count:,}") return 1 else: print(f"\n[2/2] Trades already complete: {state['trades_count']:,}") print("\n" + "=" * 50) print("Done!") print(f"Markets: {state['markets_count']:,}") print(f"Trades: {state['trades_count']:,}") print(f"Output: {output_dir}") print("=" * 50) # clear state for next run (output_dir / STATE_FILE).unlink(missing_ok=True) return 0 if __name__ == "__main__": exit(main())