Sfoglia il codice sorgente

feat(continuation): add cancellation handling and simplify continuation provider

- Add `continuation_canceled` virtual method to Component base class for
  handling timeout/cancellation scenarios
- Refactor ContinuationProvider to use single pending_components dictionary
  instead of separate components and active_channels dictionaries
- Reduce continuation connection timeout from 60s to 30s
- Register ContinuationProvider as startup endpoint in SpryModule
- Add ProgressExample demonstrating real-time progress updates via SSE
Billy Barrow 1 settimana fa
parent
commit
302a9e81ef
5 ha cambiato i file con 225 aggiunte e 23 eliminazioni
  1. 191 0
      examples/ProgressExample.vala
  2. 7 0
      examples/meson.build
  3. 3 0
      src/Component.vala
  4. 18 22
      src/ContinuationProvider.vala
  5. 6 1
      src/Spry.vala

+ 191 - 0
examples/ProgressExample.vala

@@ -0,0 +1,191 @@
+using Astralis;
+using Invercargill;
+using Invercargill.DataStructures;
+using Inversion;
+using Spry;
+
+/**
+ * ProgressExample demonstrates the continuation feature for server-sent events (SSE).
+ * 
+ * The continuation feature allows a Component to send real-time progress updates
+ * to the client via SSE. This is useful for:
+ * - Long-running task progress reporting
+ * - Real-time status updates
+ * - Live data streaming
+ * 
+ * How it works:
+ * 1. Add `spry-continuation` attribute to an element in your markup
+ *    (This is shorthand for: hx-ext="sse" sse-connect="(endpoint)" sse-close="_spry-close")
+ * 2. Use `sse-swap="eventname"` on child elements to swap content when events arrive
+ * 3. Override the `continuation(SseStream stream)` method in your Component
+ * 4. Use `stream.send_event()` to send SSE events with HTML content to swap
+ */
+class ProgressComponent : Component {
+    
+    public override string markup { get {
+        return """
+        <!DOCTYPE html>
+        <html>
+        <head>
+        <script spry-res="htmx.js"></script>
+        <script spry-res="htmx-sse.js"></script>
+        <style>
+            body { font-family: system-ui, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }
+            .progress-container { 
+                background: #e0e0e0; 
+                border-radius: 8px; 
+                overflow: hidden; 
+                margin: 20px 0;
+            }
+            .progress-bar { 
+                height: 30px; 
+                background: linear-gradient(90deg, #4CAF50, #8BC34A); 
+                transition: width 0.3s ease;
+                display: flex;
+                align-items: center;
+                justify-content: center;
+                color: white;
+                font-weight: bold;
+                min-width: 40px;
+            }
+            .status { 
+                padding: 15px; 
+                background: #f5f5f5; 
+                border-radius: 4px; 
+                margin: 10px 0;
+                border-left: 4px solid #2196F3;
+            }
+            .log { 
+                max-height: 200px; 
+                overflow-y: auto; 
+                background: #263238; 
+                color: #4CAF50; 
+                padding: 15px; 
+                border-radius: 4px;
+                font-family: monospace;
+                font-size: 14px;
+            }
+            .log-entry { margin: 5px 0; }
+            h1 { color: #333; }
+            .info { color: #666; font-size: 14px; }
+        </style>
+        </head>
+        <body>
+        <h1>Task Progress Demo</h1>
+        <p class="info">This example demonstrates Spry's continuation feature for real-time progress updates via Server-Sent Events (SSE).</p>
+        
+        <div spry-continuation>
+            <div class="progress-container" sse-swap="progress">
+                <div class="progress-bar" id="progress-bar" style="width: 0%">
+                    0%
+                </div>
+            </div>
+            
+            <div class="status" sse-swap="status">
+                <strong>Status:</strong> Initializing...
+            </div>
+            
+            <div class="log" id="log">
+                <div sse-swap="log">Waiting for task to start...</div>
+            </div>
+        </div>
+        </body>
+        </html>
+        """;
+    }}
+    
+    /**
+     * The continuation method is called when a client connects to the SSE endpoint.
+     * This is where you can send real-time updates to the client.
+     * 
+     * The event data should be HTML content that will be swapped into elements
+     * with matching sse-swap="eventname" attributes.
+     */
+    public async override void continuation(SseStream stream) throws Error {
+        // Simulate a long-running task with progress updates
+        var steps = new string[] {
+            "Initializing task...",
+            "Loading configuration...",
+            "Connecting to database...",
+            "Fetching records...",
+            "Processing batch 1/5...",
+            "Processing batch 2/5...",
+            "Processing batch 3/5...",
+            "Processing batch 4/5...",
+            "Processing batch 5/5...",
+            "Validating results...",
+            "Generating report...",
+            "Finalizing..."
+        };
+        
+        for (int i = 0; i < steps.length; i++) {
+            // Calculate progress percentage
+            int percent = (int)(((i + 1) / (double)steps.length) * 100);
+            
+            // Send progress bar update - HTML that will be swapped into the progress bar
+            yield stream.send_event(new SseEvent.with_type("progress", 
+                @"<div class=\"progress-bar\" id=\"progress-bar\" style=\"width: $percent%\">$percent%</div>"));
+            
+            // Send status update - HTML that will be swapped into the status div
+            yield stream.send_event(new SseEvent.with_type("status", 
+                @"<strong>Status:</strong> $(steps[i])"));
+            
+            // Send log message - HTML that will be appended to the log
+            yield stream.send_event(new SseEvent.with_type("log", 
+                @"<div class=\"log-entry\">$(steps[i])</div>"));
+            
+            // Simulate work being done (500ms per step)
+            Timeout.add(500, () => {
+                continuation.callback();
+                return false;
+            });
+            yield;
+        }
+        
+        // Send final completion messages
+        yield stream.send_event(new SseEvent.with_type("progress", 
+            "<div class=\"progress-bar\" id=\"progress-bar\" style=\"width: 100%\">100% ✓</div>"));
+        yield stream.send_event(new SseEvent.with_type("status", 
+            "<strong>Status:</strong> Task completed successfully!"));
+        yield stream.send_event(new SseEvent.with_type("log", 
+            "<div class=\"log-entry\">✓ All tasks completed!</div>"));
+    }
+}
+
+class HomePageEndpoint : Object, Endpoint {
+
+    private ProgressComponent progress_component = inject<ProgressComponent>();
+
+    public async Astralis.HttpResult handle_request(Astralis.HttpContext http_context, Astralis.RouteContext route_context) throws Error {
+        return yield progress_component.to_result();
+    }
+}
+
+void main(string[] args) {
+    int port = args.length > 1 ? int.parse(args[1]) : 8080;
+    
+    try {
+        var application = new WebApplication(port);
+        
+        // Register compression components
+        application.use_compression();
+        
+        // Add Spry module (includes ContinuationProvider for SSE)
+        application.add_module<SpryModule>();
+
+        // Register the progress component
+        application.add_transient<ProgressComponent>();
+
+        // Register the home page endpoint
+        application.add_endpoint<HomePageEndpoint>(new EndpointRoute("/"));
+
+        print("Progress Example running on http://localhost:%d/\n", port);
+        print("Open the URL in your browser to see real-time progress updates via SSE.\n");
+        
+        application.run();
+        
+    } catch (Error e) {
+        printerr("Error: %s\n", e.message);
+        Process.exit(1);
+    }
+}

