Client.vala 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. using Invercargill;
  2. namespace Riddle {
  3. public class Client : Object {
  4. private Invercargill.Sequence<PeerDiscoverer> discoverers = new Invercargill.Sequence<PeerDiscoverer> ();
  5. private Invercargill.Sequence<InetSocketAddress> servers = new Invercargill.Sequence<InetSocketAddress>();
  6. private Invercargill.Sequence<Message> join_messages = new Invercargill.Sequence<Message>();
  7. private InetSocketAddress? self_server = null;
  8. public Client() {
  9. discoverers.add (new KnownHostDiscoverer());
  10. discoverers.add (new LanDiscoverer());
  11. setup();
  12. }
  13. public Client.with_server(InetSocketAddress server_address) {
  14. var lan_discoverer = new LanDiscoverer();
  15. discoverers.add (new KnownHostDiscoverer());
  16. discoverers.add (lan_discoverer);
  17. self_server = server_address;
  18. setup();
  19. lan_discoverer.advertise(self_server);
  20. }
  21. private void setup() {
  22. foreach (var discoverer in discoverers) {
  23. discoverer.peer_discovered.connect(new_server_found);
  24. discoverer.begin();
  25. }
  26. }
  27. private void new_server_found(InetSocketAddress address) {
  28. if(self_server != null && server_equals(address, self_server)) {
  29. // Ignore self_server address
  30. return;
  31. }
  32. if(servers.any(s => server_equals(s, address))) {
  33. // Ignore already discovered server
  34. return;
  35. }
  36. Enumerable<Message> j_msgs;
  37. lock(join_messages) {
  38. servers.add(address);
  39. j_msgs = join_messages.to_sequence();
  40. }
  41. foreach (var message in j_msgs) {
  42. raw_request(message, single(address), 5000, MessageType.OK);
  43. }
  44. }
  45. public int join(string group, uint port) {
  46. lock(join_messages) {
  47. var msg = new Message(MessageType.JOIN, new string[] { group, port.to_string() }, new string[0]);
  48. var responses = raw_request(msg, servers, 10000, MessageType.OK);
  49. join_messages.add(msg);
  50. return responses.where(r => r.message_type == MessageType.OK).count();
  51. }
  52. }
  53. public int leave(string group, uint port) {
  54. lock(join_messages) {
  55. join_messages = join_messages.where(m => m.arguments[0] != group && m.arguments[1] != port.to_string()).to_sequence();
  56. }
  57. var msg = new Message(MessageType.LEAVE, new string[] { group, port.to_string() }, new string[0]);
  58. var responses = raw_request(msg, servers, 10000, MessageType.OK);
  59. return responses.where(r => r.message_type == MessageType.OK).count();
  60. }
  61. public int propogate(NameInfo info, string? idempotency_token = null) {
  62. var token = idempotency_token ?? GLib.Uuid.string_random();
  63. var msg = new Message(MessageType.PROPOGATE, new string[] { info.name, "10", token }, new string[] { info.get_encoded() });
  64. var responses = raw_request(msg, servers, 10000, MessageType.OK);
  65. return responses.where(r => r.message_type == MessageType.OK).count();
  66. }
  67. public Enumerable<InetSocketAddress> who_is_in(string group) {
  68. var who_in = new Message(MessageType.WHO_IN, new string[] { group }, new string[0]);
  69. var responses = raw_request(who_in, servers, 2000, MessageType.ANSWER);
  70. return responses.select_many<InetSocketAddress>(r => ate(r.items).select<InetSocketAddress>(i => parse_address(i)));
  71. }
  72. public Enumerable<NameInfo> who_is(string domain) {
  73. var who_is = new Message(MessageType.WHO_IS, new string[] { domain }, new string[0]);
  74. var responses = raw_request(who_is, servers, 2000, MessageType.ANSWER);
  75. if(domain.has_suffix(".rns")) {
  76. return responses.select_many<NameInfo>(r => ate(r.items).select<NameInfo>(i => new DecentralisedNameInfo.from_string(i)));
  77. }
  78. return responses.select_many<NameInfo>(r => ate(r.items).select<NameInfo>(i => new CertifiedNameInfo.from_string(i)));
  79. }
  80. public int riddle(RiddleEnvelope riddle, Enumerable<InetSocketAddress> servers) {
  81. var msg = riddle.to_message();
  82. var responses = raw_request(msg, servers, 10000, MessageType.OK);
  83. return responses.where(r => r.message_type == MessageType.OK).count();
  84. }
  85. public Enumerable<Message> raw_request(Message msg, Enumerable<InetSocketAddress> servers, int64 timeout = 10000, MessageType? filter = null) {
  86. var manager = new RequestManager(servers);
  87. manager.start_request(msg);
  88. return manager.get_responses(timeout, filter);
  89. }
  90. public InetSocketAddress? callback(SolutionEnvelope solution, InetSocketAddress server, uint8[] author_signing_key, uint8[] reply_public_key, uint8[] reply_secret_key) throws Error {
  91. var socket = new SocketClient().connect(server);
  92. var dis = new DataInputStream(socket.input_stream);
  93. var dos = new DataOutputStream(socket.output_stream);
  94. solution.to_message().send(dos);
  95. var reply = Message.from_stream(dis);
  96. if(reply.message_type == MessageType.NOT_ACCEPTED) {
  97. warning(@"Got NOT-ACCEPTED response from $(server.address): $(reply.arguments[0]) $(reply.arguments[1])");
  98. return null;
  99. }
  100. var verified_reply = Solution.verify_solved_response(reply, author_signing_key);
  101. if(verified_reply == null) {
  102. return null;
  103. }
  104. return Solution.decrypt_connection_details(verified_reply, reply_public_key, reply_secret_key);
  105. }
  106. public void sync(InetSocketAddress server, NameInfoStore store) throws Error {
  107. var socket = new SocketClient().connect(server);
  108. var dis = new DataInputStream(socket.input_stream);
  109. var dos = new DataOutputStream(socket.output_stream);
  110. var existing = store.get_all().select<string>(i => @"$(i.name) $(i.effective.format_iso8601())");
  111. var request = new Message(MessageType.SYNC, new string[0], existing.to_array());
  112. request.send(dos);
  113. var reply = Message.from_stream(dis);
  114. if(reply.message_type != MessageType.DOMAINS) {
  115. warning(@"Bad response from $(server.address) for SYNC request.");
  116. }
  117. foreach (var item in reply.items) {
  118. var parts = item.split(" ", 2);
  119. var name = parts[0];
  120. var data = parts[1];
  121. NameInfo info;
  122. if(name.has_suffix(".rns")) {
  123. info = new DecentralisedNameInfo.from_string(data);
  124. }
  125. else {
  126. info = new CertifiedNameInfo.from_string(data);
  127. }
  128. store.save_name(info);
  129. }
  130. }
  131. internal static bool server_equals(InetSocketAddress a1, InetSocketAddress a2) {
  132. return (a1.address.equal(a2.address) && a1.port == a2.port);
  133. }
  134. private InetSocketAddress parse_address(string address) {
  135. var parts = address.split(" ", 3);
  136. var isa = new InetSocketAddress.from_string(parts[0], uint.parse(parts[1]));
  137. return isa;
  138. }
  139. }
  140. }