- /**
- * MessageWriter - writes protocol messages to an output stream
- *
- * Serializes protocol messages and writes them to an OutputStream.
- * Framing and length prefixes are expected to come from each
- * message's serialize() output, which is written unchanged.
- *
- * @version 0.1
- * @since 0.1
- */
- namespace Implexus.Protocol {
- /**
- * Writes protocol messages to an OutputStream.
- *
- * Handles the binary message format including:
- * - Header serialization
- * - Payload writing
- * - Request ID assignment
- *
- * Example usage:
- * {{{
- * var writer = new MessageWriter(output_stream);
- * try {
- * var request = new GetEntityRequest.for_path(path);
- * var request_id = writer.write_request(request);
- * // wait for response with matching request_id
- * } catch (ProtocolError e) {
- * warning("Failed to write message: %s", e.message);
- * }
- * }}}
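- *
- * A sketch of the asynchronous variant. It assumes the caller is itself
- * an async method and that writer and path are set up as above; the
- * default priority and a null cancellable are used:
- * {{{
- * try {
- * var request = new GetEntityRequest.for_path(path);
- * var request_id = yield writer.write_request_async(request);
- * // wait for response with matching request_id
- * } catch (ProtocolError e) {
- * warning("Failed to write message: %s", e.message);
- * }
- * }}}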
- */
- public class MessageWriter : Object {
-
- /**
- * The underlying output stream.
- */
- private OutputStream _stream;
-
- /**
- * The next request ID to assign. Starts at 1 and, being a uint16,
- * wraps to 0 after 65535.
- */
- private uint16 _next_request_id;
-
- /**
- * Whether the writer is closed.
- */
- private bool _closed = false;
-
- /**
- * Creates a new MessageWriter for the given stream.
- *
- * @param stream The output stream to write to
- */
- public MessageWriter(OutputStream stream) {
- _stream = stream;
- _next_request_id = 1;
- }
-
- /**
- * Serializes a message, writes it to the stream, and flushes the stream.
- *
- * @param message The message to write
- * @throws ProtocolError if writing fails
- */
- public void write_message(Message message) throws ProtocolError {
- if (_closed) {
- throw new ProtocolError.CONNECTION_CLOSED("Writer is closed");
- }
-
- try {
- // Serialize the complete message
- var data = message.serialize();
-
- // Write to stream
- size_t bytes_written = 0;
- _stream.write_all(data, out bytes_written);
- _stream.flush();
-
- } catch (IOError e) {
- if (e is IOError.CLOSED) {
- _closed = true;
- }
- throw new ProtocolError.IO_ERROR("Write error: %s".printf(e.message));
- }
- }
-
- /**
- * Asynchronous variant of write_message(): serializes, writes, and flushes.
- *
- * @param message The message to write
- * @param priority The I/O priority
- * @param cancellable Optional cancellation token
- * @throws ProtocolError if writing fails
- */
- public async void write_message_async(
- Message message,
- int priority = GLib.Priority.DEFAULT,
- Cancellable? cancellable = null
- ) throws ProtocolError {
- if (_closed) {
- throw new ProtocolError.CONNECTION_CLOSED("Writer is closed");
- }
-
- try {
- // Serialize the complete message
- var data = message.serialize();
-
- // Write to stream asynchronously
- size_t bytes_written = 0;
- yield _stream.write_all_async(data, priority, cancellable, out bytes_written);
- yield _stream.flush_async(priority, cancellable);
-
- } catch (IOError e) {
- if (e is IOError.CLOSED) {
- _closed = true;
- }
- throw new ProtocolError.IO_ERROR("Async write error: %s".printf(e.message));
- }
- }
-
- /**
- * Writes a request message and assigns a request ID.
- *
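- * This method assigns the next sequential request ID to the
- * message before writing it. IDs are uint16 values, so they wrap
- * around after 65535, and an ID is consumed even if the write fails.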
- *
- * @param request The request message to write
- * @return The assigned request ID
- * @throws ProtocolError if writing fails
- */
- public uint16 write_request(Message request) throws ProtocolError {
- var request_id = _next_request_id++;
- request.request_id = request_id;
- write_message(request);
- return request_id;
- }
-
- /**
- * Writes a request message asynchronously, assigning a request ID
- * in the same way as write_request().
- *
- * @param request The request message to write
- * @param priority The I/O priority
- * @param cancellable Optional cancellation token
- * @return The assigned request ID
- * @throws ProtocolError if writing fails
- */
- public async uint16 write_request_async(
- Message request,
- int priority = GLib.Priority.DEFAULT,
- Cancellable? cancellable = null
- ) throws ProtocolError {
- var request_id = _next_request_id++;
- request.request_id = request_id;
- yield write_message_async(request, priority, cancellable);
- return request_id;
- }
-
- /**
- * Writes a response message with a specific request ID.
- *
- * @param response The response message to write
- * @param request_id The request ID to match
- * @throws ProtocolError if writing fails
- */
- public void write_response(Message response, uint16 request_id) throws ProtocolError {
- response.request_id = request_id;
- write_message(response);
- }
-
- /**
- * Writes a response message asynchronously.
- *
- * @param response The response message to write
- * @param request_id The request ID to match
- * @param priority The I/O priority
- * @param cancellable Optional cancellation token
- * @throws ProtocolError if writing fails
- */
- public async void write_response_async(
- Message response,
- uint16 request_id,
- int priority = GLib.Priority.DEFAULT,
- Cancellable? cancellable = null
- ) throws ProtocolError {
- response.request_id = request_id;
- yield write_message_async(response, priority, cancellable);
- }
-
- /**
- * Writes an error response.
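- *
- * A sketch of a responder reporting a failure back to the peer. The
- * error code 1 and its message are hypothetical values, not codes
- * defined by this file; request_id is assumed to come from the
- * request being answered:
- * {{{
- * try {
- * writer.write_error(request_id, 1, "Entity not found");
- * } catch (ProtocolError e) {
- * warning("Failed to write error response: %s", e.message);
- * }
- * }}}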
- *
- * @param request_id The request ID to respond to
- * @param error_code The error code
- * @param error_message The error message
- * @throws ProtocolError if writing fails
- */
- public void write_error(uint16 request_id, int error_code, string error_message) throws ProtocolError {
- var error = new ErrorResponse.with_error(error_code, error_message);
- write_response(error, request_id);
- }
-
- /**
- * Writes a success response.
- *
- * @param request_id The request ID to respond to
- * @throws ProtocolError if writing fails
- */
- public void write_success(uint16 request_id) throws ProtocolError {
- var success = new SuccessResponse();
- write_response(success, request_id);
- }
-
- /**
- * Flushes the underlying stream.
- *
- * @throws ProtocolError if flushing fails
- */
- public void flush() throws ProtocolError {
- if (_closed) {
- throw new ProtocolError.CONNECTION_CLOSED("Writer is closed");
- }
-
- try {
- _stream.flush();
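- } catch (IOError e) {
- // As in write_message(): a closed stream closes the writer too
- if (e is IOError.CLOSED) {
- _closed = true;
- }
- throw new ProtocolError.IO_ERROR("Flush error: %s".printf(e.message));
- }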
- }
-
- /**
- * Flushes the underlying stream asynchronously.
- *
- * @param priority The I/O priority
- * @param cancellable Optional cancellation token
- * @throws ProtocolError if flushing fails
- */
- public async void flush_async(
- int priority = GLib.Priority.DEFAULT,
- Cancellable? cancellable = null
- ) throws ProtocolError {
- if (_closed) {
- throw new ProtocolError.CONNECTION_CLOSED("Writer is closed");
- }
-
- try {
- yield _stream.flush_async(priority, cancellable);
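- } catch (IOError e) {
- // As in write_message_async(): a closed stream closes the writer too
- if (e is IOError.CLOSED) {
- _closed = true;
- }
- throw new ProtocolError.IO_ERROR("Async flush error: %s".printf(e.message));
- }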
- }
-
- /**
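- * Marks the writer as closed.
- *
- * This only sets the closed flag; the underlying stream is neither
- * flushed nor closed. Afterwards, all write and flush operations
- * throw ProtocolError.CONNECTION_CLOSED.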
- */
- public void close() {
- _closed = true;
- }
-
- /**
- * Whether the writer is closed.
- */
- public bool is_closed {
- get { return _closed; }
- }
-
- /**
- * The next request ID that will be assigned.
- */
- public uint16 next_request_id {
- get { return _next_request_id; }
- }
- }
- } // namespace Implexus.Protocol