Browse Source

Hide print statements, add PeerApplication class

Billy Barrow 3 years ago
parent
commit
797a3f777b

+ 79 - 0
src/lib/Application/Application.vala

@@ -0,0 +1,79 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Aip;
+using LibPeer.Protocols.Stp;
+using LibPeer.Protocols.Stp.Streams;
+using LibPeer.Networks;
+
+namespace LibPeer {
+
+    public abstract class PeerApplication : Object {
+
+        protected ApplicationInformationProtocol discoverer { get; private set; }
+        protected StreamTransmissionProtocol transport { get; private set; }
+        protected Muxer muxer { get; private set; }
+        protected ApplicationInformation information { get; private set; }
+        protected Instance instance { get; private set; }
+        protected Network[] networks { get; private set; }
+
+        public abstract string application_namespace { get; }
+
+        construct {
+            muxer = new Muxer ();
+            
+            networks = configure_networks();
+            discoverer = new ApplicationInformationProtocol(muxer);
+            
+            foreach (var network in networks) {
+                network.bring_up();
+                discoverer.add_network (network);
+            }
+
+            instance = muxer.create_instance (application_namespace);
+            information = new ApplicationInformation.from_instance (instance);
+            information.new_group_peer.connect(on_new_discovery_peer);
+            instance.incoming_greeting.connect(on_peer_available);
+            transport = new StreamTransmissionProtocol(muxer, instance);
+            transport.incoming_stream.connect(on_incoming_stream);
+            discoverer.add_application (information);
+        }
+
+        protected virtual Network[] configure_networks() {
+            return new Network[] { new IPv4.IPv4("0.0.0.0", IPv4.IPv4.find_free_port("0.0.0.0")) };
+        }
+
+        protected virtual void on_new_discovery_peer() {
+            find_any_peer();
+        }
+
+        protected virtual void on_peer_found(InstanceInformation peer) {
+            muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
+        }
+
+        protected abstract void on_peer_available(InstanceReference peer);
+
+        protected abstract void on_incoming_stream(StpInputStream stream);
+
+        protected Query find_any_peer() {
+            var query = discoverer.find_application_instance(information);
+            query.on_answer.connect(on_peer_found);
+            return query;
+        }
+
+        protected Query find_resource_peer(Bytes resource_id) requires (resource_id.length == 32) {
+            var query = discoverer.find_application_resource(information, resource_id);
+            query.on_answer.connect(on_peer_found);
+            return query;
+        }
+
+        protected Negotiation establish_stream(InstanceReference peer) throws Error {
+            return transport.initialise_stream(peer);
+        }
+
+        protected Negotiation reply_to_stream(StpInputStream stream) throws Error {
+            return transport.initialise_stream(stream.origin, stream.session_id);
+        }
+
+
+    }
+
+}

+ 18 - 1
src/lib/Networks/IPv4/IPv4.vala

