|
@@ -47,13 +47,14 @@ namespace LibPeer.Protocols.Aip {
|
|
|
protected TimeoutMap<Bytes, int> query_response_count = new TimeoutMap<Bytes, int>(120, (a) => a.hash(), (a, b) => a.compare(b) == 0);
|
|
|
protected HashSet<Bytes> handled_query_ids = new HashSet<Bytes>((a) => a.hash(), (a, b) => a.compare(b) == 0);
|
|
|
protected HashSet<PeerInfo> peer_info = new HashSet<PeerInfo>((a) => a.hash(), (a, b) => a.equals(b));
|
|
|
- protected signal void new_peer_info(PeerInfo info);
|
|
|
|
|
|
- protected signal void new_group_peer(PeerInfo info, Bytes id);
|
|
|
+ protected signal void new_group_peer(InstanceReference info, Bytes id);
|
|
|
protected bool is_ready;
|
|
|
- protected signal void on_peer_greet(InstanceReference info);
|
|
|
+ protected TimeoutMap<InstanceReference, Bytes> pending_group_peers = new TimeoutMap<InstanceReference, Bytes>(120, (a) => a.hash(), (a, b) => a.compare(b) == 0);
|
|
|
public signal void ready();
|
|
|
|
|
|
+ private GLib.List<Query> pending_queries = new GLib.List<Query>();
|
|
|
+
|
|
|
public ApplicationInformationProtocol(Muxer muxer, AipCapabilities? capabilities = null, bool join_all = false) {
|
|
|
if(capabilities == null) {
|
|
|
this.capabilities = new AipCapabilities (){
|
|
@@ -74,7 +75,7 @@ namespace LibPeer.Protocols.Aip {
|
|
|
|
|
|
// Attach signal handlers
|
|
|
instance.incoming_greeting.connect(rx_greeting);
|
|
|
- //transport.incoming_stream()
|
|
|
+ transport.incoming_stream.connect(rx_stream);
|
|
|
}
|
|
|
|
|
|
public void add_network(Network network) {
|
|
@@ -125,14 +126,29 @@ namespace LibPeer.Protocols.Aip {
|
|
|
}
|
|
|
|
|
|
// There may may be code waiting for this moment
|
|
|
- on_peer_greet(target);
|
|
|
+ if(pending_group_peers.has_key(target)) {
|
|
|
+ Bytes group;
|
|
|
+ pending_group_peers.unset(target, out group);
|
|
|
+ new_group_peer(target, group);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
protected void rx_address(PeerInfo info) {
|
|
|
// We received peer info, add to our set
|
|
|
peer_info.add(info);
|
|
|
- new_peer_info(info);
|
|
|
+
|
|
|
+ // Do we have any pending queries?
|
|
|
+ if(pending_queries.length() > 0) {
|
|
|
+ // Clear the list
|
|
|
+ var queries = pending_queries;
|
|
|
+ pending_queries = null;
|
|
|
+
|
|
|
+ // Send pending queries
|
|
|
+ foreach (var query in queries) {
|
|
|
+ send_query_answer(query);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected void rx_peers(Gee.List<InstanceInformation> peers) {
|
|
@@ -204,14 +220,23 @@ namespace LibPeer.Protocols.Aip {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- protected void rx_stream(InputStream stream) {
|
|
|
+ protected void rx_stream(StpInputStream stream) {
|
|
|
// Figure out what data follows
|
|
|
uint8[] _following = new uint8[1];
|
|
|
stream.read(_following);
|
|
|
var following = _following[0];
|
|
|
|
|
|
if(following == DATA_FOLLOWING_ANSWER && capabilities.query_answer) {
|
|
|
-
|
|
|
+ handle_answer(stream);
|
|
|
+ }
|
|
|
+ else if(following == DATA_FOLLOWING_QUERY && capabilities.query_answer) {
|
|
|
+ handle_query(stream);
|
|
|
+ }
|
|
|
+ else if(following == DATA_FOLLOWING_REQUEST) {
|
|
|
+ handle_request(stream);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ stream.close();
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -231,6 +256,51 @@ namespace LibPeer.Protocols.Aip {
|
|
|
// Notify the query's subject listeners
|
|
|
query.on_answer(info);
|
|
|
}
|
|
|
+
|
|
|
+ // Does this have somewhere to forward to?
|
|
|
+ if(answer.path.length > 0) {
|
|
|
+ // Put it back on its path
|
|
|
+ send_answer(answer);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void handle_request(StpInputStream stream) throws IOError, Error {
|
|
|
+ // Get the request type
|
|
|
+ var _request_type = new uint8[1];
|
|
|
+ stream.read(_request_type);
|
|
|
+ var request_type = _request_type[0];
|
|
|
+
|
|
|
+ // Is the request one of our capabilities?
|
|
|
+ if(!capabilities.has_capability_for_request_code(request_type)) {
|
|
|
+ // Ignore
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Reply to the sender
|
|
|
+ transport.initialise_stream(stream.origin, stream.session_id).established.connect(os => {
|
|
|
+ switch (request_type) {
|
|
|
+ case REQUEST_CAPABILITIES:
|
|
|
+ capabilities.serialise(os);
|
|
|
+ break;
|
|
|
+ case REQUEST_ADDRESS:
|
|
|
+ muxer.get_peer_info_for_instance(os.target).serialise(os);
|
|
|
+ break;
|
|
|
+ case REQUEST_PEERS:
|
|
|
+ // TODO: implement
|
|
|
+ os.write(new uint8[] {0});
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ os.close();
|
|
|
+ });
|
|
|
+
|
|
|
+ // Have we encountered this peer before?
|
|
|
+ if(!discovered_peers.contains(stream.origin)) {
|
|
|
+ // No, add it
|
|
|
+ discovered_peers.add(stream.origin);
|
|
|
+
|
|
|
+ // Ask for capabilities
|
|
|
+ request_capabilities(stream.origin, c => rx_capabilities(stream.origin, c));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected void handle_query(StpInputStream stream) throws IOError, Error {
|
|
@@ -255,37 +325,209 @@ namespace LibPeer.Protocols.Aip {
|
|
|
// Increment the query hops
|
|
|
query.hops ++;
|
|
|
|
|
|
- // Read through the query data
|
|
|
- var dis = new DataInputStream(new MemoryInputStream.from_bytes(query.data));
|
|
|
- dis.byte_order = DataStreamByteOrder.BIG_ENDIAN;
|
|
|
-
|
|
|
// Find the query type
|
|
|
- var query_type = dis.read_byte();
|
|
|
+ var query_type = query.data[0];
|
|
|
|
|
|
if(query_type == QUERY_GROUP) {
|
|
|
// Get the group identifier
|
|
|
- var group_id = dis.read_bytes(query.data.length - 1);
|
|
|
+ var group_id = query.data[1:-1];
|
|
|
|
|
|
// Are we not in this group, but joining all?
|
|
|
if(join_all_groups && !query_groups.has_key(group_id)) {
|
|
|
// Join the group
|
|
|
-
|
|
|
+ join_query_group(group_id);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Are we in this group?
|
|
|
+ if(query_groups.has_key(group_id)) {
|
|
|
+ // Yes, send a reply
|
|
|
+ queue_query_answer(query);
|
|
|
+ }
|
|
|
+
|
|
|
+ // This is a query for a group, forward on to the default group
|
|
|
+ send_query(query, default_group);
|
|
|
+ }
|
|
|
+ else if(query_type == QUERY_APPLICATION) {
|
|
|
+ // Get the application namespace
|
|
|
+ var app_namespace = new Bytes(query.data[1:-1]);
|
|
|
+
|
|
|
+ // Are we in a group for this namespace?
|
|
|
+ if(query_groups.has_key(app_namespace)) {
|
|
|
+ // Yes, find relevent ApplicationInformation
|
|
|
+ foreach (var app in application_information) {
|
|
|
+ // Is this app relevent? TODO: Use a hashmap
|
|
|
+ if(app.namespace_bytes.compare(app_namespace) == 0) {
|
|
|
+ // Yes, answer the query
|
|
|
+ queue_query_answer(query);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Forward onto the application group
|
|
|
+ send_query(query, query_groups.get(app_namespace));
|
|
|
}
|
|
|
}
|
|
|
+ else if(query_type == QUERY_APPLICATION_RESOURCE) {
|
|
|
+ // Read the label
|
|
|
+ var label = new Bytes(query.data[1:33]);
|
|
|
+
|
|
|
+ // Read the application namespace
|
|
|
+ var app_namespace = new Bytes(query.data[33:-1]);
|
|
|
+
|
|
|
+ // Are we in a group for this namespace?
|
|
|
+ if(query_groups.has_key(app_namespace)) {
|
|
|
+ // Yes, find relevent ApplicationInformation
|
|
|
+ foreach (var app in application_information) {
|
|
|
+ // Is this app relevent and does it have this resource?
|
|
|
+ if(app.namespace_bytes.compare(app_namespace) == 0 && app.resource_set.contains(label)) {
|
|
|
+ // Yes, answer the query
|
|
|
+ queue_query_answer(query);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Forward onto the application group
|
|
|
+ send_query(query, query_groups.get(app_namespace));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void queue_query_answer(Query query) {
|
|
|
+ // Do we have peer info to send yet?
|
|
|
+ if(peer_info.size > 0) {
|
|
|
+ // Yes, do it
|
|
|
+ send_query_answer(query);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // No, wait for peer info
|
|
|
+ pending_queries.append(query);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void send_query_answer(Query query) {
|
|
|
+ // Create some instance information
|
|
|
+ var instance_info = new InstanceInformation(instance.reference, peer_info.to_array());
|
|
|
+
|
|
|
+ // Serialise the info
|
|
|
+ MemoryOutputStream stream = new MemoryOutputStream(null, GLib.realloc, GLib.free);
|
|
|
+ instance_info.serialise(stream);
|
|
|
+ stream.close();
|
|
|
+ uint8[] buffer = stream.steal_data();
|
|
|
+ buffer.length = (int)stream.get_data_size();
|
|
|
+
|
|
|
+ // Send the instance information in the answer
|
|
|
+ var answer = new Answer() {
|
|
|
+ data = new Bytes(buffer),
|
|
|
+ in_reply_to = query.identifier,
|
|
|
+ path = query.return_path
|
|
|
+ };
|
|
|
+
|
|
|
+ // Send the answer
|
|
|
+ send_answer(answer);
|
|
|
}
|
|
|
|
|
|
public void join_query_group(Bytes group) {
|
|
|
// Create the query group
|
|
|
query_groups.set(group, new QueryGroup());
|
|
|
|
|
|
- // TODO continue
|
|
|
+ // Are we ready?
|
|
|
+ if(is_ready) {
|
|
|
+ // Yes, send the query
|
|
|
+ send_group_query(group);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // No, do it when we are ready
|
|
|
+ ready.connect(() => send_group_query(group));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void send_group_query(Bytes group) {
|
|
|
// Construct a query asking for peers in the group
|
|
|
var query = new Query(new ByteComposer().add_byte(QUERY_GROUP).add_bytes(group).to_bytes());
|
|
|
|
|
|
- //
|
|
|
+ // Handler for query answer
|
|
|
+ query.on_answer.connect(answer => {
|
|
|
+ // Add to group
|
|
|
+ query_groups.get(group).add_peer(answer.instance_reference);
|
|
|
+
|
|
|
+ // Are we already connected to this peer?
|
|
|
+ if(reachable_peers.contains(answer.instance_reference)) {
|
|
|
+ // No need to greet, already connected
|
|
|
+ new_group_peer(answer.instance_reference, group);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // When this peer has greeted us, notify the group
|
|
|
+ pending_group_peers.set(answer.instance_reference, group);
|
|
|
+
|
|
|
+ // Inquire
|
|
|
+ muxer.inquire(instance, answer.instance_reference, answer.connection_methods);
|
|
|
+ });
|
|
|
+
|
|
|
+ // Send the query
|
|
|
+ initiate_query(query, default_group);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void initiate_query(Query query, QueryGroup group) {
|
|
|
+ // Save a reference to the query
|
|
|
+ queries.set(query.identifier, query);
|
|
|
+ handled_query_ids.add(query.identifier);
|
|
|
+
|
|
|
+ // Send the query
|
|
|
+ send_query(query, group);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void send_query(Query query, QueryGroup group) {
|
|
|
+ // Does the query have any hops left?
|
|
|
+ if(query.hops > MAX_QUERY_HOPS) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Loop over each instance in the query group
|
|
|
+ foreach (var instance_ref in group) {
|
|
|
+ transport.initialise_stream(instance_ref).established.connect(stream => {
|
|
|
+ // Tell the instance that the data that follows is a query
|
|
|
+ stream.write(new uint8[] { DATA_FOLLOWING_QUERY });
|
|
|
+
|
|
|
+ // Write the query
|
|
|
+ query.serialise(stream);
|
|
|
+
|
|
|
+ // Close the stream
|
|
|
+ stream.close();
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void send_answer(Answer answer) {
|
|
|
+ // Get (and remove) the last item from the path list
|
|
|
+ var send_to = answer.path[answer.path.length-1];
|
|
|
+ answer.path.length --;
|
|
|
+
|
|
|
+ // Don't send answers to queries we haven't received
|
|
|
+ if(!query_response_count.has_key(answer.in_reply_to)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Don't send answers to queries that have exceeded their maximum replies
|
|
|
+ var response_count = query_response_count.get(answer.in_reply_to);
|
|
|
+ if(response_count < 1) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Decrement response counter (stops at 0)
|
|
|
+ query_response_count.set(answer.in_reply_to, response_count - 1);
|
|
|
+
|
|
|
+ // Open a stream with the instance
|
|
|
+ transport.initialise_stream(send_to).established.connect(stream => {
|
|
|
+ // Tell the instance that the data that follows is an answer
|
|
|
+ stream.write(new uint8[] { DATA_FOLLOWING_ANSWER });
|
|
|
+
|
|
|
+ // Write the answer
|
|
|
+ answer.serialise(stream);
|
|
|
+
|
|
|
+ // Close the stream
|
|
|
+ stream.close();
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
|