ソースを参照

Breaking changes to the Muxer Protocol. Bug fixes and STP tweaks

Billy Barrow 3 年 前
コミット
23c400a83e
32 ファイル変更493 行追加153 行削除
  1. 65 3
      src/lib/Networks/IPv4/IPv4.vala
  2. 3 4
      src/lib/Networks/IPv4/IPv4PeerInfo.vala
  3. 2 0
      src/lib/Networks/Network.vala
  4. 3 4
      src/lib/Networks/PeerInfo.vala
  5. 4 0
      src/lib/Networks/Simulation/NetSim.vala
  6. 5 4
      src/lib/Protocols/AIP/AipCapabilities.vala
  7. 3 4
      src/lib/Protocols/AIP/Answer.vala
  8. 24 16
      src/lib/Protocols/AIP/ApplicationInformationProtocol.vala
  9. 3 4
      src/lib/Protocols/AIP/InstanceInformation.vala
  10. 3 5
      src/lib/Protocols/AIP/Query.vala
  11. 66 0
      src/lib/Protocols/MX2/Assembler.vala
  12. 48 0
      src/lib/Protocols/MX2/Fragment.vala
  13. 40 0
      src/lib/Protocols/MX2/Fragmenter.vala
  14. 2 0
      src/lib/Protocols/MX2/Frame.vala
  15. 22 8
      src/lib/Protocols/MX2/Muxer.vala
  16. 4 4
      src/lib/Protocols/STP/Messages/BeginSession.vala
  17. 4 4
      src/lib/Protocols/STP/Messages/NegotiateSession.vala
  18. 4 4
      src/lib/Protocols/STP/Messages/RequestSession.vala
  19. 3 4
      src/lib/Protocols/STP/Messages/SegmentMessage.vala
  20. 4 4
      src/lib/Protocols/STP/Segments/Acknowledgement.vala
  21. 4 4
      src/lib/Protocols/STP/Segments/Control.vala
  22. 4 4
      src/lib/Protocols/STP/Segments/Payload.vala
  23. 58 62
      src/lib/Protocols/STP/Sessions/EgressSession.vala
  24. 5 0
      src/lib/Protocols/STP/Sessions/IngressSession.vala
  25. 1 1
      src/lib/Protocols/STP/Sessions/Session.vala
  26. 8 5
      src/lib/Protocols/STP/StreamTransmissionProtocol.vala
  27. 10 1
      src/lib/Protocols/STP/Streams/InputStream.vala
  28. 1 1
      src/lib/Protocols/STP/Streams/OutputStream.vala
  29. 13 0
      src/lib/Util/ByteComposer.vala
  30. 25 0
      src/lib/Util/Streams.vala
  31. 4 0
      src/lib/meson.build
  32. 48 3
      src/toys/discoverer/Discoverer.vala

+ 65 - 3
src/lib/Networks/IPv4/IPv4.vala

