浏览代码

Untested large STP implementation

Billy Barrow 4 年之前
父节点
当前提交
1eda2ef37b

+ 35 - 0
src/lib/Protocols/STP/Messages/BeginSession.vala

@@ -0,0 +1,35 @@
+namespace LibPeer.Protocols.Stp.Messages {
+
+    public class BeginSession : Message {
+
+        protected override uint8 message_type { get { return MESSAGE_BEGIN_SESSION; } }
+
+        public Bytes session_id { get; private set; }
+
+        public uint64 reply_timing { get; private set; }
+
+        public BeginSession(Bytes id, uint64 timing) {
+            session_id = id;
+            reply_timing = timing;
+        }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.write (session_id.get_data ());
+            os.put_uint64 (reply_timing);
+            os.flush ();
+        }
+
+        public BeginSession.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var b_session_id = new uint8[16];
+            ins.read(b_session_id);
+            session_id = new Bytes(b_session_id);
+            reply_timing = ins.read_uint64 ();
+        }
+
+    }
+
+}

+ 41 - 0
src/lib/Protocols/STP/Messages/Message.vala

@@ -0,0 +1,41 @@
+
+namespace LibPeer.Protocols.Stp.Messages {
+
+    public const uint8 MESSAGE_REQUEST_SESSION = 0x05;
+    public const uint8 MESSAGE_NEGOTIATE_SESSION = 0x01;
+    public const uint8 MESSAGE_BEGIN_SESSION = 0x06;
+    public const uint8 MESSAGE_SEGMENT = 0x02;
+
+    public abstract class Message : Object {
+
+        protected abstract uint8 message_type { get; }
+
+        public void serialise(OutputStream stream) {
+            MemoryOutputStream buffer = new MemoryOutputStream(null, GLib.realloc, GLib.free);
+            buffer.write({message_type});
+            serialise_data(buffer);
+            stream.write(buffer.steal_data());
+        }
+
+        protected abstract void serialise_data(OutputStream stream);
+
+        public static Message deserialise(InputStream stream) {
+            uint8[] message = new uint8[1];
+            stream.read(message);
+            switch (message[0]) {
+                case MESSAGE_REQUEST_SESSION:
+                    return new RequestSession.from_stream(stream);
+                case MESSAGE_NEGOTIATE_SESSION:
+                    return new NegotiateSession.from_stream(stream);
+                case MESSAGE_BEGIN_SESSION:
+                    return new BeginSession.from_stream(stream);
+                case MESSAGE_SEGMENT:
+                    return new SegmentMessage.from_stream(stream);
+                default:
+                    assert_not_reached();
+            }
+        }
+
+    }
+
+}

+ 47 - 0
src/lib/Protocols/STP/Messages/NegotiateSession.vala

@@ -0,0 +1,47 @@
+namespace LibPeer.Protocols.Stp.Messages {
+
+    public class NegotiateSession : Message {
+
+        protected override uint8 message_type { get { return MESSAGE_NEGOTIATE_SESSION; } }
+
+        public Bytes session_id { get; private set; }
+
+        public uint64 reply_timing { get; private set; }
+
+        public uint8[] feature_codes { get; private set; }
+
+        public uint64 timing { get; private set; }
+
+        public NegotiateSession(Bytes id, uint8[] f_codes, uint64 timing) {
+            session_id = id;
+            feature_codes = f_codes;
+            reply_timing = timing;
+        }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.write (session_id.get_data ());
+            os.put_byte ((uint8)feature_codes.length);
+            os.write (feature_codes);
+            os.put_uint64 (reply_timing);
+            os.put_uint64 (get_monotonic_time ());
+            os.flush ();
+        }
+
+        public NegotiateSession.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var b_session_id = new uint8[16];
+            ins.read(b_session_id);
+            session_id = new Bytes(b_session_id);
+            uint8 feature_count = ins.read_byte ();
+            feature_codes = new uint8[feature_count];
+            ins.read(feature_codes);
+            reply_timing = ins.read_uint64 ();
+            timing = ins.read_uint64 ();
+        }
+
+    }
+
+}

+ 43 - 0
src/lib/Protocols/STP/Messages/RequestSession.vala

@@ -0,0 +1,43 @@
+namespace LibPeer.Protocols.Stp.Messages {
+
+    public class RequestSession : Message {
+
+        protected override uint8 message_type { get { return MESSAGE_REQUEST_SESSION; } }
+
+        public Bytes session_id { get; private set; }
+
+        public Bytes in_reply_to { get; private set; }
+
+        public uint8[] feature_codes { get; private set; }
+
+        public uint64 timing { get; private set; }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.write (session_id.get_data());
+            os.write (in_reply_to.get_data());
+            os.put_byte ((uint8)feature_codes.length);
+            os.write (feature_codes);
+            os.put_uint64 (get_monotonic_time ());
+            os.flush ();
+        }
+
+        public RequestSession.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var b_session_id = new uint8[16];
+            ins.read(b_session_id);
+            session_id = new Bytes(b_session_id);
+            var b_in_reply_to = new uint8[16];
+            ins.read(b_in_reply_to);
+            in_reply_to = new Bytes(b_in_reply_to);
+            uint8 feature_count = ins.read_byte ();
+            feature_codes = new uint8[feature_count];
+            ins.read(feature_codes);
+            timing = ins.read_uint64 ();
+        }
+
+    }
+
+}

