
Server-Sent Events (SSE) Implementation Plan

Overview

This plan outlines the implementation of Server-Sent Events (SSE) support for Astralis. It follows the existing architectural rule that code outside the Server/ folder has no knowledge of server internals.

Architectural Analysis

Existing Patterns

The current architecture uses these key abstractions:

classDiagram
    class HttpResult {
        +Dictionary headers
        +StatusCode status
        +HttpResultFlag flags
        +send_body_async AsyncOutput output
    }
    
    class AsyncOutput {
        <<interface>>
        +write_async BinaryData data
        +write_stream_async InputStream stream
    }
    
    class ServerOutput {
        -Series chunks
        +write_async BinaryData data
        +read_chunk void* buffer size_t max
        +on_new_chunk signal
    }
    
    class ResponseContext {
        +HttpResult result
        +ServerOutput body_output
        +begin_response
        +suspend_connection
    }
    
    HttpResult <|-- HttpDataResult
    HttpResult <|-- HttpStreamResult
    HttpResult <|-- HttpEmptyResult
    AsyncOutput <|.. ServerOutput
    ResponseContext --> ServerOutput
    ResponseContext --> HttpResult

Key Design Principle

Separation of Concerns:

  • Core/ - Public abstractions visible to endpoints
  • Server/ - Internal implementation details marked internal
  • Endpoints interact only with Core/ types

Proposed Design

Component Overview

classDiagram
    class SseEvent {
        +string id
        +string event_type
        +string data
        +int64 retry
        +format() string
    }
    
    class SseChannel {
        <<interface>>
        +send_event SseEvent event
        +send_data string data
        +CancellationToken cancellation_token
    }
    
    class SseHandler {
        <<interface>>
        +handle_sse SseChannel channel
    }
    
    class HttpSseResult {
        +int64 retry_interval
        +SseHandler handler
        +send_body_async AsyncOutput output
    }
    
    class ServerSseChannel {
        -AsyncOutput output
        +CancellationToken cancellation_token
        +send_event SseEvent event
        +send_data string data
    }
    
    class CancellationToken {
        +bool is_cancelled
        +cancel
        +cancelled signal
    }
    
    HttpResult <|-- HttpSseResult
    SseChannel <|.. ServerSseChannel
    ServerSseChannel --> AsyncOutput
    ServerSseChannel --> CancellationToken
    HttpSseResult --> SseChannel
    HttpSseResult --> SseHandler

File Structure

src/
├── Core/
│   ├── SseEvent.vala           # Public: SSE event data structure
│   ├── SseChannel.vala         # Public: Interface for sending SSE events
│   ├── SseHandler.vala         # Public: Interface for SSE handlers - similar to Endpoint
│   ├── CancellationToken.vala  # Public: Token for manual cancellation
│   └── HttpSseResult.vala      # Public: HttpResult subclass for SSE
└── Server/
    └── ServerSseChannel.vala   # Internal: ServerOutput-backed implementation

Detailed Component Specifications

1. SseEvent - Core/SseEvent.vala

A simple data class representing an SSE event following the W3C specification.

public class SseEvent : Object {
    public string? id { get; set; }
    public string? event_type { get; set; }
    public string data { get; set; }
    public int64? retry { get; set; }
    
    public SseEvent(string data);
    public SseEvent.with_id(string id, string data);
    public SseEvent.with_type(string event_type, string data);
    
    public string format();  // Returns properly formatted SSE string
}

Format output example (each event is terminated by a blank line):

id: 123
event: message
data: Hello World
retry: 3000
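
The formatting rules can be sketched as a free-standing function. This is only an illustration under stated assumptions: `format_sse_event` and its `retry_ms` parameter (negative meaning "omit the field") are hypothetical names, not part of the plan's API, and the real `SseEvent.format()` may differ. The point it shows is that a payload containing newlines must be emitted as one `data:` field per line.

```vala
// Hypothetical helper, not part of the plan's API: it mirrors what
// SseEvent.format() is described as doing. A negative retry_ms means "omit".
string format_sse_event(string? id, string? event_type, string data, int64 retry_ms = -1) {
    var sb = new StringBuilder();
    if (id != null) {
        sb.append("id: ").append(id).append_c('\n');
    }
    if (event_type != null) {
        sb.append("event: ").append(event_type).append_c('\n');
    }
    // A payload containing newlines is split into one "data:" field per line.
    foreach (string line in data.split("\n")) {
        sb.append("data: ").append(line).append_c('\n');
    }
    if (retry_ms >= 0) {
        sb.append("retry: ").append(retry_ms.to_string()).append_c('\n');
    }
    // The blank line marks the end of the event.
    sb.append_c('\n');
    return sb.str;
}

void main() {
    stdout.printf("%s", format_sse_event("123", "message", "Hello\nWorld", 3000));
}
```

Splitting on newlines matters because a raw newline inside a single `data:` field would end the field early and corrupt the event on the client side.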

2. CancellationToken - Core/CancellationToken.vala

A cancellation token for manual connection management.

public class CancellationToken : Object {
    public bool is_cancelled { get; private set; }
    public signal void cancelled();
    
    public void cancel();
}
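
A minimal sketch of how the token could behave, assuming (this is not stated in the plan) that `cancel()` should be idempotent and emit `cancelled` only once:

```vala
// Sketch only: idempotent cancel() is an assumption, not a stated requirement.
public class CancellationToken : Object {
    public bool is_cancelled { get; private set; default = false; }
    public signal void cancelled();

    public void cancel() {
        if (is_cancelled) {
            return;  // already cancelled: do not emit the signal again
        }
        is_cancelled = true;
        cancelled();
    }
}

void main() {
    var token = new CancellationToken();
    int fired = 0;
    token.cancelled.connect(() => { fired++; });

    token.cancel();
    token.cancel();  // second call is a no-op

    assert(token.is_cancelled);
    assert(fired == 1);
}
```

Emitting the signal once lets listeners run their cleanup exactly one time, no matter how many code paths call `cancel()`.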

3. SseChannel - Core/SseChannel.vala

Interface for sending SSE events - endpoints use this to push data.

public interface SseChannel : Object {
    public abstract async void send_event(SseEvent event) throws Error;
    public abstract async void send_data(string data) throws Error;
    public abstract CancellationToken cancellation_token { get; }
}

4. SseHandler - Core/SseHandler.vala

Interface for SSE handlers - follows the same pattern as Endpoint.

public interface SseHandler : Object {
    public abstract async void handle_sse(SseChannel channel) throws Error;
}

5. HttpSseResult - Core/HttpSseResult.vala

HttpResult subclass that enables SSE streaming.

public class HttpSseResult : HttpResult {
    public int64 retry_interval { get; set; }
    public SseHandler handler { get; set; }
    
    public HttpSseResult();
    public HttpSseResult.with_retry(int64 retry_ms);
    public HttpSseResult.with_handler(SseHandler handler);
    
    public override async void send_body(AsyncOutput output) throws Error;
}

6. ServerSseChannel - Server/ServerSseChannel.vala

Internal implementation that wraps an AsyncOutput.

internal class ServerSseChannel : Object, SseChannel {
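    private AsyncOutput output;
    private CancellationToken token;

    // Implements SseChannel.cancellation_token; a private field alone would not satisfy the interface
    public CancellationToken cancellation_token { get { return token; } }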
    
    public ServerSseChannel(AsyncOutput output, CancellationToken token);
    
    public async void send_event(SseEvent event) throws Error;
    public async void send_data(string data) throws Error;
}

Usage Example

// Define an SSE handler class
class StockTickerHandler : Object, SseHandler {
    public async void handle_sse(SseChannel channel) throws Error {
        while (!channel.cancellation_token.is_cancelled) {
            var price = yield get_latest_stock_price();
            var evt = new SseEvent.with_type("price-update", price.to_string());
            yield channel.send_event(evt);
            yield AsyncUtil.delay_async(1000);  // Wait 1 second
        }
    }
}

// Use in endpoint
class StockTickerEndpoint : Object, Endpoint {
    public async HttpResult handle_request(HttpContext ctx, RouteContext route) throws Error {
        return new HttpSseResult.with_handler(new StockTickerHandler());
    }
}

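Alternative: Minimal Handler Class

Vala has no anonymous classes, so a handler cannot be implemented inline. For simpler cases, a small dedicated class keeps the boilerplate low. When no delay helper is available, the wait can be written with Timeout.add and the async method's callback:

// Small handler that sends "ping" once per second
class PingHandler : Object, SseHandler {
    public async void handle_sse(SseChannel channel) throws Error {
        while (!channel.cancellation_token.is_cancelled) {
            yield channel.send_data("ping");
            // Resume this coroutine after one second
            Timeout.add(1000, () => {
                handle_sse.callback();
                return Source.REMOVE;
            });
            yield;
        }
    }
}

class SimpleSseEndpoint : Object, Endpoint {
    public async HttpResult handle_request(HttpContext ctx, RouteContext route) throws Error {
        return new HttpSseResult.with_handler(new PingHandler());
    }
}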

Implementation Steps

Phase 1: Core Abstractions

  1. Create SseEvent class in Core/SseEvent.vala
  2. Create CancellationToken class in Core/CancellationToken.vala
  3. Create SseChannel interface in Core/SseChannel.vala
  4. Create SseHandler interface in Core/SseHandler.vala

Phase 2: HttpResult Integration

  1. Create HttpSseResult class in Core/HttpSseResult.vala
  2. Update meson.build to include new files

Phase 3: Server Implementation

  1. Create ServerSseChannel in Server/ServerSseChannel.vala

Phase 4: Example and Testing

  1. Create example endpoint in examples/SseExample.vala
  2. Update examples/meson.build

Design Decisions and Rationale

Why SseChannel Interface?

Following the existing pattern where AsyncOutput is an interface in Core but ServerOutput is an internal implementation, SseChannel provides:

  • Clean separation between public API and server implementation
  • Testability - mock implementations for unit tests
  • Flexibility - alternative implementations possible
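
The testability point can be illustrated with a recording test double. This is a sketch under assumptions: `RecordingSseChannel` is a hypothetical name, the three types above it are trimmed stand-ins for the ones specified in this plan so the example compiles on its own (`valac --pkg gio-2.0`), and throwing `IOError.CANCELLED` after cancellation is one possible policy, not something the plan prescribes.

```vala
// Trimmed stand-ins for the types specified in this plan.
public class CancellationToken : Object {
    public bool is_cancelled { get; private set; }
    public signal void cancelled();
    public void cancel() {
        if (is_cancelled) return;
        is_cancelled = true;
        cancelled();
    }
}

public class SseEvent : Object {
    public string data { get; set; }
    public SseEvent(string data) { this.data = data; }
}

public interface SseChannel : Object {
    public abstract async void send_event(SseEvent event) throws Error;
    public abstract async void send_data(string data) throws Error;
    public abstract CancellationToken cancellation_token { get; }
}

// Hypothetical test double: records events instead of writing to a connection.
public class RecordingSseChannel : Object, SseChannel {
    private CancellationToken token = new CancellationToken();
    public GenericArray<SseEvent> sent = new GenericArray<SseEvent>();

    public CancellationToken cancellation_token { get { return token; } }

    public async void send_event(SseEvent event) throws Error {
        if (token.is_cancelled) {
            // Assumed policy: refuse to send once the channel is cancelled.
            throw new IOError.CANCELLED("channel was cancelled");
        }
        sent.add(event);
    }

    public async void send_data(string data) throws Error {
        yield send_event(new SseEvent(data));
    }
}

void main() {
    var loop = new MainLoop();
    var channel = new RecordingSseChannel();

    channel.send_data.begin("ping", (obj, res) => {
        try {
            channel.send_data.end(res);
        } catch (Error e) {
            warning("send failed: %s", e.message);
        }
        loop.quit();
    });
    loop.run();

    assert(channel.sent.length == 1);
    assert(channel.sent[0].data == "ping");
}
```

A handler under test receives the double through the `SseChannel` interface, so its output can be inspected without starting a server.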

Why CancellationToken vs Automatic Detection?

Manual cancellation was chosen over automatic disconnect detection, following the user's stated preference. Benefits:

  • Explicit control in endpoint code
  • Clear resource ownership
  • Predictable cleanup behavior
  • Endpoint can perform cleanup operations before terminating

Why SseHandler Interface Instead of Async Delegate?

Vala does not support async delegates. Using an interface:

  • Follows the same pattern as Endpoint in the existing codebase
  • Enables IoC container registration and dependency injection
  • Provides clear contract for SSE handlers
  • Allows both reusable handler classes and small single-purpose ones (Vala has no anonymous classes)

SSE Protocol Compliance

The implementation follows the Server-Sent Events specification (originally published by the W3C, now maintained as part of the WHATWG HTML Living Standard):

  • Content-Type: text/event-stream
  • Events formatted with field: value syntax
  • Events separated by blank lines
  • Support for id, event, data, and retry fields
  • Automatic reconnection via retry field

Integration with Existing Components

Compression Pipeline

SSE streams should set the DO_NOT_COMPRESS flag because:

  • Individual events are typically small, so compressing them saves little
  • Compression adds latency
  • Compression buffering can hold events back, which conflicts with real-time delivery

Response Headers

HttpSseResult automatically sets:

  • Content-Type: text/event-stream
  • Cache-Control: no-cache
  • Connection: keep-alive (HTTP/1.1 only; connection-specific headers are not permitted in HTTP/2)

Minimal Feature Set

Based on prior discussion, the feature set is deliberately kept minimal:

  • No built-in Last-Event-ID support (endpoints can handle if needed)
  • No built-in heartbeat (endpoints can send comments if needed)
  • No connection timeout (endpoints manage via CancellationToken)