Forráskód Böngészése

feat(server): add SSE support with proper connection lifecycle handling

Add Server-Sent Events (SSE) endpoint infrastructure with robust client
disconnect detection. This introduces connection state tracking across
the AsyncOutput hierarchy and integrates with libmicrohttpd's request
completion callback to detect abnormal client disconnections.

Key changes:
- Add SseEndpoint class for SSE streaming with broadcast support
- Add SSE example demonstrating real-time event broadcasting
- Add 'connected' and 'write_would_block' properties to AsyncOutput interface
- Implement connection tracking in all compressor wrappers (brotli, gzip, zstd)
- Add request completion callback to detect client disconnections
- Fix ResponseContext to resume (not suspend) connection when data available
- Fix ServerOutput read_chunk to properly track consumed bytes
Billy Barrow 1 hete
szülő
commit
557e7d4c09

+ 182 - 0
examples/SseExample.vala

@@ -0,0 +1,182 @@
+using Astralis;
+using Invercargill;
+using Invercargill.DataStructures;
+
+/// ClockEndpoint is a singleton SSE endpoint that broadcasts the current time
+/// to all connected clients every second.
+/// 
+/// It demonstrates:
+/// - Singleton pattern for SSE endpoints (shared state across connections)
+/// - Implementing retry_interval property
+/// - Using new_connection for initial welcome message
+/// - Public method to broadcast events
+public class ClockEndpoint : SseEndpoint {
+
+    private int connection_counter = 0;
+    private Mutex counter_mutex = Mutex();
+
+    /// Retry interval: clients should wait 3 seconds before reconnecting
+    public override uint retry_interval { get { return 3000; } }
+
+    /// Called when a new client connects - send welcome message
+    public override async void new_connection(SseStream stream) {
+        // Assign a unique connection ID
+        int connection_id;
+        counter_mutex.lock();
+        connection_id = ++connection_counter;
+        counter_mutex.unlock();
+
+        print(@"SSE client connected (connection #$connection_id, total: $(get_open_streams().length))\n");
+
+        // Send welcome message
+        try {
+            yield stream.send_event(new SseEvent.with_type("connected", @"You are connection #$connection_id"));
+        } catch (Error e) {
+            print(@"Failed to send welcome: $(e.message)\n");
+        }
+
+        // Listen for disconnection
+        stream.disconnected.connect(() => {
+            print(@"SSE client disconnected (connection #$connection_id)\n");
+        });
+    }
+
+    /// Public method to broadcast the current time to all connected clients.
+    /// This can be called from anywhere (e.g., from another endpoint or service).
+    public async void broadcast_time() {
+        var now = new DateTime.now_local();
+        var time_str = now.format("%H:%M:%S");
+        var date_str = now.format("%Y-%m-%d");
+
+        // Create event with current time
+        var time_event = new SseEvent.with_type("time", time_str);
+        
+        // Also send a JSON-formatted message
+        var json_data = @"{\"time\":\"$time_str\",\"date\":\"$date_str\"}";
+        var json_event = new SseEvent.with_type("datetime", json_data);
+
+        // Broadcast to all connected clients
+        yield broadcast_event(time_event);
+        yield broadcast_event(json_event);
+    }
+
+    /// Start the broadcast loop. Called externally after construction.
+    public void start_broadcast_loop() {
+        broadcast_loop_iteration.begin();
+    }
+
+    private async void broadcast_loop_iteration() {
+        // Broadcast current time
+        yield broadcast_time();
+
+        // Schedule next iteration in 1 second
+        Timeout.add(1000, () => {
+            broadcast_loop_iteration.begin();
+            return false; // Don't repeat, we'll reschedule manually
+        });
+    }
+}
+
+/// IndexEndpoint serves a simple HTML page that connects to the SSE endpoints
+class IndexEndpoint : Object, Endpoint {
+    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
+        var html = """
+<!DOCTYPE html>
+<html>
+<head>
+    <title>Astralis SSE Example</title>
+    <style>
+        body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
+        .section { margin: 20px 0; padding: 20px; border: 1px solid #ccc; border-radius: 8px; }
+        h1 { color: #333; }
+        h2 { color: #666; margin-top: 0; }
+        #clock { font-size: 3em; font-family: monospace; text-align: center; }
+        .status { padding: 5px 10px; border-radius: 4px; margin: 10px 0; }
+        .status.connected { background: #d4edda; color: #155724; }
+        .status.disconnected { background: #f8d7da; color: #721c24; }
+        #events { background: #f5f5f5; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; font-size: 0.9em; }
+        .event { margin: 2px 0; padding: 2px 5px; border-bottom: 1px solid #ddd; }
+        .event-time { color: #999; }
+        .event-type { color: #0066cc; font-weight: bold; }
+    </style>
+</head>
+<body>
+    <h1>Astralis SSE Example</h1>
+    
+    <div class="section">
+        <h2>Live Clock (Broadcast)</h2>
+        <div id="clock-status" class="status disconnected">Disconnected</div>
+        <div id="clock">--:--:--</div>
+    </div>
+    
+    <div class="section">
+        <h2>Event Log</h2>
+        <div id="events"></div>
+    </div>
+    
+    <script>
+        function logEvent(type, data) {
+            const eventsDiv = document.getElementById('events');
+            const time = new Date().toLocaleTimeString();
+            const eventDiv = document.createElement('div');
+            eventDiv.className = 'event';
+            eventDiv.innerHTML = '<span class="event-time">' + time + '</span> <span class="event-type">[' + type + ']</span> ' + data;
+            eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
+        }
+        
+        // Clock SSE connection
+        const clockSource = new EventSource('/clock-stream');
+        const clockStatus = document.getElementById('clock-status');
+        
+        clockSource.onopen = function() {
+            clockStatus.textContent = 'Connected';
+            clockStatus.className = 'status connected';
+        };
+        
+        clockSource.onerror = function() {
+            clockStatus.textContent = 'Disconnected (reconnecting...)';
+            clockStatus.className = 'status disconnected';
+        };
+        
+        clockSource.addEventListener('time', function(e) {
+            document.getElementById('clock').textContent = e.data;
+        });
+        
+        clockSource.addEventListener('datetime', function(e) {
+            logEvent('datetime', e.data);
+        });
+        
+        clockSource.addEventListener('connected', function(e) {
+            logEvent('connected', e.data);
+        });
+    </script>
+</body>
+</html>
+""";
+        return new HttpStringResult(html)
+            .set_header("Content-Type", "text/html");
+    }
+}
+
+void main() {
+    var application = new WebApplication(8080);
+    
+    // Register SSE endpoint as a singleton with an explicit factory
+    // This ensures the constructor logic runs
+    application.add_singleton_endpoint<ClockEndpoint>(
+        new EndpointRoute("/clock-stream"),
+        () => {
+            var endpoint = new ClockEndpoint();
+            endpoint.start_broadcast_loop();
+            return endpoint;
+        }
+    );
+    
+    // Register the index page
+    application.add_endpoint<IndexEndpoint>(new EndpointRoute("/"));
+    
+    print("SSE Example server running on http://localhost:8080\n");
+    print("Open http://localhost:8080 in your browser to see SSE in action\n");
+    
+    application.run();
+}

