When running Astralis apps behind a reverse proxy with SSE connections, requests sometimes hang until another request unblocks them. This is caused by cross-thread communication issues between:
- MHD's internal polling thread (blocked in `select()`), and
- the GLib main loop thread (which calls `MHD_resume_connection()`).

The inter-thread communication (ITC) signal from `MHD_resume_connection()` doesn't reliably wake the blocked select loop.
Affected code: the `Server/` directory (plus the libmicrohttpd VAPI).

Architecture:

flowchart TB
subgraph GLibMainLoop[GLib Main Loop - Single Thread]
direction TB
A[MHD Socket Watch] -->|socket activity| B[MHD.run]
B -->|process| C[access_handler]
C -->|async| D[Pipeline.run.begin]
D -->|suspend| E[Connection Suspended]
F[Timeout Watch] -->|periodic| G[MHD.run]
G -->|check| H[Resume Suspended Connections]
I[Async Callback] -->|data ready| J[MHD.resume_connection]
J -->|same thread| K[Next MHD.run processes]
end
subgraph MHDIntegration[MHD Integration Layer]
L[SocketMonitor]
M[ConnectionManager]
N[TimeoutManager]
end
GLibMainLoop --> MHDIntegration
Solution: use MHD's external select mode, in which MHD creates no threads. Instead, MHD's file descriptors are integrated directly into GLib's main loop using GIOChannel watches, so every MHD call happens on the main loop thread.
New file `Server/SocketMonitor.vala`: monitors MHD's listen socket and connection sockets, integrating them with GLib's main loop.
using Invercargill;
using Invercargill.DataStructures;
/**
 * Drives a libmicrohttpd daemon running in external-select mode from the
 * GLib main loop, so that MHD.run() executes on the main loop thread.
 *
 * The descriptors MHD reports through MHD.get_fdset() are mirrored as GLib
 * IO watches. Whenever a watch fires, MHD.run() is called and the watches
 * are re-synced. A fixed-interval timer also calls MHD.run() as a safety net.
 *
 * GLib IO watches are level-triggered: a watch whose condition stays true
 * (e.g. OUT on an idle, writable socket) fires on every loop iteration.
 * For that reason, a watch is re-created whenever MHD's read/write interest
 * for its fd changes, rather than being kept with its initial condition.
 */
internal class SocketMonitor : Object {
    // Interval of the fallback MHD.run() poll, in milliseconds.
    private const uint POLL_INTERVAL_MS = 50;

    private MHD.Daemon daemon;
    // Per-fd GLib state: the channel, the id of its watch source, and the
    // IOCondition (stored as uint) the watch was created with.
    private Dictionary<int, IOChannel> channels;
    private Dictionary<int, uint> watch_ids;
    private Dictionary<int, uint> watch_conditions;
    private uint timeout_id;
    // Cleared by stop(). Callbacks that are already executing when stop() is
    // called (e.g. stop() triggered from inside MHD.run()) check this flag so
    // they do not re-arm watches afterwards.
    private bool running;

    public SocketMonitor(MHD.Daemon daemon) {
        this.daemon = daemon;
        this.channels = new Dictionary<int, IOChannel>();
        this.watch_ids = new Dictionary<int, uint>();
        this.watch_conditions = new Dictionary<int, uint>();
    }

    /** Installs watches for MHD's current descriptors and starts the poll timer. */
    public void start() {
        running = true;
        update_watches();
        start_timeout_watcher();
    }

    /** Removes every watch and the poll timer. Calling it again is harmless. */
    public void stop() {
        running = false;
        foreach (var id in watch_ids.get_values()) {
            Source.remove(id);
        }
        watch_ids.clear();
        watch_conditions.clear();
        channels.clear();
        if (timeout_id != 0) {
            Source.remove(timeout_id);
            timeout_id = 0;
        }
    }

    /**
     * Re-syncs the GLib watches with the descriptor sets MHD currently reports.
     * It adds watches for new fds, re-creates watches whose read/write interest
     * changed, and removes watches for fds MHD no longer lists.
     */
    private void update_watches() {
        if (!running) {
            return;
        }

        MHD.FDSet read_fds = MHD.FDSet();
        MHD.FDSet write_fds = MHD.FDSet();
        MHD.FDSet error_fds = MHD.FDSet();
        // MHD.get_fdset() only adds descriptors to the sets, so start empty.
        read_fds.zero();
        write_fds.zero();
        error_fds.zero();
        int max_fd = 0;
        // Result deliberately not checked; the periodic timer keeps driving MHD.
        // NOTE(review): assumes a NO result still leaves the successfully added
        // descriptors in the sets. Confirm against the MHD version in use.
        MHD.get_fdset(daemon, read_fds, write_fds, error_fds, &max_fd);

        var active_fds = new Dictionary<int, bool>();
        for (int fd = 0; fd <= max_fd; fd++) {
            bool needs_read = read_fds.is_set(fd);
            bool needs_write = write_fds.is_set(fd);
            if (!needs_read && !needs_write) {
                continue;
            }
            active_fds[fd] = true;

            uint wanted = (uint) watch_condition(needs_read, needs_write);
            if (watch_conditions.has_key(fd) && watch_conditions[fd] != wanted) {
                // Interest changed (typically write interest appearing or going
                // away). A GLib watch cannot be modified in place, so drop it
                // here and let the branch below re-create it.
                remove_watch_for_fd(fd);
            }
            if (!channels.has_key(fd)) {
                add_watch_for_fd(fd, needs_read, needs_write);
            }
        }

        // Remove watches for fds MHD no longer reports (closed fds). Collect
        // first, because the dictionary must not change while being iterated.
        var to_remove = new Series<int>();
        foreach (var entry in channels) {
            if (!active_fds.has_key(entry.key)) {
                to_remove.add(entry.key);
            }
        }
        foreach (int fd in to_remove) {
            remove_watch_for_fd(fd);
        }
    }

    // Builds the GLib condition for one fd. HUP and ERR are always included so
    // hang-ups and socket errors also wake MHD.
    private static IOCondition watch_condition(bool monitor_read, bool monitor_write) {
        IOCondition cond = IOCondition.HUP | IOCondition.ERR;
        if (monitor_read) {
            cond |= IOCondition.IN;
        }
        if (monitor_write) {
            cond |= IOCondition.OUT;
        }
        return cond;
    }

    private void add_watch_for_fd(int fd, bool monitor_read, bool monitor_write) {
        var channel = new IOChannel.unix_new(fd);
        IOCondition cond = watch_condition(monitor_read, monitor_write);
        uint watch_id = channel.add_watch(cond, (source, condition) => {
            if (!running) {
                return false;
            }
            MHD.run(daemon);
            update_watches(); // Socket set or interest may have changed
            // stop() may have been called from inside MHD.run().
            return running;
        });
        channels[fd] = channel;
        watch_ids[fd] = watch_id;
        watch_conditions[fd] = (uint) cond;
    }

    private void remove_watch_for_fd(int fd) {
        uint watch_id = watch_ids[fd];
        Source.remove(watch_id);
        watch_ids.unset(fd);
        watch_conditions.unset(fd);
        channels.unset(fd);
    }

    // Calls MHD.run() every POLL_INTERVAL_MS as a safety net. It covers work
    // that a socket watch may not signal (presumably timeouts and resumed
    // connections; confirm). The interval is fixed and does not consult
    // MHD.get_timeout().
    private void start_timeout_watcher() {
        timeout_id = Timeout.add(POLL_INTERVAL_MS, () => {
            if (!running) {
                return false;
            }
            MHD.run(daemon);
            update_watches();
            return running;
        });
    }
}
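`Server/RequestContext.vala`: already exists. It could optionally be enhanced to track connection state for debugging; this is not required for the fix, and the file listing below marks it as unchanged.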
`Server/Server.vala`: the main changes, switching to external select mode:
using MHD;
using Invercargill;
using Invercargill.DataStructures;
namespace Astralis {
    /**
     * HTTP server that drives libmicrohttpd in external select mode from a
     * GLib main loop, so MHD creates no threads of its own.
     */
    public class Server : Object {
        private Daemon daemon;
        private Pipeline pipeline;
        private int port;
        private Series<RequestContext> request_contexts;
        private Series<ResponseContext> response_contexts;
        private SocketMonitor? socket_monitor;
        // Loop that run() is blocked in. It is kept here so stop() can quit it;
        // a loop held only in a local variable could never be stopped.
        private MainLoop? main_loop;

        public Server(int port, Pipeline pipeline) {
            this.port = port;
            this.pipeline = pipeline;
            this.request_contexts = new Series<RequestContext>();
            this.response_contexts = new Series<ResponseContext>();
        }

        /**
         * Starts the daemon, hooks it into GLib, and blocks in a main loop
         * until stop() is called.
         */
        public void run() {
            // External select mode: no USE_SELECT_INTERNALLY flag
            // This means MHD creates no threads - we drive it from GLib
            daemon = Daemon.start(
                MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
                (uint16) this.port,
                null,
                (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
                    return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
                },
                MHD.OPTION_NOTIFY_COMPLETED,
                (void*) request_completed_callback,
                null,
                MHD.OPTION_END
            );
            if (daemon == null) {
                error("Failed to start daemon");
            }
            // Start socket monitoring integrated with GLib main loop
            socket_monitor = new SocketMonitor(daemon);
            socket_monitor.start();
            // Run GLib main loop until stop() quits it
            var loop = new MainLoop();
            main_loop = loop;
            loop.run();
            main_loop = null;
        }

        /**
         * Stops socket monitoring and makes a blocked run() return.
         */
        public void stop() {
            if (socket_monitor != null) {
                socket_monitor.stop();
            }
            if (main_loop != null) {
                main_loop.quit();
            }
            // MHD_stop_daemon is called by Daemon's destructor
        }

        // ... rest of access_handler remains unchanged
    }
}
Add the following to `vapi/libmicrohttpd.vapi`:
// FDSet structure for select
// Binding for the C `fd_set` type filled in by MHD_get_fdset().
// The type is deliberately opaque: the C compiler supplies sizeof(fd_set) and
// all access goes through the FD_* macros, so no member is declared here.
// The real member name and length vary by platform. glibc names it
// `__fds_bits` unless XOPEN features are enabled, and the array holds
// FD_SETSIZE / (bits per long) entries, i.e. 16 on 64-bit but 32 on 32-bit.
[CCode (cname = "fd_set", cheader_filename = "sys/select.h", has_type_id = false, destroy_function = "", cprefix = "")]
public struct FDSet {
    // FD_ZERO(set): removes every descriptor from the set.
    [CCode (cname = "FD_ZERO")]
    public void zero();
    // FD_SET / FD_CLR / FD_ISSET take the descriptor FIRST and the set LAST,
    // e.g. FD_ISSET(fd, set). valac passes the instance first by default, so
    // instance_pos = -1 is needed to move the set to the last argument.
    [CCode (cname = "FD_SET", instance_pos = -1)]
    public void set(int fd);
    [CCode (cname = "FD_CLR", instance_pos = -1)]
    public void clr(int fd);
    [CCode (cname = "FD_ISSET", instance_pos = -1)]
    public bool is_set(int fd);
}
// External select mode functions
// Processes pending work for a daemon driven in external select mode
// (see the MHD manual for its exact blocking semantics).
[CCode (cname = "MHD_run")]
public Result run(Daemon daemon);
// Adds the descriptors MHD wants watched to the given sets, which the caller
// should clear first. It raises *max_fd to the largest descriptor added.
[CCode (cname = "MHD_get_fdset")]
public Result get_fdset(Daemon daemon, FDSet read_fds, FDSet write_fds, FDSet error_fds, int* max_fd);
// The C prototype takes `MHD_UNSIGNED_LONG_LONG *` (unsigned long long *).
// On LP64 platforms guint64 is `unsigned long`, and the two pointer types are
// incompatible in C; this is an error by default since GCC 14. The CCode type
// makes valac cast the argument to the exact C pointer type.
[CCode (cname = "MHD_get_timeout")]
public Result get_timeout(Daemon daemon, [CCode (type = "MHD_UNSIGNED_LONG_LONG*")] uint64* timeout);
// Additional useful constants
[CCode (cname = "MHD_USE_EPOLL")]
public const uint USE_EPOLL;
[CCode (cname = "MHD_USE_POLL")]
public const uint USE_POLL;
[CCode (cname = "MHD_USE_AUTO")]
public const uint USE_AUTO;
[CCode (cname = "MHD_USE_ITC")]
public const uint USE_ITC;
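Why this fixes the hang:
- `MHD_resume_connection()` is called from the GLib main loop thread.
- `MHD.run()` is called from the same GLib main loop thread, so no cross-thread wakeup is needed.

Affected code: the `Server/` directory.

Performance considerations:
- Only call `MHD.run()` when sockets have activity, plus a periodic timeout for MHD's time-based work.
- Consider the `USE_EPOLL` flag on Linux for O(1) socket multiplexing.
- `GIOChannel.add_watch()` delivers readiness notifications. GLib watches are level-triggered (poll-based), not edge-triggered. A watch on a condition that stays true, such as `OUT` on an idle writable socket, fires on every main-loop iteration. Watch conditions must therefore track what MHD currently requests.

Optimization: use MHD's suggested timeout instead of a fixed 50 ms poll:

private void start_timeout_watcher() {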
schedule_next_timeout();
}
/**
 * Arms a one-shot timer whose delay follows MHD's own suggestion.
 * If MHD reports a deadline, the delay is that deadline capped at 1000 ms.
 * Otherwise it falls back to 50 ms. Each firing services MHD, re-syncs the
 * socket watches and then arms the next timer.
 */
private void schedule_next_timeout() {
    uint64 delay_ms = 50; // Default fallback
    uint64 mhd_deadline_ms = 0;
    if (MHD.get_timeout(daemon, &mhd_deadline_ms) == MHD.Result.YES) {
        // Cap at reasonable maximum
        delay_ms = uint64.min(mhd_deadline_ms, 1000);
    }

    timeout_id = Timeout.add((uint) delay_ms, () => {
        MHD.run(daemon);
        update_watches();
        // Re-arm with a freshly computed delay; this source never repeats.
        schedule_next_timeout();
        return Source.REMOVE;
    });
}
For Linux systems with many connections:
// Same Daemon.start() call as in Server.run(), with MHD.USE_EPOLL added to
// the flags; the remaining arguments are elided here.
daemon = Daemon.start(
MHD.USE_EPOLL | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
// ...
);
Note: USE_EPOLL with external select requires special handling - see MHD documentation.
src/Server/
├── Server.vala # Modified: external select mode
├── SocketMonitor.vala # NEW: GLib IO integration
├── RequestContext.vala # Unchanged
├── ResponseContext.vala # Unchanged
├── ServerInput.vala # Unchanged
└── ServerOutput.vala # Unchanged
vapi/
└── libmicrohttpd.vapi # Modified: add FDSet, run, get_fdset, get_timeout
(The VAPI additions are `FDSet`, `MHD_run`, `MHD_get_fdset` and `MHD_get_timeout`.)

Testing: use `ab` or `wrk` to verify performance.

Rollback: if issues arise, simply revert to `USE_SELECT_INTERNALLY`:
// Rollback: pass USE_SELECT_INTERNALLY again so MHD polls on its own
// thread instead of being driven from GLib; remaining arguments elided.
daemon = Daemon.start(
MHD.USE_SELECT_INTERNALLY | MHD.ALLOW_SUSPEND_RESUME | MHD.USE_DEBUG,
// ...
);
And remove SocketMonitor usage.