test_calculator_comprehensive.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. #!/usr/bin/env python3
  2. """
  3. Comprehensive test for calculator-server to verify the fix
  4. """
  5. import subprocess
  6. import time
  7. import json
  8. import sys
  9. import os
  10. import threading
  11. import queue
  12. class MCPServerTester:
  13. def __init__(self, server_path):
  14. self.server_path = server_path
  15. self.server_process = None
  16. self.message_queue = queue.Queue()
  17. def start_server(self):
  18. """Start MCP server process"""
  19. print(f"Starting MCP server: {self.server_path}")
  20. self.server_process = subprocess.Popen(
  21. [self.server_path],
  22. stdin=subprocess.PIPE,
  23. stdout=subprocess.PIPE,
  24. stderr=subprocess.PIPE,
  25. text=False,
  26. bufsize=0
  27. )
  28. # Start thread to read stderr
  29. stderr_thread = threading.Thread(target=self._read_stderr)
  30. stderr_thread.daemon = True
  31. stderr_thread.start()
  32. # Start thread to read stdout
  33. stdout_thread = threading.Thread(target=self._read_stdout)
  34. stdout_thread.daemon = True
  35. stdout_thread.start()
  36. time.sleep(1)
  37. def _read_stderr(self):
  38. """Read stderr output from server"""
  39. while self.server_process and self.server_process.poll() is None:
  40. try:
  41. line = self.server_process.stderr.readline()
  42. if line:
  43. print(f"SERVER STDERR: {line.decode().strip()}")
  44. except:
  45. break
  46. def _read_stdout(self):
  47. """Read stdout output from server with proper Content-Length parsing"""
  48. buffer = b""
  49. while self.server_process and self.server_process.poll() is None:
  50. try:
  51. # Read line by line
  52. line = self.server_process.stdout.readline()
  53. if not line:
  54. break
  55. buffer += line
  56. # Check if we have a complete message
  57. if b'\n' in buffer:
  58. # Split on newline to get individual messages
  59. lines = buffer.split(b'\n')
  60. for i, msg_line in enumerate(lines):
  61. if msg_line.strip():
  62. try:
  63. # Try to parse as JSON
  64. message = json.loads(msg_line.decode())
  65. self.message_queue.put(message)
  66. except json.JSONDecodeError as e:
  67. print(f"JSON DECODE ERROR: {e}")
  68. # Keep the last incomplete line if any
  69. if lines and not lines[-1].strip():
  70. buffer = lines[-1]
  71. else:
  72. buffer = b""
  73. except:
  74. break
  75. def send_jsonrpc_request(self, method, params=None, id=1):
  76. """Send a JSON-RPC request"""
  77. request = {
  78. "jsonrpc": "2.0",
  79. "id": id,
  80. "method": method,
  81. "params": params or {}
  82. }
  83. request_str = json.dumps(request)
  84. message = f"{request_str}\n"
  85. self.server_process.stdin.write(message.encode())
  86. self.server_process.stdin.flush()
  87. try:
  88. response = self.message_queue.get(timeout=10)
  89. return response
  90. except queue.Empty:
  91. print("TIMEOUT: No response received")
  92. return None
  93. def test_operation(self, operation, operand1, operand2, expected_result):
  94. """Test a specific operation"""
  95. print(f"\n=== Testing {operation} with {operand1} and {operand2} ===")
  96. params = {
  97. "name": "basic_calculator",
  98. "arguments": {
  99. "operation": operation,
  100. "operand1": operand1,
  101. "operand2": operand2
  102. },
  103. "_meta": {
  104. "progressToken": 0
  105. }
  106. }
  107. response = self.send_jsonrpc_request("tools/call", params, id=1)
  108. if response and "result" in response:
  109. result = response["result"]
  110. if "content" in result and len(result["content"]) > 0:
  111. content = result["content"][0]
  112. if "text" in content:
  113. result_text = content["text"]
  114. print(f"Result text: {result_text}")
  115. # Check if result matches expected
  116. if str(expected_result) in result_text:
  117. print(f"✓ Correct result: {expected_result}")
  118. else:
  119. print(f"✗ Incorrect result: expected {expected_result}, got {result_text}")
  120. if "structured_content" in result and isinstance(result["structured_content"], dict):
  121. structured = result["structured_content"]
  122. if 'result' in structured:
  123. actual_result = structured['result']
  124. print(f"Structured result: {actual_result}")
  125. # Check if result matches expected
  126. if abs(actual_result - expected_result) < 0.001:
  127. print(f"✓ Correct calculation: {actual_result}")
  128. else:
  129. print(f"✗ Incorrect calculation: expected {expected_result}, got {actual_result}")
  130. # Check operands
  131. if 'operand1' in structured:
  132. print(f"operand1 in result: {structured['operand1']}")
  133. if 'operand2' in structured:
  134. print(f"operand2 in result: {structured['operand2']}")
  135. else:
  136. print("✗ No response or error in response")
  137. def stop_server(self):
  138. """Stop MCP server process"""
  139. if self.server_process:
  140. print("\nStopping server...")
  141. self.server_process.terminate()
  142. try:
  143. self.server_process.wait(timeout=5)
  144. except subprocess.TimeoutExpired:
  145. self.server_process.kill()
  146. self.server_process = None
  147. def main():
  148. if len(sys.argv) != 2:
  149. print("Usage: python3 test_calculator_comprehensive.py <server-executable>")
  150. sys.exit(1)
  151. server_path = sys.argv[1]
  152. if not os.path.exists(server_path):
  153. print(f"Error: Server executable '{server_path}' not found")
  154. sys.exit(1)
  155. tester = MCPServerTester(server_path)
  156. try:
  157. tester.start_server()
  158. time.sleep(2)
  159. # Initialize first
  160. print("\nInitializing server...")
  161. init_params = {
  162. "protocolVersion": "2025-11-25",
  163. "capabilities": {},
  164. "clientInfo": {
  165. "name": "test-client",
  166. "version": "1.0.0"
  167. }
  168. }
  169. init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)
  170. if init_response:
  171. print("✓ Server initialized successfully")
  172. # Test different operations with different number types
  173. test_cases = [
  174. # (operation, operand1, operand2, expected_result)
  175. ("add", 2, 3, 5),
  176. ("add", 2.5, 3.7, 6.2),
  177. ("add", -5, 10, 5),
  178. ("subtract", 10, 3, 7),
  179. ("multiply", 4, 5, 20),
  180. ("divide", 20, 4, 5),
  181. ("power", 2, 3, 8),
  182. ("modulo", 10, 3, 1),
  183. # Test with integer types
  184. ("add", 2, 3, 5),
  185. ("add", 100, 200, 300),
  186. # Test with float types
  187. ("add", 2.5, 3.5, 6.0),
  188. ("add", -2.5, 3.5, 1.0),
  189. ("multiply", 1.5, 2.5, 3.75),
  190. ("divide", 10.0, 2.5, 4.0),
  191. ]
  192. for operation, operand1, operand2, expected in test_cases:
  193. tester.test_operation(operation, operand1, operand2, expected)
  194. print("\n✓ All calculator tests completed successfully")
  195. else:
  196. print("✗ Failed to initialize server")
  197. except KeyboardInterrupt:
  198. print("\nTest interrupted by user")
  199. except Exception as e:
  200. print(f"\nError during test: {e}")
  201. import traceback
  202. traceback.print_exc()
  203. finally:
  204. tester.stop_server()
  205. if __name__ == "__main__":
  206. main()