test_content_length.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #!/usr/bin/env python3
  2. """
  3. Test MCP server connection with proper Content-Length headers
  4. """
  5. import subprocess
  6. import time
  7. import json
  8. import sys
  9. import os
  10. import threading
  11. import queue
  12. import tempfile
  13. class MCPServerTester:
  14. def __init__(self, server_path):
  15. self.server_path = server_path
  16. self.server_process = None
  17. self.message_queue = queue.Queue()
  18. def start_server(self):
  19. """Start MCP server process"""
  20. print(f"Starting MCP server: {self.server_path}")
  21. self.server_process = subprocess.Popen(
  22. [self.server_path],
  23. stdin=subprocess.PIPE,
  24. stdout=subprocess.PIPE,
  25. stderr=subprocess.PIPE,
  26. text=False, # Use binary mode for proper handling
  27. bufsize=0
  28. )
  29. # Start thread to read stderr
  30. stderr_thread = threading.Thread(target=self._read_stderr)
  31. stderr_thread.daemon = True
  32. stderr_thread.start()
  33. # Start thread to read stdout
  34. stdout_thread = threading.Thread(target=self._read_stdout)
  35. stdout_thread.daemon = True
  36. stdout_thread.start()
  37. time.sleep(1) # Give server time to start
  38. def _read_stderr(self):
  39. """Read stderr output from server"""
  40. while self.server_process and self.server_process.poll() is None:
  41. try:
  42. line = self.server_process.stderr.readline()
  43. if line:
  44. print(f"SERVER STDERR: {line.decode().strip()}")
  45. except:
  46. break
  47. def _read_stdout(self):
  48. """Read stdout output from server without Content-Length parsing"""
  49. buffer = b""
  50. while self.server_process and self.server_process.poll() is None:
  51. try:
  52. data = self.server_process.stdout.readline()
  53. if not data:
  54. break
  55. buffer += data
  56. # Try to parse JSON directly from each line
  57. try:
  58. message = buffer.strip()
  59. if message:
  60. parsed = json.loads(message.decode())
  61. self.message_queue.put(parsed)
  62. print(f"SERVER MESSAGE: {json.dumps(parsed, indent=2)}")
  63. buffer = b""
  64. except json.JSONDecodeError:
  65. # If not valid JSON yet, continue reading
  66. continue
  67. except:
  68. break
  69. def send_jsonrpc_request(self, method, params=None, id=1):
  70. """Send a JSON-RPC request without Content-Length header"""
  71. request = {
  72. "jsonrpc": "2.0",
  73. "method": method,
  74. "id": id,
  75. "params": {}
  76. }
  77. if params:
  78. request["params"] = params
  79. request_str = json.dumps(request)
  80. message = f"{request_str}\n"
  81. print(f"SENDING: {repr(message)}")
  82. self.server_process.stdin.write(message.encode())
  83. self.server_process.stdin.flush()
  84. # Wait for response with timeout
  85. try:
  86. response = self.message_queue.get(timeout=5)
  87. return response
  88. except queue.Empty:
  89. print("TIMEOUT: No response received")
  90. return None
  91. def test_initialize(self):
  92. """Test initialize request"""
  93. print("\n=== Testing Initialize ===")
  94. params = {
  95. "protocolVersion": "2025-11-25",
  96. "capabilities": {},
  97. "clientInfo": {
  98. "name": "test-client",
  99. "version": "1.0.0"
  100. }
  101. }
  102. print("\nSending initialize request...")
  103. init_response = self.send_jsonrpc_request("initialize", params, id=1)
  104. if init_response:
  105. print(f"Initialize response: {json.dumps(init_response, indent=2)}")
  106. print("✓ Initialize successful")
  107. return True
  108. else:
  109. print("✗ Initialize failed")
  110. return False
  111. def test_simple_ping(self):
  112. """Test simple ping"""
  113. print("\n=== Testing Simple Ping ===")
  114. # Try a ping
  115. print("\nSending ping request...")
  116. ping_response = self.send_jsonrpc_request("ping", id=2)
  117. if ping_response:
  118. print(f"Ping response: {json.dumps(ping_response, indent=2)}")
  119. print("✓ Ping successful")
  120. return True
  121. else:
  122. print("✗ Ping failed")
  123. return False
  124. def stop_server(self):
  125. """Stop MCP server process"""
  126. if self.server_process:
  127. print("\nStopping server...")
  128. self.server_process.terminate()
  129. try:
  130. self.server_process.wait(timeout=5)
  131. except subprocess.TimeoutExpired:
  132. self.server_process.kill()
  133. self.server_process = None
  134. def main():
  135. if len(sys.argv) != 2:
  136. print("Usage: python3 test_content_length.py <server-executable>")
  137. sys.exit(1)
  138. server_path = sys.argv[1]
  139. if not os.path.exists(server_path):
  140. print(f"Error: Server executable '{server_path}' not found")
  141. sys.exit(1)
  142. tester = MCPServerTester(server_path)
  143. try:
  144. tester.start_server()
  145. # Wait a bit for server to fully start
  146. time.sleep(2)
  147. # Test initialize first
  148. init_success = tester.test_initialize()
  149. if init_success:
  150. # Test simple ping
  151. ping_success = tester.test_simple_ping()
  152. if ping_success:
  153. print("\n✓ Server test completed successfully")
  154. else:
  155. print("\n✗ Ping test failed")
  156. else:
  157. print("\n✗ Initialize test failed")
  158. except KeyboardInterrupt:
  159. print("\nTest interrupted by user")
  160. except Exception as e:
  161. print(f"\nError during test: {e}")
  162. import traceback
  163. traceback.print_exc()
  164. finally:
  165. tester.stop_server()
  166. if __name__ == "__main__":
  167. main()