This document proposes a major refactoring of the Implexus codebase to make all I/O operations inherently async. The key changes are:
- Add supports_concurrent_reads to the Dbm interface - Enable optimizations for LMDB

The current implementation uses wrapper classes:
┌─────────────────────────────────────────────────────────────────┐
│                        Application Layer                        │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                    AsyncEngine / AsyncEntity                    │
│                                                                 │
│  - Wraps Engine/Entity instances                                │
│  - Each async method spawns a new Thread                        │
│  - Uses Idle.add to return results to main loop                 │
│  - Duplicates entire interface                                  │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Engine / Entity Interfaces                    │
│                                                                 │
│  - Synchronous methods                                          │
│  - Blocking I/O                                                 │
└─────────────────────────────────────────────────────────────────┘
Problems with Current Approach:
| Implementation | Concurrent Reads | Notes |
|---|---|---|
| GdbmDbm | No | GDBM does not support concurrent access |
| LmdbDbm | Yes | LMDB supports concurrent readers via MVCC |
| FilesystemDbm | Limited | File-per-key, but no explicit coordination |
┌─────────────────────────────────────────────────────────────────┐
│                        HighLevel Stores                         │
│  EntityStore, DocumentStore, ContainerStore,                    │
│  CategoryStore, CatalogueStore, IndexStore                      │
│  (Synchronous, compose LowLevel stores)                         │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                        LowLevel Storage                         │
│  EntityMetadataStorage, PropertiesStorage, ChildrenStorage,     │
│  TypeIndexStorage, CategoryIndexStorage, TextIndexStorage, etc. │
│  (Synchronous, direct Dbm access)                               │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Dbm Interface                          │
│  has_key(), get(), set(), delete(), keys,                       │
│begin_transaction(), commit_transaction(), rollback_transaction()│
│  (All synchronous)                                              │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                        Application Layer                        │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Engine / Entity Interfaces                    │
│                                                                 │
│  - All I/O methods are async                                    │
│  - No wrapper classes needed                                    │
│  - Direct async API                                             │
└─────────────────────────────────────────────────────────────────┘
                                 │
                 ┌───────────────┴───────────────┐
                 ▼                               ▼
┌───────────────────────────┐     ┌───────────────────────────────┐
│      EmbeddedEngine       │     │         RemoteEngine          │
│                           │     │                               │
│  Uses AsyncDbmQueue       │     │  Uses Async Socket I/O        │
│  for DBM operations       │     │  (GLib SocketConnection)      │
└───────────────────────────┘     └───────────────────────────────┘
              │                                   │
              ▼                                   ▼
┌───────────────────────────┐     ┌───────────────────────────────┐
│       AsyncDbmQueue       │     │  Protocol.MessageReader       │
│                           │     │  Protocol.MessageWriter       │
│  - Priority read queue    │     │                               │
│  - Serialized writes      │     │  (Already async-capable)      │
│  - Concurrent reads for   │     │                               │
│    LMDB                   │     │                               │
└───────────────────────────┘     └───────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Dbm Interface                          │
│                                                                 │
│  + supports_concurrent_reads: bool { get; }                     │
│  (Methods remain synchronous - queue handles async)             │
└─────────────────────────────────────────────────────────────────┘
A dedicated queue system for DBM operations that:
- Uses Idle.add to return results to the main loop
- Supports concurrent reads for LMDB-like backends

namespace Implexus.Storage {
/**
* Priority level for DBM operations.
*/
public enum DbmOperationPriority {
READ = 0, // Higher priority - processed first
WRITE = 1, // Lower priority - processed after reads
TRANSACTION = 2 // Same as write, but grouped
}
/**
* Delegate types for async callbacks.
*/
public delegate void DbmReadCallback<T>(T? result, StorageError? error);
public delegate void DbmWriteCallback(StorageError? error);
/**
* Queue system for async DBM operations.
*
* This class manages a dedicated thread for DBM operations,
* ensuring serialized access while prioritizing reads.
*/
public class AsyncDbmQueue : Object {
private Dbm _dbm;
private Thread<void> _worker_thread;
private AsyncQueue<DbmOperation> _read_queue;
private AsyncQueue<DbmOperation> _write_queue;
private bool _running;
private Mutex _mutex;
private Cond _cond;
/**
* Creates a new AsyncDbmQueue wrapping the given Dbm.
*/
public AsyncDbmQueue(Dbm dbm) {
_dbm = dbm;
_read_queue = new AsyncQueue<DbmOperation>();
_write_queue = new AsyncQueue<DbmOperation>();
_running = true;
_mutex = Mutex();
_cond = Cond();
_worker_thread = new Thread<void>("dbm-worker", run_worker);
}
/**
* Whether the underlying Dbm supports concurrent reads.
*/
public bool supports_concurrent_reads {
get { return _dbm.supports_concurrent_reads; }
}
// === Read Operations ===
/**
* Async has_key operation.
*/
public async bool has_key_async(string key) throws StorageError {
bool result = false;
StorageError? error = null;
var op = new DbmOperation.read_has_key(key, (r, e) => {
result = r;
error = e;
Idle.add(() => {
has_key_async.callback();
return false;
});
});
_read_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
return result;
}
/**
* Async get operation.
*/
public async Invercargill.BinaryData? get_async(string key) throws StorageError {
Invercargill.BinaryData? result = null;
StorageError? error = null;
var op = new DbmOperation.read_get(key, (r, e) => {
result = r;
error = e;
Idle.add(() => {
get_async.callback();
return false;
});
});
_read_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
return result;
}
// === Write Operations ===
/**
* Async set operation.
*/
public async void set_async(string key, Invercargill.BinaryData value) throws StorageError {
StorageError? error = null;
var op = new DbmOperation.write_set(key, value, (e) => {
error = e;
Idle.add(() => {
set_async.callback();
return false;
});
});
_write_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
}
/**
* Async delete operation.
*/
public async void delete_async(string key) throws StorageError {
StorageError? error = null;
var op = new DbmOperation.write_delete(key, (e) => {
error = e;
Idle.add(() => {
delete_async.callback();
return false;
});
});
_write_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
}
// === Transaction Operations ===
/**
* Async begin_transaction operation.
*/
public async void begin_transaction_async() throws StorageError {
StorageError? error = null;
var op = new DbmOperation.transaction_begin((e) => {
error = e;
Idle.add(() => {
begin_transaction_async.callback();
return false;
});
});
_write_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
}
/**
* Async commit_transaction operation.
*/
public async void commit_transaction_async() throws StorageError {
StorageError? error = null;
var op = new DbmOperation.transaction_commit((e) => {
error = e;
Idle.add(() => {
commit_transaction_async.callback();
return false;
});
});
_write_queue.push(op);
notify_worker();
yield;
if (error != null) {
throw error;
}
}
/**
* Async rollback_transaction operation.
*/
public async void rollback_transaction_async() {
var op = new DbmOperation.transaction_rollback(() => {
Idle.add(() => {
rollback_transaction_async.callback();
return false;
});
});
_write_queue.push(op);
notify_worker();
yield;
}
// === Worker Thread ===
private void run_worker() {
while (_running) {
DbmOperation? op = null;
// Prioritize reads over writes
op = _read_queue.try_pop();
if (op == null) {
op = _write_queue.try_pop();
}
if (op != null) {
process_operation((!) op);
} else {
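// Wait for new operations. Cond.wait() takes no timeout, so use
// wait_until() with an absolute monotonic deadline (100 ms from now).
_mutex.lock();
_cond.wait_until(_mutex, get_monotonic_time() + 100 * TimeSpan.MILLISECOND);
_mutex.unlock();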
}
}
}
private void process_operation(DbmOperation op) {
switch (op.type) {
case DbmOperationType.HAS_KEY:
bool result = _dbm.has_key(op.key);
op.read_bool_callback(result, null);
break;
case DbmOperationType.GET:
var result = _dbm.get(op.key);
op.read_data_callback(result, null);
break;
case DbmOperationType.SET:
try {
_dbm.set(op.key, op.value);
op.write_callback(null);
} catch (StorageError e) {
op.write_callback(e);
}
break;
case DbmOperationType.DELETE:
try {
_dbm.delete(op.key);
op.write_callback(null);
} catch (StorageError e) {
op.write_callback(e);
}
break;
case DbmOperationType.BEGIN_TRANSACTION:
try {
_dbm.begin_transaction();
op.write_callback(null);
} catch (StorageError e) {
op.write_callback(e);
}
break;
case DbmOperationType.COMMIT_TRANSACTION:
try {
_dbm.commit_transaction();
op.write_callback(null);
} catch (StorageError e) {
op.write_callback(e);
}
break;
case DbmOperationType.ROLLBACK_TRANSACTION:
_dbm.rollback_transaction();
op.void_callback();
break;
}
}
private void notify_worker() {
_mutex.lock();
_cond.signal();
_mutex.unlock();
}
/**
* Shuts down the worker thread.
*/
public void shutdown() {
_running = false;
notify_worker();
_worker_thread.join();
}
}
/**
* Types of DBM operations.
*/
private enum DbmOperationType {
HAS_KEY,
GET,
SET,
DELETE,
BEGIN_TRANSACTION,
COMMIT_TRANSACTION,
ROLLBACK_TRANSACTION
}
/**
* Represents a single DBM operation in the queue.
*/
private class DbmOperation : Object {
public DbmOperationType type;
public string key;
public Invercargill.BinaryData value;
public DbmReadCallback<bool>? read_bool_callback;
public DbmReadCallback<Invercargill.BinaryData>? read_data_callback;
public DbmWriteCallback? write_callback;
public delegate void VoidCallback();
public VoidCallback? void_callback;
// Named constructors for creating operations. Callbacks are taken as
// owned delegates so that they can be stored in the fields above.
public DbmOperation.read_has_key(string key, owned DbmReadCallback<bool> callback) {
this.type = DbmOperationType.HAS_KEY;
this.key = key;
this.read_bool_callback = (owned) callback;
}
public DbmOperation.read_get(string key, owned DbmReadCallback<Invercargill.BinaryData> callback) {
this.type = DbmOperationType.GET;
this.key = key;
this.read_data_callback = (owned) callback;
}
public DbmOperation.write_set(string key, Invercargill.BinaryData value, owned DbmWriteCallback callback) {
this.type = DbmOperationType.SET;
this.key = key;
this.value = value;
this.write_callback = (owned) callback;
}
public DbmOperation.write_delete(string key, owned DbmWriteCallback callback) {
this.type = DbmOperationType.DELETE;
this.key = key;
this.write_callback = (owned) callback;
}
public DbmOperation.transaction_begin(owned DbmWriteCallback callback) {
this.type = DbmOperationType.BEGIN_TRANSACTION;
this.write_callback = (owned) callback;
}
public DbmOperation.transaction_commit(owned DbmWriteCallback callback) {
this.type = DbmOperationType.COMMIT_TRANSACTION;
this.write_callback = (owned) callback;
}
public DbmOperation.transaction_rollback(owned VoidCallback callback) {
this.type = DbmOperationType.ROLLBACK_TRANSACTION;
this.void_callback = (owned) callback;
}
}
} // namespace Implexus.Storage
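
The hand-off at the heart of each `*_async` method (block on a worker thread, then resume the coroutine from the main loop via `Idle.add`) can be tried in isolation. The following is a minimal sketch, not the proposed implementation: it spawns one thread per call instead of feeding a queue, and `Box`, `blocking_lookup` and `lookup_async` are hypothetical names that are not part of Implexus. It assumes only GLib and GIO (built with something like `valac --pkg gio-2.0`).

```vala
// Hypothetical holder so the worker thread can hand a value back.
class Box {
    public string? value;
}

// Hypothetical stand-in for a blocking Dbm.get() call.
string? blocking_lookup(HashTable<string, string> table, string key) {
    Thread.usleep(1000); // simulate I/O latency (1 ms)
    return table.lookup(key);
}

// Runs the blocking lookup on a worker thread and resumes on the main loop.
async string? lookup_async(HashTable<string, string> table, string key) {
    // Take the resume callback before the thread starts.
    SourceFunc resume = lookup_async.callback;
    var box = new Box();

    ThreadFunc<bool> run = () => {
        box.value = blocking_lookup(table, key);
        // Schedule the resume on the main loop instead of continuing
        // the coroutine on the worker thread.
        Idle.add((owned) resume);
        return true;
    };
    new Thread<bool>("lookup-worker", run);

    yield; // suspended until the idle source invokes the resume callback
    return box.value;
}
```

From another async method, `yield lookup_async(table, key)` is enough. From synchronous code the call is started with `lookup_async.begin(...)` and finished with `lookup_async.end(res)` in the ready callback, which only fires while a `MainLoop` is running.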
For DBM implementations that support concurrent reads (like LMDB), the queue can spawn dedicated read threads:
// In AsyncDbmQueue, when supports_concurrent_reads is true:
private void process_read_with_thread(DbmOperation op) {
    if (!_dbm.supports_concurrent_reads) {
        process_operation(op);
        return;
    }
    // LMDB allows concurrent readers, so run the read on its own thread.
    // process_operation() dispatches on op.type (HAS_KEY or GET), and the
    // operation's callback already returns the result via Idle.add.
    new Thread<void>("dbm-reader", () => {
        process_operation(op);
    });
}
namespace Implexus.Storage {
public interface Dbm : Object {
// === Existing Methods ===
public abstract bool has_key(string key);
public abstract Invercargill.BinaryData? @get(string key);
public abstract void @set(string key, Invercargill.BinaryData value) throws StorageError;
public abstract void delete(string key) throws StorageError;
public abstract Invercargill.Enumerable<string> keys { owned get; }
public abstract void begin_transaction() throws StorageError;
public abstract void commit_transaction() throws StorageError;
public abstract void rollback_transaction();
public abstract bool in_transaction { get; }
// === NEW: Concurrent Read Support ===
/**
* Indicates whether this Dbm implementation supports concurrent reads.
*
* If true, multiple threads can read simultaneously without blocking.
* Writes are still serialized through the queue.
*
* - LMDB: true (MVCC allows concurrent readers)
* - GDBM: false (single-threaded access)
* - FilesystemDbm: false (no explicit coordination)
*/
public abstract bool supports_concurrent_reads { get; }
}
} // namespace Implexus.Storage
// GdbmDbm.vala
public class GdbmDbm : Object, Dbm {
public bool supports_concurrent_reads { get { return false; } }
// ... rest of implementation
}
// LmdbDbm.vala
public class LmdbDbm : Object, Dbm {
public bool supports_concurrent_reads { get { return true; } }
// ... rest of implementation
}
// FilesystemDbm.vala
public class FilesystemDbm : Object, Dbm {
public bool supports_concurrent_reads { get { return false; } }
// ... rest of implementation
}
public interface Engine : Object {
public abstract Entity get_root();
public abstract Entity? get_entity(EntityPath path) throws EngineError;
public abstract Entity? get_entity_or_null(EntityPath path);
public abstract bool entity_exists(EntityPath path);
public abstract Invercargill.Enumerable<Entity> query_by_type(string type_label);
public abstract Invercargill.Enumerable<Entity> query_by_expression(string type_label, string expression);
public abstract Transaction begin_transaction() throws EngineError;
public abstract bool in_transaction { get; }
public abstract StorageConfiguration configuration { owned get; }
public signal void entity_created(Entity entity);
public signal void entity_deleted(EntityPath path);
public signal void entity_modified(Entity entity);
}
public interface Engine : Object {
// === Root Access ===
/**
* Gets the root entity of the database.
*
* This operation is async as it may require I/O to verify/create root.
*/
public abstract async Entity get_root_async() throws EngineError;
// === Path-Based Access ===
/**
* Gets an entity by path, throwing an error if not found.
*/
public abstract async Entity? get_entity_async(EntityPath path) throws EngineError;
/**
* Gets an entity by path, returning null if not found.
*/
public abstract async Entity? get_entity_or_null_async(EntityPath path) throws EngineError;
/**
* Checks if an entity exists at the specified path.
*/
public abstract async bool entity_exists_async(EntityPath path) throws EngineError;
// === Query Operations ===
/**
* Queries all entities of a specific type.
*
* Returns an array with eager loading. This is simpler than async
* iteration and sufficient for expected data volumes.
*
* DECISION: Use Entity[] (eager loading) - simpler and sufficient
* for expected data volumes. See "Decisions Made" section.
*/
public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
/**
* Queries entities by type and expression.
*/
public abstract async Entity[] query_by_expression_async(string type_label, string expression) throws EngineError;
// === Transactions ===
/**
* Begins a new transaction.
*
* NOTE: The with_write_transaction() helper has been REMOVED.
* Vala doesn't support async delegates, so transactions must be
* managed manually with begin_transaction_async(), perform operations,
* then commit_async() or rollback_async().
*
* See "Decisions Made" section for details.
*/
public abstract async Transaction begin_transaction_async() throws EngineError;
/**
* Indicates whether a transaction is currently active.
* This is synchronous as it's a quick property check.
*/
public abstract bool in_transaction { get; }
// === Configuration ===
/**
* Gets the storage configuration.
* This is synchronous as it's a quick property access.
*/
public abstract StorageConfiguration configuration { owned get; }
// === Events ===
public signal void entity_created(Entity entity);
public signal void entity_deleted(EntityPath path);
public signal void entity_modified(Entity entity);
}
The current Entity interface has synchronous methods for all operations.
public interface Entity : Object {
// === Identity (Synchronous - No I/O) ===
public abstract unowned Engine engine { get; }
public abstract EntityPath path { owned get; }
public abstract string name { owned get; }
public abstract EntityType entity_type { get; }
public abstract string type_label { owned get; }
public abstract string configured_expression { owned get; }
public abstract string configured_type_label { owned get; }
// === Parent/Child Navigation (Async - May require I/O) ===
/**
* Gets the parent entity, or null for the root.
*/
public abstract async Entity? get_parent_async() throws EngineError;
/**
* Gets the names of all child entities.
*/
public abstract async Invercargill.ReadOnlySet<string> get_child_names_async() throws EngineError;
/**
* Gets a child entity by name.
*/
public abstract async Entity? get_child_async(string name) throws EngineError;
/**
* Gets all child entities.
*/
public abstract async Entity[] get_children_async() throws EngineError;
// === Child Management (Async - Requires I/O) ===
public abstract async Entity? create_container_async(string name) throws EngineError;
public abstract async Entity? create_document_async(string name, string type_label) throws EngineError;
public abstract async Entity? create_category_async(string name, string type_label, string expression) throws EngineError;
public abstract async Entity? create_index_async(string name, string type_label, string expression) throws EngineError;
public abstract async Entity? create_catalogue_async(string name, string type_label, string expression) throws EngineError;
// === Document Operations (Async - Requires I/O) ===
/**
* Gets the properties stored in this document.
*/
public abstract async Invercargill.Properties get_properties_async() throws EngineError;
/**
* Gets a property value by name.
*/
public abstract async Invercargill.Element? get_entity_property_async(string name) throws EngineError;
/**
* Sets a property value.
*/
public abstract async void set_entity_property_async(string name, Invercargill.Element value) throws EngineError;
/**
* Removes a property.
*/
public abstract async void remove_property_async(string name) throws EngineError;
// === Lifecycle (Async - Requires I/O) ===
/**
* Deletes this entity from the database.
*/
public abstract async void delete_async() throws EngineError;
/**
* Checks if this entity still exists in the database.
* This is synchronous as it may be cached or quick to check.
*/
public abstract bool exists { get; }
// === Set Operations ===
/**
* Creates an EntitySet containing just this entity.
*/
public abstract async EntitySet as_set_async();
}
public interface Transaction : Object {
/**
* Indicates whether this transaction is still active.
* Synchronous - quick property check.
*/
public abstract bool active { get; }
/**
* Commits all changes made during this transaction.
*/
public abstract async void commit_async() throws EngineError;
/**
* Rolls back all changes made during this transaction.
*/
public abstract async void rollback_async();
}
The LowLevel storage classes remain synchronous (they operate on the Dbm directly). The HighLevel stores gain async versions that use the AsyncDbmQueue.
namespace Implexus.Storage.HighLevel {
public class EntityStore : Object {
private LowLevel.EntityMetadataStorage _metadata;
private LowLevel.TypeIndexStorage _type_index;
private AsyncDbmQueue _queue;
public EntityStore.with_queue(AsyncDbmQueue queue) {
_queue = queue;
// LowLevel stores still use sync Dbm for internal operations
// but we route through the queue
}
// === Async Metadata Operations ===
public async void store_metadata_async(
Core.EntityPath path,
Core.EntityType type,
string? type_label = null
) throws StorageError {
// Serialize and queue the write
string key = "entity:" + path.to_string();
var writer = new ElementWriter();
writer.write_element(new Invercargill.NativeElement<int64?>((int64) type));
writer.write_element(new Invercargill.NativeElement<string>(type_label ?? ""));
yield _queue.set_async(key, writer.to_binary_data());
}
public async Core.EntityType? get_entity_type_async(Core.EntityPath path) throws StorageError {
string key = "entity:" + path.to_string();
var data = yield _queue.get_async(key);
if (data == null) {
return null;
}
var reader = new ElementReader((!) data);
try {
var element = reader.read_element();
if (element.is_null()) {
return null;
}
int64? type_val = element.as<int64?>();
return (Core.EntityType) (type_val == null ? 0 : (!) type_val);
} catch (Invercargill.ElementError e) {
throw new StorageError.CORRUPT_DATA("Failed to read entity type: %s".printf(e.message));
}
}
public async bool exists_async(Core.EntityPath path) throws StorageError {
string key = "entity:" + path.to_string();
return yield _queue.has_key_async(key);
}
public async void delete_async(Core.EntityPath path) throws StorageError {
string key = "entity:" + path.to_string();
yield _queue.delete_async(key);
}
// ... similar async versions for other methods
}
} // namespace Implexus.Storage.HighLevel
The EmbeddedEngine currently uses synchronous Dbm operations directly.
public class EmbeddedEngine : Object, Core.Engine {
private Storage.AsyncDbmQueue _dbm_queue;
private Storage.HighLevel.EntityStore _entity_store;
private Storage.HighLevel.DocumentStore _document_store;
private Storage.HighLevel.ContainerStore _container_store;
private Storage.HighLevel.CategoryStore _category_store;
private Storage.HighLevel.CatalogueStore _catalogue_store;
private Storage.HighLevel.IndexStore _index_store;
private Core.Entity? _root; // cached root entity, set lazily by get_root_async()
public EmbeddedEngine(Core.StorageConfiguration config) {
// Get Dbm from storage
var basic_storage = (config.storage as Storage.BasicStorage);
if (basic_storage != null) {
var dbm = basic_storage.dbm;
// Create async queue
_dbm_queue = new Storage.AsyncDbmQueue(dbm);
// Initialize stores with queue
_entity_store = new Storage.HighLevel.EntityStore.with_queue(_dbm_queue);
_document_store = new Storage.HighLevel.DocumentStore.with_queue(_dbm_queue);
_container_store = new Storage.HighLevel.ContainerStore.with_queue(_dbm_queue);
_category_store = new Storage.HighLevel.CategoryStore.with_queue(_dbm_queue);
_catalogue_store = new Storage.HighLevel.CatalogueStore.with_queue(_dbm_queue);
_index_store = new Storage.HighLevel.IndexStore.with_queue(_dbm_queue);
}
}
// === Internal Sync Methods for Hook Use ===
//
// DECISION: Hooks remain synchronous, running in the DBM thread.
// The storage layer (HighLevel/LowLevel) also remains synchronous.
// EmbeddedEngine keeps internal sync versions of methods for hook use.
//
// These internal methods are NOT part of the public API and are only
// used by the HookManager to perform operations during hook execution.
internal Core.Entity? get_entity_or_null_sync(Core.EntityPath path) {
// Synchronous implementation for hook use
// Uses the underlying Dbm directly, bypassing the async queue
return _entity_store.get_entity_or_null_direct(path);
}
// === Engine Interface Implementation ===
public async Core.Entity get_root_async() throws Core.EngineError {
if (_root != null) {
return (!) _root;
}
var root_path = new Core.EntityPath.root();
// Check if root exists, creating it on first use
try {
bool exists = yield _entity_store.exists_async(root_path);
if (!exists) {
yield _entity_store.store_metadata_async(root_path, Core.EntityType.CONTAINER, null);
}
} catch (Storage.StorageError e) {
throw new Core.EngineError.STORAGE_ERROR("Failed to load or create root: %s".printf(e.message));
}
_root = new Entities.Container(this, root_path);
return (!) _root;
}
public async Core.Entity? get_entity_async(Core.EntityPath path) throws Core.EngineError {
bool exists = yield _entity_store.exists_async(path);
if (!exists) {
throw new Core.EngineError.ENTITY_NOT_FOUND(
"Entity not found: %s".printf(path.to_string())
);
}
return yield create_entity_from_storage_async(path);
}
public async Core.Entity? get_entity_or_null_async(Core.EntityPath path) throws Core.EngineError {
try {
return yield get_entity_async(path);
} catch (Core.EngineError e) {
return null;
}
}
public async bool entity_exists_async(Core.EntityPath path) throws Core.EngineError {
return yield _entity_store.exists_async(path);
}
// ... rest of async implementations
}
The RemoteEngine already uses socket communication. We need to make the socket operations properly async.
internal Protocol.Message send_request_and_wait(Protocol.Message request) throws Protocol.ProtocolError {
ensure_connected();
var request_id = ((!) _writer).write_request(request);
return ((!) _reader).read_response_for_request(request_id);
}
internal async Protocol.Message send_request_and_wait_async(Protocol.Message request) throws Protocol.ProtocolError {
ensure_connected();
// Use async socket operations
var request_id = yield ((!) _writer).write_request_async(request);
return yield ((!) _reader).read_response_for_request_async(request_id);
}
public async Core.Entity? get_entity_async(Core.EntityPath path) throws Core.EngineError {
ensure_connected();
try {
var request = new Protocol.GetEntityRequest.for_path(path);
var response = yield send_request_and_wait_async(request);
if (response.message_type == Protocol.MessageType.ERROR) {
var error = (Protocol.ErrorResponse) response;
throw error_code_to_engine_error(error.error_code, error.error_message);
}
if (response.message_type == Protocol.MessageType.ENTITY_RESPONSE) {
var entity_response = (Protocol.EntityResponse) response;
return create_remote_entity_from_data(entity_response.entity_data);
}
throw new Core.EngineError.PROTOCOL_ERROR("Unexpected response type");
} catch (Protocol.ProtocolError e) {
throw new Core.EngineError.PROTOCOL_ERROR("Protocol error: %s".printf(e.message));
}
}
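
To make "properly async" concrete, here is a hedged sketch of one async read built on GIO's `InputStream.read_all_async` (available since GLib 2.44). The framing is an assumption made purely for illustration: this document does not specify the Implexus wire format, so the 4-byte big-endian length prefix, `MAX_FRAME_SIZE` and `read_frame_async` are all hypothetical. The point is only that each read yields to the main loop instead of blocking it.

```vala
// Assumption: upper bound on a frame, chosen for this sketch only.
const uint32 MAX_FRAME_SIZE = 16 * 1024 * 1024;

// Assumption: a frame is a 4-byte big-endian length followed by the payload.
// Reads one frame without blocking the main loop.
async uint8[] read_frame_async(InputStream input) throws Error {
    var header = new uint8[4];
    size_t n;
    yield input.read_all_async(header, Priority.DEFAULT, null, out n);
    if (n != header.length) {
        throw new IOError.FAILED("stream closed while reading frame header");
    }

    // Decode the big-endian length prefix.
    uint32 length = ((uint32) header[0] << 24) | ((uint32) header[1] << 16)
                  | ((uint32) header[2] << 8) | (uint32) header[3];
    if (length > MAX_FRAME_SIZE) {
        throw new IOError.INVALID_DATA("frame too large");
    }

    var payload = new uint8[length];
    yield input.read_all_async(payload, Priority.DEFAULT, null, out n);
    if (n != length) {
        throw new IOError.FAILED("stream closed while reading frame payload");
    }
    return payload;
}
```

With a real connection the stream would come from `SocketConnection.input_stream`; a `MemoryInputStream` works the same way for tests.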
DECISION: Clean Break Migration
The project is in development, so we will use a clean break approach:
- Add the AsyncDbmQueue class
- Add the supports_concurrent_reads property to the Dbm interface
- Remove the with_write_transaction() helper entirely

| File | Purpose |
|---|---|
| src/Storage/AsyncDbmQueue.vala | Queue system for async DBM operations |
| src/Storage/DbmOperation.vala | Operation types for the queue |

| File | Changes |
|---|---|
| src/Storage/Dbm.vala | Add supports_concurrent_reads property |
| src/Storage/Gdbm/GdbmDbm.vala | Implement supports_concurrent_reads (return false) |
| src/Storage/Lmdb/LmdbDbm.vala | Implement supports_concurrent_reads (return true) |
| src/Storage/FilesystemDbm.vala | Implement supports_concurrent_reads (return false) |
| src/Core/Engine.vala | Convert all I/O methods to async |
| src/Core/Entity.vala | Convert all I/O methods to async |
| src/Core/Transaction.vala | (if separate) Convert to async |
| src/Storage/HighLevel/EntityStore.vala | Add async methods |
| src/Storage/HighLevel/DocumentStore.vala | Add async methods |
| src/Storage/HighLevel/ContainerStore.vala | Add async methods |
| src/Storage/HighLevel/CategoryStore.vala | Add async methods |
| src/Storage/HighLevel/CatalogueStore.vala | Add async methods |
| src/Storage/HighLevel/IndexStore.vala | Add async methods |
| src/Engine/EmbeddedEngine.vala | Implement async interface methods |
| src/Engine/RemoteEngine.vala | Implement async interface methods with async sockets |
| src/Entities/Container.vala | Use async stores |
| src/Entities/Document.vala | Use async stores |
| src/Entities/Category.vala | Use async stores |
| src/Entities/Catalogue.vala | Use async stores |
| src/Entities/Index.vala | Use async stores |
| src/Entities/AbstractEntity.vala | Update base implementations |
| src/Protocol/MessageReader.vala | Add async read methods |
| src/Protocol/MessageWriter.vala | Add async write methods |
| src/meson.build | Add new files to build |

| File | Reason |
|---|---|
| src/Core/AsyncEngine.vala | Replaced by async interface methods |
| src/Core/AsyncEntity.vala | Replaced by async interface methods |

| File | Changes |
|---|---|
| tests/Storage/GdbmDbmTest.vala | Add tests for supports_concurrent_reads |
| tests/Storage/LmdbDbmTest.vala | Add tests for supports_concurrent_reads |
| tests/Storage/FilesystemDbmTest.vala | Add tests for supports_concurrent_reads |
| tests/Storage/StorageTest.vala | Add tests for AsyncDbmQueue |
| tests/Engine/EmbeddedEngineTest.vala | Update to use async methods |
| All other tests | Update to use async methods |

This section documents the finalized design decisions for the async I/O refactor.
Decision: Use Entity[] (eager loading) for query results.
Rationale:
- Simpler than async iteration with AsyncEnumerable<Entity>, and sufficient for expected data volumes

Example:
public abstract async Entity[] query_by_type_async(string type_label) throws EngineError;
Decision: Remove all sync methods immediately, update everything in this refactor.
Rationale:
Implications:
Decision: Remove with_write_transaction() entirely.
Rationale:
New Pattern:
// Old (no longer possible):
yield engine.with_write_transaction_async(async (tx) => {
yield entity.set_entity_property_async("key", value);
});
// New (manual management). begin_transaction_async() returns the
// Transaction, which owns commit_async() and rollback_async():
var tx = yield engine.begin_transaction_async();
try {
yield entity.set_entity_property_async("key", value);
yield tx.commit_async();
} catch (Error e) {
yield tx.rollback_async();
throw e;
}
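
The manual pattern can be exercised without an engine. The sketch below uses a hypothetical in-memory `ToyTransaction` that mirrors the shape of the Transaction interface (`active`, `commit_async()`, `rollback_async()`), plus a hypothetical `do_work_async` that fails on demand. None of these names exist in Implexus; they only illustrate that a failure between begin and commit ends in a rollback and a rethrow.

```vala
// Hypothetical stand-in for the Transaction interface.
class ToyTransaction : Object {
    public bool active { get; private set; default = true; }
    public bool committed { get; private set; default = false; }

    public async void commit_async() throws IOError {
        committed = true;
        active = false;
    }

    public async void rollback_async() {
        active = false;
    }
}

// Hypothetical unit of work that fails when asked to.
async void do_work_async(bool fail) throws IOError {
    if (fail) {
        throw new IOError.FAILED("simulated write failure");
    }
}

// Commit on success; roll back and rethrow on any error.
async void run_in_transaction_async(ToyTransaction tx, bool fail) throws Error {
    try {
        yield do_work_async(fail);
        yield tx.commit_async();
    } catch (Error e) {
        yield tx.rollback_async();
        throw e;
    }
}
```

Because the caller owns the transaction object, it can still inspect `active` after a failure, which a closure-based helper would have hidden.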
Decision: Keep hooks synchronous, running in the DBM thread.
Rationale:
Implementation Note:
EmbeddedEngine keeps internal sync versions of methods like get_entity_or_null_sync() for hook use. These are:
- internal (not public API)

throws Keyword

Decision: Use the throws keyword for async methods, consistent with the existing codebase.
Rationale:
Example:
public async Entity get_entity_async(EntityPath path) throws EngineError;
public async Entity[] query_by_type_async(string type_label) throws EngineError;
public async void set_entity_property_async(string name, Element value) throws EngineError;
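
As a small illustration of how `throws` behaves on async methods, the sketch below uses a hypothetical `DemoError` domain in place of EngineError and two hypothetical functions. Inside async code the error propagates through `yield` exactly like a synchronous call, so ordinary `try`/`catch` applies.

```vala
// Hypothetical error domain standing in for EngineError.
errordomain DemoError {
    ENTITY_NOT_FOUND
}

// Hypothetical async lookup that fails for unknown names.
async string find_label_async(string name) throws DemoError {
    if (name != "root") {
        throw new DemoError.ENTITY_NOT_FOUND("Entity not found: %s".printf(name));
    }
    return "container";
}

// Async caller: the error arrives at the yield expression.
async string describe_async(string name) {
    try {
        return yield find_label_async(name);
    } catch (DemoError e) {
        return "missing";
    }
}
```

When an async method is started from synchronous code with `.begin()`, the same error surfaces at the matching `.end(res)` call, which is where the `try` block belongs in that case.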
Decision: No compatibility shims needed - project is in development.
Rationale:
This design proposes:
The key benefits are: