Sfoglia il codice sorgente

WIP result refactor

Billy Barrow 4 settimane fa
parent
commit
ee45d0a604

+ 46 - 2
src/Core/Response.vala

@@ -4,17 +4,61 @@ using Invercargill.DataStructures;
 namespace Astralis {
 
     public abstract class HttpResult : Object {
-        public Catalogue<string, string> headers { get; private set; }
+        public Dictionary<string, string> headers { get; private set; }
         public StatusCode status { get; set; }
         public uint64? content_length { get; set; }
 
         protected HttpResult(StatusCode status, uint64? content_length = null) {
-            headers = new Catalogue<string, string>();
+            headers = new Dictionary<string, string>();
             this.status = status;
             this.content_length = content_length;
+            if(content_length != null) {
+                headers["Content-Length"] = content_length.to_string();
+            }
+        }
+
+        public HttpResult set_header(string header, string value) {
+            headers[header] = value;
+            return this;
         }
 
         public abstract async void send_body(AsyncOutput output) throws Error;
     }
 
+    public class HttpDataResult : HttpResult {
+
+        private ByteBuffer bytes;
+
+        public HttpDataResult(BinaryData data, StatusCode status = StatusCode.OK) {
+            bytes = data.to_byte_buffer();
+            base(status, bytes.length);
+        }
+
+        public async override void send_body(AsyncOutput output) {
+            yield output.write(bytes);
+        }
+    }
+
+    public class HttpStringResult : HttpDataResult {
+        public HttpStringResult(string str, StatusCode status = StatusCode.OK) {
+            base(new ByteBuffer.from_byte_array(str.data), status);
+            set_header("Content-Type", "text/plain");
+        }
+    }
+
+    public class HttpStreamResult : HttpResult {
+
+        private OutputStream stream;
+
+        public HttpStreamResult(OutputStream stream, uint64? content_length = null, StatusCode status = StatusCode.OK) {
+            this.stream = stream;
+            base(status, content_length);
+        }
+
+        public async override void send_body(AsyncOutput output) {
+            yield output.write_stream(stream);
+        }
+
+    }
+
 }

+ 9 - 26
src/Handlers/Router.vala

@@ -32,6 +32,11 @@ namespace Astralis {
         /// </summary>
         public RouteErrorHandler error_handler { get; set; }
 
+        public Router() {
+            not_found_handler = new DefaultNotFoundHandler();
+            error_handler = new DefaultErrorHandler();
+        }
+
         /// <summary>
         /// Registers a route handler for the specified path pattern.
         /// 
@@ -114,18 +119,8 @@ namespace Astralis {
                 }
             }
 
-            // No route matched - use not found handler or return default 404
-            if (not_found_handler != null) {
-                return yield not_found_handler.handle_request(context);
-            }
-
-            var headers = new Catalogue<string, string>();
-            headers.add("Content-Type", "text/plain");
-            return new BufferedHttpResult.from_string(
-                "Not Found",
-                StatusCode.NOT_FOUND,
-                headers
-            );
+            // No route matched - use not found handler
+            return yield not_found_handler.handle_request(context);
         }
     }
 
@@ -541,13 +536,7 @@ namespace Astralis {
     public class DefaultNotFoundHandler : RequestHandler, Object {
         
         public async HttpResult handle_request(HttpContext http_context) throws Error {
-            var headers = new Catalogue<string, string>();
-            headers.add("Content-Type", "text/plain");
-            return new BufferedHttpResult.from_string(
-                "404 Not Found",
-                StatusCode.NOT_FOUND,
-                headers
-            );
+            return new HttpStringResult("Not Found", StatusCode.NOT_FOUND);
         }
     }
 
@@ -567,13 +556,7 @@ namespace Astralis {
                 ? @"Internal Server Error: $(error.message)"
                 : "Internal Server Error";
             
-            var headers = new Catalogue<string, string>();
-            headers.add("Content-Type", "text/plain");
-            return new BufferedHttpResult.from_string(
-                message,
-                StatusCode.INTERNAL_SERVER_ERROR,
-                headers
-            );
+            return new HttpStringResult(message, StatusCode.INTERNAL_SERVER_ERROR);
         }
     }
 }

