2026-01-28 23:00:58 -05:00

160 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
TextMe Token Capture Script for mitmproxy
This script intercepts TextMe mobile app traffic to capture JWT tokens
and discover API endpoints.
Usage:
mitmproxy -s capture-token.py
Then configure your phone to use the proxy (usually your computer's IP:8080)
and install mitmproxy's CA certificate on your device.
Captured tokens are saved to ~/.textme/captured-token.json
"""
import json
import os
import re
from datetime import datetime
from pathlib import Path
from mitmproxy import http, ctx
# Domain names used to decide whether a flow belongs to TextMe.
TEXTME_DOMAINS = [
    "api.textme-app.com",
    "textme-app.com",
    "textmeup.com",
    "go-text.me",
]

# Both output files live side by side under ~/.textme/.
_STORAGE_DIR = Path.home() / ".textme"
TOKEN_PATH = _STORAGE_DIR / "captured-token.json"
ENDPOINTS_PATH = _STORAGE_DIR / "discovered-endpoints.json"

# In-memory set of "METHOD /path" strings seen since the script was loaded.
discovered_endpoints = set()
def save_token(token: str, source: str):
    """Persist a captured token to TOKEN_PATH as JSON and log a short preview.

    Args:
        token: The token value with any "JWT "/"Bearer " prefix already
            removed by the caller.
        source: Human-readable description of where the token was seen.

    Empty tokens are ignored so that a header carrying only a scheme cannot
    overwrite a previously captured token with nothing.
    """
    if not token:
        return

    TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
    data = {
        "token": token,
        "captured_at": datetime.now().isoformat(),
        "source": source,
    }

    # The file holds a live credential: make sure it exists with owner-only
    # permissions *before* the secret is written into it. touch() covers a
    # new file, chmod() tightens a file left over from an earlier run.
    TOKEN_PATH.touch(mode=0o600, exist_ok=True)
    try:
        TOKEN_PATH.chmod(0o600)
    except OSError as exc:
        # Best effort only: some filesystems do not support POSIX modes.
        ctx.log.warn(f"Could not restrict permissions on {TOKEN_PATH}: {exc}")

    TOKEN_PATH.write_text(json.dumps(data, indent=2))
    ctx.log.info(f"🎉 TOKEN CAPTURED! Saved to {TOKEN_PATH}")
    # str() keeps the preview from raising if a caller hands over a
    # non-string value that json.dumps was nevertheless able to serialize.
    ctx.log.info(f"Token preview: {str(token)[:50]}...")
def save_endpoints():
    """Write the current contents of ``discovered_endpoints`` to ENDPOINTS_PATH.

    The file is a JSON object with the time of writing and the endpoints in
    sorted order; it is rewritten in full on every call.
    """
    output_dir = ENDPOINTS_PATH.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    snapshot = {
        "discovered_at": datetime.now().isoformat(),
        "endpoints": sorted(discovered_endpoints),
    }
    serialized = json.dumps(snapshot, indent=2)
    ENDPOINTS_PATH.write_text(serialized)
def request(flow: http.HTTPFlow) -> None:
    """Inspect an outgoing request and record endpoints and auth tokens.

    For requests whose host is one of TEXTME_DOMAINS (or a subdomain of one):
    records the "METHOD /path" endpoint the first time it is seen, saves any
    token carried in a ``JWT``/``Bearer`` Authorization header, and logs the
    request line plus short text bodies of POST/PUT/PATCH requests.

    Args:
        flow: The mitmproxy flow whose request is being processed.
    """
    host = flow.request.host

    # Match the exact domain or a real subdomain. A plain substring test
    # would also accept unrelated hosts such as "nottextme-app.com.example".
    hostname = host.lower()
    if not any(
        hostname == domain or hostname.endswith("." + domain)
        for domain in TEXTME_DOMAINS
    ):
        return

    # Record the endpoint (query string stripped) the first time it appears.
    method = flow.request.method
    path = flow.request.path
    endpoint = f"{method} {path.split('?')[0]}"
    if endpoint not in discovered_endpoints:
        discovered_endpoints.add(endpoint)
        ctx.log.info(f"📍 New endpoint: {endpoint}")
        save_endpoints()

    # Authorization: "<scheme> <credentials>". Scheme names are compared
    # case-insensitively; a header with no credentials is ignored.
    auth_header = flow.request.headers.get("Authorization", "")
    scheme, _, credentials = auth_header.partition(" ")
    credentials = credentials.strip()
    if scheme.lower() in ("jwt", "bearer") and credentials:
        save_token(credentials, f"Request to {host}{path}")

    # Log request details
    ctx.log.info(f"{method} https://{host}{path}")

    # Log short, text-decodable bodies of state-changing requests.
    if method in ("POST", "PUT", "PATCH") and flow.request.content:
        try:
            body = flow.request.content.decode("utf-8")
        except UnicodeDecodeError:
            # Binary payload; nothing readable to log.
            return
        if len(body) < 500:
            ctx.log.info(f" Body: {body}")
def response(flow: http.HTTPFlow) -> None:
    """Inspect a response and capture any token found in its JSON body.

    For responses to hosts in TEXTME_DOMAINS (or their subdomains): logs the
    status line, and if the body is UTF-8 JSON, saves the first non-empty
    string found under one of the known token keys and logs short bodies.

    Args:
        flow: The mitmproxy flow whose response is being processed.
    """
    host = flow.request.host

    # Match the exact domain or a real subdomain. A plain substring test
    # would also accept unrelated hosts such as "nottextme-app.com.example".
    hostname = host.lower()
    if not any(
        hostname == domain or hostname.endswith("." + domain)
        for domain in TEXTME_DOMAINS
    ):
        return

    # Log response
    status = flow.response.status_code
    ctx.log.info(f"{status} https://{host}{flow.request.path}")

    content = flow.response.content
    if not content:
        return

    try:
        body = content.decode("utf-8")
        data = json.loads(body)
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not UTF-8 text or not JSON: there is nothing to search for tokens.
        return

    # Only a JSON object can carry named token fields; on a JSON string
    # `key in data` would be a substring test, on a number a TypeError.
    if isinstance(data, dict):
        for key in ("token", "access", "access_token", "jwt", "auth_token"):
            value = data.get(key)
            # Skip null/non-string/empty values so they cannot overwrite a
            # previously captured token; keep looking at the other keys.
            if isinstance(value, str) and value:
                try:
                    save_token(value, f"Response from {host}{flow.request.path}")
                except OSError as exc:
                    # Keep the best-effort behavior: a failed write must not
                    # abort processing of the flow.
                    ctx.log.error(f"Failed to save token: {exc}")
                break

    # Log interesting response data
    if len(body) < 1000:
        ctx.log.info(f" Response: {body}")
def done():
    """Log a short summary when mitmproxy shuts down.

    The token status is derived solely from whether TOKEN_PATH exists, so a
    file left over from an earlier run is reported as saved as well.
    """
    rule = "=" * 50
    ctx.log.info(f"\n{rule}")
    ctx.log.info("TextMe Token Capture Summary")
    ctx.log.info(f"{rule}")

    endpoint_count = len(discovered_endpoints)
    ctx.log.info(f"Discovered {endpoint_count} endpoints")

    token_status = (
        f"✅ Token saved to: {TOKEN_PATH}"
        if TOKEN_PATH.exists()
        else "❌ No token captured"
    )
    ctx.log.info(token_status)

    ctx.log.info(f"Endpoints saved to: {ENDPOINTS_PATH}")