+ 7 - 0
examples/meson.build

@@ -102,3 +102,10 @@ executable('htmx-example',
     dependencies: [astralis_dep, invercargill_dep],
     install: false
 )
+
+# SSE Example - demonstrates Server-Sent Events with broadcasting
+executable('sse-example',
+    'SseExample.vala',
+    dependencies: [astralis_dep, invercargill_dep],
+    install: false
+)

+ 6 - 0
src/Components/BrotliCompressor.vala

@@ -133,6 +133,9 @@ namespace Astralis {
             private bool finished = false;
             private uint8[] output_buffer;
 
+            public bool connected { get { return downstream.connected; } }
+            public bool write_would_block { get { return downstream.write_would_block; } }
+
             public BrotliAsyncOutput(AsyncOutput downstream, int quality, Brotli.EncoderMode mode) throws Error {
                 this.downstream = downstream;
 
@@ -149,6 +152,9 @@ namespace Astralis {
             }
 
             public async void write(BinaryData data) throws Error {
+                if (!connected) {
+                    throw new IOError.CLOSED("Cannot write to closed output stream");
+                }
                 if (finished) {
                     throw new IOError.FAILED("Cannot write to finished brotli stream");
                 }

+ 6 - 0
src/Components/GzipCompressor.vala

@@ -81,6 +81,9 @@ namespace Astralis {
             private bool finished = false;
             private uint8[] output_buffer;
             
+            public bool connected { get { return downstream.connected; } }
+            public bool write_would_block { get { return downstream.write_would_block; } }
+            
             public GzipAsyncOutput(AsyncOutput downstream, int compression_level) throws Error {
                 this.downstream = downstream;
                 
@@ -97,6 +100,9 @@ namespace Astralis {
             }
 
             public async void write(BinaryData data) throws Error {
+                if (!connected) {
+                    throw new IOError.CLOSED("Cannot write to closed output stream");
+                }
                 if (finished) {
                     throw new IOError.FAILED("Cannot write to finished gzip stream");
                 }

+ 6 - 0
src/Components/ZstdCompressor.vala

@@ -80,6 +80,9 @@ namespace Astralis {
             private bool finished = false;
             private uint8[] output_buffer;
             
+            public bool connected { get { return downstream.connected; } }
+            public bool write_would_block { get { return downstream.write_would_block; } }
+            
             public ZstdAsyncOutput(AsyncOutput downstream, int compression_level) throws Error {
                 this.downstream = downstream;
                 
@@ -97,6 +100,9 @@ namespace Astralis {
             }
 
             public async void write(BinaryData data) throws Error {
+                if (!connected) {
+                    throw new IOError.CLOSED("Cannot write to closed output stream");
+                }
                 if (finished) {
                     throw new IOError.FAILED("Cannot write to finished zstd stream");
                 }

+ 20 - 0
src/Core/AsyncOutput.vala

@@ -5,7 +5,20 @@ namespace Astralis {
 
     public interface AsyncOutput : Object {
 
+        /// Indicates whether the output stream is still connected.
+        /// When false, calls to write() will throw IOError.CLOSED.
+        public abstract bool connected { get; }
+
+        /// Indicates whether a write would block due to buffer full.
+        /// Buffered implementations return true when buffer is full.
+        /// Passthrough implementations delegate to underlying output.
+        /// Memory buffer implementations always return false.
+        public abstract bool write_would_block { get; }
+
+        /// Write data to the output stream.
+        /// Throws IOError.CLOSED if the connection is closed.
         public abstract async void write(BinaryData data) throws Error;
+
         public virtual async void write_stream(InputStream stream) throws Error {
             uint8[] chunk = new uint8[8192];
             while (true) {
@@ -22,8 +35,15 @@ namespace Astralis {
     /// An AsyncOutput that buffers all written data in memory
     public class BufferAsyncOutput : Object, AsyncOutput {
         private ByteArray buffer = new ByteArray();
+        private bool _connected = true;
+
+        public bool connected { get { return _connected; } }
+        public bool write_would_block { get { return false; } }  // Never blocks - unlimited memory buffer
 
         public async void write(BinaryData data) throws Error {
+            if (!_connected) {
+                throw new IOError.CLOSED("Cannot write to closed output stream");
+            }
             uint8[] bytes = data.to_array();
             buffer.append(bytes);
         }

+ 291 - 0
src/Endpoints/SseEndpoint.vala

@@ -0,0 +1,291 @@
+using Invercargill;
+using Invercargill.DataStructures;
+
+namespace Astralis {
+
+    /// Represents a single Server-Sent Events stream connection.
+    /// 
+    /// SseStream provides the interface for sending events to a specific client
+    /// connection. Each connected client gets its own SseStream instance that
+    /// can be used to send targeted events.
+    public class SseStream : Object {
+
+        private AsyncOutput output;
+        private bool _is_closed = false;
+        private Mutex close_mutex = Mutex();
+
+        /// Signal emitted when the client disconnects.
+        public signal void disconnected();
+
+        /// Indicates whether this stream has been closed.
+        public bool is_closed { 
+            get {
+                close_mutex.lock();
+                var result = _is_closed;
+                close_mutex.unlock();
+                return result;
+            }
+        }
+
+        internal SseStream(AsyncOutput output) {
+            this.output = output;
+        }
+
+        /// Send an SSE event to this stream.
+        /// 
+        /// @param event The SSE event to send
+        /// @throws Error if the stream is closed or writing fails
+        public async void send_event(SseEvent event) throws Error {
+            if (is_closed) {
+                throw new IOError.CLOSED("Cannot send to closed SSE stream");
+            }
+            if (!output.connected) {
+                close();
+                throw new IOError.CLOSED("Cannot send to closed SSE stream");
+            }
+            // Skip writing if the buffer is full (client is slow)
+            if (output.write_would_block) {
+                return;
+            }
+            var formatted = event.format();
+            yield output.write(new ByteBuffer.from_byte_array(formatted.data));
+        }
+
+        /// Close this stream. After closing, no more data can be sent.
+        public void close() {
+            close_mutex.lock();
+            var was_closed = _is_closed;
+            _is_closed = true;
+            close_mutex.unlock();
+            
+            if (!was_closed) {
+                disconnected();
+            }
+        }
+    }
+
+    /// Represents a Server-Sent Events message.
+    /// 
+    /// SseEvent follows the W3C SSE specification with support for
+    /// id, event type, data, and retry fields.
+    public class SseEvent : Object {
+
+        /// Optional event ID for reconnection tracking
+        public string? id { get; set; }
+
+        /// Optional event type for client-side event listeners
+        public string? event_type { get; set; }
+
+        /// The event data payload
+        public string data { get; set; }
+
+        /// Optional retry interval in milliseconds for reconnection
+        public int64? retry { get; set; }
+
+        /// Create a basic SSE event with just data.
+        public SseEvent(string data) {
+            this.data = data;
+        }
+
+        /// Create an SSE event with an ID and data.
+        public SseEvent.with_id(string id, string data) {
+            this.id = id;
+            this.data = data;
+        }
+
+        /// Create an SSE event with a type and data.
+        public SseEvent.with_type(string event_type, string data) {
+            this.event_type = event_type;
+            this.data = data;
+        }
+
+        /// Create a fully-specified SSE event.
+        public SseEvent.full(string? id, string? event_type, string data, int64? retry = null) {
+            this.id = id;
+            this.event_type = event_type;
+            this.data = data;
+            this.retry = retry;
+        }
+
+        /// Format this event according to the W3C SSE specification.
+        /// 
+        /// @return A properly formatted SSE string ready to be sent
+        public string format() {
+            var builder = new StringBuilder();
+
+            if (id != null) {
+                builder.append(@"id: $id\n");
+            }
+            if (event_type != null) {
+                builder.append(@"event: $event_type\n");
+            }
+            // Handle multi-line data by prefixing each line with "data: "
+            var lines = data.split("\n");
+            foreach (var line in lines) {
+                builder.append(@"data: $line\n");
+            }
+            if (retry != null) {
+                builder.append(@"retry: $retry\n");
+            }
+            builder.append("\n");
+
+            return builder.str;
+        }
+    }
+
+    /// HTTP result type for Server-Sent Events responses.
+    ///
+    /// HttpSseResult sets the appropriate headers for SSE (Content-Type: text/event-stream,
+    /// Cache-Control: no-cache) and manages the SSE connection lifecycle.
+    public class HttpSseResult : HttpResult {
+
+        private SseEndpoint endpoint;
+
+        /// Create an SSE result for the given endpoint.
+        internal HttpSseResult(SseEndpoint endpoint) {
+            base(StatusCode.OK);
+            this.endpoint = endpoint;
+
+            // Set required SSE headers
+            set_header("Content-Type", "text/event-stream");
+            set_header("Cache-Control", "no-cache");
+            set_header("Connection", "keep-alive");
+
+            // SSE should not be compressed for real-time delivery
+            set_flag(HttpResultFlag.DO_NOT_COMPRESS);
+        }
+
+        public override async void send_body(AsyncOutput output) throws Error {
+            var stream = new SseStream(output);
+
+            // Send retry interval if specified
+            var retry = endpoint.retry_interval;
+            if (retry > 0) {
+                var retry_event = new SseEvent.full(null, null, "", retry);
+                yield stream.send_event(retry_event);
+            }
+
+            // Register the stream with the endpoint
+            endpoint.register_stream(stream);
+
+            // Notify the endpoint that a new connection was established
+            endpoint.notify_new_connection(stream);
+
+            // Keep the connection alive until the stream is closed
+            while (!stream.is_closed) {
+                // Use a longer sleep interval to avoid busy-waiting
+                // The stream will be closed by the server or by error
+                Timeout.add(100, send_body.callback);
+                yield;
+            }
+
+            // Always unregister when done
+            endpoint.unregister_stream(stream);
+        }
+    }
+
+    /// Abstract base class for Server-Sent Events endpoints.
+    /// 
+    /// SseEndpoint is designed to be registered as a singleton in the IoC container
+    /// and provides:
+    /// - Management of all open SSE streams
+    /// - Protected access to view all open streams
+    /// - Protected method to broadcast events to all streams
+    /// 
+    /// The connection lifecycle is managed internally - implementers only need to
+    /// override `retry_interval` and optionally `new_connection` to handle new clients.
+    /// 
+    /// Example usage:
+    /// ```vala
+    /// public class NewsFeedEndpoint : SseEndpoint {
+    ///     public override uint retry_interval { get { return 3000; } }
+    ///     
+    ///     // Called when each client connects
+    ///     public override async void new_connection(SseStream stream) {
+    ///         yield stream.send_event(new SseEvent.with_type("welcome", "Connected to news feed"));
+    ///     }
+    ///     
+    ///     // Call this to broadcast to all clients
+    ///     public async void broadcast_news(string news) {
+    ///         yield broadcast_event(new SseEvent.with_type("news", news));
+    ///     }
+    /// }
+    /// ```
+    public abstract class SseEndpoint : Object, Endpoint {
+
+        private Series<SseStream> open_streams = new Series<SseStream>();
+        private Mutex streams_mutex = Mutex();
+
+        /// The retry interval in milliseconds sent to clients for reconnection.
+        /// 
+        /// Override this property to specify how long clients should wait before
+        /// attempting to reconnect if the connection is lost.
+        public abstract uint retry_interval { get; }
+
+        /// Handle the HTTP request and return an SSE result.
+        /// 
+        /// This method is sealed and cannot be overridden. It always returns
+        /// an HttpSseResult which manages the SSE connection lifecycle.
+        public sealed async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
+            return new HttpSseResult(this);
+        }
+
+        /// Called when a new client connects.
+        /// 
+        /// Override this method to send initial data to the client or set up
+        /// per-client state. The connection will remain open until the client
+        /// disconnects or the stream is explicitly closed.
+        /// 
+        /// @param stream The SSE stream for this client connection
+        public virtual async void new_connection(SseStream stream) {
+            // Default implementation does nothing
+        }
+
+        /// Get a read-only view of all currently open streams.
+        ///
+        /// This allows inheritors to iterate over streams for targeted
+        /// messaging or monitoring purposes.
+        ///
+        /// @return An immutable collection of open streams
+        protected ImmutableLot<SseStream> get_open_streams() {
+            streams_mutex.lock();
+            var result = open_streams.where(s => !s.is_closed).to_immutable_buffer();
+            streams_mutex.unlock();
+            return result;
+        }
+
+        /// Send an event to all currently open streams.
+        /// 
+        /// This method will attempt to send to all streams even if some fail.
+        /// Streams that fail to receive will be closed.
+        /// 
+        /// @param event The SSE event to broadcast
+        protected async void broadcast_event(SseEvent event) {
+            var streams = get_open_streams();
+            foreach (var stream in streams) {
+                try {
+                    yield stream.send_event(event);
+                } catch (Error e) {
+                    stream.close();
+                }
+            }
+        }
+
+        internal void register_stream(SseStream stream) {
+            streams_mutex.lock();
+            open_streams.add(stream);
+            streams_mutex.unlock();
+        }
+
+        internal void unregister_stream(SseStream stream) {
+            streams_mutex.lock();
+            open_streams.remove(stream);
+            streams_mutex.unlock();
+        }
+
+        internal void notify_new_connection(SseStream stream) {
+            new_connection.begin(stream);
+        }
+    }
+
+}

+ 1 - 0
src/Server/RequestContext.vala

@@ -8,6 +8,7 @@ namespace Astralis {
         public HttpResult? handler_result { get; set; }
         public Error? handler_error { get; set; }
         public bool handler_started { get; set; }
+        public ResponseContext? response_context { get; set; }
 
         private Mutex request_lock;
         private bool request_fully_received;

+ 2 - 2
src/Server/ResponseContext.vala

@@ -35,7 +35,7 @@ namespace Astralis {
         private void new_data_available() {
             lock(connection_suspended) {
                 if(connection_suspended) {
-                    MHD.suspend_connection(connection);
+                    MHD.resume_connection(connection);
                     connection_suspended = false;
                 }
             }
@@ -51,4 +51,4 @@ namespace Astralis {
 
     }
 
-}
+}

+ 31 - 6
src/Server/Server.vala

@@ -107,7 +107,10 @@ namespace Astralis {
                 (connection, url, method, version, upload_data, upload_data_size, con_cls) => {
                     // Trampoline to instance method if needed, or just use lambda capturing 'this'
                     return this.access_handler(connection, url, method, version, upload_data, upload_data_size, con_cls);
-                }, 
+                },
+                MHD.OPTION_NOTIFY_COMPLETED,
+                (void*) request_completed_callback,
+                null,
                 MHD.OPTION_END
             );
 
@@ -121,21 +124,26 @@ namespace Astralis {
         private Result respond(Connection connection, RequestContext context) {
             var result = Result.NO;
             if(context.handler_error != null) {
-                result = handle_error(context.handler_error, connection);
+                result = handle_error(context.handler_error, connection, context);
             }
             else if(context.handler_result != null) {
-                result = send_result(connection, context.handler_result);
+                result = send_result(connection, context.handler_result, context);
             }
             request_contexts.remove(context);
             return result;
         }
 
 
-        private Result send_result(Connection connection, HttpResult result) {
+        private Result send_result(Connection connection, HttpResult result, RequestContext? request_context = null) {
             uint64 size = result.content_length ?? -1;
             var response_context = new ResponseContext(this, connection, result);
             response_contexts.add(response_context);
 
+            // Link the ResponseContext to the RequestContext for disconnect notification
+            if (request_context != null) {
+                request_context.response_context = response_context;
+            }
+
             var response = new Response.from_callback(
                 size,
                 1048576,
@@ -174,9 +182,26 @@ namespace Astralis {
             return res;
         }
 
-        private Result handle_error(Error error, Connection connection) {
+        private Result handle_error(Error error, Connection connection, RequestContext? request_context = null) {
             printerr(@"Astralis Internal Server Error: Unhandled Error: $(error.message)\n");
-            return send_result(connection, new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR));
+            return send_result(connection, new HttpStringResult("Internal Server Error", StatusCode.INTERNAL_SERVER_ERROR), request_context);
+        }
+
+        // Static callback wrapper for MHD_OPTION_NOTIFY_COMPLETED
+        private static void request_completed_callback(void* cls, Connection connection, void** con_cls, RequestTerminationCode toe) {
+            // We don't have access to the Server instance here, so we handle cleanup directly
+            if (con_cls[0] == null) {
+                return;
+            }
+            
+            RequestContext context = (RequestContext) ((Object*) con_cls[0]);
+            
+            // If the request was terminated abnormally, disconnect the output stream
+            if (toe != RequestTerminationCode.COMPLETED_OK) {
+                if (context.response_context != null) {
+                    context.response_context.body_output.close_connection();
+                }
+            }
         }
 
         // Simple memory-based InputStream for request body

+ 25 - 3
src/Server/ServerOutput.vala

@@ -8,10 +8,28 @@ namespace Astralis {
         private BinaryData current_chunk = null;
         private Series<ByteBuffer> chunks = new Series<ByteBuffer>();
         internal signal void on_new_chunk();
+        internal signal void on_chunk_poped();
+        private delegate void Handler();
+        private bool _connected = true;
 
-        public async void write (Invercargill.BinaryData data) {
+        public bool connected { get { return _connected; } }
+        public bool write_would_block { get { return chunks.length > MAX_CHUNKS; } }
+
+        internal void close_connection() {
+            _connected = false;
+        }
+
+        public async void write (Invercargill.BinaryData data) throws Error {
+            if (!_connected) {
+                throw new IOError.CLOSED("Cannot write to disconnected client");
+            }
             while(chunks.length > MAX_CHUNKS) {
-                Idle.add(write.callback);
+                Handler pop_handler = null;
+                pop_handler = () => {
+                    on_chunk_poped.disconnect(pop_handler);
+                    Idle.add(write.callback);
+                };
+                on_chunk_poped.connect(pop_handler);
                 yield;
             }
             var buffer = data.to_byte_buffer();
@@ -22,14 +40,18 @@ namespace Astralis {
         }
 
         internal size_t read_chunk(void* buffer, size_t max_size) {
+            if(current_chunk != null && current_chunk.peek_count() == 0) {
+                current_chunk = null;
+            }
             if(current_chunk == null && chunks.length == 0) {
                 return 0;
             }
             if(current_chunk == null) {
                 current_chunk = chunks.pop_start ();
+                on_chunk_poped();
             }
             var size = current_chunk.write_to (buffer, max_size);
-            current_chunk = current_chunk.skip((uint)max_size);
+            current_chunk = current_chunk.skip((uint)size);
             return size;
         }
 

+ 1 - 0
src/meson.build

@@ -15,6 +15,7 @@ sources = files(
     'Endpoints/Endpoint.vala',
     'Endpoints/FilesystemResource.vala',
     'Endpoints/FastResource.vala',
+    'Endpoints/SseEndpoint.vala',
     'Server/Server.vala',
     'Server/RequestContext.vala',
     'Server/ResponseContext.vala',

+ 5 - 0
vapi/libmicrohttpd.vapi

@@ -43,6 +43,8 @@ namespace MHD {
 
     [CCode (cname = "MHD_OPTION_END")]
     public const int OPTION_END;
+    [CCode (cname = "MHD_OPTION_NOTIFY_COMPLETED")]
+    public const int OPTION_NOTIFY_COMPLETED;
 
     [SimpleType]
     [CCode (cname = "struct MHD_Connection*")]
@@ -107,6 +109,9 @@ namespace MHD {
     [CCode (has_target = false)]
     public delegate Result KeyValueIterator (void* cls, ValueKind kind, string key, string? value);
 
+    [CCode (has_target = false, cname = "MHD_RequestCompletedCallback")]
+    public delegate void RequestCompletedCallback (void* cls, Connection connection, void** con_cls, RequestTerminationCode toe);
+
     [CCode (cname = "MHD_get_connection_values")]
     public int get_connection_values (Connection connection, ValueKind kind, KeyValueIterator? iterator, void* iterator_cls = null);