Przeglądaj źródła

refactor: Remove explicit mutex locks and enhance asynchronous waiting mechanisms for simplified concurrency.

Billy Barrow 1 tydzień temu
rodzic
commit
bac6041715

+ 2 - 16
src/Endpoints/SseEndpoint.vala

@@ -12,7 +12,6 @@ namespace Astralis {
 
         private AsyncOutput output;
         private bool _is_closed = false;
-        private Mutex close_mutex = Mutex();
 
         /// Signal emitted when the client disconnects.
         public signal void disconnected();
@@ -20,10 +19,7 @@ namespace Astralis {
         /// Indicates whether this stream has been closed.
         public bool is_closed { 
             get {
-                close_mutex.lock();
-                var result = _is_closed;
-                close_mutex.unlock();
-                return result;
+                return _is_closed;
             }
         }
 
@@ -53,10 +49,8 @@ namespace Astralis {
 
         /// Close this stream. After closing, no more data can be sent.
         public void close() {
-            close_mutex.lock();
             var was_closed = _is_closed;
             _is_closed = true;
-            close_mutex.unlock();
             
             if (!was_closed) {
                 disconnected();
@@ -218,7 +212,6 @@ namespace Astralis {
     public abstract class SseEndpoint : Object, Endpoint {
 
         private Series<SseStream> open_streams = new Series<SseStream>();
-        private Mutex streams_mutex = Mutex();
 
         /// The retry interval in milliseconds sent to clients for reconnection.
         /// 
@@ -252,10 +245,7 @@ namespace Astralis {
         ///
         /// @return An immutable collection of open streams
         protected ImmutableLot<SseStream> get_open_streams() {
-            streams_mutex.lock();
-            var result = open_streams.where(s => !s.is_closed).to_immutable_buffer();
-            streams_mutex.unlock();
-            return result;
+            return open_streams.where(s => !s.is_closed).to_immutable_buffer();
         }
 
         /// Send an event to all currently open streams.
@@ -276,15 +266,11 @@ namespace Astralis {
         }
 
         internal void register_stream(SseStream stream) {
-            streams_mutex.lock();
             open_streams.add(stream);
-            streams_mutex.unlock();
         }
 
         internal void unregister_stream(SseStream stream) {
-            streams_mutex.lock();
             open_streams.remove(stream);
-            streams_mutex.unlock();
         }
 
         internal void notify_new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {

+ 26 - 35
src/Markup/MarkupTemplate.vala

@@ -14,7 +14,6 @@ namespace Astralis {
     public abstract class MarkupTemplate : GLib.Object {
         private Html.Doc* cached_doc = null;
         private bool loaded = false;
-        private GLib.Mutex mutex = GLib.Mutex();
         
         /// <summary>
         /// Returns the HTML markup string for this template.
@@ -45,41 +44,35 @@ namespace Astralis {
         
         /// <summary>
         /// Parses the template markup if not already loaded.
-        /// Thread-safe: uses mutex to prevent double-parsing.
         /// </summary>
         private void ensure_loaded() throws GLib.Error {
-            mutex.lock();
-            try {
-                if (loaded) {
-                    return;
-                }
-                
-                string html = markup;
-                
-                int options = (int)(Html.ParserOption.RECOVER |
-                    Html.ParserOption.NOERROR |
-                    Html.ParserOption.NOWARNING |
-                    Html.ParserOption.NOBLANKS |
-                    Html.ParserOption.NONET);
-                
-                char[] buffer = html.to_utf8();
-                cached_doc = Html.Doc.read_memory(buffer, buffer.length, "", "utf-8", options);
-                
-                if (cached_doc == null) {
-                    // Try parsing as a fragment wrapped in a basic structure
-                    string wrapped = "<!DOCTYPE html><html><head><meta charset=\"UTF-8\"/></head><body>%s</body></html>".printf(html);
-                    char[] wrapped_buffer = wrapped.to_utf8();
-                    cached_doc = Html.Doc.read_memory(wrapped_buffer, wrapped_buffer.length, "", "utf-8", options);
-                }
-                
-                if (cached_doc == null) {
-                    throw new MarkupError.PARSE_ERROR("Failed to parse template markup");
-                }
-                
-                loaded = true;
-            } finally {
-                mutex.unlock();
+            if (loaded) {
+                return;
             }
+            
+            string html = markup;
+            
+            int options = (int)(Html.ParserOption.RECOVER |
+                Html.ParserOption.NOERROR |
+                Html.ParserOption.NOWARNING |
+                Html.ParserOption.NOBLANKS |
+                Html.ParserOption.NONET);
+            
+            char[] buffer = html.to_utf8();
+            cached_doc = Html.Doc.read_memory(buffer, buffer.length, "", "utf-8", options);
+            
+            if (cached_doc == null) {
+                // Try parsing as a fragment wrapped in a basic structure
+                string wrapped = "<!DOCTYPE html><html><head><meta charset=\"UTF-8\"/></head><body>%s</body></html>".printf(html);
+                char[] wrapped_buffer = wrapped.to_utf8();
+                cached_doc = Html.Doc.read_memory(wrapped_buffer, wrapped_buffer.length, "", "utf-8", options);
+            }
+            
+            if (cached_doc == null) {
+                throw new MarkupError.PARSE_ERROR("Failed to parse template markup");
+            }
+            
+            loaded = true;
         }
         
         /// <summary>
@@ -130,13 +123,11 @@ namespace Astralis {
         /// Useful if the template source may have changed.
         /// </summary>
         public void invalidate() {
-            mutex.lock();
             if (cached_doc != null) {
                 delete cached_doc;
                 cached_doc = null;
             }
             loaded = false;
-            mutex.unlock();
         }
         
         ~MarkupTemplate() {

+ 2 - 10
src/Server/RequestContext.vala

@@ -10,29 +10,21 @@ namespace Astralis {
         public bool handler_started { get; set; }
         public ResponseContext? response_context { get; set; }
 
-        private Mutex request_lock;
         private bool request_fully_received;
         private bool handler_finished_execution;
 
         public RequestContext() {
             request_body = new ServerInput();
-            request_lock = Mutex();
         }
 
         public bool handler_finished() {
-            request_lock.lock();
             handler_finished_execution = true;
-            var result = request_fully_received;
-            request_lock.unlock();
-            return result;
+            return request_fully_received;
         }
 
         public bool request_reception_finished() {
-            request_lock.lock();
             request_fully_received = true;
-            var result = handler_finished_execution;
-            request_lock.unlock();
-            return result;
+            return handler_finished_execution;
         }
 
     }

+ 16 - 2
src/Server/ServerInput.vala

@@ -7,6 +7,8 @@ namespace Astralis {
         private bool write_occurred;
         private Series<ByteBuffer> chunks = new Series<ByteBuffer>();
         private bool writes_complete;
+        private delegate void WaitHandler();
+        private signal void on_data_or_complete();
 
         public BinaryData? peek() {
             return chunks.first_or_default();
@@ -14,7 +16,12 @@ namespace Astralis {
 
         public async BinaryData? read() throws Error {
             while (chunks.length == 0 && !writes_complete) {
-                Idle.add(read.callback);
+                WaitHandler handler = null;
+                handler = () => {
+                    on_data_or_complete.disconnect(handler);
+                    Idle.add(read.callback);
+                };
+                on_data_or_complete.connect(handler);
                 yield;
             }
             if(chunks.length == 0) {
@@ -25,7 +32,12 @@ namespace Astralis {
 
         public async BinaryData read_all() {
             while (!writes_complete) {
-                Idle.add(read_all.callback);
+                WaitHandler handler = null;
+                handler = () => {
+                    on_data_or_complete.disconnect(handler);
+                    Idle.add(read_all.callback);
+                };
+                on_data_or_complete.connect(handler);
                 yield;
             }
             var data = new ByteComposition();
@@ -37,10 +49,12 @@ namespace Astralis {
         internal void write(uint8[] data) {
             chunks.add(new ByteBuffer.from_byte_array(data));
             write_occurred = true;
+            on_data_or_complete();
         }
 
         internal void complete() {
             writes_complete = true;
+            on_data_or_complete();
         }
 
     }

+ 3 - 22
src/Server/ServerOutput.vala

@@ -11,15 +11,11 @@ namespace Astralis {
         internal signal void on_chunk_poped();
         private delegate void Handler();
         private bool _connected = true;
-        private Mutex chunks_lock = Mutex();
 
         public bool connected { get { return _connected; } }
         public bool write_would_block { 
             get { 
-                chunks_lock.lock();
-                var result = chunks.length > MAX_CHUNKS;
-                chunks_lock.unlock();
-                return result;
+                return chunks.length > MAX_CHUNKS;
             } 
         }
 
@@ -31,12 +27,8 @@ namespace Astralis {
             if (!_connected) {
                 throw new IOError.CLOSED("Cannot write to disconnected client");
             }
-            bool should_wait = false;
-            chunks_lock.lock();
-            should_wait = chunks.length > MAX_CHUNKS;
-            chunks_lock.unlock();
             
-            while(should_wait) {
+            while(chunks.length > MAX_CHUNKS) {
                 Handler pop_handler = null;
                 pop_handler = () => {
                     on_chunk_poped.disconnect(pop_handler);
@@ -44,37 +36,26 @@ namespace Astralis {
                 };
                 on_chunk_poped.connect(pop_handler);
                 yield;
-                chunks_lock.lock();
-                should_wait = chunks.length > MAX_CHUNKS;
-                chunks_lock.unlock();
             }
+
             var buffer = data.to_byte_buffer();
             if(buffer.length > 0) {
-                chunks_lock.lock();
                 chunks.add(buffer);
-                chunks_lock.unlock();
                 on_new_chunk();
             }
         }
 
         internal size_t read_chunk(void* buffer, size_t max_size) {
-            chunks_lock.lock();
-            
             if(current_chunk != null && current_chunk.peek_count() == 0) {
                 current_chunk = null;
             }
             if(current_chunk == null && chunks.length == 0) {
-                chunks_lock.unlock();
                 return 0;
             }
             if(current_chunk == null) {
                 current_chunk = chunks.pop_start ();
-                chunks_lock.unlock();
                 on_chunk_poped();
             }
-            else {
-                chunks_lock.unlock();
-            }
             
             var size = current_chunk.write_to (buffer, max_size);
             current_chunk = current_chunk.skip((uint)size);