123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- using Invercargill;
- namespace Riddle {
- public class Server {
/** Name of the group this server joins on construction and gossips propagated names to. */
public const string RIDDLE_SERVER_GROUP = "Riddle";

/** Registrations at least this many microseconds old are dropped by cleanup_registrations. */
public const int REGISTRATION_TIMEOUT_US = 600000000;

/** Microseconds the gossip thread sleeps after sending each queued message. */
public const int GOSSIP_INTERVAL_US = 2000000;

// Networking: the listening service, the client used to reach peers, and our own port.
private SocketService service;
private Client client;
private uint16 port;

// State touched by the message handlers; each of these is accessed under its own lock.
private Gee.HashMap<string, Invercargill.Sequence<Registration>> registrations = new Gee.HashMap<string, Invercargill.Sequence<Registration>> ();
private Gee.HashMap<string, RiddleEnvelope> riddles = new Gee.HashMap<string, RiddleEnvelope> ();
private NameInfoStore names;

// Outgoing gossip. The token set is guarded by the lock on `gossip`.
private Fifo<Gossip> gossip = new Fifo<Gossip> ();
private Gee.HashSet<string> gossip_idempotency_tokens = new Gee.HashSet<string> ();
private Thread<bool> gossip_thread;
-
-
/**
 * Creates a server listening on the given port.
 *
 * Besides binding the socket service, this builds a client pointed at
 * address:port, joins RIDDLE_SERVER_GROUP through it and starts the gossip
 * thread. Connections are not accepted until start() is called.
 *
 * @param address textual IP address used for the server's own client
 * @param port port to listen on, also announced when joining the server group
 * @param name_store store consulted and updated for name information
 * @throws Error if address is not a valid IP address, or if one of the
 *               underlying setup calls (such as binding the port) fails
 */
public Server (string address, uint16 port, NameInfoStore name_store) throws Error {
    // from_string yields null for text that is not an IP address; reject it
    // here instead of handing a null address on to the socket address/client.
    var add = new InetAddress.from_string (address);
    if (add == null) {
        throw new IOError.INVALID_ARGUMENT ("Invalid server address: %s", address);
    }

    this.names = name_store;
    this.port = port;
    service = new SocketService ();
    service.add_inet_port (port, null);
    client = new Client.with_server (new InetSocketAddress (add, port));
    client.join (RIDDLE_SERVER_GROUP, port);
    gossip_thread = new Thread<bool> ("Gossip", spread_gossip);
}
/**
 * Hooks connection handling onto the socket service and starts it.
 *
 * Every incoming connection is passed to handle_connection, which is begun
 * asynchronously so the signal handler returns immediately.
 */
public async void start () {
    service.incoming.connect ((connection, source_object) => {
        handle_connection.begin (connection);
        // false leaves the signal free to reach any other handlers.
        return false;
    });

    service.start ();
}
/**
 * Serves a single connection: reads one message, dispatches it through
 * service_message, writes the reply and closes the connection.
 *
 * Any error raised along the way is logged with warning() and the
 * connection is closed; a failure of that close is logged as well.
 *
 * @param connection the accepted connection to serve
 */
private async void handle_connection (SocketConnection connection) {
    try {
        print ("New connection\n");
        var dis = new DataInputStream (connection.input_stream);
        var dos = new DataOutputStream (connection.output_stream);
        var message = yield Message.from_stream_async (dis);
        // print()/warning() are printf-style: dynamic text goes in as a "%s"
        // argument so a '%' inside it is never treated as a format specifier.
        print ("%s", @"Received $(MessageType.to_string (message.message_type)) message\n");
        var reply = service_message (message, (InetSocketAddress) connection.get_remote_address ());
        reply.send (dos);
        yield dos.flush_async ();
        yield connection.close_async ();
        print ("Connection closed.\n");
    } catch (Error e) {
        warning ("%s", @"Error servicing connection: $(e.message)");
        try {
            yield connection.close_async ();
        } catch (Error e2) {
            warning ("%s", @"Error closing connection after initial error: $(e2.message)");
        }
    }
}
/**
 * Body of the gossip thread.
 *
 * For each entry taken from the gossip queue, sends the entry's message to
 * the addresses currently registered in the entry's group, then sleeps for
 * GOSSIP_INTERVAL_US before moving on.
 *
 * @return true once iteration over the gossip queue ends
 */
private bool spread_gossip () {
    foreach (var entry in gossip) {
        var recipients = get_group_registrations (entry.group)
            .select<InetSocketAddress> (reg => reg.address);
        print ("Spreading gossip!\n");
        // NOTE(review): 30000 is presumably a timeout; confirm against Client.raw_request.
        client.raw_request (entry.message, recipients, 30000);
        Thread.usleep (GOSSIP_INTERVAL_US);
    }

    return true;
}
/**
 * Routes a received message to the handler for its message type.
 *
 * @param msg the message read from the connection
 * @param origin remote address the message arrived from
 * @return the handler's reply, or an ERROR message carrying
 *         "unknown-command" when the type has no handler
 * @throws Error whatever the selected handler raises
 */
private Message service_message (Message msg, InetSocketAddress origin) throws Error {
    Message reply;

    switch (msg.message_type) {
        case MessageType.JOIN:
            reply = handle_register (msg, origin);
            break;
        case MessageType.LEAVE:
            reply = handle_deregister (msg, origin);
            break;
        case MessageType.PROPOGATE:
            reply = handle_propogate (msg);
            break;
        case MessageType.WHO_IN:
            reply = handle_who_in (msg);
            break;
        case MessageType.WHO_IS:
            reply = handle_who_is (msg);
            break;
        case MessageType.RIDDLE:
            reply = handle_riddle (msg, origin);
            break;
        case MessageType.CALLBACK:
            reply = handle_callback (msg);
            break;
        default:
            reply = new Message (MessageType.ERROR, new string[] { "unknown-command" }, new string[0]);
            break;
    }

    return reply;
}
/**
 * Handles a JOIN message.
 *
 * Prunes expired registrations, then registers the sender under the group
 * named in arguments[0], using the sender's IP address and the port given
 * in arguments[1].
 *
 * @param msg the JOIN message
 * @param origin remote address the message arrived from
 * @return SEE_ALSO listing the other group members as "address port" items,
 *         or OK when the sender is the only member
 * @throws Error propagated from the calls made here
 */
private Message handle_register (Message msg, InetSocketAddress origin) throws Error {
    cleanup_registrations ();

    // NOTE(review): assumes msg.arguments holds at least [group, port];
    // confirm that Message parsing guarantees this for JOIN.
    var group = msg.arguments[0];
    var port = uint.parse (msg.arguments[1]);
    var host = origin.address.to_string ();
    add_registration (group, new InetSocketAddress.from_string (host, port));

    // Everyone except the endpoint that just registered. An entry is the
    // sender only when BOTH host and port match, so it is kept when either
    // differs. The same test is used by remove_registration.
    var others = get_group_registrations (group)
        .where (r => r.address.address.to_string () != host || r.address.port != port);

    if (others.any ()) {
        return new Message (MessageType.SEE_ALSO, new string[0], others
            .select<string> (r => @"$(r.address.address.to_string ()) $(r.address.port)")
            .to_array ());
    }

    return new Message (MessageType.OK, new string[0], new string[0]);
}
/**
 * Handles a LEAVE message by removing the sender's registration from the
 * group named in arguments[0]. The endpoint removed is the sender's IP
 * address combined with the port given in arguments[1].
 *
 * @param msg the LEAVE message
 * @param origin remote address the message arrived from
 * @return an OK message
 * @throws Error propagated from the calls made here
 */
private Message handle_deregister (Message msg, InetSocketAddress origin) throws Error {
    var group = msg.arguments[0];
    var announced_port = uint.parse (msg.arguments[1]);
    var endpoint = new InetSocketAddress.from_string (origin.address.to_string (), announced_port);

    remove_registration (group, endpoint);

    return new Message (MessageType.OK, new string[0], new string[0]);
}
/**
 * Handles a PROPOGATE message carrying encoded name information.
 *
 * Reads the name (arguments[0]), TTL (arguments[1]), idempotency token
 * (arguments[2]) and encoded name info (items[0]). Names ending in ".rns"
 * are decoded and stored if add_name_info_if_latest accepts them; other
 * names are answered with ERROR. On success with a positive TTL, the
 * message is queued for the server group with the TTL decremented.
 *
 * @param msg the PROPOGATE message; its TTL argument is rewritten in place
 *            before it is queued for gossip
 * @return OK on success, NOT_ACCEPTED with a code and reason on rejection,
 *         or ERROR for names without the ".rns" suffix
 * @throws Error any error other than the NameInfoError codes handled here
 */
private Message handle_propogate (Message msg) throws Error {
    var name = msg.arguments[0];
    var ttl = int.parse (msg.arguments[1]);
    var token = msg.arguments[2];
    var encoded_name_info = msg.items[0];

    try {
        if (!name.has_suffix (".rns")) {
            // TODO, implement Certified Names
            return new Message (MessageType.ERROR, new string[0], new string[0]);
        }

        var decoded = new DecentralisedNameInfo.from_string (encoded_name_info);
        if (decoded.name != name) {
            return new Message (MessageType.NOT_ACCEPTED, new string[] { "200", "name-mismatch" }, new string[0]);
        }
        if (!add_name_info_if_latest (decoded)) {
            return new Message (MessageType.NOT_ACCEPTED, new string[] { "205", "outdated" }, new string[0]);
        }
    } catch (NameInfoError.BAD_DATA e) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "201", "malformed-information" }, new string[0]);
    } catch (NameInfoError.INVALID e) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "202", "invalid-information" }, new string[0]);
    } catch (NameInfoError.NOT_IN_DATE e) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "203", "outside-date-range" }, new string[0]);
    }

    if (ttl <= 0) {
        return new Message (MessageType.OK, new string[0], new string[0]);
    }

    // Pass the message on with one hop fewer remaining.
    msg.arguments[1] = (ttl - 1).to_string ();
    gossip_to (RIDDLE_SERVER_GROUP, msg, token);

    return new Message (MessageType.OK, new string[0], new string[0]);
}
/**
 * Handles a WHO_IN message: prunes expired registrations and lists the
 * members of the group named in arguments[0].
 *
 * @param msg the WHO_IN message
 * @return ANSWER with one "address port" item per member, or UNKNOWN when
 *         the group has no members
 * @throws Error propagated from the calls made here
 */
