瀏覽代碼

Basic STP seems to be working

Billy Barrow 4 年之前
父節點
當前提交
d523526412

+ 1 - 2
src/lib/Networks/Simulation/NetSim.vala

@@ -49,6 +49,7 @@ namespace LibPeer.Networks.Simulation {
 
                     // Drop
                     if (Random.int_range(1, 100) == loss_frac * 100) {
+                        print("[NET] Drop!\n");
                         continue;
                     }
 
@@ -91,8 +92,6 @@ namespace LibPeer.Networks.Simulation {
             // Create the packet
             var packet = new Packet(peer_info, data);
 
-            //  print(@"NET: $(origin.get(0)) $(origin.get(1)) $(origin.get(2)) to $(identifier.get(0)) $(identifier.get(1)) $(identifier.get(2))\n");
-
             // Add packet to queue
             packet_queue.push(new QueueCommand<Packet>.with_payload(packet));
         }

+ 6 - 13
src/lib/Protocols/STP/Retransmitter.vala

@@ -1,17 +1,15 @@
 
 namespace LibPeer.Protocols.Stp {
 
-    internal class Retransmitter {
+    internal abstract class Retransmitter {
 
-        public uint64 interval { get; private set; }
+        public uint64 interval { get; protected set; }
 
-        public int ttl { get; private set; }
+        public int ttl { get; protected set; }
 
         public uint64 last_called { get; private set; }
 
-        public Func<int> action { get; private set; }
-
-        public bool cancelled { get; private set; }
+        public bool cancelled { get; protected set; }
 
         public bool tick() {
             if(cancelled) {
@@ -20,7 +18,7 @@ namespace LibPeer.Protocols.Stp {
 
             if(last_called < get_monotonic_time() - interval*1000) {
                 ttl--;
-                action(ttl);
+                do_task();
                 last_called = get_monotonic_time();
 
                 if(ttl == 0) {
@@ -35,12 +33,7 @@ namespace LibPeer.Protocols.Stp {
             cancelled = true;
         }
 
-        public Retransmitter(uint64 interval, int times, Func<int> action) {
-            cancelled = false;
-            this.interval = interval;
-            ttl = times - 1;
-            this.action = action;
-        }
+        protected abstract void do_task();
 
     }
 

+ 6 - 2
src/lib/Protocols/STP/Segments/Payload.vala

@@ -14,7 +14,7 @@ namespace LibPeer.Protocols.Stp.Segments {
             DataOutputStream os = new DataOutputStream (stream);
             os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
             os.put_uint64 (sequence_number);
-            reset_timing();
+            update_timing();
             os.put_uint64 (timing);
             os.put_uint32 (data.length);
             os.write (data);
@@ -31,10 +31,14 @@ namespace LibPeer.Protocols.Stp.Segments {
             ins.read(data);
         }
 
-        public void reset_timing() {
+        public void update_timing() {
             timing = get_monotonic_time()/1000;
         }
 
+        public void reset_timing() {
+            timing = 0;
+        }
+
         public Payload(uint64 sequence_number, uint8[] data) {
             this.sequence_number = sequence_number;
             this.data = data;

+ 10 - 6
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -1,5 +1,6 @@
 using LibPeer.Protocols.Mx2;
 using LibPeer.Protocols.Stp.Segments;
+using LibPeer.Util;
 using Gee;
 
 namespace LibPeer.Protocols.Stp.Sessions {
@@ -10,12 +11,12 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
     public class EgressSession : Session {
 
-        private HashMap<uint64?, Payload> in_flight = new HashMap<uint64, Payload>();
+        private ConcurrentHashMap<uint64?, Payload> in_flight = new ConcurrentHashMap<uint64?, Payload>(i => (uint)i, (a, b) => a == b);
         private int in_flight_count = 0;
 
-        private HashMap<uint64?, SegmentTracker> segment_trackers = new HashMap<uint64, SegmentTracker>();
+        private ConcurrentHashMap<uint64?, SegmentTracker> segment_trackers = new ConcurrentHashMap<uint64?, SegmentTracker>(i => (uint)i, (a, b) => a == b);
 
-        private ArrayList<uint64?> segment_trips = new ArrayList<uint64>();
+        private ArrayList<uint64?> segment_trips = new ArrayList<uint64?>();
 
         protected AsyncQueue<Payload> payload_queue = new AsyncQueue<Payload>();
 
@@ -34,6 +35,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
             base(target, session_id, ping);
             best_ping = ping;
             worst_ping = ping;
+            open = true;
         }
 
         public override void process_segment(Segment segment) {
@@ -72,7 +74,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
                 redundant_resends++;
                 return;
             }
-
+            
             // We have an acknowledgement segment, remove payload segment from in-flight
             in_flight.unset(segment.sequence_number);
             in_flight_count--;
@@ -84,7 +86,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
 
             // What was the time difference?
-            var round_trip = get_monotonic_time() - segment.timing;
+            var round_trip = (get_monotonic_time()/1000) - segment.timing;
 
             // Are we currently at metric window size?
             if(window_size == METRIC_WINDOW_SIZE) {
@@ -130,6 +132,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
                     // Resend it
                     segment.reset_timing();
                     queue_segment(segment);
+                    break;
                 }
             }
 
@@ -239,7 +242,8 @@ namespace LibPeer.Protocols.Stp.Sessions {
                     // TODO run through features
                     segment_trackers.set(next_sequence_number, tracker);
                     tracker.add_segment();
-                    payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:(i+1)*SEGMENT_PAYLOAD_SIZE]));
+                    int payload_size = data.length < (i+1)*SEGMENT_PAYLOAD_SIZE ? data.length : (i+1)*SEGMENT_PAYLOAD_SIZE;
+                    payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:payload_size]));
                     next_sequence_number ++;
                 }
             }

+ 3 - 1
src/lib/Protocols/STP/Sessions/IngressSession.vala

@@ -1,5 +1,6 @@
 using LibPeer.Protocols.Mx2;
 using LibPeer.Protocols.Stp.Segments;
+using LibPeer.Util;
 using Gee;
 
 namespace LibPeer.Protocols.Stp.Sessions {
@@ -8,12 +9,13 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
         private uint64 next_expected_sequence_number = 0;
 
-        private HashMap<uint64?, Payload> reconstruction = new HashMap<int, Payload>();
+        private ConcurrentHashMap<uint64?, Payload> reconstruction = new ConcurrentHashMap<uint64?, Payload>(i => (uint)i, (a, b) => a == b);
 
         public signal void incoming_app_data(uint8[] data);
 
         public IngressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
             base(target, session_id, ping);
+            open = true;
         }
 
         public override void process_segment(Segment segment) {

+ 30 - 7
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -21,7 +21,7 @@ namespace LibPeer.Protocols.Stp {
 
         private ConcurrentHashMap<Bytes, Session> sessions = new ConcurrentHashMap<Bytes, Session>(k => k.hash(), (a, b) => a.compare(b) == 0);
 
-        private ArrayList<Retransmitter> retransmitters = new ArrayList<Retransmitter>();
+        private GLib.List<Retransmitter> retransmitters = new GLib.List<Retransmitter>();
 
         private Thread<void> send_thread;
 
@@ -52,8 +52,8 @@ namespace LibPeer.Protocols.Stp {
             var session_request = new RequestSession(negotiation.session_id, negotiation.in_reply_to, negotiation.feature_codes);
 
             // Send the request
-            negotiation.request_retransmitter = new Retransmitter(10000, 12, i => this.send_packet(negotiation.remote_instance, s => session_request.serialise(s)));
-            retransmitters.add(negotiation.request_retransmitter);
+            negotiation.request_retransmitter = new MessageRetransmitter(this, negotiation.remote_instance, session_request);
+            retransmitters.append(negotiation.request_retransmitter);
 
             // Return the negotiation object
             return negotiation;
@@ -108,8 +108,8 @@ namespace LibPeer.Protocols.Stp {
             var reply = new NegotiateSession(negotiation.session_id, {}, message.timing);
 
             // Repeatedly send the negotiation
-            negotiation.negotiate_retransmitter = new Retransmitter(10000, 12, i => this.send_packet(negotiation.remote_instance, s => reply.serialise(s)));
-            retransmitters.add(negotiation.negotiate_retransmitter);
+            negotiation.negotiate_retransmitter = new MessageRetransmitter(this, negotiation.remote_instance, reply);
+            retransmitters.append(negotiation.negotiate_retransmitter);
         }
 
         private void handle_negotiate_session(NegotiateSession message) {
@@ -138,7 +138,7 @@ namespace LibPeer.Protocols.Stp {
             var reply = new BeginSession(negotiation.session_id, message.timing);
 
             // Send the reply
-            send_packet(negotiation.remote_instance, s => reply.serialise(s));
+            this.retransmitters.append(new MessageRetransmitter(this, negotiation.remote_instance, reply));
 
             // Make sure the negotiation is in the right state
             if(negotiation.state != NegotiationState.REQUESTED) {
@@ -211,6 +211,7 @@ namespace LibPeer.Protocols.Stp {
         private void send_packet(InstanceReference target, Func<OutputStream> serialiser) throws IOError, Error{
             MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
             serialiser(stream);
+            stream.flush();
             stream.close();
             uint8[] buffer = stream.steal_data();
             buffer.length = (int)stream.get_data_size();
@@ -259,7 +260,8 @@ namespace LibPeer.Protocols.Stp {
                 foreach(var session in sessions.values) {
                     if(session.has_pending_segment()) {
                         var segment = session.get_pending_segment();
-                        send_packet(session.target, s => segment.serialise(s));
+                        var message = new SegmentMessage(new Bytes(session.identifier), segment);
+                        send_packet(session.target, s => message.serialise(s));
                     }
                 }
                 foreach (var retransmitter in retransmitters) {
@@ -273,6 +275,27 @@ namespace LibPeer.Protocols.Stp {
         private void notify_app(ThreadFunc<void> func) {
             new Thread<void>("Application notification thread", func);
         }
+
+        private class MessageRetransmitter : Retransmitter {
+
+            private StreamTransmissionProtocol stp;
+
+            private Message message;
+
+            private InstanceReference target;
+    
+            protected override void do_task () {
+                stp.send_packet(target, s => message.serialise(s));
+            }
+    
+            public MessageRetransmitter(StreamTransmissionProtocol stp, InstanceReference target, Message message, uint64 interval = 10000, int repeat = 12) {
+                this.stp = stp;
+                this.target = target;
+                this.message = message;
+                this.ttl = repeat;
+                this.interval = interval;
+            }
+        }
     }
 
 }