소스 검색

Very incomplete AIP

Billy Barrow 4 년 전
부모
커밋
49ac0bb71e

+ 56 - 0
src/lib/Protocols/AIP/AipCapabilities.vala

@@ -0,0 +1,56 @@
+using LibPeer.Util;
+
+namespace LibPeer.Protocols.Aip {
+
+    public class AipCapabilities {
+
+        public bool address_info { get; set; }
+        public bool find_peers { get; set; }
+        public bool query_answer { get; set; }
+
+        public void serialise(OutputStream stream) throws IOError {
+            var dos = new DataOutputStream(stream);
+            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            var composer = new ByteComposer();
+            if(address_info) {
+                composer.add_byte(ApplicationInformationProtocol.CAPABILITY_ADDRESS_INFO);
+            }
+            if(find_peers) {
+                composer.add_byte(ApplicationInformationProtocol.CAPABILITY_FIND_PEERS);
+            }
+            if(query_answer) {
+                composer.add_byte(ApplicationInformationProtocol.CAPABILITY_QUERY_ANSWER);
+            }
+
+            var data = composer.to_byte_array();
+
+            dos.put_byte((uint8)data.length);
+            dos.write(data);
+        }
+
+        public AipCapabilities.from_stream(InputStream stream) throws IOError {
+            var dis = new DataInputStream(stream);
+            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            var capability_count = dis.read_byte();
+
+            for (var i = 0; i < capability_count; i++) {
+                var byte = dis.read_byte();
+                switch (byte) {
+                    case ApplicationInformationProtocol.CAPABILITY_ADDRESS_INFO:
+                        address_info = true;
+                        break;
+                    case ApplicationInformationProtocol.CAPABILITY_FIND_PEERS:
+                        find_peers = true;
+                        break;
+                    case ApplicationInformationProtocol.CAPABILITY_QUERY_ANSWER:
+                        query_answer = true;
+                        break;
+                }
+            }
+        }
+
+    }
+
+}

+ 49 - 0
src/lib/Protocols/AIP/Answer.vala

@@ -0,0 +1,49 @@
+using LibPeer.Protocols.Mx2;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    internal class Answer {
+
+        public Bytes in_reply_to { get; set; }
+
+        public Bytes data { get; set; }
+
+        public InstanceReference[] path { get; set; }
+
+        public void serialise(OutputStream stream) throws IOError {
+            var dos = new DataOutputStream(stream);
+            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            dos.put_int32(data.length);
+            dos.put_byte((uint8)path.length);
+
+            foreach (var reference in path) {
+                reference.serialise(dos);
+            }
+
+            dos.write(data.get_data());
+        }
+
+        public Answer.from_stream(InputStream stream) throws IOError{
+            var dis = new DataInputStream(stream);
+            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // What is this in reply to?
+            in_reply_to = dis.read_bytes(16);
+
+            var data_length = dis.read_int32();
+            var path_size = dis.read_byte();
+
+            path = new InstanceReference[path_size];
+
+            for(var i = 0; i < path_size; i++) {
+                path[i] = new InstanceReference.from_stream(dis);
+            }
+
+            data = dis.read_bytes(data_length);
+        }
+
+    }
+
+}

+ 20 - 0
src/lib/Protocols/AIP/ApplicationInformation.vala

@@ -0,0 +1,20 @@
+using LibPeer.Protocols.Mx2;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    public class ApplicationInformation {
+
+        public InstanceReference instance { get; protected set; }
+
+        public string application_namespace { get; protected set; }
+
+        public HashSet<Bytes> resource_set = new Gee.HashSet<Bytes>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+
+        public ApplicationInformation(InstanceReference iref, string app_namespace) {
+            instance = iref;
+            application_namespace = app_namespace;
+        }
+    }
+
+}

+ 294 - 0
src/lib/Protocols/AIP/ApplicationInformationProtocol.vala

