| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- using MHD;
- using Invercargill;
- using Invercargill.DataStructures;
- namespace Astralis {
- public class Server : Object {
- private Daemon daemon;
- private Pipeline pipeline;
- private int port;
- private HashSet<RequestContext> request_contexts;
- private HashSet<ResponseContext> response_contexts;
- private SocketMonitor? socket_monitor;
- private MainLoop main_loop;
- public Server(int port, Pipeline pipeline) {
- this.port = port;
- this.pipeline = pipeline;
- this.request_contexts = new HashSet<RequestContext>();
- this.response_contexts = new HashSet<ResponseContext>();
- this.socket_monitor = null;
- this.main_loop = new MainLoop();
- }
- private int access_handler (Connection connection, string? url, string? method, string? version, string? upload_data, size_t* upload_data_size, void** con_cls) {
- // On initial call for request, simply set up the context
- if (con_cls[0] == null) {
- var context = new RequestContext();
- request_contexts.add(context);
- con_cls[0] = (void*) context;
- return Result.YES;
- }
- // Subsequent calls are provided the context
- RequestContext context = (RequestContext) ((Object*) con_cls[0]);
- // On the second call we populate the `HttpRequest` object and begin the handler
- if (context.handler_context == null) {
- // Extract all headers from the connection (case-insensitive keys)
- var headers = new Catalogue<string, string>(case_insensitive_hash, case_insensitive_equal);
- var headers_collector = new KeyValueCollector(headers);
- MHD.get_connection_values(connection, MHD.ValueKind.HEADER_KIND, KeyValueCollector.key_value_iterator, (void*)headers_collector);
-
- // Extract query parameters from the connection
- var query_params = new Catalogue<string, string>();
- var query_collector = new KeyValueCollector(query_params);
- MHD.get_connection_values(connection, MHD.ValueKind.GET_ARGUMENT_KIND, KeyValueCollector.key_value_iterator, (void*)query_collector);
-
- // Extract cookies from the connection
- var cookies = new Catalogue<string, string>();
- var cookies_collector = new KeyValueCollector(cookies);
- MHD.get_connection_values(connection, MHD.ValueKind.COOKIE_KIND, KeyValueCollector.key_value_iterator, (void*)cookies_collector);
-
- // Get remote address
- var remote_address = get_remote_address(connection);
-
- // Create HttpRequest with all required parameters
- var http_request = new HttpRequest(
- url ?? "",
- method ?? "",
- version ?? "",
- headers,
- context.request_body,
- query_params,
- cookies,
- remote_address
- );
- var http_context = new HttpContext(http_request);
- context.handler_context = http_context;
- // Kick off the handler
- pipeline.run.begin(http_context, (obj, res) => {
- try {
- context.handler_result = pipeline.run.end(res);
- }
- catch(Error e) {
- context.handler_error = e;
- }
- if(context.handler_finished()) {
- // Just resume - access_handler will be re-entered and handle the response
- MHD.resume_connection(connection);
- this.request_monitor_update();
- }
- });
- }
- // On the second, and all subsequent calls - read the request body:
- if (upload_data_size[0] != 0) {
- var data = new uint8[upload_data_size[0]];
- Memory.copy(data, upload_data, upload_data_size[0]);
- context.request_body.write(data);
- upload_data_size[0] = 0;
- return Result.YES;
- }
- // End of request body data
- else {
- context.request_body.complete();
- if(context.request_reception_finished()) {
- return respond(connection, context);
- }
- MHD.suspend_connection(connection);
- return Result.YES;
- }
- }
- public void run() {
- // External select mode: no USE_SELECT_INTERNALLY flag
- // This means MHD creates no threads - we drive it from GLib main loop
- // USE_TURBO enables faster processing by not explicitly closing connections
- daemon = Daemon.start(
- MHD.ALLOW_SUSPEND_RESUME | MHD.USE_TURBO,
- (uint16) this.port,
- null,
- (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
- return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
- },
- MHD.OPTION_NOTIFY_COMPLETED,
- (void*) request_completed_callback,
- null,
- MHD.OPTION_END
- );
- if (daemon == null) {
- error("Failed to start daemon");
- }
- // Start socket monitoring integrated with GLib main loop
- socket_monitor = new SocketMonitor(daemon);
- socket_monitor.start();
- // Run GLib main loop
- main_loop.run();
- }
- /// Stop the server and clean up resources
- public void stop() {
- if (socket_monitor != null) {
- socket_monitor.stop();
- socket_monitor = null;
- }
- if (main_loop.is_running()) {
- main_loop.quit();
- }
- // MHD_stop_daemon is called by Daemon's destructor when daemon is finalized
- }
- internal void request_monitor_update() {
- if (socket_monitor != null) {
- socket_monitor.force_update();
- }
- }
- private Result respond(Connection connection, RequestContext context) {
- var result = Result.NO;
- if(context.handler_error != null) {
- result = handle_error(context.handler_error, connection, context);
- }
- else if(context.handler_result != null) {
- result = send_result(connection, context.handler_result, context);
- }
- request_contexts.remove(context);
- return result;
- }
- private Result send_result(Connection connection, HttpResult result, RequestContext? request_context = null) {
- uint64 size = result.content_length ?? -1;
- var response_context = new ResponseContext(this, connection, result);
- response_contexts.add(response_context);
- // Link the ResponseContext to the RequestContext for disconnect notification
- if (request_context != null) {
- request_context.response_context = response_context;
- }
- var response = new Response.from_callback(
- size,
- 1048576,
- (cls, pos, buf, max) => {
- var ctx = (ResponseContext) cls;
- var bytes_read = ctx.body_output.read_chunk(buf, max);
- if(bytes_read == 0) {
- if(!ctx.send_body_finished) {
- ctx.suspend_connection();
- }
- else if(ctx.send_body_error == null) {
- ctx.server.response_contexts.remove(ctx);
- return CONTENT_READER_END_OF_STREAM;
- }
- else {
- printerr(@"Astralis Internal Server Error: Unhandled Error Sending HttpResult: $(ctx.send_body_error.message)\n");
- ctx.server.response_contexts.remove(ctx);
- return CONTENT_READER_END_WITH_ERROR;
- }
- }
- return (ssize_t)bytes_read;
- },
- (void*)response_context,
- null);
- result.headers.iterate((kv) => {
- response.add_header(kv.key, kv.value);
- });
- response_context.begin_response();
- var res = MHD.queue_response(connection, result.status, response);
- if(res != MHD.Result.YES) {
- printerr("Astralis Internal Error: Unable to queue response\n");
- }
- return res;
- }
- private Result handle_error(Error error, Connection connection, RequestContext? request_context = null) {
- printerr(@"Astralis Internal Server Error: Unhandled Error: $(error.message)\n");
- return send_result(connection, new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR), request_context);
- }
- // Static callback wrapper for MHD_OPTION_NOTIFY_COMPLETED
- private static void request_completed_callback(void* cls, Connection connection, void** con_cls, RequestTerminationCode toe) {
- // We don't have access to the Server instance here, so we handle cleanup directly
- if (con_cls[0] == null) {
- return;
- }
-
- RequestContext context = (RequestContext) ((Object*) con_cls[0]);
-
- // If the request was terminated abnormally, disconnect the output stream
- if (toe != RequestTerminationCode.COMPLETED_OK) {
- if (context.response_context != null) {
- context.response_context.body_output.close_connection();
- }
- }
- }
- // Simple memory-based InputStream for request body
- private class MemoryInputStream : InputStream {
- private uint8[] data;
- private size_t pos;
- public MemoryInputStream.from_bytes(uint8[] data) {
- this.data = data;
- this.pos = 0;
- }
- public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
- if (pos >= data.length) {
- return 0; // EOF
- }
- size_t remaining = data.length - pos;
- size_t to_read = buffer.length < remaining ? buffer.length : remaining;
- Memory.copy(buffer, data[pos:pos + to_read], to_read);
- pos += to_read;
- return (ssize_t)to_read;
- }
- public override bool close(Cancellable? cancellable = null) throws IOError {
- return true;
- }
- }
- // Helper class to collect key-value pairs (headers, query params, cookies) from MHD connection
- private class KeyValueCollector {
- private unowned Catalogue<string, string> catalogue;
- public KeyValueCollector(Catalogue<string, string> catalogue) {
- this.catalogue = catalogue;
- }
- public static Result key_value_iterator(void* cls, MHD.ValueKind kind, string key, string? value) {
- // cls is a pointer to KeyValueCollector instance
- var collector = (KeyValueCollector*) cls;
- if (key != null && value != null) {
- collector->catalogue.add(key, value);
- }
- return MHD.Result.YES;
- }
- }
- // Get remote address from connection
- private string? get_remote_address(Connection connection) {
- // Get connection info to retrieve client address
- var info = MHD.get_connection_info(connection, MHD.ConnectionInfoType.CLIENT_ADDRESS);
- if (info == null) {
- return null;
- }
- // The client_addr field points to a sockaddr structure
- // We need to cast it properly and extract the IP address
- var addr_ptr = info.client_addr;
- if (addr_ptr == null) {
- return null;
- }
- // Create SocketAddress from native sockaddr structure
- // Use 128 as the maximum size for sockaddr (IPv6)
- var sock_addr = SocketAddress.from_native((void*)addr_ptr, 128);
-
- // Cast to InetSocketAddress to get the address
- var inet_addr = (InetSocketAddress) sock_addr;
-
- // Get the InetAddress and convert to string
- return inet_addr.address.to_string();
- }
- // Case-insensitive hash function for strings (for HTTP header lookups)
- private static uint case_insensitive_hash(string key) {
- return key.down().hash();
- }
- // Case-insensitive equality function for strings (for HTTP header lookups)
- private static bool case_insensitive_equal(string a, string b) {
- return a.down() == b.down();
- }
- }
- }
|