6 iterations of backtest refinements with key discoveries: - stop losses don't work for prediction markets (prices gap) - 50% take profit, no stop loss yields +9.37% vs +4.04% baseline - diversification beats concentration: 100 positions → +18.98% - added kalman filter, VPIN, regime detection scorers (research) exit config: take_profit 50%, stop_loss disabled, 48h max hold position sizing: kelly 0.40, max 30% per position, 100 max positions
353 lines
10 KiB
Python
Executable File
353 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Fetch historical trade and market data from Kalshi's public API.
|
|
No authentication required for public endpoints.
|
|
|
|
Features:
|
|
- Incremental saves (writes batches to disk)
|
|
- Resume capability (tracks cursor position)
|
|
- Retry logic with exponential backoff
|
|
- Date filtering for trades (--min-ts, --max-ts)
|
|
|
|
Usage:
|
|
# fetch everything (default)
|
|
python fetch_kalshi_data.py
|
|
|
|
# fetch trades from last 2 months with higher limit
|
|
python fetch_kalshi_data.py --min-ts 1763794800 --trade-limit 10000000
|
|
|
|
# reset trades state and refetch
|
|
python fetch_kalshi_data.py --reset-trades --min-ts 1763794800
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import csv
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
|
|
STATE_FILE = "fetch_state.json"
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Fetch Kalshi market and trade data")
|
|
parser.add_argument(
|
|
"--output-dir",
|
|
type=str,
|
|
default="/mnt/work/kalshi-data",
|
|
help="Output directory for CSV files (default: /mnt/work/kalshi-data)"
|
|
)
|
|
parser.add_argument(
|
|
"--trade-limit",
|
|
type=int,
|
|
default=1_000_000,
|
|
help="Maximum number of trades to fetch (default: 1,000,000)"
|
|
)
|
|
parser.add_argument(
|
|
"--min-ts",
|
|
type=int,
|
|
default=None,
|
|
help="Minimum unix timestamp for trades (trades after this time)"
|
|
)
|
|
parser.add_argument(
|
|
"--max-ts",
|
|
type=int,
|
|
default=None,
|
|
help="Maximum unix timestamp for trades (trades before this time)"
|
|
)
|
|
parser.add_argument(
|
|
"--reset-trades",
|
|
action="store_true",
|
|
help="Reset trades state to fetch fresh (keeps markets done)"
|
|
)
|
|
parser.add_argument(
|
|
"--trades-only",
|
|
action="store_true",
|
|
help="Skip markets fetch, only fetch trades"
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def fetch_json(url: str, max_retries: int = 5) -> dict:
|
|
"""Fetch JSON from URL with retries and exponential backoff."""
|
|
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except (urllib.error.HTTPError, urllib.error.URLError) as e:
|
|
wait = 2 ** attempt
|
|
print(f" attempt {attempt + 1}/{max_retries} failed: {e}")
|
|
if attempt < max_retries - 1:
|
|
print(f" retrying in {wait}s...")
|
|
time.sleep(wait)
|
|
else:
|
|
raise
|
|
except Exception as e:
|
|
wait = 2 ** attempt
|
|
print(f" unexpected error: {e}")
|
|
if attempt < max_retries - 1:
|
|
print(f" retrying in {wait}s...")
|
|
time.sleep(wait)
|
|
else:
|
|
raise
|
|
|
|
|
|
def load_state(output_dir: Path) -> dict:
|
|
"""Load saved state for resuming."""
|
|
state_path = output_dir / STATE_FILE
|
|
if state_path.exists():
|
|
with open(state_path) as f:
|
|
return json.load(f)
|
|
return {"markets_cursor": None, "markets_count": 0,
|
|
"trades_cursor": None, "trades_count": 0,
|
|
"markets_done": False, "trades_done": False}
|
|
|
|
|
|
def save_state(output_dir: Path, state: dict):
|
|
"""Save state for resuming."""
|
|
state_path = output_dir / STATE_FILE
|
|
with open(state_path, "w") as f:
|
|
json.dump(state, f)
|
|
|
|
|
|
def append_markets_csv(markets: list, output_path: Path, write_header: bool):
|
|
"""Append markets to CSV."""
|
|
mode = "w" if write_header else "a"
|
|
with open(output_path, mode, newline="") as f:
|
|
writer = csv.writer(f)
|
|
if write_header:
|
|
writer.writerow(["ticker", "title", "category", "open_time",
|
|
"close_time", "result", "status", "yes_bid",
|
|
"yes_ask", "volume", "open_interest"])
|
|
|
|
for m in markets:
|
|
result = ""
|
|
if m.get("result") == "yes":
|
|
result = "yes"
|
|
elif m.get("result") == "no":
|
|
result = "no"
|
|
elif m.get("status") == "finalized" and m.get("result"):
|
|
result = m.get("result")
|
|
|
|
writer.writerow([
|
|
m.get("ticker", ""),
|
|
m.get("title", ""),
|
|
m.get("category", ""),
|
|
m.get("open_time", ""),
|
|
m.get("close_time", m.get("expiration_time", "")),
|
|
result,
|
|
m.get("status", ""),
|
|
m.get("yes_bid", ""),
|
|
m.get("yes_ask", ""),
|
|
m.get("volume", ""),
|
|
m.get("open_interest", ""),
|
|
])
|
|
|
|
|
|
def append_trades_csv(trades: list, output_path: Path, write_header: bool):
|
|
"""Append trades to CSV."""
|
|
mode = "w" if write_header else "a"
|
|
with open(output_path, mode, newline="") as f:
|
|
writer = csv.writer(f)
|
|
if write_header:
|
|
writer.writerow(["timestamp", "ticker", "price", "volume", "taker_side"])
|
|
|
|
for t in trades:
|
|
price = t.get("yes_price", t.get("price", 50))
|
|
taker_side = t.get("taker_side", "")
|
|
if not taker_side:
|
|
taker_side = "yes" if t.get("is_taker_side_yes", True) else "no"
|
|
|
|
writer.writerow([
|
|
t.get("created_time", t.get("ts", "")),
|
|
t.get("ticker", t.get("market_ticker", "")),
|
|
price,
|
|
t.get("count", t.get("volume", 1)),
|
|
taker_side,
|
|
])
|
|
|
|
|
|
def fetch_markets_incremental(output_dir: Path, state: dict) -> int:
|
|
"""Fetch markets incrementally with state tracking."""
|
|
output_path = output_dir / "markets.csv"
|
|
cursor = state["markets_cursor"]
|
|
total = state["markets_count"]
|
|
write_header = total == 0
|
|
|
|
print(f"Resuming from {total} markets...")
|
|
|
|
while True:
|
|
url = f"{BASE_URL}/markets?limit=1000"
|
|
if cursor:
|
|
url += f"&cursor={cursor}"
|
|
|
|
print(f"Fetching markets... ({total:,} so far)")
|
|
|
|
try:
|
|
data = fetch_json(url)
|
|
except Exception as e:
|
|
print(f"Error fetching markets: {e}")
|
|
print(f"Progress saved. Run again to resume from {total:,} markets.")
|
|
return total
|
|
|
|
batch = data.get("markets", [])
|
|
if batch:
|
|
append_markets_csv(batch, output_path, write_header)
|
|
write_header = False
|
|
total += len(batch)
|
|
|
|
cursor = data.get("cursor")
|
|
state["markets_cursor"] = cursor
|
|
state["markets_count"] = total
|
|
save_state(output_dir, state)
|
|
|
|
if not cursor:
|
|
state["markets_done"] = True
|
|
save_state(output_dir, state)
|
|
break
|
|
|
|
time.sleep(0.3)
|
|
|
|
return total
|
|
|
|
|
|
def fetch_trades_incremental(
|
|
output_dir: Path,
|
|
state: dict,
|
|
limit: int,
|
|
min_ts: int = None,
|
|
max_ts: int = None
|
|
) -> int:
|
|
"""Fetch trades incrementally with state tracking."""
|
|
output_path = output_dir / "trades.csv"
|
|
cursor = state["trades_cursor"]
|
|
total = state["trades_count"]
|
|
write_header = total == 0
|
|
|
|
if total == 0:
|
|
print("Starting fresh trades fetch...")
|
|
else:
|
|
print(f"Resuming from {total:,} trades...")
|
|
|
|
if min_ts:
|
|
print(f" min_ts filter: {min_ts} ({datetime.fromtimestamp(min_ts)})")
|
|
if max_ts:
|
|
print(f" max_ts filter: {max_ts} ({datetime.fromtimestamp(max_ts)})")
|
|
|
|
while total < limit:
|
|
url = f"{BASE_URL}/markets/trades?limit=1000"
|
|
if cursor:
|
|
url += f"&cursor={cursor}"
|
|
if min_ts:
|
|
url += f"&min_ts={min_ts}"
|
|
if max_ts:
|
|
url += f"&max_ts={max_ts}"
|
|
|
|
print(f"Fetching trades... ({total:,}/{limit:,})")
|
|
|
|
try:
|
|
data = fetch_json(url)
|
|
except Exception as e:
|
|
print(f"Error fetching trades: {e}")
|
|
print(f"Progress saved. Run again to resume from {total:,} trades.")
|
|
return total
|
|
|
|
batch = data.get("trades", [])
|
|
if not batch:
|
|
break
|
|
|
|
append_trades_csv(batch, output_path, write_header)
|
|
write_header = False
|
|
total += len(batch)
|
|
|
|
cursor = data.get("cursor")
|
|
state["trades_cursor"] = cursor
|
|
state["trades_count"] = total
|
|
save_state(output_dir, state)
|
|
|
|
if not cursor:
|
|
state["trades_done"] = True
|
|
save_state(output_dir, state)
|
|
break
|
|
|
|
time.sleep(0.3)
|
|
|
|
return total
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
output_dir = Path(args.output_dir)
|
|
output_dir.mkdir(exist_ok=True)
|
|
|
|
print("=" * 50)
|
|
print("Kalshi Data Fetcher (with resume)")
|
|
print("=" * 50)
|
|
print(f"Output: {output_dir}")
|
|
print(f"Trade limit: {args.trade_limit:,}")
|
|
|
|
state = load_state(output_dir)
|
|
|
|
# reset trades state if requested
|
|
if args.reset_trades:
|
|
print("\nResetting trades state...")
|
|
state["trades_cursor"] = None
|
|
state["trades_count"] = 0
|
|
state["trades_done"] = False
|
|
save_state(output_dir, state)
|
|
|
|
# fetch markets (skip if --trades-only)
|
|
if not args.trades_only:
|
|
if not state["markets_done"]:
|
|
print("\n[1/2] Fetching markets...")
|
|
markets_count = fetch_markets_incremental(output_dir, state)
|
|
if state["markets_done"]:
|
|
print(f"Markets complete: {markets_count:,}")
|
|
else:
|
|
print(f"Markets paused at: {markets_count:,}")
|
|
return 1
|
|
else:
|
|
print(f"\n[1/2] Markets already complete: {state['markets_count']:,}")
|
|
else:
|
|
print("\n[1/2] Skipping markets (--trades-only)")
|
|
|
|
# fetch trades
|
|
if not state["trades_done"]:
|
|
print("\n[2/2] Fetching trades...")
|
|
trades_count = fetch_trades_incremental(
|
|
output_dir,
|
|
state,
|
|
limit=args.trade_limit,
|
|
min_ts=args.min_ts,
|
|
max_ts=args.max_ts
|
|
)
|
|
if state["trades_done"]:
|
|
print(f"Trades complete: {trades_count:,}")
|
|
else:
|
|
print(f"Trades paused at: {trades_count:,}")
|
|
return 1
|
|
else:
|
|
print(f"\n[2/2] Trades already complete: {state['trades_count']:,}")
|
|
|
|
print("\n" + "=" * 50)
|
|
print("Done!")
|
|
print(f"Markets: {state['markets_count']:,}")
|
|
print(f"Trades: {state['trades_count']:,}")
|
|
print(f"Output: {output_dir}")
|
|
print("=" * 50)
|
|
|
|
# clear state for next run
|
|
(output_dir / STATE_FILE).unlink(missing_ok=True)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|