# Storage Layer

This document describes the storage abstraction and the binary serialization format.

## Storage Architecture

```mermaid
graph TB
    subgraph API Layer
        Engine[Engine]
    end
    subgraph Storage Layer
        Storage[Storage Interface]
        Serializer[EntitySerializer]
        Deserializer[EntityDeserializer]
    end
    subgraph Async Queue
        AsyncDbmQueue[AsyncDbmQueue]
        DbmOperation[DbmOperation]
        WorkerThread[Worker Thread]
    end
    subgraph Serialization
        ElementWriter[ElementWriter]
        ElementReader[ElementReader]
    end
    subgraph Persistence
        Dbm[Dbm Interface]
        FilesystemDbm[FilesystemDbm]
        GdbmDbm[GdbmDbm]
        LmdbDbm[LmdbDbm]
    end

    Engine --> Storage
    Storage --> Serializer
    Storage --> Deserializer
    Serializer --> ElementWriter
    Deserializer --> ElementReader
    Storage --> AsyncDbmQueue
    AsyncDbmQueue --> DbmOperation
    AsyncDbmQueue --> WorkerThread
    AsyncDbmQueue --> Dbm
    Dbm --> FilesystemDbm
    Dbm --> GdbmDbm
    Dbm --> LmdbDbm
```

## Async I/O Architecture

All database I/O operations are asynchronous, using a queue system to handle different DBM backend capabilities.

### AsyncDbmQueue

The `AsyncDbmQueue` manages asynchronous execution of database operations:

```vala
namespace Implexus.Storage {

    /**
     * Queue system for async DBM operations.
     *
     * For DBMs without concurrent read support (GDBM, Filesystem):
     * - All operations go through a single worker thread
     * - Read operations are prioritized over writes
     *
     * For DBMs with concurrent read support (LMDB):
     * - Write operations go through the worker thread
     * - Read operations spawn their own threads
     */
    public class AsyncDbmQueue : GLib.Object {

        private weak Dbm _dbm;
        private AsyncQueue<DbmOperation> _read_queue;
        private AsyncQueue<DbmOperation> _write_queue;
        private Thread<void*>? _worker_thread = null;
        private bool _running = false;

        /**
         * Creates a new AsyncDbmQueue for the given DBM.
         *
         * @param dbm The DBM instance to wrap
         */
        public AsyncDbmQueue(Dbm dbm) {
            _dbm = dbm;
            _read_queue = new AsyncQueue<DbmOperation>();
            _write_queue = new AsyncQueue<DbmOperation>();
        }

        /**
         * Starts the worker thread.
         * Must be called before executing any operations.
         */
        public void start();

        /**
         * Stops the worker thread and waits for it to finish.
         *
         * @param timeout_ms Maximum time to wait for shutdown (0 = wait forever)
         * @return true if shutdown completed, false if timed out
         */
        public bool shutdown(int timeout_ms = 5000);

        /**
         * Executes a read operation asynchronously.
         *
         * For concurrent-read DBMs (LMDB), spawns a new thread.
         * For single-threaded DBMs (GDBM, Filesystem), queues and prioritizes.
         */
        public async void execute_read_async(owned DbmOperation op) throws Error;

        /**
         * Executes a write operation asynchronously.
         *
         * Write operations always go through the queue to ensure
         * serialization and data consistency.
         */
        public async void execute_write_async(owned DbmOperation op) throws Error;

        /**
         * Queues a read operation and returns the operation object.
         */
        public DbmOperation queue_read();

        /**
         * Queues a write operation and returns the operation object.
         */
        public DbmOperation queue_write();
    }
} // namespace Implexus.Storage
```

### DbmOperation

Represents a single operation in the queue:

```vala
namespace Implexus.Storage {

    /**
     * Type of database operation.
     */
    public enum DbmOperationType {
        /** Read operation - prioritized over writes */
        READ,
        /** Write operation - processed in order */
        WRITE
    }

    /**
     * Represents a single operation in the AsyncDbmQueue.
     *
     * Stores the operation type, callback for async continuation,
     * and provides storage for the result or error.
     */
    public class DbmOperation : GLib.Object {

        /** The type of this operation (READ or WRITE) */
        public DbmOperationType op_type { get; construct; }

        /** The callback to resume the async method when complete */
        public SourceFunc callback;

        /** The result of the operation (if successful) */
        public void* result { get; set; }

        /** The error that occurred during execution (if any) */
        public Error? error { get; set; }

        /** Whether the operation has been completed */
        public bool completed { get; set; default = false; }

        /**
         * Creates a new DbmOperation.
         *
         * @param type The type of operation (READ or WRITE)
         * @param cb The callback to resume the async method
         */
        public DbmOperation(DbmOperationType type, owned SourceFunc cb);
    }
} // namespace Implexus.Storage
```

### Worker Thread Behavior

The worker thread processes operations with read prioritization:

1. **Read Priority**: Check the read queue first, then the write queue
2. **Blocking**: If no operations are available, block on the read queue with a timeout
3. **Result Delivery**: Use `Idle.add()` to return results to the main loop
4. **Shutdown**: Push a dummy operation to wake the worker, then wait for completion

```vala
private void _worker() {
    while (_running) {
        // Try to get a read operation first (prioritize reads)
        DbmOperation? op = _read_queue.try_pop();

        // If no read operation, try a write
        if (op == null) {
            op = _write_queue.try_pop();
        }

        // If still no operation, block on the read queue with a timeout,
        // then re-check both queues on the next iteration
        if (op == null) {
            op = _read_queue.timed_pop(...);
            if (op == null) {
                continue;
            }
        }

        // Execute the operation against the DBM here (elided)

        // Signal completion via Idle to return to the main context
        Idle.add(() => {
            op.callback();
            return Source.REMOVE;
        });
    }
}
```

## DBM Interface

The DBM interface provides low-level key-value storage, plus a flag indicating concurrent read support:

```vala
namespace Implexus.Storage {

    public interface Dbm : Object {

        /**
         * Whether this DBM implementation supports concurrent read operations.
         *
         * If true, read operations can spawn new threads while writes go through
         * the async queue. This enables better read parallelism.
         *
         * - LMDB: true (MVCC allows concurrent readers)
         * - GDBM: false (single-threaded access required)
         * - Filesystem: false (single-threaded access required)
         */
        public abstract bool supports_concurrent_reads { get; }

        /**
         * Checks if a key exists in the database.
         */
        public abstract bool has_key(string key);

        /**
         * Gets the value for a key.
         */
        public abstract Invercargill.BinaryData?
            @get(string key);

        /**
         * Sets a key-value pair.
         */
        public abstract void @set(string key, Invercargill.BinaryData value) throws StorageError;

        /**
         * Deletes a key from the database.
         */
        public abstract void @delete(string key) throws StorageError;

        /**
         * Gets all keys in the database.
         */
        public abstract Invercargill.Enumerable keys { owned get; }

        /**
         * Begins a new transaction.
         */
        public abstract void begin_transaction() throws StorageError;

        /**
         * Commits the current transaction.
         */
        public abstract void commit_transaction() throws StorageError;

        /**
         * Rolls back the current transaction.
         */
        public abstract void rollback_transaction();

        /**
         * Indicates whether a transaction is currently active.
         */
        public abstract bool in_transaction { get; }
    }
} // namespace Implexus.Storage
```

### Concurrent Read Support by Backend

| Backend    | `supports_concurrent_reads` | Notes |
|------------|-----------------------------|-------|
| LMDB       | `true`                      | MVCC allows multiple concurrent readers |
| GDBM       | `false`                     | Single-threaded access required |
| Filesystem | `false`                     | In-memory dictionary, single-threaded |

## Key Naming Convention

Storage keys are organized by prefix:

| Key Pattern | Purpose |
|-------------|---------|
| `entity:` | Serialized entity data |
| `children:` | Set of child names for a path |
| `type:` | Set of paths for a type |
| `meta:version` | Database version |
| `meta:config` | Configuration data |

## Binary Serialization Format

### Overview

The serialization format encodes Element types to binary without relying on GLib.Object-specific logic. All values are written in big-endian byte order.
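The core byte-level rules (one-byte type codes, variable-length lengths, big-endian integers) can be cross-checked with a short Python sketch. This is illustrative only, not part of the implementation; the helper names are invented for the example:

```python
import struct

def encode_length(n: int) -> bytes:
    """Variable-length length encoding: 1 byte below 0x80,
    2 bytes below 0x4000, else a 0xFF marker plus int64 big-endian."""
    if n < 0x80:
        return bytes([n])
    if n < 0x4000:
        return bytes([(n >> 8) | 0x80, n & 0xFF])
    return bytes([0xFF]) + struct.pack(">q", n)

def encode_string(s: str) -> bytes:
    """String element: type code 0x02, then length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return bytes([0x02]) + encode_length(len(data)) + data

def encode_int32(v: int) -> bytes:
    """Int32 element: type code 0x08, then four big-endian payload bytes."""
    return bytes([0x08]) + struct.pack(">i", v)

# "hi" -> type code, length 2, then the two UTF-8 bytes
assert encode_string("hi") == bytes([0x02, 0x02]) + b"hi"
# 42 -> type code, then 00 00 00 2A in big-endian order
assert encode_int32(42) == bytes([0x08, 0x00, 0x00, 0x00, 0x2A])
# 300 does not fit in 7 bits: (300 >> 8) | 0x80, then 300 & 0xFF
assert encode_length(300) == bytes([0x81, 0x2C])
```

The two-byte length branch mirrors the `write_length`/`read_length` pair shown later in this document.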
### Format Version Header

Every serialized entity starts with a header:

```
Offset  Size  Field
0       4     Magic: 0x49 0x4D 0x50 0x58 ("IMPX")
4       2     Version (currently 0x0001)
6       1     Entity type (0=Container, 1=Document, 2=Category, 3=Index)
7       2     Flags (reserved)
```

### Element Type Codes

| Code | Element Type |
|------|--------------|
| 0x00 | NullElement |
| 0x01 | ValueElement (type inferred from content) |
| 0x02 | String |
| 0x03 | Boolean |
| 0x04 | Int8 |
| 0x05 | UInt8 |
| 0x06 | Int16 |
| 0x07 | UInt16 |
| 0x08 | Int32 |
| 0x09 | UInt32 |
| 0x0A | Int64 |
| 0x0B | UInt64 |
| 0x0C | Float |
| 0x0D | Double |
| 0x0E | Element Array |
| 0x0F | Element Dictionary |

### ElementWriter

```vala
namespace Implexus.Serialization {

    public class ElementWriter {

        // Internal (rather than private) so EntitySerializer can
        // append raw header bytes directly
        internal Invercargill.DataStructures.ByteBuffer _buffer;

        public ElementWriter() {
            _buffer = new Invercargill.DataStructures.ByteBuffer();
        }

        public void write_null() {
            _buffer.append_byte(0x00);
        }

        public void write_element(Invercargill.Element? element) {
            if (element == null || element.is_null()) {
                write_null();
                return;
            }

            var value = element as Invercargill.ValueElement;
            if (value != null) {
                write_value((!)
                    value);
                return;
            }

            // Handle other element types
            var type = element.type();
            if (type == typeof(string)) {
                write_string(element.to_value());
            } else if (type == typeof(bool)) {
                write_bool(element.to_value());
            } else if (type == typeof(int8)) {
                write_int8(element.to_value());
            } else if (type == typeof(uint8)) {
                write_uint8(element.to_value());
            } else if (type == typeof(int16)) {
                write_int16(element.to_value());
            } else if (type == typeof(uint16)) {
                write_uint16(element.to_value());
            } else if (type == typeof(int32)) {
                write_int32(element.to_value());
            } else if (type == typeof(uint32)) {
                write_uint32(element.to_value());
            } else if (type == typeof(int64)) {
                write_int64(element.to_value());
            } else if (type == typeof(uint64)) {
                write_uint64(element.to_value());
            } else if (type == typeof(float)) {
                write_float(element.to_value());
            } else if (type == typeof(double)) {
                write_double(element.to_value());
            } else {
                // Fallback: write as string
                write_string(element.to_string());
            }
        }

        public void write_value(Invercargill.ValueElement value) {
            // ValueElement wraps GLib.Value; determine the type
            var gvalue = value.to_gvalue();
            write_gvalue(ref gvalue);
        }

        private void write_gvalue(ref GLib.Value gvalue) {
            if (gvalue.type() == typeof(string)) {
                write_string((string) gvalue);
            } else if (gvalue.type() == typeof(bool)) {
                write_bool((bool) gvalue);
            } else if (gvalue.type() == typeof(int)) {
                write_int64((int) gvalue);
            } else if (gvalue.type() == typeof(int64)) {
                write_int64((int64) gvalue);
            } else if (gvalue.type() == typeof(double)) {
                write_double((double) gvalue);
            } else if (gvalue.type() == typeof(float)) {
                write_float((float) gvalue);
            } else {
                write_string(gvalue.strdup_contents());
            }
        }

        public void write_string(string value) {
            _buffer.append_byte(0x02);
            var bytes = value.data;
            write_length(bytes.length);
            _buffer.append_bytes(bytes);
        }

        public void write_bool(bool value) {
            _buffer.append_byte(0x03);
            _buffer.append_byte(value ?
                1 : 0);
        }

        public void write_int8(int8 value) {
            _buffer.append_byte(0x04);
            _buffer.append_byte((uint8) value);
        }

        public void write_uint8(uint8 value) {
            _buffer.append_byte(0x05);
            _buffer.append_byte(value);
        }

        public void write_int16(int16 value) {
            _buffer.append_byte(0x06);
            _buffer.append_int16_be(value);
        }

        public void write_uint16(uint16 value) {
            _buffer.append_byte(0x07);
            _buffer.append_uint16_be(value);
        }

        public void write_int32(int32 value) {
            _buffer.append_byte(0x08);
            _buffer.append_int32_be(value);
        }

        public void write_uint32(uint32 value) {
            _buffer.append_byte(0x09);
            _buffer.append_uint32_be(value);
        }

        public void write_int64(int64 value) {
            _buffer.append_byte(0x0A);
            _buffer.append_int64_be(value);
        }

        public void write_uint64(uint64 value) {
            _buffer.append_byte(0x0B);
            _buffer.append_uint64_be(value);
        }

        public void write_float(float value) {
            _buffer.append_byte(0x0C);
            _buffer.append_float_be(value);
        }

        public void write_double(double value) {
            _buffer.append_byte(0x0D);
            _buffer.append_double_be(value);
        }

        public void write_length(int64 length) {
            // Variable-length encoding for lengths
            if (length < 0x80) {
                _buffer.append_byte((uint8) length);
            } else if (length < 0x4000) {
                _buffer.append_byte((uint8) ((length >> 8) | 0x80));
                _buffer.append_byte((uint8) (length & 0xFF));
            } else {
                _buffer.append_byte(0xFF);
                _buffer.append_int64_be(length);
            }
        }

        public void write_array(Invercargill.ReadOnlyCollection array) {
            _buffer.append_byte(0x0E);
            write_length(array.count);
            foreach (var element in array) {
                write_element(element);
            }
        }

        public void write_dictionary(Invercargill.ReadOnlyAssociative dict) {
            _buffer.append_byte(0x0F);
            write_length(dict.count);
            foreach (var entry in dict.entries) {
                write_string(entry.key);
                write_element(entry.value);
            }
        }

        public uint8[] to_bytes() {
            return _buffer.to_bytes();
        }

        public void reset() {
            _buffer.clear();
        }
    }
} // namespace Implexus.Serialization
```

### ElementReader

```vala
namespace Implexus.Serialization {

    public class ElementReader {

        // Internal so EntityDeserializer can read header bytes directly
        internal uint8[] _data;
        internal int _position;

        public ElementReader(uint8[] data) {
            _data = data;
            _position = 0;
        }

        public Invercargill.Element? read_element() throws SerializationError {
            if (_position >= _data.length) {
                throw new SerializationError.UNEXPECTED_END("Unexpected end of data");
            }

            var type_code = _data[_position++];
            switch (type_code) {
                case 0x00: return new Invercargill.NullElement();
                case 0x02: return new Invercargill.ValueElement(read_string());
                case 0x03: return new Invercargill.ValueElement(read_bool());
                case 0x04: return new Invercargill.ValueElement(read_int8());
                case 0x05: return new Invercargill.ValueElement(read_uint8());
                case 0x06: return new Invercargill.ValueElement(read_int16());
                case 0x07: return new Invercargill.ValueElement(read_uint16());
                case 0x08: return new Invercargill.ValueElement(read_int32());
                case 0x09: return new Invercargill.ValueElement(read_uint32());
                case 0x0A: return new Invercargill.ValueElement(read_int64());
                case 0x0B: return new Invercargill.ValueElement(read_uint64());
                case 0x0C: return new Invercargill.ValueElement(read_float());
                case 0x0D: return new Invercargill.ValueElement(read_double());
                case 0x0E: return read_array();
                case 0x0F: return read_dictionary();
                default:
                    throw new SerializationError.UNKNOWN_TYPE("Unknown type code: 0x%02X", type_code);
            }
        }

        internal string read_string() {
            var length = read_length();
            // Allocate one extra byte so the result is null-terminated
            var bytes = new uint8[length + 1];
            Memory.copy(bytes, &_data[_position], length);
            _position += length;
            return (string) bytes;
        }

        private bool read_bool() {
            return _data[_position++] != 0;
        }

        private int8 read_int8() {
            return (int8) _data[_position++];
        }

        private uint8 read_uint8() {
            return _data[_position++];
        }

        private int16 read_int16() {
            var value = (int16) (_data[_position] << 8 | _data[_position + 1]);
            _position += 2;
            return value;
        }

        internal uint16 read_uint16() {
            var value = (uint16) (_data[_position] << 8 | _data[_position + 1]);
            _position += 2;
            return value;
        }

        private int32 read_int32() {
            var value = (int32) (
                _data[_position] << 24 |
                _data[_position + 1] << 16 |
                _data[_position + 2] << 8 |
                _data[_position + 3]
            );
            _position += 4;
            return value;
        }

        private uint32 read_uint32() {
            var value = (uint32) (
                _data[_position] << 24 |
                _data[_position + 1] << 16 |
                _data[_position + 2] << 8 |
                _data[_position + 3]
            );
            _position += 4;
            return value;
        }

        private int64 read_int64() {
            var value = (int64) (
                (int64) _data[_position] << 56 |
                (int64) _data[_position + 1] << 48 |
                (int64) _data[_position + 2] << 40 |
                (int64) _data[_position + 3] << 32 |
                (int64) _data[_position + 4] << 24 |
                (int64) _data[_position + 5] << 16 |
                (int64) _data[_position + 6] << 8 |
                (int64) _data[_position + 7]
            );
            _position += 8;
            return value;
        }

        private uint64 read_uint64() {
            var value = (uint64) (
                (uint64) _data[_position] << 56 |
                (uint64) _data[_position + 1] << 48 |
                (uint64) _data[_position + 2] << 40 |
                (uint64) _data[_position + 3] << 32 |
                (uint64) _data[_position + 4] << 24 |
                (uint64) _data[_position + 5] << 16 |
                (uint64) _data[_position + 6] << 8 |
                (uint64) _data[_position + 7]
            );
            _position += 8;
            return value;
        }

        private float read_float() {
            // The writer stores IEEE 754 bits in big-endian order, so
            // reassemble the bits explicitly rather than copying raw
            // bytes in host byte order
            var bits = read_uint32();
            return *((float*) &bits);
        }

        private double read_double() {
            var bits = read_uint64();
            return *((double*) &bits);
        }

        private int64 read_length() {
            var first = _data[_position++];
            if (first < 0x80) {
                return first;
            } else if (first < 0xFF) {
                var second = _data[_position++];
                return ((first & 0x7F) << 8) | second;
            } else {
                return read_int64();
            }
        }

        private Invercargill.Element read_array() throws SerializationError {
            var length = read_length();
            var array = new Invercargill.DataStructures.Vector();
            for (int64 i = 0; i < length; i++) {
                array.add(read_element());
            }
            return new Invercargill.NativeElement(array);
        }

        internal Invercargill.Element read_dictionary() throws SerializationError {
            var length = read_length();
            var dict = new
                Invercargill.DataStructures.Dictionary();
            for (int64 i = 0; i < length; i++) {
                var key = read_string();
                var value = read_element();
                dict.set(key, value);
            }
            return new Invercargill.NativeElement(dict);
        }
    }
} // namespace Implexus.Serialization
```

## Entity Serialization

### EntitySerializer

```vala
namespace Implexus.Serialization {

    public class EntitySerializer {

        public uint8[] serialize(Entity entity) throws SerializationError {
            var writer = new ElementWriter();

            // Write header
            write_header(writer, entity);

            // Write entity-specific data
            switch (entity.entity_type) {
                case EntityType.CONTAINER:
                    write_container(writer, (Container) entity);
                    break;
                case EntityType.DOCUMENT:
                    write_document(writer, (Document) entity);
                    break;
                case EntityType.CATEGORY:
                    write_category(writer, (Category) entity);
                    break;
                case EntityType.INDEX:
                    write_index(writer, (Index) entity);
                    break;
            }

            return writer.to_bytes();
        }

        private void write_header(ElementWriter writer, Entity entity) {
            // Header bytes go straight into the writer's internal buffer
            // Magic
            writer._buffer.append_byte(0x49); // 'I'
            writer._buffer.append_byte(0x4D); // 'M'
            writer._buffer.append_byte(0x50); // 'P'
            writer._buffer.append_byte(0x58); // 'X'

            // Version
            writer._buffer.append_uint16_be(1);

            // Entity type
            writer._buffer.append_byte((uint8) entity.entity_type);

            // Flags (reserved)
            writer._buffer.append_uint16_be(0);
        }

        private void write_container(ElementWriter writer, Container container) {
            // Containers only need their path - children are tracked separately
            writer.write_string(container.path.to_string());
        }

        private void write_document(ElementWriter writer, Document document) {
            // Write path
            writer.write_string(document.path.to_string());

            // Write type label
            writer.write_string(document.type_label);

            // Write properties
            var props = document.properties;
            var prop_dict = new Invercargill.DataStructures.Dictionary();
            foreach (var key in props.keys) {
                prop_dict.set(key, props.get(key));
            }
            writer.write_dictionary(prop_dict);
        }

        private void write_category(ElementWriter writer, Category category) {
            // Write path
            writer.write_string(category.path.to_string());

            // Write type label
            writer.write_string(category.configured_type_label);

            // Write expression
            writer.write_string(category.configured_expression);
        }

        private void write_index(ElementWriter writer, Index index) {
            // Write path
            writer.write_string(index.path.to_string());

            // Write type label
            writer.write_string(index.configured_type_label);

            // Write expression
            writer.write_string(index.configured_expression);
        }
    }
} // namespace Implexus.Serialization
```

### EntityDeserializer

```vala
namespace Implexus.Serialization {

    public class EntityDeserializer {

        public Entity deserialize(uint8[] data, Engine engine, Path path) throws SerializationError {
            var reader = new ElementReader(data);

            // Read and validate header
            var header = read_header(reader);
            validate_header(header);

            // Read entity-specific data
            switch (header.entity_type) {
                case EntityType.CONTAINER:
                    return read_container(reader, engine, path);
                case EntityType.DOCUMENT:
                    return read_document(reader, engine, path);
                case EntityType.CATEGORY:
                    return read_category(reader, engine, path);
                case EntityType.INDEX:
                    return read_index(reader, engine, path);
                default:
                    throw new SerializationError.UNKNOWN_TYPE(
                        "Unknown entity type: %d", (int) header.entity_type
                    );
            }
        }

        private Header read_header(ElementReader reader) throws SerializationError {
            var header = new Header();

            // Magic (read via the reader's internal position)
            header.magic[0] = reader._data[reader._position++];
            header.magic[1] = reader._data[reader._position++];
            header.magic[2] = reader._data[reader._position++];
            header.magic[3] = reader._data[reader._position++];

            // Version
            header.version = reader.read_uint16();

            // Entity type
            header.entity_type = (EntityType) reader._data[reader._position++];

            // Flags
            header.flags = reader.read_uint16();

            return header;
        }

        private void validate_header(Header header) throws SerializationError {
            if (header.magic[0] != 'I' || header.magic[1] != 'M' ||
                header.magic[2] != 'P' || header.magic[3] != 'X') {
                throw new
                    SerializationError.INVALID_FORMAT("Invalid magic number");
            }

            if (header.version != 1) {
                throw new SerializationError.UNSUPPORTED_VERSION(
                    "Unsupported version: %d", header.version
                );
            }
        }

        private Container read_container(ElementReader reader, Engine engine, Path path) {
            // Path is stored but we already have it
            var stored_path = reader.read_string();
            return new Container(engine, path);
        }

        private Document read_document(ElementReader reader, Engine engine, Path path) {
            // Path
            var stored_path = reader.read_string();

            // Type label
            var type_label = reader.read_string();

            // Create document
            var doc = new Document(engine, path, type_label);

            // Properties
            var props_dict = reader.read_dictionary();
            var dict = props_dict as Invercargill.DataStructures.Dictionary;
            if (dict != null) {
                foreach (var entry in dict.entries) {
                    doc.set_property(entry.key, entry.value);
                }
            }

            return doc;
        }

        private Category read_category(ElementReader reader, Engine engine, Path path) {
            // Path
            var stored_path = reader.read_string();

            // Type label
            var type_label = reader.read_string();

            // Expression
            var expression = reader.read_string();

            return new Category(engine, path, type_label, expression);
        }

        private Index read_index(ElementReader reader, Engine engine, Path path) {
            // Path
            var stored_path = reader.read_string();

            // Type label
            var type_label = reader.read_string();

            // Expression
            var expression = reader.read_string();

            return new Index(engine, path, type_label, expression);
        }
    }

    private class Header {
        public uint8[] magic = new uint8[4];
        public uint16 version;
        public EntityType entity_type;
        public uint16 flags;
    }
} // namespace Implexus.Serialization
```

## SerializationError

```vala
namespace Implexus.Serialization {

    public errordomain SerializationError {
        INVALID_FORMAT,
        UNSUPPORTED_VERSION,
        UNKNOWN_TYPE,
        UNEXPECTED_END,
        CORRUPT_DATA;
    }
} // namespace Implexus.Serialization
```

## Storage Configuration

```vala
namespace Implexus.Storage {

    public class StorageConfiguration : Object {

        private Storage
            _storage;

        public StorageConfiguration(Storage storage) {
            _storage = storage;
        }

        public Storage storage {
            get { return _storage; }
        }

        // Configuration options
        public bool auto_sync { get; set; default = true; }
        public int cache_size { get; set; default = 1000; }
        public bool enable_compression { get; set; default = false; }
    }
} // namespace Implexus.Storage
```

## Hooks

Hooks remain synchronous and run in the DBM worker thread context. This allows hooks to perform additional database operations without thread-safety concerns.

```vala
// Hooks run synchronously in the DBM worker thread
// They can perform additional DB operations directly
public delegate void EntityHook(Entity entity);

// Example hook registration
engine.hook_manager.register_create_hook("Task", (entity) => {
    // This runs in the DBM worker thread
    // Can perform synchronous DB operations
    update_indexes(entity);
});
```

**Important**: Since hooks run in the worker thread, they should not call async methods or block indefinitely. Long-running operations should be offloaded to separate threads.
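The offloading guidance can be illustrated with a language-neutral sketch (Python here, purely illustrative; `on_entity_created` and `slow_reindex` are invented names, not part of the engine's API):

```python
import threading
import queue

# Stand-in for work too slow to run inside a hook
results: "queue.Queue[str]" = queue.Queue()

def slow_reindex(path: str) -> None:
    results.put("reindexed " + path)

def on_entity_created(path: str) -> threading.Thread:
    """Hook body: runs in the worker thread, so it must return quickly.

    Instead of blocking, it hands the long-running work to its own thread."""
    worker = threading.Thread(target=slow_reindex, args=(path,), daemon=True)
    worker.start()
    return worker  # returned only so the example can wait deterministically

t = on_entity_created("/tasks/42")
t.join()
assert results.get_nowait() == "reindexed /tasks/42"
```

The hook returns as soon as the thread is started, so the worker thread stays free to process queued operations.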