Ver Fonte

feat(sse): pass http and route context to new_connection callback

Update SseEndpoint.new_connection to receive HttpContext and RouteContext
parameters, enabling SSE endpoints to access request information and route
parameters when handling new connections.

Also add CounterEndpoint example demonstrating stateful SSE with HTTP-triggered
broadcasts, showing how HTTP endpoints can modify shared state and broadcast
changes to all connected SSE clients.

BREAKING CHANGE: SseEndpoint.new_connection signature changed from
(SseStream stream) to (HttpContext http_context, RouteContext route_context,
SseStream stream)
Billy Barrow há 1 semana atrás
pai
commit
60cdf7931a
2 ficheiros alterados com 272 adições e 76 exclusões
  1. 262 70
      examples/SseExample.vala
  2. 10 6
      src/Endpoints/SseEndpoint.vala

+ 262 - 70
examples/SseExample.vala

@@ -2,6 +2,76 @@ using Astralis;
 using Invercargill;
 using Invercargill.DataStructures;
 
+/// CounterEndpoint is a singleton SSE endpoint that broadcasts counter changes
+/// to all connected clients whenever the counter is modified.
+/// 
+/// It demonstrates:
+/// - Singleton pattern for SSE endpoints (shared state across connections)
+/// - Broadcasting state changes triggered by HTTP requests
+/// - Using new_connection to send current state to new clients
+/// - Public methods to modify state and broadcast changes
+public class CounterEndpoint : SseEndpoint {
+
+    private int counter = 0;
+    private Mutex counter_mutex = Mutex();
+
+    /// Retry interval: clients should wait 2 seconds before reconnecting
+    public override uint retry_interval { get { return 2000; } }
+
+    /// Called when a new client connects - send current counter value
+    public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
+        print(@"Counter SSE client connected (total: $(get_open_streams().length))\n");
+
+        // Send current counter value to new client
+        try {
+            counter_mutex.lock();
+            int current_value = counter;
+            counter_mutex.unlock();
+
+            yield stream.send_event(new SseEvent.with_type("counter", current_value.to_string()));
+        } catch (Error e) {
+            print(@"Failed to send counter value: $(e.message)\n");
+        }
+
+        // Listen for disconnection
+        stream.disconnected.connect(() => {
+            print(@"Counter SSE client disconnected\n");
+        });
+    }
+
+    /// Increment the counter and broadcast the new value
+    public async void increment() {
+        counter_mutex.lock();
+        counter++;
+        int new_value = counter;
+        counter_mutex.unlock();
+
+        print(@"Counter incremented to: $new_value\n");
+        yield broadcast_event(new SseEvent.with_type("counter", new_value.to_string()));
+    }
+
+    /// Decrement the counter and broadcast the new value
+    public async void decrement() {
+        counter_mutex.lock();
+        counter--;
+        int new_value = counter;
+        counter_mutex.unlock();
+
+        print(@"Counter decremented to: $new_value\n");
+        yield broadcast_event(new SseEvent.with_type("counter", new_value.to_string()));
+    }
+
+    /// Reset the counter to zero and broadcast
+    public async void reset() {
+        counter_mutex.lock();
+        counter = 0;
+        counter_mutex.unlock();
+
+        print(@"Counter reset to: 0\n");
+        yield broadcast_event(new SseEvent.with_type("counter", "0"));
+    }
+}
+
 /// ClockEndpoint is a singleton SSE endpoint that broadcasts the current time
 /// to all connected clients every second.
 /// 
