331 lines
11 KiB
Python
Executable File
331 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Audio Captcha Solver - Browser Network Interception
|
|
|
|
Intercepts audio files from browser network requests instead of recording system audio.
|
|
No BlackHole needed!
|
|
|
|
Usage:
|
|
python captcha_browser.py --url "https://example.com/login" # Opens page, waits for audio
|
|
python captcha_browser.py --listen # Attach to existing Chrome
|
|
python captcha_browser.py --help
|
|
|
|
Requires: playwright (pip install playwright && playwright install)
|
|
"""
|
|
|
|
import asyncio
|
|
import argparse
|
|
import json
|
|
import re
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
|
|
try:
|
|
from playwright.async_api import async_playwright
|
|
except ImportError:
|
|
print("❌ Playwright not installed. Run:")
|
|
print(" pip install playwright && playwright install chromium")
|
|
sys.exit(1)
|
|
|
|
|
|
# Audio file patterns to intercept
|
|
# Regular expressions searched (via re.search) against the lower-cased
# response URL; a match on any one of them marks the response as audio.
AUDIO_PATTERNS = [
    # Common audio file extensions.
    r'\.mp3',
    r'\.wav',
    r'\.ogg',
    r'\.m4a',
    r'\.webm',
    # "audio" appearing as a path segment or next to a captcha keyword.
    r'audio/',
    r'captcha.*audio',
    r'recaptcha.*audio',
    r'hcaptcha.*audio',
    r'/audio\?',
    r'payload.*audio',
]
|
|
|
|
class AudioCaptchaSolver:
    """Capture audio files seen in a Playwright page's network responses.

    Every response whose URL or Content-Type looks like audio is written to
    ``<output_dir>/captcha-audio`` and recorded in ``captured_audio`` as a
    dict with the keys ``url``, ``path``, ``size`` and ``content_type``.
    """

    # Content-Type tokens that mark a response as audio even when the header
    # does not start with "audio/" (e.g. "application/ogg", "video/webm").
    _AUDIO_TYPE_TOKENS = frozenset({"mpeg", "wav", "ogg", "webm"})

    def __init__(self, output_dir=None):
        """Create the capture directory and initialise empty browser state.

        Args:
            output_dir: Parent directory for the ``captcha-audio`` folder.
                Defaults to the system temporary directory.
        """
        self.output_dir = Path(output_dir or tempfile.gettempdir()) / "captcha-audio"
        # parents=True so a caller-supplied directory that does not exist yet
        # is created instead of raising FileNotFoundError.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.captured_audio = []
        # Playwright driver handle; kept so close() can stop it.
        self._playwright = None
        self.browser = None
        self.context = None
        self.page = None

    def is_audio_request(self, url, content_type=""):
        """Return True when the URL or Content-Type suggests an audio file.

        Args:
            url: Response URL; matched case-insensitively against
                ``AUDIO_PATTERNS``.
            content_type: Value of the Content-Type header, if any.
        """
        url_lower = url.lower()
        content_lower = (content_type or "").lower()

        if "audio/" in content_lower:
            return True

        # Compare whole tokens rather than substrings: a substring test would
        # classify "application/x-shockwave-flash" as audio because it
        # contains "wav".
        type_tokens = re.findall(r"[a-z0-9]+", content_lower)
        if not self._AUDIO_TYPE_TOKENS.isdisjoint(type_tokens):
            return True

        return any(re.search(pattern, url_lower) for pattern in AUDIO_PATTERNS)

    @staticmethod
    def _guess_extension(url, content_type=""):
        """Pick a file extension for captured audio; falls back to ``.mp3``."""
        url_lower = url.lower()
        type_lower = (content_type or "").lower()

        if "mp3" in url_lower or "mpeg" in type_lower:
            return ".mp3"
        if "wav" in url_lower or "wav" in type_lower:
            return ".wav"
        if "ogg" in url_lower or "ogg" in type_lower:
            return ".ogg"
        if "webm" in url_lower or "webm" in type_lower:
            return ".webm"
        if "m4a" in url_lower or "mp4" in type_lower:
            return ".m4a"
        return ".mp3"  # Default

    async def handle_response(self, response):
        """Save the body of an audio-looking response and record it.

        Registered as the page's ``response`` event handler. Failures are
        reported on stdout and never propagate, so one bad response cannot
        break the capture session.
        """
        url = response.url
        content_type = response.headers.get('content-type', '')

        if not self.is_audio_request(url, content_type):
            return

        try:
            body = await response.body()
            if not body:
                # An empty body cannot be analysed; do not report it as a
                # successful capture.
                return

            ext = self._guess_extension(url, content_type)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"captcha_{timestamp}{ext}"
            filepath = self.output_dir / filename
            filepath.write_bytes(body)

            self.captured_audio.append({
                'url': url,
                'path': str(filepath),
                'size': len(body),
                'content_type': content_type,
            })

            print(f"🎵 Captured audio: {filename} ({len(body)} bytes)")
            # Only show an ellipsis when the URL was actually truncated.
            shown_url = url if len(url) <= 80 else url[:80] + "..."
            print(f"   URL: {shown_url}")
        except Exception as e:
            # Best-effort: this runs as an event callback, so report and
            # carry on rather than raise.
            print(f"⚠️ Failed to capture {url}: {e}")

    async def start_browser(self, url=None, headless=False):
        """Launch Chromium, attach the response listener and optionally navigate.

        Args:
            url: Page to open after the listener is attached; skipped if falsy.
            headless: Passed through to ``chromium.launch``.

        Returns:
            The Playwright page being monitored.
        """
        # Keep the driver handle on the instance so close() can stop it.
        self._playwright = await async_playwright().start()

        self.browser = await self._playwright.chromium.launch(headless=headless)
        self.context = await self.browser.new_context()
        self.page = await self.context.new_page()

        # Set up network interception before navigating so no response is missed.
        self.page.on("response", self.handle_response)

        if url:
            print(f"🌐 Navigating to: {url}")
            await self.page.goto(url)

        return self.page

    async def wait_for_audio(self, timeout=60):
        """Wait until a new audio capture appears or the timeout elapses.

        Only captures recorded after this call starts are considered.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            The most recent entry of ``captured_audio``, or ``None`` on timeout.
        """
        print(f"👂 Listening for audio captcha (timeout: {timeout}s)...")
        print("   Click the audio captcha button when ready!")
        print()

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        initial_count = len(self.captured_audio)

        while loop.time() < deadline:
            if len(self.captured_audio) > initial_count:
                # Give a moment for any additional audio chunks.
                await asyncio.sleep(1)
                return self.captured_audio[-1]
            await asyncio.sleep(0.5)

        return None

    async def close(self):
        """Close the browser and stop the Playwright driver; safe to call twice."""
        try:
            if self.browser:
                await self.browser.close()
        finally:
            self.browser = None
            self.context = None
            self.page = None
            if self._playwright:
                driver, self._playwright = self._playwright, None
                await driver.stop()
|
|
|
|
|
|
def analyze_audio(audio_path, mode="transcribe", target=None):
    """Analyze captured audio using the Whisper or Gemini command-line tools.

    Args:
        audio_path: Path of the audio file to analyze (``str`` or ``Path``).
        mode: ``"transcribe"`` runs ``whisper`` and extracts the alphanumeric
            characters; ``"identify"`` asks ``gemini`` which sound matches
            *target*; ``"describe"`` asks ``gemini`` to list the sounds.
        target: Sound to look for in ``"identify"`` mode. Defaults to
            ``"the requested sound"``.

    Returns:
        A dict that always has ``"success"``. On success it also carries
        ``"mode"`` plus mode-specific keys (``raw_text`` / ``extracted_chars``
        / ``answer``, ``target_sound`` / ``answer`` / ``raw_response``, or
        ``description``). On failure it carries an ``"error"`` message.
    """
    if mode not in ("transcribe", "identify", "describe"):
        return {"success": False, "error": "Unknown mode"}

    audio_path = str(audio_path)
    if not os.path.isfile(audio_path):
        # Fail early with a precise message instead of launching a tool that
        # cannot succeed.
        return {"success": False, "error": f"Audio file not found: {audio_path}"}

    if mode == "transcribe":
        print("🧠 Transcribing with Whisper...")
        source = Path(audio_path)
        cmd = [
            "whisper", audio_path,
            "--model", "small",
            "--language", "en",
            "--output_format", "txt",
            "--output_dir", str(source.parent),
        ]
        result, error = _run_tool(cmd)
        if error:
            return {"success": False, "error": error}

        # The transcript is expected next to the audio file, same stem.
        txt_path = source.with_suffix(".txt")
        if not txt_path.exists():
            stderr_lines = result.stderr.strip().splitlines()
            detail = f": {stderr_lines[-1]}" if stderr_lines else ""
            return {
                "success": False,
                "error": f"Whisper produced no transcript{detail}",
            }

        text = txt_path.read_text(encoding="utf-8").strip()
        # Extract alphanumeric characters only; captcha answers are upper-cased.
        extracted = "".join(re.findall(r"[A-Za-z0-9]", text)).upper()
        return {
            "success": True,
            "mode": "transcribe",
            "raw_text": text,
            "extracted_chars": extracted,
            "answer": extracted,
        }

    if mode == "identify":
        if not target:
            target = "the requested sound"

        print(f"🧠 Asking Gemini: which sound is '{target}'?")

        prompt = (
            "Listen to this audio captcha. It contains multiple sounds.\n"
            f"Which sound is a \"{target}\"?\n"
            "Reply with ONLY the number (1, 2, 3, etc.) of the matching sound.\n"
            "If sounds are labeled, use those labels. Otherwise, count by order"
            " (first=1, second=2, etc.)\n"
            "Just the number, nothing else."
        )
        response, error = _ask_gemini(prompt, audio_path)
        if error:
            return {"success": False, "error": error}

        # Prefer the first number in the reply; fall back to the raw reply.
        numbers = re.findall(r"\d+", response)
        answer = numbers[0] if numbers else response
        return {
            "success": True,
            "mode": "identify",
            "target_sound": target,
            "answer": answer,
            "raw_response": response,
        }

    # mode == "describe"
    print("🧠 Asking Gemini to describe all sounds...")

    prompt = (
        "Listen to this audio and describe each distinct sound you hear.\n"
        "Format as a numbered list:\n"
        "1: [description]\n"
        "2: [description]\n"
        "etc."
    )
    response, error = _ask_gemini(prompt, audio_path)
    if error:
        return {"success": False, "error": error}

    return {
        "success": True,
        "mode": "describe",
        "description": response,
    }


def _run_tool(cmd):
    """Run an external command; return ``(CompletedProcess, None)`` or ``(None, error)``."""
    try:
        completed = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace"
        )
    except OSError as exc:
        # Typically FileNotFoundError: the tool is not installed or not on PATH.
        return None, f"Could not run '{cmd[0]}': {exc}"
    return completed, None


def _ask_gemini(prompt, audio_path):
    """Send *prompt* and the audio file to the ``gemini`` CLI; return ``(reply, error)``."""
    result, error = _run_tool(["gemini", "-p", prompt, "-f", audio_path])
    if error:
        return None, error

    response = result.stdout.strip()
    if result.returncode != 0 or not response:
        if result.stderr.strip():
            detail = result.stderr.strip()
        elif result.returncode != 0:
            detail = f"exit code {result.returncode}"
        else:
            detail = "empty response"
        return None, f"Gemini returned no usable answer: {detail}"
    return response, None
|
|
|
|
|
|
async def main_async(args):
    """Open the browser, wait for an audio captcha, analyze it and report.

    Args:
        args: Parsed command-line namespace; reads ``url``, ``timeout``,
            ``mode``, ``target``, ``json`` and ``auto_close``.

    The browser is always closed on exit, including when an error occurs.
    """
    solver = AudioCaptchaSolver()
    rule = "=" * 50

    try:
        # Start browser
        await solver.start_browser(url=args.url, headless=False)

        print()
        print(rule)
        print("🎯 AUDIO CAPTCHA SOLVER")
        print(rule)
        print()
        print("1. Find the audio captcha on the page")
        print("2. Click the audio/speaker button to play it")
        print("3. I'll intercept the audio file automatically")
        print()

        # Wait for audio
        audio = await solver.wait_for_audio(timeout=args.timeout)

        if audio:
            print()
            print(f"✅ Got audio file: {audio['path']}")
            print()

            # Analyze it
            result = analyze_audio(audio['path'], mode=args.mode, target=args.target)

            print()
            print(rule)
            print("📝 RESULT:")
            print(rule)

            if result.get("success"):
                # Initialised up front so an unexpected mode cannot leave it unbound.
                answer = ""
                if args.mode == "transcribe":
                    print(f"Raw text: {result.get('raw_text', 'N/A')}")
                    print(f"Extracted: {result.get('extracted_chars', 'N/A')}")
                    answer = result.get('answer', '')
                elif args.mode == "identify":
                    print(f"Target: {result.get('target_sound', 'N/A')}")
                    print(f"Answer: {result.get('answer', 'N/A')}")
                    answer = result.get('answer', '')
                elif args.mode == "describe":
                    print(result.get('description', 'N/A'))

                if answer:
                    print()
                    if _copy_to_clipboard(answer):
                        print(f"📋 Copied to clipboard: {answer}")
                    else:
                        # pbcopy only exists on macOS; a missing clipboard tool
                        # must not abort the rest of the report.
                        print("⚠️ Could not copy to clipboard (pbcopy unavailable)")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")

            if args.json:
                print()
                print("JSON output:")
                print(json.dumps(result, indent=2))
        else:
            print("❌ No audio captured within timeout")

        # Keep browser open for manual interaction if needed
        if not args.auto_close:
            print()
            try:
                input("Press Enter to close browser...")
            except EOFError:
                # stdin is closed (e.g. non-interactive run): just proceed to cleanup.
                pass

    finally:
        await solver.close()


def _copy_to_clipboard(text):
    """Copy *text* to the clipboard via ``pbcopy``; return True on success."""
    try:
        subprocess.run(["pbcopy"], input=text.encode(), check=True)
    except (OSError, subprocess.CalledProcessError):
        return False
    return True
|
|
|
|
|
|
def main():
    """Parse command-line options and launch the capture workflow.

    Without ``--url`` a short usage hint is printed and the process exits
    with status 1; otherwise ``main_async`` is run to completion.
    """
    cli = argparse.ArgumentParser(
        description="Audio Captcha Solver - Browser Network Interception"
    )
    cli.add_argument("--url", "-u", help="URL to open (captcha page)")
    cli.add_argument(
        "--mode",
        choices=["transcribe", "identify", "describe"],
        default="transcribe",
        help="Analysis mode",
    )
    cli.add_argument("--target", "-t", help="For identify mode: sound to find")
    cli.add_argument(
        "--timeout",
        type=int,
        default=60,
        help="Timeout waiting for audio (seconds)",
    )
    cli.add_argument("--json", "-j", action="store_true", help="Output JSON result")
    cli.add_argument(
        "--auto-close",
        action="store_true",
        help="Close browser automatically after capture",
    )

    options = cli.parse_args()

    # Happy path first: a URL was supplied, so run the async workflow.
    if options.url:
        asyncio.run(main_async(options))
        return

    print("Usage: python captcha_browser.py --url 'https://example.com/login'")
    print()
    print("This will open a browser, monitor network requests, and capture")
    print("any audio captcha files automatically when you click play.")
    sys.exit(1)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|