private Message handle_who_in (Message msg) throws Error {
    cleanup_registrations ();

    var members = get_group_registrations (msg.arguments[0]);
    var addresses = members.select<string> (reg => @"$(reg.address.address.to_string ()) $(reg.address.port)");

    if (addresses.count () != 0) {
        return new Message (MessageType.ANSWER, new string[0], addresses.to_array ());
    }

    return new Message (MessageType.UNKNOWN, new string[0], new string[0]);
}
/**
 * Handles a WHO_IS message by looking up the name given in arguments[0].
 *
 * @param msg the WHO_IS message
 * @return ANSWER carrying the encoded name information as its single item,
 *         or UNKNOWN when get_name_info finds nothing
 * @throws Error propagated from the calls made here
 */
private Message handle_who_is (Message msg) throws Error {
    var info = get_name_info (msg.arguments[0]);

    if (info != null) {
        return new Message (MessageType.ANSWER, new string[0], new string[] { info.get_encoded () });
    }

    return new Message (MessageType.UNKNOWN, new string[0], new string[0]);
}
/**
 * Handles a RIDDLE message.
 *
 * Builds an envelope from the message and sender address, rejects it if
 * its identifier fails validation or has been seen before, and otherwise
 * records it. If the envelope produces something to forward, that is
 * queued as gossip for the envelope's group. The envelope is compacted
 * before replying.
 *
 * @param msg the RIDDLE message
 * @param origin remote address the message arrived from
 * @return OK when the riddle is accepted, otherwise NOT_ACCEPTED with
 *         "100 invalid-identifier" or "104 already-received"
 * @throws Error propagated from the calls made here
 */
