浏览代码

Temporary fix for PPRF error uploading large PPUBs

Billy Barrow 6 天之前
父节点
当前提交
dc12d97ed4
共有 1 个文件被更改,包括 11 次插入122 次删除
  1. 11 122
      src/pprf/HttpClient.vala

+ 11 - 122
src/pprf/HttpClient.vala

@@ -1,7 +1,6 @@
 
 using Pprf.Messages;
 using Invercargill;
-using Invercargill.DataStructures;
 
 namespace Pprf {
 
@@ -21,130 +20,20 @@ namespace Pprf {
         public override Message send_message(Message message) throws Error {
             var soup_message = new Soup.Message("POST", endpoint);
 
-            var streamer = new MessageStreamer(message);
+            
+            var message_data = new MemoryOutputStream (null, GLib.realloc, GLib.free);
+            var data_stream = new DataOutputStream(message_data);
+            message.serialise(data_stream);
+            data_stream.flush();
+            data_stream.close();
 
-            soup_message.set_request_body(HTTP_CONTENT_TYPE, streamer, (ssize_t)message.calculate_true_size());
+            var raw_data = message_data.steal_as_bytes();
+            soup_message.set_request_body_from_bytes(HTTP_CONTENT_TYPE, raw_data);
 
-            var stream = session.send(soup_message);
-            return MessageFactory.from_stream(stream);
+            var response_stream = session.send(soup_message);
+            return MessageFactory.from_stream(response_stream);
         }
 
     }
 
-    private class MessageStreamer : InputStream {
-
-        private Message message;
-        private Receipticle receipticle;
-        private int total = 0;
-
-        private bool thread_started = false;
-
-        public MessageStreamer(Message message) {
-            this.message = message;
-            this.receipticle = new Receipticle();
-        }
-
-        public override bool close(GLib.Cancellable? cancellable) {
-            return true;
-        }
-        public override ssize_t read(uint8[] buffer, GLib.Cancellable? cancellable) throws IOError {
-            lock(thread_started) {
-                if(!thread_started) {
-                    thread_started = true;
-                    new Thread<bool>("serialisation thread", serialise);
-                }
-            }
-            var s = (ssize_t)receipticle.read(buffer);
-            total += (int)s;
-            return s;
-        }
-
-        private bool serialise() {
-            var stream = new DataOutputStream(receipticle);
-            try {
-                message.serialise(stream);
-                stream.flush();
-                stream.close();
-            }
-            catch(IOError e) {
-                receipticle.set_error(e);
-            }
-            catch(Error e) {
-                receipticle.set_error(new IOError.FAILED("Serialisation error: " + e.message));
-            }
-            return true;
-        }
-
-        public class Receipticle : OutputStream {
-
-            private ByteBuffer output = new ByteBuffer(0);
-            private const int max_size = 8192;
-
-            private Cond cond = Cond ();
-            private Mutex mutex = Mutex ();
-            private bool closed = false;
-            private IOError? error = null;
-            private int written = 0;
-
-            public void set_error(IOError e) {
-                mutex.lock();
-                error = e;
-                cond.broadcast();
-                mutex.unlock();
-            }
-
-            public override bool close(GLib.Cancellable? cancellable) {
-                mutex.lock();
-                closed = true;
-                cond.broadcast();
-                mutex.unlock();
-                return true;
-            }
-            public override ssize_t write(uint8[] buffer, GLib.Cancellable? cancellable) {
-                mutex.lock();
-                var buf = buffer;
-                while(buf.length > 0) {
-                    buf = write_some(buf);
-                    cond.broadcast();
-                }
-                written += buffer.length;
-                mutex.unlock();
-                return buffer.length;
-            }
-
-            private uint8[] write_some(uint8[] buffer) {
-                while(output.count() >= max_size) {
-                    cond.wait(mutex);
-                }
-                var remaining = max_size - output.count();
-                //print(@"Can write up to $remaining bytes\n");
-                if(remaining == 0) {
-                    return buffer;
-                }
-
-                remaining = int.min(buffer.length, remaining);
-                output = output.concat(Wrap.byte_array(buffer[:remaining])).assert_promotion<BinaryData>().to_byte_buffer();
-                return buffer[remaining:];
-            }
-
-            public size_t read(uint8[] buffer) throws IOError {
-                mutex.lock();
-                while(output.count() == 0 && !closed && error == null) {
-                    cond.wait(mutex);
-                }
-                if(error != null) {
-                    throw error;
-                }
-                var written = output.write_to(buffer, buffer.length);
-                output = output.slice((int)written, output.count()).to_byte_buffer();
-                cond.broadcast();
-                mutex.unlock();
-                return written;
-            }
-
-        }
-        
-
-    }
-
-}
+}