浏览代码

Stream bugfix

Billy Barrow 3 年之前
父节点
当前提交
96c1b8f230
共有 2 个文件被更改,包括 46 次插入5 次删除
  1. 5 1
      src/lib/Application/Application.vala
  2. 41 4
      src/lib/Protocols/STP/Streams/InputStream.vala

+ 5 - 1
src/lib/Application/Application.vala

@@ -48,12 +48,16 @@ namespace LibPeer {
             }
         }
 
+        protected void inquire(InstanceInformation peer) {
+            muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
+        }
+
         protected virtual void on_new_discovery_peer() {
             find_any_peer();
         }
 
         protected virtual void on_peer_found(InstanceInformation peer) {
-            muxer.inquire(instance, peer.instance_reference, peer.connection_methods);
+            inquire(peer);
         }
 
         protected abstract void on_peer_available(InstanceReference peer);

+ 41 - 4
src/lib/Protocols/STP/Streams/InputStream.vala

@@ -47,12 +47,45 @@ namespace LibPeer.Protocols.Stp.Streams {
             return true;
         }
 
+        //  public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
+        //      print(@"Read: $(buffer.length)\n");
+        //      data_mutex.lock();
+
+        //      var canc = cancellable ?? new Cancellable();
+        //      canc.cancelled.connect(() => {
+        //          print("STP read cancelled\n");
+        //          data_mutex.lock();
+        //          data_cond.broadcast();
+        //          data_mutex.unlock();
+        //      });
+
+        //      while(unread_data.length < buffer.length && (session.open && pending_data == 0 && !canc.is_cancelled())) {
+        //          data_cond.wait(data_mutex);
+        //      }
+        //      var available_data = unread_data.length < buffer.length ? unread_data.length : buffer.length;
+        //      //  print(@"Read $(available_data) of $(buffer.length) bytes\n");
+        //      for(int i = 0; i < available_data; i++) {
+        //          buffer[i] = unread_data[i];
+        //      }
+        //      //  print(@"Read:\n\t\"$(new Util.ByteComposer().add_byte_array(buffer).to_escaped_string())\"\n\tof: \"$(new Util.ByteComposer().add_byte_array(unread_data).to_escaped_string())\"\n");
+        //      unread_data = unread_data[available_data:unread_data.length];
+        //      data_mutex.unlock();
+
+
+
+        //      return available_data;
+        //  }
+
+
         public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable = null) throws GLib.IOError {
             data_mutex.lock();
-            while(unread_data.length < buffer.length && (session.open && pending_data == 0)) {
+
+            // TODO implement cancellable
+
+            while(unread_data.length == 0 && session.open) {
                 data_cond.wait(data_mutex);
             }
-            var available_data = unread_data.length < buffer.length ? unread_data.length : buffer.length;
+            var available_data = int.min(unread_data.length, buffer.length);
             //  print(@"Read $(available_data) of $(buffer.length) bytes\n");
             for(int i = 0; i < available_data; i++) {
                 buffer[i] = unread_data[i];
@@ -61,11 +94,15 @@ namespace LibPeer.Protocols.Stp.Streams {
             unread_data = unread_data[available_data:unread_data.length];
             data_mutex.unlock();
 
-
-
             return available_data;
         }
 
+        public bool has_unread_data {
+            get {
+                return session.open && unread_data.length > 0;
+            }
+        }
+
     }
 
 }