using MHD;
using Invercargill;
using Invercargill.DataStructures;

namespace Astralis {

    /**
     * HTTP server that hands requests received by an MHD daemon to a {@link Pipeline}.
     *
     * The daemon is started without an internal-select flag, so it is driven from a
     * GLib main loop through a {@link SocketMonitor}. Per-request state lives in a
     * {@link RequestContext} whose raw pointer is stored in MHD's `con_cls` slot; the
     * server keeps the owning reference in `request_contexts` until MHD reports the
     * request as completed.
     */
    public class Server : Object {

        // NOTE(review): `daemon` is deliberately the first field. Stopping the daemon can
        // invoke request_completed_callback, which touches `request_contexts`; this
        // presumably relies on fields being released in declaration order - confirm
        // before reordering.
        private Daemon daemon;
        private Pipeline pipeline;
        private int port;
        private HashSet<RequestContext> request_contexts;
        private HashSet<ResponseContext> response_contexts;
        private SocketMonitor? socket_monitor;
        private MainLoop main_loop;

        /**
         * Creates a server; nothing is bound or started until {@link run} is called.
         *
         * @param port TCP port handed to the daemon on start (narrowed to 16 bits)
         * @param pipeline pipeline that produces the result for every request
         */
        public Server(int port, Pipeline pipeline) {
            this.port = port;
            this.pipeline = pipeline;
            this.request_contexts = new HashSet<RequestContext>();
            this.response_contexts = new HashSet<ResponseContext>();
            this.socket_monitor = null;
            this.main_loop = new MainLoop();
        }

        /**
         * MHD access handler; invoked several times for each request.
         *
         * The first call only allocates the {@link RequestContext}. The second call builds
         * the request object and starts the pipeline. That call and every later one either
         * consumes a chunk of upload data or, once no data is pending, completes the request
         * body and responds (or suspends the connection until the handler has finished).
         *
         * @param connection MHD connection the request arrived on
         * @param url request URL, may be null
         * @param method request method, may be null
         * @param version protocol version string, may be null
         * @param upload_data pointer to the current chunk of request body data
         * @param upload_data_size in: bytes available in upload_data; set to 0 once consumed
         * @param con_cls per-request slot in which the context pointer is stored
         * @return an MHD result code
         */
        private int access_handler(Connection connection, string? url, string? method, string? version,
                                   string? upload_data, size_t* upload_data_size, void** con_cls) {
            // On the initial call for a request, simply set up the context.
            if (con_cls[0] == null) {
                var new_context = new RequestContext();
                // The set owns the reference; con_cls only carries a borrowed pointer.
                request_contexts.add(new_context);
                con_cls[0] = (void*) new_context;
                return Result.YES;
            }

            // Subsequent calls are provided the context.
            RequestContext context = (RequestContext) ((Object*) con_cls[0]);

            // On the second call, populate the request object and begin the handler.
            if (context.handler_context == null) {
                var http_context = new HttpContext(build_request(connection, context, url, method, version));
                context.handler_context = http_context;

                // Kick off the handler.
                pipeline.run.begin(http_context, (obj, res) => {
                    try {
                        context.handler_result = pipeline.run.end(res);
                    } catch (Error e) {
                        context.handler_error = e;
                    }

                    if (context.handler_finished()) {
                        // Just resume - access_handler will be re-entered and handle the response.
                        MHD.resume_connection(connection);
                        this.request_monitor_update();
                    }
                });
            }

            // On the second and all subsequent calls, read the request body.
            if (upload_data_size[0] != 0) {
                var chunk = new uint8[upload_data_size[0]];
                Memory.copy(chunk, upload_data, upload_data_size[0]);
                context.request_body.write(chunk);
                // Tell MHD that the whole chunk has been consumed.
                upload_data_size[0] = 0;
                return Result.YES;
            }

            // End of request body data.
            context.request_body.complete();
            if (context.request_reception_finished()) {
                return respond(connection, context);
            }

            MHD.suspend_connection(connection);
            return Result.YES;
        }

        /**
         * Builds the {@link HttpRequest} for a connection from its headers, query
         * arguments, cookies and remote address.
         *
         * Null url/method/version values are replaced by empty strings.
         */
        private HttpRequest build_request(Connection connection, RequestContext context,
                                          string? url, string? method, string? version) {
            // Extract all headers from the connection (case-insensitive keys).
            var headers = new Catalogue<string, string>(case_insensitive_hash, case_insensitive_equal);
            var headers_collector = new KeyValueCollector(headers);
            MHD.get_connection_values(connection, MHD.ValueKind.HEADER_KIND,
                                      KeyValueCollector.key_value_iterator, (void*) headers_collector);

            // Extract query parameters from the connection.
            var query_params = new Catalogue<string, string>();
            var query_collector = new KeyValueCollector(query_params);
            MHD.get_connection_values(connection, MHD.ValueKind.GET_ARGUMENT_KIND,
                                      KeyValueCollector.key_value_iterator, (void*) query_collector);

            // Extract cookies from the connection.
            var cookies = new Catalogue<string, string>();
            var cookies_collector = new KeyValueCollector(cookies);
            MHD.get_connection_values(connection, MHD.ValueKind.COOKIE_KIND,
                                      KeyValueCollector.key_value_iterator, (void*) cookies_collector);

            var remote_address = get_remote_address(connection);

            return new HttpRequest(
                url ?? "",
                method ?? "",
                version ?? "",
                headers,
                context.request_body,
                query_params,
                cookies,
                remote_address
            );
        }

