Explorar o código

feat: Introduce explicit socket monitor updates on connection resume and improve MHD timeout scheduling.

Billy Barrow hai 1 semana
pai
achega
9984dd9551
Modificáronse 3 ficheiros con 92 adicións e 37 borrados
  1. 1 0
      src/Server/ResponseContext.vala
  2. 7 0
      src/Server/Server.vala
  3. 84 37
      src/Server/SocketMonitor.vala

+ 1 - 0
src/Server/ResponseContext.vala

@@ -43,6 +43,7 @@ namespace Astralis {
                 if(connection_suspended) {
                     MHD.resume_connection(connection);
                     connection_suspended = false;
+                    server.request_monitor_update();
                 } else {
                     // Mark that we have data, preventing suspend in race window
                     pending_resume = true;

+ 7 - 0
src/Server/Server.vala

@@ -80,6 +80,7 @@ namespace Astralis {
                     if(context.handler_finished()) {
                         // Just resume - access_handler will be re-entered and handle the response
                         MHD.resume_connection(connection);
+                        this.request_monitor_update();
                     }
                 });
             }
@@ -144,6 +145,12 @@ namespace Astralis {
             // MHD_stop_daemon is called by Daemon's destructor when daemon is finalized
         }
 
+        internal void request_monitor_update() {
+            if (socket_monitor != null) {
+                socket_monitor.force_update();
+            }
+        }
+
         private Result respond(Connection connection, RequestContext context) {
             var result = Result.NO;
             if(context.handler_error != null) {

+ 84 - 37
src/Server/SocketMonitor.vala

@@ -9,15 +9,27 @@ namespace Astralis {
     /// main loop, eliminating cross-thread communication issues with suspend/resume.
     internal class SocketMonitor : Object {
         private unowned MHD.Daemon daemon;
-        private Dictionary<int, IOChannel> channels;
-        private Dictionary<int, uint> watch_ids;
+        private Dictionary<int, WatchInfo> watches;
         private uint timeout_id;
         private bool running;
         
+        private class WatchInfo {
+            public IOChannel channel;
+            public uint watch_id;
+            public bool monitor_read;
+            public bool monitor_write;
+            
+            public WatchInfo(IOChannel channel, uint watch_id, bool monitor_read, bool monitor_write) {
+                this.channel = channel;
+                this.watch_id = watch_id;
+                this.monitor_read = monitor_read;
+                this.monitor_write = monitor_write;
+            }
+        }
+        
         public SocketMonitor(MHD.Daemon daemon) {
             this.daemon = daemon;
-            this.channels = new Dictionary<int, IOChannel>();
-            this.watch_ids = new Dictionary<int, uint>();
+            this.watches = new Dictionary<int, WatchInfo>();
             this.timeout_id = 0;
             this.running = false;
         }
@@ -40,11 +52,10 @@ namespace Astralis {
             running = false;
             
             // Remove all IO channel watches
-            foreach (var entry in watch_ids) {
-                Source.remove(entry.value);
+            foreach (var entry in watches) {
+                Source.remove(entry.value.watch_id);
             }
-            watch_ids.clear();
-            channels.clear();
+            watches.clear();
             
             // Remove timeout watcher
             if (timeout_id != 0) {
@@ -80,9 +91,14 @@ namespace Astralis {
                 if (needs_read || needs_write) {
                     active_fds.add(fd);
                     
-                    // Add new watch if not already monitoring this fd
-                    IOChannel? existing_channel = null;
-                    if (!channels.try_get(fd, out existing_channel)) {
+                    // Add new watch if not already monitoring this fd, or replace it if conditions changed
+                    WatchInfo? existing_watch = null;
+                    if (watches.try_get(fd, out existing_watch)) {
+                        if (existing_watch.monitor_read != needs_read || existing_watch.monitor_write != needs_write) {
+                            remove_watch_for_fd(fd);
+                            add_watch_for_fd(fd, needs_read, needs_write);
+                        }
+                    } else {
                         add_watch_for_fd(fd, needs_read, needs_write);
                     }
                 }
@@ -90,7 +106,7 @@ namespace Astralis {
             
             // Remove watches for closed file descriptors
             var to_remove = new Series<int>();
-            foreach (var entry in channels) {
+            foreach (var entry in watches) {
                 if (!active_fds.contains(entry.key)) {
                     to_remove.add(entry.key);
                 }
@@ -124,43 +140,35 @@ namespace Astralis {
                 // Socket set may have changed after processing
                 update_watches();
                 
+                // Reschedule timeout if MHD requested a new one
+                schedule_next_timeout();
+                
                 // Continue monitoring
                 return true;
             });
             
-            channels[fd] = channel;
-            watch_ids[fd] = watch_id;
+            watches[fd] = new WatchInfo(channel, watch_id, monitor_read, monitor_write);
         }
         
         /// Remove a GLib IO channel watch for a file descriptor
         private void remove_watch_for_fd(int fd) {
-            uint? watch_id = null;
-            if (watch_ids.try_get(fd, out watch_id)) {
-                Source.remove(watch_id);
-                watch_ids.remove(fd);
+            WatchInfo? info = null;
+            if (watches.try_get(fd, out info)) {
+                Source.remove(info.watch_id);
+                watches.remove(fd);
             }
-            channels.remove(fd);
         }
         
-        /// Schedule the next timeout callback using MHD's recommended timeout
-        private void schedule_next_timeout() {
-            if (!running) {
+        private bool update_pending = false;
+
+        public void force_update() {
+            if (!running || update_pending) {
                 return;
             }
+            update_pending = true;
             
-            uint64 timeout_ms = 0;
-            if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
-                // Cap at reasonable maximum to ensure responsiveness
-                timeout_ms = uint64.min(timeout_ms, 1000);
-            } else {
-                // Default fallback when MHD doesn't specify a timeout
-                timeout_ms = 50;
-            }
-            
-            // Ensure minimum timeout to avoid busy-waiting
-            timeout_ms = uint64.max(timeout_ms, 1);
-            
-            timeout_id = Timeout.add((uint)timeout_ms, () => {
+            Idle.add(() => {
+                update_pending = false;
                 if (!running) {
                     return false;
                 }
@@ -171,12 +179,51 @@ namespace Astralis {
                 // Socket set may have changed
                 update_watches();
                 
-                // Reschedule with updated timeout
+                // Reschedule timeout
                 schedule_next_timeout();
                 
-                // Don't auto-repeat - we reschedule manually with potentially new timeout
                 return false;
             });
         }
+        
+        /// Schedule the next timeout callback using MHD's recommended timeout
+        private void schedule_next_timeout() {
+            if (!running) {
+                return;
+            }
+            
+            if (timeout_id != 0) {
+                Source.remove(timeout_id);
+                timeout_id = 0;
+            }
+            
+            uint64 timeout_ms = 0;
+            if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
+                if (timeout_ms == 0) {
+                    timeout_id = Idle.add(() => {
+                        if (!running) {
+                            return false;
+                        }
+                        timeout_id = 0;
+                        MHD.run(daemon);
+                        update_watches();
+                        schedule_next_timeout();
+                        return false; // run once, reschedule later
+                    });
+                } else {
+                    timeout_id = Timeout.add((uint)timeout_ms, () => {
+                        if (!running) {
+                            return false;
+                        }
+                        timeout_id = 0;
+                        MHD.run(daemon);
+                        update_watches();
+                        schedule_next_timeout();
+                        // Don't auto-repeat - we reschedule manually with potentially new timeout
+                        return false;
+                    });
+                }
+            }
+        }
     }
 }