Browse Source

Big block of work done over however long it has been since I last did a commit

Billy Barrow 4 years ago
parent
commit
c74abb4ed5

+ 6 - 3
src/lib/Networks/PeerInfo.vala

@@ -5,7 +5,7 @@ using LibPeer.Util;
 namespace LibPeer.Networks
 {
     
-    public abstract class PeerInfo {
+    public abstract class PeerInfo : Object {
 
         private static ConcurrentHashMap<Bytes, Type> info_types = new ConcurrentHashMap<Bytes, Type>((a) => a.hash(), (a, b) => a.compare(b) == 0);
         
@@ -58,13 +58,16 @@ namespace LibPeer.Networks
             var network_type = reader.read_bytes(type_length);
 
             //  Get the info subclass
-            Type peer_info_type = info_types.get(network_type);
+            Type peer_info_type = typeof(UnknownPeerInfo);
+            if(info_types.has_key(network_type)) {
+                peer_info_type = info_types.get(network_type);
+            }
 
             // Create the peer info object
             PeerInfo peer_info = Object.new(peer_info_type) as PeerInfo;
 
             // Build out the data
-            peer_info.build(data_length, stream);
+            peer_info.build(data_length, reader);
 
             // Return the object
             return peer_info;

+ 1 - 1
src/lib/Protocols/AIP/ApplicationInformation.vala

@@ -10,7 +10,7 @@ namespace LibPeer.Protocols.Aip {
         public string application_namespace { get; protected set; }
 
         public Bytes namespace_bytes { owned get {
-            return new Bytes(((uint8[])application_namespace)[0:-2]);
+            return new Bytes(((uint8[])application_namespace)[0:application_namespace.length]);
         }}
 
         public HashSet<Bytes> resource_set = new Gee.HashSet<Bytes>((a) => a.hash(), (a, b) => a.compare(b) == 0);

+ 70 - 23
src/lib/Protocols/AIP/ApplicationInformationProtocol.vala

@@ -7,7 +7,7 @@ using Gee;
 
 namespace LibPeer.Protocols.Aip {
 
-    public class ApplicationInformationProtocol {
+    public class ApplicationInformationProtocol : Object{
 
         internal const uint8 DATA_FOLLOWING_REQUEST = 'R';
         internal const uint8 DATA_FOLLOWING_QUERY = 'Q';
@@ -30,7 +30,7 @@ namespace LibPeer.Protocols.Aip {
 
         protected AipCapabilities capabilities;
         protected bool join_all_groups = false;
-        protected Gee.List<ApplicationInformation> application_information;
+        protected Gee.List<ApplicationInformation> application_information = new Gee.LinkedList<ApplicationInformation>();
 
         protected Muxer muxer;
         protected Instance instance;
@@ -93,7 +93,9 @@ namespace LibPeer.Protocols.Aip {
 
             // Hook up signals
             new_group_peer.connect((instance_ref, id) => {
+                //print("New group peer?\n");
                 if(id.compare(info.namespace_bytes) == 0) {
+                    //print("New group peer\n");
                     info.new_group_peer();
                 }
             });
@@ -136,29 +138,34 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_greeting(InstanceReference greeting) {
+            print("rx greeting\n");
             // Add to known peers
             discovered_peers.add(greeting);
 
             // Request capabilities from the instance
-            request_capabilities(greeting, m => rx_capabilities(greeting, m));
+            request_capabilities(greeting).response.connect_after((m) => {
+                rx_capabilities(greeting, m);
+            });
         }
 
         protected void rx_capabilities(InstanceReference target, AipCapabilities capabilities) {
+            print("rx capabilities\n");
             // Save the capabilities
             instance_capabilities.set(target, capabilities);
 
             // Can we ask the peer for our address?
             if(capabilities.address_info) {
                 // Yes, do it
-                request_address(target, rx_address);
+                request_address(target).response.connect(rx_address);
             }
             // Can we ask the peer for other peers?
             if(capabilities.find_peers) {
                 // Yes, do it
-                request_peers(target, rx_peers);
+                request_peers(target).response.connect(rx_peers);
             }
             // Can we send queries and answers to this peer?
             if(capabilities.query_answer) {
+                //print("This peer is queryable\n");
                 // Yes, add to default group
                 default_group.add_peer(target);
 
@@ -167,6 +174,7 @@ namespace LibPeer.Protocols.Aip {
 
                 // We now have a queryable peer
                 if(!is_ready) {
+                    //print("Ready B)\n");
                     is_ready = true;
                     ready();
                 }
@@ -181,6 +189,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_address(PeerInfo info) {
+            print("rx address\n");
             // We received peer info, add to our set
             peer_info.add(info);
             
@@ -198,44 +207,57 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_peers(Gee.List<InstanceInformation> peers) {
+            print("rx peers\n");
             // We received a list of peers running AIP, do we want more peers?
             if(!default_group.actively_connect) {
                 // Don't worry about it
+                //print("rx peers: ignored\n");
                 return;
             }
 
             // Send out inquries to the peers
             foreach (var peer in peers) {
+                //print("rx peers: Inquire\n");
                 muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
             }
         }
 
-        protected void request_address(InstanceReference target, Func<PeerInfo> callback) {
+        protected Request<PeerInfo> request_address(InstanceReference target) {
+            //print("request address\n");
             // Make the request
             var request = new ByteComposer().add_byte(REQUEST_ADDRESS).to_bytes();
-            send_request(request, target, s => {
+            var peer_info_request = new Request<PeerInfo>();
+            send_request(request, target).response.connect(s => {
+                print("Address response\n");
                 // Read the address (peer info)
                 var address = PeerInfo.deserialise(s);
                 // Callback
-                callback(address);
+                print("Address response signal called\n");
+                peer_info_request.response(address);
             });
+            return peer_info_request;
         }
 
-        protected void request_capabilities(InstanceReference target, Func<AipCapabilities> callback) {
+        protected Request<AipCapabilities> request_capabilities(InstanceReference target) {
             // Make the request
-            var request = new ByteComposer().add_byte(REQUEST_CAPABILITIES).to_bytes();
-            send_request(request, target, s => {
+            //print("Request capabilities\n");
+            var request_data = new ByteComposer().add_byte(REQUEST_CAPABILITIES).to_bytes();
+            var request = new Request<AipCapabilities>();
+            send_request(request_data, target).response.connect((s) => {
                 // Read capabilities
                 var target_capabilities = new AipCapabilities.from_stream(s);
                 // Callback
-                callback(target_capabilities);
+                request.response(target_capabilities);
             });
+            return request;
         }
 
-        protected void request_peers(InstanceReference target, Func<Gee.List<InstanceInformation>> callback) {
+        protected Request<Gee.List<InstanceInformation>> request_peers(InstanceReference target) {
+            //print("request peers\n");
             // Make the request
-            var request = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
-            send_request(request, target, s => {
+            var request_data = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
+            var request = new Request<Gee.List<InstanceInformation>>();
+            send_request(request_data, target).response.connect(s => {
                 var dis = new DataInputStream(s);
                 dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
                 // Read number of peers
@@ -250,20 +272,23 @@ namespace LibPeer.Protocols.Aip {
                 }
 
                 // Callback
-                callback(peers);
+                request.response(peers);
             });
+            return request;
         }
 
-        protected void send_request(Bytes request, InstanceReference target, Func<InputStream> callback) {
+        protected Request<InputStream> send_request(Bytes request, InstanceReference target) {
+            var request_obj = new Request<InputStream>();
             // Open a stream with the peer
             transport.initialise_stream(target).established.connect((s) => {
                 // Connect reply signal
-                s.reply.connect(m => callback(m));
+                s.reply.connect(m => request_obj.response(m));
 
                 // Send the request
                 s.write(new ByteComposer().add_byte(DATA_FOLLOWING_REQUEST).add_bytes(request).to_byte_array());
                 s.close();
             });
+            return request_obj;
         }    
         
         protected void rx_stream(StpInputStream stream) {
@@ -273,21 +298,26 @@ namespace LibPeer.Protocols.Aip {
             var following = _following[0];
 
             if(following == DATA_FOLLOWING_ANSWER && capabilities.query_answer) {
+                print("RX Stream: Answer\n");
                 handle_answer(stream);
             }
             else if(following == DATA_FOLLOWING_QUERY && capabilities.query_answer) {
+                print("RX Stream: Query\n");
                 handle_query(stream);
             }
             else if(following == DATA_FOLLOWING_REQUEST) {
+                print("RX Stream: Request\n");
                 handle_request(stream);
             }
             else {
+                print("RX Stream: Invalid (stream closed)\n");
                 stream.close();
             }
 
         }
 
         protected void handle_answer(InputStream stream) {
+            print("Handle query answer\n");
             // Deserialise the answer
             var answer = new Answer.from_stream(stream);
 
@@ -326,17 +356,22 @@ namespace LibPeer.Protocols.Aip {
             transport.initialise_stream(stream.origin, stream.session_id).established.connect(os => {
                 switch (request_type) {
                     case REQUEST_CAPABILITIES:
+                        //print("I got a capabilities request\n");
                         capabilities.serialise(os);
                         break;
                     case REQUEST_ADDRESS:
+                        //print("I got an address request\n");
                         muxer.get_peer_info_for_instance(os.target).serialise(os);
                         break;
                     case REQUEST_PEERS:
+                        //print("I got a peers request\n");
                         // TODO: implement
                         os.write(new uint8[] {0});
                         break;
                 }
+                //print("Replied\n");
                 os.close();
+                //print("Reply stream closed\n");
             });
 
             // Have we encountered this peer before?
@@ -345,7 +380,7 @@ namespace LibPeer.Protocols.Aip {
                 discovered_peers.add(stream.origin);
 
                 // Ask for capabilities
-                request_capabilities(stream.origin, c => rx_capabilities(stream.origin, c));
+                request_capabilities(stream.origin).response.connect(c => rx_capabilities(stream.origin, c));
             }
         }
 
@@ -378,8 +413,9 @@ namespace LibPeer.Protocols.Aip {
             var query_type = query_data[0];
 
             if(query_type == QUERY_GROUP) {
+                print("Handle query: Group\n");
                 // Get the group identifier
-                var group_id = new Bytes(query_data[1:-1]);
+                var group_id = new Bytes(query_data[1:query_data.length]);
 
                 // Are we not in this group, but joining all?
                 if(join_all_groups && !query_groups.has_key(group_id)) {
@@ -397,8 +433,9 @@ namespace LibPeer.Protocols.Aip {
                 send_query(query, default_group);
             }
             else if(query_type == QUERY_APPLICATION) {
+                print("Handle query: Application\n");
                 // Get the application namespace
-                var app_namespace = new Bytes(query_data[1:-1]);
+                var app_namespace = new Bytes(query_data[1:query_data.length]);
 
                 // Are we in a group for this namespace?
                 if(query_groups.has_key(app_namespace)) {
@@ -416,11 +453,12 @@ namespace LibPeer.Protocols.Aip {
                 }
             }
             else if(query_type == QUERY_APPLICATION_RESOURCE) {
+                print("Handle query: Application resource\n");
                 // Read the label
                 var label = new Bytes(query_data[1:33]);
 
                 // Read the application namespace
-                var app_namespace = new Bytes(query_data[33:-1]);
+                var app_namespace = new Bytes(query_data[33:query_data.length]);
 
                 // Are we in a group for this namespace?
                 if(query_groups.has_key(app_namespace)) {
@@ -442,6 +480,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void queue_query_answer(Query query) {
+            print("Queue query answer\n");
             // Do we have peer info to send yet?
             if(peer_info.size > 0) {
                 // Yes, do it
@@ -476,6 +515,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void join_query_group(Bytes group) {
+            //print("Join query group\n");
             // Create the query group
             query_groups.set(group, new QueryGroup());
 
@@ -527,6 +567,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void send_query(Query query, QueryGroup group) {
+            //print("Send query\n");
             // Does the query have any hops left?
             if(query.hops > MAX_QUERY_HOPS) {
                 return;
@@ -534,15 +575,21 @@ namespace LibPeer.Protocols.Aip {
 
             // Loop over each instance in the query group
             foreach (var instance_ref in group) {
+                //print("Contacting peer for query\n");
                 transport.initialise_stream(instance_ref).established.connect(stream => {
                     // Tell the instance that the data that follows is a query
+                    print("Query stream established\n");
                     stream.write(new uint8[] { DATA_FOLLOWING_QUERY });
 
+                    print("Sending query body\n");
+                    
                     // Write the query
                     query.serialise(stream);
-
+                    
                     // Close the stream
+                    print("Closing query stream\n");
                     stream.close();
+                    print("Query sent to peer\n");
                 });
             }
         }

+ 12 - 2
src/lib/Protocols/AIP/Query.vala

@@ -39,18 +39,25 @@ namespace LibPeer.Protocols.Aip {
             dos.write_bytes(data);
         }
 
-        public Query.from_stream(InputStream stream) throws IOError, Error{
+        public Query.from_stream(InputStream stream){
             var dis = new DataInputStream(stream);
+            //  dis.buffer_size = 2;
             dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
 
             // Read the identifier
+            print("\tIdentifier\n");
             identifier = dis.read_bytes(16);
 
             // Read header data
+            print("\tHops\n");
             hops = dis.read_byte();
+            print("\tMax Replies\n");
             max_replies = dis.read_byte();
+            print("\tData length\n");
             var data_length = dis.read_uint16();
+            print(@"\tQuery data length $(data_length)\n");
             var return_path_size = dis.read_byte();
+            print(@"\tReturn path size $(return_path_size)\n");
 
             // Deserialise return path
             return_path = new InstanceReference[return_path_size];
@@ -58,8 +65,11 @@ namespace LibPeer.Protocols.Aip {
                 return_path[i] = new InstanceReference.from_stream(dis);
             }
 
+            print("\tRead query data\n");
+
             // Read the query data
-            data = stream.read_bytes(data_length);
+            data = dis.read_bytes(data_length);
+            print(@"\tDone $(data.length)\n");
         }
 
         internal void append_return_hop(InstanceReference instance) {

+ 14 - 0
src/lib/Protocols/AIP/Request.vala

@@ -0,0 +1,14 @@
+using LibPeer.Protocols.Mx2;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    public class Request<T> {
+        public bool completed { get; set; }
+
+        public virtual signal void response(T data) {
+            print("Yeehaw\n");
+            completed = true;
+        }
+    }
+}

+ 0 - 5
src/lib/Protocols/STP/MessageRetransmitter.vala

@@ -1,5 +0,0 @@
-
-
-namespace LibPeer.Protocols.Stp {
-
-}

+ 6 - 2
src/lib/Protocols/STP/Messages/NegotiateSession.vala

@@ -23,7 +23,9 @@ namespace LibPeer.Protocols.Stp.Messages {
             os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
             os.write (session_id.get_data ());
             os.put_byte ((uint8)feature_codes.length);
-            os.write (feature_codes);
+            if(feature_codes != null) {
+                os.write (feature_codes);
+            }
             os.put_uint64 (reply_timing);
             os.put_uint64 (get_monotonic_time ()/1000);
             os.flush ();
@@ -37,7 +39,9 @@ namespace LibPeer.Protocols.Stp.Messages {
             session_id = new Bytes(b_session_id);
             uint8 feature_count = ins.read_byte ();
             feature_codes = new uint8[feature_count];
-            ins.read(feature_codes);
+            if(feature_codes != null) {
+                ins.read(feature_codes);
+            }
             reply_timing = ins.read_uint64 ();
             timing = ins.read_uint64 ();
         }

+ 6 - 2
src/lib/Protocols/STP/Messages/RequestSession.vala

@@ -24,7 +24,9 @@ namespace LibPeer.Protocols.Stp.Messages {
             os.write (session_id.get_data());
             os.write (in_reply_to.get_data());
             os.put_byte ((uint8)feature_codes.length);
-            os.write (feature_codes);
+            if(feature_codes != null) {
+                os.write (feature_codes);
+            }
             os.put_uint64 (get_monotonic_time ()/1000);
             os.flush ();
         }
@@ -40,7 +42,9 @@ namespace LibPeer.Protocols.Stp.Messages {
             in_reply_to = new Bytes(b_in_reply_to);
             uint8 feature_count = ins.read_byte ();
             feature_codes = new uint8[feature_count];
-            ins.read(feature_codes);
+            if(feature_codes != null) {
+                ins.read(feature_codes);
+            }
             timing = ins.read_uint64 ();
         }
 

+ 1 - 0
src/lib/Protocols/STP/Segments/Acknowledgement.vala

@@ -9,6 +9,7 @@ namespace LibPeer.Protocols.Stp.Segments {
         public uint64 timing { get; private set; }
 
         protected override void serialise_data (OutputStream stream) {
+            //  print(@"***Ack segment $(sequence_number)\n");
             DataOutputStream os = new DataOutputStream (stream);
             os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
             os.put_uint64 (sequence_number);

+ 4 - 2
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -70,6 +70,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
         private void handle_acknowledgement(Acknowledgement segment) {
             // Is this segment still in-flight?
             if(!in_flight.has_key(segment.sequence_number)) {
+                //  print(@"***Redundant resend (segment $(segment.sequence_number))\n");
                 // No, we must have resent redundantly
                 redundant_resends++;
                 return;
@@ -123,12 +124,13 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
 
             // Calculate a maximum time value for segments eligable to be resent
-            uint64 max_time = (get_monotonic_time()/1000) - (uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
+            uint64 max_time = (get_monotonic_time()/1000) - 5000; //(uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
             
             // Do we have any in-flight segments to resend?
             foreach (var segment in in_flight.values) {
                 // Is the segment timing value less than the max time?
                 if(segment.timing != 0 && segment.timing < max_time) {
+                    print(@"***Resend segment $(segment.sequence_number)\n");
                     // Resend it
                     segment.reset_timing();
                     queue_segment(segment);
@@ -209,7 +211,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
         }
 
-        public override void close_session(string reason) {
+        protected override void close_session(string reason) {
             base.close_session(reason);
             var error = new IOError.CONNECTION_CLOSED("The session was closed before the segment was sent");
             foreach (var tracker in segment_trackers.values) {

+ 1 - 0
src/lib/Protocols/STP/Sessions/IngressSession.vala

@@ -83,6 +83,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
                 composer.add_byte_array(segment.data);
             }
 
+            //  print(@"$(next_expected_sequence_number) => $(sequence)\n");
             // Sequence is now the next expected sequence number
             next_expected_sequence_number = sequence;
 

+ 1 - 1
src/lib/Protocols/STP/Sessions/Session.vala

@@ -44,7 +44,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
         }
 
         public virtual void close() {
-            outgoing_segment_queue = new AsyncQueue<Segment>();
+            //  outgoing_segment_queue = new AsyncQueue<Segment>();
             queue_segment(new Control(ControlCommand.COMPLETE));
             close_session("Stream closed by local application");
         }

+ 1 - 1
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -45,7 +45,7 @@ namespace LibPeer.Protocols.Stp {
             var negotiation = new Negotiation() {
                 session_id = new Bytes(session_id),
                 in_reply_to = new Bytes(in_reply_to),
-                feature_codes = {},
+                feature_codes = new uint8[0],
                 state = NegotiationState.REQUESTED,
                 remote_instance = target,
                 direction = SessionDirection.EGRESS

+ 17 - 5
src/lib/Protocols/STP/Streams/InputStream.vala

@@ -17,13 +17,24 @@ namespace LibPeer.Protocols.Stp.Streams {
         public StpInputStream(IngressSession session) {
             this.session = session;
             session.incoming_app_data.connect(handle_data);
+            session.session_closed.connect(handle_close);
         }
 
         private void handle_data(uint8[] data) {
+            //  print("*** HANDLE DATA START\n");
             data_mutex.lock();
             unread_data = new Util.ByteComposer().add_byte_array(unread_data).add_byte_array(data).to_byte_array();
             data_cond.broadcast();
             data_mutex.unlock();
+            //  print("*** HANDLE DATA RETURN\n");
+        }
+
+        private void handle_close() {
+            //  print("*** HANDLE CLOSE START\n");
+            data_mutex.lock();
+            data_cond.broadcast();
+            data_mutex.unlock();
+            //  print("*** HANDLE DATA RETURN\n");
         }
 
         public override bool close (GLib.Cancellable? cancellable) {
@@ -33,16 +44,17 @@ namespace LibPeer.Protocols.Stp.Streams {
 
         public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
             data_mutex.lock();
-            while(unread_data.length < buffer.length) {
+            while(unread_data.length < buffer.length && session.open) {
                 data_cond.wait(data_mutex);
             }
-
-            for(int i = 0; i < buffer.length; i++) {
+            var available_data = unread_data.length < buffer.length ? unread_data.length : buffer.length;
+            print(@"Read $(available_data) of $(buffer.length) bytes\n");
+            for(int i = 0; i < available_data; i++) {
                 buffer[i] = unread_data[i];
             }
-            unread_data = unread_data[buffer.length:unread_data.length];
+            unread_data = unread_data[available_data:unread_data.length];
             data_mutex.unlock();
-            return buffer.length;
+            return available_data;
         }
 
     }

+ 15 - 0
src/lib/Protocols/STP/Streams/OutputStream.vala

@@ -9,6 +9,10 @@ namespace LibPeer.Protocols.Stp.Streams {
         public InstanceReference target { get { return session.target; }}
         public uint8[] session_id { get { return session.identifier; }}
 
+        Cond sendop_cond = Cond();
+        Mutex sendop_mutex = Mutex();
+        private int send_operations = 0;
+
         public signal void reply(StpInputStream stream);
 
         public StpOutputStream(EgressSession session) {
@@ -17,11 +21,17 @@ namespace LibPeer.Protocols.Stp.Streams {
         }
 
         public override bool close (GLib.Cancellable? cancellable) {
+            sendop_mutex.lock();
+            while(send_operations != 0) {
+                print("[STP] Waiting for operations to complete before closing stream\n");
+                sendop_cond.wait(sendop_mutex);
+            }
             session.close();
             return true;
         }
 
         public override ssize_t write(uint8[] buffer, GLib.Cancellable? cancellable = null) throws IOError {
+            send_operations++;
             Cond cond = Cond();
             Mutex mutex = Mutex();
             IOError error_result = null;
@@ -46,6 +56,11 @@ namespace LibPeer.Protocols.Stp.Streams {
                 cond.wait(mutex);
             }
 
+            send_operations--;
+            sendop_mutex.lock();
+            sendop_cond.broadcast();
+            sendop_mutex.unlock();
+
             if(error_result != null) {
                 throw error_result;
             }

+ 2 - 2
src/lib/meson.build

@@ -1,6 +1,6 @@
 vapi_dir = meson.current_source_dir() / 'vapi'
 
-add_project_arguments(['--disable-warnings','--vapidir', vapi_dir], language: 'vala')
+add_project_arguments(['--disable-warnings', '--enable-checking','--vapidir', vapi_dir], language: 'vala')
 
 dependencies = [
     dependency('glib-2.0'),
@@ -36,7 +36,6 @@ sources += files('Protocols/MX2/PathStrategy.vala')
 sources += files('Protocols/STP/StreamTransmissionProtocol.vala')
 sources += files('Protocols/STP/Negotiation.vala')
 sources += files('Protocols/STP/Retransmitter.vala')
-sources += files('Protocols/STP/MessageRetransmitter.vala')
 sources += files('Protocols/STP/Sessions/Session.vala')
 sources += files('Protocols/STP/Sessions/IngressSession.vala')
 sources += files('Protocols/STP/Sessions/EgressSession.vala')
@@ -59,6 +58,7 @@ sources += files('Protocols/AIP/QueryGroup.vala')
 sources += files('Protocols/AIP/Query.vala')
 sources += files('Protocols/AIP/InstanceInformation.vala')
 sources += files('Protocols/AIP/Answer.vala')
+sources += files('Protocols/AIP/Request.vala')
 sources += files('Util/ByteComposer.vala')
 sources += files('Util/QueueCommand.vala')
 sources += files('Util/ThreadTimer.vala')

+ 6 - 2
src/toys/discoverer/Discoverer.vala

@@ -18,23 +18,27 @@ namespace Discoverer {
 
         public DiscoverWorker(int id, Conduit conduit) throws Error, IOError {
             this.id = id;
-            network = conduit.get_interface();
+            network = conduit.get_interface (200, 400, 0.0f);
             network.bring_up();
+            print("Instansiate\n");
             aip = new ApplicationInformationProtocol(muxer);
+            print("Add network\n");
             aip.add_network(network);
 
             app_instance = new Instance("discovery_toy");
             app_info = new ApplicationInformation.from_instance(app_instance);
             app_info.new_group_peer.connect(group_peers_found);
+            print("Add application\n");
             aip.add_application (app_info);
         }
 
         private void group_peers_found() {
+            print("[GOAL!] Find application instance\n");
             aip.find_application_instance(app_info).on_answer.connect(found_peer);
         }
 
         private void found_peer(InstanceInformation info) {
-            print("I found a peer!\n");
+            print("[GOAL!] I found a peer!\n");
         }
 
     }

+ 1 - 1
src/toys/give_file/GiveFile.vala

@@ -21,7 +21,7 @@ namespace GiveFile {
         public FileGiver(Conduit conduit, string file_path) {
             loop = new MainLoop();
             muxer = new Muxer ();
-            network = conduit.get_interface (0, 0, 0.2f);
+            network = conduit.get_interface (0, 0, 0.0f);
             network.bring_up ();
             muxer.register_network (network);
             instance = muxer.create_instance ("GiveFile");

+ 1 - 0
src/toys/meson.build

@@ -1,4 +1,5 @@
 
 subdir('exponential_pinger')
 subdir('give_file')
+subdir('replyer')
 subdir('discoverer')

+ 24 - 0
src/toys/replyer/Main.vala

@@ -0,0 +1,24 @@
+using LibPeer.Networks.Simulation;
+
+namespace NumericReplyer {
+
+    class Main : Object {
+
+        public static int main(string[] args) {
+            print("Replyer\n");
+            int count = int.parse(args[1]);
+
+            Conduit conduit = new Conduit();
+
+            Replyer[] pingas = new Replyer[count];
+            for (int i = 0; i < count; i++){
+                pingas[i] = new Replyer(i, conduit);
+            }
+
+            while(true) {};
+
+            return 0;
+        }
+    }
+
+}

+ 107 - 0
src/toys/replyer/Replyer.vala

@@ -0,0 +1,107 @@
+using LibPeer.Networks.Simulation;
+using LibPeer.Protocols.Mx2;
+using LibPeer.Networks;
+using LibPeer.Protocols.Stp;
+using LibPeer.Protocols.Stp.Streams;
+
+using Gee;
+
+namespace NumericReplyer {
+
+    class Replyer : Object {
+
+        private Muxer muxer = new Muxer();
+        private Network network;
+        private Instance instance;
+        private ConcurrentSet<InstanceReference> peers = new ConcurrentSet<InstanceReference>((a, b) => a.compare(b));
+        private StreamTransmissionProtocol transport;
+        private int id;
+
+        public Replyer(int id, Conduit conduit) throws Error, IOError {
+            this.id = id;
+            network = conduit.get_interface (0, 0, 0.0f);
+            network.bring_up ();
+            muxer.register_network (network);
+            instance = muxer.create_instance ("GiveFile");
+            transport = new StreamTransmissionProtocol (muxer, instance);
+
+            instance.incoming_greeting.connect((origin) => rx_greeting(origin));
+            network.incoming_advertisment.connect(rx_advertisement);
+            transport.incoming_stream.connect(incoming);
+            
+            network.advertise(instance.reference);
+        }
+
+        void rx_advertisement(Advertisement adv) {
+            print("rx_advertisement\n");
+            if(!peers.contains(adv.instance_reference)) {
+                muxer.inquire(instance, adv.instance_reference, new PeerInfo[] {adv.peer_info});
+            }
+        }
+
+        void rx_greeting(InstanceReference origin) {
+            print("rx_greeting\n");
+            peers.add(origin);
+            transport.initialise_stream(origin).established.connect(s => {
+                s.reply.connect(sr => {
+                    var reply = new uint8[1];
+                    sr.read(reply);
+                    if(reply[0] == 2) {
+                        print("RX2\n");
+                        transport.initialise_stream(sr.origin, sr.session_id).established.connect(srr => {
+                           srr.reply.connect(srrr => {
+                               srrr.read(reply);
+                               if(reply[0] == 4) {
+                                   print("Yipee\n");
+                               }
+                               else {
+                                    print("Got invalid reply! (Level 1)\n");
+                               }
+                            });
+                            print("TX3\n");
+                            srr.write(new uint8[] {3});
+                        });
+                    }
+                    else {
+                        print("Got invalid reply! (Level 1)\n");
+                    }
+                });
+                print("TX1\n");
+                s.write(new uint8[] {1});
+            });
+        }
+
+        void incoming(StpInputStream stream) {
+            print("I have a new stream\n");
+            var data = new uint8[1];
+            stream.read(data);
+
+            if(data[0] == 1) {
+                print("RX1\n");
+                transport.initialise_stream(stream.origin, stream.session_id).established.connect(sr => {
+                    sr.reply.connect(srr => {
+                        srr.read(data);
+                        if(data[0] == 3) {
+                            print("RX3\n");
+                            transport.initialise_stream(srr.origin, srr.session_id).established.connect(srr => {
+                                print("TX4\n");
+                                srr.write(new uint8[] {4});
+                                print("My work is done\n");
+                            });
+                        }
+                        else {
+                            print("Got invalid reply\n");
+                        }
+                    });
+                    print("TX2\n");
+                    sr.write(new uint8[] {2});
+                });
+            }
+            else {
+                print("Got invalid initial data\n");
+            }
+        }
+
+    }
+
+}

+ 12 - 0
src/toys/replyer/meson.build

@@ -0,0 +1,12 @@
+dependencies = [
+    dependency('glib-2.0'),
+    dependency('gobject-2.0'),
+    dependency('gio-2.0'),
+    dependency('gee-0.8'),
+    libpeer_dep
+]
+
+sources = files('Main.vala')
+sources += files('Replyer.vala')
+
+executable('replyer', sources, dependencies: dependencies)