|
@@ -2,19 +2,33 @@ using Invercargill;
|
|
|
|
|
|
namespace Riddle {
|
|
|
|
|
|
- public class Client {
|
|
|
+ public class Client : Object {
|
|
|
|
|
|
- private Invercargill.Sequence<PeerDiscoverer> discoverers;
|
|
|
- private Invercargill.Sequence<InetSocketAddress> servers;
|
|
|
- private InetSocketAddress? self_server;
|
|
|
- private SocketClient socket_client;
|
|
|
+ private Invercargill.Sequence<PeerDiscoverer> discoverers = new Invercargill.Sequence<PeerDiscoverer> ();
|
|
|
+ private Invercargill.Sequence<InetSocketAddress> servers = new Invercargill.Sequence<InetSocketAddress>();
|
|
|
+ private Invercargill.Sequence<Message> join_messages = new Invercargill.Sequence<Message>();
|
|
|
+ private InetSocketAddress? self_server = null;
|
|
|
+
|
|
|
+
|
|
|
|
|
|
public Client() {
|
|
|
- socket_client = new SocketClient();
|
|
|
- discoverers = new Invercargill.Sequence<PeerDiscoverer> ();
|
|
|
discoverers.add (new KnownHostDiscoverer());
|
|
|
discoverers.add (new LanDiscoverer());
|
|
|
+ setup();
|
|
|
+ }
|
|
|
+
|
|
|
+ public Client.with_server(InetSocketAddress server_address) {
|
|
|
+ var lan_discoverer = new LanDiscoverer();
|
|
|
+ discoverers.add (new KnownHostDiscoverer());
|
|
|
+ discoverers.add (lan_discoverer);
|
|
|
+ self_server = server_address;
|
|
|
+
|
|
|
+ setup();
|
|
|
|
|
|
+ lan_discoverer.advertise(self_server);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setup() {
|
|
|
foreach (var discoverer in discoverers) {
|
|
|
discoverer.peer_discovered.connect(new_server_found);
|
|
|
discoverer.begin();
|
|
@@ -32,36 +46,52 @@ namespace Riddle {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- servers.add(address);
|
|
|
+ Enumerable<Message> j_msgs;
|
|
|
+ lock(join_messages) {
|
|
|
+ servers.add(address);
|
|
|
+ j_msgs = join_messages.to_sequence();
|
|
|
+ }
|
|
|
+
|
|
|
+ foreach (var message in j_msgs) {
|
|
|
+ raw_request(message, single(address), 5000, MessageType.OK);
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public int join(string group, uint port) {
|
|
|
- var msg = new Message(MessageType.JOIN, new string[] { group, port.to_string() }, new string[0]);
|
|
|
- var responses = send_message_to_servers(msg);
|
|
|
- return responses.where(r => r.message_type == MessageType.OK).count();
|
|
|
+ lock(join_messages) {
|
|
|
+ var msg = new Message(MessageType.JOIN, new string[] { group, port.to_string() }, new string[0]);
|
|
|
+ var responses = raw_request(msg, servers, 10000, MessageType.OK);
|
|
|
+ join_messages.add(msg);
|
|
|
+ return responses.where(r => r.message_type == MessageType.OK).count();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public int leave(string group, uint port) {
|
|
|
+ lock(join_messages) {
|
|
|
+ join_messages = join_messages.where(m => m.arguments[0] != group && m.arguments[1] != port.to_string()).to_sequence();
|
|
|
+ }
|
|
|
var msg = new Message(MessageType.LEAVE, new string[] { group, port.to_string() }, new string[0]);
|
|
|
- var responses = send_message_to_servers(msg);
|
|
|
+ var responses = raw_request(msg, servers, 10000, MessageType.OK);
|
|
|
return responses.where(r => r.message_type == MessageType.OK).count();
|
|
|
}
|
|
|
|
|
|
public int propogate(NameInfo info) {
|
|
|
var msg = new Message(MessageType.PROPOGATE, new string[] { info.name, "10" }, new string[] { info.get_encoded() });
|
|
|
- var responses = send_message_to_servers(msg);
|
|
|
+ var responses = raw_request(msg, servers, 10000, MessageType.OK);
|
|
|
return responses.where(r => r.message_type == MessageType.OK).count();
|
|
|
}
|
|
|
|
|
|
public Enumerable<InetSocketAddress> who_is_in(string group) {
|
|
|
var who_in = new Message(MessageType.WHO_IN, new string[] { group }, new string[0]);
|
|
|
- var responses = send_message_to_servers(who_in);
|
|
|
+ var responses = raw_request(who_in, servers, 2000, MessageType.ANSWER);
|
|
|
return responses.select_many<InetSocketAddress>(r => ate(r.items).select<InetSocketAddress>(i => parse_address(i)));
|
|
|
}
|
|
|
|
|
|
public Enumerable<NameInfo> who_is(string domain) {
|
|
|
var who_is = new Message(MessageType.WHO_IS, new string[] { domain }, new string[0]);
|
|
|
- var responses = send_message_to_servers(who_is);
|
|
|
+ var responses = raw_request(who_is, servers, 2000, MessageType.ANSWER);
|
|
|
+
|
|
|
if(domain.has_suffix(".rns")) {
|
|
|
return responses.select_many<NameInfo>(r => ate(r.items).select<NameInfo>(i => new DecentralisedNameInfo.from_string(i)));
|
|
|
}
|
|
@@ -69,14 +99,20 @@ namespace Riddle {
|
|
|
return responses.select_many<NameInfo>(r => ate(r.items).select<NameInfo>(i => new CertifiedNameInfo.from_string(i)));
|
|
|
}
|
|
|
|
|
|
- public int riddle(RiddleEnvelope riddle) {
|
|
|
+ public int riddle(RiddleEnvelope riddle, Enumerable<InetSocketAddress> servers) {
|
|
|
var msg = riddle.to_message();
|
|
|
- var responses = send_message_to_servers(msg);
|
|
|
+ var responses = raw_request(msg, servers, 10000, MessageType.OK);
|
|
|
return responses.where(r => r.message_type == MessageType.OK).count();
|
|
|
}
|
|
|
|
|
|
+ public Enumerable<Message> raw_request(Message msg, Enumerable<InetSocketAddress> servers, int64 timeout = 10000, MessageType? filter = null) {
|
|
|
+ var manager = new RequestManager(servers);
|
|
|
+ manager.start_request(msg);
|
|
|
+ return manager.get_responses(timeout, filter);
|
|
|
+ }
|
|
|
+
|
|
|
public InetSocketAddress? callback(SolutionEnvelope solution, InetSocketAddress server, uint8[] author_signing_key, uint8[] reply_public_key, uint8[] reply_secret_key) throws Error {
|
|
|
- var socket = socket_client.connect(server);
|
|
|
+ var socket = new SocketClient().connect(server);
|
|
|
var dis = new DataInputStream(socket.input_stream);
|
|
|
var dos = new DataOutputStream(socket.output_stream);
|
|
|
|
|
@@ -101,28 +137,6 @@ namespace Riddle {
|
|
|
return (a1.address.equal(a2.address) && a1.port == a2.port);
|
|
|
}
|
|
|
|
|
|
- private Invercargill.Sequence<Message> send_message_to_servers(Message message) {
|
|
|
- return servers.parallel_select<Message?>(server => {
|
|
|
- try {
|
|
|
- var socket = socket_client.connect(server);
|
|
|
- var dis = new DataInputStream(socket.input_stream);
|
|
|
- var dos = new DataOutputStream(socket.output_stream);
|
|
|
-
|
|
|
- message.send(dos);
|
|
|
- var reply = Message.from_stream(dis);
|
|
|
-
|
|
|
- if(reply.message_type == MessageType.NOT_ACCEPTED) {
|
|
|
- warning(@"Got NOT-ACCEPTED response from $(server.address): $(reply.arguments[0]) $(reply.arguments[1])");
|
|
|
- }
|
|
|
- return reply;
|
|
|
- }
|
|
|
- catch(Error e) {
|
|
|
- warning(@"Client message send failed: $(e.message)");
|
|
|
- return null;
|
|
|
- }
|
|
|
- }).where(m => m != null).to_sequence();
|
|
|
- }
|
|
|
-
|
|
|
private InetSocketAddress parse_address(string address) {
|
|
|
var parts = address.split(" ", 3);
|
|
|
var isa = new InetSocketAddress.from_string(parts[0], uint.parse(parts[1]));
|