@@ -0,0 +1,294 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Protocols.Stp;
+using LibPeer.Protocols.Stp.Streams;
+using LibPeer.Util;
+using LibPeer.Networks;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    class ApplicationInformationProtocol {
+
+        internal const uint8 DATA_FOLLOWING_REQUEST = 'R';
+        internal const uint8 DATA_FOLLOWING_QUERY = 'Q';
+        internal const uint8 DATA_FOLLOWING_ANSWER = 'A';
+
+        internal const uint8 REQUEST_CAPABILITIES = 'C';
+        internal const uint8 REQUEST_ADDRESS = 'A';
+        internal const uint8 REQUEST_PEERS = 'P';
+
+        internal const uint8 QUERY_GROUP = 'G';
+        internal const uint8 QUERY_APPLICATION = 'A';
+        internal const uint8 QUERY_APPLICATION_RESOURCE = 'R';
+
+        internal const uint8 CAPABILITY_ADDRESS_INFO = 'A';
+        internal const uint8 CAPABILITY_FIND_PEERS = 'P';
+        internal const uint8 CAPABILITY_QUERY_ANSWER = 'Q';
+
+        private const int MAX_QUERY_HOPS = 16;
+
+
+        protected AipCapabilities capabilities;
+        protected bool join_all_groups = false;
+        protected Gee.List<ApplicationInformation> application_information;
+
+        protected Muxer muxer;
+        protected Instance instance;
+        protected StreamTransmissionProtocol transport;
+
+        protected HashSet<InstanceReference> discovered_peers = new HashSet<InstanceReference>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected ConcurrentHashMap<InstanceReference, PeerInfo> peer_connection_methods = new ConcurrentHashMap<InstanceReference, PeerInfo>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected ConcurrentHashMap<InstanceReference, AipCapabilities> instance_capabilities = new ConcurrentHashMap<InstanceReference, AipCapabilities>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected QueryGroup default_group = new QueryGroup (20);
+        protected ConcurrentHashMap<Bytes, QueryGroup> query_groups = new ConcurrentHashMap<Bytes, QueryGroup>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected HashSet<InstanceReference> reachable_peers = new HashSet<InstanceReference>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+
+        protected TimeoutMap<Bytes, Query> queries = new TimeoutMap<Bytes, Query>(120, (a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected TimeoutMap<Bytes, int> query_response_count = new TimeoutMap<Bytes, int>(120, (a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected HashSet<Bytes> handled_query_ids = new HashSet<Bytes>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        protected HashSet<PeerInfo> peer_info = new HashSet<PeerInfo>((a) => a.hash(), (a, b) => a.equals(b));
+        protected signal void new_peer_info(PeerInfo info);
+
+        protected signal void new_group_peer(PeerInfo info, Bytes id);
+        protected bool is_ready;
+        protected signal void on_peer_greet(InstanceReference info);
+        public signal void ready();
+
+        public ApplicationInformationProtocol(Muxer muxer, AipCapabilities? capabilities = null, bool join_all = false) {
+            if(capabilities == null) {
+                this.capabilities = new AipCapabilities (){
+                    address_info = true,
+                    find_peers = true,
+                    query_answer = true
+                };
+            }
+            else {
+                this.capabilities = capabilities;
+            }
+
+            this.join_all_groups = join_all;
+
+            this.muxer = muxer;
+            instance = muxer.create_instance ("AIP");
+            transport = new StreamTransmissionProtocol(muxer, instance);
+
+            // Attach signal handlers
+            instance.incoming_greeting.connect(rx_greeting);
+            //transport.incoming_stream()
+        }
+
+        public void add_network(Network network) {
+            network.incoming_advertisment.connect(rx_advertisement);
+            muxer.register_network(network);
+            network.advertise(instance.reference);
+        }
+
+        protected void rx_advertisement(Advertisement advertisement) {
+            // Send an inquiry
+            muxer.inquire(instance, advertisement.instance_reference, new PeerInfo[] { advertisement.peer_info });
+        }
+
+        protected void rx_greeting(InstanceReference greeting) {
+            // Add to known peers
+            discovered_peers.add(greeting);
+
+            // Request capabilities from the instance
+            request_capabilities(greeting, m => rx_capabilities(greeting, m));
+        }
+
+        protected void rx_capabilities(InstanceReference target, AipCapabilities capabilities) {
+            // Save the capabilities
+            instance_capabilities.set(target, capabilities);
+
+            // Can we ask the peer for our address?
+            if(capabilities.address_info) {
+                // Yes, do it
+                request_address(target, rx_address);
+            }
+            // Can we ask the peer for other peers?
+            if(capabilities.find_peers) {
+                // Yes, do it
+                request_peers(target, rx_peers);
+            }
+            // Can we send queries and answers to this peer?
+            if(capabilities.query_answer) {
+                // Yes, add to default group
+                default_group.add_peer(target);
+
+                // Peer is now reachable for queries
+                reachable_peers.add(target);
+
+                // We now have a queryable peer
+                if(!is_ready) {
+                    is_ready = true;
+                    ready();
+                }
+
+                // There may may be code waiting for this moment
+                on_peer_greet(target);
+            }
+        }
+
+        protected void rx_address(PeerInfo info) {
+            // We received peer info, add to our set
+            peer_info.add(info);
+            new_peer_info(info);
+        }
+
+        protected void rx_peers(Gee.List<InstanceInformation> peers) {
+            // We received a list of peers running AIP, do we want more peers?
+            if(!default_group.actively_connect) {
+                // Don't worry about it
+                return;
+            }
+
+            // Send out inquries to the peers
+            foreach (var peer in peers) {
+                muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
+            }
+        }
+
+        protected void request_address(InstanceReference target, Func<PeerInfo> callback) {
+            // Make the request
+            var request = new ByteComposer().add_byte(REQUEST_ADDRESS).to_bytes();
+            send_request(request, target, s => {
+                // Read the address (peer info)
+                var address = PeerInfo.deserialise(s);
+                // Callback
+                callback(address);
+            });
+        }
+
+        protected void request_capabilities(InstanceReference target, Func<AipCapabilities> callback) {
+            // Make the request
+            var request = new ByteComposer().add_byte(REQUEST_CAPABILITIES).to_bytes();
+            send_request(request, target, s => {
+                // Read capabilities
+                var target_capabilities = new AipCapabilities.from_stream(s);
+                // Callback
+                callback(target_capabilities);
+            });
+        }
+
+        protected void request_peers(InstanceReference target, Func<Gee.List<InstanceInformation>> callback) {
+            // Make the request
+            var request = new ByteComposer().add_byte(REQUEST_PEERS).to_bytes();
+            send_request(request, target, s => {
+                var dis = new DataInputStream(s);
+                dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+                // Read number of peers
+                var peer_count = dis.read_byte();
+
+                // Create the list
+                var peers = new ArrayList<InstanceInformation>();
+
+                // Read the peers (instance info)
+                for (int i = 0; i < peer_count; i++) {
+                    peers.add(new InstanceInformation.from_stream(dis));
+                }
+
+                // Callback
+                callback(peers);
+            });
+        }
+
+        protected void send_request(Bytes request, InstanceReference target, Func<InputStream> callback) {
+            // Open a stream with the peer
+            transport.initialise_stream(target).established.connect((s) => {
+                // Connect reply signal
+                s.reply.connect(m => callback(m));
+
+                // Send the request
+                s.write(new ByteComposer().add_byte(DATA_FOLLOWING_REQUEST).add_bytes(request).to_byte_array());
+                s.close();
+            });
+        }    
+        
+        protected void rx_stream(InputStream stream) {
+            // Figure out what data follows
+            uint8[] _following = new uint8[1];
+            stream.read(_following);
+            var following = _following[0];
+
+            if(following == DATA_FOLLOWING_ANSWER && capabilities.query_answer) {
+                
+            }
+
+        }
+
+        protected void handle_answer(InputStream stream) {
+            // Deserialise the answer
+            var answer = new Answer.from_stream(stream);
+
+            // Is this an answer to one of our queries?
+            if(queries.has_key(answer.in_reply_to)) {
+                // Yes, get the query
+                var query = queries.get(answer.in_reply_to);
+
+                // Get instance information from the answer
+                var info = new InstanceInformation.from_stream(new MemoryInputStream.from_bytes(answer.data));
+
+                // Notify the query's subject listeners
+                query.on_answer(info);
+            }
+        }
+
+        protected void handle_query(StpInputStream stream) throws IOError, Error {
+            // Deserialise the query
+            var query = new Query.from_stream(stream);
+
+            // Have we come across this query before?
+            if(handled_query_ids.contains(query.identifier)) {
+                // Don't forward
+                return;
+            }
+
+            // Mark as handled
+            handled_query_ids.add(query.identifier);
+
+            // Create a replies counter
+            query_response_count.set(query.identifier, query.max_replies);
+
+            // Append the originator of the stream to the query reply path
+            query.append_return_hop(stream.origin);
+
+            // Increment the query hops
+            query.hops ++;
+
+            // Read through the query data
+            var dis = new DataInputStream(new MemoryInputStream.from_bytes(query.data));
+            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // Find the query type
+            var query_type = dis.read_byte();
+
+            if(query_type == QUERY_GROUP) {
+                // Get the group identifier
+                var group_id = dis.read_bytes(query.data.length - 1);
+
+                // Are we not in this group, but joining all?
+                if(join_all_groups && !query_groups.has_key(group_id)) {
+                    // Join the group
+                    
+                }
+            }
+        }
+
+        public void join_query_group(Bytes group) {
+            // Create the query group
+            query_groups.set(group, new QueryGroup());
+
+            // TODO continue
+        }
+
+        public void send_group_query(Bytes group) {
+            // Construct a query asking for peers in the group
+            var query = new Query(new ByteComposer().add_byte(QUERY_GROUP).add_bytes(group).to_bytes());
+
+            // 
+        }
+        
+
+    }
+
+}

+ 52 - 0
src/lib/Protocols/AIP/InstanceInformation.vala

@@ -0,0 +1,52 @@
+using LibPeer.Protocols.Mx2;
+using LibPeer.Networks;
+
+namespace LibPeer.Protocols.Aip {
+
+    internal class InstanceInformation {
+
+        public InstanceReference instance_reference { get; private set; }
+
+        public PeerInfo[] connection_methods { get; private set; }
+
+        public InstanceInformation(InstanceReference instance, PeerInfo[] methods) {
+            instance_reference = instance;
+            connection_methods = methods;
+        }
+
+        public void serialise(OutputStream stream) throws IOError, Error {
+            var dos = new DataOutputStream(stream);
+            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // Write instance reference
+            instance_reference.serialise(dos);
+            
+            // Write number of connection methods
+            dos.put_byte((uint8)connection_methods.length);
+
+            // Write connection methods
+            foreach (var method in connection_methods) {
+                method.serialise(dos);
+            }
+        }
+
+        public InstanceInformation.from_stream(InputStream stream) throws IOError, Error {
+            var dis = new DataInputStream(stream);
+            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // Read the instance reference
+            instance_reference = new InstanceReference.from_stream(dis);
+
+            // Read number of connection methods
+            var method_count = dis.read_byte();
+
+            // Read conneciton methods
+            connection_methods = new PeerInfo[method_count];
+            for (int i = 0; i < method_count; i++) {
+                connection_methods[i] = PeerInfo.deserialise(stream);
+            }
+        }
+
+    }
+
+}

+ 85 - 0
src/lib/Protocols/AIP/Query.vala

@@ -0,0 +1,85 @@
+using LibPeer.Protocols.Mx2;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    internal class Query {
+
+        public Bytes identifier { get; set; }
+
+        public Bytes data { get; set; }
+
+        public uint8 max_replies { get; set; }
+
+        public uint8 hops { get; set; }
+
+        public InstanceReference[] return_path { get; set; }
+
+        public signal void on_answer(InstanceInformation answer);
+
+        public void serialise(OutputStream stream) throws IOError, Error {
+            var dos = new DataOutputStream(stream);
+            dos.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // Write query identifier
+            dos.write_bytes(identifier);
+
+            // Send header data
+            dos.put_byte(hops);
+            dos.put_byte(max_replies);
+            dos.put_uint16((uint16)data.length);
+            dos.put_byte((uint8)return_path.length);
+
+            // Serialise the return path
+            foreach (var reference in return_path) {
+                reference.serialise(dos);
+            }
+
+            // Write the query data
+            dos.write_bytes(data);
+        }
+
+        public Query.from_stream(InputStream stream) throws IOError, Error{
+            var dis = new DataInputStream(stream);
+            dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
+
+            // Read the identifier
+            identifier = dis.read_bytes(16);
+
+            // Read header data
+            hops = dis.read_byte();
+            max_replies = dis.read_byte();
+            var data_length = dis.read_uint16();
+            var return_path_size = dis.read_byte();
+
+            // Deserialise return path
+            return_path = new InstanceReference[return_path_size];
+            for(var i = 0; i < return_path_size; i++) {
+                return_path[i] = new InstanceReference.from_stream(dis);
+            }
+
+            // Read the query data
+            data = stream.read_bytes(data_length);
+        }
+
+        public void append_return_hop(InstanceReference instance) {
+            var paths = return_path;
+            return_path = new InstanceReference[paths.length + 1];
+            return_path[paths.length] = instance;
+        }
+
+        public Query(Bytes data, uint8 max_replies = 10, uint8 hops = 0, InstanceReference[] return_path = new InstanceReference[0], Bytes? identifier = null) {
+            if(identifier == null) {
+                uint8[] uuid = new uint8[16];
+                UUID.generate_random(uuid);
+                this.identifier = new Bytes(uuid);
+            }
+            this.data = data;
+            this.max_replies = max_replies;
+            this.hops = hops;
+            this.return_path = return_path;
+        }
+
+    }
+
+}

+ 27 - 0
src/lib/Protocols/AIP/QueryGroup.vala

@@ -0,0 +1,27 @@
+using LibPeer.Protocols.Mx2;
+using Gee;
+
+namespace LibPeer.Protocols.Aip {
+
+    internal class QueryGroup {
+
+        private HashSet<InstanceReference> instances = new HashSet<InstanceReference>((a) => a.hash(), (a, b) => a.compare(b) == 0);
+        private int target;
+
+        public QueryGroup(int target = 15) {
+            this.target = target;
+        }
+
+        public void add_peer(InstanceReference instance) {
+            instances.add(instance);
+        }
+
+        public bool actively_connect {
+            get {
+                return instances.size < target;
+            }
+        }
+
+    }
+
+}

+ 1 - 1
src/lib/Protocols/MX2/Muxer.vala

@@ -60,7 +60,7 @@ namespace LibPeer.Protocols.Mx2 {
             return instance;
         }
 
-        public Inquiry inquire(Instance instance, InstanceReference destination, GLib.List<PeerInfo> peers) throws IOError, Error {
+        public Inquiry inquire(Instance instance, InstanceReference destination, PeerInfo[] peers) throws IOError, Error {
             // Create an inquiry
             var inquiry = new Inquiry(destination);
             inquiries.set(inquiry.id, inquiry);

+ 1 - 1
src/lib/Protocols/STP/Streams/InputStream.vala

@@ -11,7 +11,7 @@ namespace LibPeer.Protocols.Stp.Streams {
         private Cond data_cond = Cond();
         private Mutex data_mutex = Mutex();
 
-        public InstanceReference target { get { return session.target; }}
+        public InstanceReference origin { get { return session.target; }}
         public uint8[] session_id { get { return session.identifier; }}
 
         public StpInputStream(IngressSession session) {

+ 88 - 0
src/lib/Util/TimeoutMap.vala

@@ -0,0 +1,88 @@
+using Gee;
+
+namespace LibPeer.Util {
+
+    private class TimeoutObject<T> {
+        public T object;
+        public int64 timestamp;
+
+        public void touch() {
+            timestamp = get_monotonic_time();
+        }
+
+        public TimeoutObject(T obj) {
+            object = obj;
+            touch();
+        }
+    }
+
+    public class TimeoutMap<K, V> {
+
+        public TimeoutMap (int timeout, owned HashDataFunc<K>? key_hash_func = null, owned EqualDataFunc<K>? key_equal_func = null, owned EqualDataFunc<V>? value_equal_func = null) {
+            EqualDataFunc<TimeoutObject<V>> unwrapped_value_equal_func = (a, b) => value_equal_func(a.object, b.object);
+            map = new ConcurrentHashMap<K, TimeoutObject<V>> (key_hash_func, key_equal_func, unwrapped_value_equal_func);
+            this.timeout = timeout;
+            timeout_fuzz_ms = 10000;
+        }
+
+        public int timeout { get; set; }
+        public int timeout_fuzz_ms { get; set; }
+
+        private int64 last_clean = 0;
+
+        private ConcurrentHashMap<K, TimeoutObject<V>> map;
+
+        public void @set(K key, V value) {
+            map.set(key, new TimeoutObject<V>(value));
+        }
+
+        public V @get(K key) {
+            clean();
+            var to = map.get(key);
+            to.touch();
+            return to.object;
+        }
+
+        public bool has_key(K key) {
+            clean();
+            lock(map) {
+                return map.has_key(key);
+            }
+        }
+
+        public bool unset(K key, out V value) {
+            lock(map) {
+                TimeoutObject<V> wrapped_value;
+                bool result = map.unset(key, out wrapped_value);
+                if(result) {
+                    value = wrapped_value.object;
+                }
+                return result;
+            }
+        }
+
+        public void clear() {
+            lock(map) {
+                map.clear();
+            }
+        }
+
+        public void clean() {
+            if(last_clean > get_monotonic_time() - (timeout_fuzz_ms * 1000)) {
+                return;
+            }
+            lock(map) {
+                int64 min_timestamp = get_monotonic_time() - (timeout * 1000000);
+                foreach (var key in map.keys) {
+                    var to = map.get(key);
+                    if(to.timestamp < min_timestamp) {
+                        map.unset(key);
+                    }
+                }
+                last_clean = get_monotonic_time();
+            }
+        }
+        
+    }
+
+}

+ 8 - 0
src/lib/meson.build

@@ -52,10 +52,18 @@ sources += files('Protocols/STP/Messages/RequestSession.vala')
 sources += files('Protocols/STP/Messages/SegmentMessage.vala')
 sources += files('Protocols/STP/Streams/InputStream.vala')
 sources += files('Protocols/STP/Streams/OutputStream.vala')
+sources += files('Protocols/AIP/ApplicationInformationProtocol.vala')
+sources += files('Protocols/AIP/AipCapabilities.vala')
+sources += files('Protocols/AIP/ApplicationInformation.vala')
+sources += files('Protocols/AIP/QueryGroup.vala')
+sources += files('Protocols/AIP/Query.vala')
+sources += files('Protocols/AIP/InstanceInformation.vala')
+sources += files('Protocols/AIP/Answer.vala')
 sources += files('Util/ByteComposer.vala')
 sources += files('Util/QueueCommand.vala')
 sources += files('Util/ThreadTimer.vala')
 sources += files('Util/ConcurrentHashMap.vala')
+sources += files('Util/TimeoutMap.vala')
 
 libpeer = library('peer', sources, dependencies: dependencies)
 libpeer_dep = declare_dependency(link_with: libpeer, include_directories: include_directories('.'))