| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- #!/usr/bin/env python3
- """
- Test MCP server with various message sizes and edge cases
- """
- import subprocess
- import time
- import json
- import sys
- import os
- import threading
- import queue
- class MCPServerTester:
- def __init__(self, server_path):
- self.server_path = server_path
- self.server_process = None
- self.message_queue = queue.Queue()
-
- def start_server(self):
- """Start MCP server process"""
- print(f"Starting MCP server: {self.server_path}")
- self.server_process = subprocess.Popen(
- [self.server_path],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=False,
- bufsize=0
- )
-
- # Start thread to read stderr
- stderr_thread = threading.Thread(target=self._read_stderr)
- stderr_thread.daemon = True
- stderr_thread.start()
-
- # Start thread to read stdout
- stdout_thread = threading.Thread(target=self._read_stdout)
- stdout_thread.daemon = True
- stdout_thread.start()
-
- time.sleep(1)
-
- def _read_stderr(self):
- """Read stderr output from server"""
- while self.server_process and self.server_process.poll() is None:
- try:
- line = self.server_process.stderr.readline()
- if line:
- print(f"SERVER STDERR: {line.decode().strip()}")
- except:
- break
-
- def _read_stdout(self):
- """Read stdout output from server without Content-Length parsing"""
- buffer = b""
- while self.server_process and self.server_process.poll() is None:
- try:
- data = self.server_process.stdout.readline()
- if not data:
- break
- buffer += data
-
- # Try to parse JSON directly from each line
- try:
- message = buffer.strip()
- if message:
- parsed = json.loads(message.decode())
- self.message_queue.put(parsed)
- buffer = b""
- except json.JSONDecodeError:
- # If not valid JSON yet, continue reading
- continue
- except:
- break
-
- def send_jsonrpc_request(self, method, params=None, id=1):
- """Send a JSON-RPC request without Content-Length header"""
- request = {
- "jsonrpc": "2.0",
- "method": method,
- "id": id,
- "params": params or {}
- }
-
- request_str = json.dumps(request)
- message = f"{request_str}\n"
-
- self.server_process.stdin.write(message.encode())
- self.server_process.stdin.flush()
-
- try:
- response = self.message_queue.get(timeout=10)
- return response
- except queue.Empty:
- print("TIMEOUT: No response received")
- return None
-
- def test_message_sizes(self):
- """Test various message sizes"""
- print("\n=== Testing Various Message Sizes ===")
-
- # Test small message
- print("\nTesting small message...")
- small_response = self.send_jsonrpc_request("ping", id=1)
- if small_response:
- print("✓ Small message successful")
- else:
- print("✗ Small message failed")
-
- # Test medium message
- print("\nTesting medium message...")
- medium_params = {
- "data": "x" * 1000, # 1KB of data
- "metadata": {
- "timestamp": "2025-12-12T03:29:00Z",
- "source": "test-client",
- "version": "1.0.0"
- }
- }
- medium_response = self.send_jsonrpc_request("ping", medium_params, id=2)
- if medium_response:
- print("✓ Medium message successful")
- else:
- print("✗ Medium message failed")
-
- # Test large message
- print("\nTesting large message...")
- large_params = {
- "data": "x" * 10000, # 10KB of data
- "metadata": {
- "timestamp": "2025-12-12T03:29:00Z",
- "source": "test-client",
- "version": "1.0.0",
- "extra": "y" * 5000
- }
- }
- large_response = self.send_jsonrpc_request("ping", large_params, id=3)
- if large_response:
- print("✓ Large message successful")
- else:
- print("✗ Large message failed")
-
- # Test very large message
- print("\nTesting very large message...")
- very_large_params = {
- "data": "x" * 50000, # 50KB of data
- "metadata": {
- "timestamp": "2025-12-12T03:29:00Z",
- "source": "test-client",
- "version": "1.0.0",
- "extra": "y" * 25000
- }
- }
- very_large_response = self.send_jsonrpc_request("ping", very_large_params, id=4)
- if very_large_response:
- print("✓ Very large message successful")
- else:
- print("✗ Very large message failed")
-
- def test_edge_cases(self):
- """Test edge cases"""
- print("\n=== Testing Edge Cases ===")
-
- # Test empty params
- print("\nTesting empty params...")
- empty_response = self.send_jsonrpc_request("ping", {}, id=5)
- if empty_response:
- print("✓ Empty params successful")
- else:
- print("✗ Empty params failed")
-
- # Test null params
- print("\nTesting null params...")
- null_response = self.send_jsonrpc_request("ping", None, id=6)
- if null_response:
- print("✓ Null params successful")
- else:
- print("✗ Null params failed")
-
- # Test special characters
- print("\nTesting special characters...")
- special_params = {
- "data": "Special chars: \n\t\r\"'\\",
- "unicode": "Unicode: ñáéíóú 中文 🚀",
- "emoji": "🎉👍💻🔧"
- }
- special_response = self.send_jsonrpc_request("ping", special_params, id=7)
- if special_response:
- print("✓ Special characters successful")
- else:
- print("✗ Special characters failed")
-
- def test_concurrent_requests(self):
- """Test concurrent requests"""
- print("\n=== Testing Concurrent Requests ===")
-
- def send_request(request_id):
- params = {"request_id": request_id, "data": f"concurrent_{request_id}"}
- return self.send_jsonrpc_request("ping", params, request_id)
-
- # Send multiple requests quickly
- print("\nSending 10 concurrent requests...")
- responses = []
- for i in range(10):
- response = send_request(10 + i)
- responses.append(response)
-
- successful = sum(1 for r in responses if r is not None)
- print(f"✓ {successful}/10 concurrent requests successful")
-
- def stop_server(self):
- """Stop MCP server process"""
- if self.server_process:
- print("\nStopping server...")
- self.server_process.terminate()
- try:
- self.server_process.wait(timeout=5)
- except subprocess.TimeoutExpired:
- self.server_process.kill()
- self.server_process = None
- def main():
- if len(sys.argv) != 2:
- print("Usage: python3 test_message_sizes.py <server-executable>")
- sys.exit(1)
-
- server_path = sys.argv[1]
- if not os.path.exists(server_path):
- print(f"Error: Server executable '{server_path}' not found")
- sys.exit(1)
-
- tester = MCPServerTester(server_path)
-
- try:
- tester.start_server()
- time.sleep(2)
-
- # Initialize first
- init_params = {
- "protocolVersion": "2025-11-25",
- "capabilities": {},
- "clientInfo": {
- "name": "test-client",
- "version": "1.0.0"
- }
- }
- init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)
-
- if init_response:
- print("✓ Server initialized successfully")
-
- # Run tests
- tester.test_message_sizes()
- tester.test_edge_cases()
- tester.test_concurrent_requests()
-
- print("\n✓ All message size tests completed successfully")
- else:
- print("✗ Failed to initialize server")
-
- except KeyboardInterrupt:
- print("\nTest interrupted by user")
- except Exception as e:
- print(f"\nError during test: {e}")
- import traceback
- traceback.print_exc()
- finally:
- tester.stop_server()
- if __name__ == "__main__":
- main()
|