Explorar o código

feat(server): integrate libmicrohttpd with GLib main loop

Switch from MHD's internal threading (USE_SELECT_INTERNALLY) to external
select mode, enabling proper integration with GLib's main event loop.
This allows the server to participate in GLib's async I/O handling and
provides a clean stop() method for graceful shutdown.

- Add SocketMonitor class to bridge MHD sockets with GLib main loop
- Add stop() method to Server for graceful shutdown
- Extend libmicrohttpd.vapi with FDSet struct and external select bindings
- Add USE_TURBO, USE_EPOLL, USE_POLL, USE_AUTO flag constants
Billy Barrow hai 1 semana
pai
achega
950101f871

+ 393 - 0
plans/mhd-glib-integration-plan.md

@@ -0,0 +1,393 @@
+# MHD + GLib Main Loop Integration Plan
+
+## Problem Statement
+
+When running Astralis apps behind a reverse proxy with SSE connections, requests sometimes hang until another request unblocks them. This is caused by cross-thread communication issues between:
+
+1. **MHD's internal select thread** (blocked in `select()`)
+2. **GLib main loop thread** (calling `MHD_resume_connection()`)
+
+The inter-thread communication (ITC) signal from `MHD_resume_connection()` doesn't reliably wake the blocked select loop.
+
+## Design Goals
+
+- Fix the hung connection problem
+- Keep GLib main loop for async/yield support
+- Efficient and responsive under heavy load
+- No busy waits
+- MHD details contained entirely within `Server/` directory
+
+## Architecture Overview
+
+```mermaid
+flowchart TB
+    subgraph GLibMainLoop[GLib Main Loop - Single Thread]
+        direction TB
+        A[MHD Socket Watch] -->|socket activity| B[MHD.run]
+        B -->|process| C[access_handler]
+        C -->|async| D[Pipeline.run.begin]
+        D -->|suspend| E[Connection Suspended]
+        
+        F[Timeout Watch] -->|periodic| G[MHD.run]
+        G -->|check| H[Resume Suspended Connections]
+        
+        I[Async Callback] -->|data ready| J[MHD.resume_connection]
+        J -->|same thread| K[Next MHD.run processes]
+    end
+    
+    subgraph MHDIntegration[MHD Integration Layer]
+        L[SocketMonitor]
+        M[ConnectionManager]
+        N[TimeoutManager]
+    end
+    
+    GLibMainLoop --> MHDIntegration
+```
+
+## Solution: External Select Mode
+
+Use MHD's **external select mode** where MHD creates no threads. Instead, we integrate MHD's file descriptors directly into GLib's main loop using `GIOChannel` watches.
+
+### Key Components
+
+#### 1. SocketMonitor (new file: `Server/SocketMonitor.vala`)
+
+Monitors MHD's listen socket and connection sockets, integrating them with GLib's main loop.
+
+```vala
+using Invercargill;
+using Invercargill.DataStructures;
+
+internal class SocketMonitor : Object {
+    private MHD.Daemon daemon;
+    private Dictionary<int, IOChannel> channels;
+    private Dictionary<int, uint> watch_ids;
+    private uint timeout_id;
+    
+    public SocketMonitor(MHD.Daemon daemon) {
+        this.daemon = daemon;
+        this.channels = new Dictionary<int, IOChannel>();
+        this.watch_ids = new Dictionary<int, uint>();
+    }
+    
+    public void start() {
+        update_watches();
+        start_timeout_watcher();
+    }
+    
+    public void stop() {
+        foreach (var id in watch_ids.get_values()) {
+            Source.remove(id);
+        }
+        watch_ids.clear();
+        channels.clear();
+        
+        if (timeout_id != 0) {
+            Source.remove(timeout_id);
+            timeout_id = 0;
+        }
+    }
+    
+    private void update_watches() {
+        // Get current fd_set from MHD
+        MHD.FDSet read_fds = MHD.FDSet();
+        MHD.FDSet write_fds = MHD.FDSet();
+        MHD.FDSet error_fds = MHD.FDSet();
+        int max_fd = 0;
+        
+        MHD.get_fdset(daemon, read_fds, write_fds, error_fds, &max_fd);
+        
+        // Update GLib IO channel watches
+        var active_fds = new Series<int>();
+        for (int fd = 0; fd <= max_fd; fd++) {
+            bool needs_read = read_fds.is_set(fd);
+            bool needs_write = write_fds.is_set(fd);
+            
+            if (needs_read || needs_write) {
+                active_fds.add(fd);
+                
+                if (!channels.has_key(fd)) {
+                    add_watch_for_fd(fd, needs_read, needs_write);
+                }
+            }
+        }
+        
+        // Remove watches for closed fds
+        var to_remove = new Series<int>();
+        foreach (var entry in channels) {
+            if (!active_fds.contains(entry.key)) {
+                to_remove.add(entry.key);
+            }
+        }
+        foreach (int fd in to_remove) {
+            remove_watch_for_fd(fd);
+        }
+    }
+    
+    private void add_watch_for_fd(int fd, bool monitor_read, bool monitor_write) {
+        var channel = new IOChannel.unix_new(fd);
+        IOCondition cond = IOCondition.IN | IOCondition.HUP | IOCondition.ERR;
+        if (monitor_write) {
+            cond |= IOCondition.OUT;
+        }
+        
+        uint watch_id = channel.add_watch(cond, (source, condition) => {
+            MHD.run(daemon);
+            update_watches(); // Socket set may have changed
+            return true;
+        });
+        
+        channels[fd] = channel;
+        watch_ids[fd] = watch_id;
+    }
+    
+    private void remove_watch_for_fd(int fd) {
+        uint watch_id = watch_ids[fd];
+        Source.remove(watch_id);
+        watch_ids.unset(fd);
+        channels.unset(fd);
+    }
+    
+    private void start_timeout_watcher() {
+        // Use MHD's recommended timeout
+        timeout_id = Timeout.add(50, () => {
+            MHD.run(daemon);
+            update_watches();
+            return true;
+        });
+    }
+}
+```
+
+#### 2. ConnectionManager (existing, enhanced: `Server/RequestContext.vala`)
+
+Already exists, but enhance to track connection state for debugging.
+
+#### 3. Server (modified: `Server/Server.vala`)
+
+Main changes to use external select mode:
+
+```vala
+using MHD;
+using Invercargill;
+using Invercargill.DataStructures;
+
+namespace Astralis {
+
+    public class Server : Object {
+        private Daemon daemon;
+        private Pipeline pipeline;
+        private int port;
+        private Series<RequestContext> request_contexts;
+        private Series<ResponseContext> response_contexts;
+        private SocketMonitor? socket_monitor;
+
+        public Server(int port, Pipeline pipeline) {
+            this.port = port;
+            this.pipeline = pipeline;
+            this.request_contexts = new Series<RequestContext>();
+            this.response_contexts = new Series<ResponseContext>();
+        }
+
+        public void run() {
+            // External select mode: no USE_SELECT_INTERNALLY flag
+            // This means MHD creates no threads - we drive it from GLib
+            daemon = Daemon.start(
+                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
+                (uint16) this.port,
+                null,
+                (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
+                    return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
+                },
+                MHD.OPTION_NOTIFY_COMPLETED,
+                (void*) request_completed_callback,
+                null,
+                MHD.OPTION_END
+            );
+
+            if (daemon == null) {
+                error("Failed to start daemon");
+            }
+
+            // Start socket monitoring integrated with GLib main loop
+            socket_monitor = new SocketMonitor(daemon);
+            socket_monitor.start();
+
+            // Run GLib main loop
+            new MainLoop().run();
+        }
+
+        public void stop() {
+            if (socket_monitor != null) {
+                socket_monitor.stop();
+            }
+            // MHD_stop_daemon is called by Daemon's destructor
+        }
+
+        // ... rest of access_handler remains unchanged
+    }
+}
+```
+
+## VAPI Bindings Required
+
+Add to `vapi/libmicrohttpd.vapi`:
+
+```vala
+// FDSet structure for select
+// Note: fd_set is a struct containing an array of long integers
+// The actual size varies by platform but FD_SETSIZE is typically 1024
+[CCode (cname = "fd_set", has_type_id = false, destroy_function = "", cprefix = "")]
+public struct FDSet {
+    // Internal storage - size for FD_SETSIZE=1024 on Linux
+    [CCode (cname = "fds_bits", array_length = false)]
+    private ulong bits[16];
+    
+    [CCode (cname = "FD_ZERO")]
+    public void zero();
+    
+    [CCode (cname = "FD_SET")]
+    public void set(int fd);
+    
+    [CCode (cname = "FD_CLR")]
+    public void clr(int fd);
+    
+    [CCode (cname = "FD_ISSET")]
+    public bool is_set(int fd);
+}
+
+// External select mode functions
+[CCode (cname = "MHD_run")]
+public Result run(Daemon daemon);
+
+[CCode (cname = "MHD_get_fdset")]
+public Result get_fdset(Daemon daemon, FDSet read_fds, FDSet write_fds, FDSet error_fds, int* max_fd);
+
+[CCode (cname = "MHD_get_timeout")]
+public Result get_timeout(Daemon daemon, uint64* timeout);
+
+// Additional useful constants
+[CCode (cname = "MHD_USE_EPOLL")]
+public const uint USE_EPOLL;
+
+[CCode (cname = "MHD_USE_POLL")]
+public const uint USE_POLL;
+
+[CCode (cname = "MHD_USE_AUTO")]
+public const uint USE_AUTO;
+
+[CCode (cname = "MHD_USE_ITC")]
+public const uint USE_ITC;
+```
+
+## Why This Solves All Requirements
+
+### 1. Fixes Hung Connection Problem
+- `MHD_resume_connection()` is called from GLib main loop thread
+- `MHD.run()` is called from the **same** GLib main loop thread
+- No cross-thread communication needed - resume takes effect immediately
+
+### 2. Keeps GLib Main Loop
+- All async/yield code continues to work unchanged
+- Pipeline handlers, SSE endpoints, etc. all run in GLib main loop
+- No changes needed outside `Server/` directory
+
+### 3. Efficient Under Heavy Load
+- Event-driven: only calls `MHD.run()` when sockets have activity
+- No polling or busy waits
+- GLib's IOChannel watches are highly optimized
+- Can add `USE_EPOLL` flag on Linux for O(1) socket multiplexing
+
+### 4. No Busy Waits
+- Uses `GIOChannel.add_watch()` for edge-triggered notification
+- Timeout watcher only runs at MHD's recommended interval
+- MHD.get_timeout() can be used for dynamic timeout adjustment
+
+## Performance Optimizations
+
+### Optional: Dynamic Timeout
+
+```vala
+private void start_timeout_watcher() {
+    schedule_next_timeout();
+}
+
+private void schedule_next_timeout() {
+    uint64 timeout_ms = 0;
+    if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
+        // Cap at reasonable maximum
+        timeout_ms = uint64.min(timeout_ms, 1000);
+    } else {
+        timeout_ms = 50; // Default fallback
+    }
+    
+    timeout_id = Timeout.add((uint)timeout_ms, () => {
+        MHD.run(daemon);
+        update_watches();
+        schedule_next_timeout(); // Reschedule with new timeout
+        return false; // Don't auto-repeat
+    });
+}
+```
+
+### Optional: epoll on Linux
+
+For Linux systems with many connections:
+
+```vala
+daemon = Daemon.start(
+    MHD.USE_EPOLL | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
+    // ...
+);
+```
+
+Note: `USE_EPOLL` with external select requires special handling - see MHD documentation.
+
+## File Structure
+
+```
+src/Server/
+├── Server.vala           # Modified: external select mode
+├── SocketMonitor.vala    # NEW: GLib IO integration
+├── RequestContext.vala   # Unchanged
+├── ResponseContext.vala  # Unchanged
+├── ServerInput.vala      # Unchanged
+└── ServerOutput.vala     # Unchanged
+
+vapi/
+└── libmicrohttpd.vapi    # Modified: add FDSet, run, get_fdset, get_timeout
+```
+
+## Implementation Steps
+
+1. **Add VAPI bindings** for `FDSet`, `MHD_run`, `MHD_get_fdset`, `MHD_get_timeout`
+2. **Create SocketMonitor.vala** with GLib IO channel integration
+3. **Modify Server.vala** to use external select mode
+4. **Update meson.build** to include new file
+5. **Test** with SSE example under load
+
+## Testing Strategy
+
+1. **Unit test**: SocketMonitor correctly adds/removes watches
+2. **Integration test**: SSE connections don't hang under concurrent load
+3. **Load test**: Use `ab` or `wrk` to verify performance
+4. **Reverse proxy test**: Behind nginx with SSE connections
+
+## Rollback Plan
+
+If issues arise, simply revert to `USE_SELECT_INTERNALLY`:
+
+```vala
+daemon = Daemon.start(
+    MHD.USE_SELECT_INTERNALLY | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
+    // ...
+);
+```
+
+And remove SocketMonitor usage.
+
+## Future Enhancements
+
+1. **Metrics**: Track connection counts, suspend/resume frequency
+2. **Dynamic polling**: Switch between select/poll/epoll based on connection count
+3. **Connection pooling**: Reuse connections for keep-alive

