NetSim.vala 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. using LibPeer.Networks;
  2. using LibPeer.Protocols.Mx2;
  3. using LibPeer.Util;
  4. namespace LibPeer.Networks.Simulation {
  /**
   * A simulated network interface that exchanges packets through a shared
   * {@link Conduit} instead of a real transport.
   *
   * Incoming packets are queued and handed to the rest of the stack by a
   * dedicated worker thread, which applies an artificial per-packet delay
   * and random packet loss before delivery.
   */
  public class NetSim : Network {

      /** Conduit used to advertise this interface and to send packets. */
      private Conduit conduit;

      /** Interface number; only used to name the worker thread. */
      private int count;

      /** Identifier of this interface, as passed to the conduit. */
      public Bytes identifier;

      /** Delay, in milliseconds, applied before each queued packet is processed. */
      public int delay;

      /** Stored from the constructor; not read anywhere in this class. */
      public int latency;

      /** Probability (0.0 - 1.0) that a received packet is silently dropped. */
      public float loss_frac;

      /** Whether the worker thread is currently running. */
      private bool up = false;

      /** Packets (and control commands) waiting for the worker thread. */
      private AsyncQueue<QueueCommand<Packet>> packet_queue = new AsyncQueue<QueueCommand<Packet>>();

      /** Thread running {@link run_queue_worker}; valid while the interface is up. */
      private Thread<bool> worker_thread;

      /**
       * Creates a simulated interface. The interface stays down until
       * {@link bring_up} is called.
       *
       * @param conduit conduit that carries advertisements and packets
       * @param uuid raw bytes used as this interface's identifier
       * @param count interface number, used in the worker thread's name
       * @param delay per-packet delay in milliseconds
       * @param latency stored in {@link latency}
       * @param loss_frac fraction of received packets to drop (0.0 - 1.0)
       */
      public NetSim(Conduit conduit, uint8[] uuid, int count, int delay, int latency, float loss_frac) {
          this.conduit = conduit;
          this.count = count;
          this.identifier = new Bytes(uuid);
          this.delay = delay;
          this.latency = latency;
          this.loss_frac = loss_frac;
      }

      /**
       * Returns the identifier of this network type.
       *
       * @return the six ASCII bytes of "NetSim"
       */
      public override GLib.Bytes get_network_identifier() {
          return new Bytes("NetSim".data);
      }

      /**
       * Starts the worker thread that delivers queued packets.
       * Does nothing if the interface is already up.
       */
      public override void bring_up() throws IOError, Error {
          if (up) {
              return;
          }

          up = true;
          worker_thread = new Thread<bool>(@"NetSim-iface-$(count)", run_queue_worker);
      }

      /**
       * Stops the worker thread and waits for it to exit.
       * Does nothing if the interface is already down.
       */
      public override void bring_down() throws IOError, Error {
          if (!up) {
              return;
          }

          up = false;
          // The stop command goes to the front of the queue so the worker exits
          // before processing any payloads that are still waiting.
          this.packet_queue.push_front(new QueueCommand<Packet>.stop());
          worker_thread.join();
      }

      /**
       * Advertises an instance on the conduit under this interface's identifier.
       *
       * @param instance_reference the instance to advertise
       */
      public override void advertise(InstanceReference instance_reference) throws IOError, Error {
          var advertisement = new Advertisement(instance_reference, new NetSimPeerInfo(identifier));
          conduit.advertise(identifier, advertisement);
      }

      /**
       * Sends data through the conduit to the interface named by peer_info.
       *
       * @param bytes payload to send
       * @param peer_info destination; must be a {@link NetSimPeerInfo}
       * @throws IOError INVALID_ARGUMENT if peer_info is not a NetSimPeerInfo
       */
      public override void send(Bytes bytes, PeerInfo peer_info) throws IOError, Error {
          var info = peer_info as NetSimPeerInfo;
          if (info == null) {
              throw new IOError.INVALID_ARGUMENT("NetSim can only send to NetSimPeerInfo peers");
          }

          conduit.send_packet(this.identifier, new Bytes(info.identifier), bytes);
      }

      /**
       * Queues data addressed to this interface for delivery by the worker thread.
       *
       * @param origin identifier of the sending interface
       * @param data the packet payload
       */
      internal void receive_data(Bytes origin, Bytes data) {
          // Wrap the sender's identifier and the payload into a packet
          var peer_info = new NetSimPeerInfo(origin);
          var packet = new Packet(peer_info, data);

          // Hand the packet to the worker thread
          packet_queue.push(new QueueCommand<Packet>.with_payload(packet));
      }

      /**
       * Forwards an advertisement to incoming_advertisment.
       *
       * @param advertisement the advertisement to forward
       */
      internal void receive_advertisment(Advertisement advertisement) {
          incoming_advertisment(advertisement);
      }

      /**
       * Worker thread body: pops commands off the packet queue until a stop
       * command arrives, delaying, possibly dropping, and then delivering
       * each payload.
       *
       * @return true once a stop command has been received
       */
      private bool run_queue_worker() {
          while (true) {
              QueueCommand<Packet> command = packet_queue.pop();
              if (command.command == QueueControl.Stop) {
                  return true;
              }
              assert(command.command == QueueControl.Payload);

              // Delay: milliseconds -> microseconds. Non-positive delays are
              // skipped so a negative value cannot wrap into a huge sleep.
              if (delay > 0) {
                  Thread.usleep(((ulong) delay) * 1000);
              }

              // Drop: next_double() is uniform in [0, 1), so the packet is
              // discarded with probability loss_frac.
              if (Random.next_double() < loss_frac) {
                  continue;
              }

              // Create a stream over the payload
              var stream = new MemoryInputStream.from_bytes(command.payload.data);

              // Create and hand over the receiption
              var receiption = new Receiption(stream, command.payload.peer_info, this);
              incoming_receiption(receiption);
          }
      }
  }
  81. }