clawdbot-workspace/scripts/captcha_browser.py
2026-01-28 23:00:58 -05:00

331 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Audio Captcha Solver - Browser Network Interception
Intercepts audio files from browser network requests instead of recording system audio.
No BlackHole needed!
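Usage:
    python captcha_browser.py --url "https://example.com/login"   # Opens page, waits for audio
    python captcha_browser.py --url "https://example.com/login" --mode identify --target "dog barking"
    python captcha_browser.py --help
Note: --url is required; attaching to an already-running Chrome (--listen) is not implemented.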
Requires: playwright (pip install playwright && playwright install)
"""
import asyncio
import argparse
import json
import re
import os
import sys
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from urllib.parse import urlparse
try:
from playwright.async_api import async_playwright
except ImportError:
print("❌ Playwright not installed. Run:")
print(" pip install playwright && playwright install chromium")
sys.exit(1)
# Audio file patterns to intercept.
# Each entry is a regular expression that is tried with re.search() against the
# lower-cased response URL; a single match is enough for the response to be
# treated as audio.
AUDIO_PATTERNS = [
# Audio file extensions; not anchored, so they match anywhere in the URL.
# NOTE(review): '.webm' can also be a video container - confirm this is intended.
r'\.mp3',
r'\.wav',
r'\.ogg',
r'\.m4a',
r'\.webm',
# Any URL containing an "audio/" path segment.
r'audio/',
# Captcha-related URLs that mention "audio" somewhere after the provider keyword.
r'captcha.*audio',
r'recaptcha.*audio',
r'hcaptcha.*audio',
# An "/audio" endpoint followed by a query string.
r'/audio\?',
# URLs where "payload" is followed (anywhere later) by "audio".
r'payload.*audio',
]
class AudioCaptchaSolver:
def __init__(self, output_dir=None):
self.output_dir = Path(output_dir or tempfile.gettempdir()) / "captcha-audio"
self.output_dir.mkdir(exist_ok=True)
self.captured_audio = []
self.browser = None
self.context = None
self.page = None
def is_audio_request(self, url, content_type=""):
"""Check if a request is likely an audio file."""
url_lower = url.lower()
content_lower = content_type.lower()
# Check content type
if any(t in content_lower for t in ['audio/', 'mpeg', 'wav', 'ogg', 'webm']):
return True
# Check URL patterns
for pattern in AUDIO_PATTERNS:
if re.search(pattern, url_lower):
return True
return False
async def handle_response(self, response):
"""Handle network responses and capture audio files."""
url = response.url
content_type = response.headers.get('content-type', '')
if self.is_audio_request(url, content_type):
try:
# Get the audio data
body = await response.body()
# Determine file extension
if 'mp3' in url.lower() or 'mpeg' in content_type:
ext = '.mp3'
elif 'wav' in url.lower() or 'wav' in content_type:
ext = '.wav'
elif 'ogg' in url.lower() or 'ogg' in content_type:
ext = '.ogg'
elif 'webm' in url.lower() or 'webm' in content_type:
ext = '.webm'
else:
ext = '.mp3' # Default
# Save the file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"captcha_{timestamp}{ext}"
filepath = self.output_dir / filename
with open(filepath, 'wb') as f:
f.write(body)
self.captured_audio.append({
'url': url,
'path': str(filepath),
'size': len(body),
'content_type': content_type
})
print(f"🎵 Captured audio: {filename} ({len(body)} bytes)")
print(f" URL: {url[:80]}...")
except Exception as e:
print(f"⚠️ Failed to capture {url}: {e}")
async def start_browser(self, url=None, headless=False):
"""Start browser and begin monitoring."""
playwright = await async_playwright().start()
self.browser = await playwright.chromium.launch(headless=headless)
self.context = await self.browser.new_context()
self.page = await self.context.new_page()
# Set up network interception
self.page.on("response", self.handle_response)
if url:
print(f"🌐 Navigating to: {url}")
await self.page.goto(url)
return self.page
async def wait_for_audio(self, timeout=60):
"""Wait for audio to be captured."""
print(f"👂 Listening for audio captcha (timeout: {timeout}s)...")
print(" Click the audio captcha button when ready!")
print()
start = asyncio.get_event_loop().time()
initial_count = len(self.captured_audio)
while asyncio.get_event_loop().time() - start < timeout:
if len(self.captured_audio) > initial_count:
# Give a moment for any additional audio chunks
await asyncio.sleep(1)
return self.captured_audio[-1]
await asyncio.sleep(0.5)
return None
async def close(self):
"""Clean up browser."""
if self.browser:
await self.browser.close()
def _run_tool(cmd):
    """Run an external CLI; return ``(CompletedProcess, None)`` or ``(None, error)``."""
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except FileNotFoundError:
        return None, f"'{cmd[0]}' command not found; is it installed and on PATH?"
    except OSError as e:
        return None, f"Failed to run '{cmd[0]}': {e}"
    return proc, None


def analyze_audio(audio_path, mode="transcribe", target=None):
    """Analyze captured audio using the ``whisper`` or ``gemini`` command-line tools.

    Args:
        audio_path: Path of the audio file to analyze.
        mode: ``"transcribe"`` (Whisper transcript reduced to its
            alphanumeric characters), ``"identify"`` (ask Gemini which
            numbered sound matches ``target``) or ``"describe"`` (ask Gemini
            to list the sounds).
        target: Sound to look for in ``identify`` mode; a generic
            placeholder is used when omitted.

    Returns:
        A dict that always has ``success``. On success it also carries
        ``mode`` plus the mode-specific keys (``raw_text`` /
        ``extracted_chars`` / ``answer``, ``target_sound`` / ``answer`` /
        ``raw_response``, or ``description``). On failure it carries
        ``error`` describing what went wrong.
    """
    audio_path = str(audio_path)

    if mode == "transcribe":
        print("🧠 Transcribing with Whisper...")
        cmd = [
            "whisper", audio_path,
            "--model", "small",
            "--language", "en",
            "--output_format", "txt",
            "--output_dir", str(Path(audio_path).parent)
        ]
        proc, error = _run_tool(cmd)
        if error:
            return {"success": False, "mode": "transcribe", "error": error}
        # The transcript is expected next to the audio file, same stem, .txt suffix.
        txt_path = Path(audio_path).with_suffix('.txt')
        if not txt_path.exists():
            # Report the real cause instead of falling through to "Unknown mode".
            detail = (proc.stderr or "").strip()
            message = f"Whisper produced no transcript (exit code {proc.returncode})"
            if detail:
                message += f": {detail[-500:]}"
            return {"success": False, "mode": "transcribe", "error": message}
        with open(txt_path, 'r', encoding="utf-8", errors="replace") as f:
            text = f.read().strip()
        # Extract alphanumeric
        chars = re.findall(r'[A-Za-z0-9]', text)
        extracted = ''.join(chars).upper()
        return {
            "success": True,
            "mode": "transcribe",
            "raw_text": text,
            "extracted_chars": extracted,
            "answer": extracted
        }

    if mode == "identify":
        if not target:
            target = "the requested sound"
        print(f"🧠 Asking Gemini: which sound is '{target}'?")
        prompt = f"""Listen to this audio captcha. It contains multiple sounds.
Which sound is a "{target}"?
Reply with ONLY the number (1, 2, 3, etc.) of the matching sound.
If sounds are labeled, use those labels. Otherwise, count by order (first=1, second=2, etc.)
Just the number, nothing else."""
        cmd = ["gemini", "-p", prompt, "-f", audio_path]
        proc, error = _run_tool(cmd)
        if error:
            return {"success": False, "mode": "identify", "error": error}
        response = proc.stdout.strip()
        if proc.returncode != 0 or not response:
            detail = (proc.stderr or "").strip() or "no output"
            return {
                "success": False,
                "mode": "identify",
                "error": f"Gemini failed (exit code {proc.returncode}): {detail[-500:]}"
            }
        # Use the first number in the reply; fall back to the raw reply.
        numbers = re.findall(r'\d+', response)
        answer = numbers[0] if numbers else response
        return {
            "success": True,
            "mode": "identify",
            "target_sound": target,
            "answer": answer,
            "raw_response": response
        }

    if mode == "describe":
        print("🧠 Asking Gemini to describe all sounds...")
        prompt = """Listen to this audio and describe each distinct sound you hear.
Format as a numbered list:
1: [description]
2: [description]
etc."""
        cmd = ["gemini", "-p", prompt, "-f", audio_path]
        proc, error = _run_tool(cmd)
        if error:
            return {"success": False, "mode": "describe", "error": error}
        description = proc.stdout.strip()
        if proc.returncode != 0 or not description:
            detail = (proc.stderr or "").strip() or "no output"
            return {
                "success": False,
                "mode": "describe",
                "error": f"Gemini failed (exit code {proc.returncode}): {detail[-500:]}"
            }
        return {
            "success": True,
            "mode": "describe",
            "description": description
        }

    return {"success": False, "error": "Unknown mode"}
def _copy_to_clipboard(text):
    """Copy *text* to the clipboard via ``pbcopy``; return True on success.

    ``pbcopy`` may be missing (it is a macOS utility) or may fail; in that
    case a warning is printed instead of raising, so an already computed
    answer is not lost to a clipboard error.
    """
    try:
        subprocess.run(["pbcopy"], input=text.encode(), check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        print()
        print(f"⚠️ Could not copy to clipboard: {e}")
        return False
    return True


async def main_async(args):
    """Run one interactive capture-and-analyze session.

    Opens ``args.url`` in a visible browser, waits up to ``args.timeout``
    seconds for an audio response, analyzes it according to ``args.mode`` /
    ``args.target``, prints the result (plus JSON when ``args.json`` is
    set) and copies a non-empty answer to the clipboard. Unless
    ``args.auto_close`` is set, the browser stays open until Enter is
    pressed. The browser is always closed on exit.
    """
    solver = AudioCaptchaSolver()
    try:
        # Start browser
        await solver.start_browser(url=args.url, headless=False)
        print()
        print("=" * 50)
        print("🎯 AUDIO CAPTCHA SOLVER")
        print("=" * 50)
        print()
        print("1. Find the audio captcha on the page")
        print("2. Click the audio/speaker button to play it")
        print("3. I'll intercept the audio file automatically")
        print()
        # Wait for audio
        audio = await solver.wait_for_audio(timeout=args.timeout)
        if audio:
            print()
            print(f"✅ Got audio file: {audio['path']}")
            print()
            # Analyze it
            result = analyze_audio(audio['path'], mode=args.mode, target=args.target)
            print()
            print("=" * 50)
            print("📝 RESULT:")
            print("=" * 50)
            if result.get("success"):
                # Stays empty for modes that produce nothing to copy.
                answer = ""
                if args.mode == "transcribe":
                    print(f"Raw text: {result.get('raw_text', 'N/A')}")
                    print(f"Extracted: {result.get('extracted_chars', 'N/A')}")
                    answer = result.get('answer', '')
                elif args.mode == "identify":
                    print(f"Target: {result.get('target_sound', 'N/A')}")
                    print(f"Answer: {result.get('answer', 'N/A')}")
                    answer = result.get('answer', '')
                elif args.mode == "describe":
                    print(result.get('description', 'N/A'))
                if answer and _copy_to_clipboard(answer):
                    print()
                    print(f"📋 Copied to clipboard: {answer}")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")
            if args.json:
                print()
                print("JSON output:")
                print(json.dumps(result, indent=2))
        else:
            print("❌ No audio captured within timeout")
        # Keep browser open for manual interaction if needed
        if not args.auto_close:
            print()
            # NOTE: input() is a blocking call, so the event loop does not
            # run while waiting for Enter.
            input("Press Enter to close browser...")
    finally:
        await solver.close()
def main():
    """Command-line entry point: parse options and start the capture session.

    When no ``--url`` is supplied, a short usage hint is printed and the
    process exits with status 1; otherwise the async workflow is run to
    completion.
    """
    cli = argparse.ArgumentParser(
        description="Audio Captcha Solver - Browser Network Interception"
    )
    cli.add_argument("--url", "-u", help="URL to open (captcha page)")
    cli.add_argument(
        "--mode",
        choices=["transcribe", "identify", "describe"],
        default="transcribe",
        help="Analysis mode",
    )
    cli.add_argument("--target", "-t", help="For identify mode: sound to find")
    cli.add_argument(
        "--timeout",
        type=int,
        default=60,
        help="Timeout waiting for audio (seconds)",
    )
    cli.add_argument("--json", "-j", action="store_true", help="Output JSON result")
    cli.add_argument(
        "--auto-close",
        action="store_true",
        help="Close browser automatically after capture",
    )
    options = cli.parse_args()

    if options.url:
        asyncio.run(main_async(options))
        return

    # No URL given: explain what the tool does and signal failure.
    usage_lines = (
        "Usage: python captcha_browser.py --url 'https://example.com/login'",
        "",
        "This will open a browser, monitor network requests, and capture",
        "any audio captcha files automatically when you click play.",
    )
    for line in usage_lines:
        print(line)
    sys.exit(1)


if __name__ == "__main__":
    main()