+ 32 - 0
src/lib/Protocols/STP/Messages/SegmentMessage.vala

@@ -0,0 +1,32 @@
+using LibPeer.Protocols.Stp.Segments;
+
+namespace LibPeer.Protocols.Stp.Messages {
+
+    public class SegmentMessage : Message {
+
+        protected override uint8 message_type { get { return MESSAGE_SEGMENT; } }
+
+        public Bytes session_id { get; private set; }
+
+        public Segment segment { get; private set; }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.write (session_id.get_data ());
+            segment.serialise (os);
+            os.flush ();
+        }
+
+        public SegmentMessage.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            var b_session_id = new uint8[16];
+            ins.read(b_session_id);
+            session_id = new Bytes(b_session_id);
+            segment = Segment.deserialise (ins);
+        }
+
+    }
+
+}

+ 43 - 0
src/lib/Protocols/STP/Negotiation.vala

@@ -0,0 +1,43 @@
+
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Streams;
+using Gee;
+
+namespace LibPeer.Protocols.Stp {
+
+    internal class Negotiation {
+        public Bytes session_id { get; set; }
+
+        public Bytes in_reply_to { get; set; }
+
+        public uint8[] feature_codes { get; set; }
+
+        public NegotiationState state { get; set; }
+
+        public InstanceReference remote_instance { get; set; }
+
+        public Retransmitter request_retransmitter { get; set; }
+
+        public Retransmitter negotiate_retransmitter { get; set;}
+
+        public uint64 ping { get; set; }
+
+        public SessionDirection direction { get; set; }
+
+        public signal void established(StpOutputStream stream);
+
+        
+    }
+
+    public enum NegotiationState {
+        REQUESTED,
+        NEGOTIATED,
+        ACCEPTED
+    }
+
+    public enum SessionDirection {
+        INGRESS,
+        EGRESS
+    }
+
+}

+ 47 - 0
src/lib/Protocols/STP/Retransmitter.vala

@@ -0,0 +1,47 @@
+
+namespace LibPeer.Protocols.Stp {
+
+    internal class Retransmitter {
+
+        public uint64 interval { get; private set; }
+
+        public int ttl { get; private set; }
+
+        public uint64 last_called { get; private set; }
+
+        public Func<int> action { get; private set; }
+
+        public bool cancelled { get; private set; }
+
+        public bool tick() {
+            if(cancelled) {
+                return false;
+            }
+
+            if(last_called < get_monotonic_time() - interval) {
+                ttl--;
+                action(ttl);
+                last_called = get_monotonic_time();
+
+                if(ttl == 0) {
+                    cancel();
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public Retransmitter(uint64 interval, int times, Func<int> action) {
+            cancelled = false;
+            this.interval = interval;
+            ttl = times - 1;
+            this.action = action;
+        }
+
+    }
+
+}

+ 33 - 0
src/lib/Protocols/STP/Segments/Acknowledgement.vala

@@ -0,0 +1,33 @@
+namespace LibPeer.Protocols.Stp.Segments {
+
+    public class Acknowledgement : Segment {
+
+        protected override uint8 identifier { get { return SEGMENT_ACKNOWLEDGEMENT; } }
+
+        public uint64 sequence_number { get; private set; }
+
+        public uint64 timing { get; private set; }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.put_uint64 (sequence_number);
+            os.put_uint64 (timing);
+            os.flush ();
+        }
+
+        public Acknowledgement.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            sequence_number = ins.read_uint64 ();
+            timing = ins.read_uint64 ();
+        }
+
+        public Acknowledgement(Payload segment) {
+            sequence_number = segment.sequence_number;
+            timing = segment.timing;
+        }
+
+    }
+
+}

+ 60 - 0
src/lib/Protocols/STP/Segments/Control.vala

@@ -0,0 +1,60 @@
+namespace LibPeer.Protocols.Stp.Segments {
+
+    public class Control : Segment {
+
+        protected override uint8 identifier { get { return SEGMENT_CONTROL; } }
+
+        public ControlCommand command { get; private set; }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.put_byte(command.to_byte());
+            os.flush ();
+        }
+
+        public Control.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            command = ControlCommand.from_byte(ins.read_byte());
+        }
+
+        public Control(ControlCommand command) {
+            this.command = command;
+        }
+
+    }
+
+    public enum ControlCommand {
+        COMPLETE,
+        ABORT,
+        NOT_CONFIGURED;
+
+        public static ControlCommand from_byte(uint8 byte) {
+            switch(byte) {
+                case 0x04:
+                    return COMPLETE;
+                case 0x18:
+                    return ABORT;
+                case 0x15:
+                    return NOT_CONFIGURED;
+                default:
+                    assert_not_reached();
+            }
+        }
+
+        public uint8 to_byte() {
+            switch(this) {
+                case COMPLETE:
+                    return 0x04;
+                case ABORT:
+                    return 0x18;
+                case NOT_CONFIGURED:
+                    return 0x15;
+                default:
+                    assert_not_reached();
+            }
+        }
+    }
+
+}

