# MHD + GLib Main Loop Integration Plan

## Problem Statement

When running Astralis apps behind a reverse proxy with SSE connections, requests sometimes hang until another request unblocks them. This is caused by cross-thread communication issues between:

1. **MHD's internal select thread** (blocked in `select()`)
2. **GLib main loop thread** (calling `MHD_resume_connection()`)

The inter-thread communication (ITC) signal from `MHD_resume_connection()` does not reliably wake the blocked select loop.

## Design Goals

- Fix the hung connection problem
- Keep the GLib main loop for async/yield support
- Efficient and responsive under heavy load
- No busy waits
- MHD details contained entirely within the `Server/` directory

## Architecture Overview

```mermaid
flowchart TB
    subgraph GLibMainLoop[GLib Main Loop - Single Thread]
        direction TB
        A[MHD Socket Watch] -->|socket activity| B[MHD.run]
        B -->|process| C[access_handler]
        C -->|async| D[Pipeline.run.begin]
        D -->|suspend| E[Connection Suspended]
        F[Timeout Watch] -->|periodic| G[MHD.run]
        G -->|check| H[Resume Suspended Connections]
        I[Async Callback] -->|data ready| J[MHD.resume_connection]
        J -->|same thread| K[Next MHD.run processes]
    end
    subgraph MHDIntegration[MHD Integration Layer]
        L[SocketMonitor]
        M[ConnectionManager]
        N[TimeoutManager]
    end
    GLibMainLoop --> MHDIntegration
```

## Solution: External Select Mode

Use MHD's **external select mode**, in which MHD creates no threads. Instead, we integrate MHD's file descriptors directly into GLib's main loop using `GIOChannel` watches.

### Key Components

#### 1. SocketMonitor (new file: `Server/SocketMonitor.vala`)

Monitors MHD's listen socket and connection sockets, integrating them with GLib's main loop.
```vala
using Invercargill;
using Invercargill.DataStructures;

internal class SocketMonitor : Object {

    private MHD.Daemon daemon;
    private Dictionary channels;
    private Dictionary watch_ids;
    private uint timeout_id;

    public SocketMonitor(MHD.Daemon daemon) {
        this.daemon = daemon;
        this.channels = new Dictionary();
        this.watch_ids = new Dictionary();
    }

    public void start() {
        update_watches();
        start_timeout_watcher();
    }

    public void stop() {
        foreach (var id in watch_ids.get_values()) {
            Source.remove(id);
        }
        watch_ids.clear();
        channels.clear();
        if (timeout_id != 0) {
            Source.remove(timeout_id);
            timeout_id = 0;
        }
    }

    private void update_watches() {
        // Get the current fd sets from MHD.
        // MHD_get_fdset only adds to the sets, so zero them first.
        MHD.FDSet read_fds = MHD.FDSet();
        MHD.FDSet write_fds = MHD.FDSet();
        MHD.FDSet error_fds = MHD.FDSet();
        read_fds.zero();
        write_fds.zero();
        error_fds.zero();
        int max_fd = 0;
        MHD.get_fdset(daemon, read_fds, write_fds, error_fds, &max_fd);

        // Add watches for fds MHD wants monitored
        var active_fds = new Series();
        for (int fd = 0; fd <= max_fd; fd++) {
            bool needs_read = read_fds.is_set(fd);
            bool needs_write = write_fds.is_set(fd);
            if (needs_read || needs_write) {
                active_fds.add(fd);
                if (!channels.has_key(fd)) {
                    add_watch_for_fd(fd, needs_read, needs_write);
                }
            }
        }

        // Remove watches for closed fds
        var to_remove = new Series();
        foreach (var entry in channels) {
            if (!active_fds.contains(entry.key)) {
                to_remove.add(entry.key);
            }
        }
        foreach (int fd in to_remove) {
            remove_watch_for_fd(fd);
        }
    }

    private void add_watch_for_fd(int fd, bool monitor_read, bool monitor_write) {
        var channel = new IOChannel.unix_new(fd);
        IOCondition cond = IOCondition.HUP | IOCondition.ERR;
        if (monitor_read) {
            cond |= IOCondition.IN;
        }
        if (monitor_write) {
            cond |= IOCondition.OUT;
        }
        uint watch_id = channel.add_watch(cond, (source, condition) => {
            MHD.run(daemon);
            update_watches(); // Socket set may have changed
            return true;
        });
        channels[fd] = channel;
        watch_ids[fd] = watch_id;
    }

    private void remove_watch_for_fd(int fd) {
        uint watch_id = watch_ids[fd];
        Source.remove(watch_id);
        watch_ids.unset(fd);
        channels.unset(fd);
    }

    private void start_timeout_watcher() {
        // Use MHD's recommended timeout
        timeout_id = Timeout.add(50, () => {
            MHD.run(daemon);
            update_watches();
            return true;
        });
    }
}
```

#### 2. ConnectionManager (existing, enhanced: `Server/RequestContext.vala`)

Already exists, but enhance it to track connection state for debugging.

#### 3. Server (modified: `Server/Server.vala`)

Main changes to use external select mode:

```vala
using MHD;
using Invercargill;
using Invercargill.DataStructures;

namespace Astralis {

    public class Server : Object {

        private Daemon daemon;
        private Pipeline pipeline;
        private int port;
        private Series request_contexts;
        private Series response_contexts;
        private SocketMonitor? socket_monitor;

        public Server(int port, Pipeline pipeline) {
            this.port = port;
            this.pipeline = pipeline;
            this.request_contexts = new Series();
            this.response_contexts = new Series();
        }

        public void run() {
            // External select mode: no USE_SELECT_INTERNALLY flag.
            // This means MHD creates no threads - we drive it from GLib.
            daemon = Daemon.start(
                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
                (uint16) this.port,
                null,
                (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
                    return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
                },
                MHD.OPTION_NOTIFY_COMPLETED, (void*) request_completed_callback, null,
                MHD.OPTION_END
            );
            if (daemon == null) {
                error("Failed to start daemon");
            }

            // Start socket monitoring integrated with the GLib main loop
            socket_monitor = new SocketMonitor(daemon);
            socket_monitor.start();

            // Run the GLib main loop
            new MainLoop().run();
        }

        public void stop() {
            if (socket_monitor != null) {
                socket_monitor.stop();
            }
            // MHD_stop_daemon is called by Daemon's destructor
        }

        // ...
        // rest of access_handler remains unchanged
    }
}
```

## VAPI Bindings Required

Add to `vapi/libmicrohttpd.vapi`:

```vala
// FDSet structure for select.
// Note: fd_set is a struct containing an array of long integers.
// The actual size varies by platform, but FD_SETSIZE is typically 1024.
[CCode (cname = "fd_set", has_type_id = false, destroy_function = "", cprefix = "")]
public struct FDSet {
    // Internal storage - sized for FD_SETSIZE=1024 on Linux
    [CCode (cname = "fds_bits", array_length = false)]
    private ulong bits[16];

    [CCode (cname = "FD_ZERO")]
    public void zero();

    [CCode (cname = "FD_SET")]
    public void set(int fd);

    [CCode (cname = "FD_CLR")]
    public void clr(int fd);

    [CCode (cname = "FD_ISSET")]
    public bool is_set(int fd);
}

// External select mode functions
[CCode (cname = "MHD_run")]
public Result run(Daemon daemon);

[CCode (cname = "MHD_get_fdset")]
public Result get_fdset(Daemon daemon, FDSet read_fds, FDSet write_fds, FDSet error_fds, int* max_fd);

[CCode (cname = "MHD_get_timeout")]
public Result get_timeout(Daemon daemon, uint64* timeout);

// Additional useful constants
[CCode (cname = "MHD_USE_EPOLL")]
public const uint USE_EPOLL;
[CCode (cname = "MHD_USE_POLL")]
public const uint USE_POLL;
[CCode (cname = "MHD_USE_AUTO")]
public const uint USE_AUTO;
[CCode (cname = "MHD_USE_ITC")]
public const uint USE_ITC;
```

## Why This Solves All Requirements

### 1. Fixes Hung Connection Problem

- `MHD_resume_connection()` is called from the GLib main loop thread
- `MHD.run()` is called from the **same** GLib main loop thread
- No cross-thread communication needed - resume takes effect immediately

### 2. Keeps GLib Main Loop

- All async/yield code continues to work unchanged
- Pipeline handlers, SSE endpoints, etc. all run in the GLib main loop
- No changes needed outside the `Server/` directory

### 3. Efficient Under Heavy Load

- Event-driven: only calls `MHD.run()` when sockets have activity
- No polling or busy waits
- GLib's IOChannel watches build directly on the main loop's poll-based dispatch
- Can add the `USE_EPOLL` flag on Linux for O(1) socket multiplexing

### 4. No Busy Waits

- Uses `GIOChannel.add_watch()` for readiness notification (GLib's poll-based watches are level-triggered, so no wakeups are missed)
- The timeout watcher only runs at MHD's recommended interval
- `MHD.get_timeout()` can be used for dynamic timeout adjustment

## Performance Optimizations

### Optional: Dynamic Timeout

```vala
private void start_timeout_watcher() {
    schedule_next_timeout();
}

private void schedule_next_timeout() {
    uint64 timeout_ms = 0;
    if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
        // Cap at a reasonable maximum
        timeout_ms = uint64.min(timeout_ms, 1000);
    } else {
        timeout_ms = 50; // Default fallback
    }
    timeout_id = Timeout.add((uint) timeout_ms, () => {
        MHD.run(daemon);
        update_watches();
        schedule_next_timeout(); // Reschedule with the new timeout
        return false; // Don't auto-repeat
    });
}
```

### Optional: epoll on Linux

For Linux systems with many connections:

```vala
daemon = Daemon.start(
    MHD.USE_EPOLL | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
    // ...
);
```

Note: `USE_EPOLL` with external select requires special handling - see the MHD documentation.

## File Structure

```
src/Server/
├── Server.vala          # Modified: external select mode
├── SocketMonitor.vala   # NEW: GLib IO integration
├── RequestContext.vala  # Unchanged
├── ResponseContext.vala # Unchanged
├── ServerInput.vala     # Unchanged
└── ServerOutput.vala    # Unchanged

vapi/
└── libmicrohttpd.vapi   # Modified: add FDSet, run, get_fdset, get_timeout
```

## Implementation Steps

1. **Add VAPI bindings** for `FDSet`, `MHD_run`, `MHD_get_fdset`, `MHD_get_timeout`
2. **Create SocketMonitor.vala** with GLib IO channel integration
3. **Modify Server.vala** to use external select mode
4. **Update meson.build** to include the new file
5. **Test** with the SSE example under load

## Testing Strategy

1. **Unit test**: SocketMonitor correctly adds/removes watches
2. **Integration test**: SSE connections don't hang under concurrent load
3. **Load test**: Use `ab` or `wrk` to verify performance
4. **Reverse proxy test**: Behind nginx with SSE connections

## Rollback Plan

If issues arise, simply revert to `USE_SELECT_INTERNALLY`:

```vala
daemon = Daemon.start(
    MHD.USE_SELECT_INTERNALLY | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
    // ...
);
```

And remove the SocketMonitor usage.

## Future Enhancements

1. **Metrics**: Track connection counts and suspend/resume frequency
2. **Dynamic polling**: Switch between select/poll/epoll based on connection count
3. **Connection pooling**: Reuse connections for keep-alive
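The metrics enhancement could be sketched as a small counter object that the server's suspend/resume paths update. This is only an illustration: the `ServerMetrics` class, its field names, and the hook points are hypothetical and not part of the existing Astralis codebase.

```vala
// Hypothetical metrics sketch - names are illustrative only.
// Intended usage: call on_suspend() where the access_handler suspends a
// connection, and on_resume() where the async callback resumes it.
public class ServerMetrics : Object {

    public uint active_connections;
    public uint64 suspend_count;
    public uint64 resume_count;

    public void on_suspend() {
        suspend_count++;
    }

    public void on_resume() {
        resume_count++;
    }

    public string report() {
        return @"active=$active_connections suspended=$suspend_count resumed=$resume_count";
    }
}
```

Exposing `report()` on a debug endpoint would make it easy to confirm in production that suspend and resume counts stay balanced, which is exactly the invariant the hung-connection fix is meant to guarantee.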