Browse Source

Improve MX2 Frame format

Billy Barrow 3 năm trước cách đây
mục cha
commit
5cd8e47cac

+ 1 - 1
src/lib/Application/Application.vala

@@ -30,7 +30,7 @@ namespace LibPeer {
                 }
 
                 discoverer = new ApplicationInformationProtocol(muxer);
-                networks = network_list ?? new Network[] { new IPv4.IPv4("0.0.0.0", IPv4.IPv4.find_free_port("0.0.0.0")) };
+                networks = network_list ?? new Network[] { IPv4.IPv4.automatic() };
                 
                 foreach (var network in networks) {
                     network.bring_up();

+ 29 - 3
src/lib/Networks/IPv4/IPv4.vala

@@ -15,6 +15,8 @@ namespace LibPeer.Networks.IPv4 {
         private InetSocketAddress multicast_address;
         private HashSet<InstanceReference> advertised_instances = new HashSet<InstanceReference>((a) => a.hash(), (a, b) => a.compare(b) == 0);
 
+        public bool local_only { get; private set; }
+
         private static uint8[] multicast_magic_number = new uint8[] {'L', 'i', 'b', 'P', 'e', 'e', 'r', '2', '-', 'I', 'P', 'v', '4', ':'};
         private const uint8 DGRAM_DATA = 0;
         private const uint8 DGRAM_INQUIRE = 1;
@@ -28,7 +30,8 @@ namespace LibPeer.Networks.IPv4 {
             "libpeer.barrow.nz"
         };
 
-        public IPv4(string address, uint16 port) {
+        public IPv4(string address, uint16 port, bool local_only = false) {
+            this.local_only = local_only;
             socket = new Socket(SocketFamily.IPV4, SocketType.DATAGRAM, SocketProtocol.UDP);
             multicast_socket = new Socket(SocketFamily.IPV4, SocketType.DATAGRAM, SocketProtocol.UDP);
             var inet_address = new InetAddress.from_string(address);
@@ -37,6 +40,10 @@ namespace LibPeer.Networks.IPv4 {
             multicast_address = new InetSocketAddress(new InetAddress.from_string("224.0.0.3"), 1199);
         }
 
+        public static IPv4 automatic(bool local_only = false) {
+            return new IPv4("0.0.0.0", IPv4.find_free_port("0.0.0.0"), local_only);
+        }
+
         public override Bytes get_network_identifier() {
             return new Bytes({'I', 'P', 'v', '4'});
         }
@@ -79,10 +86,20 @@ namespace LibPeer.Networks.IPv4 {
             advertised_instances.add(instance_reference);
         }
 
+        private bool address_allowed(InetAddress address) {
+            return !local_only || address.is_site_local || address.is_link_local;
+        }
+
         public override void send(uint8[] bytes, PeerInfo peer_info) throws IOError, Error {
             var ipv4_info = (IPv4PeerInfo)peer_info;
+
+            var address = ipv4_info.to_socket_address();
+            if(!address_allowed(address.address)) {
+                throw new IOError.NETWORK_UNREACHABLE("IPv4 address of remote peer is not site-local or link-local and network has been set to local-only mode.");
+            }
+
             var buffer = new ByteComposer().add_byte(DGRAM_DATA).add_byte_array(bytes).to_byte_array();
-            socket.send_to(ipv4_info.to_socket_address(), buffer);
+            socket.send_to(address, buffer);
         }
 
         private bool listen() {
@@ -92,13 +109,18 @@ namespace LibPeer.Networks.IPv4 {
                     var buffer = new uint8[65536];
                     SocketAddress address;
                     var size = socket.receive_from(out address, buffer);
+                    var ip_address = (InetSocketAddress)address;
                     buffer.length = (int)size;
 
+                    if(!address_allowed(ip_address.address)) {
+                        continue;
+                    }
+
                     // Put the datagram into a stream
                     var stream = new MemoryInputStream.from_data(buffer);
 
                     // Create peer info
-                    var info = new IPv4PeerInfo((InetSocketAddress)address);
+                    var info = new IPv4PeerInfo(ip_address);
 
                     // Read the datagram type
                     var type = new uint8[1];
@@ -235,6 +257,10 @@ namespace LibPeer.Networks.IPv4 {
         }
 
         private void inquire(InetAddress address, int port) {
+            if(!address_allowed(address)) {
+                print(@"Not sending IPv4 inquiry for instances to $(address.to_string()):$(port) as network has been set to local-only mode\n");
+                return;
+            }
             print(@"Sending IPv4 inquiry for instances to $(address.to_string()):$(port)\n");
             socket.send_to(new InetSocketAddress(address, (uint16)port), new uint8[] { DGRAM_INQUIRE });
         }

+ 1 - 1
src/lib/Protocols/MX2/Fragmenter.vala

@@ -7,7 +7,7 @@ namespace LibPeer.Protocols.Mx2 {
 
         private uint64 message_seqn = 0;
 
-        public void send_frame(Frame frame, Instance instance, Network network, PeerInfo info) throws IOError, Error {
+        public void send_frame(Frame frame, Instance? instance, Network network, PeerInfo info) throws IOError, Error {
             // Get the size for the fragments
             var fragment_size = network.get_mtu() - Fragment.HEADER_LENGTH;
 

+ 107 - 32
src/lib/Protocols/MX2/Frame.vala

@@ -5,6 +5,27 @@ using LibPeer.Util;
 
 namespace LibPeer.Protocols.Mx2 {
 
+    public enum PayloadType {
+        INQUIRE = 5,
+        GREET = 6,
+        DATA = 22,
+        DISPEL = 21
+    }
+
+    public enum FrameCrypto {
+        NONE = 0,
+        SIGNED = 1,
+        SIGNED_ENCRYPTED = 2
+    }
+
+    public enum FrameReadStatus {
+        OK,
+        MALFORMED_FRAME,
+        INSTANCE_NOT_FOUND,
+        INVALID_SIGNATURE,
+        DECRYPTION_ERROR
+    }
+
     public class Frame {
 
         private const uint8[] MAGIC_NUMBER = {'M', 'X', '2'};
@@ -15,18 +36,26 @@ namespace LibPeer.Protocols.Mx2 {
 
         public PathInfo via { get; private set; }
 
+        public FrameCrypto cryptography_level { get; private set; }
+
+        public PayloadType payload_type { get; private set; }
+
         public uint8[] payload { get; private set; }
 
-        public Frame(InstanceReference destination, InstanceReference origin, PathInfo via, uint8[] payload) {
+        public FrameReadStatus read_status { get; private set; }
+
+        public Frame(InstanceReference destination, InstanceReference origin, PathInfo via, PayloadType type, uint8[] payload, FrameCrypto crypto_level = FrameCrypto.SIGNED_ENCRYPTED) {
             this.destination = destination;
             this.origin = origin;
             this.via = via;
+            this.cryptography_level = crypto_level;
+            this.payload_type = type;
             this.payload = payload;
         }
 
-        public void serialise(OutputStream stream, Instance instance)
+        public void serialise(OutputStream stream, Instance? instance)
             throws IOError
-            requires (instance.reference.compare(origin) == 0)
+            requires (cryptography_level == FrameCrypto.NONE || instance != null)
         {
             // Magic number
             stream.write(MAGIC_NUMBER);
@@ -40,14 +69,27 @@ namespace LibPeer.Protocols.Mx2 {
             // Write the via field
             via.serialise(stream);
 
-            // Sign the data
-            uint8[] signed_payload = Asymmetric.Signing.sign(payload, instance.sign_private_key);
+            // Write the cryptography level
+            stream.write(new uint8[] { (uint8) cryptography_level });
+
+            // Create the data that is to be protected according to the crypto level
+            uint8[] output = new ByteComposer()
+                .add_byte((uint8) payload_type)
+                .add_byte_array(payload)
+                .to_byte_array();
 
-            // Encrypt the signed payload
-            uint8[] encrypted_signed_payload = Asymmetric.Sealing.seal(signed_payload, destination.public_key);
+            if(cryptography_level >= FrameCrypto.SIGNED) {
+                // Sign the data
+                output = Asymmetric.Signing.sign(output, instance.sign_private_key);
+
+                if(cryptography_level >= FrameCrypto.SIGNED_ENCRYPTED) {
+                    // Encrypt the signed payload
+                    output = Asymmetric.Sealing.seal(output, destination.public_key);
+                }
+            }
 
             // Write the signed and encrypted payload
-            stream.write(encrypted_signed_payload);
+            stream.write(output);
         }
 
         public Frame.from_stream(InputStream stream, ConcurrentHashMap<InstanceReference, Instance> instances) throws IOError, Error{
@@ -56,7 +98,7 @@ namespace LibPeer.Protocols.Mx2 {
             stream.read(magic);
 
             if(new Bytes(magic).compare(new Bytes(MAGIC_NUMBER)) != 0) {
-                throw new IOError.FAILED("Invalid magic number");
+                read_status = FrameReadStatus.MALFORMED_FRAME;
             }
 
             // Read the destination
@@ -65,40 +107,73 @@ namespace LibPeer.Protocols.Mx2 {
             // Read the origin
             origin = new InstanceReference.from_stream(stream);
 
+            // Read the via field
+            via = new PathInfo.from_stream(stream);
+
+            // Read the crypto level field
+            var level = new uint8[1];
+            stream.read(level);
+            cryptography_level = (FrameCrypto)level[0];
+
             // Do we have an instance matching the destination of this frame?
             if (!instances.has_key(destination)) {
-                throw new IOError.FAILED("Message matches no provided instances");
+                // No, unreadable 
+                read_status = FrameReadStatus.INSTANCE_NOT_FOUND;
+                return;
             }
 
-            // Get the instance
-            Instance instance = instances.get(destination);
-
-            // Read the via field
-            via = new PathInfo.from_stream(stream);
-
-            // The remainder of the stream is the encrypted payload
-            uint8[] encrypted_signed_payload = new uint8[uint16.MAX];
+            // The remainder of the stream is the payload
+            uint8[]? frame_payload = new uint8[uint16.MAX];
             size_t bytes_read;
-            stream.read_all(encrypted_signed_payload, out bytes_read);
-            encrypted_signed_payload.resize((int)bytes_read);
+            stream.read_all(frame_payload, out bytes_read);
+            frame_payload.resize((int)bytes_read);
+
+            // If encryption was used, decrypt
+            if(cryptography_level >= FrameCrypto.SIGNED_ENCRYPTED) {
+                // Get the instance
+                var instance = instances.get(destination);
+                frame_payload = Asymmetric.Sealing.unseal(frame_payload, instance.seal_public_key, instance.seal_private_key);
+
+                if(frame_payload == null) {
+                    read_status = FrameReadStatus.DECRYPTION_ERROR;
+                    return;
+                }
+            }
 
-            // Decrypt the payload
-            uint8[]? signed_payload = Asymmetric.Sealing.unseal(encrypted_signed_payload, instance.seal_public_key, instance.seal_private_key);
+            // If signing was used, verify
+            if(cryptography_level >= FrameCrypto.SIGNED) {
+                frame_payload = Asymmetric.Signing.verify(frame_payload, origin.verification_key);
 
-            if (signed_payload == null) {
-                throw new IOError.FAILED("Payload could not be decrypted");
+                if(frame_payload == null) {
+                    read_status = FrameReadStatus.INVALID_SIGNATURE;
+                    return;
+                }
             }
 
-            // Verify the signature and get plaintext message
-            uint8[]? payload = Asymmetric.Signing.verify(signed_payload, origin.verification_key);
-
-            
-            if (payload == null) {
-                throw new IOError.FAILED("Payload signature is invalid");
+            // Read the payload type
+            payload_type = (PayloadType)frame_payload[0];
+            if(!valid_combination(cryptography_level, payload_type)) {
+                throw new IOError.INVALID_DATA("The payload type requires a higher cryptography level.");
             }
-            
 
-            this.payload = payload;
+            // Save the payload
+            payload = frame_payload[1:frame_payload.length];
+            read_status = FrameReadStatus.OK;
+        }
+
+        public bool valid_combination(FrameCrypto crypto_level, PayloadType type) {
+            switch (type) {
+                case PayloadType.DATA:
+                    return crypto_level >= FrameCrypto.SIGNED_ENCRYPTED;
+                case PayloadType.INQUIRE:
+                    return crypto_level >= FrameCrypto.SIGNED;
+                case PayloadType.GREET:
+                    return crypto_level >= FrameCrypto.SIGNED;
+                case PayloadType.DISPEL:
+                    return crypto_level >= FrameCrypto.NONE;
+                default:
+                    assert_not_reached();
+            }
         }
 
     }

+ 59 - 44
src/lib/Protocols/MX2/Muxer.vala

@@ -7,11 +7,6 @@ namespace LibPeer.Protocols.Mx2 {
 
     public class Muxer {
 
-        private const uint8 PACKET_INQUIRE = 5;
-        private const uint8 PACKET_GREET = 6;
-        private const uint8 PACKET_PAYLOAD = 22;
-        private const uint8 PACKET_HEARTBEAT = 7;
-
         private const int FALLBACK_PING_VALUE = 120000;
         
         private ConcurrentHashMap<Bytes, HashSet<Network>> networks = new ConcurrentHashMap<Bytes, HashSet<Network>>((a) => a.hash(), (a, b) => a.compare(b) == 0);
@@ -83,13 +78,12 @@ namespace LibPeer.Protocols.Mx2 {
                 foreach (Network network in networks.get(network_identifier)) {
                     // Create the inquire packet
                     uint8[] packet = new ByteComposer()
-                        .add_byte(PACKET_INQUIRE)
                         .add_bytes(inquiry.id)
                         .add_char_array(instance.application_namespace.to_utf8())
                         .to_byte_array();
 
                     // Create a frame containing an inquire packet
-                    var frame = new Frame(destination, instance.reference, new PathInfo.empty(), packet);
+                    var frame = new Frame(destination, instance.reference, new PathInfo.empty(), PayloadType.INQUIRE, packet);
 
                     // Send using the network and peer info
                     fragmenter.send_frame(frame, instance, network, peer);
@@ -116,14 +110,7 @@ namespace LibPeer.Protocols.Mx2 {
         }
 
         public void send(Instance instance, InstanceReference destination, uint8[] data) throws IOError, Error {
-            uint8[] payload = new ByteComposer()
-                .add_byte(PACKET_PAYLOAD)
-                .add_byte_array(data)
-                .to_byte_array();
-
-            //  print(@"MX2_SEND:\"$(new Util.ByteComposer().add_byte_array(payload).to_escaped_string())\"\n");
-
-            send_packet(instance, destination, payload);
+            send_packet(instance, destination, PayloadType.DATA, data);
         }
 
         public int suggested_timeout_for_instance(InstanceReference instance) {
@@ -133,7 +120,7 @@ namespace LibPeer.Protocols.Mx2 {
             return FALLBACK_PING_VALUE;
         }
 
-        protected void send_packet(Instance instance, InstanceReference destination, uint8[] payload) throws IOError, Error {
+        protected void send_packet(Instance instance, InstanceReference destination, PayloadType payload_type, uint8[] payload) throws IOError, Error {
             // Do we know the destination instance?
             if(!remote_instance_mapping.has_key(destination)) {
                 // No, throw an error
@@ -144,12 +131,21 @@ namespace LibPeer.Protocols.Mx2 {
             InstanceAccessInfo access_info = remote_instance_mapping.get(destination);
 
             // Create a frame
-            Frame frame = new Frame(destination, instance.reference, access_info.path_info, payload);
+            Frame frame = new Frame(destination, instance.reference, access_info.path_info, payload_type, payload);
 
             // Send the frame over the network
             fragmenter.send_frame(frame, instance, access_info.network, access_info.peer_info);
         }
 
+        protected void dispel_peer(Receiption receiption, Frame frame) throws Error {
+            print(@"Dispelling peer at $(receiption.peer_info)\n");
+            // Create a frame
+            Frame dispel_frame = new Frame(frame.origin, frame.destination, frame.via.return_path, PayloadType.DISPEL, new uint8[0], FrameCrypto.NONE);
+
+            // Send the frame over the network
+            fragmenter.send_frame(dispel_frame, null, receiption.network, receiption.peer_info);
+        }
+
         protected void handle_receiption(Receiption receiption) {
             // Pass to the assembler
             var stream = assembler.handle_data(receiption.stream);
@@ -163,60 +159,68 @@ namespace LibPeer.Protocols.Mx2 {
             // Read the incoming frame
             Frame frame = new Frame.from_stream(stream, instances);
 
+            // Make a decision based on how well the frame was read
+            switch (frame.read_status) {
+                case FrameReadStatus.DECRYPTION_ERROR:
+                case FrameReadStatus.INVALID_SIGNATURE:
+                case FrameReadStatus.MALFORMED_FRAME:
+                    return;
+                case FrameReadStatus.INSTANCE_NOT_FOUND:
+                    dispel_peer(receiption, frame);
+                    return;
+            }
+
             // Get the instance
             Instance instance = instances.get(frame.destination);
 
-            // Read the packet type
-            uint8 packet_type = frame.payload[0];
-
-            // Determine what to do
-            switch (packet_type) {
-                case PACKET_INQUIRE:
+            // Determine what to do with the payload
+            switch (frame.payload_type) {
+                case PayloadType.INQUIRE:
                     handle_inquire(receiption, frame, instance);
                     break;
                 
-                case PACKET_GREET:
+                case PayloadType.GREET:
                     handle_greet(receiption, frame, instance);
                     break;
 
-                case PACKET_PAYLOAD:
+                case PayloadType.DATA:
                     handle_payload(receiption, frame, instance);
                     break;
 
+                case PayloadType.DISPEL:
+                    handle_dispel(receiption, frame, instance);
+                    break;
+
                 default:
-                    throw new IOError.INVALID_DATA("Invalid packet type");
+                    throw new IOError.INVALID_DATA("Invalid payload type");
             }
             
         }
 
         protected void handle_inquire(Receiption receiption, Frame frame, Instance instance) throws Error {
             // Next 16 bytes of packet is the inquiriy ID
-            uint8[] inquiry_id = frame.payload[1:17];
+            uint8[] inquiry_id = frame.payload[0:16];
 
             // Rest of the packet indicates the desired application namespace
             string application_namespace = new ByteComposer()
-                .add_byte_array(frame.payload[17:frame.payload.length])
+                .add_byte_array(frame.payload[16:frame.payload.length])
                 .to_string();
 
             // Does the application namespace match the instance's
             if (instance.application_namespace == application_namespace) {
                 // Yes, save this instance's information locally for use later
-                remote_instance_mapping.set(frame.origin, new InstanceAccessInfo() { 
-                    network = receiption.network,
-                    peer_info = receiption.peer_info,
-                    path_info = frame.via.return_path
-                });
+                if(!remote_instance_mapping.has_key(frame.origin)) {
+                    remote_instance_mapping.set(frame.origin, new InstanceAccessInfo() { 
+                        network = receiption.network,
+                        peer_info = receiption.peer_info,
+                        path_info = frame.via.return_path
+                    });
+                }
 
                 print(@"Saved instance mapping with address $(receiption.peer_info.to_string()) due to inquiry\n");
 
-                // Create the greeting
-                uint8[] greeting = new ByteComposer()
-                    .add_byte(PACKET_GREET)
-                    .add_byte_array(inquiry_id)
-                    .to_byte_array();
-
                 // Send the greeting
-                send_packet(instance, frame.origin, greeting);
+                send_packet(instance, frame.origin, PayloadType.GREET, inquiry_id);
             }
             else {
                 print(@"$(instance.application_namespace) != $(application_namespace)\n");
@@ -238,7 +242,7 @@ namespace LibPeer.Protocols.Mx2 {
                 print(@"Saved instance mapping with address $(receiption.peer_info.to_string()) due to greeting\n");
 
                 // Get the inquiry id
-                Bytes inquiry_id = new Bytes(frame.payload[1:17]);
+                Bytes inquiry_id = new Bytes(frame.payload[0:16]);
 
                 // Determine the ping
                 int ping = FALLBACK_PING_VALUE;
@@ -258,10 +262,21 @@ namespace LibPeer.Protocols.Mx2 {
             }
         }
 
+        private void handle_dispel(Receiption receiption, Frame frame, Instance instance) throws Error {
+            print("Dispelled instance due to request from remote machine\n");
+            // Received a dispel frame
+            remote_instance_mapping.unset(frame.origin);
+        }
+
         protected void handle_payload(Receiption receiption, Frame frame, Instance instance) throws Error {
-            // This is a payload for the next layer to handle, pass it up.
-            //  print(@"MX2_HANDLE:\"$(new Util.ByteComposer().add_byte_array(frame.payload).to_escaped_string())\"\n");
-            MemoryInputStream stream = new MemoryInputStream.from_data(frame.payload[1:frame.payload.length]);
+            // Update access info - where we receive data from is always where we should send it back
+            remote_instance_mapping.set(frame.origin, new InstanceAccessInfo() { 
+                network = receiption.network,
+                peer_info = receiption.peer_info,
+                path_info = frame.via.return_path
+            });
+
+            MemoryInputStream stream = new MemoryInputStream.from_data(frame.payload);
             instance.incoming_payload(new Packet(frame.origin, frame.destination, stream));
         }
 

+ 4 - 0
src/lib/Protocols/STP/Sessions/Session.vala

@@ -45,6 +45,10 @@ namespace LibPeer.Protocols.Stp.Sessions {
             outgoing_segment_queue.push(segment);
         }
 
+        public virtual void segment_failure(Segment segment, Error error) {
+            close_session(@"Could not send segment over the network: $(error.message)");
+        }
+
         public abstract void process_segment(Segment segment);
 
         protected virtual void close_session(string reason) {

+ 13 - 2
src/lib/Protocols/STP/StreamTransmissionProtocol.vala

@@ -290,7 +290,12 @@ namespace LibPeer.Protocols.Stp {
         private void send_pending_segement(Session session) {
             var segment = session.get_pending_segment();
             var message = new SegmentMessage(new Bytes(session.identifier), segment);
-            send_packet(session.target, s => message.serialise(s));
+            try {
+                send_packet(session.target, s => message.serialise(s));
+            }
+            catch (Error e) {
+                session.segment_failure(segment, e);
+            }
         }
 
         private void notify_app(ThreadFunc<void> func) {
@@ -306,7 +311,13 @@ namespace LibPeer.Protocols.Stp {
             private InstanceReference target;
     
             protected override void do_task () {
-                stp.send_packet(target, s => message.serialise(s));
+                try {
+                    stp.send_packet(target, s => message.serialise(s));
+                }
+                catch(Error e) {
+                    printerr(@"Error retransmitting packet: $(e.message)\n");
+                    cancel();
+                }
             }
     
             public MessageRetransmitter(StreamTransmissionProtocol stp, InstanceReference target, Message message, uint64 interval = 10000, int repeat = 12) {

+ 1 - 1
src/toys/hello_world/Main.vala

@@ -16,7 +16,7 @@ namespace HelloWorldApp {
         }
 
         public Main() {
-            initialise("hello-world");
+            initialise("hello-world", new LibPeer.Networks.Network[] { LibPeer.Networks.IPv4.IPv4.automatic(true) });
         }
         
         protected override void on_incoming_stream (StpInputStream stream) {