1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- using LibPeer.Protocols.Stp.Sessions;
- using LibPeer.Protocols.Mx2;
- namespace LibPeer.Protocols.Stp.Streams {
- public class StpInputStream : InputStream {
- private IngressSession session;
- private uint8[] unread_data;
- private Cond data_cond = Cond();
- private Mutex data_mutex = Mutex();
- public InstanceReference target { get { return session.target; }}
- public uint8[] session_id { get { return session.identifier; }}
- public StpInputStream(IngressSession session) {
- this.session = session;
- session.incoming_app_data.connect(handle_data);
- }
- private void handle_data(uint8[] data) {
- data_mutex.lock();
- unread_data = new Util.ByteComposer().add_byte_array(unread_data).add_byte_array(data).to_byte_array();
- data_cond.broadcast();
- data_mutex.unlock();
- }
- public override bool close (GLib.Cancellable? cancellable) {
- session.close();
- return true;
- }
- public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
- data_mutex.lock();
- while(unread_data.length < buffer.length) {
- data_cond.wait(data_mutex);
- }
- for(int i = 0; i < buffer.length; i++) {
- buffer[i] = unread_data[i];
- }
- unread_data = unread_data[buffer.length:unread_data.length];
- data_mutex.unlock();
- return buffer.length;
- }
- }
- }
|