MHD + GLib Main Loop Integration Plan

Problem Statement

When running Astralis apps behind a reverse proxy with SSE connections, requests sometimes hang until another request unblocks them. This is caused by cross-thread communication issues between:

  1. MHD's internal select thread (blocked in select())
  2. GLib main loop thread (calling MHD_resume_connection())

The inter-thread communication (ITC) signal from MHD_resume_connection() doesn't reliably wake the blocked select loop.

Design Goals

  • Fix the hung connection problem
  • Keep GLib main loop for async/yield support
  • Efficient and responsive under heavy load
  • No busy waits
  • MHD details contained entirely within Server/ directory

Architecture Overview

flowchart TB
    subgraph GLibMainLoop[GLib Main Loop - Single Thread]
        direction TB
        A[MHD Socket Watch] -->|socket activity| B[MHD.run]
        B -->|process| C[access_handler]
        C -->|async| D[Pipeline.run.begin]
        D -->|suspend| E[Connection Suspended]
        
        F[Timeout Watch] -->|periodic| G[MHD.run]
        G -->|check| H[Resume Suspended Connections]
        
        I[Async Callback] -->|data ready| J[MHD.resume_connection]
        J -->|same thread| K[Next MHD.run processes]
    end
    
    subgraph MHDIntegration[MHD Integration Layer]
        L[SocketMonitor]
        M[ConnectionManager]
        N[TimeoutManager]
    end
    
    GLibMainLoop --> MHDIntegration

Solution: External Select Mode

Use MHD's external select mode where MHD creates no threads. Instead, we integrate MHD's file descriptors directly into GLib's main loop using GIOChannel watches.
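
The underlying mechanism can be tried in isolation before MHD is involved. The following is a minimal sketch, assuming a pipe as a stand-in for an MHD socket and the posix bindings for pipe/read/write; read_via_watch is a hypothetical name, not part of Astralis. It shows a descriptor being watched by the GLib main loop and its callback running on the main loop thread.

```vala
// Build (assumed): valac --pkg posix fd_watch.vala

// Watches the read end of a pipe from the GLib main loop and returns
// the number of bytes read once the descriptor becomes readable.
int read_via_watch() {
    int fds[2];
    if (Posix.pipe(fds) != 0) {
        return -1;
    }
    int read_fd = fds[0];
    int write_fd = fds[1];

    var loop = new MainLoop();
    int bytes_read = 0;

    // The watch callback is dispatched by the main loop, on its thread.
    var channel = new IOChannel.unix_new(read_fd);
    channel.add_watch(IOCondition.IN | IOCondition.HUP | IOCondition.ERR, (source, condition) => {
        uint8 buf[16];
        ssize_t n = Posix.read(read_fd, buf, buf.length);
        bytes_read = (int) n;
        loop.quit();
        return false; // one-shot: remove the watch
    });

    // Make the descriptor readable from inside the loop.
    Idle.add(() => {
        Posix.write(write_fd, "x", 1);
        return false;
    });

    loop.run();
    Posix.close(read_fd);
    Posix.close(write_fd);
    return bytes_read;
}

int main() {
    print("read %d byte(s) from the watched descriptor\n", read_via_watch());
    return 0;
}
```

In the real integration the callback would call MHD.run() instead of reading the descriptor itself, since MHD owns its sockets.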

Key Components

1. SocketMonitor (new file: Server/SocketMonitor.vala)

Monitors MHD's listen socket and connection sockets, integrating them with GLib's main loop.

using Invercargill;
using Invercargill.DataStructures;

internal class SocketMonitor : Object {
    private MHD.Daemon daemon;
    private Dictionary<int, IOChannel> channels;
    private Dictionary<int, uint> watch_ids;
    private uint timeout_id;
    
    public SocketMonitor(MHD.Daemon daemon) {
        this.daemon = daemon;
        this.channels = new Dictionary<int, IOChannel>();
        this.watch_ids = new Dictionary<int, uint>();
    }
    
    public void start() {
        update_watches();
        start_timeout_watcher();
    }
    
    public void stop() {
        foreach (var id in watch_ids.get_values()) {
            Source.remove(id);
        }
        watch_ids.clear();
        channels.clear();
        
        if (timeout_id != 0) {
            Source.remove(timeout_id);
            timeout_id = 0;
        }
    }
    
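    // Conditions currently watched per fd, so that a change in MHD's
    // read/write interest can be detected and the watch rebuilt.
    private Dictionary<int, uint> watch_conds = new Dictionary<int, uint>();

    private void update_watches() {
        // Get current fd_set from MHD. The sets are cleared first because
        // MHD_get_fdset() only adds descriptors to them.
        MHD.FDSet read_fds = MHD.FDSet();
        MHD.FDSet write_fds = MHD.FDSet();
        MHD.FDSet error_fds = MHD.FDSet();
        read_fds.zero();
        write_fds.zero();
        error_fds.zero();
        int max_fd = 0;
        
        if (MHD.get_fdset(daemon, read_fds, write_fds, error_fds, &max_fd) != MHD.Result.YES) {
            warning("MHD.get_fdset() failed; keeping existing watches");
            return;
        }
        
        // Update GLib IO channel watches
        var active_fds = new Series<int>();
        for (int fd = 0; fd <= max_fd; fd++) {
            bool needs_read = read_fds.is_set(fd);
            bool needs_write = write_fds.is_set(fd);
            
            if (needs_read || needs_write) {
                active_fds.add(fd);
                uint wanted = (uint) conditions_for(needs_read, needs_write);
                
                // Interest changed (e.g. read -> write): rebuild the watch
                if (channels.has_key(fd) && watch_conds[fd] != wanted) {
                    remove_watch_for_fd(fd);
                }
                if (!channels.has_key(fd)) {
                    add_watch_for_fd(fd, needs_read, needs_write);
                }
            }
        }
        
        // Remove watches for closed fds
        var to_remove = new Series<int>();
        foreach (var entry in channels) {
            if (!active_fds.contains(entry.key)) {
                to_remove.add(entry.key);
            }
        }
        foreach (int fd in to_remove) {
            remove_watch_for_fd(fd);
        }
    }
    
    private IOCondition conditions_for(bool monitor_read, bool monitor_write) {
        // Always watch for hang-up and errors; add IN/OUT only when MHD asks
        IOCondition cond = IOCondition.HUP | IOCondition.ERR;
        if (monitor_read) {
            cond |= IOCondition.IN;
        }
        if (monitor_write) {
            cond |= IOCondition.OUT;
        }
        return cond;
    }
    
    private void add_watch_for_fd(int fd, bool monitor_read, bool monitor_write) {
        var channel = new IOChannel.unix_new(fd);
        IOCondition cond = conditions_for(monitor_read, monitor_write);
        
        uint watch_id = channel.add_watch(cond, (source, condition) => {
            MHD.run(daemon);
            update_watches(); // Socket set may have changed
            return true;
        });
        
        channels[fd] = channel;
        watch_ids[fd] = watch_id;
        watch_conds[fd] = (uint) cond;
    }
    
    private void remove_watch_for_fd(int fd) {
        uint watch_id = watch_ids[fd];
        Source.remove(watch_id);
        watch_ids.unset(fd);
        channels.unset(fd);
        watch_conds.unset(fd);
    }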
    
    private void start_timeout_watcher() {
        // Fixed 50 ms tick so MHD can apply its own timeouts; see
        // "Optional: Dynamic Timeout" for a version driven by MHD.get_timeout()
        timeout_id = Timeout.add(50, () => {
            MHD.run(daemon);
            update_watches();
            return true;
        });
    }
}
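
One property of GLib watches matters when choosing the conditions for a descriptor: they are dispatched from poll(), so a watch keeps firing for as long as its condition holds. The sketch below illustrates this outside MHD, assuming a pipe as a stand-in for a connection socket and the posix bindings; count_out_dispatches is a hypothetical name. An idle pipe is always writable, so an OUT watch on it fires on every loop iteration even though nothing is written.

```vala
// Build (assumed): valac --pkg posix out_watch.vala

// Counts how often an OUT watch fires on an idle, writable descriptor.
// Stops after `limit` dispatches so the demonstration terminates.
int count_out_dispatches(int limit) {
    int fds[2];
    if (Posix.pipe(fds) != 0) {
        return -1;
    }
    int read_fd = fds[0];
    int write_fd = fds[1];

    var loop = new MainLoop();
    int fired = 0;

    var channel = new IOChannel.unix_new(write_fd);
    channel.add_watch(IOCondition.OUT, (source, condition) => {
        fired++;
        if (fired >= limit) {
            loop.quit();
            return false; // remove the watch
        }
        return true; // keep watching: dispatched again on the next iteration
    });

    loop.run();
    Posix.close(read_fd);
    Posix.close(write_fd);
    return fired;
}

int main() {
    int fired = count_out_dispatches(3);
    print("OUT watch fired %d times with no data written\n", fired);
    return 0;
}
```

For SocketMonitor this suggests requesting OUT only for descriptors that MHD currently reports in its write set; otherwise the main loop would spin on sockets that are merely writable.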

2. ConnectionManager (existing, enhanced: Server/RequestContext.vala)

This component already exists; extend it to track per-connection state for debugging.

3. Server (modified: Server/Server.vala)

Main changes to use external select mode:

using MHD;
using Invercargill;
using Invercargill.DataStructures;

namespace Astralis {

    public class Server : Object {
        private Daemon daemon;
        private Pipeline pipeline;
        private int port;
        private Series<RequestContext> request_contexts;
        private Series<ResponseContext> response_contexts;
        private SocketMonitor? socket_monitor;

        public Server(int port, Pipeline pipeline) {
            this.port = port;
            this.pipeline = pipeline;
            this.request_contexts = new Series<RequestContext>();
            this.response_contexts = new Series<ResponseContext>();
        }

        public void run() {
            // External select mode: no USE_SELECT_INTERNALLY flag
            // This means MHD creates no threads - we drive it from GLib
            daemon = Daemon.start(
                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
                (uint16) this.port,
                null,
                (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
                    return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
                },
                MHD.OPTION_NOTIFY_COMPLETED,
                (void*) request_completed_callback,
                null,
                MHD.OPTION_END
            );

            if (daemon == null) {
                error("Failed to start daemon");
            }

            // Start socket monitoring integrated with GLib main loop
            socket_monitor = new SocketMonitor(daemon);
            socket_monitor.start();

            // Run GLib main loop
            new MainLoop().run();
        }

        public void stop() {
            if (socket_monitor != null) {
                socket_monitor.stop();
            }
            // MHD_stop_daemon is called by Daemon's destructor
        }

        // ... rest of access_handler remains unchanged
    }
}
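
Server.run() above creates its MainLoop inline, so stop() has no handle with which to end the loop. A minimal sketch of one way to address that, assuming the loop is kept in a field; LoopOwner is a hypothetical stand-in for Server and nothing here touches MHD:

```vala
// Hypothetical stand-in for Server: only the main loop handling is shown.
class LoopOwner : Object {
    private MainLoop? main_loop;

    public void run() {
        main_loop = new MainLoop();
        main_loop.run(); // blocks until stop() is called
        main_loop = null;
    }

    public void stop() {
        // In Server, socket_monitor.stop() would be called here first.
        if (main_loop != null && main_loop.is_running()) {
            main_loop.quit();
        }
    }
}

int main() {
    var owner = new LoopOwner();

    // Ask the loop to stop shortly after it has started.
    Timeout.add(100, () => {
        owner.stop();
        return false;
    });

    owner.run();
    print("main loop exited\n");
    return 0;
}
```

Because stop() runs on the main loop thread here, no locking is needed; calling it from another thread would be a separate design question.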

VAPI Bindings Required

Add to vapi/libmicrohttpd.vapi:

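// FDSet structure for select
// Note: fd_set is a struct containing an array of long integers.
// Its real size comes from <sys/select.h>; FD_SETSIZE is typically 1024,
// so descriptors >= FD_SETSIZE cannot be represented.
[CCode (cname = "fd_set", cheader_filename = "sys/select.h", has_type_id = false, destroy_function = "", cprefix = "")]
public struct FDSet {
    // Internal storage - 16 x 64-bit words = 1024 bits (64-bit Linux layout)
    [CCode (cname = "fds_bits", array_length = false)]
    private ulong bits[16];
    
    [CCode (cname = "FD_ZERO")]
    public void zero();
    
    // The C macros below take the descriptor first and the set last,
    // so the instance is passed as the final argument (instance_pos = -1).
    [CCode (cname = "FD_SET", instance_pos = -1)]
    public void set(int fd);
    
    [CCode (cname = "FD_CLR", instance_pos = -1)]
    public void clr(int fd);
    
    [CCode (cname = "FD_ISSET", instance_pos = -1)]
    public bool is_set(int fd);
}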

// External select mode functions
[CCode (cname = "MHD_run")]
public Result run(Daemon daemon);

[CCode (cname = "MHD_get_fdset")]
public Result get_fdset(Daemon daemon, FDSet read_fds, FDSet write_fds, FDSet error_fds, int* max_fd);

[CCode (cname = "MHD_get_timeout")]
public Result get_timeout(Daemon daemon, uint64* timeout);

// Additional useful constants
[CCode (cname = "MHD_USE_EPOLL")]
public const uint USE_EPOLL;

[CCode (cname = "MHD_USE_POLL")]
public const uint USE_POLL;

[CCode (cname = "MHD_USE_AUTO")]
public const uint USE_AUTO;

[CCode (cname = "MHD_USE_ITC")]
public const uint USE_ITC;

Why This Solves All Requirements

1. Fixes Hung Connection Problem

  • MHD_resume_connection() is called from GLib main loop thread
  • MHD.run() is called from the same GLib main loop thread
  • No cross-thread communication needed: the resume is picked up by the next MHD.run() call on the same thread

2. Keeps GLib Main Loop

  • All async/yield code continues to work unchanged
  • Pipeline handlers, SSE endpoints, etc. all run in GLib main loop
  • No changes needed outside Server/ directory, apart from the binding additions in vapi/libmicrohttpd.vapi

3. Efficient Under Heavy Load

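  • Event-driven: MHD.run() is called when a watched socket has activity, plus one timer tick so MHD can apply its own timeouts
  • No busy waits; the only periodic work is that timer tick
  • GLib multiplexes all watches through a single poll() call per main loop iteration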
  • Can add USE_EPOLL flag on Linux for O(1) socket multiplexing

4. No Busy Waits

  • Uses GIOChannel.add_watch(), which is level-triggered: the callback fires only while a watched condition holds, so the loop sleeps in poll() otherwise
  • Timeout watcher runs on a fixed 50 ms tick by default
  • MHD.get_timeout() can be used for dynamic timeout adjustment

Performance Optimizations

Optional: Dynamic Timeout

private void start_timeout_watcher() {
    schedule_next_timeout();
}

private void schedule_next_timeout() {
    uint64 timeout_ms = 0;
    if (MHD.get_timeout(daemon, &timeout_ms) == MHD.Result.YES) {
        // Cap at reasonable maximum
        timeout_ms = uint64.min(timeout_ms, 1000);
    } else {
        timeout_ms = 50; // Default fallback
    }
    
    timeout_id = Timeout.add((uint)timeout_ms, () => {
        MHD.run(daemon);
        update_watches();
        schedule_next_timeout(); // Reschedule with new timeout
        return false; // Don't auto-repeat
    });
}

Optional: epoll on Linux

For Linux systems with many connections:

daemon = Daemon.start(
    MHD.USE_EPOLL | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
    // ...
);

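Note: USE_EPOLL with external select requires special handling. As far as the MHD documentation describes it, in epoll mode MHD_get_fdset() reports MHD's own epoll descriptor rather than the individual sockets, so SocketMonitor would end up watching that single fd. Check this against the MHD version in use before relying on it.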

File Structure

src/Server/
├── Server.vala           # Modified: external select mode
├── SocketMonitor.vala    # NEW: GLib IO integration
├── RequestContext.vala   # Enhanced: connection state tracking for debugging
├── ResponseContext.vala  # Unchanged
├── ServerInput.vala      # Unchanged
└── ServerOutput.vala     # Unchanged

vapi/
└── libmicrohttpd.vapi    # Modified: add FDSet, run, get_fdset, get_timeout

Implementation Steps

  1. Add VAPI bindings for FDSet, MHD_run, MHD_get_fdset, MHD_get_timeout
  2. Create SocketMonitor.vala with GLib IO channel integration
  3. Modify Server.vala to use external select mode
  4. Update meson.build to include new file
  5. Test with SSE example under load

Testing Strategy

  1. Unit test: SocketMonitor correctly adds/removes watches
  2. Integration test: SSE connections don't hang under concurrent load
  3. Load test: Use ab or wrk to verify performance
  4. Reverse proxy test: Behind nginx with SSE connections
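
For the unit test in item 1, the add/remove decision is easier to check if it is separated from GLib and MHD. The following is a sketch under that assumption: stale_fds is a hypothetical helper, not an existing SocketMonitor method, and plain arrays stand in for the Series and Dictionary types used above.

```vala
// Returns the descriptors that are currently watched but no longer
// reported as active, i.e. the watches that should be removed.
int[] stale_fds(int[] watched, int[] active) {
    int[] stale = {};
    foreach (int fd in watched) {
        bool still_active = false;
        foreach (int candidate in active) {
            if (candidate == fd) {
                still_active = true;
                break;
            }
        }
        if (!still_active) {
            stale += fd;
        }
    }
    return stale;
}

int main() {
    int[] watched = { 3, 5, 7 };
    int[] active = { 3, 7, 9 };

    // fd 5 was watched but is no longer active; fd 9 is new and is not
    // this helper's concern.
    int[] stale = stale_fds(watched, active);
    assert(stale.length == 1);
    assert(stale[0] == 5);

    print("stale watches: %d\n", stale.length);
    return 0;
}
```

A matching helper for descriptors that need a new watch would follow the same pattern with the arguments swapped.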

Rollback Plan

If issues arise, revert to USE_SELECT_INTERNALLY. This restores the previous behavior, including the intermittent hang described in the Problem Statement:

daemon = Daemon.start(
    MHD.USE_SELECT_INTERNALLY | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
    // ...
);

And remove SocketMonitor usage.

Future Enhancements

  1. Metrics: Track connection counts, suspend/resume frequency
  2. Dynamic polling: Switch between select/poll/epoll based on connection count
  3. Connection pooling: Reuse connections for keep-alive