IngressSession.vala 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. using LibPeer.Protocols.Mx2;
  2. using LibPeer.Protocols.Stp.Segments;
  3. using LibPeer.Util;
  4. using Gee;
  5. namespace LibPeer.Protocols.Stp.Sessions {
  /**
   * Receiving side of an STP session.
   *
   * Payload segments passed to {@link process_segment} are buffered by
   * sequence number. When the payload carrying the next expected sequence
   * number arrives, it and every consecutively numbered payload already
   * buffered are joined and emitted through {@link incoming_app_data}.
   * An acknowledgement is queued for every payload received.
   */
  public class IngressSession : Session {

      /** Sequence number of the first payload that has not been delivered yet. */
      private uint64 next_expected_sequence_number = 0;

      /** Payloads received but not yet delivered, keyed by sequence number. */
      private ConcurrentHashMap<uint64?, Payload> reconstruction =
          new ConcurrentHashMap<uint64?, Payload>(i => (uint)i, (a, b) => a == b);

      /**
       * Emitted with the bytes of one or more consecutive payloads, in
       * sequence order.
       *
       * @param data the joined payload data
       */
      public signal void incoming_app_data(uint8[] data);

      /**
       * Creates the session and marks it as open.
       *
       * All three arguments are forwarded unchanged to the base constructor.
       *
       * @param target forwarded to the base constructor
       * @param session_id forwarded to the base constructor
       * @param ping forwarded to the base constructor
       */
      public IngressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
          base(target, session_id, ping);
          open = true;
      }

      /**
       * Dispatches a segment received from the muxer according to its type.
       *
       * Payload and Control segments are handled; any other segment type
       * is ignored.
       *
       * @param segment the segment to process
       */
      public override void process_segment(Segment segment) {
          if (segment is Payload) {
              handle_payload((Payload)segment);
          } else if (segment is Control) {
              handle_control((Control)segment);
          }
      }

      /**
       * Buffers a payload that has not been delivered yet, delivers data if
       * the payload is the next one expected, and queues an acknowledgement.
       *
       * @param segment the payload segment that was received
       */
      private void handle_payload(Payload segment) {
          // TODO: Feature handling
          var sequence_number = segment.sequence_number;

          // Payloads numbered below the next expected one are not buffered
          if (sequence_number >= next_expected_sequence_number) {
              reconstruction.set(sequence_number, segment);

              // The gap at the front is filled, so data can be delivered
              if (sequence_number == next_expected_sequence_number) {
                  uint8[] data = complete_reconstruction();
                  incoming_app_data(data);
              }
          }

          // The acknowledgement is queued whether or not the payload was buffered
          queue_segment(new Acknowledgement(segment));
      }

      /**
       * Closes the session when the control segment carries a COMPLETE,
       * ABORT or NOT_CONFIGURED command; other commands are ignored.
       *
       * @param segment the control segment that was received
       */
      private void handle_control(Control segment) {
          string reason;

          switch (segment.command) {
              case ControlCommand.COMPLETE:
                  reason = "The remote peer completed the stream";
                  break;
              case ControlCommand.ABORT:
                  reason = "The stream was aborted by the remote peer";
                  break;
              case ControlCommand.NOT_CONFIGURED:
                  reason = "The remote peer claims to not know about this session";
                  break;
              default:
                  // Any other command leaves the session untouched
                  return;
          }

          close_session(reason);
      }

      /**
       * Removes the run of consecutively numbered payloads that starts at
       * the next expected sequence number from the buffer and joins their
       * data.
       *
       * Afterwards the next expected sequence number is the first one that
       * was not found in the buffer.
       *
       * @return the joined data of the removed payloads
       */
      private uint8[] complete_reconstruction() {
          var composer = new Util.ByteComposer();
          uint64 cursor = next_expected_sequence_number;

          // Walk forward until the first sequence number that is not buffered
          while (reconstruction.has_key(cursor)) {
              Payload buffered;
              reconstruction.unset(cursor, out buffered);
              composer.add_byte_array(buffered.data);
              cursor++;
          }

          // The first missing sequence number is what we wait for next
          next_expected_sequence_number = cursor;
          return composer.to_byte_array();
      }
  }
  76. }