+ 45 - 0
src/lib/Protocols/STP/Segments/Payload.vala

@@ -0,0 +1,45 @@
+namespace LibPeer.Protocols.Stp.Segments {
+
+    public class Payload : Segment {
+
+        protected override uint8 identifier { get { return SEGMENT_PAYLOAD; } }
+
+        public uint64 sequence_number { get; private set; }
+
+        public uint64 timing { get; private set; default = 0; }
+
+        public uint8[] data { get; private set; }
+
+        protected override void serialise_data (OutputStream stream) {
+            DataOutputStream os = new DataOutputStream (stream);
+            os.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            os.put_uint64 (sequence_number);
+            reset_timing();
+            os.put_uint64 (timing);
+            os.put_uint32 (data.length);
+            os.write (data);
+            os.flush ();
+        }
+
+        public Payload.from_stream(InputStream stream) {
+            DataInputStream ins = new DataInputStream (stream);
+            ins.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+            sequence_number = ins.read_uint64 ();
+            timing = ins.read_uint64 ();
+            uint32 data_length = ins.read_uint32 ();
+            data = new uint8[data_length];
+            ins.read(data);
+        }
+
+        public void reset_timing() {
+            timing = get_monotonic_time();
+        }
+
+        public Payload(uint64 sequence_number, uint8[] data) {
+            this.sequence_number = sequence_number;
+            this.data = data;
+        }
+
+    }
+
+}

+ 38 - 0
src/lib/Protocols/STP/Segments/Segment.vala

@@ -0,0 +1,38 @@
+
+namespace LibPeer.Protocols.Stp.Segments {
+
+    public const uint8 SEGMENT_ACKNOWLEDGEMENT = 0x06;
+    public const uint8 SEGMENT_CONTROL = 0x10;
+    public const uint8 SEGMENT_PAYLOAD = 0x0E;
+
+    public abstract class Segment : Object {
+
+        protected abstract uint8 identifier { get; }
+
+        public void serialise(OutputStream stream) {
+            MemoryOutputStream buffer = new MemoryOutputStream(null, GLib.realloc, GLib.free);
+            buffer.write({identifier});
+            serialise_data(buffer);
+            stream.write(buffer.steal_data());
+        }
+
+        protected abstract void serialise_data(OutputStream stream);
+
+        public static Segment deserialise(InputStream stream) {
+            uint8[] segment_type = new uint8[1];
+            stream.read(segment_type);
+            switch (segment_type[0]) {
+                case SEGMENT_ACKNOWLEDGEMENT:
+                    return new Acknowledgement.from_stream(stream);
+                case SEGMENT_CONTROL:
+                    return new Control.from_stream(stream);
+                case SEGMENT_PAYLOAD:
+                    return new Payload.from_stream(stream);
+                default:
+                    assert_not_reached();
+            }
+        }
+
+    }
+
+}

