This document describes the storage abstraction and binary serialization format.
graph TB
subgraph API Layer
Engine[Engine]
end
subgraph Storage Layer
Storage[Storage Interface]
Serializer[EntitySerializer]
Deserializer[EntityDeserializer]
end
subgraph Async Queue
AsyncDbmQueue[AsyncDbmQueue]
DbmOperation[DbmOperation]
WorkerThread[Worker Thread]
end
subgraph Serialization
ElementWriter[ElementWriter]
ElementReader[ElementReader]
end
subgraph Persistence
Dbm[Dbm Interface]
FilesystemDbm[FilesystemDbm]
GdbmDbm[GdbmDbm]
LmdbDbm[LmdbDbm]
end
Engine --> Storage
Storage --> Serializer
Storage --> Deserializer
Serializer --> ElementWriter
Deserializer --> ElementReader
Storage --> AsyncDbmQueue
AsyncDbmQueue --> DbmOperation
AsyncDbmQueue --> WorkerThread
AsyncDbmQueue --> Dbm
Dbm --> FilesystemDbm
Dbm --> GdbmDbm
Dbm --> LmdbDbm
All database I/O operations are asynchronous, using a queue system to handle different DBM backend capabilities.
The AsyncDbmQueue manages asynchronous execution of database operations:
namespace Implexus.Storage {
    /**
     * Queue system for async DBM operations.
     *
     * For DBMs without concurrent read support (GDBM, Filesystem):
     * - All operations go through a single worker thread
     * - Read operations are prioritized over writes
     *
     * For DBMs with concurrent read support (LMDB):
     * - Write operations go through the worker thread
     * - Read operations spawn their own threads
     */
    public class AsyncDbmQueue : GLib.Object {
        // NOTE(review): weak reference — the queue does not keep the DBM
        // alive; the owner must guarantee the Dbm outlives this queue.
        // Confirm that ownership contract against the Storage layer.
        private weak Dbm _dbm;
        // Pending read operations; drained before the write queue.
        private AsyncQueue<DbmOperation> _read_queue;
        // Pending write operations; processed in FIFO order.
        private AsyncQueue<DbmOperation> _write_queue;
        // Single worker thread; null until start() is called.
        private Thread<void>? _worker_thread = null;
        // Loop flag for the worker; presumably cleared by shutdown()
        // (worker loop not shown here — see _worker).
        private bool _running = false;
        /**
         * Creates a new AsyncDbmQueue for the given DBM.
         *
         * @param dbm The DBM instance to wrap
         */
        public AsyncDbmQueue(Dbm dbm) {
            _dbm = dbm;
            _read_queue = new AsyncQueue<DbmOperation>();
            _write_queue = new AsyncQueue<DbmOperation>();
        }
        /**
         * Starts the worker thread.
         * Must be called before executing any operations.
         */
        public void start();
        /**
         * Stops the worker thread and waits for it to finish.
         *
         * @param timeout_ms Maximum time to wait for shutdown (0 = wait forever)
         * @return true if shutdown completed, false if timed out
         */
        public bool shutdown(int timeout_ms = 5000);
        /**
         * Executes a read operation asynchronously.
         *
         * For concurrent-read DBMs (LMDB), spawns a new thread.
         * For single-threaded DBMs (GDBM, Filesystem), queues and prioritizes.
         */
        public async void execute_read_async(owned DbmOperation op) throws Error;
        /**
         * Executes a write operation asynchronously.
         *
         * Write operations always go through the queue to ensure
         * serialization and data consistency.
         */
        public async void execute_write_async(owned DbmOperation op) throws Error;
        /**
         * Queues a read operation and returns the operation object.
         */
        public DbmOperation queue_read();
        /**
         * Queues a write operation and returns the operation object.
         */
        public DbmOperation queue_write();
    }
} // namespace Implexus.Storage
Represents a single operation in the queue:
namespace Implexus.Storage {
    /**
     * Type of database operation.
     */
    public enum DbmOperationType {
        /** Read operation - prioritized over writes */
        READ,
        /** Write operation - processed in order */
        WRITE
    }
    /**
     * Represents a single operation in the AsyncDbmQueue.
     *
     * Stores the operation type, callback for async continuation,
     * and provides storage for the result or error.
     */
    public class DbmOperation : GLib.Object {
        /** The type of this operation (READ or WRITE) */
        public DbmOperationType op_type { get; construct; }
        /**
         * The callback to resume the async method when complete.
         * Invoked from the main context via Idle.add (see the worker loop),
         * never directly from the worker thread.
         */
        public SourceFunc callback;
        /**
         * The result of the operation (if successful).
         * NOTE(review): raw pointer — ownership/lifetime of the pointed-to
         * data is not expressed here; confirm who frees it.
         */
        public void* result { get; set; }
        /** The error that occurred during execution (if any) */
        public Error? error { get; set; }
        /** Whether the operation has been completed */
        public bool completed { get; set; default = false; }
        /**
         * Creates a new DbmOperation.
         *
         * @param type The type of operation (READ or WRITE)
         * @param cb The callback to resume the async method
         */
        public DbmOperation(DbmOperationType type, owned SourceFunc cb);
    }
} // namespace Implexus.Storage
The worker thread processes operations with read prioritization:
Results are handed back to the main loop via `Idle.add()`. Shutdown works by pushing a dummy operation to wake the worker, then waiting for the thread to finish.
/**
 * Worker-thread loop: drains the read queue first (reads are
 * prioritized), then the write queue, and blocks with a timeout
 * when both are empty so it can re-check _running during shutdown.
 *
 * Completion is signalled via Idle.add so the operation's callback
 * resumes the awaiting async method in the main context, never in
 * this worker thread.
 */