+ 7 - 0
examples/meson.build

@@ -26,3 +26,10 @@ executable('template-example',
     install: false
 )
 
+# ProgressExample - demonstrates continuation feature for real-time progress updates via SSE
+executable('progress-example',
+    'ProgressExample.vala',
+    dependencies: [spry_dep, astralis_dep, invercargill_dep, inversion_dep],
+    install: false
+)
+

+ 3 - 0
src/Component.vala

@@ -23,6 +23,9 @@ namespace Spry {
         public virtual async void continuation(SseStream stream) throws Error {
             // No-op default
         }
+        public virtual async void continuation_canceled() throws Error {
+            // No-op default
+        }
         
         private PathProvider _path_provider = inject<PathProvider>();
         private ContinuationProvider _continuation_provider = inject<ContinuationProvider>();

+ 18 - 22
src/ContinuationProvider.vala

@@ -5,23 +5,19 @@ namespace Spry {
 
     public class ContinuationProvider : SseEndpoint {
 
-        private Dictionary<string, Component> components = new Dictionary<string, Component>();
-        private Dictionary<string, bool> active_channels = new Dictionary<string, bool>();
+        private Dictionary<string, Component> pending_components = new Dictionary<string, Component>();
 
         public override uint retry_interval { get { return 500; } }
 
         public async override void new_connection(HttpContext http_context, RouteContext route_context, SseStream stream) {
             var continuation_id = route_context.mapped_parameters.get_or_default("token");
             Component component = null;
-            lock(active_channels) {
-                if(components.try_get(continuation_id, out component)) {
-                    active_channels[continuation_id] = true;
-                }
-            }
+            pending_components.remove(continuation_id, out component);
 
             try {
                 if(component == null) {
-                        yield stream.send_event(new SseEvent.with_type("_spry-close", "404"));
+                    yield stream.send_event(new SseEvent.with_type("_spry-close", "404"));
+                    return;
                 }
 
                 try {
@@ -35,31 +31,31 @@ namespace Spry {
             }
             catch(Error e) {
                 warning("Failed to send '_spry-close' event: " + e.message);
-                return;
             }
 
         }
         
         public string get_continuation_path(Component component) {
             var id = Uuid.string_random();
-            components[id] = component;
-            active_channels[id] = false;
+            pending_components[id] = component;
 
-            // Remove component after 60 seconds if no connection has been made
-            Timeout.add_seconds_once(60, () => clean(id));
+            // Remove component after 30 seconds if no connection has been made
+            Timeout.add_seconds(30, () => {
+                clean.begin(id);
+                return false; // Don't repeat
+            });
             return @"/_spry/cnu/$id";
         }
 
-        private void clean(string id) {
-            bool active;
-            lock(active_channels){
-                if(!active_channels.try_get(id, out active)) {
-                    components.remove(id);
-                    return;
+        private async void clean(string id) {
+            Component component;
+            if(pending_components.remove(id, out component)) {
+                warning(@"Canceling continuation for component $(component.get_type().name()) due to timeout.");
+                try {
+                    yield component.continuation_canceled();
                 }
-                if(!active) {
-                    components.remove(id);
-                    active_channels.remove(id);
+                catch(Error e) {
+                    warning(@"Component $(component.get_type().name()) threw exception while handling a canceled continuation: $(e.message)");
                 }
             }
         }

+ 6 - 1
src/Spry.vala

@@ -6,11 +6,16 @@ namespace Spry {
     public class SpryModule : Object, Module {
         public void register_components (Container container) throws Error {
             container.register_singleton<PathProvider>();
+            container.register_scoped<ComponentFactory>();
+
+            container.register_startup<ContinuationProvider>()
+                .as<Endpoint>()
+                .with_metadata<EndpointRoute>(new EndpointRoute("/_spry/cnu/{token}"));
+
             container.register_startup<StaticResourceProvider>()
                 .as<Endpoint>()
                 .with_metadata<EndpointRoute>(new EndpointRoute("/_spry/res/{resource}"));
 
-            container.register_scoped<ComponentFactory>();
             container.register_scoped<ComponentEndpoint>()
                 .as<Endpoint>()
                 .with_metadata<EndpointRoute>(new EndpointRoute("/_spry/com/{component-id}/{action}"));