+ 253 - 0
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -0,0 +1,253 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Segments;
+using Gee;
+
+namespace LibPeer.Protocols.Stp.Sessions {
+
+    const int SEGMENT_PAYLOAD_SIZE = 16384;
+    const int METRIC_WINDOW_SIZE = 4;
+    const int MAX_WINDOW_SIZE = 65536;
+
+    public class EgressSession : Session {
+
+        private HashMap<uint64?, Payload> in_flight = new HashMap<uint64, Payload>();
+        private int in_flight_count = 0;
+
+        private HashMap<uint64?, SegmentTracker> segment_trackers = new HashMap<uint64, SegmentTracker>();
+
+        private ArrayList<uint64?> segment_trips = new ArrayList<uint64>();
+
+        protected AsyncQueue<Payload> payload_queue = new AsyncQueue<Payload>();
+
+        private int redundant_resends = 0;
+        private int window_size = METRIC_WINDOW_SIZE;
+        private uint64 best_ping = 0;
+        private uint64 worst_ping = 0;
+        private int adjustment_delta = 0;
+        private uint64 last_send = 0;
+
+        private uint64 next_sequence_number = 0;
+
+        public signal void received_reply(IngressSession session);
+
+        public EgressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
+            base(target, session_id, ping);
+            best_ping = ping;
+            worst_ping = ping;
+        }
+
+        public override void process_segment(Segment segment) {
+            // We have received a segment from the muxer
+            // Determine the segment type
+            if(segment is Acknowledgement) {
+                handle_acknowledgement((Acknowledgement)segment);
+                return;
+            }
+            if(segment is Control) {
+                handle_control((Control)segment);
+                return;
+            }
+            
+        }
+
+        private void handle_control(Control segment) {
+            // We have a control segment, what is it telling us?
+            switch(segment.command) {
+                case ControlCommand.COMPLETE:
+                    close_session("The remote peer completed the stream");
+                    break;
+                case ControlCommand.ABORT:
+                    close_session("The stream was aborted by the remote peer");
+                    break;
+                case ControlCommand.NOT_CONFIGURED:
+                    close_session("The remote peer claims to not know about this session");
+                    break;
+            }
+        }
+
+        private void handle_acknowledgement(Acknowledgement segment) {
+            // Is this segment still in-flight?
+            if(!in_flight.has_key(segment.sequence_number)) {
+                // No, we must have resent redundantly
+                redundant_resends++;
+                return;
+            }
+
+            // We have an acknowledgement segment, remove payload segment from in-flight
+            in_flight.unset(segment.sequence_number);
+            in_flight_count--;
+
+            // Do we have a tracking object for this?
+            if(segment_trackers.has_key(segment.sequence_number)) {
+                // Yes, notify it
+                segment_trackers.get(segment.sequence_number).complete_segment();
+            }
+
+            // What was the time difference?
+            var round_trip = get_monotonic_time() - segment.timing;
+
+            // Are we currently at metric window size?
+            if(window_size == METRIC_WINDOW_SIZE) {
+                // Yes, add round trip time to the list
+                segment_trips.add(round_trip);
+
+                // Do we have a sample?
+                if(segment_trips.size >= METRIC_WINDOW_SIZE) {
+                    // Update the ping based on the average of the metric segments
+                    uint64 avarage = 0;
+                    foreach (var ping in segment_trips) {
+                        avarage += ping;
+                    }
+                    avarage = avarage / segment_trips.size;
+                    best_ping = avarage;
+
+                    adjust_window_size(round_trip);
+                }
+                else {
+                    adjust_window_size(round_trip);
+                }
+            }
+            
+        }
+
+        public override bool has_pending_segment() {
+            // Do we have segments to queue, and room in our window to queue them?
+            if(payload_queue.length() > 0 && in_flight_count < window_size) {
+                // Yes, do it
+                var segment = payload_queue.pop();
+                in_flight.set(segment.sequence_number, segment);
+                in_flight_count++;
+                outgoing_segment_queue.push(segment);
+            }
+
+            // Calculate a maximum time value for segments eligable to be resent
+            uint64 max_time = get_monotonic_time() - (uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
+            
+            // Do we have any in-flight segments to resend?
+            foreach (var segment in in_flight.values) {
+                // Is the segment timing value less than the max time?
+                if(segment.timing != 0 && segment.timing < max_time) {
+                    // Resend it
+                    segment.reset_timing();
+                    queue_segment(segment);
+                }
+            }
+
+            return base.has_pending_segment();
+        }
+
+        public override Segments.Segment get_pending_segment() {
+            last_send = get_monotonic_time();
+            return base.get_pending_segment();
+        }
+
+        private void adjust_window_size(uint64 last_trip) {
+            uint64 last_trip_metric = last_trip / 1000;
+
+            // Is this the worst we have had?
+            if(worst_ping < last_trip) {
+                // Update worst ping metric
+                worst_ping = last_trip;
+            }
+
+            // Has the trip time gotten longer?
+            if (last_trip_metric > best_ping) {
+                // Yes, were we previously increasing the window size?
+                if(adjustment_delta > 0) {
+                    // Yes, stop increasing it
+                    adjustment_delta = 0;
+                }
+                // Were we keeping the window size consistant?
+                else if(adjustment_delta == 0) {
+                    adjustment_delta = -1;
+                }
+                // Were we previously decreasing it?
+                else if(adjustment_delta < 0) {
+                    adjustment_delta *= 2;
+                }
+            }
+            // Did the trip get shorter or stay the same?
+            else if (last_trip_metric <= best_ping) {
+                // Yes, were we previously increasing the window size?
+                if(adjustment_delta > 0) {
+                    // Yes, increase it some more
+                    adjustment_delta *= 2;
+                }
+                // Were we previously keeping the window size consistant?
+                if(adjustment_delta == 0) {
+                    // Yes, start incrrasing ituint64? key
+                    adjustment_delta = 1;
+                }
+                // Were we previosuly decreasing the window size?
+                if(adjustment_delta < 0) {
+                    // Yes, stop
+                    adjustment_delta = 0;
+                }
+            }
+
+            // Apply the delta
+            window_size += adjustment_delta;
+
+            // Is the window size now less than the metric size?
+            if(window_size < METRIC_WINDOW_SIZE) {
+                // Yes, reset it to the metric size
+                window_size = METRIC_WINDOW_SIZE;
+
+                // Update the delta so when we have our metric we can start increasing again
+                adjustment_delta = 1;
+
+                // Clear out our trip metrics
+                segment_trips.clear();
+            }
+            // Is the window size now bigger than the max window size?
+            if(window_size > MAX_WINDOW_SIZE) {
+                // Yes, cap it
+                window_size = MAX_WINDOW_SIZE;
+            }
+        }
+
+        public override void close_session(string reason) {
+            base.close_session(reason);
+            var error = new IOError.CONNECTION_CLOSED("The session was closed before the segment was sent");
+            foreach (var tracker in segment_trackers.values) {
+                tracker.fail(error);
+            }
+        }
+
+        public SegmentTracker queue_send(uint8[] data) throws IOError{
+            // Is the stream open?
+            if(!open) {
+                throw new IOError.CLOSED("Cannot send data: The stream is closed");
+            }
+
+            // Create a segment tracker
+            var tracker = new SegmentTracker();
+
+            // Get lock on payload queue
+            lock(payload_queue) {
+                // Calculate number of segments needed
+                int segment_count = data.length / SEGMENT_PAYLOAD_SIZE;
+                if (data.length % SEGMENT_PAYLOAD_SIZE != 0) {
+                    segment_count++;
+                }
+
+                if(segment_count == 0) {
+                    throw new IOError.INVALID_DATA("No data to send");
+                }
+
+                for(int i = 0; i < segment_count; i++) {
+                    // TODO run through features
+                    segment_trackers.set(next_sequence_number, tracker);
+                    tracker.add_segment();
+                    payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:(i+1)*SEGMENT_PAYLOAD_SIZE]));
+                    next_sequence_number ++;
+                }
+            }
+
+            // Return the tracker
+            return tracker;
+        }
+
+    }
+
+}

