| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- #!/usr/bin/env python3
- """
- Resource handling testing for MCP server
- """
- import subprocess
- import time
- import json
- import sys
- import os
- import threading
- import queue
- import tempfile
- import shutil
- class MCPServerTester:
- def __init__(self, server_path):
- self.server_path = server_path
- self.server_process = None
- self.message_queue = queue.Queue()
- self.test_dir = None
-
- def start_server(self):
- """Start MCP server process"""
- # Create a temporary directory for testing
- self.test_dir = tempfile.mkdtemp(prefix="mcp_test_")
-
- # Create some test files
- with open(os.path.join(self.test_dir, "test.txt"), "w") as f:
- f.write("Hello, World!")
- with open(os.path.join(self.test_dir, "test.json"), "w") as f:
- json.dump({"message": "test", "value": 42}, f)
- with open(os.path.join(self.test_dir, "binary.dat"), "wb") as f:
- f.write(b"\x00\x01\x02\x03\x04\x05")
-
- print(f"Starting MCP server: {self.server_path} with test directory: {self.test_dir}")
- self.server_process = subprocess.Popen(
- [self.server_path, self.test_dir],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=False,
- bufsize=0
- )
-
- # Start thread to read stderr
- stderr_thread = threading.Thread(target=self._read_stderr)
- stderr_thread.daemon = True
- stderr_thread.start()
-
- # Start thread to read stdout
- stdout_thread = threading.Thread(target=self._read_stdout)
- stdout_thread.daemon = True
- stdout_thread.start()
-
- time.sleep(1)
-
- def _read_stderr(self):
- """Read stderr output from server"""
- while self.server_process and self.server_process.poll() is None:
- try:
- line = self.server_process.stderr.readline()
- if line:
- print(f"SERVER STDERR: {line.decode().strip()}")
- except:
- break
-
- def _read_stdout(self):
- """Read stdout output from server without Content-Length parsing"""
- buffer = b""
- while self.server_process and self.server_process.poll() is None:
- try:
- data = self.server_process.stdout.readline()
- if not data:
- break
- buffer += data
-
- # Try to parse JSON directly from each line
- try:
- message = buffer.strip()
- if message:
- parsed = json.loads(message.decode())
- self.message_queue.put(parsed)
- buffer = b""
- except json.JSONDecodeError:
- # If not valid JSON yet, continue reading
- continue
- except:
- break
-
- def send_jsonrpc_request(self, method, params=None, id=1):
- """Send a JSON-RPC request without Content-Length header"""
- request = {
- "jsonrpc": "2.0",
- "method": method,
- "id": id,
- "params": params or {}
- }
-
- request_str = json.dumps(request)
- message = f"{request_str}\n"
-
- self.server_process.stdin.write(message.encode())
- self.server_process.stdin.flush()
-
- try:
- response = self.message_queue.get(timeout=10)
- return response
- except queue.Empty:
- print("TIMEOUT: No response received")
- return None
-
- def test_resource_registration_and_discovery(self):
- """Test resource registration and discovery"""
- print("\n=== Testing Resource Registration and Discovery ===")
-
- # Test resources/list
- print("\nTesting resources/list...")
- list_response = self.send_jsonrpc_request("resources/list", id=1)
-
- if list_response and "result" in list_response:
- result = list_response["result"]
- if "resources" in result:
- resources = result["resources"]
- print(f"✓ Found {len(resources)} resources")
-
- # Check if our test files are listed
- resource_uris = [r.get("uri", "") for r in resources]
- test_files = ["test.txt", "test.json", "binary.dat"]
-
- for filename in test_files:
- found = any(filename in uri for uri in resource_uris)
- if found:
- print(f"✓ Found {filename} in resource list")
- else:
- print(f"✗ Missing {filename} in resource list")
-
- return True
- else:
- print("✗ No resources field in response")
- return False
- else:
- print("✗ Failed to list resources")
- return False
-
- def test_resource_content_retrieval(self):
- """Test resource content retrieval"""
- print("\n=== Testing Resource Content Retrieval ===")
-
- # Test reading text file
- print("\nTesting text file read...")
- txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}"
- txt_response = self.send_jsonrpc_request("resources/read", {"uri": txt_uri}, id=2)
-
- if txt_response and "result" in txt_response:
- result = txt_response["result"]
- if "contents" in result and len(result["contents"]) > 0:
- content = result["contents"][0]
- if content.get("text") == "Hello, World!":
- print("✓ Text file content correct")
- else:
- print(f"✗ Text file content incorrect: {content.get('text')}")
- else:
- print("✗ No content in text file response")
- else:
- print("✗ Failed to read text file")
-
- # Test reading JSON file
- print("\nTesting JSON file read...")
- json_uri = f"file://{os.path.join(self.test_dir, 'test.json')}"
- json_response = self.send_jsonrpc_request("resources/read", {"uri": json_uri}, id=3)
-
- if json_response and "result" in json_response:
- result = json_response["result"]
- if "contents" in result and len(result["contents"]) > 0:
- content = result["contents"][0]
- try:
- json_data = json.loads(content.get("text", "{}"))
- if json_data.get("message") == "test" and json_data.get("value") == 42:
- print("✓ JSON file content correct")
- else:
- print(f"✗ JSON file content incorrect: {json_data}")
- except json.JSONDecodeError:
- print("✗ JSON file content is not valid JSON")
- else:
- print("✗ No content in JSON file response")
- else:
- print("✗ Failed to read JSON file")
-
- # Test reading binary file
- print("\nTesting binary file read...")
- bin_uri = f"file://{os.path.join(self.test_dir, 'binary.dat')}"
- bin_response = self.send_jsonrpc_request("resources/read", {"uri": bin_uri}, id=4)
-
- if bin_response and "result" in bin_response:
- result = bin_response["result"]
- if "contents" in result and len(result["contents"]) > 0:
- content = result["contents"][0]
- if "blob" in content:
- print("✓ Binary file read as blob")
- else:
- print("✗ Binary file not read as blob")
- else:
- print("✗ No content in binary file response")
- else:
- print("✗ Failed to read binary file")
-
- def test_resource_subscription_and_notifications(self):
- """Test resource subscription and notifications"""
- print("\n=== Testing Resource Subscription and Notifications ===")
-
- # Test subscription
- print("\nTesting resource subscription...")
- txt_uri = f"file://{os.path.join(self.test_dir, 'test.txt')}"
- sub_response = self.send_jsonrpc_request("resources/subscribe", {"uri": txt_uri}, id=5)
-
- if sub_response and "result" in sub_response:
- print("✓ Resource subscription successful")
-
- # Test unsubscription
- print("\nTesting resource unsubscription...")
- unsub_response = self.send_jsonrpc_request("resources/unsubscribe", {"uri": txt_uri}, id=6)
-
- if unsub_response and "result" in unsub_response:
- print("✓ Resource unsubscription successful")
- else:
- print("✗ Resource unsubscription failed")
- else:
- print("✗ Resource subscription failed")
-
- def test_resource_templates(self):
- """Test resource template functionality"""
- print("\n=== Testing Resource Templates ===")
-
- # Test templates/list
- print("\nTesting resources/templates/list...")
- templates_response = self.send_jsonrpc_request("resources/templates/list", id=7)
-
- if templates_response and "result" in templates_response:
- result = templates_response["result"]
- if "templates" in result:
- templates = result["templates"]
- print(f"✓ Found {len(templates)} templates")
- return True
- else:
- print("✗ No templates field in response")
- return False
- else:
- print("✗ Failed to list templates")
- return False
-
- def test_error_handling(self):
- """Test error handling for resources"""
- print("\n=== Testing Resource Error Handling ===")
-
- # Test reading non-existent file
- print("\nTesting non-existent file...")
- fake_uri = "file:///non/existent/file.txt"
- error_response = self.send_jsonrpc_request("resources/read", {"uri": fake_uri}, id=8)
-
- if error_response and "error" in error_response:
- print("✓ Non-existent file handled correctly")
- else:
- print("✗ Non-existent file not handled properly")
-
- # Test missing URI parameter
- print("\nTesting missing URI parameter...")
- missing_response = self.send_jsonrpc_request("resources/read", {}, id=9)
-
- if missing_response and "error" in missing_response:
- print("✓ Missing URI parameter handled correctly")
- else:
- print("✗ Missing URI parameter not handled properly")
-
- def cleanup(self):
- """Clean up test environment"""
- if self.test_dir and os.path.exists(self.test_dir):
- shutil.rmtree(self.test_dir)
-
- def stop_server(self):
- """Stop MCP server process"""
- if self.server_process:
- print("\nStopping server...")
- self.server_process.terminate()
- try:
- self.server_process.wait(timeout=5)
- except subprocess.TimeoutExpired:
- self.server_process.kill()
- self.server_process = None
- def main():
- if len(sys.argv) != 2:
- print("Usage: python3 test_resources.py <server-executable>")
- sys.exit(1)
-
- server_path = sys.argv[1]
- if not os.path.exists(server_path):
- print(f"Error: Server executable '{server_path}' not found")
- sys.exit(1)
-
- tester = MCPServerTester(server_path)
-
- try:
- tester.start_server()
- time.sleep(2)
-
- # Initialize first
- init_params = {
- "protocolVersion": "2025-11-25",
- "capabilities": {},
- "clientInfo": {
- "name": "test-client",
- "version": "1.0.0"
- }
- }
- init_response = tester.send_jsonrpc_request("initialize", init_params, id=0)
-
- if init_response:
- print("✓ Server initialized successfully")
-
- # Run resource tests
- tester.test_resource_registration_and_discovery()
- tester.test_resource_content_retrieval()
- tester.test_resource_subscription_and_notifications()
- tester.test_resource_templates()
- tester.test_error_handling()
-
- print("\n✓ All resource tests completed successfully")
- else:
- print("✗ Failed to initialize server")
-
- except KeyboardInterrupt:
- print("\nTest interrupted by user")
- except Exception as e:
- print(f"\nError during test: {e}")
- import traceback
- traceback.print_exc()
- finally:
- tester.stop_server()
- tester.cleanup()
- if __name__ == "__main__":
- main()
|