private Message handle_riddle (Message msg, InetSocketAddress origin) throws Error {
    var envelope = new RiddleEnvelope.from_message (msg, origin.address);

    if (!envelope.validate_identifier ()) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "100", "invalid-identifier" }, new string[0]);
    }

    // Check-and-insert happens under one lock so a riddle is accepted once.
    lock (riddles) {
        if (riddles.has_key (envelope.identifier)) {
            return new Message (MessageType.NOT_ACCEPTED, new string[] { "104", "already-received" }, new string[0]);
        }
        riddles[envelope.identifier] = envelope;
    }

    var to_forward = envelope.forward (port);
    if (to_forward != null) {
        gossip_to (envelope.group_name, to_forward.to_message (), envelope.identifier);
    }

    // Pass to application to solve then compact when done.
    envelope.compact ();

    return new Message (MessageType.OK, new string[0], new string[0]);
}
/**
 * Handles a CALLBACK message carrying a riddle solution.
 *
 * The solution must refer to a stored riddle and verify against that
 * riddle's solution verification key. Everything past verification is not
 * implemented yet and answers with ERROR "not-implemented".
 *
 * @param msg the CALLBACK message
 * @return NOT_ACCEPTED with "105 riddle-not-found" or
 *         "101 verification-failed", otherwise ERROR "not-implemented"
 * @throws Error propagated from the calls made here
 */
