Переглянути джерело

Tweaks to make client API nicer

Billy Barrow 1 рік тому
батько
коміт
8651bb8578

+ 3 - 2
src/lib/Ppcl.vala

@@ -460,12 +460,13 @@ namespace Ppub {
 
         public CollectionServerRecord.from_string(string str) throws UriError {
             var parts = str.split(" ");
-            collection_id = Base64.decode(parts[1]);
+            collection_id = Base64.decode(parts[1] + "=");
             uri = Uri.parse(parts[2], UriFlags.PARSE_RELAXED);
         }
 
         public string to_string() {
-            return @"$(Base64.encode(collection_id)): $uri";
+            var id = Base64.encode(collection_id).replace("=", "");
+            return @"PPCLSR $(id) $uri";
         }
 
         public static Enumerable<CollectionServerRecord> resolve_records(string name) throws Error {

+ 15 - 7
src/pprf/Client.vala

@@ -10,7 +10,8 @@ namespace Pprf {
 
     public enum UploadStatus {
         INITIATING_SESSION,
-        SENDING_CHUNKS,
+        UPLOADING_CHUNK,
+        UPLOADED_CHUNK,
         UNPUBLISHING,
         FINALISING_SESSION,
         COMPLETE
@@ -85,11 +86,17 @@ namespace Pprf {
             return (Messages.UploadSession)response;
         }
 
-        public void send_upload_chunk(BinaryData collection_id, Messages.UploadSession session, uint64 offset, Bytes chunk, MemberIdentity identity) throws Error {
+        public void send_upload_chunk(BinaryData collection_id, Messages.UploadSession session, uint64 offset, Bytes chunk, MemberIdentity identity, Messages.TrackedBytesMessageBodyCallback? callback = null) throws Error {
             var message = new Messages.Upload();
             message.collection_id = collection_id;
             message.offset = offset;
-            message.upload_chunk = new Messages.BytesMessageBody(chunk);
+            
+            if(callback == null) {
+                message.upload_chunk = new Messages.BytesMessageBody(chunk);
+            }
+            else {
+                message.upload_chunk = new Messages.TrackedBytesMessageBody(chunk, callback);
+            }
             var checksum = Util.data_checksum(chunk.get_data());
             message.authenticate(session.session_authentication, checksum, identity.credentials);
 
@@ -108,7 +115,7 @@ namespace Pprf {
             assert_expected_type(response, typeof(Messages.Confirmation));
         }
 
-        const uint32 MAX_CHUNK_SIZE = 524288;
+        const uint32 MAX_CHUNK_SIZE = 1073741824;
         public void upload(BinaryData collection_id, InputStream data, uint64 size, string destination, bool unpublish_before_finalise, MemberIdentity identity, UploadProgressDelegate? progress_cb = null, uint8 flags = 0) throws Error {
             UploadProgressDelegate cb = () => {};
             if(progress_cb != null) {
@@ -121,16 +128,17 @@ namespace Pprf {
             var checksum = new Checksum(ChecksumType.SHA512);
             var chunk_size = uint32.min(MAX_CHUNK_SIZE, session.max_chunk_size);
             uint64 offset = 0;
+            uint64 written = 0;
             
-            cb(0, size, UploadStatus.SENDING_CHUNKS);
+            cb(0, size, UploadStatus.UPLOADING_CHUNK);
             while(offset < size) {
                 var to_read = uint32.min(chunk_size, (uint32)(size - offset));
                 var chunk = data.read_bytes(to_read);
                 checksum.update(chunk.get_data(), chunk.length);
                 
-                send_upload_chunk(collection_id, session, offset, chunk, identity);
+                send_upload_chunk(collection_id, session, offset, chunk, identity, (w) => cb(written += w, size, UploadStatus.UPLOADING_CHUNK));
                 offset += chunk.length;
-                cb(offset, size, UploadStatus.SENDING_CHUNKS);
+                cb(offset, size, UploadStatus.UPLOADED_CHUNK);
             }
 
             size_t dig_len = 64;

+ 19 - 4
src/pprf/HttpClient.vala

@@ -100,16 +100,31 @@ namespace Pprf {
             }
             public override ssize_t write(uint8[] buffer, GLib.Cancellable? cancellable) {
                 mutex.lock();
-                while(output.count() > max_size) {
-                    cond.wait(mutex);
+                var buf = buffer;
+                while(buf.length > 0) {
+                    buf = write_some(buf);
+                    cond.broadcast();
                 }
-                output.append_byte_array(buffer);
                 written += buffer.length;
-                cond.broadcast();
                 mutex.unlock();
                 return buffer.length;
             }
 
+            private uint8[] write_some(uint8[] buffer) {
+                while(output.count() >= max_size) {
+                    cond.wait(mutex);
+                }
+                var remaining = max_size - output.count();
+                //print(@"Can write up to $remaining bytes\n");
+                if(remaining == 0) {
+                    return buffer;
+                }
+
+                remaining = int.min(buffer.length, remaining);
+                output.append_byte_array(buffer[:remaining]);
+                return buffer[remaining:];
+            }
+
             public size_t read(uint8[] buffer) throws IOError {
                 mutex.lock();
                 while(output.count() == 0 && !closed && error == null) {

+ 26 - 0
src/pprf/Messages/Message.vala

@@ -86,6 +86,32 @@ namespace Pprf.Messages {
 
     }
 
+    public delegate void TrackedBytesMessageBodyCallback(int bytes_written);
+    public class TrackedBytesMessageBody : MessageBody {
+        private Bytes data;
+        private TrackedBytesMessageBodyCallback cb;
+
+        public TrackedBytesMessageBody(Bytes data, TrackedBytesMessageBodyCallback callback) {
+            body_size = data.length;
+            this.data = data;
+            cb = callback;
+        }
+
+        public override void write_to(OutputStream out_stream) throws Error {
+            var written = 0;
+            const int chunk_size = 102400;
+            var write = data.get_data();
+            while(written < body_size) {
+                var to_send = int.min(write.length, chunk_size);
+                var buf = write[:to_send];
+                out_stream.write(buf);
+                cb(buf.length);
+                written += buf.length;
+                write = write[to_send:];
+            }
+        }
+    }
+
     public enum MessageType {
 
         // 0 - 31: Public user requests

+ 3 - 1
src/pprf/Messages/Upload.vala

@@ -17,13 +17,14 @@ namespace Pprf.Messages {
             offset = stream.read_uint64();
 
             // Size of message minus headers
-            var chunk_size = size - (base.calculate_size() + 8);
+            var chunk_size = stream.read_uint64();
             upload_chunk = new StreamMessageBody((int)chunk_size, stream);
         }
     
         public override uint64 calculate_size() {
             return base.calculate_size() + 
                 8 + // Offset field
+                8 + // Chunk size field
                 upload_chunk.body_size;
         }
         
@@ -31,6 +32,7 @@ namespace Pprf.Messages {
             base.serialise(stream);
             
             stream.put_uint64(offset);
+            stream.put_uint64(upload_chunk.body_size);
             upload_chunk.write_to(stream);
             stream.flush();
         }

+ 10 - 6
src/tools/pprf/Pprf.vala

@@ -2,15 +2,14 @@
 
 public static int main(string[] args) {
 
-    Ppub.CollectionServerRecord.resolve_records("libpeer.pcthingz.com");
-
     var creds = new Ppub.CollectionMemberCredentials.from_string("PYuKgL7SdQYc2Kf6UGG9pCE58m27qrYnCaM45cnxs64=:JbJ6OoNn2KcGX+Tk5C/hotGZCoHOkTNbadUrlk6aCRs=:tL+557eP7kE6ObAW0b5RjvYyU8Dl3oVTOvYA7LAwSdI9i4qAvtJ1BhzYp/pQYb2kITnybbuqticJozjlyfGzrg==:AJbFO6n/cOuD7kk+wu7DmQ58w6z0G3HsukVmIzxGaUM=");
 
-    var collection_id = new Invercargill.BinaryData.from_base64("y8ibw54A93LDBKbgWm1EJ/WlbOkGX60DK+qp2lBHpjk=");
+    var uri = new Ppub.CollectionUri.from_string(args[2]);
+    var collection_id = new Invercargill.BinaryData.from_byte_array(uri.collection_id); //new Invercargill.BinaryData.from_base64("y8ibw54A93LDBKbgWm1EJ/WlbOkGX60DK+qp2lBHpjk=");
     
     var upload_file = File.new_for_path(args[1]);
     var file_size = upload_file.query_info("*", FileQueryInfoFlags.NONE).get_size();
-    var client = new Pprf.HttpPprfClient("http://localhost:8080/test.php");
+    var client = new Pprf.HttpPprfClient("https://billy.barrow.nz/xdelta/pprf.php");
     var file_name = upload_file.get_basename();
     
     var collection = client.get_collection(collection_id);
@@ -20,7 +19,7 @@ public static int main(string[] args) {
         return -2;
     }
 
-    print(@"Acting as $(member.name)\n");
+    print(@"Acting as $(member.name) on $uri\n");
 
     print("Regisering name\n");
     var exists = false;
@@ -35,7 +34,7 @@ public static int main(string[] args) {
     print("Beginning upload\n");
     var file_stream = upload_file.read();
     var flags = exists ? Pprf.Messages.FinaliseUploadFlags.OVERWRITE_DESTINATION : 0;
-    client.upload(collection_id, file_stream, file_size, file_name, exists, member, null, flags);
+    client.upload(collection_id, file_stream, file_size, file_name, exists, member, cb, flags);
 
     print("Computing publication signature\n");
     var digest = Pprf.Util.file_checksum(upload_file);
@@ -46,4 +45,9 @@ public static int main(string[] args) {
     
     print("Done\n");
     return 0;
+}
+
+void cb(uint64 sent, uint64 total, Pprf.UploadStatus status) {
+    var percent = (int)(((float)sent / (float)total)*100);
+    print(@"Callback: $sent / $total\t\t$percent%\t$status\n");
 }