| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- using Astralis;
- using Invercargill;
- using Invercargill.DataStructures;
/// CounterEndpoint is a singleton SSE endpoint that broadcasts counter changes
/// to all connected clients whenever the counter is modified.
///
/// It demonstrates:
/// - Singleton pattern for SSE endpoints (shared state across connections)
/// - Broadcasting state changes triggered by HTTP requests
/// - Using new_connection to send current state to new clients
/// - Public methods to modify state and broadcast changes
///
/// The counter is only read or written while counter_mutex is held, and the
/// mutex is always released before any `yield`.
public class CounterEndpoint : SseEndpoint {
    /// Event type attached to every counter update sent by this endpoint.
    private const string COUNTER_EVENT_TYPE = "counter";

    private int counter = 0;
    private Mutex counter_mutex = Mutex();

    /// Retry interval: clients should wait 2 seconds before reconnecting
    public override uint retry_interval { get { return 2000; } }

    /// Called when a new client connects - send current counter value
    ///
    /// A failure to deliver the initial value is logged and otherwise ignored.
    public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
        // print() is printf-style, so dynamic text is always passed as an argument
        // rather than as the format string itself.
        print("%s", @"Counter SSE client connected (total: $(get_open_streams().length))\n");

        // Subscribe before the first yield so that a disconnect which happens
        // while the initial send is suspended is still logged.
        stream.disconnected.connect(() => {
            print("Counter SSE client disconnected\n");
        });

        // Send current counter value to new client
        int current_value = snapshot_counter();
        try {
            yield stream.send_event(new SseEvent.with_type(COUNTER_EVENT_TYPE, current_value.to_string()));
        } catch (Error e) {
            print("Failed to send counter value: %s\n", e.message);
        }
    }

    /// Increment the counter and broadcast the new value
    public async void increment() {
        int new_value = shift_counter(1);
        print("Counter incremented to: %d\n", new_value);
        yield publish_counter(new_value);
    }

    /// Decrement the counter and broadcast the new value
    public async void decrement() {
        int new_value = shift_counter(-1);
        print("Counter decremented to: %d\n", new_value);
        yield publish_counter(new_value);
    }

    /// Reset the counter to zero and broadcast
    public async void reset() {
        counter_mutex.lock();
        counter = 0;
        counter_mutex.unlock();
        print("Counter reset to: 0\n");
        yield publish_counter(0);
    }

    /// Returns the counter value as read under the mutex.
    private int snapshot_counter() {
        counter_mutex.lock();
        int value = counter;
        counter_mutex.unlock();
        return value;
    }

    /// Adds delta to the counter under the mutex and returns the resulting value.
    private int shift_counter(int delta) {
        counter_mutex.lock();
        counter += delta;
        int value = counter;
        counter_mutex.unlock();
        return value;
    }

    /// Broadcasts value, formatted as decimal text, as a "counter" event.
    private async void publish_counter(int value) {
        yield broadcast_event(new SseEvent.with_type(COUNTER_EVENT_TYPE, value.to_string()));
    }
}
/// ClockEndpoint is a singleton SSE endpoint that broadcasts the current time
/// to all connected clients every second.
///
/// It demonstrates:
/// - Singleton pattern for SSE endpoints (shared state across connections)
/// - Implementing retry_interval property
/// - Using new_connection for initial welcome message
/// - Public method to broadcast events
public class ClockEndpoint : SseEndpoint {
    private int connection_counter = 0;
    private Mutex counter_mutex = Mutex();

    /// Set once start_broadcast_loop() has kicked off the periodic broadcast.
    private bool broadcast_loop_started = false;

    /// Retry interval: clients should wait 3 seconds before reconnecting
    public override uint retry_interval { get { return 3000; } }

    /// Called when a new client connects - send welcome message
    ///
    /// A failure to deliver the welcome message is logged and otherwise ignored.
    public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
        // Assign a unique connection ID
        counter_mutex.lock();
        int connection_id = ++connection_counter;
        counter_mutex.unlock();

        // print() is printf-style, so dynamic text is always passed as an argument
        // rather than as the format string itself.
        print("%s", @"SSE client connected (connection #$connection_id, total: $(get_open_streams().length))\n");

