Jake Shore f3c4cd817b Add all MCP servers + factory infra to MCPEngine — 2026-02-06
=== NEW SERVERS ADDED (7) ===
- servers/closebot — 119 tools, 14 modules, 4,656 lines TS (Stage 7)
- servers/google-console — Google Search Console MCP (Stage 7)
- servers/meta-ads — Meta/Facebook Ads MCP (Stage 8)
- servers/twilio — Twilio communications MCP (Stage 8)
- servers/competitor-research — Competitive intel MCP (Stage 6)
- servers/n8n-apps — n8n workflow MCP apps (Stage 6)
- servers/reonomy — Commercial real estate MCP (Stage 1)

=== FACTORY INFRASTRUCTURE ADDED ===
- infra/factory-tools — mcp-jest, mcp-validator, mcp-add, MCP Inspector
  - 60 test configs, 702 auto-generated test cases
  - All 30 servers score 100/100 protocol compliance
- infra/command-center — Pipeline state, operator playbook, dashboard config
- infra/factory-reviews — Automated eval reports

=== DOCS ADDED ===
- docs/MCP-FACTORY.md — Factory overview
- docs/reports/ — 5 pipeline evaluation reports
- docs/research/ — Browser MCP research

=== RULES ESTABLISHED ===
- CONTRIBUTING.md — All MCP work MUST go in this repo
- README.md — Full inventory of 37 servers + infra docs
- .gitignore — Updated for Python venvs

TOTAL: 37 MCP servers + full factory pipeline in one repo.
This is now the single source of truth for all MCP work.
2026-02-06 06:32:29 -05:00

351 lines
13 KiB
Python

