Kaynağa Gözat

fix(server): resolve connection hang caused by race conditions

Add thread-safe mutex locking for ServerOutput chunks collection to
prevent concurrent access issues. Implement pending_resume flag in
ResponseContext to handle the race window between deciding to suspend
and new data arriving. Add unique IDs to RequestContext, ResponseContext,
and ServerOutput for better request tracing. Add comprehensive debug
logging throughout the request lifecycle to aid in troubleshooting
connection issues.
Billy Barrow 1 hafta önce
ebeveyn
işleme
50148d56e5

+ 4 - 0
src/Server/RequestContext.vala

@@ -3,11 +3,13 @@ namespace Astralis {
 
     internal class RequestContext {
 
+        public int id { get; set; }
         public HttpContext? handler_context { get; set; }
         public ServerInput request_body { get; set; }
         public HttpResult? handler_result { get; set; }
         public Error? handler_error { get; set; }
         public bool handler_started { get; set; }
+        public bool response_sent { get; set; }
         public ResponseContext? response_context { get; set; }
 
         private Mutex request_lock;
@@ -24,6 +26,7 @@ namespace Astralis {
             handler_finished_execution = true;
             var result = request_fully_received;
             request_lock.unlock();
+            debug(@"[REQ-$(id)] handler_finished()=$(result) (request_fully_received=$(request_fully_received))");
             return result;
         }
 
@@ -32,6 +35,7 @@ namespace Astralis {
             request_fully_received = true;
             var result = handler_finished_execution;
             request_lock.unlock();
+            debug(@"[REQ-$(id)] request_reception_finished()=$(result) (handler_finished_execution=$(handler_finished_execution))");
             return result;
         }
 

+ 24 - 0
src/Server/ResponseContext.vala

@@ -2,41 +2,64 @@ using MHD;
 namespace Astralis {
 
     internal class ResponseContext {
+        private static int next_id = 0;
+        public int id { get; private set; }
 
         /**
          * Creates the per-response state for one connection.
          *
          * Takes an id from the class-wide static counter using AtomicInt.add,
          * creates a fresh ServerOutput as body_output and connects its
          * on_new_chunk signal to new_data_available(), then stores the
          * server, result and connection that were passed in.
          *
          * @param server stored in the server property
          * @param connection stored in the connection property
          * @param result stored in result; its status is written to the debug log
          */
         public ResponseContext(Server server, Connection connection, HttpResult result) {
+            this.id = AtomicInt.add(ref next_id, 1);
             this.server = server;
             body_output = new ServerOutput ();
             body_output.on_new_chunk.connect(new_data_available);
             this.result = result;
             this.connection = connection;
+            debug(@"[RC-$(id)] Created for $(result.status)");
         }
 
         /**
          * Starts result.send_body asynchronously, writing into body_output.
          *
          * When the async call completes, any Error it raised is stored in
          * send_body_error, send_body_finished is set in every case, and
          * new_data_available() is called so that a suspended reader is woken.
          */
         public void begin_response() {
+            debug(@"[RC-$(id)] begin_response() starting");
             result.send_body.begin(body_output, (obj, context) => {
                 try {
                     result.send_body.end(context);
+                    debug(@"[RC-$(id)] send_body completed successfully");
                 }
                 catch (Error e) {
                     send_body_error = e;
                     // debug() is printf-style: e.message is arbitrary text and must be
                     // passed as an argument, never as part of the format string.
+                    debug("[RC-%d] send_body error: %s", id, e.message);
                 }
                 finally {
                     send_body_finished = true;
+                    debug(@"[RC-$(id)] send_body_finished = true");
                     // The content reader suspends the connection when it finds no data
                     // while send_body_finished is still false. If that happens just
                     // before the flag flips, no later write exists to resume it, so
                     // signal here as well: this resumes a suspended connection, or sets
                     // pending_resume so that an in-flight suspend is aborted.
+                    new_data_available();
                 }
             });
         }
 
         /**
          * Suspends the connection unless new_data_available() has set
          * pending_resume since the flag was last cleared.
          *
          * Everything happens inside lock(connection_suspended): if
          * pending_resume is set it is cleared and the method returns without
          * suspending; otherwise MHD.suspend_connection is called and
          * connection_suspended becomes true.
          */
         public void suspend_connection() {
+            debug(@"[RC-$(id)] suspend_connection() called");
             lock(connection_suspended) {
+                // Check if data arrived while we were deciding to suspend
+                if (pending_resume) {
+                    // Data arrived in the race window - don't suspend
+                    pending_resume = false;
+                    debug(@"[RC-$(id)] suspend_connection() ABORTED - pending_resume was true");
+                    return;
+                }
                 MHD.suspend_connection(connection);
                 connection_suspended = true;
+                debug(@"[RC-$(id)] Connection SUSPENDED");
             }
         }
 
         /**
          * Handler connected to body_output.on_new_chunk in the constructor.
          *
          * Inside lock(connection_suspended): when the connection is currently
          * marked suspended it is resumed through MHD.resume_connection and the
          * flag is cleared; otherwise pending_resume is set, which makes the
          * next suspend_connection() call return without suspending.
          */
         private void new_data_available() {
+            debug(@"[RC-$(id)] new_data_available() called");
             lock(connection_suspended) {
                 if(connection_suspended) {
+                    debug(@"[RC-$(id)] new_data_available() - RESUMING (was suspended)");
                     MHD.resume_connection(connection);
                     connection_suspended = false;
+                } else {
+                    // Mark that we have data, preventing suspend in race window
+                    pending_resume = true;
+                    debug(@"[RC-$(id)] new_data_available() - setting pending_resume=true (not yet suspended)");
                 }
             }
         }
@@ -45,6 +68,7 @@ namespace Astralis {
         public ServerOutput body_output { get; set; }
         public bool send_body_finished { get; set; }
         public bool connection_suspended { get; set; }
+        private bool pending_resume { get; set; }
         public Connection connection { get; set; }
         public Error? send_body_error { get; set; }
         public Server server { get; set; }

+ 35 - 9
src/Server/Server.vala

@@ -18,13 +18,17 @@ namespace Astralis {
             this.response_contexts = new HashSet<ResponseContext>();
         }
 
+        private static int request_counter = 0;
+        
         private int access_handler (Connection connection, string? url, string? method, string? version, string? upload_data, size_t* upload_data_size, void** con_cls) {
 
             // On initial call for request, simply set up the context
             if (con_cls[0] == null) {
                 var context = new RequestContext();
+                context.id = AtomicInt.add(ref request_counter, 1);
                 request_contexts.add(context);
                 con_cls[0] = (void*) context;
+                debug(@"[REQ-$(context.id)] access_handler() FIRST CALL - url=$(url) method=$(method)");
                 return Result.YES;
             }
 
@@ -33,6 +37,8 @@ namespace Astralis {
 
             // On the second call we populate the `HttpRequest` object and begin the handler
             if (context.handler_context == null) {
+                debug(@"[REQ-$(context.id)] access_handler() SECOND CALL - starting handler");
+                
                 // Extract all headers from the connection (case-insensitive keys)
                 var headers = new Catalogue<string, string>(case_insensitive_hash, case_insensitive_equal);
                 var headers_collector = new KeyValueCollector(headers);
@@ -67,14 +73,18 @@ namespace Astralis {
 
                 // Kick off the handler
                 pipeline.run.begin(http_context, (obj, res) => {
+                    debug(@"[REQ-$(context.id)] handler callback fired");
                     try {
                         context.handler_result = pipeline.run.end(res);
+                        debug(@"[REQ-$(context.id)] handler completed successfully");
                     }
                     catch(Error e) {
                         context.handler_error = e;
+                        debug(@"[REQ-$(context.id)] handler error: $(e.message)");
                     }
                     if(context.handler_finished()) {
-                        respond(connection, context);
+                        debug(@"[REQ-$(context.id)] handler_finished()=true, resuming connection");
+                        // Just resume - access_handler will be re-entered and handle the response
                         MHD.resume_connection(connection);
                     }
                 });
@@ -82,6 +92,7 @@ namespace Astralis {
 
             // On the second, and all subsequent calls - read the request body:
             if (upload_data_size[0] != 0) {
+                debug(@"[REQ-$(context.id)] access_handler() receiving $(upload_data_size[0]) bytes body data");
                 var data = new uint8[upload_data_size[0]];
                 Memory.copy(data, upload_data, upload_data_size[0]);
                 context.request_body.write(data);
@@ -90,10 +101,13 @@ namespace Astralis {
             }
             // End of request body data
             else {
+                debug(@"[REQ-$(context.id)] access_handler() no more body data");
                 context.request_body.complete();
                 if(context.request_reception_finished()) {
+                    debug(@"[REQ-$(context.id)] request_reception_finished()=true, calling respond()");
                     return respond(connection, context);
                 }
+                debug(@"[REQ-$(context.id)] SUSPENDING connection (waiting for handler)");
                 MHD.suspend_connection(connection);
                 return Result.YES;
             }
@@ -122,14 +136,18 @@ namespace Astralis {
         }
 
         /**
          * Turns the handler outcome stored on the request context into a response.
          *
          * handler_error takes precedence and is passed to handle_error();
          * otherwise a non-null handler_result is passed to send_result(). When
          * neither is set the result stays Result.NO. The context is removed
          * from request_contexts in every case.
          *
          * @param connection the connection handed on to handle_error()/send_result()
          * @param context the request context whose handler_error/handler_result is read
          * @return the value returned by handle_error()/send_result(), or Result.NO
          */
         private Result respond(Connection connection, RequestContext context) {
+            debug(@"[REQ-$(context.id)] respond() called");
             var result = Result.NO;
             if(context.handler_error != null) {
+                debug(@"[REQ-$(context.id)] respond() handling error");
                 result = handle_error(context.handler_error, connection, context);
             }
             else if(context.handler_result != null) {
+                debug(@"[REQ-$(context.id)] respond() sending result");
                 result = send_result(connection, context.handler_result, context);
             }
             request_contexts.remove(context);
+            debug(@"[REQ-$(context.id)] respond() returning %d", (int)result);
             return result;
         }
 
@@ -138,6 +156,9 @@ namespace Astralis {
             uint64 size = result.content_length ?? -1;
             var response_context = new ResponseContext(this, connection, result);
             response_contexts.add(response_context);
+            
+            int req_id = request_context != null ? request_context.id : -1;
+            debug(@"[REQ-$(req_id)] send_result() creating RC-$(response_context.id), content_length=$(size)");
 
             // Link the ResponseContext to the RequestContext for disconnect notification
             if (request_context != null) {
@@ -148,19 +169,23 @@ namespace Astralis {
                 size,
                 1048576,
                 (cls, pos, buf, max) => {
-                    var context = (ResponseContext) cls;
-                    var bytes_read = context.body_output.read_chunk(buf, max);
+                    var ctx = (ResponseContext) cls;
+                    debug(@"[RC-$(ctx.id)] content_reader_callback called, max=$(max)");
+                    var bytes_read = ctx.body_output.read_chunk(buf, max);
+                    debug(@"[RC-$(ctx.id)] content_reader_callback read $(bytes_read) bytes");
                     if(bytes_read == 0) {
-                        if(!context.send_body_finished) {
-                            context.suspend_connection();
+                        if(!ctx.send_body_finished) {
+                            debug(@"[RC-$(ctx.id)] content_reader_callback: no data, body not finished - suspending");
+                            ctx.suspend_connection();
                         }
-                        else if(context.send_body_error == null) {
-                            context.server.response_contexts.remove(context);
+                        else if(ctx.send_body_error == null) {
+                            debug(@"[RC-$(ctx.id)] content_reader_callback: body finished successfully - END_OF_STREAM");
+                            ctx.server.response_contexts.remove(ctx);
                             return CONTENT_READER_END_OF_STREAM;
                         }
                         else {
-                            printerr(@"Astralis Internal Server Error: Unhandled Error Sending HttpResult: $(context.send_body_error.message)\n");
-                            context.server.response_contexts.remove(context);
+                            printerr(@"Astralis Internal Server Error: Unhandled Error Sending HttpResult: $(ctx.send_body_error.message)\n");
+                            ctx.server.response_contexts.remove(ctx);
                             return CONTENT_READER_END_WITH_ERROR;
                         }
                     }
@@ -179,6 +204,7 @@ namespace Astralis {
             if(res != MHD.Result.YES) {
                 printerr("Astralis Internal Error: Unable to queue response\n");
             }
+            debug(@"[REQ-$(req_id)] send_result() queued response, result=%d", (int)res);
             return res;
         }
 

+ 51 - 3
src/Server/ServerOutput.vala

@@ -3,6 +3,8 @@ using Invercargill.DataStructures;
 namespace Astralis {
 
     internal class ServerOutput : Object, AsyncOutput {
+        private static int next_id = 0;
+        private int id;
 
         const int MAX_CHUNKS = 5;
         private BinaryData current_chunk = null;
@@ -11,19 +13,46 @@ namespace Astralis {
         internal signal void on_chunk_poped();
         private delegate void Handler();
         private bool _connected = true;
+        private Mutex chunks_lock = Mutex();
+
+        public ServerOutput() {
             // Take this instance's id from the static next_id counter using an
             // atomic add; the id is only used as the [SO-n] tag in debug output.
+            this.id = AtomicInt.add(ref next_id, 1);
+            debug(@"[SO-$(id)] Created");
+        }
 
         public bool connected { get { return _connected; } }
-        public bool write_would_block { get { return chunks.length > MAX_CHUNKS; } }
+        public bool write_would_block {
             // True when more than MAX_CHUNKS chunks are queued. chunks.length is
             // sampled while holding chunks_lock and the lock is released before
             // returning, so the value is a snapshot taken at the time of the call.
+            get {
+                chunks_lock.lock();
+                var result = chunks.length > MAX_CHUNKS;
+                chunks_lock.unlock();
+                return result;
+            }
+        }
 
         /**
          * Marks this output as disconnected by clearing _connected.
          *
          * Afterwards the connected property reads false and a new write()
          * call throws IOError.CLOSED.
          *
          * NOTE(review): nothing here emits on_chunk_poped, so a write() that
          * is already parked in its wait loop does not appear to be woken by
          * this call - confirm against the full write() implementation.
          */
         internal void close_connection() {
+            debug(@"[SO-$(id)] close_connection() called");
             _connected = false;
         }
 
         public async void write (Invercargill.BinaryData data) throws Error {
+            var buffer = data.to_byte_buffer();
+            debug(@"[SO-$(id)] write() called with $(buffer.length) bytes");
+            
             if (!_connected) {
+                debug(@"[SO-$(id)] write() FAILED - not connected");
                 throw new IOError.CLOSED("Cannot write to disconnected client");
             }
-            while(chunks.length > MAX_CHUNKS) {
+            bool should_wait = false;
+            chunks_lock.lock();
+            should_wait = chunks.length > MAX_CHUNKS;
+            chunks_lock.unlock();
+            
+            if (should_wait) {
+                debug(@"[SO-$(id)] write() waiting - chunks.length > MAX_CHUNKS");
+            }
+            
+            while(should_wait) {
                 Handler pop_handler = null;
                 pop_handler = () => {
                     on_chunk_poped.disconnect(pop_handler);
@@ -31,27 +60,46 @@ namespace Astralis {
                 };
                 on_chunk_poped.connect(pop_handler);
                 yield;
+                chunks_lock.lock();
+                should_wait = chunks.length > MAX_CHUNKS;
+                chunks_lock.unlock();
             }
-            var buffer = data.to_byte_buffer();
+            
             if(buffer.length > 0) {
+                chunks_lock.lock();
                 chunks.add(buffer);
+                var chunk_count = chunks.length;
+                chunks_lock.unlock();
+                debug(@"[SO-$(id)] write() added chunk, now $(chunk_count) chunks, firing on_new_chunk");
                 on_new_chunk();
             }
         }
 
         /**
          * Copies up to max_size bytes of queued output into buffer.
          *
          * The queue (chunks) is inspected and popped while holding
          * chunks_lock; the lock is released on every path before the copy
          * and before on_chunk_poped is emitted.
          *
          * NOTE(review): current_chunk is read and written after the lock is
          * released, which presumably relies on there being a single reader
          * per ServerOutput - confirm against the callers.
          *
          * @param buffer destination passed to current_chunk.write_to()
          * @param max_size upper bound passed to current_chunk.write_to()
          * @return the size reported by write_to(), or 0 when no chunk is available
          */
         internal size_t read_chunk(void* buffer, size_t max_size) {
+            chunks_lock.lock();
+            
             // Drop a chunk that has been fully consumed by earlier calls.
             if(current_chunk != null && current_chunk.peek_count() == 0) {
                 current_chunk = null;
             }
             if(current_chunk == null && chunks.length == 0) {
+                debug(@"[SO-$(id)] read_chunk() returning 0 - no data available");
+                chunks_lock.unlock();
                 return 0;
             }
             if(current_chunk == null) {
                 current_chunk = chunks.pop_start ();
+                var remaining = chunks.length;
+                chunks_lock.unlock();
+                debug(@"[SO-$(id)] read_chunk() popped new chunk, $(remaining) chunks remaining");
                 // Emitted with chunks_lock already released.
                 on_chunk_poped();
             }
+            else {
+                chunks_lock.unlock();
+            }
+            
             // Copy out of the current chunk, then keep only its unread remainder.
             var size = current_chunk.write_to (buffer, max_size);
             current_chunk = current_chunk.skip((uint)size);
+            debug(@"[SO-$(id)] read_chunk() returning $(size) bytes");
             return size;
         }