        /**
         * Starts the daemon and the socket monitor, then runs the GLib main loop.
         *
         * Does not return until the main loop is quit (see {@link stop}). Aborts the
         * process through error() when the daemon cannot be started.
         */
        public void run() {
            // External select mode: no USE_SELECT_INTERNALLY flag.
            // This means MHD creates no threads - we drive it from the GLib main loop.
            // USE_TURBO enables faster processing by not explicitly closing connections.
            daemon = Daemon.start(
                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_TURBO,
                (uint16) this.port,
                null,
                (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
                    return this.access_handler(connection, url, method, version,
                                               upload_data, upload_data_size, con_cls);
                },
                // The server is passed as the callback's closure argument so the static
                // callback can release the request context it is given.
                MHD.OPTION_NOTIFY_COMPLETED, (void*) request_completed_callback, (void*) this,
                MHD.OPTION_END
            );

            if (daemon == null) {
                error("Failed to start daemon");
            }

            // Start socket monitoring integrated with the GLib main loop.
            socket_monitor = new SocketMonitor(daemon);
            socket_monitor.start();

            // Run the GLib main loop.
            main_loop.run();
        }

        /**
         * Stops the socket monitor and quits the main loop if it is running.
         */
        public void stop() {
            if (socket_monitor != null) {
                socket_monitor.stop();
                socket_monitor = null;
            }

            if (main_loop.is_running()) {
                main_loop.quit();
            }

            // MHD_stop_daemon is called by Daemon's destructor when daemon is finalized.
        }

        /**
         * Asks the socket monitor, if one is active, to refresh immediately.
         */
        internal void request_monitor_update() {
            if (socket_monitor != null) {
                socket_monitor.force_update();
            }
        }

        /**
         * Queues the response for a request whose handler has finished.
         *
         * The context is intentionally NOT removed from `request_contexts` here: MHD still
         * holds its raw pointer in `con_cls` and hands it to request_completed_callback,
         * which is where the owning reference is released.
         *
         * @return the MHD result of queueing, or Result.NO when the context carries
         *         neither an error nor a result
         */
        private Result respond(Connection connection, RequestContext context) {
            if (context.handler_error != null) {
                return handle_error(context.handler_error, connection, context);
            }

            if (context.handler_result != null) {
                return send_result(connection, context.handler_result, context);
            }

            return Result.NO;
        }

        /**
         * Creates a callback-driven MHD response for an {@link HttpResult} and queues it.
         *
         * @param connection connection to respond on
         * @param result result whose status, headers and body are sent
         * @param request_context when given, it is linked to the new response context so a
         *        disconnect can be propagated to the body stream
         * @return the value returned by MHD.queue_response
         */
        private Result send_result(Connection connection, HttpResult result, RequestContext? request_context = null) {
            // When the result has no content length, -1 is passed as the size.
            uint64 size = result.content_length ?? -1;

            var response_context = new ResponseContext(this, connection, result);
            response_contexts.add(response_context);

            // Link the ResponseContext to the RequestContext for disconnect notification.
            if (request_context != null) {
                request_context.response_context = response_context;
            }

            var response = new Response.from_callback(
                size,
                1048576,
                (cls, pos, buf, max) => {
                    var ctx = (ResponseContext) cls;
                    var bytes_read = ctx.body_output.read_chunk(buf, max);

                    if (bytes_read != 0) {
                        return (ssize_t) bytes_read;
                    }

                    if (!ctx.send_body_finished) {
                        // Nothing buffered yet but more is expected: park the connection.
                        ctx.suspend_connection();
                        return (ssize_t) bytes_read;
                    }

                    if (ctx.send_body_error == null) {
                        ctx.server.response_contexts.remove(ctx);
                        return CONTENT_READER_END_OF_STREAM;
                    }

                    // printerr is printf-style: the message must never be the format string.
                    printerr("Astralis Internal Server Error: Unhandled Error Sending HttpResult: %s\n",
                             ctx.send_body_error.message);
                    ctx.server.response_contexts.remove(ctx);
                    return CONTENT_READER_END_WITH_ERROR;
                },
                (void*) response_context,
                null
            );

            result.headers.iterate((kv) => {
                response.add_header(kv.key, kv.value);
            });

            response_context.begin_response();

            var res = MHD.queue_response(connection, result.status, response);
            if (res != MHD.Result.YES) {
                printerr("Astralis Internal Error: Unable to queue response\n");
                // The content reader will never run for an unqueued response, so it cannot
                // drop the context itself; release it here to avoid leaking it.
                response_contexts.remove(response_context);
            }

            return res;
        }

