#!/usr/bin/env python3
"""
Comprehensive test for calculator-server to verify the fix
"""
import json
import os
import queue
import re
import subprocess
import sys
import threading
import time
class MCPServerTester:
    """Drive a calculator MCP server over stdio with newline-delimited JSON-RPC.

    The server is launched as a child process. Two daemon threads drain its
    stdout (each line parsed as JSON and pushed onto ``message_queue``) and
    its stderr (echoed to the console).
    """

    # Matches integer, decimal and exponent-form numbers inside free text.
    _NUMBER_RE = re.compile(r"[-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?")

    def __init__(self, server_path):
        """Remember which executable to launch; nothing is started yet.

        Args:
            server_path: Path of the server executable handed to ``Popen``.
        """
        self.server_path = server_path
        self.server_process = None
        self.message_queue = queue.Queue()
        # Kept so stop_server() can wait for the readers to finish.
        self._reader_threads = []
        # Every tools/call gets its own id so a late reply to an earlier,
        # timed-out call cannot be mistaken for the current one.
        self._next_request_id = 1

    def start_server(self):
        """Start MCP server process and the threads that drain its output."""
        print(f"Starting MCP server: {self.server_path}")
        self.server_process = subprocess.Popen(
            [self.server_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=False,
            bufsize=0,
        )

        self._reader_threads = []
        for target in (self._read_stderr, self._read_stdout):
            reader = threading.Thread(target=target, daemon=True)
            reader.start()
            self._reader_threads.append(reader)

        # Give the server a moment to come up before the first request.
        time.sleep(1)

    def _read_stderr(self):
        """Echo every stderr line of the server until the pipe reaches EOF."""
        process = self.server_process
        if process is None:
            return
        try:
            for raw_line in iter(process.stderr.readline, b""):
                # errors="replace": one undecodable byte must not end logging.
                text = raw_line.decode(errors="replace").strip()
                print(f"SERVER STDERR: {text}")
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass

    def _read_stdout(self):
        """Parse newline-delimited JSON from server stdout into the queue.

        Each non-blank line is decoded as one JSON document. Lines that are
        not valid UTF-8 or not valid JSON are reported and skipped. Reading
        continues until EOF, so output written just before the server exits
        is still delivered.
        """
        process = self.server_process
        if process is None:
            return
        try:
            for raw_line in iter(process.stdout.readline, b""):
                payload = raw_line.strip()
                if not payload:
                    continue
                try:
                    message = json.loads(payload.decode("utf-8"))
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    print(f"JSON DECODE ERROR: {e}")
                    continue
                self.message_queue.put(message)
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass

    def send_jsonrpc_request(self, method, params=None, id=1, timeout=10):
        """Send a JSON-RPC request and wait for the response with the same id.

        Messages that are not the response to this request (notifications,
        server-initiated requests, replies carrying a different id) are
        reported and discarded instead of being returned.

        Args:
            method: JSON-RPC method name.
            params: Parameter object; ``None`` or empty is sent as ``{}``.
            id: Request id (name kept for keyword-argument compatibility).
            timeout: Seconds to wait for the matching response.

        Returns:
            The decoded response dict, or ``None`` if none arrived in time.
        """
        request = {
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params or {},
        }
        wire = (json.dumps(request) + "\n").encode()

        self.server_process.stdin.write(wire)
        self.server_process.stdin.flush()

        deadline = time.monotonic() + timeout
        while True:
            remaining = max(deadline - time.monotonic(), 0)
            try:
                message = self.message_queue.get(timeout=remaining)
            except queue.Empty:
                print("TIMEOUT: No response received")
                return None

            is_response = (
                isinstance(message, dict)
                and "method" not in message  # requests/notifications carry one
                and message.get("id") == id
            )
            if is_response:
                return message
            print(f"IGNORED MESSAGE: {message}")

    @staticmethod
    def _text_mentions_value(text, expected, tolerance=0.001):
        """Return True if *text* contains a number within *tolerance* of *expected*.

        Numbers are compared numerically, so "5.0" satisfies an expected 5
        while "15" does not. A non-numeric *expected* falls back to a plain
        substring test. Any number in the text can match, operands included.
        """
        if not isinstance(text, str):
            return False
        if isinstance(expected, bool) or not isinstance(expected, (int, float)):
            return str(expected) in text
        return any(
            abs(float(token) - expected) < tolerance
            for token in MCPServerTester._NUMBER_RE.findall(text)
        )

    def test_operation(self, operation, operand1, operand2, expected_result):
        """Call the ``basic_calculator`` tool once and verify its answer.

        Both the first text content item and the structured result (under
        ``structuredContent`` or ``structured_content``) are checked when
        present.

        Returns:
            ``True`` when every check that could be made passed, ``False``
            when the call failed, a check failed, or nothing was checkable.
        """
        print(f"\n=== Testing {operation} with {operand1} and {operand2} ===")

        params = {
            "name": "basic_calculator",
            "arguments": {
                "operation": operation,
                "operand1": operand1,
                "operand2": operand2,
            },
            "_meta": {
                "progressToken": 0,
            },
        }

        request_id = self._next_request_id
        self._next_request_id += 1
        response = self.send_jsonrpc_request("tools/call", params, id=request_id)

        if not response or not isinstance(response.get("result"), dict):
            print("✗ No response or error in response")
            return False

        result = response["result"]
        checks = []

        if result.get("isError"):
            print("✗ Tool reported an error")
            checks.append(False)

        content = result.get("content")
        if isinstance(content, list) and content:
            first_item = content[0]
            if isinstance(first_item, dict) and "text" in first_item:
                result_text = first_item["text"]
                print(f"Result text: {result_text}")

                if self._text_mentions_value(result_text, expected_result):
                    print(f"✓ Correct result: {expected_result}")
                    checks.append(True)
                else:
                    print(f"✗ Incorrect result: expected {expected_result}, got {result_text}")
                    checks.append(False)

        structured = result.get("structuredContent")
        if structured is None:
            structured = result.get("structured_content")
        if isinstance(structured, dict):
            if "result" in structured:
                actual_result = structured["result"]
                print(f"Structured result: {actual_result}")

                is_number = isinstance(actual_result, (int, float)) and not isinstance(
                    actual_result, bool
                )
                if is_number and abs(actual_result - expected_result) < 0.001:
                    print(f"✓ Correct calculation: {actual_result}")
                    checks.append(True)
                else:
                    print(f"✗ Incorrect calculation: expected {expected_result}, got {actual_result}")
                    checks.append(False)

            if "operand1" in structured:
                print(f"operand1 in result: {structured['operand1']}")
            if "operand2" in structured:
                print(f"operand2 in result: {structured['operand2']}")

        if not checks:
            print("✗ No verifiable result in response")
            return False
        return all(checks)

    def stop_server(self):
        """Stop MCP server process, reap it and release its pipes."""
        process = self.server_process
        if process is None:
            return

        print("\nStopping server...")
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child so it does not linger as a zombie.
            process.wait()

        # With the child gone the readers reach EOF; let them finish first.
        for reader in self._reader_threads:
            reader.join(timeout=1)
        self._reader_threads = []

        for stream in (process.stdin, process.stdout, process.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                # Best-effort cleanup: the pipe may already be broken.
                pass

        self.server_process = None
def main():
    """Run the calculator test cases against the server named in ``sys.argv[1]``.

    Exits with status 1 on bad usage, a missing executable, a failed
    initialization, an interruption or unexpected error, or any test case
    that reports failure. Returns normally when the run completes without
    a reported failure.
    """
    if len(sys.argv) != 2:
        print("Usage: python3 test_calculator_comprehensive.py <server-executable>")
        sys.exit(1)

    server_path = sys.argv[1]
    if not os.path.exists(server_path):
        print(f"Error: Server executable '{server_path}' not found")
        sys.exit(1)

    tester = MCPServerTester(server_path)
    succeeded = False

    try:
        tester.start_server()
        time.sleep(2)

        # Initialize first
        print("\nInitializing server...")
        init_params = {
            "protocolVersion": "2025-11-25",
            "capabilities": {},
            "clientInfo": {
                "name": "test-client",
                "version": "1.0.0",
            },
        }
        init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)
        # NOTE(review): the MCP lifecycle expects the client to follow up with
        # a notifications/initialized message; this script does not send one.
        # Confirm the server under test tolerates that.

        # An error reply is a non-empty dict too, so require a "result" key.
        if isinstance(init_response, dict) and "result" in init_response:
            print("✓ Server initialized successfully")

            # Test different operations with different number types
            test_cases = [
                # (operation, operand1, operand2, expected_result)
                ("add", 2, 3, 5),
                ("add", 2.5, 3.7, 6.2),
                ("add", -5, 10, 5),
                ("subtract", 10, 3, 7),
                ("multiply", 4, 5, 20),
                ("divide", 20, 4, 5),
                ("power", 2, 3, 8),
                ("modulo", 10, 3, 1),
                # Test with integer types
                ("add", 2, 3, 5),
                ("add", 100, 200, 300),
                # Test with float types
                ("add", 2.5, 3.5, 6.0),
                ("add", -2.5, 3.5, 1.0),
                ("multiply", 1.5, 2.5, 3.75),
                ("divide", 10.0, 2.5, 4.0),
            ]

            failures = 0
            for operation, operand1, operand2, expected in test_cases:
                outcome = tester.test_operation(operation, operand1, operand2, expected)
                # Only an explicit False counts as a failure, so a tester that
                # reports by printing alone (returning None) still works here.
                if outcome is False:
                    failures += 1

            if failures:
                print(f"\n✗ {failures} of {len(test_cases)} calculator tests failed")
            else:
                print("\n✓ All calculator tests completed successfully")
                succeeded = True
        else:
            print("✗ Failed to initialize server")

    except KeyboardInterrupt:
        print("\nTest interrupted by user")
    except Exception as e:
        print(f"\nError during test: {e}")
        import traceback
        traceback.print_exc()
    finally:
        tester.stop_server()

    # Surface the outcome in the exit status so scripts and CI can rely on it.
    if not succeeded:
        sys.exit(1)


if __name__ == "__main__":
    main()