+ 92 - 0
src/lib/Protocols/STP/Sessions/IngressSession.vala

@@ -0,0 +1,92 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Segments;
+using Gee;
+
+namespace LibPeer.Protocols.Stp.Sessions {
+
+    public class IngressSession : Session {
+
+        private uint64 next_expected_sequence_number = 0;
+
+        private HashMap<uint64?, Payload> reconstruction = new HashMap<int, Payload>();
+
+        public signal void incoming_app_data(uint8[] data);
+
+        public IngressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
+            base(target, session_id, ping);
+        }
+
+        public override void process_segment(Segment segment) {
+            // We have received a segment from the muxer
+            // Determine the segment type
+            if(segment is Payload) {
+                handle_payload((Payload)segment);
+                return;
+            }
+            if(segment is Control) {
+                handle_control((Control)segment);
+                return;
+            }
+            
+        }
+
+        private void handle_payload(Payload segment) {
+            // TODO: Feature handling
+            // Is this a packet we are interested in?
+            if(next_expected_sequence_number <= segment.sequence_number) {
+                // Add to reconstruction dictionary
+                reconstruction.set(segment.sequence_number, segment);
+
+                // Is this the next expected sequence number?
+                if(next_expected_sequence_number == segment.sequence_number) {
+                    // Reconstruct the data
+                    incoming_app_data(complete_reconstruction());
+                }
+            }
+
+            // Send an acknowledgement to the segment
+            var acknowledgement = new Acknowledgement(segment);
+            queue_segment(acknowledgement);
+        }
+
+        private void handle_control(Control segment) {
+            // We have a control segment, what is it telling us?
+            switch(segment.command) {
+                case ControlCommand.COMPLETE:
+                    close_session("The remote peer completed the stream");
+                    break;
+                case ControlCommand.ABORT:
+                    close_session("The stream was aborted by the remote peer");
+                    break;
+                case ControlCommand.NOT_CONFIGURED:
+                    close_session("The remote peer claims to not know about this session");
+                    break;
+            }
+        }
+
+        private uint8[] complete_reconstruction() {
+            // Create a byte composer
+            var composer = new Util.ByteComposer();
+
+            // Start a counter
+            uint64 sequence = next_expected_sequence_number;
+            
+            // Loop until we don't have anything to reconstruct
+            for (;reconstruction.has_key(sequence); sequence++) {
+                // Get and remove the segment from the dictionary
+                Payload segment;
+                reconstruction.unset(sequence, out segment);
+                
+                // Compose
+                composer.add_byte_array(segment.data);
+            }
+
+            // Sequence is now the next expected sequence number
+            next_expected_sequence_number = sequence;
+
+            // Return the composed reconstruction
+            return composer.to_byte_array();
+        }
+    }
+
+}

+ 36 - 0
src/lib/Protocols/STP/Sessions/SegmentTracker.vala

@@ -0,0 +1,36 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Segments;
+using Gee;
+
+namespace LibPeer.Protocols.Stp.Sessions {
+
+    public class SegmentTracker {
+
+        public int pending_segment_count { get; private set; }
+
+        public int complete_segment_count { get; private set; }
+
+        public signal void on_complete();
+
+        public signal void on_error(IOError e);
+
+        internal void add_segment() {
+            pending_segment_count++;
+        }
+
+        internal void complete_segment() {
+            complete_segment_count++;
+            if(complete_segment_count == pending_segment_count) {
+                on_complete();
+            }
+        }
+
+        internal void fail(IOError e) {
+            if(complete_segment_count < pending_segment_count) {
+                on_error(e);
+            }
+        }
+
+    }
+
+}

+ 53 - 0
src/lib/Protocols/STP/Sessions/Session.vala

@@ -0,0 +1,53 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Segments;
+using Gee;
+
+namespace LibPeer.Protocols.Stp.Sessions {
+
+    public abstract class Session : Object {
+
+        protected AsyncQueue<Segment> outgoing_segment_queue = new AsyncQueue<Segment>();
+
+        public bool open { get; protected set; }
+
+        public signal void session_closed(string reason);
+
+        public uint8[] identifier { get; protected set; }
+
+        public uint64 initial_ping { get; protected set; }
+
+        public InstanceReference target { get; protected set; }
+
+        protected Session(InstanceReference target, uint8[] session_id, uint64 ping) {
+            this.target = target;
+            identifier = session_id;
+            initial_ping = ping;
+        }
+
+        public virtual bool has_pending_segment() {
+            return outgoing_segment_queue.length() > 0;
+        }
+
+        public virtual Segment get_pending_segment() {
+            return outgoing_segment_queue.pop();
+        }
+
+        protected void queue_segment(Segment segment) {
+            outgoing_segment_queue.push(segment);
+        }
+
+        public abstract void process_segment(Segment segment);
+
+        protected virtual void close_session(string reason) {
+            open = false;
+            session_closed(reason);
+        }
+
+        public virtual void close() {
+            outgoing_segment_queue = new AsyncQueue<Segment>();
+            queue_segment(new Control(ControlCommand.COMPLETE));
+            close_session("Stream closed by local application");
+        }
+    }
+
+}