private void _worker() {
    while (_running) {
        // Try to get a read operation first (prioritize reads).
        DbmOperation? op = _read_queue.try_pop();

        // If no read operation is pending, try a write.
        if (op == null) {
            op = _write_queue.try_pop();
        }

        // If still nothing, block on the read queue with a timeout so the
        // loop periodically wakes up and observes _running changes.
        if (op == null) {
            op = _read_queue.timed_pop(...);
            if (op == null) {
                // Timed out with nothing to do; poll again.
                continue;
            }
            // BUG FIX: the previous version `continue`d unconditionally
            // here, discarding any operation timed_pop() did return —
            // its callback never fired and the awaiting coroutine hung.
        }

        // Non-null local so the closure below captures a strong,
        // non-nullable reference.
        DbmOperation pending = (!) op;

        // Signal completion via Idle to return to the main context.
        Idle.add(() => {
            pending.callback();
            return Source.REMOVE;
        });
    }
}
The DBM interface provides low-level key-value storage with concurrent read support indicator:
namespace Implexus.Storage {
    /**
     * Low-level key-value persistence interface.
     *
     * Implementations (FilesystemDbm, GdbmDbm, LmdbDbm) advertise their
     * concurrency capabilities via supports_concurrent_reads, which the
     * AsyncDbmQueue uses to decide its threading strategy.
     */
    public interface Dbm : Object {
        /**
         * Whether this DBM implementation supports concurrent read operations.
         *
         * If true, read operations can spawn new threads while writes go through
         * the async queue. This enables better read parallelism.
         *
         * - LMDB: true (MVCC allows concurrent readers)
         * - GDBM: false (single-threaded access required)
         * - Filesystem: false (single-threaded access required)
         */
        public abstract bool supports_concurrent_reads { get; }
        /**
         * Checks if a key exists in the database.
         */
        public abstract bool has_key(string key);
        /**
         * Gets the value for a key.
         *
         * @return the stored value, or null when the key is absent
         */
        public abstract Invercargill.BinaryData? @get(string key);
        /**
         * Sets a key-value pair.
         */
        public abstract void @set(string key, Invercargill.BinaryData value) throws StorageError;
        /**
         * Deletes a key from the database.
         */
        public abstract void delete(string key) throws StorageError;
        /**
         * Gets all keys in the database.
         * Owned getter: each access returns a fresh enumerable.
         */
        public abstract Invercargill.Enumerable<string> keys { owned get; }
        /**
         * Begins a new transaction.
         */
        public abstract void begin_transaction() throws StorageError;
        /**
         * Commits the current transaction.
         */
        public abstract void commit_transaction() throws StorageError;
        /**
         * Rolls back the current transaction.
         * Does not throw: rollback is expected to succeed even on error paths.
         */
        public abstract void rollback_transaction();
        /**
         * Indicates whether a transaction is currently active.
         */
        public abstract bool in_transaction { get; }
    }
} // namespace Implexus.Storage
| Backend | supports_concurrent_reads | Notes |
|---|---|---|
| LMDB | true | MVCC allows multiple concurrent readers |
| GDBM | false | Single-threaded access required |
| Filesystem | false | In-memory dictionary, single-threaded |
Storage keys are organized by prefix:
| Key Pattern | Purpose |
|---|---|
| `entity:<path>` | Serialized entity data |
| `children:<path>` | Set of child names for path |
| `type:<type_label>` | Set of paths for type |
| `meta:version` | Database version |
| `meta:config` | Configuration data |
The serialization format encodes Element types to binary without GLib.Object specific logic. All values are written in big-endian byte order.
Every serialized entity starts with a header:
Offset Size Field
0 4 Magic: 0x49 0x4D 0x50 0x58 ("IMPX")
4 2 Version (currently 0x0001)
6 1 Entity type (0=Container, 1=Document, 2=Category, 3=Index)
7 2 Flags (reserved)
| Code | Element Type |
|---|---|
| 0x00 | NullElement |
| 0x01 | ValueElement (reserved — not emitted by the current writer; the reader rejects it as an unknown type code) |
| 0x02 | String |
| 0x03 | Boolean |
| 0x04 | Int8 |
| 0x05 | UInt8 |
| 0x06 | Int16 |
| 0x07 | UInt16 |
| 0x08 | Int32 |
| 0x09 | UInt32 |
| 0x0A | Int64 |
| 0x0B | UInt64 |
| 0x0C | Float |
| 0x0D | Double |
| 0x0E | Element Array |
| 0x0F | Element Dictionary |
namespace Implexus.Serialization {
    /**
     * Serializes Element values into the binary format described above.
     *
     * Each value is written as a one-byte type code followed by its
     * payload; all multi-byte integers and floats are big-endian
     * (the *_be ByteBuffer appenders). Output is accumulated in an
     * internal ByteBuffer and retrieved with to_bytes().
     */
    public class ElementWriter {
        // Accumulates the serialized bytes.
        // NOTE(review): declared private, but EntitySerializer in this
        // module appends header bytes to it directly — confirm the
        // intended visibility (Vala `private` is class-private).
        private Invercargill.DataStructures.ByteBuffer _buffer;
        public ElementWriter() {
            _buffer = new Invercargill.DataStructures.ByteBuffer();
        }
        /** Writes the NullElement type code (0x00); no payload. */
        public void write_null() {
            _buffer.append_byte(0x00);
        }
        /**
         * Writes an arbitrary element, dispatching on its runtime type.
         *
         * Null (or is_null) elements become 0x00; ValueElements are
         * routed through write_value; otherwise the element's type()
         * selects the matching primitive writer, with an unknown-type
         * fallback that stores the element's string form.
         */
        public void write_element(Invercargill.Element? element) {
            if (element == null || element.is_null()) {
                write_null();
                return;
            }
            var value = element as Invercargill.ValueElement;
            if (value != null) {
                write_value((!) value);
                return;
            }
            // Handle other element types
            var type = element.type();
            if (type == typeof(string)) {
                write_string(element.to_value<string>());
            } else if (type == typeof(bool)) {
                // NOTE(review): to_value<bool?> yields a nullable passed to
                // a plain bool parameter — confirm implicit unboxing here.
                write_bool(element.to_value<bool?>());
            } else if (type == typeof(int8)) {
                write_int8(element.to_value<int8>());
            } else if (type == typeof(uint8)) {
                write_uint8(element.to_value<uint8>());
            } else if (type == typeof(int16)) {
                write_int16(element.to_value<int16>());
            } else if (type == typeof(uint16)) {
                write_uint16(element.to_value<uint16>());
            } else if (type == typeof(int32)) {
                write_int32(element.to_value<int32>());
            } else if (type == typeof(uint32)) {
                write_uint32(element.to_value<uint32>());
            } else if (type == typeof(int64)) {
                write_int64(element.to_value<int64>());
            } else if (type == typeof(uint64)) {
                write_uint64(element.to_value<uint64>());
            } else if (type == typeof(float)) {
                write_float(element.to_value<float>());
            } else if (type == typeof(double)) {
                write_double(element.to_value<double>());
            } else {
                // Fallback: write as string
                // NOTE(review): unlike write_gvalue below, this chain has no
                // typeof(int) branch — plain-int elements would hit this
                // fallback. Confirm whether Element.type() can report int.
                write_string(element.to_string());
            }
        }
        /** Unwraps a ValueElement's GLib.Value and serializes it. */
        public void write_value(Invercargill.ValueElement value) {
            // ValueElement wraps GLib.Value, determine type
            var gvalue = value.to_gvalue();
            write_gvalue(ref gvalue);
        }
        private void write_gvalue(ref GLib.Value gvalue) {
            if (gvalue.type() == typeof(string)) {
                write_string((string) gvalue);
            } else if (gvalue.type() == typeof(bool)) {
                write_bool((bool) gvalue);
            } else if (gvalue.type() == typeof(int)) {
                // Plain ints are widened and stored as int64 (code 0x0A),
                // so they round-trip as int64 on the read side.
                write_int64((int) gvalue);
            } else if (gvalue.type() == typeof(int64)) {
                write_int64((int64) gvalue);
            } else if (gvalue.type() == typeof(double)) {
                write_double((double) gvalue);
            } else if (gvalue.type() == typeof(float)) {
                write_float((float) gvalue);
            } else {
                // Unknown GValue types are stored as their string contents.
                write_string(gvalue.strdup_contents());
            }
        }
        /** Code 0x02, then varint length, then raw UTF-8 bytes (no NUL). */
        public void write_string(string value) {
            _buffer.append_byte(0x02);
            var bytes = value.data;
            write_length(bytes.length);
            _buffer.append_bytes(bytes);
        }
        /** Code 0x03, then a single byte: 1 for true, 0 for false. */
        public void write_bool(bool value) {
            _buffer.append_byte(0x03);
            _buffer.append_byte(value ? 1 : 0);
        }
        public void write_int8(int8 value) {
            _buffer.append_byte(0x04);
            _buffer.append_byte((uint8) value);
        }
        public void write_uint8(uint8 value) {
            _buffer.append_byte(0x05);
            _buffer.append_byte(value);
        }
        public void write_int16(int16 value) {
            _buffer.append_byte(0x06);
            _buffer.append_int16_be(value);
        }
        public void write_uint16(uint16 value) {
            _buffer.append_byte(0x07);
            _buffer.append_uint16_be(value);
        }
        public void write_int32(int32 value) {
            _buffer.append_byte(0x08);
            _buffer.append_int32_be(value);
        }
        public void write_uint32(uint32 value) {
            _buffer.append_byte(0x09);
            _buffer.append_uint32_be(value);
        }
        public void write_int64(int64 value) {
            _buffer.append_byte(0x0A);
            _buffer.append_int64_be(value);
        }
        public void write_uint64(uint64 value) {
            _buffer.append_byte(0x0B);
            _buffer.append_uint64_be(value);
        }
        public void write_float(float value) {
            _buffer.append_byte(0x0C);
            _buffer.append_float_be(value);
        }
        public void write_double(double value) {
            _buffer.append_byte(0x0D);
            _buffer.append_double_be(value);
        }
        /**
         * Variable-length length encoding:
         * - < 0x80: one byte as-is
         * - < 0x4000: two bytes, high byte ORed with 0x80 (range 0x80-0xBF)
         * - otherwise: 0xFF marker followed by a raw big-endian int64
         *
         * NOTE(review): negative lengths are not rejected; a negative value
         * falls into the one-byte branch and is truncated to uint8 — confirm
         * callers can never pass one.
         */
        public void write_length(int64 length) {
            // Variable-length encoding for lengths
            if (length < 0x80) {
                _buffer.append_byte((uint8) length);
            } else if (length < 0x4000) {
                _buffer.append_byte((uint8) ((length >> 8) | 0x80));
                _buffer.append_byte((uint8) (length & 0xFF));
            } else {
                _buffer.append_byte(0xFF);
                _buffer.append_int64_be(length);
            }
        }
        /** Code 0x0E, varint element count, then each element in order. */
        public void write_array(Invercargill.ReadOnlyCollection<Invercargill.Element> array) {
            _buffer.append_byte(0x0E);
            write_length(array.count);
            foreach (var element in array) {
                write_element(element);
            }
        }
        /** Code 0x0F, varint entry count, then (string key, element) pairs. */
        public void write_dictionary(Invercargill.ReadOnlyAssociative<string, Invercargill.Element> dict) {
            _buffer.append_byte(0x0F);
            write_length(dict.count);
            foreach (var entry in dict.entries) {
                write_string(entry.key);
                write_element(entry.value);
            }
        }
        /** Returns a copy of everything written so far. */
        public uint8[] to_bytes() {
            return _buffer.to_bytes();
        }
        /** Clears the internal buffer so the writer can be reused. */
        public void reset() {
            _buffer.clear();
        }
    }
} // namespace Implexus.Serialization
namespace Implexus.Serialization {
    /**
     * Deserializes the binary format produced by ElementWriter.
     *
     * Reads a one-byte type code followed by a big-endian payload and
     * reconstructs the corresponding Element. Maintains a cursor
     * (_position) into the input buffer.
     *
     * NOTE(review): the private readers do not bounds-check _data before
     * indexing; truncated input past the initial type-code check will
     * index out of range. Adding checks would require widening the
     * private signatures with `throws` — flagged rather than changed.
     */
    public class ElementReader {
        private uint8[] _data;
        private int _position;
        /**
         * Creates a reader over the given serialized bytes.
         *
         * @param data the buffer to read from; not copied
         */
        public ElementReader(uint8[] data) {
            _data = data;
            _position = 0;
        }
        /**
         * Reads the next element (type code + payload) from the buffer.
         *
         * @throws SerializationError.UNEXPECTED_END at end of input
         * @throws SerializationError.UNKNOWN_TYPE on an unrecognized code
         */
        public Invercargill.Element? read_element() throws SerializationError {
            if (_position >= _data.length) {
                throw new SerializationError.UNEXPECTED_END("Unexpected end of data");
            }
            var type_code = _data[_position++];
            switch (type_code) {
            case 0x00: return new Invercargill.NullElement();
            case 0x02: return new Invercargill.ValueElement(read_string());
            case 0x03: return new Invercargill.ValueElement(read_bool());
            case 0x04: return new Invercargill.ValueElement(read_int8());
            case 0x05: return new Invercargill.ValueElement(read_uint8());
            case 0x06: return new Invercargill.ValueElement(read_int16());
            case 0x07: return new Invercargill.ValueElement(read_uint16());
            case 0x08: return new Invercargill.ValueElement(read_int32());
            case 0x09: return new Invercargill.ValueElement(read_uint32());
            case 0x0A: return new Invercargill.ValueElement(read_int64());
            case 0x0B: return new Invercargill.ValueElement(read_uint64());
            case 0x0C: return new Invercargill.ValueElement(read_float());
            case 0x0D: return new Invercargill.ValueElement(read_double());
            case 0x0E: return read_array();
            case 0x0F: return read_dictionary();
            default:
                throw new SerializationError.UNKNOWN_TYPE("Unknown type code: 0x%02X", type_code);
            }
        }
        /** Reads a varint length followed by that many raw UTF-8 bytes. */
        private string read_string() {
            var length = read_length();
            // BUG FIX: allocate one extra byte and NUL-terminate — casting
            // a uint8[] to string requires a trailing NUL, which the raw
            // payload does not include.
            var bytes = new uint8[length + 1];
            Memory.copy(bytes, &_data[_position], length);
            bytes[length] = 0;
            _position += (int) length;
            return (string) bytes;
        }
        /** Any non-zero byte is true. */
        private bool read_bool() {
            return _data[_position++] != 0;
        }
        private int8 read_int8() {
            return (int8) _data[_position++];
        }
        private uint8 read_uint8() {
            return _data[_position++];
        }
        /** Two big-endian bytes. */
        private int16 read_int16() {
            var value = (int16) (_data[_position] << 8 | _data[_position + 1]);
            _position += 2;
            return value;
        }
        private uint16 read_uint16() {
            var value = (uint16) (_data[_position] << 8 | _data[_position + 1]);
            _position += 2;
            return value;
        }
        /** Four big-endian bytes. */
        private int32 read_int32() {
            var value = (int32) (
                _data[_position] << 24 |
                _data[_position + 1] << 16 |
                _data[_position + 2] << 8 |
                _data[_position + 3]
            );
            _position += 4;
            return value;
        }
        private uint32 read_uint32() {
            var value = (uint32) (
                _data[_position] << 24 |
                _data[_position + 1] << 16 |
                _data[_position + 2] << 8 |
                _data[_position + 3]
            );
            _position += 4;
            return value;
        }
        /** Eight big-endian bytes. */
        private int64 read_int64() {
            var value = (int64) (
                (int64) _data[_position] << 56 |
                (int64) _data[_position + 1] << 48 |
                (int64) _data[_position + 2] << 40 |
                (int64) _data[_position + 3] << 32 |
                (int64) _data[_position + 4] << 24 |
                (int64) _data[_position + 5] << 16 |
                (int64) _data[_position + 6] << 8 |
                (int64) _data[_position + 7]
            );
            _position += 8;
            return value;
        }
        private uint64 read_uint64() {
            var value = (uint64) (
                (uint64) _data[_position] << 56 |
                (uint64) _data[_position + 1] << 48 |
                (uint64) _data[_position + 2] << 40 |
                (uint64) _data[_position + 3] << 32 |
                (uint64) _data[_position + 4] << 24 |
                (uint64) _data[_position + 5] << 16 |
                (uint64) _data[_position + 6] << 8 |
                (uint64) _data[_position + 7]
            );
            _position += 8;
            return value;
        }
        /**
         * Reads an IEEE-754 float stored big-endian by the writer.
         *
         * BUG FIX: the previous version copied the four payload bytes and
         * reinterpreted them in native byte order, which is wrong on
         * little-endian hosts (the writer uses append_float_be). We now
         * assemble the bits in host order first, then reinterpret.
         */
        private float read_float() {
            uint32 bits = read_uint32();
            float* view = (float*) (&bits);
            return *view;
        }
        /** Eight big-endian bytes reinterpreted as a double (see read_float). */
        private double read_double() {
            uint64 bits = read_uint64();
            double* view = (double*) (&bits);
            return *view;
        }
        /**
         * Decodes the variable-length length written by write_length:
         * one byte (< 0x80), two bytes (first byte 0x80-0xBF from the
         * writer; values up to 0xFE are tolerated), or 0xFF + raw int64.
         */
        private int64 read_length() {
            var first = _data[_position++];
            if (first < 0x80) {
                return first;
            } else if (first < 0xFF) {
                var second = _data[_position++];
                return ((first & 0x7F) << 8) | second;
            } else {
                return read_int64();
            }
        }
        /** Varint count, then that many elements, wrapped in a NativeElement. */
        private Invercargill.Element read_array() throws SerializationError {
            var length = read_length();
            var array = new Invercargill.DataStructures.Vector<Invercargill.Element>();
            for (int64 i = 0; i < length; i++) {
                array.add(read_element());
            }
            return new Invercargill.NativeElement(array);
        }
        /** Varint count, then (string key, element) pairs. */
        private Invercargill.Element read_dictionary() throws SerializationError {
            var length = read_length();
            var dict = new Invercargill.DataStructures.Dictionary<string, Invercargill.Element>();
            for (int64 i = 0; i < length; i++) {
                var key = read_string();
                var value = read_element();
                dict.set(key, value);
            }
            return new Invercargill.NativeElement(dict);
        }
    }
} // namespace Implexus.Serialization
namespace Implexus.Serialization {
    /**
     * Serializes Entity instances into the binary entity format:
     * a 9-byte header (magic, version, entity type, flags) followed by
     * entity-type-specific fields written via ElementWriter.
     */
    public class EntitySerializer {
        /**
         * Serializes an entity to bytes.
         *
         * NOTE(review): the switch has no default branch — an entity_type
         * outside the four known values yields header-only output with no
         * body and no error. Confirm that is impossible or add a default.
         *
         * @param entity the entity to serialize
         * @return the serialized bytes, ready for storage
         */
        public uint8[] serialize(Entity entity) throws SerializationError {
            var writer = new ElementWriter();
            // Write header
            write_header(writer, entity);
            // Write entity-specific data
            switch (entity.entity_type) {
            case EntityType.CONTAINER:
                write_container(writer, (Container) entity);
                break;
            case EntityType.DOCUMENT:
                write_document(writer, (Document) entity);
                break;
            case EntityType.CATEGORY:
                write_category(writer, (Category) entity);
                break;
            case EntityType.INDEX:
                write_index(writer, (Index) entity);
                break;
            }
            return writer.to_bytes();
        }
        // Writes the 9-byte header: magic "IMPX", version 1, type, flags.
        // NOTE(review): writer._buffer is declared private on ElementWriter;
        // Vala `private` is class-private, so this direct access likely will
        // not compile as shown — confirm the intended visibility or add a
        // raw-byte API to ElementWriter.
        private void write_header(ElementWriter writer, Entity entity) {
            // Magic
            writer._buffer.append_byte(0x49); // 'I'
            writer._buffer.append_byte(0x4D); // 'M'
            writer._buffer.append_byte(0x50); // 'P'
            writer._buffer.append_byte(0x58); // 'X'
            // Version
            writer._buffer.append_uint16_be(1);
            // Entity type
            writer._buffer.append_byte((uint8) entity.entity_type);
            // Flags (reserved)
            writer._buffer.append_uint16_be(0);
        }
        private void write_container(ElementWriter writer, Container container) {
            // Containers only need path - children are tracked separately
            // (under the children:<path> storage keys)
            writer.write_string(container.path.to_string());
        }
        private void write_document(ElementWriter writer, Document document) {
            // Write path
            writer.write_string(document.path.to_string());
            // Write type label
            writer.write_string(document.type_label);
            // Write properties: copy into a concrete dictionary so they can
            // be emitted as a single 0x0F dictionary value
            var props = document.properties;
            var prop_dict = new Invercargill.DataStructures.Dictionary<string, Invercargill.Element>();
            foreach (var key in props.keys) {
                prop_dict.set(key, props.get(key));
            }
            writer.write_dictionary(prop_dict);
        }
        private void write_category(ElementWriter writer, Category category) {
            // Write path
            writer.write_string(category.path.to_string());
            // Write type label
            writer.write_string(category.configured_type_label);
            // Write expression
            writer.write_string(category.configured_expression);
        }
        private void write_index(ElementWriter writer, Index index) {
            // Same layout as Category: path, type label, expression
            writer.write_string(index.path.to_string());
            // Write type label
            writer.write_string(index.configured_type_label);
            // Write expression
            writer.write_string(index.configured_expression);
        }
    }
} // namespace Implexus.Serialization
namespace Implexus.Serialization {
    /**
     * Reconstructs Entity instances from the binary entity format:
     * validates the 9-byte header, then dispatches on the stored
     * entity type to read the type-specific fields.
     */
    public class EntityDeserializer {
        /**
         * Deserializes an entity from bytes.
         *
         * @param data the serialized bytes (header + body)
         * @param engine the engine the reconstructed entity belongs to
         * @param path the entity's path; the path stored in the data is
         *        read but ignored in favor of this parameter
         * @throws SerializationError on bad magic, unsupported version,
         *         or an unknown entity type
         */
        public Entity deserialize(uint8[] data, Engine engine, Path path) throws SerializationError {
            var reader = new ElementReader(data);
            // Read and validate header
            var header = read_header(reader);
            validate_header(header);
            // Read entity-specific data
            switch (header.entity_type) {
            case EntityType.CONTAINER:
                return read_container(reader, engine, path);
            case EntityType.DOCUMENT:
                return read_document(reader, engine, path);
            case EntityType.CATEGORY:
                return read_category(reader, engine, path);
            case EntityType.INDEX:
                return read_index(reader, engine, path);
            default:
                throw new SerializationError.UNKNOWN_TYPE(
                    "Unknown entity type: %d", (int) header.entity_type
                );
            }
        }
        // Parses the fixed header layout: 4-byte magic, uint16 version,
        // 1-byte entity type, uint16 flags.
        // NOTE(review): reader._data, reader._position and read_uint16 are
        // declared private on ElementReader; this cross-class access likely
        // will not compile as shown — confirm intended visibility.
        private Header read_header(ElementReader reader) throws SerializationError {
            var header = new Header();
            // Magic
            header.magic[0] = reader._data[reader._position++];
            header.magic[1] = reader._data[reader._position++];
            header.magic[2] = reader._data[reader._position++];
            header.magic[3] = reader._data[reader._position++];
            // Version
            header.version = reader.read_uint16();
            // Entity type
            header.entity_type = (EntityType) reader._data[reader._position++];
            // Flags
            header.flags = reader.read_uint16();
            return header;
        }
        // Rejects anything that is not magic "IMPX" at version 1.
        private void validate_header(Header header) throws SerializationError {
            if (header.magic[0] != 'I' || header.magic[1] != 'M' ||
                header.magic[2] != 'P' || header.magic[3] != 'X') {
                throw new SerializationError.INVALID_FORMAT("Invalid magic number");
            }
            if (header.version != 1) {
                throw new SerializationError.UNSUPPORTED_VERSION(
                    "Unsupported version: %d", header.version
                );
            }
        }
        private Container read_container(ElementReader reader, Engine engine, Path path) {
            // Path is stored but we already have it; consume it to keep the
            // read cursor aligned
            var stored_path = reader.read_string();
            return new Container(engine, path);
        }
        private Document read_document(ElementReader reader, Engine engine, Path path) {
            // Path (consumed, unused — caller-supplied path wins)
            var stored_path = reader.read_string();
            // Type label
            var type_label = reader.read_string();
            // Create document
            var doc = new Document(engine, path, type_label);
            // Properties
            var props_dict = reader.read_dictionary();
            // NOTE(review): read_dictionary returns a NativeElement wrapping
            // the dictionary; casting the wrapper itself to Dictionary would
            // yield null and silently drop every property. Confirm
            // NativeElement's cast semantics or unwrap explicitly.
            var dict = props_dict as Invercargill.DataStructures.Dictionary<string, Invercargill.Element>;
            if (dict != null) {
                foreach (var entry in dict.entries) {
                    doc.set_property(entry.key, entry.value);
                }
            }
            return doc;
        }
        private Category read_category(ElementReader reader, Engine engine, Path path) {
            // Path (consumed, unused)
            var stored_path = reader.read_string();
            // Type label
            var type_label = reader.read_string();
            // Expression
            var expression = reader.read_string();
            return new Category(engine, path, type_label, expression);
        }
        private Index read_index(ElementReader reader, Engine engine, Path path) {
            // Same layout as Category: path, type label, expression
            var stored_path = reader.read_string();
            // Type label
            var type_label = reader.read_string();
            // Expression
            var expression = reader.read_string();
            return new Index(engine, path, type_label, expression);
        }
    }
    /**
     * Parsed entity header fields (see the binary layout table above).
     */
    private class Header {
        public uint8[] magic = new uint8[4];
        public uint16 version;
        public EntityType entity_type;
        public uint16 flags;
    }
} // namespace Implexus.Serialization
namespace Implexus.Serialization {
    /**
     * Errors raised while serializing or deserializing entities.
     */
    public errordomain SerializationError {
        /** The magic number did not match "IMPX". */
        INVALID_FORMAT,
        /** The header version is not one this code understands. */
        UNSUPPORTED_VERSION,
        /** An unrecognized element type code or entity type was found. */
        UNKNOWN_TYPE,
        /** The input ended before a complete value could be read. */
        UNEXPECTED_END,
        /** The data is structurally inconsistent. */
        CORRUPT_DATA;
    }
} // namespace Implexus.Serialization
namespace Implexus.Storage {
    /**
     * Runtime configuration options attached to a Storage instance.
     *
     * Rewritten to use GObject-style construction: the storage back-reference
     * is a construct property set via Object(...) chain-up instead of a
     * manually-backed field with a hand-written getter. This keeps the class
     * usable from Object.new(), GtkBuilder, and bindings, and is the
     * idiomatic pattern for GLib.Object subclasses.
     */
    public class StorageConfiguration : Object {
        /** The storage instance this configuration applies to (immutable). */
        public Storage storage { get; construct; }

        /**
         * Creates a configuration bound to the given storage.
         *
         * @param storage The storage instance to configure
         */
        public StorageConfiguration(Storage storage) {
            Object(storage: storage);
        }

        // Configuration options

        /** Whether writes are flushed to the backend automatically. */
        public bool auto_sync { get; set; default = true; }
        /** Maximum number of cached entries. */
        public int cache_size { get; set; default = 1000; }
        /** Whether stored values are compressed. */
        public bool enable_compression { get; set; default = false; }
    }
} // namespace Implexus.Storage
Hooks remain synchronous and run in the DBM worker thread context. This allows hooks to perform additional database operations without thread-safety concerns.
// Hooks run synchronously in the DBM worker thread.
// They can perform additional DB operations directly, since the worker
// thread serializes access to the backend.
public delegate void EntityHook(Entity entity);
// Example hook registration
engine.hook_manager.register_create_hook("Task", (entity) => {
    // This runs in the DBM worker thread
    // Can perform synchronous DB operations; must not call async methods
    // or block indefinitely (see the note below)
    update_indexes(entity);
});
Important: Since hooks run in the worker thread, they should not call async methods or block indefinitely. Long-running operations should be offloaded to separate threads.