/**
 * MessageWriter - Message writing to output stream
 *
 * Provides functionality for writing protocol messages to
 * an OutputStream. The wire bytes themselves are produced by
 * Message.serialize(); this class assigns request IDs and pushes
 * the serialized bytes to the stream.
 *
 * @version 0.1
 * @since 0.1
 */

namespace Implexus.Protocol {

/**
 * Writes protocol messages to an OutputStream.
 *
 * Responsibilities:
 * - Request ID assignment for outgoing requests
 * - Writing the serialized message and flushing the stream
 * - Translating stream I/O errors into ProtocolError
 *
 * Example usage:
 * {{{
 * var writer = new MessageWriter(output_stream);
 * try {
 *     var request = new GetEntityRequest.for_path(path);
 *     var request_id = writer.write_request(request);
 *     // wait for response with matching request_id
 * } catch (ProtocolError e) {
 *     warning("Failed to write message: %s", e.message);
 * }
 * }}}
 */
public class MessageWriter : Object {

    /**
     * The underlying output stream.
     */
    private OutputStream _stream;

    /**
     * The next request ID to assign.
     *
     * Seeded at the declaration (like _closed) so the counter is valid
     * regardless of how the instance is constructed.
     */
    private uint16 _next_request_id = 1;

    /**
     * Whether the writer is closed.
     */
    private bool _closed = false;

    /**
     * Creates a new MessageWriter for the given stream.
     *
     * @param stream The output stream to write to
     */
    public MessageWriter(OutputStream stream) {
        _stream = stream;
    }

    /**
     * Writes a message to the stream.
     *
     * The message is serialized, written in full and the stream is
     * flushed before this method returns.
     *
     * @param message The message to write
     * @throws ProtocolError CONNECTION_CLOSED if the writer is closed,
     *         IO_ERROR if the stream reports an I/O error
     */
    public void write_message(Message message) throws ProtocolError {
        ensure_open();

        try {
            // Serialize the complete message
            var data = message.serialize();

            // Write to stream
            size_t bytes_written = 0;
            _stream.write_all(data, out bytes_written);
            _stream.flush();
        } catch (IOError e) {
            fail_with_io_error("Write error", e);
        }
    }

    /**
     * Writes a message asynchronously.
     *
     * @param message The message to write
     * @param priority The I/O priority
     * @param cancellable Optional cancellation token
     * @throws ProtocolError CONNECTION_CLOSED if the writer is closed,
     *         IO_ERROR if the stream reports an I/O error
     */
    public async void write_message_async(
        Message message,
        int priority = GLib.Priority.DEFAULT,
        Cancellable? cancellable = null
    ) throws ProtocolError {
        ensure_open();

        try {
            // Serialize the complete message
            var data = message.serialize();

            // Write to stream asynchronously
            size_t bytes_written = 0;
            yield _stream.write_all_async(data, priority, cancellable, out bytes_written);
            yield _stream.flush_async(priority, cancellable);
        } catch (IOError e) {
            fail_with_io_error("Async write error", e);
        }
    }

    /**
     * Writes a request message and assigns a request ID.
     *
     * The ID is stored in the message's request_id before it is written.
     * The ID is consumed even if the write subsequently fails.
     *
     * @param request The request message to write
     * @return The assigned request ID
     * @throws ProtocolError if writing fails
     */
    public uint16 write_request(Message request) throws ProtocolError {
        var request_id = allocate_request_id();
        request.request_id = request_id;
        write_message(request);
        return request_id;
    }

    /**
     * Writes a request message asynchronously.
     *
     * @param request The request message to write
     * @param priority The I/O priority
     * @param cancellable Optional cancellation token
     * @return The assigned request ID
     * @throws ProtocolError if writing fails
     */
    public async uint16 write_request_async(
        Message request,
        int priority = GLib.Priority.DEFAULT,
        Cancellable? cancellable = null
    ) throws ProtocolError {
        var request_id = allocate_request_id();
        request.request_id = request_id;
        yield write_message_async(request, priority, cancellable);
        return request_id;
    }

    /**
     * Writes a response message with a specific request ID.
     *
     * @param response The response message to write
     * @param request_id The request ID to match
     * @throws ProtocolError if writing fails
     */
    public void write_response(Message response, uint16 request_id) throws ProtocolError {
        response.request_id = request_id;
        write_message(response);
    }