@@ -214,7 +214,7 @@ namespace LibPeer.Networks.IPv4 {
                                 }
                                 else if(data[0] == "P2D") {
                                     try {
-                                        print(@"Lookup address to inquire: $(data[1])\n");
+                                        //  print(@"Lookup address to inquire: $(data[1])\n");
                                         var addresses = resolver.lookup_by_name(data[1]);
                                         foreach (var address in addresses) {
                                             inquire(address, int.parse(data[2]));
@@ -239,6 +239,23 @@ namespace LibPeer.Networks.IPv4 {
             socket.send_to(new InetSocketAddress(address, (uint16)port), new uint8[] { DGRAM_INQUIRE });
         }
 
+        public static uint16 find_free_port(string ip_address) {
+            uint16 port = 2000;
+            var address = new InetAddress.from_string(ip_address);
+            while(true) {
+                try {
+                    var socket_addr = new InetSocketAddress(address, port);
+                    var socket = new Socket(SocketFamily.IPV4, SocketType.DATAGRAM, SocketProtocol.UDP);
+                    socket.bind(socket_addr, false);
+                    socket.close();
+                    return port;
+                }
+                catch {
+                    port++;
+                }
+            }
+        }
+
     }
 
 }

+ 6 - 6
src/lib/Networks/PeerInfo.vala

@@ -24,29 +24,29 @@ namespace LibPeer.Networks
         public void serialise(OutputStream stream) throws IOError, Error {
             // Create a stream writer
             var writer = StreamUtil.get_data_output_stream(stream);
-            print("Start serialising PeerInfo\n");
+            //  print("Start serialising PeerInfo\n");
 
             // Get the informational data
             var type = get_network_identifier();
             var data = get_data_segment();
 
-            print("Serialising type length\n");
+            //  print("Serialising type length\n");
             // Write the length of the network type
             writer.put_byte((uint8)type.length);
 
-            print("Serialising data segment length\n");
+            //  print("Serialising data segment length\n");
             // Write the length of the data segment
             writer.put_byte((uint8)data.length);
 
             var stringType = new ByteComposer().add_bytes(type).to_string(true);
-            print(@"Serialising type: $(stringType) ($(to_string()))\n");
+            //  print(@"Serialising type: $(stringType) ($(to_string()))\n");
             // Write the network identifier
             writer.write_bytes(type);
 
-            print("Serialising data\n");
+            //  print("Serialising data\n");
             // Write the data
             writer.write_bytes(data);
-            print("Serialised peer info\n");
+            //  print("Serialised peer info\n");
             writer.flush();
         }
         

+ 1 - 1
src/lib/Protocols/AIP/AipCapabilities.vala

@@ -34,7 +34,7 @@ namespace LibPeer.Protocols.Aip {
             var dis = StreamUtil.get_data_input_stream(stream);
 
             var capability_count = dis.read_byte();
-            print(@"Reading $(capability_count) capabilities\n");
+            //  print(@"Reading $(capability_count) capabilities\n");
 
             for (var i = 0; i < capability_count; i++) {
                 var byte = dis.read_byte();

+ 2 - 2
src/lib/Protocols/AIP/Answer.vala

@@ -35,7 +35,7 @@ namespace LibPeer.Protocols.Aip {
 
             var data_length = dis.read_int32();
             var path_size = dis.read_byte();
-            print(@"Reading $(path_size) instance references\n");
+            //  print(@"Reading $(path_size) instance references\n");
 
             path = new InstanceReference[path_size];
 
@@ -43,7 +43,7 @@ namespace LibPeer.Protocols.Aip {
                 path[i] = new InstanceReference.from_stream(dis);
             }
 
-            print(@"Reading $(data_length) bytes of answer data\n");
+            //  print(@"Reading $(data_length) bytes of answer data\n");
 
             data = dis.read_bytes(data_length);
         }

+ 44 - 44
src/lib/Protocols/AIP/ApplicationInformationProtocol.vala

@@ -102,10 +102,10 @@ namespace LibPeer.Protocols.Aip {
 
             // Hook up signals
             new_group_peer.connect((instance_ref, id) => {
-                print("New group peer?\n");
+                // print("New group peer?\n");
                 if(id.compare(info.namespace_bytes) == 0) {
                     query_groups.get(info.namespace_bytes).add_peer(instance_ref);
-                    print("New group peer\n");
+                    // print("New group peer\n");
                     info.new_group_peer();
                 }
             });
@@ -148,7 +148,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_greeting(InstanceReference greeting) {
-            print("rx greeting\n");
+            // print("rx greeting\n");
             // Add to known peers
             discovered_peers.add(greeting);
 
@@ -159,7 +159,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_capabilities(InstanceReference target, AipCapabilities capabilities) {
-            print("rx capabilities\n");
+            // print("rx capabilities\n");
             // Save the capabilities
             instance_capabilities.set(target, capabilities);
 
@@ -175,7 +175,7 @@ namespace LibPeer.Protocols.Aip {
             }
             // Can we send queries and answers to this peer?
             if(capabilities.query_answer) {
-                print("This peer is queryable\n");
+                // print("This peer is queryable\n");
                 // Yes, add to default group
                 default_group.add_peer(target);
 
@@ -184,7 +184,7 @@ namespace LibPeer.Protocols.Aip {
 
                 // We now have a queryable peer
                 if(!is_ready) {
-                    print("Ready B)\n");
+                    // print("Ready B)\n");
                     is_ready = true;
                     ready();
                 }
@@ -199,13 +199,13 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_address(PeerInfo info) {
-            print("rx address\n");
+            // print("rx address\n");
             // We received peer info, add to our set
             peer_info.add(info);
             
             // Do we have any pending queries?
             if(pending_queries.size > 0) {
-                print("Sending pending queries");
+                // print("Sending pending queries");
                 // Clear the list
                 var queries = pending_queries;
                 pending_queries = new Gee.LinkedList<PendingQueryAnswer>();
@@ -218,32 +218,32 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void rx_peers(Gee.List<InstanceInformation> peers) {
-            print("rx peers\n");
+            // print("rx peers\n");
             // We received a list of peers running AIP, do we want more peers?
             if(!default_group.actively_connect) {
                 // Don't worry about it
-                print("rx peers: ignored\n");
+                // print("rx peers: ignored\n");
                 return;
             }
 
             // Send out inquries to the peers
             foreach (var peer in peers) {
-                print("rx peers: Inquire\n");
+                // print("rx peers: Inquire\n");
                 muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
             }
         }
 
         protected Request<PeerInfo> request_address(InstanceReference target) {
-            print("request address\n");
+            // print("request address\n");
             // Make the request
             var request = new ByteComposer().add_byte(REQUEST_ADDRESS).to_bytes();
             var peer_info_request = new Request<PeerInfo>();
             send_request(request, target).response.connect(s => {
-                print("Address response\n");
+                // print("Address response\n");
                 // Read the address (peer info)
                 var address = PeerInfo.deserialise(s);
                 // Callback
-                print("Address response signal called\n");
+                // print("Address response signal called\n");
                 peer_info_request.response(address);
             });
             return peer_info_request;
@@ -251,7 +251,7 @@ namespace LibPeer.Protocols.Aip {
 
         protected Request<AipCapabilities> request_capabilities(InstanceReference target) {
             // Make the request
-            print("Request capabilities\n");
+            // print("Request capabilities\n");
             var request_data = new ByteComposer().add_byte(REQUEST_CAPABILITIES).to_bytes();
             var request = new Request<AipCapabilities>();
             send_request(request_data, target).response.connect((s) => {
@@ -264,7 +264,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected Request<Gee.List<InstanceInformation>> request_peers(InstanceReference target) {
-            print("request peers\n");
+            // print("request peers\n");
             // Make the request
             var request_data = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
             var request = new Request<Gee.List<InstanceInformation>>();
@@ -308,26 +308,26 @@ namespace LibPeer.Protocols.Aip {
             var following = _following[0];
 
             if(following == DATA_FOLLOWING_ANSWER && capabilities.query_answer) {
-                print("RX Stream: Answer\n");
+                // print("RX Stream: Answer\n");
                 handle_answer(stream);
             }
             else if(following == DATA_FOLLOWING_QUERY && capabilities.query_answer) {
-                print("RX Stream: Query\n");
+                // print("RX Stream: Query\n");
                 handle_query(stream);
             }
             else if(following == DATA_FOLLOWING_REQUEST) {
-                print("RX Stream: Request\n");
+                // print("RX Stream: Request\n");
                 handle_request(stream);
             }
             else {
-                print(@"RX Stream: Invalid following $(following) (stream closed)\n");
+                // print(@"RX Stream: Invalid following $(following) (stream closed)\n");
                 stream.close();
             }
 
         }
 
         protected void handle_answer(InputStream stream) {
-            print("Handle query answer\n");
+            // print("Handle query answer\n");
             // Deserialise the answer
             var answer = new Answer.from_stream(stream);
 
@@ -350,7 +350,7 @@ namespace LibPeer.Protocols.Aip {
                 send_answer(answer);
             }
 
-            print("Answer handled!\n");
+            // print("Answer handled!\n");
         }
 
         protected void handle_request(StpInputStream stream) throws IOError, Error {
@@ -369,22 +369,22 @@ namespace LibPeer.Protocols.Aip {
             transport.initialise_stream(stream.origin, stream.session_id).established.connect(os => {
                 switch (request_type) {
                     case REQUEST_CAPABILITIES:
-                        print("I got a capabilities request\n");
+                        // print("I got a capabilities request\n");
                         capabilities.serialise(os);
                         break;
                     case REQUEST_ADDRESS:
-                        print("I got an address request\n");
+                        // print("I got an address request\n");
                         muxer.get_peer_info_for_instance(os.target).serialise(os);
                         break;
                     case REQUEST_PEERS:
-                        print("I got a peers request\n");
+                        // print("I got a peers request\n");
                         // TODO: implement
                         os.write(new uint8[] {0});
                         break;
                 }
-                print("Replied\n");
+                // print("Replied\n");
                 os.close();
-                print("Reply stream closed\n");
+                // print("Reply stream closed\n");
             });
 
             // Have we encountered this peer before?
@@ -426,7 +426,7 @@ namespace LibPeer.Protocols.Aip {
             var query_type = query_data[0];
 
             if(query_type == QUERY_GROUP) {
-                print("Handle query: Group\n");
+                // print("Handle query: Group\n");
                 // Get the group identifier
                 var group_id = new Bytes(query_data[1:query_data.length]);
 
@@ -446,7 +446,7 @@ namespace LibPeer.Protocols.Aip {
                 send_query(query, default_group);
             }
             else if(query_type == QUERY_APPLICATION) {
-                print("Handle query: Application\n");
+                // print("Handle query: Application\n");
                 // Get the application namespace
                 var app_namespace = new Bytes(query_data[1:query_data.length]);
 
@@ -466,7 +466,7 @@ namespace LibPeer.Protocols.Aip {
                 }
             }
             else if(query_type == QUERY_APPLICATION_RESOURCE) {
-                print("Handle query: Application resource\n");
+                // print("Handle query: Application resource\n");
                 // Read the label
                 var label = new Bytes(query_data[1:33]);
 
@@ -493,11 +493,11 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void queue_query_answer(Query query, InstanceReference reference) {
-            print("Queue query answer\n");
+            // print("Queue query answer\n");
             var query_answer = new PendingQueryAnswer(query, reference);
             // Do we have peer info to send yet?
             if(peer_info.size > 0) {
-                print("Query sent immediately\n");
+                // print("Query sent immediately\n");
                 // Yes, do it
                 send_query_answer(query_answer);
             }
@@ -513,7 +513,7 @@ namespace LibPeer.Protocols.Aip {
 
             // Serialise the info
             MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
-            print("Serialising instance info\n");
+            // print("Serialising instance info\n");
             instance_info.serialise(stream);
             stream.close();
             uint8[] buffer = stream.steal_data();
@@ -531,7 +531,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void join_query_group(Bytes group) {
-            print("Join query group\n");
+            // print("Join query group\n");
             // Create the query group
             query_groups.set(group, new QueryGroup());
 
@@ -581,7 +581,7 @@ namespace LibPeer.Protocols.Aip {
         }
 
         protected void send_query(Query query, QueryGroup group) {
-            print("Send query\n");
+            // print("Send query\n");
             // Does the query have any hops left?
             if(query.hops > MAX_QUERY_HOPS) {
                 return;
@@ -589,30 +589,30 @@ namespace LibPeer.Protocols.Aip {
 
             // Loop over each instance in the query group
             foreach (var instance_ref in group) {
-                print("Contacting peer for query\n");
+                // print("Contacting peer for query\n");
                 transport.initialise_stream(instance_ref).established.connect(stream => {
                     // Tell the instance that the data that follows is a query
-                    print("Query stream established\n");
+                    // print("Query stream established\n");
                     stream.write(new uint8[] { DATA_FOLLOWING_QUERY });
 
-                    print("Sending query body\n");
+                    // print("Sending query body\n");
                     
                     // Write the query
                     query.serialise(stream);
                     
                     // Close the stream
-                    print("Closing query stream\n");
+                    // print("Closing query stream\n");
                     stream.close();
-                    print("Query sent to peer\n");
+                    // print("Query sent to peer\n");
                 });
             }
         }
 
         protected void send_answer(Answer answer) {
             // Get (and remove) the last item from the path list
-            print(@"Before pop answer.path.length = $(answer.path.length)\n");
+            // print(@"Before pop answer.path.length = $(answer.path.length)\n");
             InstanceReference send_to = answer.pop_path();
-            print(@"After pop answer.path.length = $(answer.path.length)\n");
+            // print(@"After pop answer.path.length = $(answer.path.length)\n");
 
             // Don't send answers to queries we haven't received
             if(!query_response_count.has_key(answer.in_reply_to)) {
@@ -627,11 +627,11 @@ namespace LibPeer.Protocols.Aip {
 
             // Decrement response counter (stops at 0)
             query_response_count.set(answer.in_reply_to, response_count - 1);
-            print(@"Forwarding answer: send_to != null = $(send_to != null); answer.path.length = $(answer.path.length);\n");
+            // print(@"Forwarding answer: send_to != null = $(send_to != null); answer.path.length = $(answer.path.length);\n");
 
             // Open a stream with the instance
             transport.initialise_stream(send_to).established.connect(stream => {
-                print("Writing answer to stream\n");
+                // print("Writing answer to stream\n");
                 // Tell the instance that the data that follows is an answer
                 stream.write(new uint8[] { DATA_FOLLOWING_ANSWER });
 

+ 2 - 2
src/lib/Protocols/AIP/InstanceInformation.vala

@@ -24,7 +24,7 @@ namespace LibPeer.Protocols.Aip {
             // Write number of connection methods
             dos.put_byte((uint8)connection_methods.length);
 
-            print(@"$(connection_methods.length) Connection methods\n");
+            //  print(@"$(connection_methods.length) Connection methods\n");
             // Write connection methods
             foreach (var method in connection_methods) {
                 method.serialise(dos);
@@ -39,7 +39,7 @@ namespace LibPeer.Protocols.Aip {
 
             // Read number of connection methods
             var method_count = dis.read_byte();
-            print(@"Reading $(method_count) connection methods\n");
+            //  print(@"Reading $(method_count) connection methods\n");
 
             // Read conneciton methods
             connection_methods = new PeerInfo[method_count];

+ 0 - 10
src/lib/Protocols/AIP/Query.vala

@@ -32,9 +32,7 @@ namespace LibPeer.Protocols.Aip {
 
             // Serialise the return path
             foreach (var reference in return_path) {
-                print("Instance reference serialisation for return path begins\n");
                 reference.serialise(dos);
-                print("Instance reference serialisation for return path ends\n");
             }
 
             // Write the query data
@@ -45,19 +43,13 @@ namespace LibPeer.Protocols.Aip {
             var dis = StreamUtil.get_data_input_stream(stream);
 
             // Read the identifier
-            print("\tIdentifier\n");
             identifier = dis.read_bytes(16);
 
             // Read header data
-            print("\tHops\n");
             hops = dis.read_byte();
-            print("\tMax Replies\n");
             max_replies = dis.read_byte();
-            print("\tData length\n");
             var data_length = dis.read_uint16();
-            print(@"\tQuery data length $(data_length)\n");
             var return_path_size = dis.read_byte();
-            print(@"\tReturn path size $(return_path_size)\n");
 
             // Deserialise return path
             return_path = new InstanceReference[return_path_size];
@@ -65,11 +57,9 @@ namespace LibPeer.Protocols.Aip {
                 return_path[i] = new InstanceReference.from_stream(dis);
             }
 
-            print("\tRead query data\n");
 
             // Read the query data
             data = dis.read_bytes(data_length);
-            print(@"\tDone $(data.length)\n");
         }
 
         internal void append_return_hop(InstanceReference instance) {

+ 0 - 1
src/lib/Protocols/AIP/Request.vala

@@ -7,7 +7,6 @@ namespace LibPeer.Protocols.Aip {
         public bool completed { get; set; }
 
         public virtual signal void response(T data) {
-            print("Yeehaw\n");
             completed = true;
         }
     }

+ 1 - 1
src/lib/Protocols/STP/Sessions/Session.vala

@@ -40,7 +40,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
 
         protected virtual void close_session(string reason) {
             open = false;
-            print(@"[SESSION CLOSED] $(reason)\n");
+            //  print(@"[SESSION CLOSED] $(reason)\n");
             session_closed(reason);
         }
 

+ 9 - 0
src/lib/Util/ByteComposer.vala

@@ -24,6 +24,15 @@ namespace LibPeer.Util {
             return this;
         }
 
+        public ByteComposer add_string(string str, bool remove_null_termination = true) {
+            var data = (uint8[])str;
+            if(remove_null_termination) {
+                data = data[0:-1];
+            }
+            add_byte_array(data);
+            return this;
+        }
+
         public uint8[] to_byte_array() {
             uint8[] data = {};
             foreach (Bytes bytes in components) {

+ 1 - 0
src/lib/meson.build

@@ -70,6 +70,7 @@ sources += files('Util/ThreadTimer.vala')
 sources += files('Util/ConcurrentHashMap.vala')
 sources += files('Util/TimeoutMap.vala')
 sources += files('Util/Streams.vala')
+sources += files('Application/Application.vala')
 
 libpeer = library('peer', sources, dependencies: dependencies, install: true, install_dir: [true, true, true])
 libpeer_dep = declare_dependency(link_with: libpeer, include_directories: include_directories('.'))

+ 61 - 0
src/toys/hello_world/Main.vala

@@ -0,0 +1,61 @@
+using LibPeer;
+using LibPeer.Protocols.Stp.Streams;
+using LibPeer.Protocols.Mx2;
+using LibPeer.Util;
+
+namespace HelloWorldApp {
+
+    class Main : PeerApplication {
+
+        private uint8[] message = new uint8[] { 'H', 'e', 'l', 'l', 'o', ',', 'w', 'o', 'r', 'l', 'd', '!' };
+
+        public override string application_namespace { get { return "hello-world"; }}
+
+        public static int main(string[] args) {
+            var t = new Main();
+            while (true) {};
+            return 0;
+        }
+        
+        protected override void on_incoming_stream (StpInputStream stream) {
+            var message_length = new uint8[1];
+            stream.read (message_length);
+            var message = new uint8[message_length[0]];
+            stream.read(message);
+
+            print(@"A peer has made a connection to us! It has a message: \"$(new ByteComposer().add_byte_array(message).to_escaped_string())\"\n");
+
+            reply_to_stream(stream).established.connect (s => acknowledge_message(s));
+        }
+
+        protected override void on_peer_available (InstanceReference peer) {
+            print("A new peer is available!\n");
+            establish_stream (peer).established.connect (send_message);
+        }
+
+        private void send_message(StpOutputStream stream) {
+            print("I'm sending the peer a message\n");
+            var data = new ByteComposer().add_byte((uint8)message.length).add_byte_array(message).to_byte_array();
+            stream.write (data);
+            stream.close();
+            stream.reply.connect(on_reply);
+        }
+
+        private void on_reply(StpInputStream stream) {
+            var data = new uint8[6];
+            stream.read(data);
+            stream.close();
+            print(@"Remote peer says \"$(new ByteComposer().add_byte_array(data).to_escaped_string())\" for the message!\n");
+        }
+
+        private void acknowledge_message(StpOutputStream stream) {
+            stream.write (new uint8[] {'T', 'h', 'a', 'n', 'k', 's'});
+            stream.close();
+            print("I told them thanks for the message!\n");
+        }
+
+    }
+
+
+
+}

+ 11 - 0
src/toys/hello_world/meson.build

@@ -0,0 +1,11 @@
+dependencies = [
+    dependency('glib-2.0'),
+    dependency('gobject-2.0'),
+    dependency('gio-2.0'),
+    dependency('gee-0.8'),
+    libpeer_dep
+]
+
+sources = files('Main.vala')
+
+executable('hello_world', sources, dependencies: dependencies)

+ 2 - 1
src/toys/meson.build

@@ -2,4 +2,5 @@
 subdir('exponential_pinger')
 subdir('give_file')
 subdir('replyer')
-subdir('discoverer')
+subdir('discoverer')
+subdir('hello_world')