+ 52 - 0
src/Server/ResponseContext.vala

@@ -0,0 +1,52 @@
+using MHD;
+namespace Astralis {
+
+    internal class ResponseContext {
+
+        public ResponseContext(Connection connection, HttpResult result) {
+            body_output = new ServerOutput ();
+            body_output.on_new_chunk.connect(new_data_available);
+            this.result = result;
+            this.connection = connection;
+        }
+
+        public void begin_response() {
+            result.send_body.begin(body_output, (obj, context) => {
+                try {
+                    result.send_body.end(context);
+                }
+                catch (Error e) {
+                    send_body_error = e;
+                }
+                finally {
+                    send_body_finished = true;
+                }
+            });
+        }
+
+        public void suspend_connection() {
+            lock(connection_suspended) {
+                MHD.suspend_connection(connection);
+                connection_suspended = true;
+            }
+        }
+
+        private void new_data_available() {
+            lock(connection_suspended) {
+                if(connection_suspended) {
+                    MHD.suspend_connection(connection);
+                    connection_suspended = false;
+                }
+            }
+        }
+
+        public HttpResult result { get; set; }
+        public ServerOutput body_output { get; set; }
+        public bool send_body_finished { get; set; }
+        public bool connection_suspended { get; set; }
+        public Connection connection { get; set; }
+        public Error? send_body_error { get; set; }
+
+    }
+
+}

+ 34 - 22
src/Server/Server.vala

