Ver Fonte

Refactor AsyncPipe

Billy Barrow há 4 semanas atrás
pai
commit
8ae9bc07dd

+ 13 - 0
src/Core/AsyncInput.vala

@@ -0,0 +1,13 @@
+using Invercargill;
+
+namespace Astralis {
+
+    public interface AsyncInput : Object {
+
+        public abstract BinaryData? peek();
+        public abstract async BinaryData? read();
+        public abstract async BinaryData read_all();
+
+    }
+
+}

+ 12 - 0
src/Core/AsyncOutput.vala

@@ -0,0 +1,12 @@
+using Invercargill;
+
+namespace Astralis {
+
+    public interface AsyncOutput : Object {
+
+        public abstract async void write(BinaryData data);
+        public abstract async void write_stream(OutputStream stream);
+
+    }
+
+}

+ 2 - 2
src/Core/Context.vala

@@ -20,7 +20,7 @@ namespace Astralis {
         public Catalogue<string, string> cookies { get; private set; }
 
         // Request body
-        public AsyncPipe request_body { get; private set; }
+        public AsyncInput request_body { get; private set; }
 
         // Content information
         public string content_type { get; private set; }
@@ -37,7 +37,7 @@ namespace Astralis {
             string method,
             string version,
             Catalogue<string, string> headers,
-            AsyncPipe request_body,
+            AsyncInput request_body,
             Catalogue<string, string> query,
             Catalogue<string, string> cookies,
             string? remote_address

+ 3 - 3
src/Data/FormDataParser.vala

@@ -107,7 +107,7 @@ namespace Astralis {
 
         /// Parse form data from an AsyncPipe, auto-detecting content type
         /// content_type_header should be the full Content-Type header value
-        public static async FormData parse(AsyncPipe pipe, string content_type_header) throws Error {
+        public static async FormData parse(AsyncInput pipe, string content_type_header) throws Error {
             if (content_type_header.contains("multipart/form-data")) {
                 string boundary = extract_boundary(content_type_header);
                 return yield parse_multipart(pipe, boundary);
@@ -119,7 +119,7 @@ namespace Astralis {
         }
 
         /// Parse application/x-www-form-urlencoded data from AsyncPipe
-        public static async FormData parse_urlencoded(AsyncPipe pipe) throws Error {
+        public static async FormData parse_urlencoded(AsyncInput pipe) throws Error {
             var body = yield pipe.read_all();
             var body_str = body.to_raw_string();
 
@@ -161,7 +161,7 @@ namespace Astralis {
         }
 
         /// Parse multipart/form-data from AsyncPipe
-        public static async FormData parse_multipart(AsyncPipe pipe, string boundary) throws Error {
+        public static async FormData parse_multipart(AsyncInput pipe, string boundary) throws Error {
             var body = yield pipe.read_all();
             
             // Extract bytes from BinaryData using write_to

+ 2 - 2
src/Server/RequestContext.vala

@@ -4,7 +4,7 @@ namespace Astralis {
     internal class RequestContext {
 
         public HttpContext? handler_context { get; set; }
-        public AsyncPipe.AsyncPipeWriter request_body_pipe { get; set; }
+        public ServerInput request_body { get; set; }
         public HttpResult? handler_result { get; set; }
         public Error? handler_error { get; set; }
         public bool handler_started { get; set; }
@@ -14,7 +14,7 @@ namespace Astralis {
         private bool handler_finished_execution;
 
         public RequestContext() {
-            request_body_pipe = AsyncPipe.new_writer();
+            request_body = new ServerInput();
             request_lock = Mutex();
         }
 

+ 3 - 3
src/Server/Server.vala

@@ -55,7 +55,7 @@ namespace Astralis {
                     method ?? "",
                     version ?? "",
                     headers,
-                    context.request_body_pipe.pipe,
+                    context.request_body,
                     query_params,
                     cookies,
                     remote_address
@@ -82,13 +82,13 @@ namespace Astralis {
             if (upload_data_size[0] != 0) {
                 var data = new uint8[upload_data_size[0]];
                 Memory.copy(data, upload_data, upload_data_size[0]);
-                context.request_body_pipe.write(data);
+                context.request_body.write(data);
                 upload_data_size[0] = 0;
                 return Result.YES;
             }
             // End of request body data
             else {
-                context.request_body_pipe.complete();
+                context.request_body.complete();
                 if(context.request_reception_finished()) {
                     return respond(connection, context);
                 }

+ 5 - 22
src/Core/AsyncPipe.vala → src/Server/ServerInput.vala

@@ -2,13 +2,13 @@ using Invercargill;
 using Invercargill.DataStructures;
 namespace Astralis {
 
-    public class AsyncPipe : Object {
+    internal class ServerInput : Object, AsyncInput {
 
         private bool write_occurred;
         private Series<ByteBuffer> chunks = new Series<ByteBuffer>();
         private bool writes_complete;
 
-        public ByteBuffer? peek() {
+        public BinaryData? peek() {
             return chunks.first_or_default();
         }
 
@@ -34,30 +34,13 @@ namespace Astralis {
             return data;
         }
 
-        private void write(uint8[] data) {
+        internal void write(uint8[] data) {
             chunks.add(new ByteBuffer.from_byte_array(data));
             write_occurred = true;
         }
 
-        public static AsyncPipeWriter new_writer() {
-            return new AsyncPipeWriter(new AsyncPipe());
-        }
-
-        public class AsyncPipeWriter {
-
-            public AsyncPipeWriter(AsyncPipe pipe) {
-                this.pipe = pipe;
-            }
-
-            public AsyncPipe pipe { get; private set; }
-            public void write(uint8[] data) {
-                pipe.write(data);
-            }
-
-            public void complete() {
-                pipe.writes_complete = true;
-            }
-
+        internal void complete() {
+            writes_complete = true;
         }
 
     }

+ 41 - 0
src/Server/ServerOutput.vala

@@ -0,0 +1,41 @@
+using Invercargill;
+using Invercargill.DataStructures;
+namespace Astralis {
+
+    internal class ServerOutput : Object, AsyncOutput {
+
+        const int MAX_CHUNKS = 5;
+        private BinaryData current_chunk = null;
+        private Series<ByteBuffer> chunks = new Series<ByteBuffer>();
+        private bool writes_complete;
+
+        public async void write (Invercargill.BinaryData data) {
+            while(chunks.length > MAX_CHUNKS) {
+                Idle.add(write.callback);
+                yield;
+            }
+            var buffer = data.to_byte_buffer();
+            if(buffer.length > 0) {
+                chunks.add(buffer);
+            }
+        }
+
+        public async void write_stream (GLib.OutputStream stream) {
+            assert_not_reached ();
+        }
+
+        private size_t read_chunk(void* buffer, size_t max_size) {
+            if(current_chunk == null && chunks.length == 0) {
+                return 0;
+            }
+            if(current_chunk == null) {
+                current_chunk = chunks.pop_start ();
+            }
+            var size = current_chunk.write_to (buffer, max_size);
+            current_chunk = current_chunk.skip((uint)max_size);
+            return size;
+        }
+
+    }
+
+}

+ 4 - 1
src/meson.build

@@ -3,11 +3,14 @@ sources = files(
     'Core/HttpValues.vala',
     'Core/RequestHandler.vala',
     'Core/Response.vala',
-    'Core/AsyncPipe.vala',
+    'Core/AsyncInput.vala',
+    'Core/AsyncOutput.vala',
     'Data/FormDataParser.vala',
     'Handlers/Router.vala',
     'Server/Server.vala',
     'Server/RequestContext.vala',
+    'Server/ServerInput.vala',
+    'Server/ServerOutput.vala',
 )
 
 libastralis = shared_library('astralis',