Browse Source

Working up to lambdas

Billy Barrow 4 years ago
parent
commit
cb3f650b71

+ 6 - 0
src/lib/Protocols/STP/Messages/RequestSession.vala

@@ -12,6 +12,12 @@ namespace LibPeer.Protocols.Stp.Messages {
 
         public uint64 timing { get; private set; }
 
+        public RequestSession(Bytes id, Bytes reply, uint8[] features) {
+            session_id = id;
+            in_reply_to = reply;
+            feature_codes = features;
+        }
+
         protected override void serialise_data (OutputStream stream) {
             DataOutputStream os = new DataOutputStream (stream);
             os.byte_order = DataStreamByteOrder.BIG_ENDIAN;

+ 9 - 9
src/lib/Protocols/STP/Negotiation.vala

@@ -5,24 +5,24 @@ using Gee;
 
 namespace LibPeer.Protocols.Stp {
 
-    internal class Negotiation {
-        public Bytes session_id { get; set; }
+    public class Negotiation {
+        public Bytes session_id { get; internal set; }
 
-        public Bytes in_reply_to { get; set; }
+        public Bytes in_reply_to { get; internal set; }
 
         public uint8[] feature_codes { get; set; }
 
-        public NegotiationState state { get; set; }
+        public NegotiationState state { get; internal set; }
 
-        public InstanceReference remote_instance { get; set; }
+        public InstanceReference remote_instance { get; internal set; }
 
-        public Retransmitter request_retransmitter { get; set; }
+        internal Retransmitter request_retransmitter { get; set; }
 
-        public Retransmitter negotiate_retransmitter { get; set;}
+        internal Retransmitter negotiate_retransmitter { get; set;}
 
-        public uint64 ping { get; set; }
+        public uint64 ping { get; internal set; }
 
-        public SessionDirection direction { get; set; }
+        public SessionDirection direction { get; internal set; }
 
         public signal void established(StpOutputStream stream);
 

+ 1 - 1
src/lib/Protocols/STP/Retransmitter.vala

@@ -18,7 +18,7 @@ namespace LibPeer.Protocols.Stp {
                 return false;
             }
 
-            if(last_called < get_monotonic_time() - interval) {
+            if(last_called < get_monotonic_time() - interval*1000) {
                 ttl--;
                 action(ttl);
                 last_called = get_monotonic_time();

+ 1 - 1
src/lib/Protocols/STP/Segments/Payload.vala

@@ -32,7 +32,7 @@ namespace LibPeer.Protocols.Stp.Segments {
         }
 
         public void reset_timing() {
-            timing = get_monotonic_time();
+            timing = get_monotonic_time()/1000;
         }
 
         public Payload(uint64 sequence_number, uint8[] data) {

+ 2 - 2
src/lib/Protocols/STP/Sessions/EgressSession.vala

@@ -121,7 +121,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
             }
 
             // Calculate a maximum time value for segments eligable to be resent
-            uint64 max_time = get_monotonic_time() - (uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
+            uint64 max_time = (get_monotonic_time()/1000) - (uint64)((worst_ping * Math.log10(redundant_resends + 10) * window_size) * 1000);
             
             // Do we have any in-flight segments to resend?
             foreach (var segment in in_flight.values) {
@@ -137,7 +137,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
         }
 
         public override Segments.Segment get_pending_segment() {
-            last_send = get_monotonic_time();
+            last_send = get_monotonic_time() / 1000;
             return base.get_pending_segment();
         }
 

+ 32 - 5
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -32,6 +32,33 @@ namespace LibPeer.Protocols.Stp {
             send_thread = new Thread<void>("STP Network Send Thread", send_loop);
         }
 
+        public Negotiation initialise_stream(InstanceReference target, uint8[] in_reply_to = EMPTY_REPLY_TO) {
+            // Initiate a stream with another peer
+            var session_id = new uint8[16];
+            UUID.generate_random(session_id);
+
+            // Start the negotiation
+            var negotiation = new Negotiation() {
+                session_id = new Bytes(session_id),
+                in_reply_to = new Bytes(in_reply_to),
+                feature_codes = {},
+                state = NegotiationState.REQUESTED,
+                remote_instance = target,
+                direction = SessionDirection.EGRESS
+            };
+            negotiations.set(negotiation.session_id, negotiation);
+
+            // Create the session request
+            var session_request = new RequestSession(negotiation.session_id, negotiation.in_reply_to, negotiation.feature_codes);
+
+            // Send the request
+            negotiation.request_retransmitter = new Retransmitter(10000, 12, i => this.send_packet(negotiation.remote_instance, s => session_request.serialise(s)));
+            retransmitters.add(negotiation.request_retransmitter);
+
+            // Return the negotiation object
+            return negotiation;
+        }
+
         private void handle_packet(Packet packet) {
             // We have a message, deserialise it
             var message = Message.deserialise(packet.stream);
@@ -81,7 +108,7 @@ namespace LibPeer.Protocols.Stp {
             var reply = new NegotiateSession(negotiation.session_id, {}, message.timing);
 
             // Repeatedly send the negotiation
-            negotiation.negotiate_retransmitter = new Retransmitter(10000, 12, i => send_packet(negotiation.remote_instance, s => reply.serialise(s)));
+            negotiation.negotiate_retransmitter = new Retransmitter(10000, 12, i => this.send_packet(negotiation.remote_instance, s => reply.serialise(s)));
             retransmitters.add(negotiation.negotiate_retransmitter);
         }
 
@@ -103,7 +130,7 @@ namespace LibPeer.Protocols.Stp {
             }
 
             // Set the ping value
-            negotiation.ping = get_monotonic_time() - message.reply_timing;
+            negotiation.ping = (get_monotonic_time()/1000) - message.reply_timing;
 
             // TODO features
 
@@ -152,7 +179,7 @@ namespace LibPeer.Protocols.Stp {
             negotiation.state = NegotiationState.ACCEPTED;
 
             // Set the ping value
-            negotiation.ping = get_monotonic_time() - message.reply_timing;
+            negotiation.ping = (get_monotonic_time()/1000) - message.reply_timing;
 
             // Setup the session
             setup_session(negotiation);
@@ -226,7 +253,7 @@ namespace LibPeer.Protocols.Stp {
             
         }
 
-        public void send_loop() {
+        private void send_loop() {
             // TODO: add a way to stop this
             while(true) {
                 foreach(var session in sessions.values) {
@@ -243,7 +270,7 @@ namespace LibPeer.Protocols.Stp {
             }
         }
 
-        public void notify_app(ThreadFunc<void> func) {
+        private void notify_app(ThreadFunc<void> func) {
             new Thread<void>("Application notification thread", func);
         }
     }

+ 3 - 0
src/lib/Protocols/STP/Streams/OutputStream.vala

@@ -9,8 +9,11 @@ namespace LibPeer.Protocols.Stp.Streams {
         public InstanceReference target { get { return session.target; }}
         public uint8[] session_id { get { return session.identifier; }}
 
+        public signal void reply(StpInputStream stream);
+
         public StpOutputStream(EgressSession session) {
             this.session = session;
+            this.session.received_reply.connect(s => reply(new StpInputStream(s)));
         }
 
         public override bool close (GLib.Cancellable? cancellable) {

+ 0 - 2
src/lib/Stream/Stream.vala

@@ -1,2 +0,0 @@
-
-

+ 109 - 0
src/toys/give_file/GiveFile.vala

@@ -0,0 +1,109 @@
+using LibPeer.Networks.Simulation;
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp;
+using LibPeer.Protocols.Stp.Streams;
+using LibPeer.Networks;
+
+using Gee;
+
+namespace GiveFile {
+
+    class FileGiver : Object {
+
+        private Muxer muxer;
+        private Network network;
+        private Instance instance;
+        private StreamTransmissionProtocol transport;
+        private string path;
+        private HashSet<InstanceReference> peers = new HashSet<InstanceReference>(r => r.hash(), (a, b) => a.compare(b) == 0);
+
+        public FileGiver(Conduit conduit, string file_path) {
+            muxer = new Muxer ();
+            network = conduit.get_interface ();
+            network.bring_up ();
+            muxer.register_network (network);
+            instance = muxer.create_instance ("GiveFile");
+            transport = new StreamTransmissionProtocol (muxer, instance);
+            path = file_path;
+
+            instance.incoming_greeting.connect((origin) => rx_greeting(origin));
+            network.incoming_advertisment.connect(rx_advertisement);
+            transport.incoming_stream.connect(incoming);
+            
+            network.advertise(instance.reference);
+            print(@"File giver created for '$path'\n");
+        }
+
+        void rx_advertisement(Advertisement adv) {
+            print("rx_advertisement\n");
+            if(!peers.contains(adv.instance_reference)) {
+                var peer_info = new GLib.List<PeerInfo>();
+                peer_info.append(adv.peer_info);
+                muxer.inquire(instance, adv.instance_reference, peer_info);
+            }
+        }
+
+        void rx_greeting(InstanceReference origin) {
+            print("rx_greeting\n");
+            peers.add(origin);
+            transport.initialise_stream(origin).established.connect(make_request);
+        }
+
+        void make_request(StpOutputStream stream) {
+            print("make_request\n");
+            stream.reply.connect(reply);
+            print("Asking peer to gib file\n");
+            stream.write({'G', 'i', 'b', ' ', 'f', 'i', 'l', 'e'});
+        }
+
+        void reply(StpInputStream stream) {
+            print("reply\n");
+            print("Peer gibs file...\n");
+            var reader = new DataInputStream(stream);
+            var size = reader.read_uint32();
+            var file = File.new_for_path(Uuid.string_random());
+            var file_stream = file.create(FileCreateFlags.PRIVATE);
+            uint8[] data = new uint8[size];
+            reader.read(data);
+            file_stream.write(data);
+            file_stream.flush();
+            file_stream.close();
+
+            print("Done\n");
+        }
+
+        void incoming(StpInputStream stream) {
+            print("incoming\n");
+            print("I have a new stream\n");
+            var magic = new uint8[8];
+            uint8[] expected_magic = {'G', 'i', 'b', ' ', 'f', 'i', 'l', 'e'};
+            stream.read(magic);
+            for(var i = 0; i < 8; i++) {
+                if(expected_magic[i] != magic[i]) {
+                    print("Peer did not ask me to gib file\n");
+                    return;
+                }
+            }
+
+            transport.initialise_stream(stream.target, stream.session_id).established.connect(send_file);
+        }
+
+        void send_file(StpOutputStream stream) {
+            print("send_file\n");
+            print("Sending my file\n");
+            var file = File.new_for_path(path);
+            var file_stream = file.read();
+            file_stream.seek(0, SeekType.END);
+            var size = file_stream.tell();
+            file_stream.seek(0, SeekType.SET);
+            var writer = new DataOutputStream(stream);
+            writer.put_uint32((uint32)size);
+            var buffer = new uint8[size];
+            file_stream.read(buffer);
+            stream.write(buffer);
+            file_stream.close();
+        }
+
+    }
+
+}

+ 23 - 0
src/toys/give_file/Main.vala

@@ -0,0 +1,23 @@
+using LibPeer.Networks.Simulation;
+
+namespace GiveFile {
+
+    class Main : Object {
+
+        public static int main(string[] args) {
+            print("Give File\n");
+
+            Conduit conduit = new Conduit();
+
+            FileGiver[] givers = new FileGiver[args.length-1];
+            for(int i = 1; i < args.length; i++) {
+                givers[i-1] = new FileGiver(conduit, args[i]);
+            }
+
+            while(true) {};
+
+            return 0;
+        }
+    }
+
+}

+ 12 - 0
src/toys/give_file/meson.build

@@ -0,0 +1,12 @@
+dependencies = [
+    dependency('glib-2.0'),
+    dependency('gobject-2.0'),
+    dependency('gio-2.0'),
+    dependency('gee-0.8'),
+    libpeer_dep
+]
+
+sources = files('Main.vala')
+sources += files('GiveFile.vala')
+
+executable('give_file', sources, dependencies: dependencies)

+ 2 - 1
src/toys/meson.build

@@ -1,2 +1,3 @@
 
-subdir('exponential_pinger')
+subdir('exponential_pinger')
+subdir('give_file')