# Async I/O Refactor Design

## Document Status
- **Created**: 2026-03-14
- **Last Updated**: 2026-03-14
- **Status**: Finalized
- **Author**: Architecture Review

## Executive Summary

This document proposes a major refactoring of the Implexus codebase to make all I/O operations inherently async. The key changes are:

1. **Remove AsyncEngine and AsyncEntity wrappers** - Make the base interfaces async
2. **Create a DBM Queue System** - Serialize DBM operations with priority reads
3. **Add `supports_concurrent_reads` to Dbm interface** - Enable optimizations for LMDB
4. **Make Engine and Entity interfaces async** - All I/O operations become async

---

## Current Architecture Summary

### Current Async Pattern (To Be Removed)

The current implementation uses wrapper classes:

```
┌─────────────────────────────────────────────────────────────────┐
│                     Application Layer                           │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                  AsyncEngine / AsyncEntity                      │
│                                                                 │
│  - Wraps Engine/Entity instances                                │
│  - Each async method spawns a new Thread                        │
│  - Uses Idle.add to return results to main loop                 │
│  - Duplicates entire interface                                  │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Engine / Entity Interfaces                    │
│                                                                 │
│  - Synchronous methods                                          │
│  - Blocking I/O                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Problems with Current Approach:**
- Thread explosion: Each async call spawns a new thread
- Interface duplication: AsyncEngine mirrors Engine, AsyncEntity mirrors Entity
- Maintenance burden: Changes must be made in multiple places
- Inefficient: No coordination between operations

### Current DBM Implementations

| Implementation | Concurrent Reads | Notes |
|----------------|------------------|-------|
| GdbmDbm | No | GDBM does not support concurrent access |
| LmdbDbm | Yes | LMDB supports concurrent readers via MVCC |
| FilesystemDbm | Limited | File-per-key, but no explicit coordination |

### Current Storage Layer

```
┌─────────────────────────────────────────────────────────────────┐
│                        HighLevel Stores                         │
│  EntityStore, DocumentStore, ContainerStore,                    │
│  CategoryStore, CatalogueStore, IndexStore                      │
│  (Synchronous, compose LowLevel stores)                         │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                        LowLevel Storage                         │
│  EntityMetadataStorage, PropertiesStorage, ChildrenStorage,     │
│  TypeIndexStorage, CategoryIndexStorage, TextIndexStorage, etc. │
│  (Synchronous, direct Dbm access)                               │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Dbm Interface                          │
│  has_key(), get(), set(), delete(), keys,                       │
│  begin_transaction(), commit_transaction(),                     │
│  rollback_transaction()                                         │
│  (All synchronous)                                              │
└─────────────────────────────────────────────────────────────────┘
```

---

## Proposed Architecture

### Design Principles

1. **Async by Default**: All I/O operations are async at the interface level
2. **Queue-Based for Embedded**: DBM operations go through a priority queue
3. **Native Async for Remote**: Socket operations use GLib async I/O
4. **Concurrent Read Optimization**: LMDB can spawn read threads
5. **Write Serialization**: All writes go through single queue

### Architecture Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                     Application Layer                           │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Engine / Entity Interfaces                    │
│                                                                 │
│  - All I/O methods are async                                    │
│  - No wrapper classes needed                                    │
│  - Direct async API                                             │
└─────────────────────────────────────────────────────────────────┘
                                 │
                 ┌───────────────┴───────────────┐
                 ▼                               ▼
┌───────────────────────────┐     ┌───────────────────────────────┐
│      EmbeddedEngine       │     │         RemoteEngine          │
│                           │     │                               │
│  Uses AsyncDbmQueue       │     │  Uses Async Socket I/O        │
│  for DBM operations       │     │  (GLib SocketConnection)      │
└───────────────────────────┘     └───────────────────────────────┘
                 │                               │
                 ▼                               ▼
┌───────────────────────────┐     ┌───────────────────────────────┐
│      AsyncDbmQueue        │     │  Protocol.MessageReader       │
│                           │     │  Protocol.MessageWriter       │
│  - Priority read queue    │     │                               │
│  - Serialized writes      │     │  (Already async-capable)      │
│  - Concurrent reads for   │     │                               │
│    LMDB                   │     │                               │
└───────────────────────────┘     └───────────────────────────────┘
                 │
                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Dbm Interface                          │
│                                                                 │
│  + supports_concurrent_reads: bool { get; }                     │
│  (Methods remain synchronous - queue handles async)             │
└─────────────────────────────────────────────────────────────────┘
```

---

## DBM Queue System Design

### AsyncDbmQueue Class

A dedicated queue system for DBM operations that:

1. Processes operations one at a time (serialized)
2. Prioritizes reads over writes
3. Uses `Idle.add` to return results to main loop
4. Supports concurrent reads for LMDB-like backends

```vala
namespace Implexus.Storage {

/**
 * Priority level for DBM operations.
 */
public enum DbmOperationPriority {
    READ = 0,        // Higher priority - processed first
    WRITE = 1,       // Lower priority - processed after reads
    TRANSACTION = 2  // Same as write, but grouped
}

/**
 * Delegate types for async callbacks.
 */
public delegate void DbmReadCallback<T>(T? result, StorageError? error);
public delegate void DbmWriteCallback(StorageError? error);

/**
 * Queue system for async DBM operations.
 *
 * This class manages a dedicated thread for DBM operations,
 * ensuring serialized access while prioritizing reads.
 */
public class AsyncDbmQueue : Object {
    private Dbm _dbm;
    private Thread<void> _worker_thread;
    private AsyncQueue<DbmOperation> _read_queue;
    private AsyncQueue<DbmOperation> _write_queue;
    private bool _running;
    private Mutex _mutex;
    private Cond _cond;

    /**
     * Creates a new AsyncDbmQueue wrapping the given Dbm.
     */
    public AsyncDbmQueue(Dbm dbm) {
        _dbm = dbm;
        _read_queue = new AsyncQueue<DbmOperation>();
        _write_queue = new AsyncQueue<DbmOperation>();
        _running = true;
        _mutex = Mutex();
        _cond = Cond();
        _worker_thread = new Thread<void>("dbm-worker", run_worker);
    }

    /**
     * Whether the underlying Dbm supports concurrent reads.
     */
    public bool supports_concurrent_reads {
        get { return _dbm.supports_concurrent_reads; }
    }

    // === Read Operations ===

    /**
     * Async has_key operation.
     */
    public async bool has_key_async(string key) throws StorageError {
        bool result = false;
        StorageError? error = null;

        // The callback runs on the worker thread; Idle.add resumes
        // this coroutine on the main loop.
        var op = new DbmOperation.read_has_key(key, (r, e) => {
            result = r;
            error = e;
            Idle.add(() => {
                has_key_async.callback();
                return false;
            });
        });

        _read_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
        return result;
    }

    /**
     * Async get operation.
     */
    public async Invercargill.BinaryData? get_async(string key) throws StorageError {
        Invercargill.BinaryData? result = null;
        StorageError? error = null;

        var op = new DbmOperation.read_get(key, (r, e) => {
            result = r;
            error = e;
            Idle.add(() => {
                get_async.callback();
                return false;
            });
        });

        _read_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
        return result;
    }

    // === Write Operations ===

    /**
     * Async set operation.
     */
    public async void set_async(string key, Invercargill.BinaryData value) throws StorageError {
        StorageError? error = null;

        var op = new DbmOperation.write_set(key, value, (e) => {
            error = e;
            Idle.add(() => {
                set_async.callback();
                return false;
            });
        });

        _write_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
    }

    /**
     * Async delete operation.
     */
    public async void delete_async(string key) throws StorageError {
        StorageError? error = null;

        var op = new DbmOperation.write_delete(key, (e) => {
            error = e;
            Idle.add(() => {
                delete_async.callback();
                return false;
            });
        });

        _write_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
    }

    // === Transaction Operations ===

    /**
     * Async begin_transaction operation.
     */
    public async void begin_transaction_async() throws StorageError {
        StorageError? error = null;

        var op = new DbmOperation.transaction_begin((e) => {
            error = e;
            Idle.add(() => {
                begin_transaction_async.callback();
                return false;
            });
        });

        _write_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
    }

    /**
     * Async commit_transaction operation.
     */
    public async void commit_transaction_async() throws StorageError {
        StorageError? error = null;

        var op = new DbmOperation.transaction_commit((e) => {
            error = e;
            Idle.add(() => {
                commit_transaction_async.callback();
                return false;
            });
        });

        _write_queue.push(op);
        notify_worker();
        yield;

        if (error != null) {
            throw error;
        }
    }

    /**
     * Async rollback_transaction operation.
     */
    public async void rollback_transaction_async() {
        var op = new DbmOperation.transaction_rollback(() => {
            Idle.add(() => {
                rollback_transaction_async.callback();
                return false;
            });
        });

        _write_queue.push(op);
        notify_worker();
        yield;
    }

    // === Worker Thread ===

    private void run_worker() {
        while (_running) {
            DbmOperation? op = null;

            // Prioritize reads over writes
            op = _read_queue.try_pop();
            if (op == null) {
                op = _write_queue.try_pop();
            }

            if (op != null) {
                process_operation((!) op);
            } else {
                // Wait for new operations. Cond.wait() takes no timeout,
                // so wait_until() with an absolute deadline gives the
                // intended 100ms upper bound.
                _mutex.lock();
                _cond.wait_until(_mutex, get_monotonic_time() + 100 * TimeSpan.MILLISECOND);
                _mutex.unlock();
            }
        }
    }

    private void process_operation(DbmOperation op) {
        switch (op.type) {
            case DbmOperationType.HAS_KEY:
                bool result = _dbm.has_key(op.key);
                op.read_bool_callback(result, null);
                break;

            case DbmOperationType.GET:
                var result = _dbm.get(op.key);
                op.read_data_callback(result, null);
                break;

            case DbmOperationType.SET:
                try {
                    _dbm.set(op.key, op.value);
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;

            case DbmOperationType.DELETE:
                try {
                    _dbm.delete(op.key);
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;

            case DbmOperationType.BEGIN_TRANSACTION:
                try {
                    _dbm.begin_transaction();
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;

            case DbmOperationType.COMMIT_TRANSACTION:
                try {
                    _dbm.commit_transaction();
                    op.write_callback(null);
                } catch (StorageError e) {
                    op.write_callback(e);
                }
                break;

            case DbmOperationType.ROLLBACK_TRANSACTION:
                _dbm.rollback_transaction();
                op.void_callback();
                break;
        }
    }

    private void notify_worker() {
        _mutex.lock();
        _cond.signal();
        _mutex.unlock();
    }

    /**
     * Shuts down the worker thread.
     */
    public void shutdown() {
        _running = false;
        notify_worker();
        _worker_thread.join();
    }
}

/**
 * Types of DBM operations.
 */
private enum DbmOperationType {
    HAS_KEY,
    GET,
    SET,
    DELETE,
    BEGIN_TRANSACTION,
    COMMIT_TRANSACTION,
    ROLLBACK_TRANSACTION
}

/**
 * Represents a single DBM operation in the queue.
 */
private class DbmOperation : Object {
    public DbmOperationType type;
    public string key;
    public Invercargill.BinaryData value;
    public DbmReadCallback<bool>? read_bool_callback;
    public DbmReadCallback<Invercargill.BinaryData>? read_data_callback;
    public DbmWriteCallback? write_callback;
    public delegate void VoidCallback();
    public VoidCallback? void_callback;

    // Factory methods for creating operations.
    // Callbacks are taken as `owned` so the closure can be stored in a field.

    public DbmOperation.read_has_key(string key, owned DbmReadCallback<bool> callback) {
        this.type = DbmOperationType.HAS_KEY;
        this.key = key;
        this.read_bool_callback = (owned) callback;
    }

    public DbmOperation.read_get(string key, owned DbmReadCallback<Invercargill.BinaryData> callback) {
        this.type = DbmOperationType.GET;
        this.key = key;
        this.read_data_callback = (owned) callback;
    }

    public DbmOperation.write_set(string key, Invercargill.BinaryData value, owned DbmWriteCallback callback) {
        this.type = DbmOperationType.SET;
        this.key = key;
        this.value = value;
        this.write_callback = (owned) callback;
    }

    public DbmOperation.write_delete(string key, owned DbmWriteCallback callback) {
        this.type = DbmOperationType.DELETE;
        this.key = key;
        this.write_callback = (owned) callback;
    }

    public DbmOperation.transaction_begin(owned DbmWriteCallback callback) {
        this.type = DbmOperationType.BEGIN_TRANSACTION;
        this.write_callback = (owned) callback;
    }

    public DbmOperation.transaction_commit(owned DbmWriteCallback callback) {
        this.type = DbmOperationType.COMMIT_TRANSACTION;
        this.write_callback = (owned) callback;
    }

    public DbmOperation.transaction_rollback(owned VoidCallback callback) {
        this.type = DbmOperationType.ROLLBACK_TRANSACTION;
        this.void_callback = (owned) callback;
    }
}

} // namespace Implexus.Storage
```

### Concurrent Read Optimization for LMDB

For DBM implementations that support concurrent reads (like LMDB), the queue can spawn dedicated read threads:

```vala
// In AsyncDbmQueue, when supports_concurrent_reads is true:
private void process_read_with_thread(DbmOperation op) {
    if (!_dbm.supports_concurrent_reads) {
        process_operation(op);
        return;
    }

    // For LMDB, spawn a new thread for the read
    // (only GET is shown here; HAS_KEY would need its own branch)
    new Thread<void>("dbm-reader", () => {
        // LMDB allows concurrent readers
        var result = _dbm.get(op.key);

        // Return result via Idle.add
        Idle.add(() => {
            op.read_data_callback(result, null);
            return false;
        });
    });
}
```

---

## Dbm Interface Changes

### Add supports_concurrent_reads Property

```vala
namespace Implexus.Storage {

public interface Dbm : Object {
    // === Existing Methods ===
    public abstract bool has_key(string key);
    public abstract Invercargill.BinaryData? @get(string key);
    public abstract void @set(string key, Invercargill.BinaryData value) throws StorageError;
    public abstract void delete(string key) throws StorageError;
    public abstract Invercargill.Enumerable<string> keys { owned get; }
    public abstract void begin_transaction() throws StorageError;
    public abstract void commit_transaction() throws StorageError;
    public abstract void rollback_transaction();
    public abstract bool in_transaction { get; }

    // === NEW: Concurrent Read Support ===

    /**
     * Indicates whether this Dbm implementation supports concurrent reads.
     *
     * If true, multiple threads can read simultaneously without blocking.
     * Writes are still serialized through the queue.
     *
     * - LMDB: true (MVCC allows concurrent readers)
     * - GDBM: false (single-threaded access)
     * - FilesystemDbm: false (no explicit coordination)
     */
    public abstract bool supports_concurrent_reads { get; }
}

} // namespace Implexus.Storage
```

### Implementation Updates

```vala
// GdbmDbm.vala
public class GdbmDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return false; } }
    // ... rest of implementation
}

// LmdbDbm.vala
public class LmdbDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return true; } }
    // ... rest of implementation
}

// FilesystemDbm.vala
public class FilesystemDbm : Object, Dbm {
    public bool supports_concurrent_reads { get { return false; } }
    // ... rest of implementation
}
```

---

## Engine Interface Changes

### Current Interface (Synchronous)

```vala
public interface Engine : Object {
    public abstract Entity get_root();
    public abstract Entity? get_entity(EntityPath path) throws EngineError;
    public abstract Entity? get_entity_or_null(EntityPath path);
    public abstract bool entity_exists(EntityPath path);
    public abstract Invercargill.Enumerable<Entity> query_by_type(string type_label);
    public abstract Invercargill.Enumerable<Entity> query_by_expression(string type_label, string expression);
    public abstract Transaction begin_transaction() throws EngineError;
    public abstract bool in_transaction { get; }
    public abstract StorageConfiguration configuration { owned get; }

    public signal void entity_created(Entity entity);
    public signal void entity_deleted(EntityPath path);
    public signal void entity_modified(Entity entity);
}
```

### Proposed Interface (Async)

```vala
public interface Engine : Object {

    // === Root Access ===

    /**
     * Gets the root entity of the database.
     *
     * This operation is async as it may require I/O to verify/create root.
     */
    public abstract async Entity get_root_async() throws EngineError;

    // === Path-Based Access ===

    /**
     * Gets an entity by path, throwing an error if not found.
     */
    public abstract async Entity? get_entity_async(EntityPath path) throws EngineError;

    /**
     * Gets an entity by path, returning null if not found.
     */
    public abstract async Entity? get_entity_or_null_async(EntityPath path) throws EngineError;

    /**
     * Checks if an entity exists at the specified path.
     */
    public abstract async bool entity_exists_async(EntityPath path) throws EngineError;

    // === Query Operations ===

    /**
     * Queries all entities of a specific type.
     *
     * Returns an array with eager loading. This is simpler than async
     * iteration and sufficient for expected data volumes.
     *
     * DECISION: Use Entity[] (eager loading) - simpler and sufficient
     * for expected data volumes. See "Decisions Made" section.
     */
    public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;

    /**
     * Queries entities by type and expression.
     */
    public abstract async Entity[] query_by_expression_async(string type_label, string expression) throws EngineError;

    // === Transactions ===

    /**
     * Begins a new transaction.
     *
     * NOTE: The with_write_transaction() helper has been REMOVED.
     * Vala doesn't support async delegates, so transactions must be
     * managed manually with begin_transaction_async(), perform operations,
     * then commit_async() or rollback_async().
     *
     * See "Decisions Made" section for details.
     */
    public abstract async Transaction begin_transaction_async() throws EngineError;

    /**
     * Indicates whether a transaction is currently active.
     * This is synchronous as it's a quick property check.
     */
    public abstract bool in_transaction { get; }

    // === Configuration ===

    /**
     * Gets the storage configuration.
     * This is synchronous as it's a quick property access.
     */
    public abstract StorageConfiguration configuration { owned get; }

    // === Events ===

    public signal void entity_created(Entity entity);
    public signal void entity_deleted(EntityPath path);
    public signal void entity_modified(Entity entity);
}
```

---

## Entity Interface Changes

### Current Interface (Synchronous)

The current Entity interface has synchronous methods for all operations.
### Proposed Interface (Async)

```vala
public interface Entity : Object {

    // === Identity (Synchronous - No I/O) ===

    public abstract unowned Engine engine { get; }
    public abstract EntityPath path { owned get; }
    public abstract string name { owned get; }
    public abstract EntityType entity_type { get; }
    public abstract string type_label { owned get; }
    public abstract string configured_expression { owned get; }
    public abstract string configured_type_label { owned get; }

    // === Parent/Child Navigation (Async - May require I/O) ===

    /**
     * Gets the parent entity, or null for the root.
     */
    public abstract async Entity? get_parent_async() throws EngineError;

    /**
     * Gets the names of all child entities.
     */
    public abstract async Invercargill.ReadOnlySet<string> get_child_names_async() throws EngineError;

    /**
     * Gets a child entity by name.
     */
    public abstract async Entity? get_child_async(string name) throws EngineError;

    /**
     * Gets all child entities.
     */
    public abstract async Entity[] get_children_async() throws EngineError;

    // === Child Management (Async - Requires I/O) ===

    public abstract async Entity? create_container_async(string name) throws EngineError;
    public abstract async Entity? create_document_async(string name, string type_label) throws EngineError;
    public abstract async Entity? create_category_async(string name, string type_label, string expression) throws EngineError;
    public abstract async Entity? create_index_async(string name, string type_label, string expression) throws EngineError;
    public abstract async Entity? create_catalogue_async(string name, string type_label, string expression) throws EngineError;

    // === Document Operations (Async - Requires I/O) ===

    /**
     * Gets the properties stored in this document.
     */
    public abstract async Invercargill.Properties get_properties_async() throws EngineError;

    /**
     * Gets a property value by name.
     */
    public abstract async Invercargill.Element? get_entity_property_async(string name) throws EngineError;

    /**
     * Sets a property value.
     */
    public abstract async void set_entity_property_async(string name, Invercargill.Element value) throws EngineError;

    /**
     * Removes a property.
     */
    public abstract async void remove_property_async(string name) throws EngineError;

    // === Lifecycle (Async - Requires I/O) ===

    /**
     * Deletes this entity from the database.
     */
    public abstract async void delete_async() throws EngineError;

    /**
     * Checks if this entity still exists in the database.
     * This is synchronous as it may be cached or quick to check.
     */
    public abstract bool exists { get; }

    // === Set Operations ===

    /**
     * Creates an EntitySet containing just this entity.
     */
    public abstract async EntitySet as_set_async();
}
```

---

## Transaction Interface Changes

### Proposed Interface (Async)

```vala
public interface Transaction : Object {
    /**
     * Indicates whether this transaction is still active.
     * Synchronous - quick property check.
     */
    public abstract bool active { get; }

    /**
     * Commits all changes made during this transaction.
     */
    public abstract async void commit_async() throws EngineError;

    /**
     * Rolls back all changes made during this transaction.
     */
    public abstract async void rollback_async();
}
```

---

## HighLevel/LowLevel Storage Changes

### Approach: Async Wrappers over Sync Implementation

The LowLevel storage classes remain synchronous (they operate on the Dbm directly). The HighLevel stores gain async versions that use the AsyncDbmQueue.

```vala
namespace Implexus.Storage.HighLevel {

public class EntityStore : Object {
    private LowLevel.EntityMetadataStorage _metadata;
    private LowLevel.TypeIndexStorage _type_index;
    private AsyncDbmQueue _queue;

    public EntityStore.with_queue(AsyncDbmQueue queue) {
        _queue = queue;
        // LowLevel stores still use sync Dbm for internal operations
        // but we route through the queue
    }

    // === Async Metadata Operations ===

    public async void store_metadata_async(
        Core.EntityPath path,
        Core.EntityType type,
        string? type_label = null
    ) throws StorageError {
        // Serialize and queue the write
        string key = "entity:" + path.to_string();
        var writer = new ElementWriter();
        writer.write_element(new Invercargill.NativeElement((int64) type));
        writer.write_element(new Invercargill.NativeElement(type_label ?? ""));

        yield _queue.set_async(key, writer.to_binary_data());
    }

    public async Core.EntityType? get_entity_type_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        var data = yield _queue.get_async(key);

        if (data == null) {
            return null;
        }

        var reader = new ElementReader((!) data);
        try {
            var element = reader.read_element();
            if (element.is_null()) {
                return null;
            }
            int64? type_val = element.as<int64?>();
            return (Core.EntityType) (type_val == null ? 0 : (!) type_val);
        } catch (Invercargill.ElementError e) {
            throw new StorageError.CORRUPT_DATA("Failed to read entity type: %s".printf(e.message));
        }
    }

    // has_key_async() can throw, so the error is declared here as well
    public async bool exists_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        return yield _queue.has_key_async(key);
    }

    public async void delete_async(Core.EntityPath path) throws StorageError {
        string key = "entity:" + path.to_string();
        yield _queue.delete_async(key);
    }

    // ... similar async versions for other methods
}

} // namespace Implexus.Storage.HighLevel
```

---

## EmbeddedEngine Changes

### Current Implementation

The EmbeddedEngine currently uses synchronous Dbm operations directly.
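To make the contrast with the proposed implementation concrete, the mechanics of turning a blocking call into an awaitable one can be sketched in isolation. This is a minimal sketch, not Implexus code: `slow_lookup` and `lookup_async` are hypothetical stand-ins for a blocking Dbm read and its async counterpart, and it spawns one thread per call only to stay short (the queue design replaces that with a single worker). It assumes only GLib threads, `Idle.add`, and Vala's async support (compile with `valac --pkg gio-2.0`).

```vala
// Hypothetical stand-in for a blocking Dbm read; not an Implexus API.
string slow_lookup(string key) {
    Thread.usleep(50000); // simulate 50 ms of blocking I/O
    return "value-for-" + key;
}

// Runs slow_lookup() off the main thread and resumes on the main loop.
async string lookup_async(string key) {
    SourceFunc resume = lookup_async.callback;
    string result = "";

    new Thread<bool>("lookup", () => {
        result = slow_lookup(key);
        // Schedule the coroutine's continuation on the main loop
        Idle.add((owned) resume);
        return true;
    });

    // Suspend here; the main loop stays free until Idle.add fires
    yield;
    return result;
}

void main() {
    var loop = new MainLoop();
    lookup_async.begin("entity:/", (obj, res) => {
        print("%s\n", lookup_async.end(res));
        loop.quit();
    });
    loop.run();
}
```

Calling `slow_lookup` directly from the main loop would stall it for the full duration of the call, which is the behaviour a direct synchronous Dbm call has today.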
### Proposed Implementation

```vala
public class EmbeddedEngine : Object, Core.Engine {

    private Storage.AsyncDbmQueue _dbm_queue;
    private Storage.HighLevel.EntityStore _entity_store;
    private Storage.HighLevel.DocumentStore _document_store;
    private Storage.HighLevel.ContainerStore _container_store;
    private Storage.HighLevel.CategoryStore _category_store;
    private Storage.HighLevel.CatalogueStore _catalogue_store;
    private Storage.HighLevel.IndexStore _index_store;
    private Core.Entity? _root; // cached root entity, set by get_root_async()

    public EmbeddedEngine(Core.StorageConfiguration config) {
        // Get Dbm from storage
        var basic_storage = (config.storage as Storage.BasicStorage);
        if (basic_storage != null) {
            var dbm = basic_storage.dbm;

            // Create async queue
            _dbm_queue = new Storage.AsyncDbmQueue(dbm);

            // Initialize stores with queue
            _entity_store = new Storage.HighLevel.EntityStore.with_queue(_dbm_queue);
            _document_store = new Storage.HighLevel.DocumentStore.with_queue(_dbm_queue);
            _container_store = new Storage.HighLevel.ContainerStore.with_queue(_dbm_queue);
            _category_store = new Storage.HighLevel.CategoryStore.with_queue(_dbm_queue);
            _catalogue_store = new Storage.HighLevel.CatalogueStore.with_queue(_dbm_queue);
            _index_store = new Storage.HighLevel.IndexStore.with_queue(_dbm_queue);
        }
    }

    // === Internal Sync Methods for Hook Use ===
    //
    // DECISION: Hooks remain synchronous, running in the DBM thread.
    // The storage layer (HighLevel/LowLevel) also remains synchronous.
    // EmbeddedEngine keeps internal sync versions of methods for hook use.
    //
    // These internal methods are NOT part of the public API and are only
    // used by the HookManager to perform operations during hook execution.

    internal Core.Entity? get_entity_or_null_sync(Core.EntityPath path) {
        // Synchronous implementation for hook use
        // Uses the underlying Dbm directly, bypassing the async queue
        return _entity_store.get_entity_or_null_direct(path);
    }

    // === Engine Interface Implementation ===

    public async Core.Entity get_root_async() throws Core.EngineError {
        if (_root != null) {
            return (!) _root;
        }

        var root_path = new Core.EntityPath.root();

        try {
            // Check if root exists, creating it on first use
            bool exists = yield _entity_store.exists_async(root_path);
            if (!exists) {
                yield _entity_store.store_metadata_async(root_path, Core.EntityType.CONTAINER, null);
            }
        } catch (Storage.StorageError e) {
            throw new Core.EngineError.STORAGE_ERROR("Failed to create root: %s".printf(e.message));
        }

        _root = new Entities.Container(this, root_path);
        return (!) _root;
    }

    public async Core.Entity? get_entity_async(Core.EntityPath path) throws Core.EngineError {
        bool exists = yield entity_exists_async(path);
        if (!exists) {
            throw new Core.EngineError.ENTITY_NOT_FOUND(
                "Entity not found: %s".printf(path.to_string())
            );
        }

        return yield create_entity_from_storage_async(path);
    }

    public async Core.Entity? get_entity_or_null_async(Core.EntityPath path) throws Core.EngineError {
        try {
            return yield get_entity_async(path);
        } catch (Core.EngineError e) {
            // Only "not found" maps to null; other failures propagate
            if (e is Core.EngineError.ENTITY_NOT_FOUND) {
                return null;
            }
            throw e;
        }
    }

    public async bool entity_exists_async(Core.EntityPath path) throws Core.EngineError {
        try {
            return yield _entity_store.exists_async(path);
        } catch (Storage.StorageError e) {
            throw new Core.EngineError.STORAGE_ERROR("Existence check failed: %s".printf(e.message));
        }
    }

    // ... rest of async implementations
}
```

---

## RemoteEngine Changes

The RemoteEngine already uses socket communication. We need to make the socket operations properly async.

### Current Implementation

```vala
internal Protocol.Message send_request_and_wait(Protocol.Message request) throws Protocol.ProtocolError {
    ensure_connected();
    var request_id = ((!) _writer).write_request(request);
    return ((!) _reader).read_response_for_request(request_id);
}
```

### Proposed Implementation

```vala
internal async Protocol.Message send_request_and_wait_async(Protocol.Message request) throws Protocol.ProtocolError {
    ensure_connected();

    // Use async socket operations
    var request_id = yield ((!) _writer).write_request_async(request);
    return yield ((!) _reader).read_response_for_request_async(request_id);
}

public async Core.Entity? get_entity_async(Core.EntityPath path) throws Core.EngineError {
    ensure_connected();

    try {
        var request = new Protocol.GetEntityRequest.for_path(path);
        var response = yield send_request_and_wait_async(request);

        if (response.message_type == Protocol.MessageType.ERROR) {
            var error = (Protocol.ErrorResponse) response;
            throw error_code_to_engine_error(error.error_code, error.error_message);
        }

        if (response.message_type == Protocol.MessageType.ENTITY_RESPONSE) {
            var entity_response = (Protocol.EntityResponse) response;
            return create_remote_entity_from_data(entity_response.entity_data);
        }

        throw new Core.EngineError.PROTOCOL_ERROR("Unexpected response type");
    } catch (Protocol.ProtocolError e) {
        throw new Core.EngineError.PROTOCOL_ERROR("Protocol error: %s".printf(e.message));
    }
}
```

---

## Migration Path

**DECISION: Clean Break Migration**

The project is in development, so we will use a clean break approach:
- Remove all sync methods immediately
- Update everything in this refactor
- No gradual migration or deprecation period
- No compatibility shims needed

### Phase 1: Infrastructure

1. Create `AsyncDbmQueue` class
2. Add `supports_concurrent_reads` property to Dbm interface
3. Update all Dbm implementations with the new property
4. Add tests for the queue system

### Phase 2: Interface Changes (Breaking)

1. Replace sync methods with async methods in Engine interface
2. Replace sync methods with async methods in Entity interface
3. Update Transaction interface to async
4. Remove `with_write_transaction()` helper entirely

### Phase 3: Implementation Updates

1. Implement async methods in EmbeddedEngine (including internal sync versions for hooks)
2. Implement async methods in RemoteEngine with async sockets
3. Update HighLevel stores to use async queue
4. Keep LowLevel storage synchronous (operates on Dbm directly)

### Phase 4: Entity Updates

1. Update Container, Document, Category, Catalogue, Index to use async stores
2. Ensure all entity operations use async methods internally

### Phase 5: Cleanup

1. Remove AsyncEngine and AsyncEntity wrapper classes
2. Update all tests to use async methods
3. Update any application code to use async methods

---

## Files to Modify

### New Files to Create

| File | Purpose |
|------|---------|
| `src/Storage/AsyncDbmQueue.vala` | Queue system for async DBM operations |
| `src/Storage/DbmOperation.vala` | Operation types for the queue |

### Files to Modify

| File | Changes |
|------|---------|
| `src/Storage/Dbm.vala` | Add `supports_concurrent_reads` property |
| `src/Storage/Gdbm/GdbmDbm.vala` | Implement `supports_concurrent_reads` (return false) |
| `src/Storage/Lmdb/LmdbDbm.vala` | Implement `supports_concurrent_reads` (return true) |
| `src/Storage/FilesystemDbm.vala` | Implement `supports_concurrent_reads` (return false) |
| `src/Core/Engine.vala` | Convert all I/O methods to async |
| `src/Core/Entity.vala` | Convert all I/O methods to async |
| `src/Core/Transaction.vala` | (if separate) Convert to async |
| `src/Storage/HighLevel/EntityStore.vala` | Add async methods |
| `src/Storage/HighLevel/DocumentStore.vala` | Add async methods |
| `src/Storage/HighLevel/ContainerStore.vala` | Add async methods |
| `src/Storage/HighLevel/CategoryStore.vala` | Add async methods |
| `src/Storage/HighLevel/CatalogueStore.vala` | Add async methods |
| `src/Storage/HighLevel/IndexStore.vala` | Add async methods |
| `src/Engine/EmbeddedEngine.vala` | Implement async interface methods |
| `src/Engine/RemoteEngine.vala` | Implement async interface methods with async sockets |
| `src/Entities/Container.vala` | Use async stores |
| `src/Entities/Document.vala` | Use async stores |
| `src/Entities/Category.vala` | Use async stores |
| `src/Entities/Catalogue.vala` | Use async stores |
| `src/Entities/Index.vala` | Use async stores |
| `src/Entities/AbstractEntity.vala` | Update base implementations |
| `src/Protocol/MessageReader.vala` | Add async read methods |
| `src/Protocol/MessageWriter.vala` | Add async write methods |
| `src/meson.build` | Add new files to build |

### Files to Delete (Phase 5)

| File | Reason |
|------|--------|
| `src/Core/AsyncEngine.vala` | Replaced by async interface methods |
| `src/Core/AsyncEntity.vala` | Replaced by async interface methods |

### Test Files to Update

| File | Changes |
|------|---------|
| `tests/Storage/GdbmDbmTest.vala` | Add tests for `supports_concurrent_reads` |
| `tests/Storage/LmdbDbmTest.vala` | Add tests for `supports_concurrent_reads` |
| `tests/Storage/FilesystemDbmTest.vala` | Add tests for `supports_concurrent_reads` |
| `tests/Storage/StorageTest.vala` | Add tests for AsyncDbmQueue |
| `tests/Engine/EmbeddedEngineTest.vala` | Update to use async methods |
| All other tests | Update to use async methods |

---

## Decisions Made

This section documents the finalized design decisions for the async I/O refactor.

### Decision 1: Query Results - Eager Loading with Arrays

**Decision:** Use `Entity[]` (eager loading) for query results.

**Rationale:**
- Simpler than implementing async iteration
- Sufficient for expected data volumes
- Avoids complexity of custom `AsyncEnumerable`
- Consistent with Vala's async patterns

**Example:**
```vala
public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
```

### Decision 2: Migration Strategy - Clean Break

**Decision:** Remove all sync methods immediately, update everything in this refactor.

**Rationale:**
- Project is in development phase
- No external users to migrate
- Cleaner codebase without deprecation paths
- Faster implementation without compatibility layers

**Implications:**
- No gradual migration period
- No deprecated method markers needed
- All code updated in single refactor

### Decision 3: Transaction Helper - Removed

**Decision:** Remove `with_write_transaction()` entirely.

**Rationale:**
- Vala doesn't support async delegates
- Cannot pass async callbacks to transaction helper
- Manual transaction management is clearer

**New Pattern:**
```vala
// Old (no longer possible; async lambdas are not valid Vala):
yield engine.with_write_transaction_async(async (tx) => {
    yield entity.set_entity_property_async("key", value);
});

// New (manual management). Commit and rollback live on the
// Transaction returned by begin_transaction_async(), not on Engine.
var tx = yield engine.begin_transaction_async();
try {
    yield entity.set_entity_property_async("key", value);
    yield tx.commit_async();
} catch (EngineError e) {
    yield tx.rollback_async();
    throw e;
}
```

### Decision 4: Hooks - Remain Synchronous

**Decision:** Keep hooks synchronous, running in the DBM thread.

**Rationale:**
- Hooks should be fast operations
- Avoids complexity of async hook management
- Storage layer (HighLevel/LowLevel) remains synchronous
- Simpler mental model for hook authors

**Implementation Note:**
EmbeddedEngine keeps internal sync versions of methods like `get_entity_or_null_sync()` for hook use. These are:
- Marked `internal` (not public API)
- Used only by HookManager during hook execution
- Bypass the async queue for direct Dbm access

### Decision 5: Error Handling - Use `throws` Keyword

**Decision:** Use `throws` keyword for async methods, consistent with existing codebase.

**Rationale:**
- Vala supports `throws` in async methods
- Consistent with existing sync methods
- Familiar pattern for Vala developers
- No need for Result types or error callbacks

**Example:**
```vala
public abstract async Entity? get_entity_async(EntityPath path) throws EngineError;
public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
public abstract async void set_entity_property_async(string name, Invercargill.Element value) throws EngineError;
```

### Decision 6: Version Compatibility - Clean Break

**Decision:** No compatibility shims needed - project is in development.

**Rationale:**
- No released versions to support
- No external users requiring migration path
- Cleaner implementation without compatibility layers

---

## Summary

This design proposes:

1. **AsyncDbmQueue**: A dedicated thread with priority-based operation queue
2. **Async Interfaces**: Engine and Entity interfaces with async methods
3. **Concurrent Read Support**: Property on Dbm for LMDB optimization
4. **Clean Break Migration**: Immediate removal of sync methods
5. **Unified API**: No more wrapper classes, async is the default
6. **Manual Transactions**: No transaction helper, explicit begin/commit/rollback
7. **Sync Hooks**: Hooks remain synchronous with internal sync methods

The key benefits are:
- Single thread for DBM operations (no thread explosion)
- Priority reads for better responsiveness
- Clean async API without wrapper duplication
- Support for LMDB's concurrent read capability
- Simpler codebase without deprecation complexity
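
The read-first policy behind the "priority reads" benefit can be illustrated on its own. The following is a minimal sketch under stated assumptions, not the queue implementation: `QueuedOp` and `next_op` are hypothetical names introduced for illustration, and only GLib's `AsyncQueue` is assumed. It mirrors the selection step of the worker loop (try the read queue, fall back to the write queue) without any threading.

```vala
// Hypothetical operation record; stands in for DbmOperation.
class QueuedOp : Object {
    public string label { get; construct; }

    public QueuedOp(string label) {
        Object(label: label);
    }
}

// Picks the next operation: a pending read always wins over a write.
QueuedOp? next_op(AsyncQueue<QueuedOp> reads, AsyncQueue<QueuedOp> writes) {
    QueuedOp? op = reads.try_pop();
    if (op == null) {
        op = writes.try_pop();
    }
    return op;
}

void main() {
    var reads = new AsyncQueue<QueuedOp>();
    var writes = new AsyncQueue<QueuedOp>();

    // Enqueue in interleaved order: write, read, write, read
    writes.push(new QueuedOp("set a"));
    reads.push(new QueuedOp("get a"));
    writes.push(new QueuedOp("set b"));
    reads.push(new QueuedOp("has_key b"));

    // Drain: both reads are printed before either write
    QueuedOp? op;
    while ((op = next_op(reads, writes)) != null) {
        print("%s\n", op.label);
    }
}
```

Because each queue is FIFO and reads are always tried first, a read enqueued after a write can still run before it. A caller that needs read-after-write ordering therefore has to await the write (for example `yield set_async(...)`) before issuing the read.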