Async I/O Refactor Design

Document Status

  • Created: 2026-03-14
  • Last Updated: 2026-03-14
  • Status: Finalized
  • Author: Architecture Review

Executive Summary

This document proposes a major refactoring of the Implexus codebase to make all I/O operations inherently async. The key changes are:

  1. Remove AsyncEngine and AsyncEntity wrappers - Make the base interfaces async
  2. Create a DBM Queue System - Serialize DBM operations with priority reads
  3. Add supports_concurrent_reads to Dbm interface - Enable optimizations for LMDB
  4. Make Engine and Entity interfaces async - All I/O operations become async

Current Architecture Summary

Current Async Pattern (To Be Removed)

The current implementation uses wrapper classes:

┌─────────────────────────────────────────────────────────────────┐
│                      Application Layer                           │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    AsyncEngine / AsyncEntity                     │
│                                                                  │
│  - Wraps Engine/Entity instances                                 │
│  - Each async method spawns a new Thread                         │
│  - Uses Idle.add to return results to main loop                  │
│  - Duplicates entire interface                                   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Engine / Entity Interfaces                    │
│                                                                  │
│  - Synchronous methods                                           │
│  - Blocking I/O                                                  │
└─────────────────────────────────────────────────────────────────┘

Problems with Current Approach:

  • Thread explosion: Each async call spawns a new thread
  • Interface duplication: AsyncEngine mirrors Engine, AsyncEntity mirrors Entity
  • Maintenance burden: Changes must be made in multiple places
  • Inefficient: No coordination between operations

Current DBM Implementations

Implementation   Concurrent Reads   Notes
GdbmDbm          No                 GDBM does not support concurrent access
LmdbDbm          Yes                LMDB supports concurrent readers via MVCC
FilesystemDbm    Limited            File-per-key, but no explicit coordination

Current Storage Layer

┌─────────────────────────────────────────────────────────────────┐
│                      HighLevel Stores                            │
│  EntityStore, DocumentStore, ContainerStore,                    │
│  CategoryStore, CatalogueStore, IndexStore                      │
│  (Synchronous, compose LowLevel stores)                         │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                      LowLevel Storage                            │
│  EntityMetadataStorage, PropertiesStorage, ChildrenStorage,     │
│  TypeIndexStorage, CategoryIndexStorage, TextIndexStorage, etc. │
│  (Synchronous, direct Dbm access)                               │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                         Dbm Interface                            │
│  has_key(), get(), set(), delete(), keys,                       │
│  begin_transaction(), commit_transaction(), rollback_transaction()│
│  (All synchronous)                                              │
└─────────────────────────────────────────────────────────────────┘

Proposed Architecture

Design Principles

  1. Async by Default: All I/O operations are async at the interface level
  2. Queue-Based for Embedded: DBM operations go through a priority queue
  3. Native Async for Remote: Socket operations use GLib async I/O
  4. Concurrent Read Optimization: for LMDB, the queue can run reads on additional threads
  5. Write Serialization: All writes go through a single queue

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      Application Layer                           │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Engine / Entity Interfaces                    │
│                                                                  │
│  - All I/O methods are async                                    │
│  - No wrapper classes needed                                    │
│  - Direct async API                                             │
└─────────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┴───────────────┐
              ▼                               ▼
┌───────────────────────────┐   ┌───────────────────────────────┐
│      EmbeddedEngine       │   │        RemoteEngine           │
│                           │   │                               │
│  Uses AsyncDbmQueue       │   │  Uses Async Socket I/O        │
│  for DBM operations       │   │  (GLib SocketConnection)      │
└───────────────────────────┘   └───────────────────────────────┘
              │                               │
              ▼                               ▼
┌───────────────────────────┐   ┌───────────────────────────────┐
│      AsyncDbmQueue        │   │    Protocol.MessageReader     │
│                           │   │    Protocol.MessageWriter     │
│  - Priority read queue    │   │                               │
│  - Serialized writes      │   │  (Already async-capable)      │
│  - Concurrent reads for   │   │                               │
│    LMDB                   │   │                               │
└───────────────────────────┘   └───────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────┐
│                         Dbm Interface                            │
│                                                                  │
│  + supports_concurrent_reads: bool { get; }                     │
│  (Methods remain synchronous - queue handles async)             │
└─────────────────────────────────────────────────────────────────┘

DBM Queue System Design

AsyncDbmQueue Class

A dedicated queue system for DBM operations that:

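  1. Processes operations one at a time (serialized) on a dedicated worker thread
  2. Prioritizes reads over writes. As a result, a read queued after another caller's write can run before that write, and a steady stream of reads can delay queued writes
  3. Uses Idle.add to return results to the main loop
  4. Supports concurrent reads for LMDB-like backends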

    namespace Implexus.Storage {
    
    /**
    * Priority level for DBM operations.
    */
    public enum DbmOperationPriority {
    READ = 0,      // Higher priority - processed first
    WRITE = 1,     // Lower priority - processed after reads
    TRANSACTION = 2 // Same as write, but grouped
    }
    
    /**
    * Delegate types for async callbacks.
    */
    public delegate void DbmReadCallback<T>(T? result, StorageError? error);
    public delegate void DbmWriteCallback(StorageError? error);
    
    /**
    * Queue system for async DBM operations.
    *
    * This class manages a dedicated thread for DBM operations,
    * ensuring serialized access while prioritizing reads.
    */
    public class AsyncDbmQueue : Object {
        
    private Dbm _dbm;
    private Thread<void> _worker_thread;
    private AsyncQueue<DbmOperation> _read_queue;
    private AsyncQueue<DbmOperation> _write_queue;
    private bool _running;
    private Mutex _mutex;
    private Cond _cond;
        
    /**
     * Creates a new AsyncDbmQueue wrapping the given Dbm.
     */
    public AsyncDbmQueue(Dbm dbm) {
        _dbm = dbm;
        _read_queue = new AsyncQueue<DbmOperation>();
        _write_queue = new AsyncQueue<DbmOperation>();
        _running = true;
        _mutex = Mutex();
        _cond = Cond();
            
        _worker_thread = new Thread<void>("dbm-worker", run_worker);
    }
        
    /**
     * Whether the underlying Dbm supports concurrent reads.
     */
    public bool supports_concurrent_reads {
        get { return _dbm.supports_concurrent_reads; }
    }
        
    // === Read Operations ===
        
    /**
     * Async has_key operation.
     */
    public async bool has_key_async(string key) throws StorageError {
        bool result = false;
        StorageError? error = null;
            
        var op = new DbmOperation.read_has_key(key, (r, e) => {
            result = r;
            error = e;
            Idle.add(() => {
                has_key_async.callback();
                return false;
            });
        });
            
        _read_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
            
        return result;
    }
        
    /**
     * Async get operation.
     */
    public async Invercargill.BinaryData? get_async(string key) throws StorageError {
        Invercargill.BinaryData? result = null;
        StorageError? error = null;
            
        var op = new DbmOperation.read_get(key, (r, e) => {
            result = r;
            error = e;
            Idle.add(() => {
                get_async.callback();
                return false;
            });
        });
            
        _read_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
            
        return result;
    }
        
    // === Write Operations ===
        
    /**
     * Async set operation.
     */
    public async void set_async(string key, Invercargill.BinaryData value) throws StorageError {
        StorageError? error = null;
            
        var op = new DbmOperation.write_set(key, value, (e) => {
            error = e;
            Idle.add(() => {
                set_async.callback();
                return false;
            });
        });
            
        _write_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
    }
        
    /**
     * Async delete operation.
     */
    public async void delete_async(string key) throws StorageError {
        StorageError? error = null;
            
        var op = new DbmOperation.write_delete(key, (e) => {
            error = e;
            Idle.add(() => {
                delete_async.callback();
                return false;
            });
        });
            
        _write_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
    }
        
    // === Transaction Operations ===
        
    /**
     * Async begin_transaction operation.
     */
    public async void begin_transaction_async() throws StorageError {
        StorageError? error = null;
            
        var op = new DbmOperation.transaction_begin((e) => {
            error = e;
            Idle.add(() => {
                begin_transaction_async.callback();
                return false;
            });
        });
            
        _write_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
    }
        
    /**
     * Async commit_transaction operation.
     */
    public async void commit_transaction_async() throws StorageError {
        StorageError? error = null;
            
        var op = new DbmOperation.transaction_commit((e) => {
            error = e;
            Idle.add(() => {
                commit_transaction_async.callback();
                return false;
            });
        });
            
        _write_queue.push(op);
        notify_worker();
            
        yield;
            
        if (error != null) {
            throw error;
        }
    }
        
    /**
     * Async rollback_transaction operation.
     */
    public async void rollback_transaction_async() {
        var op = new DbmOperation.transaction_rollback(() => {
            Idle.add(() => {
                rollback_transaction_async.callback();
                return false;
            });
        });
            
        _write_queue.push(op);
        notify_worker();
            
        yield;
    }
        
    // === Worker Thread ===
        
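    private void run_worker() {
        while (true) {
            _mutex.lock();
            // Wait for work while holding the mutex that notify_worker() takes
            // before signalling, so a wake-up cannot be lost between the
            // emptiness check and the wait.
            while (_running && _read_queue.length() == 0 && _write_queue.length() == 0) {
                _cond.wait(_mutex);
            }
            bool stopping = !_running;
            _mutex.unlock();
                
            // Prioritize reads over writes
            DbmOperation? op = _read_queue.try_pop();
            if (op == null) {
                op = _write_queue.try_pop();
            }
                
            if (op != null) {
                process_operation((!) op);
            } else if (stopping) {
                // Shutdown was requested and both queues are drained
                break;
            }
        }
    }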
        
    private void process_operation(DbmOperation op) {
        switch (op.type) {
            case DbmOperationType.HAS_KEY:
                bool result = _dbm.has_key(op.key);
                op.read_bool_callback(result, null);
                break;
                    
            case DbmOperationType.GET:
                var result = _dbm.get(op.key);
                op.read_data_callback(result, null);
                break;
                    
            case DbmOperationType.SET:
                try {
                    _dbm.set(op.key, op.value);
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;
                    
            case DbmOperationType.DELETE:
                try {
                    _dbm.delete(op.key);
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;
                    
            case DbmOperationType.BEGIN_TRANSACTION:
                try {
                    _dbm.begin_transaction();
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;
                    
            case DbmOperationType.COMMIT_TRANSACTION:
                try {
                    _dbm.commit_transaction();
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;
                    
            case DbmOperationType.ROLLBACK_TRANSACTION:
                _dbm.rollback_transaction();
                op.void_callback();
                break;
        }
    }
        
    private void notify_worker() {
        _mutex.lock();
        _cond.signal();
        _mutex.unlock();
    }
        
    /**
     * Shuts down the worker thread.
     */
    public void shutdown() {
        _running = false;
        notify_worker();
        _worker_thread.join();
    }
    }
    
    /**
    * Types of DBM operations.
    */
    private enum DbmOperationType {
    HAS_KEY,
    GET,
    SET,
    DELETE,
    BEGIN_TRANSACTION,
    COMMIT_TRANSACTION,
    ROLLBACK_TRANSACTION
    }
    
    /**
    * Represents a single DBM operation in the queue.
    */
    private class DbmOperation : Object {
    public DbmOperationType type;
    public string key;
    public Invercargill.BinaryData value;
        
    public DbmReadCallback<bool>? read_bool_callback;
    public DbmReadCallback<Invercargill.BinaryData>? read_data_callback;
    public DbmWriteCallback? write_callback;
    public delegate void VoidCallback();
    public VoidCallback? void_callback;
        
    // Factory methods for creating operations. Each callback is a closure,
    // so it is taken as owned and moved into its field.
    public DbmOperation.read_has_key(string key, owned DbmReadCallback<bool> callback) {
        this.type = DbmOperationType.HAS_KEY;
        this.key = key;
        this.read_bool_callback = (owned) callback;
    }
        
    public DbmOperation.read_get(string key, owned DbmReadCallback<Invercargill.BinaryData> callback) {
        this.type = DbmOperationType.GET;
        this.key = key;
        this.read_data_callback = (owned) callback;
    }
        
    public DbmOperation.write_set(string key, Invercargill.BinaryData value, owned DbmWriteCallback callback) {
        this.type = DbmOperationType.SET;
        this.key = key;
        this.value = value;
        this.write_callback = (owned) callback;
    }
        
    public DbmOperation.write_delete(string key, owned DbmWriteCallback callback) {
        this.type = DbmOperationType.DELETE;
        this.key = key;
        this.write_callback = (owned) callback;
    }
        
    public DbmOperation.transaction_begin(owned DbmWriteCallback callback) {
        this.type = DbmOperationType.BEGIN_TRANSACTION;
        this.write_callback = (owned) callback;
    }
        
    public DbmOperation.transaction_commit(owned DbmWriteCallback callback) {
        this.type = DbmOperationType.COMMIT_TRANSACTION;
        this.write_callback = (owned) callback;
    }
        
    public DbmOperation.transaction_rollback(owned VoidCallback callback) {
        this.type = DbmOperationType.ROLLBACK_TRANSACTION;
        this.void_callback = (owned) callback;
    }
    }
    
    } // namespace Implexus.Storage
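The queue's control flow can be exercised without Implexus. Below is a minimal, self-contained model of it, not the AsyncDbmQueue class itself. The names MiniQueue and Job are hypothetical, a HashTable stands in for the Dbm, strings stand in for Invercargill.BinaryData, and only get and set are modelled. It keeps the design's three moving parts:

  • one worker thread
  • a read queue that is drained before the write queue
  • coroutines resumed on the main loop via Idle.add

Producers push and signal under the same mutex the worker holds while checking the queues, so a wake-up cannot be lost. The worker also drains pending work before it exits.

```vala
// Hypothetical model of the AsyncDbmQueue pattern, not the real class.
// A HashTable stands in for Dbm and strings stand in for BinaryData.

// One queued operation plus the continuation of the coroutine waiting on it.
class Job : Object {
    public bool is_read;
    public string key;
    public string? value;     // payload for writes, result for reads
    public SourceFunc resume; // resumes the waiting coroutine

    public Job (bool is_read, string key, string? value, owned SourceFunc resume) {
        this.is_read = is_read;
        this.key = key;
        this.value = value;
        this.resume = (owned) resume;
    }
}

class MiniQueue : Object {
    // Only the worker thread touches the store, so it needs no lock.
    private HashTable<string, string> store = new HashTable<string, string> (str_hash, str_equal);
    private AsyncQueue<Job> reads = new AsyncQueue<Job> ();
    private AsyncQueue<Job> writes = new AsyncQueue<Job> ();
    private Mutex mutex = Mutex ();
    private Cond cond = Cond ();
    private bool running = true; // guarded by mutex
    private Thread<bool> worker;

    public MiniQueue () {
        worker = new Thread<bool> ("mini-dbm-worker", run_worker);
    }

    public async void set_async (string key, string value) {
        SourceFunc cb = set_async.callback;
        enqueue (new Job (false, key, value, (owned) cb));
        yield;
    }

    public async string? get_async (string key) {
        SourceFunc cb = get_async.callback;
        var job = new Job (true, key, null, (owned) cb);
        enqueue (job);
        yield;
        return job.value;
    }

    public void shutdown () {
        mutex.lock ();
        running = false;
        cond.signal ();
        mutex.unlock ();
        worker.join ();
    }

    private void enqueue (Job job) {
        // Push and signal under the mutex the worker holds while checking.
        mutex.lock ();
        if (job.is_read) {
            reads.push (job);
        } else {
            writes.push (job);
        }
        cond.signal ();
        mutex.unlock ();
    }

    private bool run_worker () {
        Job? job;
        while ((job = next_job ()) != null) {
            process ((!) job);
        }
        return true; // stopped and both queues drained
    }

    // Blocks until work is queued; returns null once stopped and drained.
    private Job? next_job () {
        mutex.lock ();
        while (running && reads.length () == 0 && writes.length () == 0) {
            cond.wait (mutex);
        }
        // Reads are always taken before writes.
        Job? job = reads.try_pop ();
        if (job == null) {
            job = writes.try_pop ();
        }
        mutex.unlock ();
        return job;
    }

    private void process (Job job) {
        if (job.is_read) {
            job.value = store.lookup (job.key);
        } else {
            store.insert (job.key, (!) job.value);
        }
        // Hand the continuation back to the main loop.
        Idle.add ((owned) job.resume);
    }
}
```

A coroutine cannot be resumed before it reaches its `yield`. The idle callback runs on the main-loop thread, which is the thread the coroutine itself is running on. The HashTable is touched only by the worker thread, which mirrors the serialized access the queue gives GDBM.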
    

Concurrent Read Optimization for LMDB

For DBM implementations that support concurrent reads (like LMDB), the queue can spawn dedicated read threads:

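// In AsyncDbmQueue, when supports_concurrent_reads is true:
private void process_read_with_thread(DbmOperation op) {
    if (!_dbm.supports_concurrent_reads) {
        process_operation(op);
        return;
    }
    
    // For LMDB, run the read on its own thread. This assumes LmdbDbm gives
    // each reading thread its own LMDB read transaction. One thread per read
    // is unbounded under load.
    new Thread<void>("dbm-reader", () => {
        // The operation's callback already returns the result to the main
        // loop via Idle.add, so it is invoked directly from this thread.
        if (op.type == DbmOperationType.HAS_KEY) {
            op.read_bool_callback(_dbm.has_key(op.key), null);
        } else if (op.type == DbmOperationType.GET) {
            op.read_data_callback(_dbm.get(op.key), null);
        }
        // Writes and transactions never take this path; they stay on the worker.
    });
}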
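process_read_with_thread starts a new thread for every read. Under load, that brings back the thread explosion this refactor sets out to remove. One alternative, sketched here under stated assumptions, is a fixed pool of reader threads that block on a shared AsyncQueue.

ReaderPool, ReadJob and n_readers are hypothetical names. A HashTable stands in for an LMDB environment, and it is never modified after the readers start. That is what makes the unsynchronized concurrent lookups safe in this sketch.

```vala
// Hypothetical bounded reader pool for backends whose
// supports_concurrent_reads is true. A HashTable that is not modified
// after the readers start stands in for LMDB.

class ReadJob : Object {
    public string key = "";
    public bool stop = false;     // shutdown marker
    public string? result = null;
    public SourceFunc resume;     // continuation of the waiting coroutine
}

class ReaderPool : Object {
    private AsyncQueue<ReadJob> jobs = new AsyncQueue<ReadJob> ();
    private HashTable<string, string> snapshot;
    private Thread<bool>[] readers = {};

    public ReaderPool (HashTable<string, string> snapshot, int n_readers) {
        this.snapshot = snapshot;
        for (int i = 0; i < n_readers; i++) {
            readers += new Thread<bool> ("dbm-reader", read_loop);
        }
    }

    // Runs on each reader thread; the thread count never exceeds n_readers.
    private bool read_loop () {
        ReadJob job = jobs.pop (); // blocks until a job arrives
        while (!job.stop) {
            // Read-only lookups on an unchanging table are safe concurrently.
            job.result = snapshot.lookup (job.key);
            // Resume the caller on the main loop.
            Idle.add ((owned) job.resume);
            job = jobs.pop ();
        }
        return true;
    }

    public async string? get_async (string key) {
        SourceFunc cb = get_async.callback;
        var job = new ReadJob ();
        job.key = key;
        job.resume = (owned) cb;
        jobs.push (job);
        yield;
        return job.result;
    }

    public void shutdown () {
        // Markers queue behind pending reads, so those still complete.
        for (int i = 0; i < readers.length; i++) {
            var marker = new ReadJob ();
            marker.stop = true;
            jobs.push (marker);
        }
        for (int i = 0; i < readers.length; i++) {
            Thread<bool> t = (owned) readers[i];
            t.join ();
        }
    }
}
```

Shutdown pushes one stop marker per thread behind any pending reads, so queued reads still complete before the readers exit. A real LMDB-backed version would also need each reader thread to use its own read transaction.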

Dbm Interface Changes

Add supports_concurrent_reads Property

namespace Implexus.Storage {

public interface Dbm : Object {
    
    // === Existing Methods ===
    
    public abstract bool has_key(string key);
    public abstract Invercargill.BinaryData? @get(string key);
    public abstract void @set(string key, Invercargill.BinaryData value) throws StorageError;
    public abstract void delete(string key) throws StorageError;
    public abstract Invercargill.Enumerable<string> keys { owned get; }
    public abstract void begin_transaction() throws StorageError;
    public abstract void commit_transaction() throws StorageError;
    public abstract void rollback_transaction();
    public abstract bool in_transaction { get; }
    
    // === NEW: Concurrent Read Support ===
    
    /**
     * Indicates whether this Dbm implementation supports concurrent reads.
     *
     * If true, multiple threads can read simultaneously without blocking.
     * Writes are still serialized through the queue.
     *
     * - LMDB: true (MVCC allows concurrent readers)
     * - GDBM: false (single-threaded access)
     * - FilesystemDbm: false (no explicit coordination)
     */
    public abstract bool supports_concurrent_reads { get; }
}

} // namespace Implexus.Storage

Implementation Updates

// GdbmDbm.vala
public class GdbmDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return false; } }
    // ... rest of implementation
}

// LmdbDbm.vala
public class LmdbDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return true; } }
    // ... rest of implementation
}

// FilesystemDbm.vala
public class FilesystemDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return false; } }
    // ... rest of implementation
}

Engine Interface Changes

Current Interface (Synchronous)

public interface Engine : Object {
    public abstract Entity get_root();
    public abstract Entity? get_entity(EntityPath path) throws EngineError;
    public abstract Entity? get_entity_or_null(EntityPath path);
    public abstract bool entity_exists(EntityPath path);
    public abstract Invercargill.Enumerable<Entity> query_by_type(string type_label);
    public abstract Invercargill.Enumerable<Entity> query_by_expression(string type_label, string expression);
    public abstract Transaction begin_transaction() throws EngineError;
    public abstract bool in_transaction { get; }
    public abstract StorageConfiguration configuration { owned get; }
    
    public signal void entity_created(Entity entity);
    public signal void entity_deleted(EntityPath path);
    public signal void entity_modified(Entity entity);
}

Proposed Interface (Async)

public interface Engine : Object {
    // === Root Access ===
    
    /**
     * Gets the root entity of the database.
     *
     * This operation is async as it may require I/O to verify/create root.
     */
    public abstract async Entity get_root_async() throws EngineError;
    
    // === Path-Based Access ===
    
    /**
     * Gets an entity by path, throwing an error if not found.
     */
    public abstract async Entity? get_entity_async(EntityPath path) throws EngineError;
    
    /**
     * Gets an entity by path, returning null if not found.
     */
    public abstract async Entity? get_entity_or_null_async(EntityPath path) throws EngineError;
    
    /**
     * Checks if an entity exists at the specified path.
     */
    public abstract async bool entity_exists_async(EntityPath path) throws EngineError;
    
    // === Query Operations ===
    
    /**
     * Queries all entities of a specific type.
     *
     * Results are loaded eagerly into an array.
     *
     * DECISION: Use Entity[] (eager loading). It is simpler than async
     * iteration and sufficient for expected data volumes.
     * See "Decisions Made" section.
     */
    public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
    
    /**
     * Queries entities by type and expression.
     */
    public abstract async Entity[] query_by_expression_async(string type_label, string expression) throws EngineError;
    
    // === Transactions ===
    
    /**
     * Begins a new transaction.
     *
     * NOTE: The with_write_transaction() helper has been REMOVED.
     * Vala doesn't support async delegates, so transactions must be
     * managed manually: call begin_transaction_async(), perform the
     * operations, then call commit_async() or rollback_async() on the
     * returned Transaction.
     *
     * See "Decisions Made" section for details.
     */
    public abstract async Transaction begin_transaction_async() throws EngineError;
    
    /**
     * Indicates whether a transaction is currently active.
     * This is synchronous as it's a quick property check.
     */
    public abstract bool in_transaction { get; }
    
    // === Configuration ===
    
    /**
     * Gets the storage configuration.
     * This is synchronous as it's a quick property access.
     */
    public abstract StorageConfiguration configuration { owned get; }
    
    // === Events ===
    
    public signal void entity_created(Entity entity);
    public signal void entity_deleted(EntityPath path);
    public signal void entity_modified(Entity entity);
}

Entity Interface Changes

Current Interface (Synchronous)

The current Entity interface has synchronous methods for all operations.

Proposed Interface (Async)

public interface Entity : Object {
    
    // === Identity (Synchronous - No I/O) ===
    
    public abstract unowned Engine engine { get; }
    public abstract EntityPath path { owned get; }
    public abstract string name { owned get; }
    public abstract EntityType entity_type { get; }
    public abstract string type_label { owned get; }
    public abstract string configured_expression { owned get; }
    public abstract string configured_type_label { owned get; }
    
    // === Parent/Child Navigation (Async - May require I/O) ===
    
    /**
     * Gets the parent entity, or null for the root.
     */
    public abstract async Entity? get_parent_async() throws EngineError;
    
    /**
     * Gets the names of all child entities.
     */
    public abstract async Invercargill.ReadOnlySet<string> get_child_names_async() throws EngineError;
    
    /**
     * Gets a child entity by name.
     */
    public abstract async Entity? get_child_async(string name) throws EngineError;
    
    /**
     * Gets all child entities.
     */
    public abstract async Entity[] get_children_async() throws EngineError;
    
    // === Child Management (Async - Requires I/O) ===
    
    public abstract async Entity? create_container_async(string name) throws EngineError;
    public abstract async Entity? create_document_async(string name, string type_label) throws EngineError;
    public abstract async Entity? create_category_async(string name, string type_label, string expression) throws EngineError;
    public abstract async Entity? create_index_async(string name, string type_label, string expression) throws EngineError;
    public abstract async Entity? create_catalogue_async(string name, string type_label, string expression) throws EngineError;
    
    // === Document Operations (Async - Requires I/O) ===
    
    /**
     * Gets the properties stored in this document.
     */
    public abstract async Invercargill.Properties get_properties_async() throws EngineError;
    
    /**
     * Gets a property value by name.
     */
    public abstract async Invercargill.Element? get_entity_property_async(string name) throws EngineError;
    
    /**
     * Sets a property value.
     */
    public abstract async void set_entity_property_async(string name, Invercargill.Element value) throws EngineError;
    
    /**
     * Removes a property.
     */
    public abstract async void remove_property_async(string name) throws EngineError;
    
    // === Lifecycle (Async - Requires I/O) ===
    
    /**
     * Deletes this entity from the database.
     */
    public abstract async void delete_async() throws EngineError;
    
    /**
     * Checks if this entity still exists in the database.
     * This is synchronous as it may be cached or quick to check.
     */
    public abstract bool exists { get; }
    
    // === Set Operations ===
    
    /**
     * Creates an EntitySet containing just this entity.
     */
    public abstract async EntitySet as_set_async();
}

Transaction Interface Changes

Proposed Interface (Async)

public interface Transaction : Object {
    
    /**
     * Indicates whether this transaction is still active.
     * Synchronous - quick property check.
     */
    public abstract bool active { get; }
    
    /**
     * Commits all changes made during this transaction.
     */
    public abstract async void commit_async() throws EngineError;
    
    /**
     * Rolls back all changes made during this transaction.
     */
    public abstract async void rollback_async();
}
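Because with_write_transaction() was removed, every caller spells out the begin/commit/rollback sequence itself. The sketch below shows that sequence in a self-contained form, under these assumptions:

  • DemoTransaction and DemoError are hypothetical stand-ins for Core.Transaction and Core.EngineError
  • the transaction is passed in rather than obtained from begin_transaction_async()
  • the Idle.add yields only simulate I/O

The error is captured in the catch block and the rollback is awaited after it, which keeps the yield out of the handler.

```vala
// Hypothetical stand-ins for Core.EngineError and Core.Transaction.
errordomain DemoError {
    FAILED
}

class DemoTransaction : Object {
    public bool active { get; private set; default = true; }
    public bool committed { get; private set; default = false; }

    public async void commit_async () throws DemoError {
        // Resume on a later main-loop iteration to simulate I/O.
        Idle.add (commit_async.callback);
        yield;
        active = false;
        committed = true;
    }

    public async void rollback_async () {
        Idle.add (rollback_async.callback);
        yield;
        active = false;
    }
}

// Manual pattern: run the operations, commit on success, otherwise roll
// back (if still active) and rethrow the original error.
async void run_in_transaction (DemoTransaction tx, bool fail) throws DemoError {
    DemoError? failure = null;
    try {
        // ... yield the entity operations here ...
        if (fail) {
            throw new DemoError.FAILED ("simulated failure");
        }
        yield tx.commit_async ();
    } catch (DemoError e) {
        failure = e;
    }

    if (failure != null) {
        if (tx.active) {
            yield tx.rollback_async ();
        }
        throw (!) failure;
    }
}
```

In the engine, the caller would start with `var tx = yield engine.begin_transaction_async();` (`engine` being any Core.Engine). The real entity operations would then replace the `fail` check.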

HighLevel/LowLevel Storage Changes

Approach: Async Wrappers over Sync Implementation

The LowLevel storage classes remain synchronous (they operate on the Dbm directly). The HighLevel stores gain async versions that use the AsyncDbmQueue.

namespace Implexus.Storage.HighLevel {

public class EntityStore : Object {
    
    private LowLevel.EntityMetadataStorage _metadata;
    private LowLevel.TypeIndexStorage _type_index;
    private AsyncDbmQueue _queue;
    
    public EntityStore.with_queue(AsyncDbmQueue queue) {
        _queue = queue;
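        // The async methods below build keys and encode values themselves and
        // route all reads and writes through the queue. The LowLevel helpers
        // (_metadata, _type_index) keep synchronous, direct Dbm access and
        // are not initialised by this constructor.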
    }
    
    // === Async Metadata Operations ===
    
    public async void store_metadata_async(
        Core.EntityPath path, 
        Core.EntityType type, 
        string? type_label = null
    ) throws StorageError {
        // Serialize and queue the write
        string key = "entity:" + path.to_string();
        var writer = new ElementWriter();
        writer.write_element(new Invercargill.NativeElement<int64?>((int64) type));
        writer.write_element(new Invercargill.NativeElement<string>(type_label ?? ""));
        
        yield _queue.set_async(key, writer.to_binary_data());
    }
    
    public async Core.EntityType? get_entity_type_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        var data = yield _queue.get_async(key);
        
        if (data == null) {
            return null;
        }
        
        var reader = new ElementReader((!) data);
        try {
            var element = reader.read_element();
            if (element.is_null()) {
                return null;
            }
            int64? type_val = element.as<int64?>();
            return (Core.EntityType) (type_val == null ? 0 : (!) type_val);
        } catch (Invercargill.ElementError e) {
            throw new StorageError.CORRUPT_DATA("Failed to read entity type: %s".printf(e.message));
        }
    }
    
    public async bool exists_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        return yield _queue.has_key_async(key);
    }
    
    public async void delete_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        yield _queue.delete_async(key);
    }
    
    // ... similar async versions for other methods
}

} // namespace Implexus.Storage.HighLevel

EmbeddedEngine Changes

Current Implementation

The EmbeddedEngine currently uses synchronous Dbm operations directly.

Proposed Implementation

public class EmbeddedEngine : Object, Core.Engine {
    
    private Storage.AsyncDbmQueue _dbm_queue;
    private Storage.HighLevel.EntityStore _entity_store;
    private Storage.HighLevel.DocumentStore _document_store;
    private Storage.HighLevel.ContainerStore _container_store;
    private Storage.HighLevel.CategoryStore _category_store;
    private Storage.HighLevel.CatalogueStore _catalogue_store;
    private Storage.HighLevel.IndexStore _index_store;
    private Core.Entity? _root = null;
    
    public EmbeddedEngine(Core.StorageConfiguration config) {
        // Get Dbm from storage
        var basic_storage = (config.storage as Storage.BasicStorage);
        if (basic_storage != null) {
            var dbm = basic_storage.dbm;
            
            // Create async queue
            _dbm_queue = new Storage.AsyncDbmQueue(dbm);
            
            // Initialize stores with queue
            _entity_store = new Storage.HighLevel.EntityStore.with_queue(_dbm_queue);
            _document_store = new Storage.HighLevel.DocumentStore.with_queue(_dbm_queue);
            _container_store = new Storage.HighLevel.ContainerStore.with_queue(_dbm_queue);
            _category_store = new Storage.HighLevel.CategoryStore.with_queue(_dbm_queue);
            _catalogue_store = new Storage.HighLevel.CatalogueStore.with_queue(_dbm_queue);
            _index_store = new Storage.HighLevel.IndexStore.with_queue(_dbm_queue);
        }
    }
    
    // === Internal Sync Methods for Hook Use ===
    //
    // DECISION: Hooks remain synchronous, running in the DBM thread.
    // The storage layer (HighLevel/LowLevel) also keeps its synchronous methods.
    // EmbeddedEngine keeps internal sync versions of methods for hook use.
    //
    // These internal methods are NOT part of the public API and are only
    // used by the HookManager to perform operations during hook execution.
    
    internal Core.Entity? get_entity_or_null_sync(Core.EntityPath path) {
        // Synchronous implementation for hook use
        // Uses the underlying Dbm directly, bypassing the async queue
        return _entity_store.get_entity_or_null_direct(path);
    }
    
    // === Engine Interface Implementation ===
    
    public async Core.Entity get_root_async() throws Core.EngineError {
        if (_root != null) {
            return (!) _root;
        }
        
        var root_path = new Core.EntityPath.root();
        
        // Create the root container on first access; storage failures
        // are converted to EngineError
        try {
            bool exists = yield _entity_store.exists_async(root_path);
            if (!exists) {
                yield _entity_store.store_metadata_async(root_path, Core.EntityType.CONTAINER, null);
            }
        } catch (Storage.StorageError e) {
            throw new Core.EngineError.STORAGE_ERROR("Failed to initialize root: %s".printf(e.message));
        }
        
        _root = new Entities.Container(this, root_path);
        return (!) _root;
    }
    
    // Never returns null: a missing entity raises ENTITY_NOT_FOUND
    public async Core.Entity get_entity_async(Core.EntityPath path) throws Core.EngineError {
        bool exists = yield _entity_store.exists_async(path);
        if (!exists) {
            throw new Core.EngineError.ENTITY_NOT_FOUND(
                "Entity not found: %s".printf(path.to_string())
            );
        }
        
        return yield create_entity_from_storage_async(path);
    }
    
    public async Core.Entity? get_entity_or_null_async(Core.EntityPath path) throws Core.EngineError {
        try {
            return yield get_entity_async(path);
        } catch (Core.EngineError.ENTITY_NOT_FOUND e) {
            // Only "not found" maps to null; storage and other errors still propagate
            return null;
        }
    }
    
    public async bool entity_exists_async(Core.EntityPath path) throws Core.EngineError {
        return yield _entity_store.exists_async(path);
    }
    
    // ... rest of async implementations
}
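All six stores share the single AsyncDbmQueue created in the constructor, so every store call funnels into one worker thread. The sketch below shows one way that pattern could look, under stated assumptions. `SketchDbmQueue`, `DbmOp`, `get_async` and `set_async` are hypothetical names, a `HashTable` stands in for the Dbm, and only the single-thread path is shown (no LMDB concurrent reads). Pending reads are taken before pending writes, and each finished operation resumes its caller on the main context through `Idle.add`.

```vala
// Hypothetical sketch of a single-worker DBM queue with read priority.
// SketchDbmQueue/DbmOp are illustrative names; a HashTable stands in for the Dbm.
public delegate void DbmWork ();

public class DbmOp {
    public bool is_read;
    public DbmWork work;       // runs on the worker thread
    public SourceFunc done;    // resumes the waiting coroutine

    public DbmOp (bool is_read, owned DbmWork work, owned SourceFunc done) {
        this.is_read = is_read;
        this.work = (owned) work;
        this.done = (owned) done;
    }
}

public class SketchDbmQueue : Object {
    private Mutex mutex = Mutex ();
    private Cond cond = Cond ();
    private Queue<DbmOp> reads = new Queue<DbmOp> ();
    private Queue<DbmOp> writes = new Queue<DbmOp> ();
    private bool stopping = false;
    private Thread<bool> worker;
    // Stand-in for the Dbm; only the worker thread touches it.
    private HashTable<string, string> store = new HashTable<string, string> (str_hash, str_equal);

    construct {
        worker = new Thread<bool> ("dbm-worker", worker_loop);
    }

    private void submit (DbmOp op) {
        mutex.lock ();
        if (op.is_read) {
            reads.push_tail (op);
        } else {
            writes.push_tail (op);
        }
        cond.signal ();
        mutex.unlock ();
    }

    private bool worker_loop () {
        bool running = true;
        while (running) {
            DbmOp? op = null;
            mutex.lock ();
            while (reads.is_empty () && writes.is_empty () && !stopping) {
                cond.wait (mutex);
            }
            // Pending reads always go before pending writes.
            if (!reads.is_empty ()) {
                op = reads.pop_head ();
            } else if (!writes.is_empty ()) {
                op = writes.pop_head ();
            } else {
                running = false;  // stop() was called and nothing is left
            }
            mutex.unlock ();
            if (op != null) {
                op.work ();                  // blocking Dbm call, off the main loop
                Idle.add ((owned) op.done);  // resume the caller on the main context
            }
        }
        return true;
    }

    public async string? get_async (string key) {
        string? result = null;
        submit (new DbmOp (true, () => { result = store.lookup (key); }, get_async.callback));
        yield;
        return result;
    }

    public async void set_async (string key, string data) {
        submit (new DbmOp (false, () => { store.insert (key, data); }, set_async.callback));
        yield;
    }

    public void stop () {
        mutex.lock ();
        stopping = true;
        cond.broadcast ();
        mutex.unlock ();
        worker.join ();
    }
}
```

Strict read-first ordering keeps reads responsive, but under a continuous read load it can starve writes. Whether the real queue needs a fairness bound is not settled by this sketch.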

RemoteEngine Changes

The RemoteEngine already communicates over a socket, but each request blocks until its response has been read. The socket writes and reads need to become async.

Current Implementation

internal Protocol.Message send_request_and_wait(Protocol.Message request) throws Protocol.ProtocolError {
    ensure_connected();
    var request_id = ((!) _writer).write_request(request);
    return ((!) _reader).read_response_for_request(request_id);
}

Proposed Implementation

internal async Protocol.Message send_request_and_wait_async(Protocol.Message request) throws Protocol.ProtocolError {
    ensure_connected();
    
    // Use async socket operations
    var request_id = yield ((!) _writer).write_request_async(request);
    return yield ((!) _reader).read_response_for_request_async(request_id);
}

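public async Core.Entity get_entity_async(Core.EntityPath path) throws Core.EngineError {
    // No separate ensure_connected() here: send_request_and_wait_async() calls it
    // inside the try block, so a ProtocolError from connecting is converted below
    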
    try {
        var request = new Protocol.GetEntityRequest.for_path(path);
        var response = yield send_request_and_wait_async(request);
        
        if (response.message_type == Protocol.MessageType.ERROR) {
            var error = (Protocol.ErrorResponse) response;
            throw error_code_to_engine_error(error.error_code, error.error_message);
        }
        
        if (response.message_type == Protocol.MessageType.ENTITY_RESPONSE) {
            var entity_response = (Protocol.EntityResponse) response;
            return create_remote_entity_from_data(entity_response.entity_data);
        }
        
        throw new Core.EngineError.PROTOCOL_ERROR("Unexpected response type");
        
    } catch (Protocol.ProtocolError e) {
        throw new Core.EngineError.PROTOCOL_ERROR("Protocol error: %s".printf(e.message));
    }
}
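With async sockets, several coroutines can have requests in flight on one connection at the same time. read_response_for_request_async(request_id) therefore has to hand each incoming response to whichever coroutine is waiting for that id. A minimal sketch of that routing follows. It assumes a single reader loop that decodes frames and calls `dispatch()`; `ResponseRouter`, `PendingResponse`, `wait_for_async` and the string payloads (standing in for `Protocol.Message`) are all hypothetical.

```vala
// Hypothetical sketch: deliver each response to the coroutine waiting for its id.
// Strings stand in for Protocol.Message; names are illustrative only.
public class PendingResponse {
    public SourceFunc resume;
    public string? payload = null;
}

public class ResponseRouter : Object {
    private HashTable<uint, PendingResponse> waiting =
        new HashTable<uint, PendingResponse> (direct_hash, direct_equal);

    // Suspends the caller until dispatch() delivers the matching response.
    public async string? wait_for_async (uint request_id) {
        var pending = new PendingResponse ();
        pending.resume = wait_for_async.callback;
        waiting.insert (request_id, pending);
        yield;
        return pending.payload;
    }

    // Called by the single reader loop for each decoded response frame.
    // Returns false when no coroutine is waiting for request_id.
    public bool dispatch (uint request_id, string payload) {
        PendingResponse? pending = waiting.lookup (request_id);
        if (pending == null) {
            return false;
        }
        waiting.remove (request_id);
        pending.payload = payload;
        Idle.add ((owned) pending.resume);  // resume on the next main-loop iteration
        return true;
    }
}
```

Routing by id, rather than letting each coroutine read the next frame off the socket, prevents one coroutine from consuming a response meant for another.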

Migration Path

DECISION: Clean Break Migration

The project is in development, so we will use a clean break approach:

  • Remove all sync methods immediately
  • Update everything in this refactor
  • No gradual migration or deprecation period
  • No compatibility shims needed

Phase 1: Infrastructure

  1. Create AsyncDbmQueue class
  2. Add supports_concurrent_reads property to Dbm interface
  3. Update all Dbm implementations with the new property
  4. Add tests for the queue system

Phase 2: Interface Changes (Breaking)

  1. Replace sync methods with async methods in Engine interface
  2. Replace sync methods with async methods in Entity interface
  3. Update Transaction interface to async
  4. Remove with_write_transaction() helper entirely

Phase 3: Implementation Updates

  1. Implement async methods in EmbeddedEngine (including internal sync versions for hooks)
  2. Implement async methods in RemoteEngine with async sockets
  3. Update HighLevel stores to use async queue
  4. Keep LowLevel storage synchronous (operates on Dbm directly)

Phase 4: Entity Updates

  1. Update Container, Document, Category, Catalogue, Index to use async stores
  2. Ensure all entity operations use async methods internally

Phase 5: Cleanup

  1. Remove AsyncEngine and AsyncEntity wrapper classes
  2. Update all tests to use async methods
  3. Update any application code to use async methods

Files to Modify

New Files to Create

File                              Purpose
src/Storage/AsyncDbmQueue.vala    Queue system for async DBM operations
src/Storage/DbmOperation.vala     Operation types for the queue

Files to Modify

File                                       Changes
src/Storage/Dbm.vala                       Add supports_concurrent_reads property
src/Storage/Gdbm/GdbmDbm.vala              Implement supports_concurrent_reads (return false)
src/Storage/Lmdb/LmdbDbm.vala              Implement supports_concurrent_reads (return true)
src/Storage/FilesystemDbm.vala             Implement supports_concurrent_reads (return false)
src/Core/Engine.vala                       Convert all I/O methods to async
src/Core/Entity.vala                       Convert all I/O methods to async
src/Core/Transaction.vala (if separate)    Convert to async
src/Storage/HighLevel/EntityStore.vala     Add async methods
src/Storage/HighLevel/DocumentStore.vala   Add async methods
src/Storage/HighLevel/ContainerStore.vala  Add async methods
src/Storage/HighLevel/CategoryStore.vala   Add async methods
src/Storage/HighLevel/CatalogueStore.vala  Add async methods
src/Storage/HighLevel/IndexStore.vala      Add async methods
src/Engine/EmbeddedEngine.vala             Implement async interface methods
src/Engine/RemoteEngine.vala               Implement async interface methods with async sockets
src/Entities/Container.vala                Use async stores
src/Entities/Document.vala                 Use async stores
src/Entities/Category.vala                 Use async stores
src/Entities/Catalogue.vala                Use async stores
src/Entities/Index.vala                    Use async stores
src/Entities/AbstractEntity.vala           Update base implementations
src/Protocol/MessageReader.vala            Add async read methods
src/Protocol/MessageWriter.vala            Add async write methods
src/meson.build                            Add new files to build

Files to Delete (Phase 5)

File                         Reason
src/Core/AsyncEngine.vala    Replaced by async interface methods
src/Core/AsyncEntity.vala    Replaced by async interface methods

Test Files to Update

File                                    Changes
tests/Storage/GdbmDbmTest.vala          Add tests for supports_concurrent_reads
tests/Storage/LmdbDbmTest.vala          Add tests for supports_concurrent_reads
tests/Storage/FilesystemDbmTest.vala    Add tests for supports_concurrent_reads
tests/Storage/StorageTest.vala          Add tests for AsyncDbmQueue
tests/Engine/EmbeddedEngineTest.vala    Update to use async methods
All other tests                         Update to use async methods

Decisions Made

This section documents the finalized design decisions for the async I/O refactor.

Decision 1: Query Results - Eager Loading with Arrays

Decision: Use Entity[] (eager loading) for query results.

Rationale:

  • Simpler than implementing async iteration
  • Sufficient for expected data volumes
  • Avoids complexity of custom AsyncEnumerable<Entity>
  • Fits Vala's async model: an async method can return an array directly, whereas Vala has no built-in async iteration

Example:

public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
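Eager loading means the whole result set is built before the async method completes, and the caller receives one ordinary array. The sketch below illustrates this with a hypothetical in-memory stand-in (`FakeIndex`), where paths and type labels are plain strings instead of `Entity` objects.

```vala
// Hypothetical stand-in: paths and type labels replace real Entity objects.
public class FakeIndex : Object {
    private string[] paths = new string[] { "/a", "/b", "/c" };
    private string[] types = new string[] { "Document", "Container", "Document" };

    public async string[] query_by_type_async (string type_label) {
        // Let the main loop run once, as a real storage round trip would.
        Idle.add (query_by_type_async.callback);
        yield;

        // Collect every match before returning: one complete array,
        // no async iteration on the caller's side.
        string[] matches = {};
        for (int i = 0; i < paths.length; i++) {
            if (types[i] == type_label) {
                matches += paths[i];
            }
        }
        return matches;
    }
}
```

Vala carries the array length through the async result, so `.end()` (or `yield`) hands back a normal array the caller can `foreach` over.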

Decision 2: Migration Strategy - Clean Break

Decision: Remove all sync methods immediately, update everything in this refactor.

Rationale:

  • Project is in development phase
  • No external users to migrate
  • Cleaner codebase without deprecation paths
  • Faster implementation without compatibility layers

Implications:

  • No gradual migration period
  • No deprecated method markers needed
  • All code updated in single refactor

Decision 3: Transaction Helper - Removed

Decision: Remove with_write_transaction() entirely.

Rationale:

  • Vala doesn't support async delegates
  • Cannot pass async callbacks to transaction helper
  • Manual transaction management is clearer

New Pattern:

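// Old helper in async form: not expressible, because Vala has no
// async delegates or async lambdas:
// yield engine.with_write_transaction_async(async (tx) => {
//     yield entity.set_entity_property_async("key", value);
// });

// New (manual management), inside an async method declared `throws Error`:
yield engine.begin_transaction_async();
try {
    yield entity.set_entity_property_async("key", value);
    yield engine.commit_async();
} catch (Error e) {
    // Roll back, but don't let a rollback failure hide the original error
    try {
        yield engine.rollback_async();
    } catch (Error rollback_error) {
        warning("Rollback failed: %s", rollback_error.message);
    }
    throw e;
}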

Decision 4: Hooks - Remain Synchronous

Decision: Keep hooks synchronous, running in the DBM thread.

Rationale:

  • Hooks should be fast operations
  • Avoids complexity of async hook management
  • The underlying storage logic (HighLevel/LowLevel) remains synchronous; the HighLevel async methods only route it through the queue
  • Simpler mental model for hook authors

Implementation Note: EmbeddedEngine keeps internal sync versions of methods like get_entity_or_null_sync() for hook use. These are:

  • Marked internal (not public API)
  • Used only by HookManager during hook execution
  • Bypass the async queue for direct Dbm access (a hook already runs on the DBM thread, so waiting on the queue from inside a hook could deadlock)

Decision 5: Error Handling - Use throws Keyword

Decision: Use throws keyword for async methods, consistent with existing codebase.

Rationale:

  • Vala supports throws in async methods
  • Consistent with existing sync methods
  • Familiar pattern for Vala developers
  • No need for Result types or error callbacks

Example:

public abstract async Entity get_entity_async(EntityPath path) throws EngineError;
public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
public abstract async void set_entity_property_async(string name, Element value) throws EngineError;
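An error thrown inside an async method reaches the caller where the result is collected. In another async method, that is the `yield`. In synchronous code that drives the call with `.begin()` and a main loop, `.end()` rethrows it. A small sketch follows; all names (`DemoError`, `lookup_async`, `lookup_or_default_async`, `describe_lookup`) are hypothetical.

```vala
// Hypothetical names throughout; only the error-propagation pattern matters.
public errordomain DemoError {
    NOT_FOUND
}

// An async method declares and throws errors exactly like a sync one.
public async string lookup_async (string path) throws DemoError {
    Idle.add (lookup_async.callback);  // finish on a later main-loop iteration
    yield;
    if (path != "/") {
        throw new DemoError.NOT_FOUND ("Entity not found: %s".printf (path));
    }
    return "root";
}

// From another async method, the error surfaces at the yield.
public async string lookup_or_default_async (string path) {
    try {
        return yield lookup_async (path);
    } catch (DemoError e) {
        return "default";
    }
}

// From synchronous code, .end() rethrows it inside the completion callback.
public string describe_lookup (string path) {
    var loop = new MainLoop ();
    string outcome = "";
    lookup_async.begin (path, (obj, res) => {
        try {
            outcome = "found " + lookup_async.end (res);
        } catch (DemoError e) {
            outcome = "error: " + e.message;
        }
        loop.quit ();
    });
    loop.run ();
    return outcome;
}
```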

Decision 6: Version Compatibility - Clean Break

Decision: No compatibility shims needed - project is in development.

Rationale:

  • No released versions to support
  • No external users requiring migration path
  • Cleaner implementation without compatibility layers

Summary

This design proposes:

  1. AsyncDbmQueue: A dedicated thread with priority-based operation queue
  2. Async Interfaces: Engine and Entity interfaces with async methods
  3. Concurrent Read Support: Property on Dbm for LMDB optimization
  4. Clean Break Migration: Immediate removal of sync methods
  5. Unified API: No more wrapper classes, async is the default
  6. Manual Transactions: No transaction helper, explicit begin/commit/rollback
  7. Sync Hooks: Hooks remain synchronous with internal sync methods

The key benefits are:

  • A single thread for DBM operations, instead of one new thread per async call
  • Priority reads for better responsiveness
  • Clean async API without wrapper duplication
  • Support for LMDB's concurrent read capability
  • Simpler codebase without deprecation complexity