+ 251 - 0
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -0,0 +1,251 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp.Messages;
+using LibPeer.Protocols.Stp.Sessions;
+using LibPeer.Protocols.Stp.Streams;
+using LibPeer.Util;
+using Gee;
+
+namespace LibPeer.Protocols.Stp {
+
+    public class StreamTransmissionProtocol {
+
+        public const uint8[] EMPTY_REPLY_TO = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+
+        private Muxer muxer { get; set; }
+
+        private Instance instance { get; set; }
+
+        public signal void incoming_stream(StpInputStream stream);
+
+        private ConcurrentHashMap<Bytes, Negotiation> negotiations = new ConcurrentHashMap<Bytes, Negotiation>(k => k.hash(), (a, b) => a.compare(b) == 0);
+
+        private ConcurrentHashMap<Bytes, Session> sessions = new ConcurrentHashMap<Bytes, Session>(k => k.hash(), (a, b) => a.compare(b) == 0);
+
+        private ArrayList<Retransmitter> retransmitters = new ArrayList<Retransmitter>();
+
+        private Thread<void> send_thread;
+
+        public StreamTransmissionProtocol(Muxer muxer, Instance instance) {
+            this.muxer = muxer;
+            this.instance = instance;
+            instance.incoming_payload.connect (handle_packet);
+            send_thread = new Thread<void>("STP Network Send Thread", send_loop);
+        }
+
+        private void handle_packet(Packet packet) {
+            // We have a message, deserialise it
+            var message = Message.deserialise(packet.stream);
+
+            // What type of message do we have?
+            if(message is SegmentMessage) {
+                handle_segment_message((SegmentMessage)message);
+                return;
+            }
+            if(message is RequestSession) {
+                handle_request_session(packet, (RequestSession)message);
+                return;
+            }
+            if(message is NegotiateSession) {
+                handle_negotiate_session((NegotiateSession)message);
+                return;
+            }
+            if(message is BeginSession) {
+                handle_begin_session((BeginSession)message);
+                return;
+            }
+        }
+
+        private void handle_request_session(Packet packet, RequestSession message) {
+            // Skip if we have already handled this request
+            if(negotiations.has_key(message.session_id)) {
+                return;
+            }
+
+            // A peer wants to initiate a session with us
+            // Create a negotiation object
+            var negotiation = new Negotiation() {
+                session_id = message.session_id,
+                in_reply_to = message.in_reply_to,
+                feature_codes = message.feature_codes,
+                state = NegotiationState.NEGOTIATED,
+                remote_instance = packet.origin,
+                direction = SessionDirection.INGRESS
+            };
+
+            // Add to negotiations
+            negotiations.set(negotiation.session_id, negotiation);
+
+            // TODO handle features
+
+            // Construct a reply
+            var reply = new NegotiateSession(negotiation.session_id, {}, message.timing);
+
+            // Repeatedly send the negotiation
+            negotiation.negotiate_retransmitter = new Retransmitter(10000, 12, i => send_packet(negotiation.remote_instance, s => reply.serialise(s)));
+            retransmitters.add(negotiation.negotiate_retransmitter);
+        }
+
+        private void handle_negotiate_session(NegotiateSession message) {
+            // We are getting a negotiation reply from a peer
+            // Do we have a negotiation open with this peer?
+            if(!negotiations.has_key(message.session_id)) {
+                // TODO send cleanup
+                return;
+            }
+
+            // Get the negotiation
+            var negotiation = negotiations.get(message.session_id);
+
+            // Cancel the retransmitter
+            if(negotiation.request_retransmitter != null){
+                negotiation.request_retransmitter.cancel();
+                negotiation.request_retransmitter = null;
+            }
+
+            // Set the ping value
+            negotiation.ping = get_monotonic_time() - message.reply_timing;
+
+            // TODO features
+
+            // Reply with a begin session message
+            var reply = new BeginSession(negotiation.session_id, message.timing);
+
+            // Send the reply
+            send_packet(negotiation.remote_instance, s => reply.serialise(s));
+
+            // Make sure the negotiation is in the right state
+            if(negotiation.state != NegotiationState.REQUESTED) {
+                return;
+            }
+
+            // Update the negotiation state
+            negotiation.state = NegotiationState.ACCEPTED;
+
+            // Setup the session
+            setup_session(negotiation);
+        }
+
+        private void handle_begin_session(BeginSession message) {
+            // We are getting a negotiation reply form a peer
+            // Do we have a negotiation open with this peer?
+            if(!negotiations.has_key(message.session_id)) {
+                // TODO send cleanup
+                return;
+            }
+
+            // Get the negotiation
+            var negotiation = negotiations.get(message.session_id);
+
+            // Cancel the retransmitter
+            if(negotiation.negotiate_retransmitter != null){
+                negotiation.negotiate_retransmitter.cancel();
+                negotiation.negotiate_retransmitter = null;
+            }
+
+            // Make sure the negotiation is in the right state
+            if(negotiation.state != NegotiationState.NEGOTIATED) {
+                // TODO send cleanup
+                return;
+            }
+
+            // Update the negotiation state
+            negotiation.state = NegotiationState.ACCEPTED;
+
+            // Set the ping value
+            negotiation.ping = get_monotonic_time() - message.reply_timing;
+
+            // Setup the session
+            setup_session(negotiation);
+
+            // Cleanup the negotiation;
+            negotiations.unset(negotiation.session_id);
+        }
+
+        private void handle_segment_message(SegmentMessage message) {
+            // Do we have a session open?
+            if(!sessions.has_key(message.session_id)) {
+                // Skip
+                return;
+            }
+
+            // Is there a valid negotiation still open?
+            if(negotiations.has_key(message.session_id)) {
+                // Cleanup the negotiation
+                negotiations.unset(message.session_id);
+            }
+
+            // Get the session
+            var session = sessions.get(message.session_id);
+
+            // Give the session the segment
+            session.process_segment(message.segment);
+        }
+
+        private void send_packet(InstanceReference target, Func<OutputStream> serialiser) throws IOError, Error{
+            MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
+            serialiser(stream);
+            stream.close();
+            uint8[] buffer = stream.steal_data();
+            buffer.length = (int)stream.get_data_size();
+            muxer.send(instance, target, buffer);
+        }
+
+        private void setup_session(Negotiation negotiation) {
+            // TODO feature stuff
+            // Create the session object
+            Session session = null;
+            switch (negotiation.direction) {
+                case SessionDirection.INGRESS:
+                    session = new IngressSession(negotiation.remote_instance, negotiation.session_id.get_data(), negotiation.ping);
+                    break;
+                case SessionDirection.EGRESS:
+                    session = new EgressSession(negotiation.remote_instance, negotiation.session_id.get_data(), negotiation.ping);
+                    break;
+            }
+
+            // Save the session
+            sessions.set(negotiation.session_id, session);
+
+            switch (negotiation.direction) {
+                case SessionDirection.INGRESS:
+                    // Was this in reply to another session?
+                    if(sessions.has_key(negotiation.in_reply_to)) {
+                        Session regarding = sessions.get(negotiation.in_reply_to);
+                        if(regarding is EgressSession) {
+                            notify_app(() => ((EgressSession)regarding).received_reply((IngressSession)session));
+                            break;
+                        }
+                        break;
+                    }
+                    notify_app(() => incoming_stream(new StpInputStream((IngressSession)session)));
+                    break;
+                case SessionDirection.EGRESS:
+                    notify_app(() => negotiation.established(new StpOutputStream((EgressSession)session)));
+                    break;
+            }
+            
+        }
+
+        public void send_loop() {
+            // TODO: add a way to stop this
+            while(true) {
+                foreach(var session in sessions.values) {
+                    if(session.has_pending_segment()) {
+                        var segment = session.get_pending_segment();
+                        send_packet(session.target, s => segment.serialise(s));
+                    }
+                }
+                foreach (var retransmitter in retransmitters) {
+                    if(!retransmitter.tick()) {
+                        retransmitters.remove(retransmitter);
+                    }
+                }
+            }
+        }
+
+        public void notify_app(ThreadFunc<void> func) {
+            new Thread<void>("Application notification thread", func);
+        }
+    }
+
+}