@@ -19,7 +89,7 @@ public class ClockEndpoint : SseEndpoint {
     public override uint retry_interval { get { return 3000; } }
 
     /// Called when a new client connects - send welcome message
-    public override async void new_connection(SseStream stream) {
+    public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
         // Assign a unique connection ID
         int connection_id;
         counter_mutex.lock();
@@ -77,82 +147,177 @@ public class ClockEndpoint : SseEndpoint {
     }
 }
 
+/// CounterIncrementEndpoint handles HTTP POST requests to increment the counter
+class CounterIncrementEndpoint : Object, Endpoint {
+    private CounterEndpoint counter_endpoint;
+
+    public CounterIncrementEndpoint(CounterEndpoint counter_endpoint) {
+        this.counter_endpoint = counter_endpoint;
+    }
+
+    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
+        counter_endpoint.increment.begin();
+        return new HttpStringResult("{\"status\":\"ok\"}")
+            .set_header("Content-Type", "application/json");
+    }
+}
+
+/// CounterDecrementEndpoint handles HTTP POST requests to decrement the counter
+class CounterDecrementEndpoint : Object, Endpoint {
+    private CounterEndpoint counter_endpoint;
+
+    public CounterDecrementEndpoint(CounterEndpoint counter_endpoint) {
+        this.counter_endpoint = counter_endpoint;
+    }
+
+    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
+        counter_endpoint.decrement.begin();
+        return new HttpStringResult("{\"status\":\"ok\"}")
+            .set_header("Content-Type", "application/json");
+    }
+}
+
+/// CounterResetEndpoint handles HTTP POST requests to reset the counter
+class CounterResetEndpoint : Object, Endpoint {
+    private CounterEndpoint counter_endpoint;
+
+    public CounterResetEndpoint(CounterEndpoint counter_endpoint) {
+        this.counter_endpoint = counter_endpoint;
+    }
+
+    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
+        counter_endpoint.reset.begin();
+        return new HttpStringResult("{\"status\":\"ok\"}")
+            .set_header("Content-Type", "application/json");
+    }
+}
+
 /// IndexEndpoint serves a simple HTML page that connects to the SSE endpoints
 class IndexEndpoint : Object, Endpoint {
     public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
         var html = """
-<!DOCTYPE html>
-<html>
-<head>
-    <title>Astralis SSE Example</title>
-    <style>
-        body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
-        .section { margin: 20px 0; padding: 20px; border: 1px solid #ccc; border-radius: 8px; }
-        h1 { color: #333; }
-        h2 { color: #666; margin-top: 0; }
-        #clock { font-size: 3em; font-family: monospace; text-align: center; }
-        .status { padding: 5px 10px; border-radius: 4px; margin: 10px 0; }
-        .status.connected { background: #d4edda; color: #155724; }
-        .status.disconnected { background: #f8d7da; color: #721c24; }
-        #events { background: #f5f5f5; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; font-size: 0.9em; }
-        .event { margin: 2px 0; padding: 2px 5px; border-bottom: 1px solid #ddd; }
-        .event-time { color: #999; }
-        .event-type { color: #0066cc; font-weight: bold; }
-    </style>
-</head>
-<body>
-    <h1>Astralis SSE Example</h1>
-    
-    <div class="section">
-        <h2>Live Clock (Broadcast)</h2>
-        <div id="clock-status" class="status disconnected">Disconnected</div>
-        <div id="clock">--:--:--</div>
-    </div>
-    
-    <div class="section">
-        <h2>Event Log</h2>
-        <div id="events"></div>
-    </div>
-    
-    <script>
-        function logEvent(type, data) {
-            const eventsDiv = document.getElementById('events');
-            const time = new Date().toLocaleTimeString();
-            const eventDiv = document.createElement('div');
-            eventDiv.className = 'event';
-            eventDiv.innerHTML = '<span class="event-time">' + time + '</span> <span class="event-type">[' + type + ']</span> ' + data;
-            eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
-        }
-        
-        // Clock SSE connection
-        const clockSource = new EventSource('/clock-stream');
-        const clockStatus = document.getElementById('clock-status');
+    <!DOCTYPE html>
+    <html>
+    <head>
+        <title>Astralis SSE Example</title>
+        <style>
+            body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
+            .section { margin: 20px 0; padding: 20px; border: 1px solid #ccc; border-radius: 8px; }
+            h1 { color: #333; }
+            h2 { color: #666; margin-top: 0; }
+            #clock { font-size: 3em; font-family: monospace; text-align: center; }
+            #counter { font-size: 3em; font-family: monospace; text-align: center; }
+            .status { padding: 5px 10px; border-radius: 4px; margin: 10px 0; }
+            .status.connected { background: #d4edda; color: #155724; }
+            .status.disconnected { background: #f8d7da; color: #721c24; }
+            #events { background: #f5f5f5; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; font-size: 0.9em; }
+            .event { margin: 2px 0; padding: 2px 5px; border-bottom: 1px solid #ddd; }
+            .event-time { color: #999; }
+            .event-type { color: #0066cc; font-weight: bold; }
+            .counter-buttons { text-align: center; margin: 15px 0; }
+            .counter-buttons button { font-size: 1.2em; padding: 10px 20px; margin: 0 5px; cursor: pointer; }
+            .counter-buttons button.increment { background: #28a745; color: white; border: none; border-radius: 4px; }
+            .counter-buttons button.decrement { background: #dc3545; color: white; border: none; border-radius: 4px; }
+            .counter-buttons button.reset { background: #6c757d; color: white; border: none; border-radius: 4px; }
+            .counter-buttons button:hover { opacity: 0.9; }
+        </style>
+    </head>
+    <body>
+        <h1>Astralis SSE Example</h1>
         
-        clockSource.onopen = function() {
-            clockStatus.textContent = 'Connected';
-            clockStatus.className = 'status connected';
-        };
+        <div class="section">
+            <h2>Shared Counter (HTTP + SSE)</h2>
+            <div id="counter-status" class="status disconnected">Disconnected</div>
+            <div id="counter">0</div>
+            <div class="counter-buttons">
+                <button class="decrement" onclick="decrementCounter()">- Decrement</button>
+                <button class="reset" onclick="resetCounter()">Reset</button>
+                <button class="increment" onclick="incrementCounter()">+ Increment</button>
+            </div>
+        </div>
         
-        clockSource.onerror = function() {
-            clockStatus.textContent = 'Disconnected (reconnecting...)';
-            clockStatus.className = 'status disconnected';
-        };
+        <div class="section">
+            <h2>Live Clock (Broadcast)</h2>
+            <div id="clock-status" class="status disconnected">Disconnected</div>
+            <div id="clock">--:--:--</div>
+        </div>
         
-        clockSource.addEventListener('time', function(e) {
-            document.getElementById('clock').textContent = e.data;
-        });
+        <div class="section">
+            <h2>Event Log</h2>
+            <div id="events"></div>
+        </div>
         
-        clockSource.addEventListener('datetime', function(e) {
-            logEvent('datetime', e.data);
-        });
-        
-        clockSource.addEventListener('connected', function(e) {
-            logEvent('connected', e.data);
-        });
-    </script>
-</body>
-</html>
-""";
+        <script>
+            function logEvent(type, data) {
+                const eventsDiv = document.getElementById('events');
+                const time = new Date().toLocaleTimeString();
+                const eventDiv = document.createElement('div');
+                eventDiv.className = 'event';
+                eventDiv.innerHTML = '<span class="event-time">' + time + '</span> <span class="event-type">[' + type + ']</span> ' + data;
+                eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
+            }
+            
+            // Counter SSE connection
+            const counterSource = new EventSource('/counter-stream');
+            const counterStatus = document.getElementById('counter-status');
+            
+            counterSource.onopen = function() {
+                counterStatus.textContent = 'Connected';
+                counterStatus.className = 'status connected';
+            };
+            
+            counterSource.onerror = function() {
+                counterStatus.textContent = 'Disconnected (reconnecting...)';
+                counterStatus.className = 'status disconnected';
+            };
+            
+            counterSource.addEventListener('counter', function(e) {
+                document.getElementById('counter').textContent = e.data;
+                logEvent('counter', e.data);
+            });
+            
+            // Counter control functions
+            async function incrementCounter() {
+                await fetch('/counter/increment', { method: 'POST' });
+            }
+            
+            async function decrementCounter() {
+                await fetch('/counter/decrement', { method: 'POST' });
+            }
+            
+            async function resetCounter() {
+                await fetch('/counter/reset', { method: 'POST' });
+            }
+            
+            // Clock SSE connection
+            const clockSource = new EventSource('/clock-stream');
+            const clockStatus = document.getElementById('clock-status');
+            
+            clockSource.onopen = function() {
+                clockStatus.textContent = 'Connected';
+                clockStatus.className = 'status connected';
+            };
+            
+            clockSource.onerror = function() {
+                clockStatus.textContent = 'Disconnected (reconnecting...)';
+                clockStatus.className = 'status disconnected';
+            };
+            
+            clockSource.addEventListener('time', function(e) {
+                document.getElementById('clock').textContent = e.data;
+            });
+            
+            clockSource.addEventListener('datetime', function(e) {
+                logEvent('datetime', e.data);
+            });
+            
+            clockSource.addEventListener('connected', function(e) {
+                logEvent('connected', e.data);
+            });
+        </script>
+    </body>
+    </html>
+    """;
         return new HttpStringResult(html)
             .set_header("Content-Type", "text/html");
     }
@@ -161,7 +326,28 @@ class IndexEndpoint : Object, Endpoint {
 void main() {
     var application = new WebApplication(8080);
     
-    // Register SSE endpoint as a singleton with an explicit factory
+    // Create and register the counter SSE endpoint as a singleton
+    var counter_endpoint = new CounterEndpoint();
+    application.add_singleton_endpoint<CounterEndpoint>(
+        new EndpointRoute("/counter-stream"),
+        () => counter_endpoint
+    );
+    
+    // Register counter control endpoints (HTTP POST to modify state)
+    application.add_endpoint<CounterIncrementEndpoint>(
+        new EndpointRoute("/counter/increment"),
+        () => new CounterIncrementEndpoint(counter_endpoint)
+    );
+    application.add_endpoint<CounterDecrementEndpoint>(
+        new EndpointRoute("/counter/decrement"),
+        () => new CounterDecrementEndpoint(counter_endpoint)
+    );
+    application.add_endpoint<CounterResetEndpoint>(
+        new EndpointRoute("/counter/reset"),
+        () => new CounterResetEndpoint(counter_endpoint)
+    );
+    
+    // Register clock SSE endpoint as a singleton with an explicit factory
     // This ensures the constructor logic runs
     application.add_singleton_endpoint<ClockEndpoint>(
         new EndpointRoute("/clock-stream"),
@@ -177,6 +363,12 @@ void main() {
     
     print("SSE Example server running on http://localhost:8080\n");
     print("Open http://localhost:8080 in your browser to see SSE in action\n");
+    print("\nFeatures:\n");
+    print("  - /counter-stream: SSE endpoint for counter updates\n");
+    print("  - /counter/increment: POST to increment counter\n");
+    print("  - /counter/decrement: POST to decrement counter\n");
+    print("  - /counter/reset: POST to reset counter to 0\n");
+    print("  - /clock-stream: SSE endpoint for live clock\n");
     
     application.run();
 }

+ 10 - 6
src/Endpoints/SseEndpoint.vala

@@ -140,11 +140,15 @@ namespace Astralis {
     public class HttpSseResult : HttpResult {
 
         private SseEndpoint endpoint;
+        private HttpContext http_context;
+        private RouteContext route_context;
 
         /// Create an SSE result for the given endpoint.
-        internal HttpSseResult(SseEndpoint endpoint) {
+        internal HttpSseResult(SseEndpoint endpoint, HttpContext http_context, RouteContext route_context) {
             base(StatusCode.OK);
             this.endpoint = endpoint;
+            this.http_context = http_context;
+            this.route_context = route_context;
 
             // Set required SSE headers
             set_header("Content-Type", "text/event-stream");
@@ -169,7 +173,7 @@ namespace Astralis {
             endpoint.register_stream(stream);
 
             // Notify the endpoint that a new connection was established
-            endpoint.notify_new_connection(stream);
+            endpoint.notify_new_connection(http_context, route_context, stream);
 
             // Keep the connection alive until the stream is closed
             while (!stream.is_closed) {
@@ -227,7 +231,7 @@ namespace Astralis {
         /// This method is sealed and cannot be overridden. It always returns
         /// an HttpSseResult which manages the SSE connection lifecycle.
         public sealed async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
-            return new HttpSseResult(this);
+            return new HttpSseResult(this, http_context, route_context);
         }
 
         /// Called when a new client connects.
@@ -237,7 +241,7 @@ namespace Astralis {
         /// disconnects or the stream is explicitly closed.
         /// 
         /// @param stream The SSE stream for this client connection
-        public virtual async void new_connection(SseStream stream) {
+        public virtual async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
             // Default implementation does nothing
         }
 
@@ -283,8 +287,8 @@ namespace Astralis {
             streams_mutex.unlock();
         }
 
-        internal void notify_new_connection(SseStream stream) {
-            new_connection.begin(stream);
+        internal void notify_new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
+            new_connection.begin(http_context, route_context, stream);
         }
     }