Server.vala 14 KB


  1. using Invercargill;
  2. namespace Riddle {
  /**
   * Riddle network server.
   *
   * Accepts one request per incoming connection, dispatches it on its message
   * type and writes a single reply back. The server keeps track of which peers
   * have joined which groups, stores name information and received riddles, and
   * relays queued messages to group members from a dedicated gossip thread.
   */
  public class Server {
      /** Age in microseconds after which a registration is dropped (600 seconds). */
      public const int REGISTRATION_TIMEOUT_US = 600000000;
      /** Pause in microseconds taken by the gossip thread after each item it sends (2 seconds). */
      public const int GOSSIP_INTERVAL_US = 2000000;
      /** Group this server joins on start-up and to which accepted name updates are gossiped. */
      public const string RIDDLE_SERVER_GROUP = "Riddle";

      private SocketService service;
      // Group name -> peers currently registered in that group. Guarded by lock(registrations).
      private Gee.HashMap<string, Invercargill.Sequence<Registration>> registrations = new Gee.HashMap<string, Invercargill.Sequence<Registration>>();
      // Riddle identifier -> envelope of every riddle accepted so far. Guarded by lock(riddles).
      private Gee.HashMap<string, RiddleEnvelope> riddles = new Gee.HashMap<string, RiddleEnvelope>();
      private NameInfoStore names;
      private Client client;
      // Queue consumed by the gossip thread. Pushes are made under lock(gossip).
      private Fifo<Gossip> gossip = new Fifo<Gossip>();
      // Tokens of everything ever queued for gossip, so the same item is never queued twice.
      private Gee.HashSet<string> gossip_idempotency_tokens = new Gee.HashSet<string>();
      private Thread<bool> gossip_thread;
      private uint16 port;

      /**
       * Creates the server, joins the server group and starts the gossip thread.
       *
       * A listening port is added to the socket service, a {@link Client} is
       * created for the endpoint (address, port) and used to join
       * {@link RIDDLE_SERVER_GROUP}. Connections are not accepted until
       * {@link start} is called.
       *
       * @param address textual IP address handed to the client as its server endpoint
       * @param port port to listen on; also the port announced when joining the group
       * @param name_store store used to persist and look up name information
       * @throws Error if the address cannot be parsed, or if binding the port or joining fails
       */
      public Server(string address, uint16 port, NameInfoStore name_store) throws Error {
          this.names = name_store;
          this.port = port;
          // g_inet_address_new_from_string() yields null for unparsable text; fail
          // loudly here instead of handing null to InetSocketAddress below.
          InetAddress? add = new InetAddress.from_string(address);
          if(add == null) {
              throw new IOError.INVALID_ARGUMENT("Invalid server address: %s", address);
          }
          service = new SocketService ();
          service.add_inet_port (port, null);
          client = new Client.with_server(new InetSocketAddress(add, port));
          client.join(RIDDLE_SERVER_GROUP, port);
          gossip_thread = new Thread<bool>("Gossip", spread_gossip);
      }

      /**
       * Hooks up the connection handler and starts the socket service.
       *
       * Each incoming connection is serviced by its own {@link handle_connection}
       * coroutine.
       */
      public async void start() {
          service.incoming.connect((c, o) => {
              handle_connection.begin(c);
              // false: do not stop other handlers of the signal from running.
              return false;
          });
          service.start ();
      }

      /**
       * Services a single connection: reads one message, sends one reply, closes.
       *
       * Any error is logged and the connection is closed without a reply.
       *
       * @param connection the accepted connection
       */
      private async void handle_connection(SocketConnection connection) {
          try {
              print("New connection\n");
              var dis = new DataInputStream(connection.input_stream);
              var dos = new DataOutputStream(connection.output_stream);
              var message = yield Message.from_stream_async(dis);
              // print() and warning() are printf-style: dynamic text must be passed
              // as an argument, never as the format string itself.
              print("%s", @"Received $(MessageType.to_string(message.message_type)) message\n");
              var reply = service_message(message, (InetSocketAddress)connection.get_remote_address());
              reply.send(dos);
              yield dos.flush_async();
              yield connection.close_async();
              print("Connection closed.\n");
          }
          catch(Error e) {
              warning("Error servicing connection: %s", e.message);
              try {
                  yield connection.close_async();
              }
              catch(Error e2) {
                  warning("Error closing connection after initial error: %s", e2.message);
              }
          }
      }

      /**
       * Body of the gossip thread.
       *
       * For every item taken from the gossip queue, the message is sent to all
       * peers currently registered in the item's group, followed by a pause of
       * {@link GOSSIP_INTERVAL_US}.
       *
       * @return true once iteration over the queue ends
       */
      private bool spread_gossip() {
          // NOTE(review): presumably iterating the Fifo blocks until items are
          // pushed - confirm against Invercargill.Fifo.
          foreach (var goss in gossip) {
              // What's the goss?
              var members = get_group_registrations(goss.group).select<InetSocketAddress>(r => r.address);
              print("Spreading gossip!\n");
              // NOTE(review): 30000 looks like a timeout - confirm unit against Client.raw_request.
              client.raw_request(goss.message, members, 30000);
              Thread.usleep(GOSSIP_INTERVAL_US);
          }
          return true;
      }

      /**
       * Dispatches a request to the handler for its message type.
       *
       * @param msg the request
       * @param origin remote address the request arrived from
       * @return the reply to send; an ERROR "unknown-command" for unhandled types
       */
      private Message service_message(Message msg, InetSocketAddress origin) throws Error {
          switch (msg.message_type) {
              case MessageType.JOIN:
                  return handle_register(msg, origin);
              case MessageType.LEAVE:
                  return handle_deregister(msg, origin);
              case MessageType.PROPOGATE:
                  return handle_propogate(msg);
              case MessageType.WHO_IN:
                  return handle_who_in(msg);
              case MessageType.WHO_IS:
                  return handle_who_is(msg);
              case MessageType.RIDDLE:
                  return handle_riddle(msg, origin);
              case MessageType.CALLBACK:
                  return handle_callback(msg);
              default:
                  return new Message(MessageType.ERROR, new string[] { "unknown-command" }, new string[0]);
          }
      }

      /**
       * Handles JOIN: registers (origin host, announced port) in a group.
       *
       * Arguments: [0] group name, [1] port the peer listens on.
       *
       * @return SEE_ALSO listing the other members of the group as "host port"
       *         items, OK if there are none, or ERROR for a malformed request
       */
      private Message handle_register(Message msg, InetSocketAddress origin) throws Error {
          uint16 peer_port;
          if(msg.arguments.length < 2 || !parse_port(msg.arguments[1], out peer_port)) {
              return malformed_request();
          }
          cleanup_registrations();
          var group = msg.arguments[0];
          var peer = new InetSocketAddress(origin.address, peer_port);
          add_registration(group, peer);
          // Everyone except the peer that just joined. A member is only "the same
          // peer" when BOTH host and port match, so peers sharing just the host
          // or just the port number must still be reported.
          var others = get_group_registrations(group)
              .where(r => !same_endpoint(r.address, peer));
          if(others.any()) {
              return new Message(MessageType.SEE_ALSO, new string[0], others
                  .select<string>(r => describe_endpoint(r.address))
                  .to_array());
          }
          return new Message(MessageType.OK, new string[0], new string[0]);
      }

      /**
       * Handles LEAVE: removes (origin host, announced port) from a group.
       *
       * Arguments: [0] group name, [1] port the peer registered with.
       *
       * @return OK, or ERROR for a malformed request
       */
      private Message handle_deregister(Message msg, InetSocketAddress origin) throws Error {
          uint16 peer_port;
          if(msg.arguments.length < 2 || !parse_port(msg.arguments[1], out peer_port)) {
              return malformed_request();
          }
          remove_registration(msg.arguments[0], new InetSocketAddress(origin.address, peer_port));
          return new Message(MessageType.OK, new string[0], new string[0]);
      }

      /**
       * Handles PROPOGATE: stores a name record and relays it to other servers.
       *
       * Arguments: [0] name, [1] remaining hops (ttl), [2] idempotency token.
       * Items: [0] encoded name information.
       *
       * Only names ending in ".rns" are supported. Records that do not match the
       * announced name, fail to decode/validate, or are not newer than what is
       * stored are answered with NOT_ACCEPTED. Accepted records with ttl > 0 are
       * queued for {@link RIDDLE_SERVER_GROUP} with the ttl reduced by one.
       */
      private Message handle_propogate(Message msg) throws Error {
          if(msg.arguments.length < 3 || msg.items.length < 1) {
              return malformed_request();
          }
          var name = msg.arguments[0];
          var ttl = int.parse(msg.arguments[1]);
          var token = msg.arguments[2];
          var encoded_name_info = msg.items[0];
          try {
              if(name.has_suffix(".rns")) {
                  var name_info = new DecentralisedNameInfo.from_string(encoded_name_info);
                  if(name_info.name != name) {
                      return new Message(MessageType.NOT_ACCEPTED, new string[] { "200", "name-mismatch" }, new string[0]);
                  }
                  if(!add_name_info_if_latest(name_info)) {
                      return new Message(MessageType.NOT_ACCEPTED, new string[] { "205", "outdated" }, new string[0]);
                  }
              }
              else {
                  // TODO, implement Certified Names
                  return new Message(MessageType.ERROR, new string[0], new string[0]);
              }
          }
          catch(NameInfoError.BAD_DATA e) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "201", "malformed-information" }, new string[0]);
          }
          catch(NameInfoError.INVALID e) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "202", "invalid-information" }, new string[0]);
          }
          catch(NameInfoError.NOT_IN_DATE e) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "203", "outside-date-range" }, new string[0]);
          }
          if(ttl > 0) {
              // The received message object is reused for forwarding, one hop shorter.
              msg.arguments[1] = (ttl - 1).to_string();
              gossip_to(RIDDLE_SERVER_GROUP, msg, token);
          }
          return new Message(MessageType.OK, new string[0], new string[0]);
      }

      /**
       * Handles WHO_IN: lists the members of a group.
       *
       * Arguments: [0] group name.
       *
       * @return ANSWER with one "host port" item per member, UNKNOWN when the
       *         group has no members, or ERROR for a malformed request
       */
      private Message handle_who_in(Message msg) throws Error {
          if(msg.arguments.length < 1) {
              return malformed_request();
          }
          cleanup_registrations();
          var addresses = get_group_registrations(msg.arguments[0]).select<string>(r => describe_endpoint(r.address));
          if(!addresses.any()) {
              return new Message(MessageType.UNKNOWN, new string[0], new string[0]);
          }
          return new Message(MessageType.ANSWER, new string[0], addresses.to_array());
      }

      /**
       * Handles WHO_IS: looks up the stored information for a name.
       *
       * Arguments: [0] name.
       *
       * @return ANSWER with the encoded name information as its only item,
       *         UNKNOWN when nothing is stored, or ERROR for a malformed request
       */
      private Message handle_who_is(Message msg) throws Error {
          if(msg.arguments.length < 1) {
              return malformed_request();
          }
          var name = get_name_info(msg.arguments[0]);
          if(name == null) {
              return new Message(MessageType.UNKNOWN, new string[0], new string[0]);
          }
          return new Message(MessageType.ANSWER, new string[0], new string[] { name.get_encoded() });
      }

      /**
       * Handles RIDDLE: records a riddle and queues it for forwarding.
       *
       * The riddle is rejected when its identifier does not validate ("100") or
       * when a riddle with the same identifier was already received ("104").
       * Otherwise it is stored, the envelope produced by forward() - if any - is
       * queued for the riddle's group using the identifier as idempotency token,
       * and the stored envelope is compacted.
       */
      private Message handle_riddle(Message msg, InetSocketAddress origin) throws Error {
          var riddle_envelope = new RiddleEnvelope.from_message(msg, origin.address);
          if(!riddle_envelope.validate_identifier()) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "100", "invalid-identifier" }, new string[0]);
          }
          // Check-and-insert under one lock so two concurrent deliveries of the
          // same riddle cannot both be accepted.
          lock(riddles) {
              if(riddles.has_key(riddle_envelope.identifier)) {
                  return new Message(MessageType.NOT_ACCEPTED, new string[] { "104", "already-received" }, new string[0]);
              }
              riddles.set(riddle_envelope.identifier, riddle_envelope);
          }
          var to_forward = riddle_envelope.forward(port);
          if(to_forward != null) {
              gossip_to(riddle_envelope.group_name, to_forward.to_message(), riddle_envelope.identifier);
          }
          // Pass to application to solve then compact when done.
          riddle_envelope.compact();
          return new Message(MessageType.OK, new string[0], new string[0]);
      }

      /**
       * Handles CALLBACK: checks a solution against the riddle it answers.
       *
       * Replies NOT_ACCEPTED when the riddle is unknown ("105") or the solution
       * does not verify against the riddle's verification key ("101").
       * Delivering a verified solution is not implemented yet, so every verified
       * solution currently yields ERROR "not-implemented".
       */
      private Message handle_callback(Message msg) throws Error {
          var solution_envelope = new SolutionEnvelope.from_message(msg);
          var riddle_metadata = get_riddle_metadata(solution_envelope.identifier);
          if(riddle_metadata == null) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "105", "riddle-not-found" }, new string[0]);
          }
          if(!solution_envelope.verify(riddle_metadata.solution_verification_key)) {
              return new Message(MessageType.NOT_ACCEPTED, new string[] { "101", "verification-failed" }, new string[0]);
          }
          if(riddle_metadata.reply_address == null) {
              // TODO Verify with Riddle Object
              return new Message(MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
          }
          // TODO Forward to reply address
          return new Message(MessageType.ERROR, new string[] { "not-implemented" }, new string[0]);
      }

      /**
       * Drops every registration older than {@link REGISTRATION_TIMEOUT_US}.
       *
       * Groups left without any member are removed from the map altogether so
       * that group names do not accumulate forever.
       */
      private void cleanup_registrations() {
          lock(registrations) {
              // One reference time for the whole sweep.
              var now = new DateTime.now_utc();
              var copy = new Gee.HashMap<string, Invercargill.Sequence<Registration>>();
              foreach (var group in registrations) {
                  var fresh = group.value
                      .where(r => now.difference(r.timestamp) < REGISTRATION_TIMEOUT_US)
                      .to_sequence();
                  if(fresh.any()) {
                      copy.set(group.key, fresh);
                  }
              }
              registrations = copy;
          }
      }

      /**
       * Registers an endpoint in a group, stamped with the current UTC time.
       *
       * An existing registration of the same endpoint in that group is replaced,
       * so a repeated JOIN refreshes the timestamp instead of adding a duplicate.
       */
      private void add_registration(string group, InetSocketAddress address) {
          var reg = new Registration() {
              address = address,
              timestamp = new DateTime.now_utc()
          };
          lock(registrations) {
              var members = new Invercargill.Sequence<Registration>();
              if(registrations.has_key(group)) {
                  members = registrations[group]
                      .where(r => !same_endpoint(r.address, address))
                      .to_sequence();
              }
              members.add(reg);
              registrations.set(group, members);
          }
      }

      /**
       * Removes an endpoint from a group. Unknown groups are ignored.
       */
      private void remove_registration(string group, InetSocketAddress address) {
          lock(registrations) {
              // A LEAVE may name a group nobody ever joined; there is then no
              // sequence to filter.
              if(!registrations.has_key(group)) {
                  return;
              }
              var remaining = registrations[group]
                  .where(r => !same_endpoint(r.address, address))
                  .to_sequence();
              if(remaining.any()) {
                  registrations.set(group, remaining);
              }
              else {
                  registrations.unset(group);
              }
          }
      }

      /**
       * Returns a snapshot of the registrations of a group.
       *
       * @return a copy of the group's current members; empty for unknown groups
       */
      private Enumerable<Registration> get_group_registrations(string group) {
          lock(registrations) {
              var regs = Invercargill.empty<Registration>();
              if(registrations.has_key(group)) {
                  // Copied so the caller can enumerate outside the lock.
                  regs = registrations[group].to_sequence();
              }
              return regs;
          }
      }

      /**
       * Saves name information unless the store reports it is not needed.
       *
       * @return true if the record was saved, false if is_outdated() returned
       *         false for the record's name and effective date
       */
      private bool add_name_info_if_latest(NameInfo info) {
          lock(names) {
              if(!names.is_outdated(info.name, info.effective)) {
                  return false;
              }
              names.save_name(info);
          }
          return true;
      }

      /**
       * Looks up stored name information.
       *
       * @return the stored record, or null when the store has no such name
       */
      private NameInfo? get_name_info(string name) {
          lock(names) {
              if(names.has_name(name)) {
                  return names.get_name(name);
              }
          }
          return null;
      }

      /**
       * Looks up a previously received riddle.
       *
       * @return the stored envelope, or null when the identifier is unknown
       */
      private RiddleEnvelope? get_riddle_metadata(string identifier) {
          lock(riddles) {
              if(riddles.has_key(identifier)) {
                  return riddles[identifier];
              }
          }
          return null;
      }

      /**
       * Queues a message for a group at the back of the gossip queue.
       *
       * Nothing happens if the token was queued before.
       */
      private void gossip_to(string group, Message message, string token) {
          enqueue_gossip(group, message, token, false);
      }

      /**
       * Queues a message for a group at the front of the gossip queue.
       *
       * Nothing happens if the token was queued before.
       */
      private void gossip_next(string group, Message message, string token) {
          enqueue_gossip(group, message, token, true);
      }

      /**
       * Shared implementation of {@link gossip_to} and {@link gossip_next}.
       *
       * @param at_front true to push to the start of the queue, false to append
       */
      private void enqueue_gossip(string group, Message message, string token, bool at_front) {
          lock(gossip) {
              if(gossip_idempotency_tokens.contains(token)) {
                  return;
              }
              var g = new Gossip() {
                  message = message,
                  group = group
              };
              if(at_front) {
                  gossip.push_start(g);
              }
              else {
                  gossip.push(g);
              }
              gossip_idempotency_tokens.add(token);
          }
      }

      /** Builds the reply sent when a request lacks required arguments or carries an unusable port. */
      private Message malformed_request() throws Error {
          return new Message(MessageType.ERROR, new string[] { "malformed-request" }, new string[0]);
      }

      /**
       * Parses a port number received from a peer.
       *
       * @param text textual port number
       * @param port the parsed port, or 0 on failure
       * @return false when the text is not a number in the range 1-65535
       */
      private static bool parse_port(string text, out uint16 port) {
          var parsed = uint.parse(text);
          // uint.parse() yields 0 for non-numeric text, and 0 is not a usable
          // port either; larger values would be silently truncated to 16 bits.
          if(parsed == 0 || parsed > uint16.MAX) {
              port = 0;
              return false;
          }
          port = (uint16) parsed;
          return true;
      }

      /** Whether two socket addresses have the same host (textual form) and the same port. */
      private static bool same_endpoint(InetSocketAddress a, InetSocketAddress b) {
          return a.port == b.port && a.address.to_string() == b.address.to_string();
      }

      /** Formats a socket address the way it is sent in replies: "host port". */
      private static string describe_endpoint(InetSocketAddress endpoint) {
          return @"$(endpoint.address.to_string()) $(endpoint.port)";
      }
  }
  /**
   * One member of a group: the socket address of the peer together with the
   * time the entry was created.
   */
  private class Registration {
      /** Time the entry was created; compared against the current time to expire old entries. */
      public DateTime timestamp {
          get;
          set;
      }

      /** Socket address under which the peer is listed. */
      public InetSocketAddress address {
          get;
          set;
      }
  }
  /**
   * An entry of the gossip queue: a message paired with the name of the group
   * whose members it is to be sent to.
   */
  private class Gossip {
      /** Name of the group whose registered members receive the message. */
      public string group {
          get;
          set;
      }

      /** The message to send. */
      public Message message {
          get;
          set;
      }
  }
  286. }