test_resources_fixed.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #!/usr/bin/env python3
  2. """
  3. Resource handling testing for MCP server - fixed version
  4. """
  5. import subprocess
  6. import time
  7. import json
  8. import sys
  9. import os
  10. import threading
  11. import queue
  12. import tempfile
  13. import shutil
  14. class MCPServerTester:
  15. def __init__(self, server_path):
  16. self.server_path = server_path
  17. self.server_process = None
  18. self.message_queue = queue.Queue()
  19. self.test_dir = None
  20. def start_server(self):
  21. """Start MCP server process"""
  22. # Create a temporary directory for testing
  23. self.test_dir = tempfile.mkdtemp(prefix="mcp_test_")
  24. # Create some test files
  25. with open(os.path.join(self.test_dir, "test.txt"), "w") as f:
  26. f.write("Hello, World!")
  27. with open(os.path.join(self.test_dir, "test.json"), "w") as f:
  28. json.dump({"message": "test", "value": 42}, f)
  29. with open(os.path.join(self.test_dir, "binary.dat"), "wb") as f:
  30. f.write(b"\x00\x01\x02\x03\x04\x05")
  31. print(f"Starting MCP server: {self.server_path} with test directory: {self.test_dir}")
  32. self.server_process = subprocess.Popen(
  33. [self.server_path, self.test_dir],
  34. stdin=subprocess.PIPE,
  35. stdout=subprocess.PIPE,
  36. stderr=subprocess.PIPE,
  37. text=False,
  38. bufsize=0
  39. )
  40. # Start thread to read stderr
  41. stderr_thread = threading.Thread(target=self._read_stderr)
  42. stderr_thread.daemon = True
  43. stderr_thread.start()
  44. # Start thread to read stdout
  45. stdout_thread = threading.Thread(target=self._read_stdout)
  46. stdout_thread.daemon = True
  47. stdout_thread.start()
  48. time.sleep(1)
  49. def _read_stderr(self):
  50. """Read stderr output from server"""
  51. while self.server_process and self.server_process.poll() is None:
  52. try:
  53. line = self.server_process.stderr.readline()
  54. if line:
  55. print(f"SERVER STDERR: {line.decode().strip()}")
  56. except:
  57. break
  58. def _read_stdout(self):
  59. """Read stdout output from server without Content-Length parsing"""
  60. buffer = b""
  61. while self.server_process and self.server_process.poll() is None:
  62. try:
  63. data = self.server_process.stdout.readline()
  64. if not data:
  65. break
  66. buffer += data
  67. # Try to parse JSON directly from each line
  68. try:
  69. message = buffer.strip()
  70. if message:
  71. parsed = json.loads(message.decode())
  72. self.message_queue.put(parsed)
  73. buffer = b""
  74. except json.JSONDecodeError:
  75. # If not valid JSON yet, continue reading
  76. continue
  77. except:
  78. break
  79. def send_jsonrpc_request(self, method, params=None, id=1):
  80. """Send a JSON-RPC request without Content-Length header"""
  81. request = {
  82. "jsonrpc": "2.0",
  83. "method": method,
  84. "id": id,
  85. "params": params or {}
  86. }
  87. request_str = json.dumps(request)
  88. message = f"{request_str}\n"
  89. self.server_process.stdin.write(message.encode())
  90. self.server_process.stdin.flush()
  91. try:
  92. response = self.message_queue.get(timeout=10)
  93. return response
  94. except queue.Empty:
  95. print("TIMEOUT: No response received")
  96. return None
  97. def test_resource_registration_and_discovery(self):
  98. """Test resource registration and discovery"""
  99. print("\n=== Testing Resource Registration and Discovery ===")
  100. # Test resources/list
  101. print("\nTesting resources/list...")
  102. list_response = self.send_jsonrpc_request("resources/list", id=1)
  103. if list_response and "result" in list_response:
  104. result = list_response["result"]
  105. if "resources" in result:
  106. resources = result["resources"]
  107. print(f"✓ Found {len(resources)} resources")
  108. # Check if our test files are listed
  109. resource_uris = [r.get("uri", "") for r in resources]
  110. test_files = ["test.txt", "test.json", "binary.dat"]
  111. for filename in test_files:
  112. found = any(filename in uri for uri in resource_uris)
  113. if found:
  114. print(f"✓ Found {filename} in resource list")
  115. else:
  116. print(f"✗ Missing {filename} in resource list")
  117. return True
  118. else:
  119. print("✗ No resources field in response")
  120. return False
  121. else:
  122. print("✗ Failed to list resources")
  123. return False
  124. def test_resource_content_retrieval(self):
  125. """Test resource content retrieval"""
  126. print("\n=== Testing Resource Content Retrieval ===")
  127. # Test reading text file
  128. print("\nTesting text file read...")
  129. txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}"
  130. txt_response = self.send_jsonrpc_request("resources/read", {"uri": txt_uri}, id=2)
  131. if txt_response and "result" in txt_response:
  132. result = txt_response["result"]
  133. # Handle both response formats
  134. if "text" in result:
  135. # Direct text format
  136. if result["text"] == "Hello, World!":
  137. print("✓ Text file content correct")
  138. else:
  139. print(f"✗ Text file content incorrect: {result.get('text')}")
  140. elif "contents" in result and len(result["contents"]) > 0:
  141. # Contents array format
  142. content = result["contents"][0]
  143. if content.get("text") == "Hello, World!":
  144. print("✓ Text file content correct")
  145. else:
  146. print(f"✗ Text file content incorrect: {content.get('text')}")
  147. else:
  148. print("✗ No content in text file response")
  149. else:
  150. print("✗ Failed to read text file")
  151. # Test reading JSON file
  152. print("\nTesting JSON file read...")
  153. json_uri = f"file://{os.path.join(self.test_dir, 'test.json')}"
  154. json_response = self.send_jsonrpc_request("resources/read", {"uri": json_uri}, id=3)
  155. if json_response and "result" in json_response:
  156. result = json_response["result"]
  157. if "text" in result:
  158. # Direct text format
  159. try:
  160. json_data = json.loads(result["text"])
  161. if json_data.get("message") == "test" and json_data.get("value") == 42:
  162. print("✓ JSON file content correct")
  163. else:
  164. print(f"✗ JSON file content incorrect: {json_data}")
  165. except json.JSONDecodeError:
  166. print("✗ JSON file content is not valid JSON")
  167. elif "contents" in result and len(result["contents"]) > 0:
  168. # Contents array format
  169. content = result["contents"][0]
  170. if content.get("text"):
  171. try:
  172. json_data = json.loads(content.get("text"))
  173. if json_data.get("message") == "test" and json_data.get("value") == 42:
  174. print("✓ JSON file content correct")
  175. else:
  176. print(f"✗ JSON file content incorrect: {json_data}")
  177. except json.JSONDecodeError:
  178. print("✗ JSON file content is not valid JSON")
  179. else:
  180. print("✗ JSON file content not in text format")
  181. else:
  182. print("✗ No content in JSON file response")
  183. else:
  184. print("✗ Failed to read JSON file")
  185. # Test reading binary file
  186. print("\nTesting binary file read...")
  187. bin_uri = f"file://{os.path.join(self.test_dir, 'binary.dat')}"
  188. bin_response = self.send_jsonrpc_request("resources/read", {"uri": bin_uri}, id=4)
  189. if bin_response and "result" in bin_response:
  190. result = bin_response["result"]
  191. if "blob" in result:
  192. print("✓ Binary file read as blob")
  193. else:
  194. print("✗ Binary file not read as blob")
  195. else:
  196. print("✗ Failed to read binary file")
  197. def test_resource_subscription_and_notifications(self):
  198. """Test resource subscription and notifications"""
  199. print("\n=== Testing Resource Subscription and Notifications ===")
  200. # Test subscription
  201. print("\nTesting resource subscription...")
  202. txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}"
  203. sub_response = self.send_jsonrpc_request("resources/subscribe", {"uri": txt_uri}, id=5)
  204. if sub_response and "result" in sub_response:
  205. print("✓ Resource subscription successful")
  206. # Test unsubscription
  207. print("\nTesting resource unsubscription...")
  208. unsub_response = self.send_jsonrpc_request("resources/unsubscribe", {"uri": txt_uri}, id=6)
  209. if unsub_response and "result" in unsub_response:
  210. print("✓ Resource unsubscription successful")
  211. else:
  212. print("✗ Resource unsubscription failed")
  213. else:
  214. print("✗ Resource subscription failed")
  215. def test_resource_templates(self):
  216. """Test resource template functionality"""
  217. print("\n=== Testing Resource Templates ===")
  218. # Test templates/list
  219. print("\nTesting resources/templates/list...")
  220. templates_response = self.send_jsonrpc_request("resources/templates/list", id=7)
  221. if templates_response and "result" in templates_response:
  222. result = templates_response["result"]
  223. if "templates" in result:
  224. templates = result["templates"]
  225. print(f"✓ Found {len(templates)} templates")
  226. return True
  227. else:
  228. print("✗ No templates field in response")
  229. return False
  230. else:
  231. print("✗ Failed to list templates")
  232. return False
  233. def test_error_handling(self):
  234. """Test error handling for resources"""
  235. print("\n=== Testing Resource Error Handling ===")
  236. # Test reading non-existent file
  237. print("\nTesting non-existent file...")
  238. fake_uri = "file:///non/existent/file.txt"
  239. error_response = self.send_jsonrpc_request("resources/read", {"uri": fake_uri}, id=8)
  240. if error_response and "error" in error_response:
  241. print("✓ Non-existent file handled correctly")
  242. else:
  243. print("✗ Non-existent file not handled properly")
  244. # Test missing URI parameter
  245. print("\nTesting missing URI parameter...")
  246. missing_response = self.send_jsonrpc_request("resources/read", {}, id=9)
  247. if missing_response and "error" in missing_response:
  248. print("✓ Missing URI parameter handled correctly")
  249. else:
  250. print("✗ Missing URI parameter not handled properly")
  251. def cleanup(self):
  252. """Clean up test environment"""
  253. if self.test_dir and os.path.exists(self.test_dir):
  254. shutil.rmtree(self.test_dir)
  255. def stop_server(self):
  256. """Stop MCP server process"""
  257. if self.server_process:
  258. print("\nStopping server...")
  259. self.server_process.terminate()
  260. try:
  261. self.server_process.wait(timeout=5)
  262. except subprocess.TimeoutExpired:
  263. self.server_process.kill()
  264. self.server_process = None
  265. def main():
  266. if len(sys.argv) != 2:
  267. print("Usage: python3 test_resources_fixed.py <server-executable>")
  268. sys.exit(1)
  269. server_path = sys.argv[1]
  270. if not os.path.exists(server_path):
  271. print(f"Error: Server executable '{server_path}' not found")
  272. sys.exit(1)
  273. tester = MCPServerTester(server_path)
  274. try:
  275. tester.start_server()
  276. time.sleep(2)
  277. # Initialize first
  278. init_params = {
  279. "protocolVersion": "2025-11-25",
  280. "capabilities": {},
  281. "clientInfo": {
  282. "name": "test-client",
  283. "version": "1.0.0"
  284. }
  285. }
  286. init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)
  287. if init_response:
  288. print("✓ Server initialized successfully")
  289. # Run resource tests
  290. tester.test_resource_registration_and_discovery()
  291. tester.test_resource_content_retrieval()
  292. tester.test_resource_subscription_and_notifications()
  293. tester.test_resource_templates()
  294. tester.test_error_handling()
  295. print("\n✓ All resource tests completed successfully")
  296. else:
  297. print("✗ Failed to initialize server")
  298. except KeyboardInterrupt:
  299. print("\nTest interrupted by user")
  300. except Exception as e:
  301. print(f"\nError during test: {e}")
  302. import traceback
  303. traceback.print_exc()
  304. finally:
  305. tester.stop_server()
  306. tester.cleanup()
  307. if __name__ == "__main__":
  308. main()