        // Subscribe before the first yield so that a disconnect which happens
        // while the welcome send is suspended is still logged.
        stream.disconnected.connect(() => {
            print("SSE client disconnected (connection #%d)\n", connection_id);
        });

        // Send welcome message
        try {
            yield stream.send_event(new SseEvent.with_type("connected", @"You are connection #$connection_id"));
        } catch (Error e) {
            print("Failed to send welcome: %s\n", e.message);
        }
    }

    /// Public method to broadcast the current time to all connected clients.
    /// This can be called from anywhere (e.g., from another endpoint or service).
    ///
    /// Two events are broadcast per call, in this order:
    /// - "time": the local time formatted as HH:MM:SS
    /// - "datetime": a JSON object with "time" (HH:MM:SS) and "date" (YYYY-MM-DD)
    public async void broadcast_time() {
        var now = new DateTime.now_local();
        var time_str = now.format("%H:%M:%S");
        var date_str = now.format("%Y-%m-%d");

        // Both events are built from the same timestamp so they always agree.
        var time_event = new SseEvent.with_type("time", time_str);
        var json_data = @"{\"time\":\"$time_str\",\"date\":\"$date_str\"}";
        var json_event = new SseEvent.with_type("datetime", json_data);

        // Broadcast to all connected clients
        yield broadcast_event(time_event);
        yield broadcast_event(json_event);
    }

    /// Start the broadcast loop. Called externally after construction.
    ///
    /// Only the first call has an effect; later calls return immediately so
    /// that a second, parallel loop (and duplicated events) cannot be started.
    public void start_broadcast_loop() {
        if (broadcast_loop_started) {
            return;
        }
        broadcast_loop_started = true;
        broadcast_loop_iteration.begin();
    }

    /// Runs one broadcast and then schedules the next iteration.
    ///
    /// The 1000 ms timer is armed only after the broadcast has finished, so the
    /// period is one second plus however long the broadcast itself takes.
    private async void broadcast_loop_iteration() {
        // Broadcast current time
        yield broadcast_time();

        // Schedule next iteration in 1 second
        Timeout.add(1000, () => {
            broadcast_loop_iteration.begin();
            return Source.REMOVE; // One-shot: each iteration re-arms its own timer
        });
    }
}
/// HTTP endpoint that increments the shared counter (the demo page calls it with POST).
///
/// The response is a fixed JSON acknowledgement; the new counter value is not
/// part of it.
class CounterIncrementEndpoint : Object, Endpoint {
    /// Body returned for every request.
    private const string ACK_BODY = "{\"status\":\"ok\"}";

    /// Counter endpoint whose increment() is triggered by each request.
    private CounterEndpoint target;

    public CounterIncrementEndpoint(CounterEndpoint counter_endpoint) {
        target = counter_endpoint;
    }

    /// Starts an increment and answers immediately with the acknowledgement body.
    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
        // Started with begin() and not awaited: the response does not wait for the broadcast.
        target.increment.begin();

        var acknowledgement = new HttpStringResult(ACK_BODY);
        return acknowledgement.set_header("Content-Type", "application/json");
    }
}
/// HTTP endpoint that decrements the shared counter (the demo page calls it with POST).
///
/// The response is a fixed JSON acknowledgement; the new counter value is not
/// part of it.
class CounterDecrementEndpoint : Object, Endpoint {
    /// Body returned for every request.
    private const string ACK_BODY = "{\"status\":\"ok\"}";

    /// Counter endpoint whose decrement() is triggered by each request.
    private CounterEndpoint target;

    public CounterDecrementEndpoint(CounterEndpoint counter_endpoint) {
        target = counter_endpoint;
    }

    /// Starts a decrement and answers immediately with the acknowledgement body.
    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
        // Started with begin() and not awaited: the response does not wait for the broadcast.
        target.decrement.begin();

        var acknowledgement = new HttpStringResult(ACK_BODY);
        return acknowledgement.set_header("Content-Type", "application/json");
    }
}
/// HTTP endpoint that resets the shared counter to zero (the demo page calls it with POST).
///
/// The response is a fixed JSON acknowledgement; the counter value is not
/// part of it.
class CounterResetEndpoint : Object, Endpoint {
    /// Body returned for every request.
    private const string ACK_BODY = "{\"status\":\"ok\"}";