# Copyright (c) 2025 Scott Wilcox
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test Runner for MCP Testing Framework.
This module provides a utility for running MCP test cases against different server implementations.
"""
import asyncio
import json
import os
import sys
import time
from typing import Dict, Any, List, Optional, Union, Callable, Tuple
from mcp_testing.protocols.base import MCPProtocolAdapter
from mcp_testing.transports.base import MCPTransportAdapter
from mcp_testing.transports.stdio import StdioTransportAdapter
from mcp_testing.transports.http import HttpTransportAdapter
from mcp_testing.protocols.v2024_11_05 import MCP2024_11_05Adapter
from mcp_testing.protocols.v2025_03_26 import MCP2025_03_26Adapter
class MCPTestRunner:
"""
Test runner for MCP testing framework.
This class provides a utility for running MCP test cases against different server
implementations and collecting results.
"""
def __init__(self, debug: bool = False):
"""
Initialize the test runner.
Args:
debug: Whether to enable debug output
"""
self.debug = debug
self.results = {}
# Check for shutdown skipping early
self.skip_shutdown = self._should_skip_shutdown()
if self.skip_shutdown and self.debug:
print("Note: Shutdown will be skipped based on environment configuration")
def _should_skip_shutdown(self) -> bool:
"""
Check if shutdown should be skipped based on environment variable.
Returns:
bool: True if shutdown should be skipped, False otherwise
"""
skip_shutdown = os.environ.get("MCP_SKIP_SHUTDOWN", "").lower()
return skip_shutdown in ("true", "1", "yes")
async def run_test(self, test_func: Callable[[MCPProtocolAdapter], Tuple[bool, str]],
server_command: str,
protocol_version: str,
test_name: str,
env_vars: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
transport_type: str = "stdio") -> Dict[str, Any]:
"""
Run a single test case.
Args:
test_func: The test function to run
server_command: The command to launch the server or server URL for HTTP
protocol_version: The protocol version to use
test_name: The name of the test
env_vars: Environment variables to pass to the server process
timeout: Optional timeout in seconds for the test execution
transport_type: Type of transport to use ("stdio" or "http")
Returns:
A dictionary containing the test results
"""
# Skip shutdown-related tests if shutdown is disabled
if self.skip_shutdown and (test_name == "test_shutdown" or test_name == "test_exit_after_shutdown"):
if self.debug:
print(f"Skipping {test_name} because shutdown is disabled")
result = {
"name": test_name,
"passed": True, # Mark as passed to avoid false failures
"message": "Test skipped because shutdown is disabled via MCP_SKIP_SHUTDOWN",
"duration": 0,
"skipped": True
}
self.results[test_name] = result
return result
if self.debug:
print(f"\nRunning test: {test_name}")
# Create a fresh transport adapter for each test
if self.debug:
if transport_type == "stdio":
print(f"Starting server process: {server_command}")
if env_vars:
print(f"Environment variables: {env_vars}")
else:
print(f"Connecting to server URL: {server_command}")
if transport_type == "stdio":
transport_adapter = StdioTransportAdapter(
server_command=server_command,
env_vars=env_vars,
debug=self.debug
)
else: # HTTP transport
transport_adapter = HttpTransportAdapter(
server_url=server_command,
debug=self.debug
)
# Create a fresh protocol adapter for each test
if protocol_version == "2024-11-05":
protocol_adapter = MCP2024_11_05Adapter(
transport=transport_adapter,
debug=self.debug
)
elif protocol_version == "2025-03-26":
protocol_adapter = MCP2025_03_26Adapter(
transport=transport_adapter,
debug=self.debug
)
elif protocol_version == "2025-06-18":
from mcp_testing.protocols.v2025_06_18 import MCP2025_06_18Adapter
protocol_adapter = MCP2025_06_18Adapter(
transport=transport_adapter,
debug=self.debug
)
else:
raise ValueError(f"Unsupported protocol version: {protocol_version}")
start_time = time.time()
try:
# Initialize the connection
if self.debug:
print(f"Initializing server...")
await protocol_adapter.initialize()
if self.debug:
print(f"Sending initialized notification...")
await protocol_adapter.send_initialized()
# Run the test with timeout if specified
if self.debug:
print(f"Executing test: {test_name}")
# Handle test with timeout if specified
if timeout:
try:
# Create a task for the test function
test_task = asyncio.create_task(test_func(protocol_adapter))
# Wait for either the task to complete or timeout
passed, message = await asyncio.wait_for(test_task, timeout=timeout)
except asyncio.TimeoutError:
# Check if this is a tools test which can be treated as non-critical
if test_name.startswith("test_tools_") or test_name.startswith("test_tool_"):
if self.debug:
print(f"⚠️ WARNING: Test {test_name} timed out after {timeout}s (continuing)")
passed = True # Treat as passed for compliance
message = f"Test timed out after {timeout}s but is considered non-critical"
# Mark it as a timeout for reporting
result = {
"name": test_name,
"passed": passed,
"message": message,
"duration": time.time() - start_time,
"timeout": True,
"non_critical": True
}
self.results[test_name] = result
# Skip shutdown for this test since we couldn't complete it normally
if not self.skip_shutdown:
if self.debug:
print(f"Skipping shutdown due to timeout")
# Just terminate the transport
await transport_adapter.terminate()
return result
else:
# For critical tests, consider timeout as failure
passed = False
message = f"Test timed out after {timeout}s"
else:
# Run the test without timeout
passed, message = await test_func(protocol_adapter)
if self.debug:
status = "PASSED" if passed else "FAILED"
print(f"Test execution complete: {status}")
if message:
print(f" Message: {message}")
# Determine whether to skip shutdown based on environment variables
# This respects both env_vars argument and global environment
skip_shutdown = self.skip_shutdown
if not skip_shutdown and env_vars:
skip_env = env_vars.get("MCP_SKIP_SHUTDOWN", "").lower()
skip_shutdown = skip_env in ("true", "1", "yes")
# Handle shutdown based on configuration
if not skip_shutdown:
try:
if self.debug:
print(f"Sending shutdown request...")
await protocol_adapter.shutdown()
if self.debug:
print(f"Sending exit notification...")
await protocol_adapter.exit()
except Exception as e:
if self.debug:
print(f"Error during shutdown: {str(e)}")
# Don't fail the test just because shutdown failed
pass
# Calculate test duration
duration = time.time() - start_time
# Store and return the result
result = {
"name": test_name,
"passed": passed,
"message": message,
"duration": duration
}
self.results[test_name] = result
return result
except Exception as e:
duration = time.time() - start_time
error_message = f"Test failed with error: {str(e)}"
if self.debug:
print(error_message)
import traceback
traceback.print_exc()
result = {
"name": test_name,
"passed": False,
"message": error_message,
"duration": duration
}
self.results[test_name] = result
return result
finally:
# Always terminate the transport
try:
await transport_adapter.terminate()
except:
pass
async def run_tests(self, tests: List[Tuple[Callable[[MCPProtocolAdapter], Tuple[bool, str]], str]],
protocol: str = "2024-11-05",
transport: str = "stdio",
server_command: str = None,
env_vars: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None) -> Dict[str, Any]:
"""
Run a list of test cases.
Args:
tests: List of (test_func, test_name) tuples
protocol: Protocol version to use
transport: Transport type to use ("stdio" or "http")
server_command: Command to launch server or server URL for HTTP
env_vars: Environment variables to pass to server process
timeout: Optional timeout in seconds for test execution
Returns:
A dictionary containing the aggregated test results
"""
if not server_command:
raise ValueError("server_command is required")
results = {
"results": [],
"passed": 0,
"failed": 0,
"skipped": 0,
"timeouts": 0
}
for test_func, test_name in tests:
result = await self.run_test(
test_func=test_func,
server_command=server_command,
protocol_version=protocol,
test_name=test_name,
env_vars=env_vars,
timeout=timeout,
transport_type=transport
)
results["results"].append(result)
if result.get("skipped", False):
results["skipped"] += 1
elif result.get("timeout", False):
results["timeouts"] += 1
elif result["passed"]:
results["passed"] += 1
else:
results["failed"] += 1
return results
# Convenience function to run tests
async def run_tests(tests: List[Tuple[Callable[[MCPProtocolAdapter], Tuple[bool, str]], str]],
protocol: str = "2024-11-05",
transport: str = "stdio",
server_command: str = None,
env_vars: Optional[Dict[str, str]] = None,
debug: bool = False,
timeout: Optional[int] = None) -> Dict[str, Any]:
"""
Run a list of test cases.
Args:
tests: A list of tuples containing (test_func, test_name)
protocol: The protocol version to use
transport: The transport type to use
server_command: The command to launch the server (for stdio transport)
env_vars: Environment variables to pass to the server process
debug: Whether to enable debug output
timeout: Optional timeout in seconds for each test execution
Returns:
A dictionary containing the test results
"""
runner = MCPTestRunner(debug=debug)
return await runner.run_tests(
tests=tests,
protocol=protocol,
transport=transport,
server_command=server_command,
env_vars=env_vars,
timeout=timeout
)