using Invercargill;

namespace Riddle {

    /**
     * Socket server that tracks group registrations, stores name information
     * and riddles, and queues messages to be relayed ("gossiped") to the
     * registered members of a group.
     */
    public class Server {

        /** Maximum age of a registration, in microseconds, before cleanup drops it. */
        public const int REGISTRATION_TIMEOUT_US = 600000000;

        /** Pause, in microseconds, taken by the gossip thread after sending each item. */
        public const int GOSSIP_INTERVAL_US = 2000000;

        /** Group this server joins on construction and relays propagated names to. */
        public const string RIDDLE_SERVER_GROUP = "Riddle";

        private SocketService service;

        // Group name -> registrations currently recorded for that group.
        private Gee.HashMap<string, Invercargill.Sequence<Registration>> registrations =
            new Gee.HashMap<string, Invercargill.Sequence<Registration>>();

        // Riddle identifier -> envelope received for that identifier.
        private Gee.HashMap<string, RiddleEnvelope> riddles = new Gee.HashMap<string, RiddleEnvelope>();

        private NameInfoStore names;
        private Client client;

        // Pending gossip items, consumed by the gossip thread.
        private Fifo<Gossip> gossip = new Fifo<Gossip>();

        // Tokens of everything already queued; guarded by lock(gossip).
        private Gee.HashSet<string> gossip_idempotency_tokens = new Gee.HashSet<string>();

        private Thread<bool> gossip_thread;
        private uint16 port;

        /**
         * Creates the server, joins {@link RIDDLE_SERVER_GROUP} and starts the gossip thread.
         *
         * @param address textual IP address used to build the client's server endpoint
         * @param port port the socket service listens on; also used for the client endpoint
         * @param name_store store used to look up and save name information
         * @throws Error if the port cannot be added or the setup calls fail
         */
        public Server(string address, uint16 port, NameInfoStore name_store) throws Error {
            this.names = name_store;
            this.port = port;

            var add = new InetAddress.from_string(address);
            service = new SocketService();
            service.add_inet_port(port, null);

            client = new Client.with_server(new InetSocketAddress(add, port));
            client.join(RIDDLE_SERVER_GROUP, port);

            gossip_thread = new Thread<bool>("Gossip", spread_gossip);
        }

        /**
         * Hooks up the incoming-connection handler and starts the socket service.
         */
        public async void start() {
            service.incoming.connect((c, o) => {
                handle_connection.begin(c);
                // Returning false lets any other connected handlers run as well.
                return false;
            });
            service.start();
        }

        /**
         * Reads one message from a connection, sends the reply and closes the connection.
         *
         * Errors are logged rather than propagated; the connection is closed either way.
         *
         * @param connection the accepted connection to service
         */
        private async void handle_connection(SocketConnection connection) {
            try {
                print("New connection\n");
                var dis = new DataInputStream(connection.input_stream);
                var dos = new DataOutputStream(connection.output_stream);

                var message = yield Message.from_stream_async(dis);
                // print() is printf-style, so dynamic text must go through "%s".
                print("%s", @"Received $(MessageType.to_string(message.message_type)) message\n");

                var reply = service_message(message, (InetSocketAddress)connection.get_remote_address());
                reply.send(dos);
                yield dos.flush_async();
                yield connection.close_async();
                print("Connection closed.\n");
            } catch (Error e) {
                warning("Error servicing connection: %s", e.message);
                try {
                    yield connection.close_async();
                } catch (Error e2) {
                    warning("Error closing connection after initial error: %s", e2.message);
                }
            }
        }

        /**
         * Body of the gossip thread: sends each queued gossip item to the members
         * of its group, sleeping {@link GOSSIP_INTERVAL_US} after each one.
         *
         * @return true once iteration over the queue ends
         */
        private bool spread_gossip() {
            // NOTE(review): presumably iterating the Fifo blocks until items arrive — confirm.
            foreach (var goss in gossip) {
                // What's the goss?
                var members = get_group_registrations(goss.group).select<InetSocketAddress>(r => r.address);
                print("Spreading gossip!\n");
                // NOTE(review): 30000 looks like a timeout — confirm unit against Client.raw_request.
                client.raw_request(goss.message, members, 30000);
                Thread.usleep(GOSSIP_INTERVAL_US);
            }
            return true;
        }

