InputStream.vala 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. using LibPeer.Protocols.Stp.Sessions;
  2. using LibPeer.Protocols.Mx2;
  3. namespace LibPeer.Protocols.Stp.Streams {
  4. public class StpInputStream : InputStream {
  5. private IngressSession session;
  6. private uint8[] unread_data;
  7. private Cond data_cond = Cond();
  8. private Mutex data_mutex = Mutex();
  9. public InstanceReference target { get { return session.target; }}
  10. public uint8[] session_id { get { return session.identifier; }}
  11. public StpInputStream(IngressSession session) {
  12. this.session = session;
  13. session.incoming_app_data.connect(handle_data);
  14. }
  15. private void handle_data(uint8[] data) {
  16. data_mutex.lock();
  17. unread_data = new Util.ByteComposer().add_byte_array(unread_data).add_byte_array(data).to_byte_array();
  18. data_cond.broadcast();
  19. data_mutex.unlock();
  20. }
  21. public override bool close (GLib.Cancellable? cancellable) {
  22. session.close();
  23. return true;
  24. }
  25. public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
  26. data_mutex.lock();
  27. while(unread_data.length < buffer.length) {
  28. data_cond.wait(data_mutex);
  29. }
  30. for(int i = 0; i < buffer.length; i++) {
  31. buffer[i] = unread_data[i];
  32. }
  33. unread_data = unread_data[buffer.length:unread_data.length];
  34. data_mutex.unlock();
  35. return buffer.length;
  36. }
  37. }
  38. }