+ 28 - 5
src/Server/Server.vala

@@ -10,12 +10,16 @@ namespace Astralis {
         private int port;
         private HashSet<RequestContext> request_contexts;
         private HashSet<ResponseContext> response_contexts;
+        private SocketMonitor? socket_monitor;
+        private MainLoop main_loop;
 
         public Server(int port, Pipeline pipeline) {
             this.port = port;
             this.pipeline = pipeline;
             this.request_contexts = new HashSet<RequestContext>();
             this.response_contexts = new HashSet<ResponseContext>();
+            this.socket_monitor = null;
+            this.main_loop = new MainLoop();
         }
 
         private int access_handler (Connection connection, string? url, string? method, string? version, string? upload_data, size_t* upload_data_size, void** con_cls) {
@@ -100,12 +104,14 @@ namespace Astralis {
         }
 
         public void run() {
+            // External select mode: no USE_SELECT_INTERNALLY flag
+            // This means MHD creates no threads - we drive it from GLib main loop
+            // USE_TURBO enables faster processing by not explicitly closing connections
             daemon = Daemon.start(
-                MHD.USE_SELECT_INTERNALLY | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG, 
-                (uint16) this.port, 
-                null, 
+                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_TURBO,
+                (uint16) this.port,
+                null,
                 (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
-                    // Trampoline to instance method if needed, or just use lambda capturing 'this'
                     return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
                 },
                 MHD.OPTION_NOTIFY_COMPLETED,
@@ -118,7 +124,24 @@ namespace Astralis {
                 error("Failed to start daemon");
             }
 
-            new MainLoop().run();
+            // Start socket monitoring integrated with GLib main loop
+            socket_monitor = new SocketMonitor(daemon);
+            socket_monitor.start();
+
+            // Run GLib main loop
+            main_loop.run();
+        }
+
+        /// Stop the server and clean up resources
+        public void stop() {
+            if (socket_monitor != null) {
+                socket_monitor.stop();
+                socket_monitor = null;
+            }
+            if (main_loop.is_running()) {
+                main_loop.quit();
+            }
+            // MHD_stop_daemon is called by Daemon's destructor when daemon is finalized
         }
 
         private Result respond(Connection connection, RequestContext context) {

+ 182 - 0
src/Server/SocketMonitor.vala

@@ -0,0 +1,182 @@
+using MHD;
+using Invercargill;
+using Invercargill.DataStructures;
+
+namespace Astralis {
+
+    /// Monitors MHD's file descriptors and integrates them with GLib's main loop.
+    /// This enables external select mode where MHD runs entirely within the GLib
+    /// main loop, eliminating cross-thread communication issues with suspend/resume.
+    internal class SocketMonitor : Object {
+        private unowned MHD.Daemon daemon;
+        private Dictionary<int, IOChannel> channels;
+        private Dictionary<int, uint> watch_ids;
+        private uint timeout_id;
+        private bool running;
+        
+        public SocketMonitor(MHD.Daemon daemon) {
+            this.daemon = daemon;
+            this.channels = new Dictionary<int, IOChannel>();
+            this.watch_ids = new Dictionary<int, uint>();
+            this.timeout_id = 0;
+            this.running = false;
+        }
+        
+        /// Start monitoring MHD sockets and integrate with GLib main loop
+        public void start() {
+            if (running) {
+                return;
+            }
+            running = true;
+            update_watches();
+            schedule_next_timeout();
+        }
+        
+        /// Stop monitoring and clean up all GLib sources
+        public void stop() {
+            if (!running) {
+                return;
+            }
+            running = false;
+            
+            // Remove all IO channel watches
+            foreach (var entry in watch_ids) {
+                Source.remove(entry.value);
+            }
+            watch_ids.clear();
+            channels.clear();
+            
+            // Remove timeout watcher
+            if (timeout_id != 0) {
+                Source.remove(timeout_id);
+                timeout_id = 0;
+            }
+        }
+        
+        /// Update GLib IO channel watches based on MHD's current fd_set
+        private void update_watches() {
+            if (!running) {
+                return;
+            }
+            
+            // Get current fd_set from MHD
+            MHD.FDSet read_fds = MHD.FDSet();
+            MHD.FDSet write_fds = MHD.FDSet();
+            MHD.FDSet error_fds = MHD.FDSet();
+            int max_fd = 0;
+            
+            MHD.FDSet.zero(ref read_fds);
+            MHD.FDSet.zero(ref write_fds);
+            MHD.FDSet.zero(ref error_fds);
+            
+            MHD.get_fdset(daemon, ref read_fds, ref write_fds, ref error_fds, &max_fd);
+            
+            // Build set of active file descriptors
+            var active_fds = new Series<int>();
+            for (int fd = 0; fd <= max_fd; fd++) {
+                bool needs_read = MHD.FDSet.is_set(fd, ref read_fds);
+                bool needs_write = MHD.FDSet.is_set(fd, ref write_fds);
+                
+                if (needs_read || needs_write) {
+                    active_fds.add(fd);
+                    
+                    // Add new watch if not already monitoring this fd
+                    IOChannel? existing_channel = null;
+                    if (!channels.try_get(fd, out existing_channel)) {
+                        add_watch_for_fd(fd, needs_read, needs_write);
+                    }
+                }
+            }
+            
+            // Remove watches for closed file descriptors
+            var to_remove = new Series<int>();
+            foreach (var entry in channels) {
+                if (!active_fds.contains(entry.key)) {
+                    to_remove.add(entry.key);
+                }
+            }
+            foreach (int fd in to_remove) {
+                remove_watch_for_fd(fd);
+            }
+        }
+        
+        /// Add a GLib IO channel watch for a file descriptor
+        private void add_watch_for_fd(int fd, bool monitor_read, bool monitor_write) {
+            var channel = new IOChannel.unix_new(fd);
+            
+            // Build IO condition flags - always monitor for errors/hangup
+            IOCondition cond = IOCondition.HUP | IOCondition.ERR | IOCondition.NVAL;
+            if (monitor_read) {
+                cond |= IOCondition.IN;
+            }
+            if (monitor_write) {
+                cond |= IOCondition.OUT;
+            }
+            
+            uint watch_id = channel.add_watch(cond, (source, condition) => {
+                if (!running) {
+                    return false;
+                }
+                
+                // Run MHD processing
+                MHD.run(daemon);
+                
+                // Socket set may have changed after processing
+                update_watches();
+                
+                // Continue monitoring
+                return true;
+            });
+            
+            channels[fd] = channel;
+            watch_ids[fd] = watch_id;
+        }
+        
+        /// Remove a GLib IO channel watch for a file descriptor
+        private void remove_watch_for_fd(int fd) {
+            uint? watch_id = null;
+            if (watch_ids.try_get(fd, out watch_id)) {
+                Source.remove(watch_id);
+                watch_ids.remove(fd);
+            }
+            channels.remove(fd);
+        }
+        
+        /// Schedule the next timeout callback using MHD's recommended timeout
+        private void schedule_next_timeout() {
+            if (!running) {
+                return;
+            }
+            
+            uint64 timeout_ms = 0;
+            if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
+                // Cap at reasonable maximum to ensure responsiveness
+                timeout_ms = uint64.min(timeout_ms, 1000);
+            } else {
+                // Default fallback when MHD doesn't specify a timeout
+                timeout_ms = 50;
+            }
+            
+            // Ensure minimum timeout to avoid busy-waiting
+            timeout_ms = uint64.max(timeout_ms, 1);
+            
+            timeout_id = Timeout.add((uint)timeout_ms, () => {
+                if (!running) {
+                    return false;
+                }
+                
+                // Run MHD processing
+                MHD.run(daemon);
+                
+                // Socket set may have changed
+                update_watches();
+                
+                // Reschedule with updated timeout
+                schedule_next_timeout();
+                
+                // Don't auto-repeat - we reschedule manually with potentially new timeout
+                return false;
+            });
+        }
+    }
+}

+ 2 - 1
src/meson.build

@@ -1,4 +1,4 @@
-sources = files(
+esources = files(
     'Core/HttpContext.vala',
     'Core/HttpValues.vala',
     'Core/HttpResult.vala',
@@ -17,6 +17,7 @@ sources = files(
     'Endpoints/FastResource.vala',
     'Endpoints/SseEndpoint.vala',
     'Server/Server.vala',
+    'Server/SocketMonitor.vala',
     'Server/RequestContext.vala',
     'Server/ResponseContext.vala',
     'Server/ServerInput.vala',

+ 40 - 0
vapi/libmicrohttpd.vapi

@@ -40,6 +40,14 @@ namespace MHD {
     public const uint USE_DEBUG;
     [CCode (cname = "MHD_ALLOW_SUSPEND_RESUME")]
     public const uint ALLOW_SUSPEND_RESUME;
+    [CCode (cname = "MHD_USE_TURBO")]
+    public const uint USE_TURBO;
+    [CCode (cname = "MHD_USE_EPOLL")]
+    public const uint USE_EPOLL;
+    [CCode (cname = "MHD_USE_POLL")]
+    public const uint USE_POLL;
+    [CCode (cname = "MHD_USE_AUTO")]
+    public const uint USE_AUTO;
 
     [CCode (cname = "MHD_OPTION_END")]
     public const int OPTION_END;
@@ -161,4 +169,36 @@ namespace MHD {
 
     [CCode (cname = "MHD_get_connection_info")]
     public unowned ConnectionInfo? get_connection_info (Connection connection, ConnectionInfoType info_type);
+
+    // FDSet structure for external select mode
+    // Note: fd_set is a struct containing an array of long integers
+    // The actual size varies by platform but FD_SETSIZE is typically 1024
+    [CCode (cname = "fd_set", has_type_id = false, destroy_function = "")]
+    public struct FDSet {
+        // Internal storage - size for FD_SETSIZE=1024 on Linux (16 * 8 bytes = 128 bytes = 1024 bits)
+        [CCode (cname = "__fds_bits", array_length = false)]
+        private ulong fds_bits[16];
+        
+        [CCode (cname = "FD_ZERO")]
+        public static void zero(ref FDSet set);
+        
+        [CCode (cname = "FD_SET")]
+        public static void set(int fd, ref FDSet set);
+        
+        [CCode (cname = "FD_CLR")]
+        public static void clr(int fd, ref FDSet set);
+        
+        [CCode (cname = "FD_ISSET")]
+        public static bool is_set(int fd, ref FDSet set);
+    }
+
+    // External select mode functions
+    [CCode (cname = "MHD_run")]
+    public Result run(Daemon daemon);
+
+    [CCode (cname = "MHD_get_fdset")]
+    public Result get_fdset(Daemon daemon, ref FDSet read_fds, ref FDSet write_fds, ref FDSet error_fds, int* max_fd);
+
+    [CCode (cname = "MHD_get_timeout")]
+    public Result get_timeout(Daemon daemon, uint64* timeout);
 }