    /// Counter endpoint whose reset() is triggered by each request.
    private CounterEndpoint target;

    public CounterResetEndpoint(CounterEndpoint counter_endpoint) {
        target = counter_endpoint;
    }

    /// Starts a reset and answers immediately with the acknowledgement body.
    public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
        // Started with begin() and not awaited: the response does not wait for the broadcast.
        target.reset.begin();

        var acknowledgement = new HttpStringResult(ACK_BODY);
        return acknowledgement.set_header("Content-Type", "application/json");
    }
}
- /// IndexEndpoint serves a single static HTML page that connects to the SSE endpoints
///
/// The embedded page:
/// - opens an EventSource on '/counter-stream' and shows each "counter" event
/// - opens an EventSource on '/clock-stream' and handles "time", "datetime" and "connected" events
/// - sends POST requests to '/counter/increment', '/counter/decrement' and '/counter/reset'
///   from its three buttons
/// - prepends every logged event to the "Event Log" section (newest first)
- class IndexEndpoint : Object, Endpoint {
/// Returns the page below with a "text/html" Content-Type.
/// Neither http_context nor route_context is read; every request gets the same markup.
- public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
// The markup, styles and script live in one verbatim string, so every character
// between the triple quotes is sent to the browser as-is.
// NOTE(review): logEvent() in the script assigns event data through innerHTML without
// escaping, so payloads are interpreted as HTML - fine only while event data stays server-controlled.
- var html = """
- <!DOCTYPE html>
- <html>
- <head>
- <title>Astralis SSE Example</title>
- <style>
- body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
- .section { margin: 20px 0; padding: 20px; border: 1px solid #ccc; border-radius: 8px; }
- h1 { color: #333; }
- h2 { color: #666; margin-top: 0; }
- #clock { font-size: 3em; font-family: monospace; text-align: center; }
- #counter { font-size: 3em; font-family: monospace; text-align: center; }
- .status { padding: 5px 10px; border-radius: 4px; margin: 10px 0; }
- .status.connected { background: #d4edda; color: #155724; }
- .status.disconnected { background: #f8d7da; color: #721c24; }
- #events { background: #f5f5f5; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; font-size: 0.9em; }
- .event { margin: 2px 0; padding: 2px 5px; border-bottom: 1px solid #ddd; }
- .event-time { color: #999; }
- .event-type { color: #0066cc; font-weight: bold; }
- .counter-buttons { text-align: center; margin: 15px 0; }
- .counter-buttons button { font-size: 1.2em; padding: 10px 20px; margin: 0 5px; cursor: pointer; }
- .counter-buttons button.increment { background: #28a745; color: white; border: none; border-radius: 4px; }
- .counter-buttons button.decrement { background: #dc3545; color: white; border: none; border-radius: 4px; }
- .counter-buttons button.reset { background: #6c757d; color: white; border: none; border-radius: 4px; }
- .counter-buttons button:hover { opacity: 0.9; }
- </style>
- </head>
- <body>
- <h1>Astralis SSE Example</h1>
- 
- <div class="section">
- <h2>Shared Counter (HTTP + SSE)</h2>
- <div id="counter-status" class="status disconnected">Disconnected</div>
- <div id="counter">0</div>
- <div class="counter-buttons">
- <button class="decrement" onclick="decrementCounter()">- Decrement</button>
- <button class="reset" onclick="resetCounter()">Reset</button>
- <button class="increment" onclick="incrementCounter()">+ Increment</button>
- </div>
- </div>
- 
- <div class="section">
- <h2>Live Clock (Broadcast)</h2>
- <div id="clock-status" class="status disconnected">Disconnected</div>
- <div id="clock">--:--:--</div>
- </div>
- 
- <div class="section">
- <h2>Event Log</h2>
- <div id="events"></div>
- </div>
- 
- <script>
- function logEvent(type, data) {
- const eventsDiv = document.getElementById('events');
- const time = new Date().toLocaleTimeString();
- const eventDiv = document.createElement('div');
- eventDiv.className = 'event';
- eventDiv.innerHTML = '<span class="event-time">' + time + '</span> <span class="event-type">[' + type + ']</span> ' + data;
- eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
- }
- 
- // Counter SSE connection
- const counterSource = new EventSource('/counter-stream');
- const counterStatus = document.getElementById('counter-status');
- 
- counterSource.onopen = function() {
- counterStatus.textContent = 'Connected';
- counterStatus.className = 'status connected';
- };
- 
- counterSource.onerror = function() {
- counterStatus.textContent = 'Disconnected (reconnecting...)';
- counterStatus.className = 'status disconnected';
- };
- 
- counterSource.addEventListener('counter', function(e) {
- document.getElementById('counter').textContent = e.data;
- logEvent('counter', e.data);
- });
- 
- // Counter control functions
- async function incrementCounter() {
- await fetch('/counter/increment', { method: 'POST' });
- }
- 
- async function decrementCounter() {
- await fetch('/counter/decrement', { method: 'POST' });
- }
- 
- async function resetCounter() {
- await fetch('/counter/reset', { method: 'POST' });
- }
- 
- // Clock SSE connection
- const clockSource = new EventSource('/clock-stream');
- const clockStatus = document.getElementById('clock-status');
- 
- clockSource.onopen = function() {
- clockStatus.textContent = 'Connected';
- clockStatus.className = 'status connected';
- };
- 
- clockSource.onerror = function() {
- clockStatus.textContent = 'Disconnected (reconnecting...)';
- clockStatus.className = 'status disconnected';
- };
- 
- clockSource.addEventListener('time', function(e) {
- document.getElementById('clock').textContent = e.data;
- });
- 
- clockSource.addEventListener('datetime', function(e) {
- logEvent('datetime', e.data);
- });
- 
- clockSource.addEventListener('connected', function(e) {
- logEvent('connected', e.data);
- });
- </script>
- </body>
- </html>
- """;
// NOTE(review): neither this header nor the markup declares a character set.
- return new HttpStringResult(html)
- .set_header("Content-Type", "text/html");
- }
- }
/// Entry point of the SSE example: registers every route on a WebApplication
/// created for port 8080, prints a short usage banner and runs the application.
void main() {
    var app = new WebApplication(8080);

    // A single CounterEndpoint instance backs the stream route and is also
    // handed to the three control endpoints, so they all act on the same counter.
    var shared_counter = new CounterEndpoint();
    app.add_singleton_endpoint<CounterEndpoint>(new EndpointRoute("/counter-stream"), () => shared_counter);

    // Control routes: each factory wraps the shared counter in a fresh handler.
    app.add_endpoint<CounterIncrementEndpoint>(
        new EndpointRoute("/counter/increment"),
        () => new CounterIncrementEndpoint(shared_counter)
    );
    app.add_endpoint<CounterDecrementEndpoint>(
        new EndpointRoute("/counter/decrement"),
        () => new CounterDecrementEndpoint(shared_counter)
    );
    app.add_endpoint<CounterResetEndpoint>(
        new EndpointRoute("/counter/reset"),
        () => new CounterResetEndpoint(shared_counter)
    );

    // The clock endpoint is built by an explicit factory so that its broadcast
    // loop is started on the very instance that gets registered.
    app.add_singleton_endpoint<ClockEndpoint>(new EndpointRoute("/clock-stream"), () => {
        var clock = new ClockEndpoint();
        clock.start_broadcast_loop();
        return clock;
    });

    // Index page served at the root path.
    app.add_endpoint<IndexEndpoint>(new EndpointRoute("/"));

    // Usage banner, written line by line in this exact order.
    string[] banner = {
        "SSE Example server running on http://localhost:8080\n",
        "Open http://localhost:8080 in your browser to see SSE in action\n",
        "\nFeatures:\n",
        " - /counter-stream: SSE endpoint for counter updates\n",
        " - /counter/increment: POST to increment counter\n",
        " - /counter/decrement: POST to decrement counter\n",
        " - /counter/reset: POST to reset counter to 0\n",
        " - /clock-stream: SSE endpoint for live clock\n"
    };
    foreach (unowned string banner_line in banner) {
        print("%s", banner_line);
    }

    app.run();
}
|