using Astralis; using Invercargill; using Invercargill.DataStructures; /// CounterEndpoint is a singleton SSE endpoint that broadcasts counter changes /// to all connected clients whenever the counter is modified. /// /// It demonstrates: /// - Singleton pattern for SSE endpoints (shared state across connections) /// - Broadcasting state changes triggered by HTTP requests /// - Using new_connection to send current state to new clients /// - Public methods to modify state and broadcast changes public class CounterEndpoint : SseEndpoint { private int counter = 0; private Mutex counter_mutex = Mutex(); /// Retry interval: clients should wait 2 seconds before reconnecting public override uint retry_interval { get { return 2000; } } /// Called when a new client connects - send current counter value public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) { print(@"Counter SSE client connected (total: $(get_open_streams().length))\n"); // Send current counter value to new client try { counter_mutex.lock(); int current_value = counter; counter_mutex.unlock(); yield stream.send_event(new SseEvent.with_type("counter", current_value.to_string())); } catch (Error e) { print(@"Failed to send counter value: $(e.message)\n"); } // Listen for disconnection stream.disconnected.connect(() => { print(@"Counter SSE client disconnected\n"); }); } /// Increment the counter and broadcast the new value public async void increment() { counter_mutex.lock(); counter++; int new_value = counter; counter_mutex.unlock(); print(@"Counter incremented to: $new_value\n"); yield broadcast_event(new SseEvent.with_type("counter", new_value.to_string())); } /// Decrement the counter and broadcast the new value public async void decrement() { counter_mutex.lock(); counter--; int new_value = counter; counter_mutex.unlock(); print(@"Counter decremented to: $new_value\n"); yield broadcast_event(new SseEvent.with_type("counter", new_value.to_string())); } /// Reset the counter to zero and broadcast public async void reset() { counter_mutex.lock(); counter = 0; counter_mutex.unlock(); print(@"Counter reset to: 0\n"); yield broadcast_event(new SseEvent.with_type("counter", "0")); } } /// ClockEndpoint is a singleton SSE endpoint that broadcasts the current time /// to all connected clients every second. /// /// It demonstrates: /// - Singleton pattern for SSE endpoints (shared state across connections) /// - Implementing retry_interval property /// - Using new_connection for initial welcome message /// - Public method to broadcast events public class ClockEndpoint : SseEndpoint { private int connection_counter = 0; private Mutex counter_mutex = Mutex(); /// Retry interval: clients should wait 3 seconds before reconnecting public override uint retry_interval { get { return 3000; } } /// Called when a new client connects - send welcome message public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) { // Assign a unique connection ID int connection_id; counter_mutex.lock(); connection_id = ++connection_counter; counter_mutex.unlock(); print(@"SSE client connected (connection #$connection_id, total: $(get_open_streams().length))\n"); // Send welcome message try { yield stream.send_event(new SseEvent.with_type("connected", @"You are connection #$connection_id")); } catch (Error e) { print(@"Failed to send welcome: $(e.message)\n"); } // Listen for disconnection stream.disconnected.connect(() => { print(@"SSE client disconnected (connection #$connection_id)\n"); }); } /// Public method to broadcast the current time to all connected clients. /// This can be called from anywhere (e.g., from another endpoint or service). public async void broadcast_time() { var now = new DateTime.now_local(); var time_str = now.format("%H:%M:%S"); var date_str = now.format("%Y-%m-%d"); // Create event with current time var time_event = new SseEvent.with_type("time", time_str); // Also send a JSON-formatted message var json_data = @"{\"time\":\"$time_str\",\"date\":\"$date_str\"}"; var json_event = new SseEvent.with_type("datetime", json_data); // Broadcast to all connected clients yield broadcast_event(time_event); yield broadcast_event(json_event); } /// Start the broadcast loop. Called externally after construction. public void start_broadcast_loop() { broadcast_loop_iteration.begin(); } private async void broadcast_loop_iteration() { // Broadcast current time yield broadcast_time(); // Schedule next iteration in 1 second Timeout.add(1000, () => { broadcast_loop_iteration.begin(); return false; // Don't repeat, we'll reschedule manually }); } } /// CounterIncrementEndpoint handles HTTP POST requests to increment the counter class CounterIncrementEndpoint : Object, Endpoint { private CounterEndpoint counter_endpoint; public CounterIncrementEndpoint(CounterEndpoint counter_endpoint) { this.counter_endpoint = counter_endpoint; } public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error { counter_endpoint.increment.begin(); return new HttpStringResult("{\"status\":\"ok\"}") .set_header("Content-Type", "application/json"); } } /// CounterDecrementEndpoint handles HTTP POST requests to decrement the counter class CounterDecrementEndpoint : Object, Endpoint { private CounterEndpoint counter_endpoint; public CounterDecrementEndpoint(CounterEndpoint counter_endpoint) { this.counter_endpoint = counter_endpoint; } public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error { counter_endpoint.decrement.begin(); return new HttpStringResult("{\"status\":\"ok\"}") .set_header("Content-Type", "application/json"); } } /// CounterResetEndpoint handles HTTP POST requests to reset the counter class CounterResetEndpoint : Object, Endpoint { private CounterEndpoint counter_endpoint; public CounterResetEndpoint(CounterEndpoint counter_endpoint) { this.counter_endpoint = counter_endpoint; } public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error { counter_endpoint.reset.begin(); return new HttpStringResult("{\"status\":\"ok\"}") .set_header("Content-Type", "application/json"); } } /// IndexEndpoint serves a simple HTML page that connects to the SSE endpoints class IndexEndpoint : Object, Endpoint { public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error { var html = """ Astralis SSE Example

Astralis SSE Example

Shared Counter (HTTP + SSE)

Disconnected
0

Live Clock (Broadcast)

Disconnected
--:--:--

Event Log

"""; return new HttpStringResult(html) .set_header("Content-Type", "text/html"); } } void main() { var application = new WebApplication(8080); // Create and register the counter SSE endpoint as a singleton var counter_endpoint = new CounterEndpoint(); application.add_singleton_endpoint( new EndpointRoute("/counter-stream"), () => counter_endpoint ); // Register counter control endpoints (HTTP POST to modify state) application.add_endpoint( new EndpointRoute("/counter/increment"), () => new CounterIncrementEndpoint(counter_endpoint) ); application.add_endpoint( new EndpointRoute("/counter/decrement"), () => new CounterDecrementEndpoint(counter_endpoint) ); application.add_endpoint( new EndpointRoute("/counter/reset"), () => new CounterResetEndpoint(counter_endpoint) ); // Register clock SSE endpoint as a singleton with an explicit factory // This ensures the constructor logic runs application.add_singleton_endpoint( new EndpointRoute("/clock-stream"), () => { var endpoint = new ClockEndpoint(); endpoint.start_broadcast_loop(); return endpoint; } ); // Register the index page application.add_endpoint(new EndpointRoute("/")); print("SSE Example server running on http://localhost:8080\n"); print("Open http://localhost:8080 in your browser to see SSE in action\n"); print("\nFeatures:\n"); print(" - /counter-stream: SSE endpoint for counter updates\n"); print(" - /counter/increment: POST to increment counter\n"); print(" - /counter/decrement: POST to decrement counter\n"); print(" - /counter/reset: POST to reset counter to 0\n"); print(" - /clock-stream: SSE endpoint for live clock\n"); application.run(); }