test_mcp_client.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #!/usr/bin/env python3
  2. """
  3. Simple test script to verify VAPI resource provider functionality.
  4. This script sends basic MCP requests to test the VAPI resource provider.
  5. """
  6. import json
  7. import subprocess
  8. import sys
  9. import time
  10. def send_mcp_request(method, params=None):
  11. """Send an MCP request to the server and return the response."""
  12. request = {
  13. "jsonrpc": "2.0",
  14. "id": 1,
  15. "method": method
  16. }
  17. if params:
  18. request["params"] = params
  19. # Start the MCP server process
  20. process = subprocess.Popen(
  21. ["./builddir/valaq", "--mcp"],
  22. stdin=subprocess.PIPE,
  23. stdout=subprocess.PIPE,
  24. stderr=subprocess.PIPE,
  25. text=True
  26. )
  27. try:
  28. # Send the request
  29. request_json = json.dumps(request) + "\n"
  30. stdout, stderr = process.communicate(input=request_json, timeout=5)
  31. if stderr:
  32. print(f"STDERR: {stderr}")
  33. if stdout:
  34. try:
  35. response = json.loads(stdout.strip())
  36. return response
  37. except json.JSONDecodeError as e:
  38. print(f"Failed to parse JSON response: {e}")
  39. print(f"Raw response: {stdout}")
  40. return None
  41. else:
  42. print("No response from server")
  43. return None
  44. except subprocess.TimeoutExpired:
  45. process.kill()
  46. print("Server timed out")
  47. return None
  48. except Exception as e:
  49. print(f"Error communicating with server: {e}")
  50. return None
  51. def test_list_resources():
  52. """Test listing VAPI resources."""
  53. print("Testing list_resources...")
  54. response = send_mcp_request("resources/list")
  55. if response and "result" in response:
  56. resources = response["result"].get("resources", [])
  57. print(f"Found {len(resources)} resources:")
  58. for resource in resources:
  59. print(f" - {resource['uri']}: {resource.get('description', 'No description')}")
  60. return True
  61. else:
  62. print(f"Failed to list resources: {response}")
  63. return False
  64. def test_read_resource():
  65. """Test reading a VAPI resource."""
  66. print("\nTesting read_resource for test_namespace_methods.vapi...")
  67. response = send_mcp_request("resources/read", {"uri": "vapi://test_namespace_methods"})
  68. if response and "result" in response:
  69. contents = response["result"].get("contents", [])
  70. if contents:
  71. print(f"Resource content (first 200 chars): {contents[0]['text'][:200]}...")
  72. return True
  73. else:
  74. print("No content in response")
  75. return False
  76. else:
  77. print(f"Failed to read resource: {response}")
  78. return False
  79. def test_read_symbol_path():
  80. """Test reading a VAPI symbol path."""
  81. print("\nTesting read_resource for vapi://test_namespace_methods/Invercargill...")
  82. response = send_mcp_request("resources/read", {"uri": "vapi://test_namespace_methods/Invercargill"})
  83. if response and "result" in response:
  84. contents = response["result"].get("contents", [])
  85. if contents:
  86. print(f"Symbol path content (first 200 chars): {contents[0]['text'][:200]}...")
  87. return True
  88. else:
  89. print("No content in response")
  90. return False
  91. else:
  92. print(f"Failed to read symbol path: {response}")
  93. return False
  94. def test_list_templates():
  95. """Test listing resource templates."""
  96. print("\nTesting resources/templates/list...")
  97. response = send_mcp_request("resources/templates/list")
  98. if response and "result" in response:
  99. templates = response["result"].get("resourceTemplates", [])
  100. print(f"Found {len(templates)} templates:")
  101. for template in templates:
  102. print(f" - {template['name']}: {template.get('uriTemplate', 'No URI template')}")
  103. print(f" Description: {template.get('description', 'No description')}")
  104. return True
  105. else:
  106. print(f"Failed to list templates: {response}")
  107. return False
  108. def main():
  109. """Run all tests."""
  110. print("Testing VAPI Resource Provider MCP Implementation")
  111. print("=" * 50)
  112. tests = [
  113. test_list_resources,
  114. test_read_resource,
  115. test_read_symbol_path,
  116. test_list_templates
  117. ]
  118. passed = 0
  119. total = len(tests)
  120. for test in tests:
  121. if test():
  122. passed += 1
  123. time.sleep(1) # Small delay between tests
  124. print(f"\nTest Results: {passed}/{total} tests passed")
  125. if passed == total:
  126. print("All tests passed! VAPI resource provider is working correctly.")
  127. return 0
  128. else:
  129. print("Some tests failed. Please check the implementation.")
  130. return 1
  131. if __name__ == "__main__":
  132. sys.exit(main())