+ 50 - 0
src/lib/Protocols/STP/Streams/InputStream.vala

@@ -0,0 +1,50 @@
+using LibPeer.Protocols.Stp.Sessions;
+using LibPeer.Protocols.Mx2;
+
+namespace LibPeer.Protocols.Stp.Streams {
+
+    public class StpInputStream : InputStream {
+
+        private IngressSession session;
+
+        private uint8[] unread_data;
+        private Cond data_cond = Cond();
+        private Mutex data_mutex = Mutex();
+
+        public InstanceReference target { get { return session.target; }}
+        public uint8[] session_id { get { return session.identifier; }}
+
+        public StpInputStream(IngressSession session) {
+            this.session = session;
+            session.incoming_app_data.connect(handle_data);
+        }
+
+        private void handle_data(uint8[] data) {
+            data_mutex.lock();
+            unread_data = new Util.ByteComposer().add_byte_array(unread_data).add_byte_array(data).to_byte_array();
+            data_cond.broadcast();
+            data_mutex.unlock();
+        }
+
+        public override bool close (GLib.Cancellable? cancellable) {
+            session.close();
+            return true;
+        }
+
+        public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
+            data_mutex.lock();
+            while(unread_data.length < buffer.length) {
+                data_cond.wait(data_mutex);
+            }
+
+            for(int i = 0; i < buffer.length; i++) {
+                buffer[i] = unread_data[i];
+            }
+            unread_data = unread_data[buffer.length:unread_data.length];
+            data_mutex.unlock();
+            return buffer.length;
+        }
+
+    }
+
+}