        /**
         * Logs an unhandled handler error and answers with "Internal Server Error".
         *
         * @return the result of queueing the error response
         */
        private Result handle_error(Error error, Connection connection, RequestContext? request_context = null) {
            // printerr is printf-style: pass the message as an argument, not as the format.
            printerr("Astralis Internal Server Error: Unhandled Error: %s\n", error.message);
            return send_result(
                connection,
                new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR),
                request_context
            );
        }

        /**
         * Static callback registered with MHD_OPTION_NOTIFY_COMPLETED.
         *
         * Closes the response body stream when the request did not complete normally and
         * releases the server's reference to the request context.
         *
         * @param cls the Server that registered the callback (may be null)
         * @param connection connection the request belonged to
         * @param con_cls per-request slot holding the RequestContext pointer
         * @param toe reason the request terminated
         */
        private static void request_completed_callback(void* cls, Connection connection, void** con_cls,
                                                       RequestTerminationCode toe) {
            if (con_cls[0] == null) {
                return;
            }

            // Takes its own reference, so the context stays valid for the rest of this call.
            RequestContext context = (RequestContext) ((Object*) con_cls[0]);
            con_cls[0] = null;

            // If the request was terminated abnormally, disconnect the output stream.
            if (toe != RequestTerminationCode.COMPLETED_OK && context.response_context != null) {
                context.response_context.body_output.close_connection();
                // NOTE(review): the response context of an aborted request is not removed from
                // the server's response_contexts here - confirm whether that is handled elsewhere.
            }

            if (cls != null) {
                // Unowned on purpose: this may run while the server is being finalized,
                // when taking a new reference to it would be invalid.
                unowned Server server = (Server) ((Object*) cls);
                server.request_contexts.remove(context);
            }
        }

        /**
         * Simple memory-backed InputStream serving a fixed byte array.
         */
        private class MemoryInputStream : InputStream {
            private uint8[] data;
            private size_t pos;

            public MemoryInputStream.from_bytes(uint8[] data) {
                this.data = data;
                this.pos = 0;
            }

            /**
             * Copies up to buffer.length of the remaining bytes into buffer.
             *
             * @return number of bytes copied, 0 once all data has been read
             */
            public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
                if (pos >= data.length) {
                    return 0; // EOF
                }

                size_t remaining = data.length - pos;
                size_t to_read = buffer.length < remaining ? buffer.length : remaining;
                // Copy straight from the offset instead of slicing, which would allocate a
                // temporary array first.
                Memory.copy(buffer, (uint8*) data + pos, to_read);
                pos += to_read;
                return (ssize_t) to_read;
            }

            public override bool close(Cancellable? cancellable = null) throws IOError {
                return true;
            }
        }

        /**
         * Helper that collects key-value pairs (headers, query parameters, cookies)
         * reported by MHD into a Catalogue.
         */
        private class KeyValueCollector {
            // Borrowed: the catalogue is owned by whoever created the collector.
            private unowned Catalogue<string, string> catalogue;

            public KeyValueCollector(Catalogue<string, string> catalogue) {
                this.catalogue = catalogue;
            }

            /**
             * Iterator callback for MHD.get_connection_values.
             *
             * @param cls pointer to the KeyValueCollector receiving the pair
             * @return always MHD.Result.YES, so iteration continues
             */
            public static Result key_value_iterator(void* cls, MHD.ValueKind kind, string key, string? value) {
                var collector = (KeyValueCollector*) cls;
                // Pairs without a value are skipped.
                if (key != null && value != null) {
                    collector->catalogue.add(key, value);
                }
                return MHD.Result.YES;
            }
        }

        /**
         * Returns the client's IP address as text.
         *
         * @return the address, or null when MHD has no client address for the connection
         *         or the address is not an internet socket address
         */
        private string? get_remote_address(Connection connection) {
            var info = MHD.get_connection_info(connection, MHD.ConnectionInfoType.CLIENT_ADDRESS);
            if (info == null) {
                return null;
            }

            // The client_addr field points to a native sockaddr structure.
            var addr_ptr = info.client_addr;
            if (addr_ptr == null) {
                return null;
            }

            // Use 128 as the maximum size for sockaddr (IPv6).
            var sock_addr = SocketAddress.from_native((void*) addr_ptr, 128);

            // from_native may fail or yield a non-inet address; `as` turns both cases into
            // null instead of an invalid cast.
            var inet_addr = sock_addr as InetSocketAddress;
            if (inet_addr == null) {
                return null;
            }

            return inet_addr.address.to_string();
        }

        /**
         * Case-insensitive hash for header names.
         *
         * Uses ASCII-only lowercasing so the result does not depend on the process
         * locale; it agrees with {@link case_insensitive_equal}.
         */
        private static uint case_insensitive_hash(string key) {
            return key.ascii_down().hash();
        }

        /**
         * Case-insensitive (ASCII-only, locale-independent) equality for header names.
         */
        private static bool case_insensitive_equal(string a, string b) {
            return a.ascii_casecmp(b) == 0;
        }
    }
}