@@ -9,11 +9,13 @@ namespace Astralis {
         private RequestHandler handler;
         private int port;
         private HashSet<RequestContext> request_contexts;
+        private HashSet<ResponseContext> response_contexts;
 
         public Server(int port, RequestHandler handler) {
             this.port = port;
             this.handler = handler;
             this.request_contexts = new HashSet<RequestContext>();
+            this.response_contexts = new HashSet<ResponseContext>();
         }
 
         private int access_handler (Connection connection, string? url, string? method, string? version, string? upload_data, size_t* upload_data_size, void** con_cls) {
@@ -128,30 +130,40 @@ namespace Astralis {
             return result;
         }
 
+
         private Result send_result(Connection connection, HttpResult result) {
-            Response response;
-
-            if(result.get_type().is_a(typeof(BufferedHttpResult))) {
-                var buffered = (BufferedHttpResult)result;
-                response = new Response.from_buffer(
-                    buffered.buffer.length,
-                    buffered.buffer, 
-                    ResponseMemoryMode.RESPMEM_MUST_COPY
-                );
-            }
-            else {
-                response = new Response.from_buffer(
-                    0,
-                    new uint8[0], 
-                    ResponseMemoryMode.RESPMEM_PERSISTENT
-                );
-            }
+            uint64 size = result.content_length ?? -1;
+            var response_context = new ResponseContext(connection, result);
+            response_contexts.add(response_context);
+
+            var response = new Response.from_callback(
+                size,
+                1048576,
+                (cls, pos, buf, max) => {
+                    var context = (ResponseContext) ((Object*) cls[0]);
+                    var bytes_read = context.body_output.read_chunk(buf, max);
+                    if(bytes_read == 0) {
+                        if(!context.send_body_finished) {
+                            context.suspend_connection();
+                        }
+                        else if(context.send_body_error == null) {
+                            response_contexts.remove(context);
+                            return CONTENT_READER_END_OF_STREAM;
+                        }
+                        else {
+                            printerr(@"Astralis Internal Server Error: Unhandled Error Sending HttpResult: $(context.send_body_error.message)\n");
+                            response_contexts.remove(context);
+                            return CONTENT_READER_END_WITH_ERROR;
+                        }
+                    }
+                    return (ssize_t)bytes_read;
+                },
+                &response_context,
+                null);
 
 
-            result.headers.to_immutable_buffer().iterate((grouping) => {
-                grouping.iterate((value) => {
-                    response.add_header(grouping.key, value);
-                });
+            result.headers.iterate((kv) => {
+                response.add_header(kv.key, kv.value);
             });
 
             var res = MHD.queue_response(connection, result.status, response);
@@ -163,7 +175,7 @@ namespace Astralis {
 
         private Result handle_error(Error error, Connection connection) {
             printerr(@"Astralis Internal Server Error: Unhandled Error: $(error.message)\n");
-            return send_result(connection, new BufferedHttpResult.from_string("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR));
+            return send_result(connection, new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR));
         }
 
         // Simple memory-based InputStream for request body

+ 3 - 1
src/Server/ServerOutput.vala

@@ -8,6 +8,7 @@ namespace Astralis {
         private BinaryData current_chunk = null;
         private Series<ByteBuffer> chunks = new Series<ByteBuffer>();
         private bool writes_complete;
+        internal signal void on_new_chunk();
 
         public async void write (Invercargill.BinaryData data) {
             while(chunks.length > MAX_CHUNKS) {
@@ -17,6 +18,7 @@ namespace Astralis {
             var buffer = data.to_byte_buffer();
             if(buffer.length > 0) {
                 chunks.add(buffer);
+                on_new_chunk();
             }
         }
 
@@ -24,7 +26,7 @@ namespace Astralis {
             assert_not_reached ();
         }
 
-        private size_t read_chunk(void* buffer, size_t max_size) {
+        internal size_t read_chunk(void* buffer, size_t max_size) {
             if(current_chunk == null && chunks.length == 0) {
                 return 0;
             }

+ 1 - 0
src/meson.build

@@ -9,6 +9,7 @@ sources = files(
     'Handlers/Router.vala',
     'Server/Server.vala',
     'Server/RequestContext.vala',
+    'Server/ResponseContext.vala',
     'Server/ServerInput.vala',
     'Server/ServerOutput.vala',
 )

+ 15 - 1
vapi/libmicrohttpd.vapi

@@ -27,6 +27,11 @@ namespace MHD {
         RESPMEM_MUST_COPY
     }
 
+    [CCode (cname = "MHD_CONTENT_READER_END_OF_STREAM")]
+    public const ssize_t CONTENT_READER_END_OF_STREAM;
+    [CCode (cname = "MHD_CONTENT_READER_END_WITH_ERROR")]
+    public const ssize_t CONTENT_READER_END_WITH_ERROR;
+
     [CCode (cname = "MHD_USE_SELECT_INTERNALLY")]
     public const uint USE_SELECT_INTERNALLY;
     [CCode (cname = "MHD_USE_THREAD_PER_CONNECTION")]
@@ -54,7 +59,10 @@ namespace MHD {
     public class Response {
         [CCode (cname = "MHD_create_response_from_buffer")]
         public Response.from_buffer (size_t size, [CCode(array_length=false)] uint8[] buffer, ResponseMemoryMode mode);
-        
+
+        [CCode (cname = "MHD_create_response_from_callback")]
+        public Response.from_callback (uint64 size, size_t block_size, ContentReaderCallback crc, void* crc_cls, ContentReaderFreeCallback? crfc);
+
         [CCode (cname = "MHD_add_response_header")]
         public int add_header (string header, string content);
     }
@@ -77,6 +85,12 @@ namespace MHD {
         string? url, string? method, string? version, 
         string? upload_data, [CCode(array_length=false)] size_t* upload_data_size, 
         void** con_cls);
+
+    [CCode (instance_pos = 0)]
+    public delegate ssize_t ContentReaderCallback (void* cls, uint64 pos, char* buf, size_t max);
+
+    [CCode (has_target = false, cname = "MHD_ContentReaderFreeCallback")]
+    public delegate void ContentReaderFreeCallback (void* cls);
     
     [CCode (cname = "MHD_queue_response")]
     public int queue_response (Connection connection, uint status_code, Response response);