        /**
         * Dispatches a message to the handler for its type.
         *
         * NOTE(review): the handlers index msg.arguments / msg.items without checking
         * their length; consider validating before dispatch since input is remote.
         *
         * @param msg the received message
         * @param origin remote socket address the message came from
         * @return the reply to send; an ERROR "unknown-command" message for unhandled types
         * @throws Error if the selected handler fails
         */
        private Message service_message(Message msg, InetSocketAddress origin) throws Error {
            switch (msg.message_type) {
                case MessageType.JOIN:
                    return handle_register(msg, origin);
                case MessageType.LEAVE:
                    return handle_deregister(msg, origin);
                case MessageType.PROPOGATE:
                    return handle_propogate(msg);
                case MessageType.WHO_IN:
                    return handle_who_in(msg);
                case MessageType.WHO_IS:
                    return handle_who_is(msg);
                case MessageType.RIDDLE:
                    return handle_riddle(msg, origin);
                case MessageType.CALLBACK:
                    return handle_callback(msg);
                default:
                    return new Message(MessageType.ERROR, new string[] { "unknown-command" }, new string[0]);
            }
        }

        /**
         * Handles JOIN: registers the sender (origin host, port from arguments[1])
         * in the group named by arguments[0].
         *
         * @param msg the JOIN message
         * @param origin remote socket address of the sender
         * @return SEE_ALSO listing the other members as "host port" items, or OK if there are none
         */
        private Message handle_register(Message msg, InetSocketAddress origin) throws Error {
            cleanup_registrations();

            var group = msg.arguments[0];
            var port = uint.parse(msg.arguments[1]);
            var origin_host = origin.address.to_string();
            add_registration(group, new InetSocketAddress.from_string(origin_host, port));

            // Exclude only the sender's own endpoint: another member is relevant
            // as soon as it differs in host or in port.
            var relevent = get_group_registrations(group)
                .where(r => r.address.address.to_string() != origin_host || r.address.port != port);

            if (relevent.any()) {
                return new Message(MessageType.SEE_ALSO, new string[0], relevent
                    .select<string>(r => @"$(r.address.address.to_string()) $(r.address.port)")
                    .to_array());
            }
            return new Message(MessageType.OK, new string[0], new string[0]);
        }

        /**
         * Handles LEAVE: removes the sender (origin host, port from arguments[1])
         * from the group named by arguments[0].
         *
         * @param msg the LEAVE message
         * @param origin remote socket address of the sender
         * @return an OK message
         */
        private Message handle_deregister(Message msg, InetSocketAddress origin) throws Error {
            var leaving = new InetSocketAddress.from_string(origin.address.to_string(), uint.parse(msg.arguments[1]));
            remove_registration(msg.arguments[0], leaving);
            return new Message(MessageType.OK, new string[0], new string[0]);
        }

