Explorar o código

Cleanup and bugfixes

Billy Barrow %!s(int64=3) %!d(string=hai) anos
pai
achega
a4d670ed64

+ 10 - 0
src/lib/Protocols/AIP/Answer.vala

@@ -49,5 +49,15 @@ namespace LibPeer.Protocols.Aip {
             data = dis.read_bytes(data_length);
         }
 
+        public InstanceReference pop_path() {
+            var reference = path[path.length - 1];
+            var old_path = path;
+            path = new InstanceReference[old_path.length - 1];
+            for(int i = 0; i < path.length; i++) {
+                path[i] = old_path[i];
+            }
+            return reference;
+        }
+
     }
 }

+ 21 - 19
src/lib/Protocols/AIP/ApplicationInformationProtocol.vala

@@ -93,9 +93,9 @@ namespace LibPeer.Protocols.Aip {
 
             // Hook up signals
             new_group_peer.connect((instance_ref, id) => {
-                //print("New group peer?\n");
+                print("New group peer?\n");
                 if(id.compare(info.namespace_bytes) == 0) {
-                    //print("New group peer\n");
+                    print("New group peer\n");
                     info.new_group_peer();
                 }
             });
@@ -165,7 +165,7 @@ namespace LibPeer.Protocols.Aip {
             }
             // Can we send queries and answers to this peer?
             if(capabilities.query_answer) {
-                //print("This peer is queryable\n");
+                print("This peer is queryable\n");
                 // Yes, add to default group
                 default_group.add_peer(target);
 
@@ -174,7 +174,7 @@ namespace LibPeer.Protocols.Aip {
 
                 // We now have a queryable peer
                 if(!is_ready) {
-                    //print("Ready B)\n");
+                    print("Ready B)\n");
                     is_ready = true;
                     ready();
                 }
@@ -212,19 +212,19 @@ namespace LibPeer.Protocols.Aip {
             // We received a list of peers running AIP, do we want more peers?
             if(!default_group.actively_connect) {
                 // Don't worry about it
-                //print("rx peers: ignored\n");
+                print("rx peers: ignored\n");
                 return;
             }
 
             // Send out inquries to the peers
             foreach (var peer in peers) {
-                //print("rx peers: Inquire\n");
+                print("rx peers: Inquire\n");
                 muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
             }
         }
 
         protected Request<PeerInfo> request_address(InstanceReference target) {
-            //print("request address\n");
+            print("request address\n");
             // Make the request
             var request = new ByteComposer().add_byte(REQUEST_ADDRESS).to_bytes();
             var peer_info_request = new Request<PeerInfo>();
@@ -241,7 +241,7 @@ namespace LibPeer.Protocols.Aip {
 
         protected Request<AipCapabilities> request_capabilities(InstanceReference target) {
             // Make the request
-            //print("Request capabilities\n");
+            print("Request capabilities\n");
             var request_data = new ByteComposer().add_byte(REQUEST_CAPABILITIES).to_bytes();
             var request = new Request<AipCapabilities>();
             send_request(request_data, target).response.connect((s) => {
@@ -254,7 +254,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected Request<Gee.List<InstanceInformation>> request_peers(InstanceReference target) {
-            //print("request peers\n");
+            print("request peers\n");
             // Make the request
             var request_data = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
             var request = new Request<Gee.List<InstanceInformation>>();
@@ -360,22 +360,22 @@ namespace LibPeer.Protocols.Aip {
             transport.initialise_stream(stream.origin, stream.session_id).established.connect(os => {
                 switch (request_type) {
                     case REQUEST_CAPABILITIES:
-                        //print("I got a capabilities request\n");
+                        print("I got a capabilities request\n");
                         capabilities.serialise(os);
                         break;
                     case REQUEST_ADDRESS:
-                        //print("I got an address request\n");
+                        print("I got an address request\n");
                         muxer.get_peer_info_for_instance(os.target).serialise(os);
                         break;
                     case REQUEST_PEERS:
-                        //print("I got a peers request\n");
+                        print("I got a peers request\n");
                         // TODO: implement
                         os.write(new uint8[] {0});
                         break;
                 }
-                //print("Replied\n");
+                print("Replied\n");
                 os.close();
-                //print("Reply stream closed\n");
+                print("Reply stream closed\n");
             });
 
             // Have we encountered this peer before?
@@ -521,7 +521,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void join_query_group(Bytes group) {
-            //print("Join query group\n");
+            print("Join query group\n");
             // Create the query group
             query_groups.set(group, new QueryGroup());
 
@@ -573,7 +573,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void send_query(Query query, QueryGroup group) {
-            //print("Send query\n");
+            print("Send query\n");
             // Does the query have any hops left?
             if(query.hops > MAX_QUERY_HOPS) {
                 return;
@@ -581,7 +581,7 @@ namespace LibPeer.Protocols.Aip {
 
             // Loop over each instance in the query group
             foreach (var instance_ref in group) {
-                //print("Contacting peer for query\n");
+                print("Contacting peer for query\n");
                 transport.initialise_stream(instance_ref).established.connect(stream => {
                     // Tell the instance that the data that follows is a query
                     print("Query stream established\n");
@@ -602,8 +602,9 @@ namespace LibPeer.Protocols.Aip {
 
         protected void send_answer(Answer answer) {
             // Get (and remove) the last item from the path list
-            var send_to = answer.path[answer.path.length-1];
-            answer.path.length --;
+            print(@"Before pop answer.path.length = $(answer.path.length)\n");
+            InstanceReference send_to = answer.pop_path();
+            print(@"After pop answer.path.length = $(answer.path.length)\n");
 
             // Don't send answers to queries we haven't received
             if(!query_response_count.has_key(answer.in_reply_to)) {
@@ -618,6 +619,7 @@ namespace LibPeer.Protocols.Aip {
 
             // Decrement response counter (stops at 0)
             query_response_count.set(answer.in_reply_to, response_count - 1);
+            print(@"Forwarding answer: send_to != null = $(send_to != null); answer.path.length = $(answer.path.length);\n");
 
             // Open a stream with the instance
             transport.initialise_stream(send_to).established.connect(stream => {

+ 5 - 0
src/lib/Protocols/AIP/Query.vala

@@ -32,7 +32,9 @@ namespace LibPeer.Protocols.Aip {
 
             // Serialise the return path
             foreach (var reference in return_path) {
+                print("Instance reference serialisation for return path begins\n");
                 reference.serialise(dos);
+                print("Instance reference serialisation for return path ends\n");
             }
 
             // Write the query data
@@ -76,6 +78,9 @@ namespace LibPeer.Protocols.Aip {
             var paths = return_path;
             return_path = new InstanceReference[paths.length + 1];
             return_path[paths.length] = instance;
+            for(int i = 0; i < paths.length; i++) {
+                return_path[i] = paths[i];
+            }
         }
 
         public Query(Bytes data, uint8 max_replies = 10, uint8 hops = 0, InstanceReference[] return_path = new InstanceReference[0], Bytes? identifier = null) {

+ 5 - 1
src/lib/Protocols/MX2/Muxer.vala

@@ -135,7 +135,7 @@ namespace LibPeer.Protocols.Mx2 {
             // Do we know the destination instance?
             if(!remote_instance_mapping.has_key(destination)) {
                 // No, throw an error
-                throw new IOError.HOST_NOT_FOUND("No knwon way to reach the specified instance");
+                throw new IOError.HOST_NOT_FOUND("No known way to reach the specified instance");
             }
 
             // Get access information
@@ -196,6 +196,8 @@ namespace LibPeer.Protocols.Mx2 {
                     path_info = frame.via.return_path
                 });
 
+                print(@"Saved instance mapping with address $(receiption.peer_info.to_string()) due to inquiry\n");
+
                 // Create the greeting
                 uint8[] greeting = new ByteComposer()
                     .add_byte(PACKET_GREET)
@@ -219,6 +221,8 @@ namespace LibPeer.Protocols.Mx2 {
                     path_info = frame.via.return_path
                 });
 
+                print(@"Saved instance mapping with address $(receiption.peer_info.to_string()) due to greeting\n");
+
                 // Get the inquiry id
                 Bytes inquiry_id = new Bytes(frame.payload[1:17]);
 

+ 2 - 0
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -213,6 +213,8 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
         protected override void close_session(string reason) {
             base.close_session(reason);
+            payload_queue = new AsyncQueue<Payload>();
+            in_flight.clear();
             var error = new IOError.CONNECTION_CLOSED("The session was closed before the segment was sent");
             foreach (var tracker in segment_trackers.values) {
                 tracker.fail(error);

+ 1 - 0
src/lib/Protocols/STP/Sessions/Session.vala

@@ -40,6 +40,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
         protected virtual void close_session(string reason) {
             open = false;
+            //  print(@"[SESSION CLOSED] $(reason)\n");
             session_closed(reason);
         }
 

+ 17 - 3
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -33,6 +33,10 @@ namespace LibPeer.Protocols.Stp {
         }
 
         public Negotiation initialise_stream(InstanceReference target, uint8[]? in_reply_to = null) {
+            if(muxer.get_peer_info_for_instance(target) == null) {
+                Posix.abort();
+            }
+
             // Initiate a stream with another peer
             var session_id = new uint8[16];
             UUID.generate_random(session_id);
@@ -263,10 +267,14 @@ namespace LibPeer.Protocols.Stp {
             while(true) {
                 foreach(var session in sessions.values) {
                     if(session.has_pending_segment()) {
-                        var segment = session.get_pending_segment();
-                        var message = new SegmentMessage(new Bytes(session.identifier), segment);
-                        send_packet(session.target, s => message.serialise(s));
+                        send_pending_segement(session);
                     }
+                    //  if(!session.open) {
+                    //      while(session.has_pending_segment()) {
+                    //          send_pending_segement(session);
+                    //      }
+                    //      sessions.unset(new Bytes(session.identifier));
+                    //  }
                 }
                 foreach (var retransmitter in retransmitters) {
                     if(!retransmitter.tick()) {
@@ -276,6 +284,12 @@ namespace LibPeer.Protocols.Stp {
             }
         }
 
+        private void send_pending_segement(Session session) {
+            var segment = session.get_pending_segment();
+            var message = new SegmentMessage(new Bytes(session.identifier), segment);
+            send_packet(session.target, s => message.serialise(s));
+        }
+
         private void notify_app(ThreadFunc<void> func) {
             new Thread<void>("Application notification thread", func);
         }

+ 1 - 1
src/lib/Util/ConcurrentHashMap.vala

@@ -27,7 +27,7 @@ namespace LibPeer.Util {
 
         public override void clear () {
             lock(_map) {
-                clear();
+                _map.clear();
             }
         }
 		/**

+ 1 - 1
src/toys/give_file/GiveFile.vala

@@ -65,7 +65,7 @@ namespace GiveFile {
             var file_stream = file.create(FileCreateFlags.PRIVATE);
             uint8[] hunk = new uint8[size/100];
             int hunks_received = 0;
-            while(hunks_received < size/100) {
+            while(hunks_received < 100) {
                 reader.read(hunk);
                 file_stream.write(hunk);
                 hunks_received++;