#!/usr/bin/env python3 """ Resource handling testing for MCP server """ import subprocess import time import json import sys import os import threading import queue import tempfile import shutil class MCPServerTester: def __init__(self, server_path): self.server_path = server_path self.server_process = None self.message_queue = queue.Queue() self.test_dir = None def start_server(self): """Start MCP server process""" # Create a temporary directory for testing self.test_dir = tempfile.mkdtemp(prefix="mcp_test_") # Create some test files with open(os.path.join(self.test_dir, "test.txt"), "w") as f: f.write("Hello, World!") with open(os.path.join(self.test_dir, "test.json"), "w") as f: json.dump({"message": "test", "value": 42}, f) with open(os.path.join(self.test_dir, "binary.dat"), "wb") as f: f.write(b"\x00\x01\x02\x03\x04\x05") print(f"Starting MCP server: {self.server_path} with test directory: {self.test_dir}") self.server_process = subprocess.Popen( [self.server_path, self.test_dir], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False, bufsize=0 ) # Start thread to read stderr stderr_thread = threading.Thread(target=self._read_stderr) stderr_thread.daemon = True stderr_thread.start() # Start thread to read stdout stdout_thread = threading.Thread(target=self._read_stdout) stdout_thread.daemon = True stdout_thread.start() time.sleep(1) def _read_stderr(self): """Read stderr output from server""" while self.server_process and self.server_process.poll() is None: try: line = self.server_process.stderr.readline() if line: print(f"SERVER STDERR: {line.decode().strip()}") except: break def _read_stdout(self): """Read stdout output from server without Content-Length parsing""" buffer = b"" while self.server_process and self.server_process.poll() is None: try: data = self.server_process.stdout.readline() if not data: break buffer += data # Try to parse JSON directly from each line try: message = buffer.strip() if message: parsed = json.loads(message.decode()) self.message_queue.put(parsed) buffer = b"" except json.JSONDecodeError: # If not valid JSON yet, continue reading continue except: break def send_jsonrpc_request(self, method, params=None, id=1): """Send a JSON-RPC request without Content-Length header""" request = { "jsonrpc": "2.0", "method": method, "id": id, "params": params or {} } request_str = json.dumps(request) message = f"{request_str}\n" self.server_process.stdin.write(message.encode()) self.server_process.stdin.flush() try: response = self.message_queue.get(timeout=10) return response except queue.Empty: print("TIMEOUT: No response received") return None def test_resource_registration_and_discovery(self): """Test resource registration and discovery""" print("\n=== Testing Resource Registration and Discovery ===") # Test resources/list print("\nTesting resources/list...") list_response = self.send_jsonrpc_request("resources/list", id=1) if list_response and "result" in list_response: result = list_response["result"] if "resources" in result: resources = result["resources"] print(f"✓ Found {len(resources)} resources") # Check if our test files are listed resource_uris = [r.get("uri", "") for r in resources] test_files = ["test.txt", "test.json", "binary.dat"] for filename in test_files: found = any(filename in uri for uri in resource_uris) if found: print(f"✓ Found {filename} in resource list") else: print(f"✗ Missing {filename} in resource list") return True else: print("✗ No resources field in response") return False else: print("✗ Failed to list resources") return False def test_resource_content_retrieval(self): """Test resource content retrieval""" print("\n=== Testing Resource Content Retrieval ===") # Test reading text file print("\nTesting text file read...") txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}" txt_response = self.send_jsonrpc_request("resources/read", {"uri": txt_uri}, id=2) if txt_response and "result" in txt_response: result = txt_response["result"] if "contents" in result and len(result["contents"]) > 0: content = result["contents"][0] if content.get("text") == "Hello, World!": print("✓ Text file content correct") else: print(f"✗ Text file content incorrect: {content.get('text')}") else: print("✗ No content in text file response") else: print("✗ Failed to read text file") # Test reading JSON file print("\nTesting JSON file read...") json_uri = f"file://{os.path.join(self.test_dir, 'test.json')}" json_response = self.send_jsonrpc_request("resources/read", {"uri": json_uri}, id=3) if json_response and "result" in json_response: result = json_response["result"] if "contents" in result and len(result["contents"]) > 0: content = result["contents"][0] try: json_data = json.loads(content.get("text", "{}")) if json_data.get("message") == "test" and json_data.get("value") == 42: print("✓ JSON file content correct") else: print(f"✗ JSON file content incorrect: {json_data}") except json.JSONDecodeError: print("✗ JSON file content is not valid JSON") else: print("✗ No content in JSON file response") else: print("✗ Failed to read JSON file") # Test reading binary file print("\nTesting binary file read...") bin_uri = f"file://{os.path.join(self.test_dir, 'binary.dat')}" bin_response = self.send_jsonrpc_request("resources/read", {"uri": bin_uri}, id=4) if bin_response and "result" in bin_response: result = bin_response["result"] if "contents" in result and len(result["contents"]) > 0: content = result["contents"][0] if "blob" in content: print("✓ Binary file read as blob") else: print("✗ Binary file not read as blob") else: print("✗ No content in binary file response") else: print("✗ Failed to read binary file") def test_resource_subscription_and_notifications(self): """Test resource subscription and notifications""" print("\n=== Testing Resource Subscription and Notifications ===") # Test subscription print("\nTesting resource subscription...") txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}" sub_response = self.send_jsonrpc_request("resources/subscribe", {"uri": txt_uri}, id=5) if sub_response and "result" in sub_response: print("✓ Resource subscription successful") # Test unsubscription print("\nTesting resource unsubscription...") unsub_response = self.send_jsonrpc_request("resources/unsubscribe", {"uri": txt_uri}, id=6) if unsub_response and "result" in unsub_response: print("✓ Resource unsubscription successful") else: print("✗ Resource unsubscription failed") else: print("✗ Resource subscription failed") def test_resource_templates(self): """Test resource template functionality""" print("\n=== Testing Resource Templates ===") # Test templates/list print("\nTesting resources/templates/list...") templates_response = self.send_jsonrpc_request("resources/templates/list", id=7) if templates_response and "result" in templates_response: result = templates_response["result"] if "templates" in result: templates = result["templates"] print(f"✓ Found {len(templates)} templates") return True else: print("✗ No templates field in response") return False else: print("✗ Failed to list templates") return False def test_error_handling(self): """Test error handling for resources""" print("\n=== Testing Resource Error Handling ===") # Test reading non-existent file print("\nTesting non-existent file...") fake_uri = "file:///non/existent/file.txt" error_response = self.send_jsonrpc_request("resources/read", {"uri": fake_uri}, id=8) if error_response and "error" in error_response: print("✓ Non-existent file handled correctly") else: print("✗ Non-existent file not handled properly") # Test missing URI parameter print("\nTesting missing URI parameter...") missing_response = self.send_jsonrpc_request("resources/read", {}, id=9) if missing_response and "error" in missing_response: print("✓ Missing URI parameter handled correctly") else: print("✗ Missing URI parameter not handled properly") def cleanup(self): """Clean up test environment""" if self.test_dir and os.path.exists(self.test_dir): shutil.rmtree(self.test_dir) def stop_server(self): """Stop MCP server process""" if self.server_process: print("\nStopping server...") self.server_process.terminate() try: self.server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.server_process.kill() self.server_process = None def main(): if len(sys.argv) != 2: print("Usage: python3 test_resources.py ") sys.exit(1) server_path = sys.argv[1] if not os.path.exists(server_path): print(f"Error: Server executable '{server_path}' not found") sys.exit(1) tester = MCPServerTester(server_path) try: tester.start_server() time.sleep(2) # Initialize first init_params = { "protocolVersion": "2025-11-25", "capabilities": {}, "clientInfo": { "name": "test-client", "version": "1.0.0" } } init_response = tester.send_jsonrpc_request("initialize", init_params, id=0) if init_response: print("✓ Server initialized successfully") # Run resource tests tester.test_resource_registration_and_discovery() tester.test_resource_content_retrieval() tester.test_resource_subscription_and_notifications() tester.test_resource_templates() tester.test_error_handling() print("\n✓ All resource tests completed successfully") else: print("✗ Failed to initialize server") except KeyboardInterrupt: print("\nTest interrupted by user") except Exception as e: print(f"\nError during test: {e}") import traceback traceback.print_exc() finally: tester.stop_server() tester.cleanup() if __name__ == "__main__": main()