OutputStream.vala 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. using LibPeer.Protocols.Stp.Sessions;
  2. using LibPeer.Protocols.Mx2;
  3. namespace LibPeer.Protocols.Stp.Streams {
  4. public class StpOutputStream : OutputStream {
  5. private EgressSession session;
  6. public InstanceReference target { get { return session.target; }}
  7. public uint8[] session_id { get { return session.identifier; }}
  8. public signal void reply(StpInputStream stream);
  9. public StpOutputStream(EgressSession session) {
  10. this.session = session;
  11. this.session.received_reply.connect(s => reply(new StpInputStream(s)));
  12. }
  13. public override bool close (GLib.Cancellable? cancellable) {
  14. session.close();
  15. return true;
  16. }
  17. public override ssize_t write(uint8[] buffer, GLib.Cancellable? cancellable = null) throws IOError {
  18. Cond cond = Cond();
  19. Mutex mutex = Mutex();
  20. IOError error_result = null;
  21. bool complete = false;
  22. var tracker = session.queue_send(buffer);
  23. tracker.on_complete.connect(() => {
  24. mutex.lock();
  25. complete = true;
  26. cond.broadcast();
  27. mutex.unlock();
  28. });
  29. tracker.on_error.connect(e => {
  30. mutex.lock();
  31. error_result = e;
  32. complete = true;
  33. cond.broadcast();
  34. mutex.unlock();
  35. });
  36. mutex.lock();
  37. while(!complete) {
  38. cond.wait(mutex);
  39. }
  40. if(error_result != null) {
  41. throw error_result;
  42. }
  43. return buffer.length;
  44. }
  45. }
  46. }