#!/usr/bin/env python3
"""
Comprehensive test for calculator-server to verify the fix
"""

import json
import os
import queue
import re
import subprocess
import sys
import threading
import time
import traceback
from typing import Any, Optional

# Absolute tolerance used when comparing a reported number with the expected one.
RESULT_TOLERANCE = 0.001

# Stand-alone numbers in free text (integer, decimal, or exponent form).  The
# look-around guards keep digits glued to identifiers ("operand1") from
# being mistaken for values.
_NUMBER_PATTERN = re.compile(
    r"(?<![\w.])[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?(?!\w)"
)


class MCPServerTester:
    """Drive an MCP server over stdio using newline-delimited JSON-RPC 2.0.

    The server is launched as a child process.  Two daemon threads drain its
    stdout (parsed into ``message_queue``) and stderr (echoed to the console).
    """

    def __init__(self, server_path):
        self.server_path = server_path
        self.server_process = None
        # Parsed JSON messages received from the server's stdout.
        self.message_queue = queue.Queue()
        self._reader_threads = []
        # test_operation() allocates ids from here so that a late reply to a
        # timed-out request can never be mistaken for the current one.
        self._next_request_id = 1

    def start_server(self):
        """Start MCP server process

        Raises:
            RuntimeError: if the server exits during the start-up grace period.
        """
        print(f"Starting MCP server: {self.server_path}")
        self.server_process = subprocess.Popen(
            [self.server_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=False,
            bufsize=0,
        )

        self._reader_threads = []
        for target in (self._read_stderr, self._read_stdout):
            reader = threading.Thread(target=target, daemon=True)
            reader.start()
            self._reader_threads.append(reader)

        # Give the server a moment to come up before the first request.
        time.sleep(1)

        exit_code = self.server_process.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"Server exited during startup with code {exit_code}"
            )

    def _read_stderr(self):
        """Read stderr output from server"""
        # Capture the process once: stop_server() resets the attribute to
        # None while this thread may still be running.
        process = self.server_process
        if process is None or process.stderr is None:
            return
        try:
            # Read until EOF so output written just before exit is not lost.
            for raw_line in iter(process.stderr.readline, b""):
                text = raw_line.decode(errors="replace").strip()
                print(f"SERVER STDERR: {text}")
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass

    def _read_stdout(self):
        """Read newline-delimited JSON messages from the server's stdout"""
        process = self.server_process
        if process is None or process.stdout is None:
            return
        try:
            # readline() yields whole lines (or the final partial line at
            # EOF), so no extra buffering is needed.
            for raw_line in iter(process.stdout.readline, b""):
                message = self._parse_message_line(raw_line)
                if message is not None:
                    self.message_queue.put(message)
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass

    @staticmethod
    def _parse_message_line(raw_line: bytes) -> Optional[Any]:
        """Decode one stdout line into a JSON value; return None if blank/invalid."""
        stripped = raw_line.strip()
        if not stripped:
            return None
        try:
            return json.loads(stripped.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # A single malformed line must not take the reader thread down.
            print(f"JSON DECODE ERROR: {e}")
            return None

    @staticmethod
    def _is_response_to(message: Any, request_id: Any) -> bool:
        """Return True if *message* is the JSON-RPC response for *request_id*."""
        return (
            isinstance(message, dict)
            and message.get("id") == request_id
            # A server-to-client *request* may reuse the same id value.
            and "method" not in message
        )

    def _write_message(self, message: dict) -> bool:
        """Serialize *message* as one JSON line on the server's stdin.

        Returns True on success, False if the pipe could not be written.

        Raises:
            RuntimeError: if the server has not been started.
        """
        process = self.server_process
        if process is None or process.stdin is None:
            raise RuntimeError("Server is not running; call start_server() first")
        payload = (json.dumps(message) + "\n").encode()
        try:
            process.stdin.write(payload)
            process.stdin.flush()
        except OSError as e:
            print(f"ERROR: Failed to send message to server: {e}")
            return False
        return True

    def send_jsonrpc_notification(self, method, params=None):
        """Send a JSON-RPC notification (no id, so no response is expected).

        Returns True if the notification was written, False otherwise.
        """
        notification = {"jsonrpc": "2.0", "method": method}
        if params:
            notification["params"] = params
        return self._write_message(notification)

    def send_jsonrpc_request(self, method, params=None, id=1, timeout=10.0):
        """Send a JSON-RPC request

        Args:
            method: JSON-RPC method name.
            params: Request parameters; an empty object is sent when omitted.
            id: Request id.  (The name shadows the builtin but is kept so
                existing keyword callers continue to work.)
            timeout: Seconds to wait for the matching response.

        Returns:
            The response whose ``id`` matches the request, or None if it did
            not arrive in time or the request could not be written.

        Raises:
            RuntimeError: if the server has not been started.
        """
        request = {
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params or {},
        }
        if not self._write_message(request):
            return None

        deadline = time.monotonic() + timeout
        while True:
            remaining = max(deadline - time.monotonic(), 0.0)
            try:
                message = self.message_queue.get(timeout=remaining)
            except queue.Empty:
                break
            if self._is_response_to(message, id):
                return message
            # Notifications, server-initiated requests and stale replies are
            # reported but must not be returned as the answer.
            print(f"Skipping unrelated message: {message}")

        print("TIMEOUT: No response received")
        return None

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _text_matches_expected(result_text: str, expected_result: Any) -> bool:
        """Return True if *result_text* reports *expected_result*.

        Numbers found in the text are compared numerically, so "6" matches
        6.0 and "15" does not match 5.  If the text contains no stand-alone
        number, or the expectation is not numeric, a substring test is used.
        """
        tokens = _NUMBER_PATTERN.findall(result_text)
        if not tokens or not MCPServerTester._is_number(expected_result):
            return str(expected_result) in result_text
        return any(
            abs(float(token) - expected_result) < RESULT_TOLERANCE
            for token in tokens
        )

    def test_operation(self, operation, operand1, operand2, expected_result):
        """Test a specific operation

        Calls the ``basic_calculator`` tool and verifies the reported result
        in the text content and, when present, the structured content.

        Returns:
            True if at least one check ran and every check passed,
            otherwise False.
        """
        print(f"\n=== Testing {operation} with {operand1} and {operand2} ===")

        params = {
            "name": "basic_calculator",
            "arguments": {
                "operation": operation,
                "operand1": operand1,
                "operand2": operand2,
            },
            "_meta": {"progressToken": 0},
        }

        request_id = self._next_request_id
        self._next_request_id += 1
        response = self.send_jsonrpc_request("tools/call", params, id=request_id)

        if not response or "result" not in response:
            print("✗ No response or error in response")
            return False

        result = response["result"]
        if not isinstance(result, dict):
            print("✗ No response or error in response")
            return False

        checks = []

        content = result.get("content")
        if (
            isinstance(content, list)
            and content
            and isinstance(content[0], dict)
            and "text" in content[0]
        ):
            result_text = str(content[0]["text"])
            print(f"Result text: {result_text}")
            text_ok = self._text_matches_expected(result_text, expected_result)
            if text_ok:
                print(f"✓ Correct result: {expected_result}")
            else:
                print(
                    f"✗ Incorrect result: expected {expected_result}, "
                    f"got {result_text}"
                )
            checks.append(text_ok)

        # The MCP specification spells this field "structuredContent"; the
        # snake_case spelling is still accepted for servers that emit it.
        structured = result.get("structured_content")
        if not isinstance(structured, dict):
            structured = result.get("structuredContent")

        if isinstance(structured, dict):
            if "result" in structured:
                actual_result = structured["result"]
                print(f"Structured result: {actual_result}")
                # A non-numeric value is a failed check, not a crash.
                calc_ok = (
                    self._is_number(actual_result)
                    and self._is_number(expected_result)
                    and abs(actual_result - expected_result) < RESULT_TOLERANCE
                )
                if calc_ok:
                    print(f"✓ Correct calculation: {actual_result}")
                else:
                    print(
                        f"✗ Incorrect calculation: expected {expected_result}, "
                        f"got {actual_result}"
                    )
                checks.append(calc_ok)

            if "operand1" in structured:
                print(f"operand1 in result: {structured['operand1']}")
            if "operand2" in structured:
                print(f"operand2 in result: {structured['operand2']}")

        if not checks:
            print("✗ Response contained no text or structured result to verify")
            return False
        return all(checks)

    def stop_server(self):
        """Stop MCP server process"""
        process = self.server_process
        if process is None:
            return

        print("\nStopping server...")
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed process so it does not linger as a zombie.
            process.wait()

        for reader in self._reader_threads:
            reader.join(timeout=1)
        readers_done = all(not reader.is_alive() for reader in self._reader_threads)
        self._reader_threads = []

        pipes = [process.stdin]
        if readers_done:
            # Only close the read ends once nothing is blocked reading them.
            pipes.extend([process.stdout, process.stderr])
        for pipe in pipes:
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                # Best-effort cleanup; the process is already gone.
                pass

        self.server_process = None


def main():
    """Run the calculator test suite against the server given on the command line.

    Returns:
        0 if every test passed, 130 if interrupted by the user, 1 otherwise.
        Invalid command-line usage exits the interpreter with status 1.
    """
    if len(sys.argv) != 2:
        print("Usage: python3 test_calculator_comprehensive.py <server_path>")
        sys.exit(1)

    server_path = sys.argv[1]

    if not os.path.exists(server_path):
        print(f"Error: Server executable '{server_path}' not found")
        sys.exit(1)

    tester = MCPServerTester(server_path)

    try:
        tester.start_server()
        time.sleep(2)

        # Initialize first
        print("\nInitializing server...")
        init_params = {
            "protocolVersion": "2025-11-25",
            "capabilities": {},
            "clientInfo": {
                "name": "test-client",
                "version": "1.0.0",
            },
        }

        init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)

        # A JSON-RPC error reply is truthy but is not a successful handshake.
        if not init_response or "result" not in init_response:
            print("✗ Failed to initialize server")
            return 1

        print("✓ Server initialized successfully")

        # The MCP lifecycle requires this notification before normal requests.
        tester.send_jsonrpc_notification("notifications/initialized")

        # Test different operations with different number types
        test_cases = [
            # (operation, operand1, operand2, expected_result)
            ("add", 2, 3, 5),
            ("add", 2.5, 3.7, 6.2),
            ("add", -5, 10, 5),
            ("subtract", 10, 3, 7),
            ("multiply", 4, 5, 20),
            ("divide", 20, 4, 5),
            ("power", 2, 3, 8),
            ("modulo", 10, 3, 1),
            # Test with integer types
            ("add", 2, 3, 5),
            ("add", 100, 200, 300),
            # Test with float types
            ("add", 2.5, 3.5, 6.0),
            ("add", -2.5, 3.5, 1.0),
            ("multiply", 1.5, 2.5, 3.75),
            ("divide", 10.0, 2.5, 4.0),
        ]

        failures = 0
        for operation, operand1, operand2, expected in test_cases:
            if not tester.test_operation(operation, operand1, operand2, expected):
                failures += 1

        if failures:
            print(f"\n✗ {failures} of {len(test_cases)} calculator tests failed")
            return 1

        print("\n✓ All calculator tests completed successfully")
        return 0

    except KeyboardInterrupt:
        print("\nTest interrupted by user")
        return 130
    except Exception as e:
        # Top-level boundary: report anything unexpected and fail the run.
        print(f"\nError during test: {e}")
        traceback.print_exc()
        return 1
    finally:
        tester.stop_server()


if __name__ == "__main__":
    sys.exit(main())