Server.vala 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. using MHD;
  2. using Invercargill;
  3. using Invercargill.DataStructures;
  4. namespace Astralis {
  5. public class Server : Object {
  /** Handle to the HTTP daemon; assigned when run() starts it. */
  private Daemon daemon;
  /** Pipeline that is run for each incoming request. */
  private Pipeline pipeline;
  /** TCP port handed to the daemon on start-up (narrowed to uint16 there). */
  private int port;
  /** Request contexts currently tracked by this server. */
  private HashSet<RequestContext> request_contexts;
  /** Response contexts currently tracked by this server. */
  private HashSet<ResponseContext> response_contexts;
  /** Socket monitor driving the daemon; null until run() creates it and after stop(). */
  private SocketMonitor? socket_monitor;
  /** Main loop that run() blocks in and stop() quits. */
  private MainLoop main_loop;

  /**
   * Creates a server that is not yet listening; call run() to start it.
   *
   * @param port port number the daemon will be started on
   * @param pipeline pipeline used to handle every request
   */
  public Server(int port, Pipeline pipeline) {
      // Nothing is monitored until run() has started the daemon.
      socket_monitor = null;
      main_loop = new MainLoop();

      request_contexts = new HashSet<RequestContext>();
      response_contexts = new HashSet<ResponseContext>();

      this.pipeline = pipeline;
      this.port = port;
  }
  /**
   * Per-request entry point invoked by the daemon (see run()).
   *
   * The first invocation for a request only allocates a RequestContext and
   * stores it in the con_cls slot. The next invocation builds the HttpRequest
   * and starts the pipeline. Every invocation that carries upload data appends
   * it to the request body; an invocation without upload data marks the body
   * complete and either responds or suspends the connection.
   *
   * @param connection connection the request arrived on
   * @param url request URL, an empty string is used when null
   * @param method request method, an empty string is used when null
   * @param version protocol version, an empty string is used when null
   * @param upload_data start of the current chunk of body bytes
   * @param upload_data_size in: size of the chunk; set to 0 once the chunk is consumed
   * @param con_cls slot holding the RequestContext pointer between invocations
   * @return Result.YES, or whatever respond() returns once reception has finished
   */
  private int access_handler (Connection connection, string? url, string? method, string? version, string? upload_data, size_t* upload_data_size, void** con_cls) {
      void* stored_state = con_cls[0];

      // Initial call for this request: just create the state and wait for the next call.
      if (stored_state == null) {
          var fresh_state = new RequestContext();
          request_contexts.add(fresh_state);
          con_cls[0] = (void*) fresh_state;
          return Result.YES;
      }

      // Later calls get the state back through the slot.
      RequestContext request_state = (RequestContext) ((Object*) stored_state);

      // No handler context yet: assemble the request and start the handler.
      if (request_state.handler_context == null) {
          // Header names are matched case-insensitively.
          var header_values = new Catalogue<string, string>(case_insensitive_hash, case_insensitive_equal);
          var header_sink = new KeyValueCollector(header_values);
          MHD.get_connection_values(connection, MHD.ValueKind.HEADER_KIND, KeyValueCollector.key_value_iterator, (void*)header_sink);

          // Query string arguments.
          var query_values = new Catalogue<string, string>();
          var query_sink = new KeyValueCollector(query_values);
          MHD.get_connection_values(connection, MHD.ValueKind.GET_ARGUMENT_KIND, KeyValueCollector.key_value_iterator, (void*)query_sink);

          // Cookies.
          var cookie_values = new Catalogue<string, string>();
          var cookie_sink = new KeyValueCollector(cookie_values);
          MHD.get_connection_values(connection, MHD.ValueKind.COOKIE_KIND, KeyValueCollector.key_value_iterator, (void*)cookie_sink);

          var peer_address = get_remote_address(connection);

          var handler_context = new HttpContext(new HttpRequest(
              url ?? "",
              method ?? "",
              version ?? "",
              header_values,
              request_state.request_body,
              query_values,
              cookie_values,
              peer_address
          ));
          request_state.handler_context = handler_context;

          // Start the pipeline; the callback records its outcome on the request state.
          pipeline.run.begin(handler_context, (source, async_result) => {
              try {
                  request_state.handler_result = pipeline.run.end(async_result);
              }
              catch (Error failure) {
                  request_state.handler_error = failure;
              }
              if (!request_state.handler_finished()) {
                  return;
              }
              // Resuming makes the daemon re-enter access_handler, which then sends the response.
              MHD.resume_connection(connection);
              request_monitor_update();
          });
      }

      // Body data present: append it to the request body and mark it consumed.
      size_t chunk_size = upload_data_size[0];
      if (chunk_size != 0) {
          var chunk = new uint8[chunk_size];
          Memory.copy(chunk, upload_data, chunk_size);
          request_state.request_body.write(chunk);
          upload_data_size[0] = 0;
          return Result.YES;
      }

      // No body data on this call: the request body has ended.
      request_state.request_body.complete();
      if (request_state.request_reception_finished()) {
          return respond(connection, request_state);
      }

      // Not ready to respond yet: park the connection until the pipeline callback resumes it.
      MHD.suspend_connection(connection);
      return Result.YES;
  }
  /**
   * Starts the daemon, attaches it to the GLib main loop and blocks until
   * stop() quits that loop. Aborts via error() if the daemon cannot be started.
   */
  public void run() {
      // External select mode: no USE_SELECT_INTERNALLY flag
      // This means MHD creates no threads - we drive it from GLib main loop
      // USE_TURBO enables faster processing by not explicitly closing connections
      daemon = Daemon.start(
          MHD.ALLOW_SUSPEND_RESUME | MHD.USE_TURBO,
          (uint16) this.port,
          null,
          (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
              return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
          },
          MHD.OPTION_NOTIFY_COMPLETED,
          (void*) request_completed_callback,
          // Closure argument for the completion callback: lets the static
          // callback reach this server to release the request context.
          (void*) this,
          MHD.OPTION_END
      );
      if (daemon == null) {
          error("Failed to start daemon");
      }

      // Start socket monitoring integrated with GLib main loop
      socket_monitor = new SocketMonitor(daemon);
      socket_monitor.start();

      // Run GLib main loop
      main_loop.run();
  }

  /**
   * Stops socket monitoring and quits the main loop if it is running.
   */
  public void stop() {
      if (socket_monitor != null) {
          socket_monitor.stop();
          socket_monitor = null;
      }
      if (main_loop.is_running()) {
          main_loop.quit();
      }
      // MHD_stop_daemon is called by Daemon's destructor when daemon is finalized
  }

  /**
   * Asks the socket monitor, if there is one, to refresh immediately.
   */
  internal void request_monitor_update() {
      if (socket_monitor != null) {
          socket_monitor.force_update();
      }
  }

  /**
   * Queues the response for a request whose handler has produced an outcome.
   *
   * @param connection connection to respond on
   * @param context state of the request being answered
   * @return result of queueing the response, or Result.NO when the context
   *         holds neither an error nor a result
   */
  private Result respond(Connection connection, RequestContext context) {
      var result = Result.NO;
      if (context.handler_error != null) {
          result = handle_error(context.handler_error, connection, context);
      }
      else if (context.handler_result != null) {
          result = send_result(connection, context.handler_result, context);
      }
      // The context is intentionally NOT removed from request_contexts here.
      // The daemon still holds a raw pointer to it and passes that pointer to
      // request_completed_callback; dropping the reference at this point would
      // leave that pointer dangling. The completion callback releases it.
      return result;
  }

  /**
   * Builds a callback-driven response for an HttpResult and queues it.
   *
   * @param connection connection to respond on
   * @param result result whose status, headers and body are sent
   * @param request_context request state to link the new ResponseContext to, if any
   * @return the value returned by MHD.queue_response
   */
  private Result send_result(Connection connection, HttpResult result, RequestContext? request_context = null) {
      // When no content length is available, -1 is passed as the size.
      uint64 size = result.content_length ?? -1;
      var response_context = new ResponseContext(this, connection, result);
      response_contexts.add(response_context);

      // Link the ResponseContext to the RequestContext for disconnect notification
      if (request_context != null) {
          request_context.response_context = response_context;
      }

      var response = new Response.from_callback(
          size,
          1048576,
          (cls, pos, buf, max) => {
              var ctx = (ResponseContext) cls;
              var bytes_read = ctx.body_output.read_chunk(buf, max);
              if (bytes_read == 0) {
                  if (!ctx.send_body_finished) {
                      // Nothing to send yet: park the connection and report 0 bytes below.
                      ctx.suspend_connection();
                  }
                  else if (ctx.send_body_error == null) {
                      ctx.server.response_contexts.remove(ctx);
                      return CONTENT_READER_END_OF_STREAM;
                  }
                  else {
                      // The error text is passed as an argument, never as the
                      // format string, so a '%' in the message prints literally.
                      printerr("Astralis Internal Server Error: Unhandled Error Sending HttpResult: %s\n", ctx.send_body_error.message);
                      ctx.server.response_contexts.remove(ctx);
                      return CONTENT_READER_END_WITH_ERROR;
                  }
              }
              return (ssize_t)bytes_read;
          },
          (void*)response_context,
          null);

      result.headers.iterate((kv) => {
          response.add_header(kv.key, kv.value);
      });

      response_context.begin_response();
      var res = MHD.queue_response(connection, result.status, response);
      if (res != MHD.Result.YES) {
          printerr("Astralis Internal Error: Unable to queue response\n");
      }
      return res;
  }

  /**
   * Logs an unhandled pipeline error and answers with a plain
   * "Internal Server Error" response.
   *
   * @param error error reported by the handler
   * @param connection connection to respond on
   * @param request_context request state forwarded to send_result
   * @return the value returned by send_result
   */
  private Result handle_error(Error error, Connection connection, RequestContext? request_context = null) {
      // Message is an argument rather than the format string, so '%' is safe.
      printerr("Astralis Internal Server Error: Unhandled Error: %s\n", error.message);
      return send_result(connection, new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR), request_context);
  }

  /**
   * Static callback registered for MHD_OPTION_NOTIFY_COMPLETED.
   *
   * @param cls closure argument registered in run(): the owning Server, or null
   * @param connection connection the request belonged to
   * @param con_cls slot holding the RequestContext pointer set by access_handler
   * @param toe reason the request terminated
   */
  private static void request_completed_callback(void* cls, Connection connection, void** con_cls, RequestTerminationCode toe) {
      if (con_cls[0] == null) {
          return;
      }
      // The local holds its own reference for the duration of this callback.
      RequestContext context = (RequestContext) ((Object*) con_cls[0]);

      // If the request was terminated abnormally, disconnect the output stream
      if (toe != RequestTerminationCode.COMPLETED_OK) {
          if (context.response_context != null) {
              context.response_context.body_output.close_connection();
          }
      }

      // The daemon is done with this request, so the server can stop tracking
      // its context. This also covers requests that ended before respond() ran.
      if (cls != null) {
          unowned Server server = (Server) cls;
          server.request_contexts.remove(context);
      }
  }
  /**
   * InputStream that serves bytes from an in-memory array, front to back.
   */
  private class MemoryInputStream : InputStream {
      private uint8[] data;
      private size_t pos;

      /**
       * Creates a stream positioned at the start of the given bytes.
       *
       * @param data bytes the stream will serve
       */
      public MemoryInputStream.from_bytes(uint8[] data) {
          this.pos = 0;
          this.data = data;
      }

      /**
       * Copies as many of the remaining bytes as fit into buffer.
       *
       * @param buffer destination for the bytes
       * @param cancellable not used by this implementation
       * @return number of bytes copied, or 0 once every byte has been served
       */
      public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
          // Everything already consumed: report end of stream.
          if (pos >= data.length) {
              return 0;
          }

          // Serve whichever is smaller: what is left, or what the caller can hold.
          size_t left = data.length - pos;
          size_t count = left;
          if (buffer.length < left) {
              count = buffer.length;
          }

          Memory.copy(buffer, data[pos:pos + count], count);
          pos += count;
          return (ssize_t)count;
      }

      /**
       * Nothing to release; always reports success.
       */
      public override bool close(Cancellable? cancellable = null) throws IOError {
          return true;
      }
  }
  /**
   * Adapter that lets a connection-value iteration (headers, query
   * parameters, cookies) fill a Catalogue. The catalogue is held unowned.
   */
  private class KeyValueCollector {
      private unowned Catalogue<string, string> catalogue;

      /**
       * @param catalogue destination that receives every collected pair
       */
      public KeyValueCollector(Catalogue<string, string> catalogue) {
          this.catalogue = catalogue;
      }

      /**
       * Iterator callback: stores one key/value pair in the collector's catalogue.
       *
       * @param cls pointer to the KeyValueCollector passed to the iteration
       * @param kind kind of value being iterated (unused)
       * @param key name of the value
       * @param value the value; pairs without one are skipped
       * @return always MHD.Result.YES
       */
      public static Result key_value_iterator(void* cls, MHD.ValueKind kind, string key, string? value) {
          // Incomplete pairs are ignored.
          if (key == null || value == null) {
              return MHD.Result.YES;
          }

          KeyValueCollector* sink = (KeyValueCollector*) cls;
          sink->catalogue.add(key, value);
          return MHD.Result.YES;
      }
  }
  /**
   * Returns the client's IP address for a connection as text.
   *
   * @param connection connection whose peer address is wanted
   * @return the address string, or null when the connection info is
   *         unavailable or the peer address is not an internet socket address
   */
  private string? get_remote_address(Connection connection) {
      // Get connection info to retrieve client address
      var info = MHD.get_connection_info(connection, MHD.ConnectionInfoType.CLIENT_ADDRESS);
      if (info == null) {
          return null;
      }

      // The client_addr field points to a sockaddr structure
      var addr_ptr = info.client_addr;
      if (addr_ptr == null) {
          return null;
      }

      // Create SocketAddress from native sockaddr structure
      // Use 128 as the maximum size for sockaddr (IPv6)
      var sock_addr = SocketAddress.from_native((void*)addr_ptr, 128);

      // The conversion can yield null or a non-internet address; `as` turns
      // both cases into null instead of dereferencing an invalid object.
      var inet_addr = sock_addr as InetSocketAddress;
      if (inet_addr == null) {
          return null;
      }

      // Get the InetAddress and convert to string
      return inet_addr.address.to_string();
  }
  /**
   * Hash function for HTTP header names that ignores ASCII letter case.
   *
   * Folding is ASCII-only and independent of the process locale, so it always
   * agrees with case_insensitive_equal. Non-ASCII bytes are hashed as-is.
   *
   * @param key header name
   * @return hash of the ASCII-lowercased key
   */
  private static uint case_insensitive_hash(string key) {
      return key.ascii_down().hash();
  }

  /**
   * Equality function for HTTP header names that ignores ASCII letter case.
   *
   * Uses an ASCII-only comparison, which is locale-independent and needs no
   * temporary lowercase copies. Non-ASCII bytes must match exactly.
   *
   * @param a first header name
   * @param b second header name
   * @return true when both names are equal ignoring ASCII case
   */
  private static bool case_insensitive_equal(string a, string b) {
      return a.ascii_casecmp(b) == 0;
  }
  271. }
  272. }