SseExample.vala 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. using Astralis;
  2. using Invercargill;
  3. using Invercargill.DataStructures;
  /// CounterEndpoint is an SSE endpoint holding one integer counter and
  /// broadcasting every change of that counter to all open streams.
  ///
  /// It demonstrates:
  /// - Sharing one endpoint instance (and its state) between connections
  /// - Broadcasting state changes triggered from outside the SSE stream
  /// - Using new_connection to push the current state to a new client
  /// - Public async methods that modify the state and broadcast the result
  public class CounterEndpoint : SseEndpoint {
      /// SSE event type used for every counter message.
      private const string COUNTER_EVENT = "counter";

      private int counter = 0;
      /// Guards every read and write of `counter`.
      private Mutex counter_mutex = Mutex();

      /// Retry interval reported to clients: 2000 (milliseconds, per the SSE `retry` field convention).
      public override uint retry_interval { get { return 2000; } }

      /// Called when a new client connects; sends it the current counter value.
      ///
      /// A failure of the initial send is logged and otherwise ignored.
      public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
          // print() is printf-style: pass dynamic text as an argument, never as the format.
          print("%s", @"Counter SSE client connected (total: $(get_open_streams().length))\n");

          // Hook up the disconnect listener before awaiting the first send so a
          // client that drops during that send is still reported.
          stream.disconnected.connect(() => {
              print("Counter SSE client disconnected\n");
          });

          try {
              int current_value = read_counter_value();
              yield stream.send_event(new SseEvent.with_type(COUNTER_EVENT, current_value.to_string()));
          } catch (Error e) {
              print("%s", @"Failed to send counter value: $(e.message)\n");
          }
      }

      /// Increment the counter and broadcast the new value.
      public async void increment() {
          int new_value = add_to_counter(1);
          print("Counter incremented to: %d\n", new_value);
          yield broadcast_counter_value(new_value);
      }

      /// Decrement the counter and broadcast the new value.
      public async void decrement() {
          int new_value = add_to_counter(-1);
          print("Counter decremented to: %d\n", new_value);
          yield broadcast_counter_value(new_value);
      }

      /// Reset the counter to zero and broadcast "0".
      public async void reset() {
          counter_mutex.lock();
          counter = 0;
          counter_mutex.unlock();
          print("Counter reset to: 0\n");
          yield broadcast_counter_value(0);
      }

      /// Returns the counter value, read while holding the mutex.
      private int read_counter_value() {
          counter_mutex.lock();
          int value = counter;
          counter_mutex.unlock();
          return value;
      }

      /// Adds `delta` to the counter under the mutex and returns the resulting value.
      private int add_to_counter(int delta) {
          counter_mutex.lock();
          counter += delta;
          int value = counter;
          counter_mutex.unlock();
          return value;
      }

      /// Broadcasts `value` (as decimal text) in a "counter" event.
      private async void broadcast_counter_value(int value) {
          yield broadcast_event(new SseEvent.with_type(COUNTER_EVENT, value.to_string()));
      }
  }
  /// ClockEndpoint is an SSE endpoint that broadcasts the current local time
  /// to all open streams, once per timer tick after the loop is started.
  ///
  /// It demonstrates:
  /// - Sharing one endpoint instance (and its state) between connections
  /// - Implementing the retry_interval property
  /// - Using new_connection for an initial welcome message
  /// - A public method to broadcast events, driven by a self-rescheduling timer
  public class ClockEndpoint : SseEndpoint {
      /// Delay between the end of one broadcast and the start of the next.
      private const uint BROADCAST_INTERVAL_MS = 1000;

      private int connection_counter = 0;
      /// Guards `connection_counter`.
      private Mutex counter_mutex = Mutex();
      /// Set once the broadcast loop has been started, so it is never started twice.
      /// NOTE(review): plain flag, assumes start_broadcast_loop() is only called
      /// from the main-loop thread - confirm against callers.
      private bool broadcast_loop_started = false;

      /// Retry interval reported to clients: 3000 (milliseconds, per the SSE `retry` field convention).
      public override uint retry_interval { get { return 3000; } }

      /// Called when a new client connects; assigns it a connection number and
      /// sends a "connected" welcome event. A failed send is logged and ignored.
      public override async void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
          // Assign a unique, monotonically increasing connection ID
          counter_mutex.lock();
          int connection_id = ++connection_counter;
          counter_mutex.unlock();

          // print() is printf-style: pass dynamic text as an argument, never as the format.
          print("%s", @"SSE client connected (connection #$connection_id, total: $(get_open_streams().length))\n");

          // Hook up the disconnect listener before awaiting the welcome send so a
          // client that drops during that send is still reported.
          stream.disconnected.connect(() => {
              print("%s", @"SSE client disconnected (connection #$connection_id)\n");
          });

          try {
              yield stream.send_event(new SseEvent.with_type("connected", @"You are connection #$connection_id"));
          } catch (Error e) {
              print("%s", @"Failed to send welcome: $(e.message)\n");
          }
      }

      /// Broadcasts the current local time to all connected clients.
      ///
      /// Two events are sent: a "time" event carrying HH:MM:SS, and a
      /// "datetime" event carrying a small JSON object with time and date.
      public async void broadcast_time() {
          var now = new DateTime.now_local();
          var time_str = now.format("%H:%M:%S");
          var date_str = now.format("%Y-%m-%d");

          var time_event = new SseEvent.with_type("time", time_str);

          // Both values consist of digits, ':' and '-' only, so no JSON escaping is needed.
          var json_data = @"{\"time\":\"$time_str\",\"date\":\"$date_str\"}";
          var json_event = new SseEvent.with_type("datetime", json_data);

          yield broadcast_event(time_event);
          yield broadcast_event(json_event);
      }

      /// Starts the broadcast loop. Called externally after construction.
      ///
      /// Calling it again is a no-op: a second timer chain would broadcast
      /// every tick twice.
      public void start_broadcast_loop() {
          if (broadcast_loop_started) {
              return;
          }
          broadcast_loop_started = true;
          broadcast_loop_iteration.begin();
      }

      /// One loop step: broadcast the time, then schedule the next step.
      private async void broadcast_loop_iteration() {
          yield broadcast_time();

          // One-shot timer re-armed after each broadcast finishes, so
          // broadcasts never overlap even if one takes longer than the interval.
          Timeout.add(BROADCAST_INTERVAL_MS, () => {
              broadcast_loop_iteration.begin();
              return Source.REMOVE;
          });
      }
  }
  /// Endpoint that increments the shared counter when a request reaches it.
  ///
  /// The example page calls it with POST; the handler itself does not look
  /// at the request.
  class CounterIncrementEndpoint : Object, Endpoint {
      /// Counter endpoint whose state this handler modifies.
      private CounterEndpoint counter;

      public CounterIncrementEndpoint(CounterEndpoint counter_endpoint) {
          counter = counter_endpoint;
      }

      /// Starts the increment without awaiting it, then answers with a fixed JSON body.
      public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
          // Fire-and-forget: the response does not wait for the SSE broadcast.
          counter.increment.begin();

          var response = new HttpStringResult("{\"status\":\"ok\"}");
          return response.set_header("Content-Type", "application/json");
      }
  }
  /// Endpoint that decrements the shared counter when a request reaches it.
  ///
  /// The example page calls it with POST; the handler itself does not look
  /// at the request.
  class CounterDecrementEndpoint : Object, Endpoint {
      /// Counter endpoint whose state this handler modifies.
      private CounterEndpoint counter;

      public CounterDecrementEndpoint(CounterEndpoint counter_endpoint) {
          counter = counter_endpoint;
      }

      /// Starts the decrement without awaiting it, then answers with a fixed JSON body.
      public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
          // Fire-and-forget: the response does not wait for the SSE broadcast.
          counter.decrement.begin();

          var response = new HttpStringResult("{\"status\":\"ok\"}");
          return response.set_header("Content-Type", "application/json");
      }
  }
  /// Endpoint that resets the shared counter to zero when a request reaches it.
  ///
  /// The example page calls it with POST; the handler itself does not look
  /// at the request.
  class CounterResetEndpoint : Object, Endpoint {
      /// Counter endpoint whose state this handler modifies.
      private CounterEndpoint counter;

      public CounterResetEndpoint(CounterEndpoint counter_endpoint) {
          counter = counter_endpoint;
      }

      /// Starts the reset without awaiting it, then answers with a fixed JSON body.
      public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
          // Fire-and-forget: the response does not wait for the SSE broadcast.
          counter.reset.begin();

          var response = new HttpStringResult("{\"status\":\"ok\"}");
          return response.set_header("Content-Type", "application/json");
      }
  }
  /// IndexEndpoint serves a single static HTML page that connects to the two
  /// SSE streams (/counter-stream and /clock-stream) and offers buttons that
  /// POST to the counter control routes.
  class IndexEndpoint : Object, Endpoint {
      /// Returns the demo page as text/html. The request is not inspected.
      public async HttpResult handle_request(HttpContext http_context, RouteContext route_context) throws Error {
          // Verbatim string: everything between the triple quotes is sent as-is.
          var html = """
  <!DOCTYPE html>
  <html>
  <head>
      <title>Astralis SSE Example</title>
      <style>
          body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
          .section { margin: 20px 0; padding: 20px; border: 1px solid #ccc; border-radius: 8px; }
          h1 { color: #333; }
          h2 { color: #666; margin-top: 0; }
          #clock { font-size: 3em; font-family: monospace; text-align: center; }
          #counter { font-size: 3em; font-family: monospace; text-align: center; }
          .status { padding: 5px 10px; border-radius: 4px; margin: 10px 0; }
          .status.connected { background: #d4edda; color: #155724; }
          .status.disconnected { background: #f8d7da; color: #721c24; }
          #events { background: #f5f5f5; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; font-size: 0.9em; }
          .event { margin: 2px 0; padding: 2px 5px; border-bottom: 1px solid #ddd; }
          .event-time { color: #999; }
          .event-type { color: #0066cc; font-weight: bold; }
          .counter-buttons { text-align: center; margin: 15px 0; }
          .counter-buttons button { font-size: 1.2em; padding: 10px 20px; margin: 0 5px; cursor: pointer; }
          .counter-buttons button.increment { background: #28a745; color: white; border: none; border-radius: 4px; }
          .counter-buttons button.decrement { background: #dc3545; color: white; border: none; border-radius: 4px; }
          .counter-buttons button.reset { background: #6c757d; color: white; border: none; border-radius: 4px; }
          .counter-buttons button:hover { opacity: 0.9; }
      </style>
  </head>
  <body>
      <h1>Astralis SSE Example</h1>

      <div class="section">
          <h2>Shared Counter (HTTP + SSE)</h2>
          <div id="counter-status" class="status disconnected">Disconnected</div>
          <div id="counter">0</div>
          <div class="counter-buttons">
              <button class="decrement" onclick="decrementCounter()">- Decrement</button>
              <button class="reset" onclick="resetCounter()">Reset</button>
              <button class="increment" onclick="incrementCounter()">+ Increment</button>
          </div>
      </div>

      <div class="section">
          <h2>Live Clock (Broadcast)</h2>
          <div id="clock-status" class="status disconnected">Disconnected</div>
          <div id="clock">--:--:--</div>
      </div>

      <div class="section">
          <h2>Event Log</h2>
          <div id="events"></div>
      </div>

      <script>
          // Keep the log bounded: the clock stream adds one entry per second.
          const MAX_LOG_ENTRIES = 200;

          function logEvent(type, data) {
              const eventsDiv = document.getElementById('events');
              const eventDiv = document.createElement('div');
              eventDiv.className = 'event';

              // Build the entry from text nodes so event payloads are never parsed as HTML.
              const timeSpan = document.createElement('span');
              timeSpan.className = 'event-time';
              timeSpan.textContent = new Date().toLocaleTimeString();

              const typeSpan = document.createElement('span');
              typeSpan.className = 'event-type';
              typeSpan.textContent = '[' + type + ']';

              eventDiv.append(timeSpan, ' ', typeSpan, ' ' + data);
              eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);

              while (eventsDiv.childElementCount > MAX_LOG_ENTRIES) {
                  eventsDiv.removeChild(eventsDiv.lastElementChild);
              }
          }

          // Counter SSE connection
          const counterSource = new EventSource('/counter-stream');
          const counterStatus = document.getElementById('counter-status');

          counterSource.onopen = function() {
              counterStatus.textContent = 'Connected';
              counterStatus.className = 'status connected';
          };

          counterSource.onerror = function() {
              counterStatus.textContent = 'Disconnected (reconnecting...)';
              counterStatus.className = 'status disconnected';
          };

          counterSource.addEventListener('counter', function(e) {
              document.getElementById('counter').textContent = e.data;
              logEvent('counter', e.data);
          });

          // Counter control functions
          async function incrementCounter() {
              await fetch('/counter/increment', { method: 'POST' });
          }

          async function decrementCounter() {
              await fetch('/counter/decrement', { method: 'POST' });
          }

          async function resetCounter() {
              await fetch('/counter/reset', { method: 'POST' });
          }

          // Clock SSE connection
          const clockSource = new EventSource('/clock-stream');
          const clockStatus = document.getElementById('clock-status');

          clockSource.onopen = function() {
              clockStatus.textContent = 'Connected';
              clockStatus.className = 'status connected';
          };

          clockSource.onerror = function() {
              clockStatus.textContent = 'Disconnected (reconnecting...)';
              clockStatus.className = 'status disconnected';
          };

          clockSource.addEventListener('time', function(e) {
              document.getElementById('clock').textContent = e.data;
          });

          clockSource.addEventListener('datetime', function(e) {
              logEvent('datetime', e.data);
          });

          clockSource.addEventListener('connected', function(e) {
              logEvent('connected', e.data);
          });
      </script>
  </body>
  </html>
  """;
          return new HttpStringResult(html)
              .set_header("Content-Type", "text/html");
      }
  }
  /// Program entry point: builds the web application on port 8080, registers
  /// the SSE, control and index endpoints, prints a usage banner and runs the
  /// application.
  void main() {
      var app = new WebApplication(8080);

      // One CounterEndpoint instance is created up front; the SSE route and all
      // three control routes hand out / wrap this same instance.
      var counter = new CounterEndpoint();
      app.add_singleton_endpoint<CounterEndpoint>(new EndpointRoute("/counter-stream"), () => counter);

      // Control routes that modify the shared counter
      app.add_endpoint<CounterIncrementEndpoint>(
          new EndpointRoute("/counter/increment"),
          () => new CounterIncrementEndpoint(counter)
      );
      app.add_endpoint<CounterDecrementEndpoint>(
          new EndpointRoute("/counter/decrement"),
          () => new CounterDecrementEndpoint(counter)
      );
      app.add_endpoint<CounterResetEndpoint>(
          new EndpointRoute("/counter/reset"),
          () => new CounterResetEndpoint(counter)
      );

      // The clock endpoint is built inside its factory, which also starts the
      // broadcast loop on the instance it returns.
      // NOTE(review): when the framework invokes this factory is not visible here.
      app.add_singleton_endpoint<ClockEndpoint>(new EndpointRoute("/clock-stream"), () => {
          var clock = new ClockEndpoint();
          clock.start_broadcast_loop();
          return clock;
      });

      // Index page
      app.add_endpoint<IndexEndpoint>(new EndpointRoute("/"));

      // Usage banner, written to stdout in order
      string[] banner = {
          "SSE Example server running on http://localhost:8080\n",
          "Open http://localhost:8080 in your browser to see SSE in action\n",
          "\nFeatures:\n",
          " - /counter-stream: SSE endpoint for counter updates\n",
          " - /counter/increment: POST to increment counter\n",
          " - /counter/decrement: POST to decrement counter\n",
          " - /counter/reset: POST to reset counter to 0\n",
          " - /clock-stream: SSE endpoint for live clock\n"
      };
      foreach (unowned string line in banner) {
          print("%s", line);
      }

      app.run();
  }