private Message handle_callback (Message msg) throws Error {
    var solution = new SolutionEnvelope.from_message (msg);
    var riddle = get_riddle_metadata (solution.identifier);

    if (riddle == null) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "105", "riddle-not-found" }, new string[0]);
    }

    var verified = solution.verify (riddle.solution_verification_key);
    if (!verified) {
        return new Message (MessageType.NOT_ACCEPTED, new string[] { "101", "verification-failed" }, new string[0]);
    }

    if (riddle.reply_address == null) {
        // TODO Verify with Riddle Object
        return new Message (MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
    }

    // TODO Forward to reply address
    return new Message (MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
}
/**
 * Rebuilds the registration map, keeping only registrations younger than
 * REGISTRATION_TIMEOUT_US. Groups keep their key even when every one of
 * their registrations has expired.
 */
private void cleanup_registrations () {
    lock (registrations) {
        var surviving = new Gee.HashMap<string, Invercargill.Sequence<Registration>> ();

        foreach (var entry in registrations) {
            var now = new DateTime.now_utc ();
            var fresh = entry.value
                .where (reg => now.difference (reg.timestamp) < REGISTRATION_TIMEOUT_US)
                .to_sequence ();
            surviving.set (entry.key, fresh);
        }

        registrations = surviving;
    }
}
/**
 * Records address as a member of group, stamped with the current UTC time.
 *
 * If the same endpoint (same IP text and port) is already registered in
 * the group, the old entry is replaced. A repeated JOIN therefore
 * refreshes the timestamp instead of adding a duplicate member.
 *
 * @param group name of the group to register in; created if missing
 * @param address endpoint to register
 */
private void add_registration (string group, InetSocketAddress address) {
    var reg = new Registration () {
        address = address,
        timestamp = new DateTime.now_utc ()
    };
    var host = address.address.to_string ();
    var endpoint_port = address.port;

    lock (registrations) {
        Invercargill.Sequence<Registration> members;
        if (registrations.has_key (group)) {
            // Drop any earlier entry for this endpoint (same test as remove_registration).
            members = registrations[group]
                .where (r => r.address.port != endpoint_port || r.address.address.to_string () != host)
                .to_sequence ();
        } else {
            members = new Invercargill.Sequence<Registration> ();
        }
        members.add (reg);
        registrations.set (group, members);
    }
}
/**
 * Removes every registration in group whose IP text and port both match
 * address. Does nothing when the group is not known.
 *
 * @param group name of the group to remove from
 * @param address endpoint to remove
 */
private void remove_registration (string group, InetSocketAddress address) {
    lock (registrations) {
        Invercargill.Sequence<Registration>? group_regs;
        // unset() reports whether the key existed; for an unknown group the
        // out value is null and must not be dereferenced.
        if (!registrations.unset (group, out group_regs) || group_regs == null) {
            return;
        }

        registrations.set (group, group_regs
            .where (r => r.address.port != address.port || r.address.address.to_string () != address.address.to_string ())
            .to_sequence ());
    }
}
/**
 * Returns the registrations currently stored for a group.
 *
 * @param group name of the group to look up
 * @return a new sequence built from the group's registrations while the
 *         lock is held, or an empty enumerable when the group is unknown
 */
private Enumerable<Registration> get_group_registrations (string group) {
    lock (registrations) {
        if (!registrations.has_key (group)) {
            return Invercargill.empty<Registration> ();
        }

        return registrations[group].to_sequence ();
    }
}
/**
 * Saves info to the name store when the store reports, via is_outdated,
 * that it should be replaced for this name and effective date. The check
 * and the save run under one lock on the store.
 *
 * @param info name information to consider
 * @return true if info was saved, false if the store declined it
 */
private bool add_name_info_if_latest (NameInfo info) {
    lock (names) {
        if (names.is_outdated (info.name, info.effective)) {
            names.save_name (info);
            return true;
        }
    }

    return false;
}
/**
 * Looks up name information in the name store while holding its lock.
 *
 * @param name the name to look up
 * @return the stored information, or null when the store has no such name
 */
private NameInfo? get_name_info (string name) {
    NameInfo? found = null;

    lock (names) {
        if (names.has_name (name)) {
            found = names.get_name (name);
        }
    }

    return found;
}
/**
 * Fetches the stored envelope for a riddle identifier while holding the
 * lock on the riddle map.
 *
 * @param identifier identifier of the riddle
 * @return the stored envelope, or null when no riddle has that identifier
 */
private RiddleEnvelope? get_riddle_metadata (string identifier) {
    RiddleEnvelope? found = null;

    lock (riddles) {
        if (riddles.has_key (identifier)) {
            found = riddles[identifier];
        }
    }

    return found;
}
/**
 * Queues a message for delivery to a group with gossip.push, unless the
 * token has been queued before.
 *
 * @param group name of the group whose members should receive the message
 * @param message the message to send
 * @param token idempotency token; a token already seen makes this a no-op
 */
private void gossip_to (string group, Message message, string token) {
    lock (gossip) {
        if (!gossip_idempotency_tokens.contains (token)) {
            var entry = new Gossip () {
                group = group,
                message = message
            };
            gossip.push (entry);
            gossip_idempotency_tokens.add (token);
        }
    }
}
/**
 * Queues a message for delivery to a group with gossip.push_start, unless
 * the token has been queued before.
 *
 * @param group name of the group whose members should receive the message
 * @param message the message to send
 * @param token idempotency token; a token already seen makes this a no-op
 */
private void gossip_next (string group, Message message, string token) {
    lock (gossip) {
        if (!gossip_idempotency_tokens.contains (token)) {
            var entry = new Gossip () {
                group = group,
                message = message
            };
            gossip.push_start (entry);
            gossip_idempotency_tokens.add (token);
        }
    }
}
- }
/**
 * One member of a group: its endpoint and the time it was registered.
 */
private class Registration {
    /** Moment the registration was created; used to expire old entries. */
    public DateTime timestamp { get; set; }

    /** Socket address of the registered member. */
    public InetSocketAddress address { get; set; }
}
/**
 * A queued outgoing message together with the group it is destined for.
 */
private class Gossip {
    /** Name of the group whose members should receive the message. */
    public string group { get; set; }

    /** The message to send. */
    public Message message { get; set; }
}
-
- }
|