+ 53 - 0
src/lib/Protocols/STP/Streams/OutputStream.vala

@@ -0,0 +1,53 @@
+using LibPeer.Protocols.Stp.Sessions;
+using LibPeer.Protocols.Mx2;
+
+namespace LibPeer.Protocols.Stp.Streams {
+
+    public class StpOutputStream : OutputStream {
+
+        private EgressSession session;
+        public InstanceReference target { get { return session.target; }}
+        public uint8[] session_id { get { return session.identifier; }}
+
+        public StpOutputStream(EgressSession session) {
+            this.session = session;
+        }
+
+        public override bool close (GLib.Cancellable? cancellable) {
+            session.close();
+            return true;
+        }
+
+        public override ssize_t write(uint8[] buffer, GLib.Cancellable? cancellable = null) throws IOError {
+            Cond cond = Cond();
+            Mutex mutex = Mutex();
+            IOError error_result = null;
+            bool complete = false;
+            var tracker = session.queue_send(buffer);
+            tracker.on_complete.connect(() => {
+                mutex.lock();
+                complete = true;
+                cond.broadcast();
+                mutex.unlock();
+            });
+            tracker.on_error.connect(e => {
+                mutex.lock();
+                error_result = e;
+                complete = true;
+                cond.broadcast();
+                mutex.unlock();
+            });
+            
+            mutex.lock();
+            while(!complete) {
+                cond.wait(mutex);
+            }
+
+            if(error_result != null) {
+                throw error_result;
+            }
+            return buffer.length;
+        }
+    }
+
+}

+ 4 - 12
src/lib/Util/ConcurrentHashMap.vala

@@ -58,9 +58,7 @@ namespace LibPeer.Util {
 		 * {@inheritDoc}
 		 */
 		public override Gee.MapIterator<K,V> map_iterator () {
-            lock(_map) {
-                return copy().map_iterator();
-            }
+            return copy().map_iterator();
         }
 
 		/**
@@ -83,17 +81,13 @@ namespace LibPeer.Util {
 		 * {@inheritDoc}
 		 */
 		public override Gee.Set<Gee.Map.Entry<K,V>> entries { owned get {
-            lock(_map) {
-                return copy().entries;
-            }
+            return copy().entries;
         } }
 		/**
 		 * {@inheritDoc}
 		 */
 		public override Gee.Set<K> keys { owned get {
-            lock(_map) {
-                return copy().keys;
-            }
+            return copy().keys;
         } }
 		/**
 		 * {@inheritDoc}
@@ -115,9 +109,7 @@ namespace LibPeer.Util {
 		 * {@inheritDoc}
 		 */
 		public override Gee.Collection<V> values { owned get {
-            lock(_map) {
-                return copy().values;
-            }
+            return copy().values;
         } }
 
     }

+ 20 - 1
src/lib/meson.build

@@ -11,7 +11,8 @@ dependencies = [
     meson.get_compiler('vala').find_library('uuid', dirs: vapi_dir),
     meson.get_compiler('c').find_library('uuid'),
     meson.get_compiler('vala').find_library('libsodium', dirs: vapi_dir),
-    meson.get_compiler('c').find_library('sodium')
+    meson.get_compiler('c').find_library('sodium'),
+    meson.get_compiler('c').find_library('m')
 ]
 
 sources = files('Networks/Advertisement.vala')
@@ -32,6 +33,24 @@ sources += files('Protocols/MX2/InstanceReference.vala')
 sources += files('Protocols/MX2/Packet.vala')
 sources += files('Protocols/MX2/PathInfo.vala')
 sources += files('Protocols/MX2/PathStrategy.vala')
+sources += files('Protocols/STP/StreamTransmissionProtocol.vala')
+sources += files('Protocols/STP/Negotiation.vala')
+sources += files('Protocols/STP/Retransmitter.vala')
+sources += files('Protocols/STP/Sessions/Session.vala')
+sources += files('Protocols/STP/Sessions/IngressSession.vala')
+sources += files('Protocols/STP/Sessions/EgressSession.vala')
+sources += files('Protocols/STP/Sessions/SegmentTracker.vala')
+sources += files('Protocols/STP/Segments/Segment.vala')
+sources += files('Protocols/STP/Segments/Acknowledgement.vala')
+sources += files('Protocols/STP/Segments/Control.vala')
+sources += files('Protocols/STP/Segments/Payload.vala')
+sources += files('Protocols/STP/Messages/Message.vala')
+sources += files('Protocols/STP/Messages/BeginSession.vala')
+sources += files('Protocols/STP/Messages/NegotiateSession.vala')
+sources += files('Protocols/STP/Messages/RequestSession.vala')
+sources += files('Protocols/STP/Messages/SegmentMessage.vala')
+sources += files('Protocols/STP/Streams/InputStream.vala')
+sources += files('Protocols/STP/Streams/OutputStream.vala')
 sources += files('Util/ByteComposer.vala')
 sources += files('Util/QueueCommand.vala')
 sources += files('Util/ThreadTimer.vala')