server.vala 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. using Invercargill;
  2. namespace Astrogate {
  3. private static Configuration config;
  4. private static Tunnel? tunnel;
  5. private static Gee.LinkedList<ProxyConnection> closing_connections;
  6. public static int main(string[] args) {
  7. var config_path = args.length > 1 ? args[1] : "/etc/astrogate.conf";
  8. run.begin(config_path);
  9. new MainLoop().run();
  10. return 0;
  11. }
  12. private async void run(string config_path) {
  13. config = new Configuration();
  14. closing_connections = new Gee.LinkedList<ProxyConnection>();
  15. print("Reading config...\n");
  16. yield config.read_file(File.new_for_commandline_arg(config_path));
  17. var service = new SocketService();
  18. var service_ports = config.entries.group_by<uint16>(s => s.source_port, (a, b) => a == b);
  19. foreach (var address in config.bind_to) {
  20. foreach (var port in service_ports) {
  21. print(@"Binding to $(address):$(port.key) for hosts: $(port.items.to_string(i => i.hostname, ", "))\n");
  22. var socket_addr = new InetSocketAddress(address, port.key);
  23. service.add_address(socket_addr, SocketType.STREAM, SocketProtocol.TCP, null, null);
  24. }
  25. }
  26. if(config.tunnel_enabled) {
  27. tunnel = new Tunnel(config.tunnel_interface, 1500, config.tunnel_bind, config.tunnel_network, config.tunnel_netmask);
  28. tunnel.listen_threaded();
  29. var tunnel_dests = config.entries
  30. .select_many<ConfigurationDestination>(e => e.destinations)
  31. .where(d => d.mode == DestinationType.TUNNEL);
  32. foreach (var dest in tunnel_dests) {
  33. print(@"Whitelisting $(dest.socket_address.address) in the tunnel server\n");
  34. tunnel.whitelist_client(dest.socket_address.address);
  35. }
  36. }
  37. service.incoming.connect((c, o) => {
  38. handle_connection.begin(c);
  39. return false;
  40. });
  41. service.start ();
  42. }
  43. private async void handle_connection(SocketConnection connection) {
  44. try {
  45. var port = ((InetSocketAddress)connection.get_local_address()).port;
  46. var buffer = yield HeaderReader.read_sample(connection.input_stream);
  47. var type = HeaderReader.determine_type(buffer);
  48. if(type == ConnectionType.INVALID) {
  49. print(@"Could not determine type of incoming connection on port $port.\n");
  50. yield connection.close_async();
  51. return;
  52. }
  53. print(@"New $type connection on port $(port)\n");
  54. var name = HeaderReader.read_name(buffer, type);
  55. if(name != null) {
  56. var entry = get_entry(type, (uint16)port, name);
  57. if(entry == null) {
  58. print(@"No matching configuration entry found matching $(type) $(name) $(port).\n");
  59. yield connection.close_async();
  60. return;
  61. }
  62. yield forward_connection(connection, entry, buffer);
  63. }
  64. else {
  65. print(@"Could not determine destination hostname\n");
  66. yield connection.close_async();
  67. return;
  68. }
  69. }
  70. catch(Error e) {
  71. warning(@"Error servicing connection: $(e.message)");
  72. }
  73. }
  74. private async void forward_connection(SocketConnection connection, ConfigurationItem entry, BinaryData initial_buffer) throws Error {
  75. ProxyConnection proxy;
  76. var remote_address = (InetSocketAddress)connection.get_remote_address();
  77. print(@"Proxying connection from $(remote_address) intended for $(entry.hostname):$(entry.source_port).\n");
  78. switch (entry.forward_mode) {
  79. case ForwardMode.ROUND_ROBIN:
  80. proxy = yield connect_round_robin(entry, remote_address);
  81. break;
  82. case ForwardMode.RANDOM:
  83. proxy = yield connect_random(entry, remote_address);
  84. break;
  85. case ForwardMode.FIRST_RESPONDER:
  86. proxy = yield connect_first_responder(entry, remote_address);
  87. break;
  88. default:
  89. assert_not_reached();
  90. }
  91. print(@"Connection from $(connection.get_remote_address().to_string()) forwarded to $(proxy.connection.get_remote_address()).\n");
  92. yield proxy.connection.output_stream.write_async(initial_buffer.to_array(), 0);
  93. yield proxy.connection.output_stream.flush_async(0);
  94. proxy.connection.output_stream.splice_async.begin(connection.input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE | OutputStreamSpliceFlags.CLOSE_TARGET, 0, proxy.cancellation_token, () => proxy.cleanup());
  95. connection.output_stream.splice_async.begin(proxy.connection.input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE | OutputStreamSpliceFlags.CLOSE_TARGET, 0, proxy.cancellation_token, () => proxy.cleanup());
  96. }
  97. private async ProxyConnection connect_round_robin(ConfigurationItem entry, InetSocketAddress source) throws Error {
  98. ProxyConnection? connection = null;
  99. foreach (var address in entry.destinations) {
  100. try{
  101. connection = yield connect_to_destination(source, address);
  102. }
  103. catch (Error e) {
  104. print(@"Failure during round-robin: $(e.message)\n");
  105. }
  106. }
  107. if(connection == null) {
  108. throw new IOError.HOST_UNREACHABLE("Could not reach any specified forwarding host");
  109. }
  110. return connection;
  111. }
  112. private async ProxyConnection connect_random(ConfigurationItem entry, InetSocketAddress source) throws Error {
  113. var items = entry.destinations.to_array();
  114. var i = Random.int_range(0, items.length);
  115. var address = items[i];
  116. var client = new SocketClient();
  117. return yield connect_to_destination(source, address);
  118. }
  119. private async ProxyConnection connect_first_responder(ConfigurationItem entry, InetSocketAddress source) throws Error {
  120. var cancel_token = new Cancellable();
  121. ProxyConnection? connection = null;
  122. var in_flight = 0;
  123. AsyncReadyCallback connection_cb = (obj, res) => {
  124. ProxyConnection conn;
  125. try {
  126. conn = connect_to_destination.end(res);
  127. if(connection == null){
  128. connection = conn;
  129. }
  130. }
  131. catch(IOError.CANCELLED e) {
  132. print("Cancelled late connection\n");
  133. return;
  134. }
  135. catch(Error e) {
  136. print(@"Failure during connect-first (aka. fastest): $(e.message)\n");
  137. }
  138. in_flight--;
  139. connect_first_responder.callback();
  140. };
  141. foreach (var address in entry.destinations) {
  142. in_flight++;
  143. connect_to_destination.begin(source, address, cancel_token, connection_cb);
  144. }
  145. while(in_flight > 0 && connection == null) {
  146. print(@"In flight requests: $(in_flight)\n");
  147. yield;
  148. }
  149. if(connection == null) {
  150. throw new IOError.HOST_UNREACHABLE("Could not reach any specified forwarding host");
  151. }
  152. cancel_token.cancel();
  153. return connection;
  154. }
  155. private ConfigurationItem get_entry(ConnectionType type, uint16 port, string name) {
  156. var entry = config.entries
  157. .where(e => e.protocol == type)
  158. .where(e => e.source_port == port)
  159. .where(e => e.hostname == name)
  160. .first_or_default();
  161. // If there was no HTTP_WELL_KNOWN entry, try fallback to an HTTP entry
  162. if(entry == null && type == ConnectionType.HTTP_WELL_KNOWN) {
  163. return get_entry(ConnectionType.HTTP, port, name);
  164. }
  165. return entry;
  166. }
  167. private async ProxyConnection connect_to_destination(InetSocketAddress source, ConfigurationDestination dest, Cancellable? cancellation_token = null) throws Error {
  168. var client = new SocketClient();
  169. if(dest.mode == DestinationType.INTERNET) {
  170. var connection = yield client.connect_async(dest.socket_address, cancellation_token);
  171. return new ProxyConnection(connection);
  172. }
  173. if(tunnel == null) {
  174. throw new TunnelError.NOT_CONFIGURED(@"Cannot connect to $(dest.socket_address) via tunnel, tunnel is not configured");
  175. }
  176. var mapping = tunnel.map(source.address, dest.socket_address.address, (uint16)dest.socket_address.port);
  177. var local_addr = new InetSocketAddress(tunnel.proxy_internal_ip, mapping.source_port);
  178. client.set_local_address(local_addr);
  179. print(@"TUNNEL CONNECTION: $(local_addr) -> $(new InetSocketAddress(mapping.internal_address, mapping.target_port))");
  180. var connection = yield client.connect_async(new InetSocketAddress(mapping.internal_address, mapping.target_port), cancellation_token);
  181. connection.notify["closed"].connect(() => tunnel.unmap(mapping));
  182. return new ProxyConnection(connection, mapping);
  183. }
  184. private class ProxyConnection {
  185. public SocketConnection connection { get; private set; }
  186. public Cancellable cancellation_token { get; private set; }
  187. private TunnelAddressMapping? mapping;
  188. public ProxyConnection(SocketConnection conn, TunnelAddressMapping? map = null) {
  189. connection = conn;
  190. mapping = map;
  191. cancellation_token = new Cancellable();
  192. }
  193. public void cleanup() {
  194. closing_connections.add(this);
  195. print("Cleanup connection!\n");
  196. cancellation_token.cancel();
  197. connection = null;
  198. GLib.Timeout.add_seconds_once(10, () => {
  199. if(mapping != null) {
  200. tunnel.unmap(mapping);
  201. }
  202. closing_connections.remove(this);
  203. });
  204. }
  205. }
  206. }