160 lines
4.5 KiB
Python
160 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
TextMe Token Capture Script for mitmproxy
|
|
|
|
This script intercepts TextMe mobile app traffic to capture JWT tokens
|
|
and discover API endpoints.
|
|
|
|
Usage:
|
|
mitmproxy -s capture-token.py
|
|
|
|
Then configure your phone to use the proxy (usually your computer's IP:8080)
|
|
and install mitmproxy's CA certificate on your device.
|
|
|
|
Captured tokens are saved to ~/.textme/captured-token.json
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from mitmproxy import http, ctx
|
|
|
|
# Domains to intercept
|
|
TEXTME_DOMAINS = [
|
|
"api.textme-app.com",
|
|
"textme-app.com",
|
|
"textmeup.com",
|
|
"go-text.me",
|
|
]
|
|
|
|
# Storage path
|
|
TOKEN_PATH = Path.home() / ".textme" / "captured-token.json"
|
|
ENDPOINTS_PATH = Path.home() / ".textme" / "discovered-endpoints.json"
|
|
|
|
# Track discovered endpoints
|
|
discovered_endpoints = set()
|
|
|
|
|
|
def save_token(token: str, source: str):
|
|
"""Save captured token to file."""
|
|
TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
data = {
|
|
"token": token,
|
|
"captured_at": datetime.now().isoformat(),
|
|
"source": source,
|
|
}
|
|
|
|
TOKEN_PATH.write_text(json.dumps(data, indent=2))
|
|
ctx.log.info(f"🎉 TOKEN CAPTURED! Saved to {TOKEN_PATH}")
|
|
ctx.log.info(f"Token preview: {token[:50]}...")
|
|
|
|
|
|
def save_endpoints():
|
|
"""Save discovered endpoints to file."""
|
|
ENDPOINTS_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
data = {
|
|
"discovered_at": datetime.now().isoformat(),
|
|
"endpoints": sorted(list(discovered_endpoints)),
|
|
}
|
|
|
|
ENDPOINTS_PATH.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
def request(flow: http.HTTPFlow) -> None:
|
|
"""Intercept requests to TextMe domains."""
|
|
host = flow.request.host
|
|
|
|
# Check if this is a TextMe domain
|
|
if not any(domain in host for domain in TEXTME_DOMAINS):
|
|
return
|
|
|
|
# Log the endpoint
|
|
method = flow.request.method
|
|
path = flow.request.path
|
|
endpoint = f"{method} {path.split('?')[0]}"
|
|
|
|
if endpoint not in discovered_endpoints:
|
|
discovered_endpoints.add(endpoint)
|
|
ctx.log.info(f"📍 New endpoint: {endpoint}")
|
|
save_endpoints()
|
|
|
|
# Check for JWT in Authorization header
|
|
auth_header = flow.request.headers.get("Authorization", "")
|
|
|
|
if auth_header.startswith("JWT "):
|
|
token = auth_header[4:]
|
|
save_token(token, f"Request to {host}{path}")
|
|
|
|
elif auth_header.startswith("Bearer "):
|
|
token = auth_header[7:]
|
|
save_token(token, f"Request to {host}{path}")
|
|
|
|
# Log request details
|
|
ctx.log.info(f"→ {method} https://{host}{path}")
|
|
|
|
# Log request body for POST/PUT
|
|
if method in ["POST", "PUT", "PATCH"] and flow.request.content:
|
|
try:
|
|
body = flow.request.content.decode('utf-8')
|
|
if len(body) < 500:
|
|
ctx.log.info(f" Body: {body}")
|
|
except:
|
|
pass
|
|
|
|
|
|
def response(flow: http.HTTPFlow) -> None:
|
|
"""Intercept responses from TextMe domains."""
|
|
host = flow.request.host
|
|
|
|
# Check if this is a TextMe domain
|
|
if not any(domain in host for domain in TEXTME_DOMAINS):
|
|
return
|
|
|
|
# Log response
|
|
status = flow.response.status_code
|
|
ctx.log.info(f"← {status} https://{host}{flow.request.path}")
|
|
|
|
# Check for token in response body
|
|
if flow.response.content:
|
|
try:
|
|
body = flow.response.content.decode('utf-8')
|
|
|
|
# Try to parse as JSON
|
|
try:
|
|
data = json.loads(body)
|
|
|
|
# Look for token fields
|
|
for key in ["token", "access", "access_token", "jwt", "auth_token"]:
|
|
if key in data:
|
|
save_token(data[key], f"Response from {host}{flow.request.path}")
|
|
break
|
|
|
|
# Log interesting response data
|
|
if len(body) < 1000:
|
|
ctx.log.info(f" Response: {body}")
|
|
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
except:
|
|
pass
|
|
|
|
|
|
def done():
|
|
"""Called when mitmproxy shuts down."""
|
|
ctx.log.info(f"\n{'='*50}")
|
|
ctx.log.info("TextMe Token Capture Summary")
|
|
ctx.log.info(f"{'='*50}")
|
|
ctx.log.info(f"Discovered {len(discovered_endpoints)} endpoints")
|
|
|
|
if TOKEN_PATH.exists():
|
|
ctx.log.info(f"✅ Token saved to: {TOKEN_PATH}")
|
|
else:
|
|
ctx.log.info("❌ No token captured")
|
|
|
|
ctx.log.info(f"Endpoints saved to: {ENDPOINTS_PATH}")
|