@@ -20,6 +20,14 @@ namespace LibPeer.Networks.IPv4 {
         private const uint8 DGRAM_INQUIRE = 1;
         private const uint8 DGRAM_INSTANCE = 2;
 
+        private static string[] dns_seeds = new string[] {
+            "libpeer.localresolver",
+            "libpeer.pcthingz.com",
+            "libpeer.unitatem.net",
+            "libpeer.mooo.com",
+            "libpeer.barrow.nz"
+        };
+
         public IPv4(string address, uint16 port) {
             socket = new Socket(SocketFamily.IPV4, SocketType.DATAGRAM, SocketProtocol.UDP);
             multicast_socket = new Socket(SocketFamily.IPV4, SocketType.DATAGRAM, SocketProtocol.UDP);
@@ -33,6 +41,10 @@ namespace LibPeer.Networks.IPv4 {
             return new Bytes({'I', 'P', 'v', '4'});
         }
 
+        public override uint16 get_mtu() {
+            return 511;
+        }
+
         public override void bring_up() throws IOError, Error {
             // Bind the main socket
             socket.bind(socket_address, false);
@@ -43,6 +55,7 @@ namespace LibPeer.Networks.IPv4 {
 
             new Thread<bool>("LibPeer IPv4 Listener", listen);
             new Thread<bool>("LibPeer IPv4 Local Discovery", multicast_listen);
+            new Thread<bool>("LibPeer IPv4 DNS Discovery", dns_discovery);
         }
 
         public override void bring_down() throws IOError, Error {
@@ -51,8 +64,7 @@ namespace LibPeer.Networks.IPv4 {
 
         public override void advertise(InstanceReference instance_reference) throws IOError, Error {
             var stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             dos.write(multicast_magic_number);
             dos.put_uint16(socket_address.get_port());
@@ -144,7 +156,7 @@ namespace LibPeer.Networks.IPv4 {
                 var inet_address = (InetSocketAddress)address;
 
                 var stream = new MemoryInputStream.from_data(buffer);
-                var dis = new DataInputStream(stream);
+                var dis = StreamUtil.get_data_input_stream(stream);
 
                 var magic_number = dis.read_bytes(multicast_magic_number.length);
                 if(magic_number.compare(new Bytes(multicast_magic_number)) != 0) {
@@ -177,6 +189,56 @@ namespace LibPeer.Networks.IPv4 {
             return false;
         }
 
+        private bool dns_discovery() {
+            // Loop over each DNS seed
+            var resolver = Resolver.get_default();
+            foreach (var domain in dns_seeds) {
+                // Try and query
+                try {
+                    var results = resolver.lookup_records(domain, ResolverRecordType.TXT);
+                    foreach (var result in results) {
+
+                        foreach (var child in result) {
+                            foreach (var line in child.get_strv()) {
+                                // Is this a LibPeer entry?
+                                if(line.substring(0, 2) != "P2") {
+                                    continue;
+                                }
+
+                                // Split on delimiter
+                                var data = line.split("/");
+
+                                if(data[0] == "P2M") {
+                                    // Seed message
+                                    stderr.printf(@"[LibPeer] DNS Seed MotD ($(domain)): $(data[1])\n");
+                                }
+                                else if(data[0] == "P2D") {
+                                    try {
+                                        print(@"Lookup address to inquire: $(data[1])\n");
+                                        var addresses = resolver.lookup_by_name(data[1]);
+                                        foreach (var address in addresses) {
+                                            inquire(address, int.parse(data[2]));
+                                        }
+                                    }
+                                    catch {}
+                                }
+                                else if(data[0] == "P2A") {
+                                    inquire(new InetAddress.from_string(data[1]), int.parse(data[2]));
+                                }
+                            }
+                        }
+                    }
+                }
+                catch {}
+            }
+            return true;
+        }
+
+        private void inquire(InetAddress address, int port) {
+            print(@"Sending IPv4 inquiry for instances to $(address.to_string()):$(port)\n");
+            socket.send_to(new InetSocketAddress(address, (uint16)port), new uint8[] { DGRAM_INQUIRE });
+        }
+
     }
 
 }

+ 3 - 4
src/lib/Networks/IPv4/IPv4PeerInfo.vala

@@ -1,5 +1,6 @@
 
 using LibPeer.Networks;
+using LibPeer.Util;
 
 namespace LibPeer.Networks.IPv4 {
 
@@ -20,8 +21,7 @@ namespace LibPeer.Networks.IPv4 {
 
         protected override void build(uint8 data_length, InputStream stream, Bytes network_type) throws Error
         requires (data_length == 6) {
-            DataInputStream dis = new DataInputStream(stream);
-            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream dis = StreamUtil.get_data_input_stream(stream);
             
             for(int i = 0; i < 4; i ++) {
                 address[i] = dis.read_byte();
@@ -32,8 +32,7 @@ namespace LibPeer.Networks.IPv4 {
 
         protected override Bytes get_data_segment() {
             var stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             for(int i = 0; i < 4; i ++) {
                 dos.put_byte(address[i]);

+ 2 - 0
src/lib/Networks/Network.vala

@@ -6,6 +6,8 @@ namespace LibPeer.Networks {
         
         public abstract Bytes get_network_identifier();
 
+        public abstract uint16 get_mtu();
+
         public signal void incoming_advertisment(Advertisement advertisement);
 
         public signal void incoming_receiption(Receiption receiption);

+ 3 - 4
src/lib/Networks/PeerInfo.vala

@@ -23,8 +23,7 @@ namespace LibPeer.Networks
 
         public void serialise(OutputStream stream) throws IOError, Error {
             // Create a stream writer
-            var writer = new DataOutputStream(stream);
-            writer.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var writer = StreamUtil.get_data_output_stream(stream);
             print("Start serialising PeerInfo\n");
 
             // Get the informational data
@@ -48,12 +47,12 @@ namespace LibPeer.Networks
             // Write the data
             writer.write_bytes(data);
             print("Serialised peer info\n");
+            writer.flush();
         }
         
         public static PeerInfo deserialise(InputStream stream) throws IOError, Error {
             // Create a data input stream
-            var reader = new DataInputStream(stream);
-            reader.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var reader = StreamUtil.get_data_input_stream(stream);
 
             // Get the length of the network type
             var type_length = reader.read_byte();

+ 4 - 0
src/lib/Networks/Simulation/NetSim.vala

@@ -28,6 +28,10 @@ namespace LibPeer.Networks.Simulation {
         public override GLib.Bytes get_network_identifier () {
             return new Bytes({'N', 'e', 't', 'S', 'i', 'm'});
         }
+
+        public override uint16 get_mtu() {
+            return uint16.MAX;
+        }
     
         public override void bring_up() throws IOError, Error {
             if (up) {

+ 5 - 4
src/lib/Protocols/AIP/AipCapabilities.vala

@@ -9,8 +9,7 @@ namespace LibPeer.Protocols.Aip {
         public bool query_answer { get; set; }
 
         public void serialise(OutputStream stream) throws IOError {
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             var composer = new ByteComposer();
             if(address_info) {
@@ -26,14 +25,16 @@ namespace LibPeer.Protocols.Aip {
             var data = composer.to_byte_array();
 
             dos.put_byte((uint8)data.length);
+            dos.flush();
             dos.write(data);
+            dos.flush();
         }
 
         public AipCapabilities.from_stream(InputStream stream) throws IOError {
-            var dis = new DataInputStream(stream);
-            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dis = StreamUtil.get_data_input_stream(stream);
 
             var capability_count = dis.read_byte();
+            print(@"Reading $(capability_count) capabilities\n");
 
             for (var i = 0; i < capability_count; i++) {
                 var byte = dis.read_byte();

+ 3 - 4
src/lib/Protocols/AIP/Answer.vala

@@ -1,4 +1,5 @@
 using LibPeer.Protocols.Mx2;
+using LibPeer.Util;
 using Gee;
 
 namespace LibPeer.Protocols.Aip {
@@ -12,8 +13,7 @@ namespace LibPeer.Protocols.Aip {
         public InstanceReference[] path { get; set; }
 
         public void serialise(OutputStream stream) throws IOError {
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             dos.write_bytes(in_reply_to);
 
@@ -28,8 +28,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         public Answer.from_stream(InputStream stream) throws IOError{
-            var dis = new DataInputStream(stream);
-            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dis = StreamUtil.get_data_input_stream(stream);
 
             // What is this in reply to?
             in_reply_to = dis.read_bytes(16);

+ 24 - 16
src/lib/Protocols/AIP/ApplicationInformationProtocol.vala

@@ -53,7 +53,16 @@ namespace LibPeer.Protocols.Aip {
         protected TimeoutMap<InstanceReference, Bytes> pending_group_peers = new TimeoutMap<InstanceReference, Bytes>(120, (a) => a.hash(), (a, b) => a.compare(b) == 0);
         public signal void ready();
 
-        private Gee.List<Query> pending_queries = new Gee.LinkedList<Query>();
+        private class PendingQueryAnswer {
+            public Query query;
+            public InstanceReference instance_reference;
+            public PendingQueryAnswer(Query q, InstanceReference ir) {
+                query = q;
+                instance_reference = ir;
+            }
+        }
+
+        private Gee.List<PendingQueryAnswer> pending_queries = new Gee.LinkedList<PendingQueryAnswer>();
 
         public ApplicationInformationProtocol(Muxer muxer, AipCapabilities? capabilities = null, bool join_all = false) {
             if(capabilities == null) {
@@ -95,6 +104,7 @@ namespace LibPeer.Protocols.Aip {
             new_group_peer.connect((instance_ref, id) => {
                 print("New group peer?\n");
                 if(id.compare(info.namespace_bytes) == 0) {
+                    query_groups.get(info.namespace_bytes).add_peer(instance_ref);
                     print("New group peer\n");
                     info.new_group_peer();
                 }
@@ -198,7 +208,7 @@ namespace LibPeer.Protocols.Aip {
                 print("Sending pending queries");
                 // Clear the list
                 var queries = pending_queries;
-                pending_queries = new Gee.LinkedList<Query>();
+                pending_queries = new Gee.LinkedList<PendingQueryAnswer>();
 
                 // Send pending queries
                 foreach (var query in queries) {
@@ -259,8 +269,7 @@ namespace LibPeer.Protocols.Aip {
             var request_data = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
             var request = new Request<Gee.List<InstanceInformation>>();
             send_request(request_data, target).response.connect(s => {
-                var dis = new DataInputStream(s);
-                dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+                var dis = StreamUtil.get_data_input_stream(s);
                 // Read number of peers
                 var peer_count = dis.read_byte();
 
@@ -430,7 +439,7 @@ namespace LibPeer.Protocols.Aip {
                 // Are we in this group?
                 if(query_groups.has_key(group_id)) {
                     // Yes, send a reply
-                    queue_query_answer(query);
+                    queue_query_answer(query, instance.reference);
                 }
 
                 // This is a query for a group, forward on to the default group
@@ -448,7 +457,7 @@ namespace LibPeer.Protocols.Aip {
                         // Is this app relevent? TODO: Use a hashmap
                         if(app.namespace_bytes.compare(app_namespace) == 0) {
                             // Yes, answer the query
-                            queue_query_answer(query);
+                            queue_query_answer(query, app.instance);
                         }
                     }
 
@@ -471,7 +480,7 @@ namespace LibPeer.Protocols.Aip {
                         // Is this app relevent and does it have this resource?
                         if(app.namespace_bytes.compare(app_namespace) == 0 && app.resource_set.contains(label)) {
                             // Yes, answer the query
-                            queue_query_answer(query);
+                            queue_query_answer(query, app.instance);
                         }
                     }
 
@@ -483,23 +492,24 @@ namespace LibPeer.Protocols.Aip {
             
         }
 
-        protected void queue_query_answer(Query query) {
+        protected void queue_query_answer(Query query, InstanceReference reference) {
             print("Queue query answer\n");
+            var query_answer = new PendingQueryAnswer(query, reference);
             // Do we have peer info to send yet?
             if(peer_info.size > 0) {
                 print("Query sent immediately\n");
                 // Yes, do it
-                send_query_answer(query);
+                send_query_answer(query_answer);
             }
             else {
                 // No, wait for peer info
-                pending_queries.add(query);
+                pending_queries.add(query_answer);
             }
         }
 
-        protected void send_query_answer(Query query) {
+        private void send_query_answer(PendingQueryAnswer query_answer) {
             // Create some instance information
-            var instance_info = new InstanceInformation(instance.reference, peer_info.to_array());
+            var instance_info = new InstanceInformation(query_answer.instance_reference, peer_info.to_array());
 
             // Serialise the info
             MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
@@ -512,8 +522,8 @@ namespace LibPeer.Protocols.Aip {
             // Send the instance information in the answer
             var answer = new Answer() {
                 data = new Bytes(buffer),
-                in_reply_to = query.identifier,
-                path = query.return_path
+                in_reply_to = query_answer.query.identifier,
+                path = query_answer.query.return_path
             };
 
             // Send the answer
@@ -542,8 +552,6 @@ namespace LibPeer.Protocols.Aip {
 
             // Handler for query answer
             query.on_answer.connect(answer => {
-                // Add to group
-                query_groups.get(group).add_peer(answer.instance_reference);
 
                 // Are we already connected to this peer?
                 if(reachable_peers.contains(answer.instance_reference)) {

+ 3 - 4
src/lib/Protocols/AIP/InstanceInformation.vala

@@ -1,5 +1,6 @@
 using LibPeer.Protocols.Mx2;
 using LibPeer.Networks;
+using LibPeer.Util;
 
 namespace LibPeer.Protocols.Aip {
 
@@ -15,8 +16,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         public void serialise(OutputStream stream) throws IOError, Error {
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             // Write instance reference
             instance_reference.serialise(dos);
@@ -32,8 +32,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         public InstanceInformation.from_stream(InputStream stream) throws IOError, Error {
-            var dis = new DataInputStream(stream);
-            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dis = StreamUtil.get_data_input_stream(stream);
 
             // Read the instance reference
             instance_reference = new InstanceReference.from_stream(dis);

+ 3 - 5
src/lib/Protocols/AIP/Query.vala

@@ -1,4 +1,5 @@
 using LibPeer.Protocols.Mx2;
+using LibPeer.Util;
 using Gee;
 
 namespace LibPeer.Protocols.Aip {
@@ -18,8 +19,7 @@ namespace LibPeer.Protocols.Aip {
         public signal void on_answer(InstanceInformation answer);
 
         public void serialise(OutputStream stream) throws IOError, Error {
-            var dos = new DataOutputStream(stream);
-            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dos = StreamUtil.get_data_output_stream(stream);
 
             // Write query identifier
             dos.write_bytes(identifier);
@@ -42,9 +42,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         public Query.from_stream(InputStream stream){
-            var dis = new DataInputStream(stream);
-            //  dis.buffer_size = 2;
-            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var dis = StreamUtil.get_data_input_stream(stream);
 
             // Read the identifier
             print("\tIdentifier\n");

+ 66 - 0
src/lib/Protocols/MX2/Assembler.vala

@@ -0,0 +1,66 @@
+using LibPeer.Util;
+using LibPeer.Networks;
+
+namespace LibPeer.Protocols.Mx2 {
+
+    public class Assembler {
+
+        private uint64 message_id = uint64.MAX;
+
+        private uint32 fragment_seqn = 0;
+
+        private uint32 fragment_count = 0;
+
+        private ByteComposer composer = new ByteComposer();
+
+        private bool success = false;
+
+        public InputStream? handle_data(InputStream stream) throws IOError, Error {
+            // Read the fragment
+            var fragment = new Fragment.from_stream(stream);
+            
+            // Are we currently reading this message?
+            if(fragment.message_number != message_id) {
+                // No, reset and start reading
+                reset(fragment);
+            }
+
+            // Is this the next expected sequence number?
+            if(fragment.fragment_number != fragment_seqn) {
+                //  print("Dropped or out of order fragment\n");
+                // No, we may have lost one. Drop
+                return null;
+            }
+
+            // Increment next expected sequence number
+            fragment_seqn ++;
+
+            // Add the fragment data
+            composer.add_byte_array(fragment.payload);
+
+            // Is this the last fragment for this message?
+            if(fragment_seqn == fragment_count) {
+                // Yes, create a memory stream and return
+                //  print(@"Message $(message_id) assembled\n");
+                success = true;
+                return new MemoryInputStream.from_data(composer.to_byte_array());
+            }
+
+            // Nothing to return yet
+            return null;
+        }
+
+        private void reset(Fragment fragment) {
+            if(!success) {
+                //  print(@"Message $(message_id) dropped\n");
+            }
+            message_id = fragment.message_number;
+            fragment_seqn = 0;
+            fragment_count = fragment.total_fragments;
+            composer = new ByteComposer();
+            success = false;
+        }
+
+    }
+
+}

+ 48 - 0
src/lib/Protocols/MX2/Fragment.vala

@@ -0,0 +1,48 @@
+
+using LibPeer.Util;
+
+namespace LibPeer.Protocols.Mx2 {
+
+    public class Fragment {
+
+        public const int HEADER_LENGTH = 16;
+
+        public uint64 message_number { get; set; }
+
+        public uint32 fragment_number { get; set; }
+
+        public uint32 total_fragments { get; set; }
+
+        public uint8[] payload { get; set;}
+
+        public Fragment (uint64 message, uint32 number, uint32 total, uint8[] data) {
+            message_number = message;
+            fragment_number = number;
+            total_fragments = total;
+            payload = data;
+        }
+
+        public void serialise(OutputStream stream) throws Error, IOError {
+            var dos = StreamUtil.get_data_output_stream(stream);
+            dos.put_uint64(message_number);
+            dos.put_uint32(fragment_number);
+            dos.put_uint32(total_fragments);
+            dos.put_uint16((uint16)payload.length);
+            dos.write(payload);
+            dos.flush();
+        }
+
+        public Fragment.from_stream(InputStream stream) throws Error, IOError {
+            var dis = StreamUtil.get_data_input_stream(stream);
+            message_number = dis.read_uint64();
+            fragment_number = dis.read_uint32();
+            total_fragments = dis.read_uint32();
+            var payload_length = dis.read_uint16();
+            payload = new uint8[payload_length];
+            dis.read(payload);
+            //  print(@"Fragment: mno = $(message_number); seqno = $(fragment_number); flen = $(total_fragments); size = $(payload_length);\n");
+        }
+
+    }
+
+}

+ 40 - 0
src/lib/Protocols/MX2/Fragmenter.vala

@@ -0,0 +1,40 @@
+using LibPeer.Util;
+using LibPeer.Networks;
+
+namespace LibPeer.Protocols.Mx2 {
+
+    public class Fragmenter {
+
+        private uint64 message_seqn = 0;
+
+        public void send_frame(Frame frame, Instance instance, Network network, PeerInfo info) throws IOError, Error {
+            // Get the size for the fragments
+            var fragment_size = network.get_mtu() - Fragment.HEADER_LENGTH;
+
+            // Serialise the entire frame
+            MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
+            frame.serialise(stream, instance);
+            stream.close();
+            uint8[] buffer = stream.steal_data();
+            buffer.length = (int)stream.get_data_size();
+            
+            // Calculate number of needed fragments
+            uint32 fragment_count = (buffer.length / fragment_size) + uint32.min(buffer.length % fragment_size, 1);
+
+            lock(message_seqn) {
+                try {
+                    // Create the fragments and send them
+                    for(uint32 i = 0; i < fragment_count; i++) {
+                        var fragment = new Fragment(message_seqn, i, fragment_count, buffer[i*fragment_size : uint32.min((i+1)*fragment_size, buffer.length)]);
+                        network.send_with_stream(info, fragment.serialise);
+                    }
+                }
+                finally {
+                    message_seqn++;
+                }
+            }
+        }
+
+    }
+
+}

+ 2 - 0
src/lib/Protocols/MX2/Frame.vala

@@ -92,9 +92,11 @@ namespace LibPeer.Protocols.Mx2 {
             // Verify the signature and get plaintext message
             uint8[]? payload = Asymmetric.Signing.verify(signed_payload, origin.verification_key);
 
+            
             if (payload == null) {
                 throw new IOError.FAILED("Payload signature is invalid");
             }
+            
 
             this.payload = payload;
         }

+ 22 - 8
src/lib/Protocols/MX2/Muxer.vala

@@ -23,6 +23,9 @@ namespace LibPeer.Protocols.Mx2 {
 
         private ConcurrentHashMap<InstanceReference, int> pings = new ConcurrentHashMap<InstanceReference, int>((a) => a.hash(), (a, b) => a.compare(b) == 0);
 
+        private Fragmenter fragmenter = new Fragmenter();
+        private Assembler assembler = new Assembler();
+
         public void register_network(Network network) {
             // Get the network identifier
             Bytes network_identifier = network.get_network_identifier();
@@ -65,8 +68,6 @@ namespace LibPeer.Protocols.Mx2 {
             var inquiry = new Inquiry(destination);
             inquiries.set(inquiry.id, inquiry);
 
-            int packets = 0;
-
             // Loop over each peer to try
             foreach (PeerInfo peer in peers) {
                 // Get peer network identifier
@@ -90,7 +91,7 @@ namespace LibPeer.Protocols.Mx2 {
                     var frame = new Frame(destination, instance.reference, new PathInfo.empty(), packet);
 
                     // Send using the network and peer info
-                    network.send_with_stream(peer, (stream) => frame.serialise(stream, instance));
+                    fragmenter.send_frame(frame, instance, network, peer);
                 }
             }
 
@@ -119,11 +120,11 @@ namespace LibPeer.Protocols.Mx2 {
                 .add_byte_array(data)
                 .to_byte_array();
 
+            //  print(@"MX2_SEND:\"$(new Util.ByteComposer().add_byte_array(payload).to_escaped_string())\"\n");
+
             send_packet(instance, destination, payload);
         }
 
-        // TODO: Send
-
         public int suggested_timeout_for_instance(InstanceReference instance) {
             if(pings.has_key(instance)) {
                 return pings.get(instance) * 2;
@@ -145,12 +146,21 @@ namespace LibPeer.Protocols.Mx2 {
             Frame frame = new Frame(destination, instance.reference, access_info.path_info, payload);
 
             // Send the frame over the network
-            access_info.network.send_with_stream(access_info.peer_info, (stream) => frame.serialise(stream, instance));
+            fragmenter.send_frame(frame, instance, access_info.network, access_info.peer_info);
         }
 
-        protected void handle_receiption(Receiption receiption) throws Error, IOError {
+        protected void handle_receiption(Receiption receiption) {
+            // Pass to the assembler
+            var stream = assembler.handle_data(receiption.stream);
+
+            // Did the assembler return a stream?
+            if(stream == null) {
+                // No, message is not fully assembled
+                return;
+            }
+
             // Read the incoming frame
-            Frame frame = new Frame.from_stream(receiption.stream, instances);
+            Frame frame = new Frame.from_stream(stream, instances);
 
             // Get the instance
             Instance instance = instances.get(frame.destination);
@@ -207,6 +217,9 @@ namespace LibPeer.Protocols.Mx2 {
                 // Send the greeting
                 send_packet(instance, frame.origin, greeting);
             }
+            else {
+                print(@"$(instance.application_namespace) != $(application_namespace)\n");
+            }
 
         }
 
@@ -246,6 +259,7 @@ namespace LibPeer.Protocols.Mx2 {
 
         protected void handle_payload(Receiption receiption, Frame frame, Instance instance) throws Error {
             // This is a payload for the next layer to handle, pass it up.
+            //  print(@"MX2_HANDLE:\"$(new Util.ByteComposer().add_byte_array(frame.payload).to_escaped_string())\"\n");
             MemoryInputStream stream = new MemoryInputStream.from_data(frame.payload[1:frame.payload.length]);
             instance.incoming_payload(new Packet(frame.origin, frame.destination, stream));
         }

+ 4 - 4
src/lib/Protocols/STP/Messages/BeginSession.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Messages {
 
     public class BeginSession : Message {
@@ -14,16 +16,14 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.write (session_id.get_data ());
             os.put_uint64 (reply_timing);
             os.flush ();
         }
 
         public BeginSession.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             var b_session_id = new uint8[16];
             ins.read(b_session_id);
             session_id = new Bytes(b_session_id);

+ 4 - 4
src/lib/Protocols/STP/Messages/NegotiateSession.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Messages {
 
     public class NegotiateSession : Message {
@@ -19,8 +21,7 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.write (session_id.get_data ());
             os.put_byte ((uint8)feature_codes.length);
             if(feature_codes != null) {
@@ -32,8 +33,7 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         public NegotiateSession.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             var b_session_id = new uint8[16];
             ins.read(b_session_id);
             session_id = new Bytes(b_session_id);

+ 4 - 4
src/lib/Protocols/STP/Messages/RequestSession.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Messages {
 
     public class RequestSession : Message {
@@ -19,8 +21,7 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.write (session_id.get_data());
             os.write (in_reply_to.get_data());
             os.put_byte ((uint8)feature_codes.length);
@@ -32,8 +33,7 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         public RequestSession.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             var b_session_id = new uint8[16];
             ins.read(b_session_id);
             session_id = new Bytes(b_session_id);

+ 3 - 4
src/lib/Protocols/STP/Messages/SegmentMessage.vala

@@ -1,4 +1,5 @@
 using LibPeer.Protocols.Stp.Segments;
+using LibPeer.Util;
 
 namespace LibPeer.Protocols.Stp.Messages {
 
@@ -16,16 +17,14 @@ namespace LibPeer.Protocols.Stp.Messages {
         }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.write (session_id.get_data ());
             segment.serialise (os);
             os.flush ();
         }
 
         public SegmentMessage.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             var b_session_id = new uint8[16];
             ins.read(b_session_id);
             session_id = new Bytes(b_session_id);

+ 4 - 4
src/lib/Protocols/STP/Segments/Acknowledgement.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Segments {
 
     public class Acknowledgement : Segment {
@@ -10,16 +12,14 @@ namespace LibPeer.Protocols.Stp.Segments {
 
         protected override void serialise_data (OutputStream stream) {
             //  print(@"***Ack segment $(sequence_number)\n");
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.put_uint64 (sequence_number);
             os.put_uint64 (timing);
             os.flush ();
         }
 
         public Acknowledgement.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             sequence_number = ins.read_uint64 ();
             timing = ins.read_uint64 ();
         }

+ 4 - 4
src/lib/Protocols/STP/Segments/Control.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Segments {
 
     public class Control : Segment {
@@ -7,15 +9,13 @@ namespace LibPeer.Protocols.Stp.Segments {
         public ControlCommand command { get; private set; }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.put_byte(command.to_byte());
             os.flush ();
         }
 
         public Control.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             command = ControlCommand.from_byte(ins.read_byte());
         }
 

+ 4 - 4
src/lib/Protocols/STP/Segments/Payload.vala

@@ -1,3 +1,5 @@
+using LibPeer.Util;
+
 namespace LibPeer.Protocols.Stp.Segments {
 
     public class Payload : Segment {
@@ -11,8 +13,7 @@ namespace LibPeer.Protocols.Stp.Segments {
         public uint8[] data { get; private set; }
 
         protected override void serialise_data (OutputStream stream) {
-            DataOutputStream os = new DataOutputStream (stream);
-            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataOutputStream os = StreamUtil.get_data_output_stream(stream);
             os.put_uint64 (sequence_number);
             update_timing();
             os.put_uint64 (timing);
@@ -22,8 +23,7 @@ namespace LibPeer.Protocols.Stp.Segments {
         }
 
         public Payload.from_stream(InputStream stream) {
-            DataInputStream ins = new DataInputStream (stream);
-            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            DataInputStream ins = StreamUtil.get_data_input_stream(stream);
             sequence_number = ins.read_uint64 ();
             timing = ins.read_uint64 ();
             uint32 data_length = ins.read_uint32 ();

+ 58 - 62
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -1,13 +1,14 @@
 using LibPeer.Protocols.Mx2;
 using LibPeer.Protocols.Stp.Segments;
+using LibPeer.Protocols.Stp.Streams;
 using LibPeer.Util;
 using Gee;
 
 namespace LibPeer.Protocols.Stp.Sessions {
 
-    const int SEGMENT_PAYLOAD_SIZE = 16384;
+    const int SEGMENT_PAYLOAD_SIZE = 14000;
     const int METRIC_WINDOW_SIZE = 4;
-    const int MAX_WINDOW_SIZE = 65536;
+    const int64 MAX_WINDOW_SIZE = 9223372036854775808;
 
     public class EgressSession : Session {
 
@@ -21,15 +22,15 @@ namespace LibPeer.Protocols.Stp.Sessions {
         protected AsyncQueue<Payload> payload_queue = new AsyncQueue<Payload>();
 
         private int redundant_resends = 0;
-        private int window_size = METRIC_WINDOW_SIZE;
-        private uint64 best_ping = 0;
+        private uint64 window_size = METRIC_WINDOW_SIZE;
+        private uint64 best_ping = 10000;
         private uint64 worst_ping = 0;
-        private int adjustment_delta = 0;
+        private int64 adjustment_delta = 0;
         private uint64 last_send = 0;
 
         private uint64 next_sequence_number = 0;
 
-        public signal void received_reply(IngressSession session);
+        public signal void received_reply(StpInputStream stream);
 
         public EgressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
             base(target, session_id, ping);
@@ -67,6 +68,10 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
         }
 
+        private int64 window_time = 0;
+        private int64 last_window_time = 0;
+        private uint64 last_window_size = 0;
+
         private void handle_acknowledgement(Acknowledgement segment) {
             // Is this segment still in-flight?
             if(!in_flight.has_key(segment.sequence_number)) {
@@ -90,26 +95,39 @@ namespace LibPeer.Protocols.Stp.Sessions {
             var round_trip = (get_monotonic_time()/1000) - segment.timing;
 
             // Are we currently at metric window size?
-            if(window_size == METRIC_WINDOW_SIZE) {
+            //  if(window_size == METRIC_WINDOW_SIZE) {
                 // Yes, add round trip time to the list
                 segment_trips.add(round_trip);
 
                 // Do we have a sample?
                 if(segment_trips.size >= METRIC_WINDOW_SIZE) {
+                    var current_window_time = get_monotonic_time() - window_time;
                     // Update the ping based on the average of the metric segments
-                    uint64 avarage = 0;
+                    uint64 average = segment_trips[0];
                     foreach (var ping in segment_trips) {
-                        avarage += ping;
+                        average = (average + ping)/2;
                     }
-                    avarage = avarage / segment_trips.size;
-                    best_ping = avarage;
+                    
+                    var last = (last_window_size / (double)uint64.max(last_window_time, 1));
+                    var current = (window_size / (double)uint64.max(current_window_time, 1));
 
-                    adjust_window_size(round_trip);
-                }
-                else {
-                    adjust_window_size(round_trip);
+                    last_window_size = window_size;
+
+                    adjust_window_size(last, current);
+
+
+
+
+                    best_ping = uint64.min(best_ping, average);
+                    worst_ping = uint64.max(worst_ping, average);
+                    segment_trips.clear();
+                    last_window_time = current_window_time;
+                    window_time = get_monotonic_time();
                 }
-            }
+            //  }
+            //  else {
+            //      adjust_window_size(round_trip);
+            //  }
             
         }
 
@@ -124,7 +142,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
 
             // Calculate a maximum time value for segments eligable to be resent
-            uint64 max_time = (get_monotonic_time()/1000) - 5000; //(uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
+            uint64 max_time = (get_monotonic_time()/1000) - (uint64)(worst_ping + (redundant_resends * 10));
             
             // Do we have any in-flight segments to resend?
             foreach (var segment in in_flight.values) {
@@ -146,69 +164,46 @@ namespace LibPeer.Protocols.Stp.Sessions {
             return base.get_pending_segment();
         }
 
-        private void adjust_window_size(uint64 last_trip) {
-            uint64 last_trip_metric = last_trip / 1000;
-
-            // Is this the worst we have had?
-            if(worst_ping < last_trip) {
-                // Update worst ping metric
-                worst_ping = last_trip;
-            }
+        private void adjust_window_size(double last, double current) {
+            //  print(@"current = $(current); compare = $(last);\n");
 
-            // Has the trip time gotten longer?
-            if (last_trip_metric > best_ping) {
-                // Yes, were we previously increasing the window size?
-                if(adjustment_delta > 0) {
-                    // Yes, stop increasing it
-                    adjustment_delta = 0;
-                }
-                // Were we keeping the window size consistant?
-                else if(adjustment_delta == 0) {
-                    adjustment_delta = -1;
+            if(last > current) {
+                print("\t++\n");
+                if(adjustment_delta < 0) {
+                    adjustment_delta = 1;
                 }
-                // Were we previously decreasing it?
-                else if(adjustment_delta < 0) {
-                    adjustment_delta *= 2;
+                else {
+                    adjustment_delta ++;
                 }
             }
-            // Did the trip get shorter or stay the same?
-            else if (last_trip_metric <= best_ping) {
-                // Yes, were we previously increasing the window size?
+            else if(current > last) {
+                print("\t  --\n");
                 if(adjustment_delta > 0) {
-                    // Yes, increase it some more
-                    adjustment_delta *= 2;
-                }
-                // Were we previously keeping the window size consistant?
-                if(adjustment_delta == 0) {
-                    // Yes, start incrrasing ituint64? key
-                    adjustment_delta = 1;
+                    adjustment_delta = -1;
                 }
-                // Were we previosuly decreasing the window size?
-                if(adjustment_delta < 0) {
-                    // Yes, stop
-                    adjustment_delta = 0;
+                else {
+                    adjustment_delta --;
                 }
             }
-
-            // Apply the delta
+            else {
+                print("\t   ==\n");
+                adjustment_delta = 0;
+            }
+            adjustment_delta = int64.min(adjustment_delta, 4);
+            adjustment_delta = int64.max(adjustment_delta, -4);
             window_size += adjustment_delta;
 
             // Is the window size now less than the metric size?
             if(window_size < METRIC_WINDOW_SIZE) {
                 // Yes, reset it to the metric size
                 window_size = METRIC_WINDOW_SIZE;
-
-                // Update the delta so when we have our metric we can start increasing again
-                adjustment_delta = 1;
-
-                // Clear out our trip metrics
-                segment_trips.clear();
             }
             // Is the window size now bigger than the max window size?
             if(window_size > MAX_WINDOW_SIZE) {
                 // Yes, cap it
                 window_size = MAX_WINDOW_SIZE;
             }
+            print(@"WINDOW SIZE: $(window_size)\n");
         }
 
         protected override void close_session(string reason) {
@@ -246,8 +241,9 @@ namespace LibPeer.Protocols.Stp.Sessions {
                     // TODO run through features
                     segment_trackers.set(next_sequence_number, tracker);
                     tracker.add_segment();
-                    int payload_size = data.length < (i+1)*SEGMENT_PAYLOAD_SIZE ? data.length : (i+1)*SEGMENT_PAYLOAD_SIZE;
-                    payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:payload_size]));
+                    int payload_size = int.min(data.length, (i+1)*SEGMENT_PAYLOAD_SIZE);
+                    print(@"data.length: $(data.length); i: $(i); SEGMENT_PAYLOAD_SIZE: $(SEGMENT_PAYLOAD_SIZE); payload_size: $(payload_size)\n");
+                    payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:payload_size].copy()));
                     next_sequence_number ++;
                 }
             }

+ 5 - 0
src/lib/Protocols/STP/Sessions/IngressSession.vala

@@ -35,6 +35,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
         private void handle_payload(Payload segment) {
             // TODO: Feature handling
             // Is this a packet we are interested in?
+            //  print(@"Expecting: $(next_expected_sequence_number), got $(segment.sequence_number)\n");
             if(next_expected_sequence_number <= segment.sequence_number) {
                 // Add to reconstruction dictionary
                 reconstruction.set(segment.sequence_number, segment);
@@ -72,6 +73,8 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
             // Start a counter
             uint64 sequence = next_expected_sequence_number;
+
+            //  print(@"Reconstructing from seqno $(sequence)\n");
             
             // Loop until we don't have anything to reconstruct
             for (;reconstruction.has_key(sequence); sequence++) {
@@ -87,6 +90,8 @@ namespace LibPeer.Protocols.Stp.Sessions {
             // Sequence is now the next expected sequence number
             next_expected_sequence_number = sequence;
 
+            //  print(@"Reconstruction complete: \"$(composer.to_escaped_string())\"\n");
+
             // Return the composed reconstruction
             return composer.to_byte_array();
         }

+ 1 - 1
src/lib/Protocols/STP/Sessions/Session.vala

@@ -40,7 +40,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
         protected virtual void close_session(string reason) {
             open = false;
-            //  print(@"[SESSION CLOSED] $(reason)\n");
+            print(@"[SESSION CLOSED] $(reason)\n");
             session_closed(reason);
         }
 

+ 8 - 5
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -32,9 +32,9 @@ namespace LibPeer.Protocols.Stp {
             send_thread = new Thread<void>("STP Network Send Thread", send_loop);
         }
 
-        public Negotiation initialise_stream(InstanceReference target, uint8[]? in_reply_to = null) {
+        public Negotiation initialise_stream(InstanceReference target, uint8[]? in_reply_to = null) throws IOError{
             if(muxer.get_peer_info_for_instance(target) == null) {
-                Posix.abort();
+                throw new IOError.HOST_NOT_FOUND("Cannot initialise stream: no known way to reach specified instance");
             }
 
             // Initiate a stream with another peer
@@ -230,12 +230,15 @@ namespace LibPeer.Protocols.Stp {
             // TODO feature stuff
             // Create the session object
             Session session = null;
+            Object stream = null;
             switch (negotiation.direction) {
                 case SessionDirection.INGRESS:
                     session = new IngressSession(negotiation.remote_instance, negotiation.session_id.get_data(), negotiation.ping);
+                    stream = new StpInputStream((IngressSession)session);
                     break;
                 case SessionDirection.EGRESS:
                     session = new EgressSession(negotiation.remote_instance, negotiation.session_id.get_data(), negotiation.ping);
+                    stream = new StpOutputStream((EgressSession)session);
                     break;
             }
 
@@ -248,15 +251,15 @@ namespace LibPeer.Protocols.Stp {
                     if(sessions.has_key(negotiation.in_reply_to)) {
                         Session regarding = sessions.get(negotiation.in_reply_to);
                         if(regarding is EgressSession) {
-                            notify_app(() => ((EgressSession)regarding).received_reply((IngressSession)session));
+                            notify_app(() => ((EgressSession)regarding).received_reply((StpInputStream)stream));
                             break;
                         }
                         break;
                     }
-                    notify_app(() => incoming_stream(new StpInputStream((IngressSession)session)));
+                    notify_app(() => incoming_stream((StpInputStream)stream));
                     break;
                 case SessionDirection.EGRESS:
-                    notify_app(() => negotiation.established(new StpOutputStream((EgressSession)session)));
+                    notify_app(() => negotiation.established((StpOutputStream)stream));
                     break;
             }
             

+ 10 - 1
src/lib/Protocols/STP/Streams/InputStream.vala

@@ -11,6 +11,8 @@ namespace LibPeer.Protocols.Stp.Streams {
         private Cond data_cond = Cond();
         private Mutex data_mutex = Mutex();
 
+        private int pending_data = 0;
+
         public InstanceReference origin { get { return session.target; }}
         public uint8[] session_id { get { return session.identifier; }}
 
@@ -22,8 +24,11 @@ namespace LibPeer.Protocols.Stp.Streams {
 
         private void handle_data(uint8[] data) {
             //  print("*** HANDLE DATA START\n");
+            pending_data ++;
             data_mutex.lock();
+            //  print(@"InputStream:\n\tAdding \"$(new Util.ByteComposer().add_byte_array(data).to_escaped_string())\"\n\tTo: \"$(new Util.ByteComposer().add_byte_array(unread_data).to_escaped_string())\"\n");
             unread_data = new Util.ByteComposer().add_byte_array(unread_data).add_byte_array(data).to_byte_array();
+            pending_data --;
             data_cond.broadcast();
             data_mutex.unlock();
             //  print("*** HANDLE DATA RETURN\n");
@@ -44,7 +49,7 @@ namespace LibPeer.Protocols.Stp.Streams {
 
         public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
             data_mutex.lock();
-            while(unread_data.length < buffer.length && session.open) {
+            while(unread_data.length < buffer.length && (session.open && pending_data == 0)) {
                 data_cond.wait(data_mutex);
             }
             var available_data = unread_data.length < buffer.length ? unread_data.length : buffer.length;
@@ -52,8 +57,12 @@ namespace LibPeer.Protocols.Stp.Streams {
             for(int i = 0; i < available_data; i++) {
                 buffer[i] = unread_data[i];
             }
+            //  print(@"Read:\n\t\"$(new Util.ByteComposer().add_byte_array(buffer).to_escaped_string())\"\n\tof: \"$(new Util.ByteComposer().add_byte_array(unread_data).to_escaped_string())\"\n");
             unread_data = unread_data[available_data:unread_data.length];
             data_mutex.unlock();
+
+
+
             return available_data;
         }
 

+ 1 - 1
src/lib/Protocols/STP/Streams/OutputStream.vala

@@ -17,7 +17,7 @@ namespace LibPeer.Protocols.Stp.Streams {
 
         public StpOutputStream(EgressSession session) {
             this.session = session;
-            this.session.received_reply.connect(s => reply(new StpInputStream(s)));
+            this.session.received_reply.connect(s => reply(s));
         }
 
         public override bool close (GLib.Cancellable? cancellable) {

+ 13 - 0
src/lib/Util/ByteComposer.vala

@@ -42,6 +42,19 @@ namespace LibPeer.Util {
             add_byte(0);
             return (string)to_byte_array();
         }
+
+        public string to_escaped_string() {
+            var builder = new StringBuilder();
+            foreach (var byte in to_byte_array()) {
+                if(byte >= 32 && byte <= 126) {
+                    builder.append_unichar((unichar)byte);
+                }
+                else {
+                    builder.append(@"[$(byte)d]");
+                }
+            }
+            return builder.str;
+        }
     }
 
 }

+ 25 - 0
src/lib/Util/Streams.vala

@@ -0,0 +1,25 @@
+namespace LibPeer.Util {
+
+    public class StreamUtil {
+
+        public static DataInputStream get_data_input_stream(InputStream stream) {
+            if(stream is DataInputStream) {
+                return (DataInputStream)stream;
+            }
+            var dis = new DataInputStream(stream);
+            dis.set_byte_order(DataStreamByteOrder.BIG_ENDIAN);
+            return dis;
+        }
+
+        public static DataOutputStream get_data_output_stream(OutputStream stream) {
+            if(stream is DataOutputStream) {
+                return (DataOutputStream)stream;
+            }
+            var dos = new DataOutputStream(stream);
+            dos.set_byte_order(DataStreamByteOrder.BIG_ENDIAN);
+            return dos;
+        }
+
+    }
+
+}

+ 4 - 0
src/lib/meson.build

@@ -35,6 +35,9 @@ sources += files('Protocols/MX2/InstanceReference.vala')
 sources += files('Protocols/MX2/Packet.vala')
 sources += files('Protocols/MX2/PathInfo.vala')
 sources += files('Protocols/MX2/PathStrategy.vala')
+sources += files('Protocols/MX2/Fragment.vala')
+sources += files('Protocols/MX2/Fragmenter.vala')
+sources += files('Protocols/MX2/Assembler.vala')
 sources += files('Protocols/STP/StreamTransmissionProtocol.vala')
 sources += files('Protocols/STP/Negotiation.vala')
 sources += files('Protocols/STP/Retransmitter.vala')
@@ -66,6 +69,7 @@ sources += files('Util/QueueCommand.vala')
 sources += files('Util/ThreadTimer.vala')
 sources += files('Util/ConcurrentHashMap.vala')
 sources += files('Util/TimeoutMap.vala')
+sources += files('Util/Streams.vala')
 
 libpeer = library('peer', sources, dependencies: dependencies)
 libpeer_dep = declare_dependency(link_with: libpeer, include_directories: include_directories('.'))

+ 48 - 3
src/toys/discoverer/Discoverer.vala

@@ -1,6 +1,7 @@
 using LibPeer.Networks.Simulation;
 using LibPeer.Protocols.Mx2;
 using LibPeer.Protocols.Aip;
+using LibPeer.Protocols.Stp;
 using LibPeer.Networks;
 
 using Gee;
@@ -13,6 +14,7 @@ namespace Discoverer {
         private Network network;
         private ApplicationInformationProtocol aip;
         private ApplicationInformation app_info;
+        private StreamTransmissionProtocol stp;
         private Instance app_instance;
         private int id;
 
@@ -20,14 +22,21 @@ namespace Discoverer {
             this.id = id;
             network = net;
             network.bring_up();
-            print("Instansiate\n");
+            print("Instansiate AIP\n");
             aip = new ApplicationInformationProtocol(muxer);
             print("Add network\n");
             aip.add_network(network);
-
-            app_instance = new Instance("discovery_toy");
+            
+            print("Setup application instance\n");
+            app_instance = muxer.create_instance("discovery_toy");
             app_info = new ApplicationInformation.from_instance(app_instance);
+            app_info.resource_set.add(new Bytes(new uint8[32]));
             app_info.new_group_peer.connect(group_peers_found);
+            app_instance.incoming_greeting.connect(greeted_by_peer);
+
+            print("Instansiate STP\n");
+            stp = new StreamTransmissionProtocol(muxer, app_instance);
+            stp.incoming_stream.connect(ingress_stream_established);
             print("Add application\n");
             aip.add_application (app_info);
         }
@@ -39,6 +48,42 @@ namespace Discoverer {
 
         private void found_peer(InstanceInformation info) {
             print("[GOAL!] I found a peer!\n");
+            aip.find_application_resource(app_info, new Bytes(new uint8[32])).on_answer.connect(found_resource);
+        }
+
+        private void found_resource(InstanceInformation info) {
+            print("[GOAL!] I found a resource!\n");
+            var inquire_info = "Inquiring with:\n";
+            foreach (var item in info.connection_methods) {
+                inquire_info += @"\t$(item.to_string())\n";
+            }
+            print(inquire_info);
+            muxer.inquire(app_instance, info.instance_reference, info.connection_methods);
+        }
+
+        private void greeted_by_peer(InstanceReference origin) {
+            print("[GOAL!] I received a greeting!\n");
+            stp.initialise_stream(origin).established.connect(egress_stream_established);
+        }
+
+        private void egress_stream_established(OutputStream stream) {
+            print("[GOAL!] I established an egress stream to a peer!\n");
+            stream.write(new uint8[] { 13, 'H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd', '!'});
+            stream.close();
+        }
+
+        private void ingress_stream_established(InputStream stream) {
+            print("[GOAL!] An ingress stream has been established!\n");
+            var message_size = new uint8[1];
+            stream.read(message_size);
+
+            var message = new uint8[message_size[0]];
+            stream.read(message);
+
+            stream.close();
+
+            var message_str = new LibPeer.Util.ByteComposer().add_byte_array(message).to_string();
+            print(@"[GOAL!] I received a message from a peer: '$(message_str)'\n");
         }
 
     }