        /**
         * Handles PROPOGATE: validates and stores name information, then queues
         * the message for the server group while its TTL is still positive.
         *
         * Arguments are name, TTL and idempotency token; items[0] is the encoded name info.
         *
         * @param msg the PROPOGATE message
         * @return OK when accepted, NOT_ACCEPTED with a code and reason when rejected,
         *         or ERROR for names without the ".rns" suffix
         */
        private Message handle_propogate(Message msg) throws Error {
            var name = msg.arguments[0];
            var ttl = int.parse(msg.arguments[1]);
            var token = msg.arguments[2];
            var encoded_name_info = msg.items[0];

            try {
                if (name.has_suffix(".rns")) {
                    var name_info = new DecentralisedNameInfo.from_string(encoded_name_info);
                    if (name_info.name != name) {
                        return new Message(MessageType.NOT_ACCEPTED, new string[] { "200", "name-mismatch" }, new string[0]);
                    }
                    if (!add_name_info_if_latest(name_info)) {
                        return new Message(MessageType.NOT_ACCEPTED, new string[] { "205", "outdated" }, new string[0]);
                    }
                } else {
                    // TODO, implement Certified Names
                    return new Message(MessageType.ERROR, new string[0], new string[0]);
                }
            } catch (NameInfoError.BAD_DATA e) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "201", "malformed-information" }, new string[0]);
            } catch (NameInfoError.INVALID e) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "202", "invalid-information" }, new string[0]);
            } catch (NameInfoError.NOT_IN_DATE e) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "203", "outside-date-range" }, new string[0]);
            }

            if (ttl > 0) {
                // Relay with a decremented TTL.
                msg.arguments[1] = (ttl - 1).to_string();
                gossip_to(RIDDLE_SERVER_GROUP, msg, token);
            }

            return new Message(MessageType.OK, new string[0], new string[0]);
        }

        /**
         * Handles WHO_IN: lists the members of the group named by arguments[0].
         *
         * @param msg the WHO_IN message
         * @return ANSWER with one "host port" item per member, or UNKNOWN if there are none
         */
        private Message handle_who_in(Message msg) throws Error {
            cleanup_registrations();
            var addresses = get_group_registrations(msg.arguments[0])
                .select<string>(r => @"$(r.address.address.to_string()) $(r.address.port)");

            if (addresses.count() == 0) {
                return new Message(MessageType.UNKNOWN, new string[0], new string[0]);
            }
            return new Message(MessageType.ANSWER, new string[0], addresses.to_array());
        }

        /**
         * Handles WHO_IS: looks up the name given in arguments[0].
         *
         * @param msg the WHO_IS message
         * @return ANSWER with the encoded name info as its single item, or UNKNOWN if not stored
         */
        private Message handle_who_is(Message msg) throws Error {
            var name = get_name_info(msg.arguments[0]);
            if (name == null) {
                return new Message(MessageType.UNKNOWN, new string[0], new string[0]);
            }
            return new Message(MessageType.ANSWER, new string[0], new string[] { name.get_encoded() });
        }

        /**
         * Handles RIDDLE: records a riddle the first time its identifier is seen
         * and queues its forwarded form, if any, for the riddle's group.
         *
         * @param msg the RIDDLE message
         * @param origin remote socket address of the sender
         * @return OK when accepted; NOT_ACCEPTED for an invalid identifier ("100")
         *         or an identifier already received ("104")
         */
        private Message handle_riddle(Message msg, InetSocketAddress origin) throws Error {
            var riddle_envelope = new RiddleEnvelope.from_message(msg, origin.address);
            if (!riddle_envelope.validate_identifier()) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "100", "invalid-identifier" }, new string[0]);
            }

            lock (riddles) {
                if (riddles.has_key(riddle_envelope.identifier)) {
                    return new Message(MessageType.NOT_ACCEPTED, new string[] { "104", "already-received" }, new string[0]);
                }
                riddles.set(riddle_envelope.identifier, riddle_envelope);
            }

            var to_forward = riddle_envelope.forward(port);
            if (to_forward != null) {
                gossip_to(riddle_envelope.group_name, to_forward.to_message(), riddle_envelope.identifier);
            }

            // Pass to application to solve then compact when done.
            riddle_envelope.compact();

            return new Message(MessageType.OK, new string[0], new string[0]);
        }

        /**
         * Handles CALLBACK: verifies a solution against the stored riddle.
         *
         * Delivery of a verified solution is not implemented yet, so a verified
         * callback currently yields ERROR "not-implemented".
         *
         * @param msg the CALLBACK message
         * @return NOT_ACCEPTED when the riddle is unknown ("105") or verification
         *         fails ("101"); otherwise ERROR "not-implemented"
         */
        private Message handle_callback(Message msg) throws Error {
            var solution_envelope = new SolutionEnvelope.from_message(msg);
            var riddle_metadata = get_riddle_metadata(solution_envelope.identifier);

            if (riddle_metadata == null) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "105", "riddle-not-found" }, new string[0]);
            }

            if (!solution_envelope.verify(riddle_metadata.solution_verification_key)) {
                return new Message(MessageType.NOT_ACCEPTED, new string[] { "101", "verification-failed" }, new string[0]);
            }

            if (riddle_metadata.reply_address == null) {
                // TODO Verify with Riddle Object
                return new Message(MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
            }

            // TODO Forward to reply address
            return new Message(MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
        }

        /**
         * Rebuilds the registration map, keeping only registrations younger than
         * {@link REGISTRATION_TIMEOUT_US}.
         */
        private void cleanup_registrations() {
            lock (registrations) {
                var now = new DateTime.now_utc();
                var copy = new Gee.HashMap<string, Invercargill.Sequence<Registration>>();
                foreach (var group in registrations) {
                    var live = group.value
                        .where(r => now.difference(r.timestamp) < REGISTRATION_TIMEOUT_US)
                        .to_sequence();
                    copy.set(group.key, live);
                }
                registrations = copy;
            }
        }

        /**
         * Records a registration for an address in a group, stamped with the current UTC time.
         *
         * @param group name of the group, created on first use
         * @param address endpoint being registered
         */
        private void add_registration(string group, InetSocketAddress address) {
            var reg = new Registration() {
                address = address,
                timestamp = new DateTime.now_utc()
            };

            lock (registrations) {
                if (!registrations.has_key(group)) {
                    registrations.set(group, new Invercargill.Sequence<Registration>());
                }
                registrations[group].add(reg);
            }
        }

        /**
         * Removes every registration in a group whose host and port both match the address.
         *
         * Does nothing when the group has no registrations recorded.
         *
         * @param group name of the group
         * @param address endpoint to remove
         */
        private void remove_registration(string group, InetSocketAddress address) {
            var host = address.address.to_string();
            var port = address.port;

            lock (registrations) {
                // An unknown group has no sequence to filter.
                if (!registrations.has_key(group)) {
                    return;
                }
                var remaining = registrations[group]
                    .where(r => r.address.port != port || r.address.address.to_string() != host)
                    .to_sequence();
                registrations.set(group, remaining);
            }
        }

        /**
         * Returns the registrations recorded for a group.
         *
         * @param group name of the group
         * @return a sequence built from the group's registrations, or an empty enumerable
         */
        private Enumerable<Registration> get_group_registrations(string group) {
            lock (registrations) {
                var regs = Invercargill.empty<Registration>();
                if (registrations.has_key(group)) {
                    regs = registrations[group].to_sequence();
                }
                return regs;
            }
        }

        /**
         * Saves name information if the store reports its current entry as outdated.
         *
         * @param info the candidate name information
         * @return true if it was saved, false if the stored entry is not outdated
         */
        private bool add_name_info_if_latest(NameInfo info) {
            lock (names) {
                if (!names.is_outdated(info.name, info.effective)) {
                    return false;
                }
                names.save_name(info);
            }
            return true;
        }

        /**
         * Looks up stored name information.
         *
         * @param name the name to look up
         * @return the stored information, or null if the store does not have the name
         */
        private NameInfo? get_name_info(string name) {
            lock (names) {
                if (names.has_name(name)) {
                    return names.get_name(name);
                }
            }
            return null;
        }

        /**
         * Looks up a previously received riddle.
         *
         * @param identifier the riddle identifier
         * @return the stored envelope, or null if none was received under that identifier
         */
        private RiddleEnvelope? get_riddle_metadata(string identifier) {
            lock (riddles) {
                if (riddles.has_key(identifier)) {
                    return riddles[identifier];
                }
            }
            return null;
        }

        /**
         * Queues a message at the back of the gossip queue unless its token was already queued.
         *
         * @param group group whose members should receive the message
         * @param message the message to relay
         * @param token idempotency token identifying this piece of gossip
         */
        private void gossip_to(string group, Message message, string token) {
            enqueue_gossip(group, message, token, false);
        }

        /**
         * Queues a message at the front of the gossip queue unless its token was already queued.
         *
         * @param group group whose members should receive the message
         * @param message the message to relay
         * @param token idempotency token identifying this piece of gossip
         */
        private void gossip_next(string group, Message message, string token) {
            enqueue_gossip(group, message, token, true);
        }

        // Shared implementation of gossip_to / gossip_next: skips tokens already
        // seen, otherwise queues the item at the chosen end and records the token.
        private void enqueue_gossip(string group, Message message, string token, bool at_front) {
            lock (gossip) {
                if (gossip_idempotency_tokens.contains(token)) {
                    return;
                }

                var g = new Gossip() {
                    message = message,
                    group = group
                };

                if (at_front) {
                    gossip.push_start(g);
                } else {
                    gossip.push(g);
                }
                gossip_idempotency_tokens.add(token);
            }
        }
    }

    /**
     * A group member's endpoint together with the time it was registered.
     */
    private class Registration {
        public InetSocketAddress address { get; set; }
        public DateTime timestamp { get; set; }
    }

    /**
     * A queued message and the group it should be relayed to.
     */
    private class Gossip {
        public Message message { get; set; }
        public string group { get; set; }
    }
}