    /**
     * Writes a response message asynchronously.
     *
     * @param response The response message to write
     * @param request_id The request ID to match
     * @param priority The I/O priority
     * @param cancellable Optional cancellation token
     * @throws ProtocolError if writing fails
     */
    public async void write_response_async(
        Message response,
        uint16 request_id,
        int priority = GLib.Priority.DEFAULT,
        Cancellable? cancellable = null
    ) throws ProtocolError {
        response.request_id = request_id;
        yield write_message_async(response, priority, cancellable);
    }

    /**
     * Writes an error response.
     *
     * @param request_id The request ID to respond to
     * @param error_code The error code
     * @param error_message The error message
     * @throws ProtocolError if writing fails
     */
    public void write_error(uint16 request_id, int error_code, string error_message) throws ProtocolError {
        var error = new ErrorResponse.with_error(error_code, error_message);
        write_response(error, request_id);
    }

    /**
     * Writes a success response.
     *
     * @param request_id The request ID to respond to
     * @throws ProtocolError if writing fails
     */
    public void write_success(uint16 request_id) throws ProtocolError {
        var success = new SuccessResponse();
        write_response(success, request_id);
    }

    /**
     * Flushes the underlying stream.
     *
     * @throws ProtocolError CONNECTION_CLOSED if the writer is closed,
     *         IO_ERROR if flushing fails
     */
    public void flush() throws ProtocolError {
        ensure_open();

        try {
            _stream.flush();
        } catch (IOError e) {
            fail_with_io_error("Flush error", e);
        }
    }

    /**
     * Flushes the underlying stream asynchronously.
     *
     * @param priority The I/O priority
     * @param cancellable Optional cancellation token
     * @throws ProtocolError CONNECTION_CLOSED if the writer is closed,
     *         IO_ERROR if flushing fails
     */
    public async void flush_async(
        int priority = GLib.Priority.DEFAULT,
        Cancellable? cancellable = null
    ) throws ProtocolError {
        ensure_open();

        try {
            yield _stream.flush_async(priority, cancellable);
        } catch (IOError e) {
            fail_with_io_error("Async flush error", e);
        }
    }

    /**
     * Closes the writer.
     *
     * Only marks the writer as closed; the underlying stream is not
     * closed or flushed here. After closing, all write and flush
     * operations will throw CONNECTION_CLOSED.
     */
    public void close() {
        _closed = true;
    }

    /**
     * Whether the writer is closed.
     *
     * Becomes true after close(), or after the stream reported
     * IOError.CLOSED during a write or flush.
     */
    public bool is_closed {
        get { return _closed; }
    }

    /**
     * The next request ID that will be assigned.
     */
    public uint16 next_request_id {
        get { return _next_request_id; }
    }

    /**
     * Throws CONNECTION_CLOSED when the writer has been closed.
     *
     * @throws ProtocolError CONNECTION_CLOSED if the writer is closed
     */
    private void ensure_open() throws ProtocolError {
        if (_closed) {
            throw new ProtocolError.CONNECTION_CLOSED("Writer is closed");
        }
    }

    /**
     * Translates a stream error into ProtocolError.IO_ERROR.
     *
     * Always throws. If the stream reported IOError.CLOSED the writer is
     * marked closed first, so later calls fail fast with CONNECTION_CLOSED.
     *
     * @param context Prefix for the error message, e.g. "Write error"
     * @param e The error raised by the stream
     * @throws ProtocolError always, with code IO_ERROR
     */
    private void fail_with_io_error(string context, Error e) throws ProtocolError {
        if (e is IOError.CLOSED) {
            _closed = true;
        }
        throw new ProtocolError.IO_ERROR("%s: %s", context, e.message);
    }

    /**
     * Returns the next request ID and advances the counter.
     *
     * The counter is 16 bits wide, so IDs repeat after 65535 requests.
     * On overflow it restarts at 1 instead of wrapping to 0.
     * NOTE(review): presumably 0 is not a valid request ID, since the
     * counter is seeded with 1 - confirm against the protocol definition.
     *
     * @return The request ID to use for the next request
     */
    private uint16 allocate_request_id() {
        var request_id = _next_request_id++;
        if (_next_request_id == 0) {
            _next_request_id = 1;
        }
        return request_id;
    }
}

} // namespace Implexus.Protocol