test_mcp_with_headers.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #!/usr/bin/env python3
  2. """
  3. Test MCP server connection with proper Content-Length headers
  4. """
  5. import subprocess
  6. import time
  7. import json
  8. import sys
  9. import os
  10. import threading
  11. import queue
  12. class MCPServerTester:
  13. def __init__(self, server_path):
  14. self.server_path = server_path
  15. self.server_process = None
  16. self.message_queue = queue.Queue()
  17. def start_server(self):
  18. """Start MCP server process"""
  19. print(f"Starting MCP server: {self.server_path}")
  20. self.server_process = subprocess.Popen(
  21. [self.server_path],
  22. stdin=subprocess.PIPE,
  23. stdout=subprocess.PIPE,
  24. stderr=subprocess.PIPE,
  25. text=True,
  26. bufsize=0
  27. )
  28. # Start thread to read stderr
  29. stderr_thread = threading.Thread(target=self._read_stderr)
  30. stderr_thread.daemon = True
  31. stderr_thread.start()
  32. # Start thread to read stdout
  33. stdout_thread = threading.Thread(target=self._read_stdout)
  34. stdout_thread.daemon = True
  35. stdout_thread.start()
  36. time.sleep(1) # Give server time to start
  37. def _read_stderr(self):
  38. """Read stderr output from server"""
  39. while self.server_process and self.server_process.poll() is None:
  40. line = self.server_process.stderr.readline()
  41. if line:
  42. print(f"SERVER STDERR: {line.strip()}")
  43. def _read_stdout(self):
  44. """Read stdout output from server without Content-Length parsing"""
  45. while self.server_process and self.server_process.poll() is None:
  46. try:
  47. line = self.server_process.stdout.readline()
  48. if not line:
  49. break
  50. # Try to parse JSON directly from each line
  51. try:
  52. message = line.strip()
  53. if message:
  54. parsed = json.loads(message)
  55. self.message_queue.put(parsed)
  56. print(f"SERVER MESSAGE: {json.dumps(parsed, indent=2)}")
  57. except json.JSONDecodeError:
  58. # If not valid JSON yet, continue reading
  59. continue
  60. except:
  61. break
  62. def send_jsonrpc_request(self, method, params=None, id=1):
  63. """Send a JSON-RPC request without Content-Length header"""
  64. request = {
  65. "jsonrpc": "2.0",
  66. "method": method,
  67. "id": id,
  68. "params": {}
  69. }
  70. if params:
  71. request["params"] = params
  72. request_str = json.dumps(request)
  73. message = f"{request_str}\n"
  74. print(f"SENDING: {repr(message)}")
  75. self.server_process.stdin.write(message)
  76. self.server_process.stdin.flush()
  77. # Wait for response with timeout
  78. try:
  79. response = self.message_queue.get(timeout=5)
  80. return response
  81. except queue.Empty:
  82. print("TIMEOUT: No response received")
  83. return None
  84. def test_simple_ping(self):
  85. """Test simple ping"""
  86. print("\n=== Testing Simple Ping ===")
  87. # Try a ping
  88. print("\nSending ping request...")
  89. ping_response = self.send_jsonrpc_request("ping", id=1)
  90. if ping_response:
  91. print(f"Ping response: {json.dumps(ping_response, indent=2)}")
  92. print("✓ Ping successful")
  93. return True
  94. else:
  95. print("✗ Ping failed")
  96. return False
  97. def stop_server(self):
  98. """Stop MCP server process"""
  99. if self.server_process:
  100. print("\nStopping server...")
  101. self.server_process.terminate()
  102. try:
  103. self.server_process.wait(timeout=5)
  104. except subprocess.TimeoutExpired:
  105. self.server_process.kill()
  106. self.server_process = None
  107. def main():
  108. if len(sys.argv) != 2:
  109. print("Usage: python3 test_mcp_with_headers.py <server-executable>")
  110. sys.exit(1)
  111. server_path = sys.argv[1]
  112. if not os.path.exists(server_path):
  113. print(f"Error: Server executable '{server_path}' not found")
  114. sys.exit(1)
  115. tester = MCPServerTester(server_path)
  116. try:
  117. tester.start_server()
  118. # Wait a bit for server to fully start
  119. time.sleep(2)
  120. # Test simple ping
  121. success = tester.test_simple_ping()
  122. if success:
  123. print("\n✓ Server test completed successfully")
  124. else:
  125. print("\n✗ Server test failed")
  126. except KeyboardInterrupt:
  127. print("\nTest interrupted by user")
  128. except Exception as e:
  129. print(f"\nError during test: {e}")
  130. import traceback
  131. traceback.print_exc()
  132. finally:
  133. tester.stop_server()
  134. if __name__ == "__main__":
  135. main()