test_converters.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. #!/usr/bin/env python3
  2. """
  3. Test the converters implemented in compat.vala
  4. """
  5. import subprocess
  6. import json
  7. import sys
  8. import os
  9. def test_inject_content_length():
  10. """Test the InjectContentLength converter"""
  11. print("=== Testing InjectContentLength Converter ===")
  12. # Create a simple test program in Vala that uses the converter
  13. test_code = """
  14. using GLib;
  15. using Mcp.Core.Compat;
  16. public class TestInjectContentLength {
  17. public static int main (string[] args) {
  18. // Create the converter
  19. var converter = new InjectContentLength ();
  20. // Test input: JSON message without Content-Length header
  21. string input = "{\\"jsonrpc\\": \\"2.0\\", \\"method\\": \\"ping\\", \\"id\\": 1}\\n";
  22. // Output buffer
  23. uint8[] output = new uint8[1024];
  24. size_t bytes_read, bytes_written;
  25. // Convert
  26. var result = converter.convert (input.data, output, ConverterFlags.NONE,
  27. out bytes_read, out bytes_written);
  28. // Extract the output
  29. string output_str = (string)output;
  30. output_str = output_str.substring (0, (int)bytes_written);
  31. // Print the result
  32. print ("Input: %s", input);
  33. print ("Output: %s", output_str);
  34. // Check if Content-Length header was added
  35. if (output_str.has_prefix ("Content-Length:")) {
  36. print ("✓ Content-Length header injected successfully\\n");
  37. return 0;
  38. } else {
  39. print ("✗ Content-Length header not found\\n");
  40. return 1;
  41. }
  42. }
  43. }
  44. """
  45. # Write the test code to a file
  46. with open("test_inject.vala", "w") as f:
  47. f.write(test_code)
  48. # Compile and run the test
  49. try:
  50. # Compile
  51. compile_cmd = [
  52. "valac",
  53. "--pkg", "glib-2.0",
  54. "--pkg", "gio-2.0",
  55. "--pkg", "json-glib-1.0",
  56. "--pkg", "jsonrpc-glib-1.0",
  57. "--pkg", "gee-0.8",
  58. "--pkg", "posix",
  59. "--vapidir", "builddir/src",
  60. "--pkg", "mcp-vala",
  61. "test_inject.vala",
  62. "-o", "test_inject"
  63. ]
  64. result = subprocess.run(compile_cmd, capture_output=True, text=True, cwd=".")
  65. if result.returncode != 0:
  66. print(f"Compilation failed: {result.stderr}")
  67. print(f"Compilation stdout: {result.stdout}")
  68. return False
  69. # Run
  70. run_result = subprocess.run(["./test_inject"], capture_output=True, text=True)
  71. print(run_result.stdout)
  72. if run_result.returncode == 0:
  73. return True
  74. else:
  75. print(f"Test failed: {run_result.stderr}")
  76. return False
  77. except Exception as e:
  78. print(f"Error running test: {e}")
  79. return False
  80. finally:
  81. # Clean up
  82. for f in ["test_inject.vala", "test_inject"]:
  83. try:
  84. os.remove(f)
  85. except:
  86. pass
  87. def test_strip_content_length():
  88. """Test the StripContentLength converter"""
  89. print("=== Testing StripContentLength Converter ===")
  90. # Create a simple test program in Vala that uses the converter
  91. test_code = """
  92. using GLib;
  93. using Mcp.Core.Compat;
  94. public class TestStripContentLength {
  95. public static int main (string[] args) {
  96. // Create the converter
  97. var converter = new StripContentLength ();
  98. // Test input: JSON message with Content-Length header
  99. string input = "Content-Length: 45\\r\\n\\r\\n{\\"jsonrpc\\": \\"2.0\\", \\"method\\": \\"ping\\", \\"id\\": 1}";
  100. // Output buffer
  101. uint8[] output = new uint8[1024];
  102. size_t bytes_read, bytes_written;
  103. // Convert
  104. var result = converter.convert (input.data, output, ConverterFlags.NONE,
  105. out bytes_read, out bytes_written);
  106. // Extract the output
  107. string output_str = (string)output;
  108. output_str = output_str.substring (0, (int)bytes_written);
  109. // Print the result
  110. print ("Input: %s", input);
  111. print ("Output: %s", output_str);
  112. // Check if Content-Length header was removed and newline was added
  113. if (!output_str.has_prefix ("Content-Length:") && output_str.has_suffix ("\\n")) {
  114. print ("✓ Content-Length header stripped and newline added successfully\\n");
  115. return 0;
  116. } else {
  117. print ("✗ Content-Length header not stripped or newline not added\\n");
  118. return 1;
  119. }
  120. }
  121. }
  122. """
  123. # Write the test code to a file
  124. with open("test_strip.vala", "w") as f:
  125. f.write(test_code)
  126. # Compile and run the test
  127. try:
  128. # Compile
  129. compile_cmd = [
  130. "valac",
  131. "--pkg", "glib-2.0",
  132. "--pkg", "gio-2.0",
  133. "--pkg", "json-glib-1.0",
  134. "--pkg", "jsonrpc-glib-1.0",
  135. "--pkg", "gee-0.8",
  136. "--pkg", "posix",
  137. "--vapidir", "builddir/src",
  138. "--pkg", "mcp-vala",
  139. "test_strip.vala",
  140. "-o", "test_strip"
  141. ]
  142. result = subprocess.run(compile_cmd, capture_output=True, text=True, cwd=".")
  143. if result.returncode != 0:
  144. print(f"Compilation failed: {result.stderr}")
  145. return False
  146. # Run
  147. run_result = subprocess.run(["./test_strip"], capture_output=True, text=True)
  148. print(run_result.stdout)
  149. if run_result.returncode == 0:
  150. return True
  151. else:
  152. print(f"Test failed: {run_result.stderr}")
  153. return False
  154. except Exception as e:
  155. print(f"Error running test: {e}")
  156. return False
  157. finally:
  158. # Clean up
  159. for f in ["test_strip.vala", "test_strip"]:
  160. try:
  161. os.remove(f)
  162. except:
  163. pass
  164. def main():
  165. print("Testing MCP Converters\n")
  166. # Test both converters
  167. inject_success = test_inject_content_length()
  168. strip_success = test_strip_content_length()
  169. if inject_success and strip_success:
  170. print("✓ All converter tests passed!")
  171. return 0
  172. else:
  173. print("✗ Some converter tests failed!")
  174. return 1
  175. if __name__ == "__main__":
  176. sys.exit(main())