Billy Barrow преди 1 година
родител
ревизия
ebb7fe4f3f

+ 46 - 0
src/pprf/Client.vala

@@ -71,6 +71,52 @@ namespace Pprf {
             return ((Messages.Collection)response).collection;
         }
 
+        public Messages.CollectionListing get_listing(BinaryData identifier, uint32 skip, uint8 take, uint16 columns, string? tag = null, string? search = null, DateTime? since = null) throws Error {
+            var message = new Messages.GetListing();
+            message.collection_id = identifier;
+            message.skip = skip;
+            message.take = take;
+            message.columns = columns;
+            message.tag = tag;
+            message.query = search;
+            message.since = since;
+
+            if(tag != null) {
+                message.flags |= ListingRequestFlags.TAG;
+            }
+            if(search != null) {
+                message.flags |= ListingRequestFlags.SEARCH;
+            }
+            if(since != null) {
+                message.flags |= ListingRequestFlags.SINCE;
+            }
+
+            var response = send_message(message);
+            assert_expected_type(response, typeof(Messages.CollectionListing));
+            return (Messages.CollectionListing)response;
+        }
+
+        public Messages.Asset get_asset(BinaryData identifier, string publication, string asset) throws Error {
+            var message = new Messages.GetAsset();
+            message.collection_id = identifier;
+            message.publication_name = publication;
+            message.asset_name = asset;
+
+            var response = send_message(message);
+            assert_expected_type(response, typeof(Messages.Asset));
+            return ((Messages.Asset)response);
+        }
+
+        public Messages.Publication get_publication(BinaryData identifier, string publication) throws Error {
+            var message = new Messages.GetPublication();
+            message.collection_id = identifier;
+            message.publication_name = publication;
+
+            var response = send_message(message);
+            assert_expected_type(response, typeof(Messages.Publication));
+            return ((Messages.Publication)response);
+        }
+
         public void publish(BinaryData collection_id, Ppub.CollectionPublication publication, MemberIdentity identity) throws Error {
             var message = new Messages.Publish();
             message.collection_id = collection_id;

+ 1 - 0
src/pprf/HttpClient.vala

@@ -14,6 +14,7 @@ namespace Pprf {
         public HttpPprfClient(string endpoint) {
             this.endpoint = endpoint;
             session = new Soup.Session();
+            session.idle_timeout = 30;
         }
 
         public override Message send_message(Message message) throws Error {

+ 2 - 0
src/pprf/Messages/Failure.vala

@@ -105,5 +105,7 @@ namespace Pprf.Messages {
         PPCL_ERROR = 14,
         DESTINATION_PUBLISHED = 15,
         PUBLICATION_NOT_FOUND = 16,
+        ASSET_NOT_FOUND = 17,
+        ASSET_NOT_STREAMABLE = 18,
     }
 }

+ 123 - 0
src/pprf/Messages/GetAsset.vala

@@ -0,0 +1,123 @@
+using Invercargill;
+
+namespace Pprf.Messages {
+
+    public class GetAsset : CollectionMessage {
+
+        public string publication_name { get; set; }
+        public string asset_name { get; set; }
+
+        public uint64 skip { get; set; }
+        public uint64 take { get; set; }
+
+        public GetAsset() {
+            message_type = MessageType.GET_ASSET;
+        }
+
+        public override void deserialise (GLib.DataInputStream stream) throws Error {
+            base.deserialise (stream);
+            
+            // Reserved flags
+            stream.read_uint16();
+
+            var pub_name_len = stream.read_byte();
+            if(pub_name_len > 0) {
+                var data = new uint8[pub_name_len];
+                stream.read(data);
+                publication_name = new BinaryData.from_byte_array(data).to_raw_string();
+            }
+
+            var ass_name_len = stream.read_uint16();
+            if(ass_name_len > 0) {
+                var data = new uint8[ass_name_len];
+                stream.read(data);
+                asset_name = new BinaryData.from_byte_array(data).to_raw_string();
+            }
+
+            skip = stream.read_uint64();
+            take = stream.read_uint64();
+        }
+    
+        public override uint64 calculate_size() {
+            return base.calculate_size() + 
+                2 + // Flags
+                1 + // Publication name length
+                publication_name.data.length +
+                2 + // Asset name length
+                asset_name.data.length +
+                8 + // Skip
+                8;  // Take
+        }
+        
+        public override void serialise(DataOutputStream stream) throws Error {
+            base.serialise(stream);
+
+            stream.put_uint16(0);
+            var str_size = publication_name == null ? 0 : publication_name.data.length;
+            stream.put_byte(str_size);
+            if(str_size > 0) {
+                stream.put_string(publication_name);
+            }
+
+            str_size = asset_name == null ? 0 : asset_name.data.length;
+            stream.put_uint16(str_size);
+            if(str_size > 0) {
+                stream.put_string(asset_name);
+            }
+
+            stream.put_uint64(skip);
+            stream.put_uint64(take);
+        }
+    }
+
+    public class Asset : Message {
+    
+        public string mime_type { get; set; }
+        public MessageBody asset_data { get; set; }
+    
+        public Asset() {
+            message_type = MessageType.ASSET;
+        }
+    
+        public override void deserialise (GLib.DataInputStream stream) throws Error {
+            base.deserialise (stream);
+            
+            // Reserved flags
+            stream.read_uint16();
+
+            var mime_len = stream.read_byte();
+            if(mime_len > 0) {
+                var data = new uint8[mime_len];
+                stream.read(data);
+                mime_type = new BinaryData.from_byte_array(data).to_raw_string();
+            }
+
+            var data_len = (size_t)stream.read_uint64();
+            asset_data = new StreamMessageBody(data_len, stream);
+        }
+    
+        public override uint64 calculate_size() {
+            return base.calculate_size() + 
+                2 + // Flags
+                1 + // Mimetype length
+                mime_type.data.length +
+                8 + // Data length
+                asset_data.body_size;
+        }
+        
+        public override void serialise(DataOutputStream stream) throws Error {
+            base.serialise(stream);
+
+            // Reserved flags
+            stream.put_uint16(0);
+            var str_size = mime_type == null ? 0 : mime_type.data.length;
+            stream.put_byte(str_size);
+            if(str_size > 0) {
+                stream.put_string(mime_type);
+            }
+
+            stream.put_uint64(asset_data.body_size);
+            asset_data.write_to(stream);
+        }
+    }
+}

+ 71 - 40
src/pprf/Messages/GetListing.vala

@@ -15,13 +15,22 @@ namespace Pprf.Messages {
         CHECKSUM = (1 << 7),
     }
 
+    public enum ListingRequestFlags {
+        TAG = (1 << 0),
+        SEARCH = (2 << 0),
+        SINCE = (3 << 0),
+        INCLUDE_HIDDEN = (4 << 0)
+    }
+
     public class GetListing : CollectionMessage {
     
+        public uint16 flags { get; set; }
         public uint16 columns { get; set; }
         public uint32 skip { get; set; }
         public uint8 take { get; set; }
         public string? tag { get; set; }
         public string? query { get; set; }
+        public DateTime? since { get; set; }
     
         public GetListing() {
             message_type = MessageType.GET_LISTING;
@@ -29,68 +38,96 @@ namespace Pprf.Messages {
     
         public override void deserialise (GLib.DataInputStream stream) throws Error {
             base.deserialise (stream);
+            flags = stream.read_uint16();
             columns = stream.read_uint16();
             skip = stream.read_uint32();
             take = stream.read_byte();
 
-            var tag_len = stream.read_uint16();
-            if(tag_len > 0) {
-                var tag_data = new uint8[tag_len];
-                stream.read(tag_data);
-                tag = new BinaryData.from_byte_array(tag_data).to_raw_string();
+            if((flags & ListingRequestFlags.TAG) != 0) {
+                var tag_len = stream.read_uint16();
+                if(tag_len > 0) {
+                    var tag_data = new uint8[tag_len];
+                    stream.read(tag_data);
+                    tag = new BinaryData.from_byte_array(tag_data).to_raw_string();
+                }
             }
 
-            var query_len = stream.read_uint16();
-            if(query_len > 0) {
-                var query_data = new uint8[query_len];
-                stream.read(query_data);
-                query = new BinaryData.from_byte_array(query_data).to_raw_string();
+            if((flags & ListingRequestFlags.SEARCH) != 0) {
+                var query_len = stream.read_uint16();
+                if(query_len > 0) {
+                    var query_data = new uint8[query_len];
+                    stream.read(query_data);
+                    query = new BinaryData.from_byte_array(query_data).to_raw_string();
+                }
+            }
+
+            if((flags & ListingRequestFlags.SINCE) != 0) {
+                var date_len = stream.read_byte();
+                if(date_len > 0) {
+                    var date_data = new uint8[date_len];
+                    stream.read(date_data);
+                    var date_str = new BinaryData.from_byte_array(date_data).to_raw_string();
+                    since = new DateTime.from_iso8601(date_str, null);
+                }
             }
         }
     
         public override uint64 calculate_size() {
-            var text_size = 0;
-            if(query != null) {
-                text_size += query.data.length;
-            }
-            if(tag != null) {
-                text_size += tag.data.length;
-            }
+            var tag_size = (flags & ListingRequestFlags.TAG) != 0 ? 2 + tag.data.length : 0;
+            var query_size = (flags & ListingRequestFlags.SEARCH) != 0 ? 2 + query.data.length : 0;
+            var date_size = (flags & ListingRequestFlags.SINCE) != 0 ? 2 + since.format_iso8601().data.length : 0;
             return base.calculate_size() + 
+                2 + // Flags
                 2 + // Columns
                 4 + // Skip
                 1 + // Take
-                2 + // Tag size
-                2 + // Query size
-                text_size; // Query + tag
+                tag_size +
+                query_size +
+                date_size;
         }
         
         public override void serialise(DataOutputStream stream) throws Error {
             base.serialise(stream);
+            stream.put_uint16(flags);
             stream.put_uint16(columns);
             stream.put_uint32(skip);
             stream.put_byte(take);
 
-            var tag_size = tag == null ? 0 : tag.data.length;
-            stream.put_uint16(tag_size);
-            if(tag_size > 0) {
-                stream.put_string(tag);
+            if((flags & ListingRequestFlags.TAG) != 0) {
+                var tag_size = tag == null ? 0 : tag.data.length;
+                stream.put_uint16(tag_size);
+                if(tag_size > 0) {
+                    stream.put_string(tag);
+                }
+            }
+
+            if((flags & ListingRequestFlags.SEARCH) != 0) {
+                var query_size = query == null ? 0 : query.data.length;
+                stream.put_uint16(query_size);
+                if(query_size > 0) {
+                    stream.put_string(query);
+                }
             }
 
-            var query_size = query == null ? 0 : query.data.length;
-            stream.put_uint16(query_size);
-            if(query_size > 0) {
-                stream.put_string(query);
+            if((flags & ListingRequestFlags.SINCE) != 0) {
+                var date_str = since.format_iso8601();
+                stream.put_byte((uint8)date_str.data.length);
+                stream.put_string(date_str);
             }
 
         }
     }
 
-    public class CollectionListing : CollectionMessage {
+    public class CollectionListing : Message {
 
         public uint16 columns { get; set; }
         public Vector<CollectionListingItem> results { get; set; }
 
+        public CollectionListing() {
+            message_type = MessageType.COLLECTION_LISTING;
+            results = new Vector<CollectionListingItem>();
+        }
+
         public override void deserialise (GLib.DataInputStream stream) throws Error {
             base.deserialise (stream);
             columns = stream.read_uint16();
@@ -130,7 +167,6 @@ namespace Pprf.Messages {
         public string name { get; set; }
         public Metadata metadata { get; set; }
         public uint8[]? checksum { get; set; }
-        public uint8[]? poster { get; set; }
 
         public CollectionListingItem.from_stream(DataInputStream stream, uint16 cols) throws Error {
             metadata = new Metadata();
@@ -160,11 +196,8 @@ namespace Pprf.Messages {
                 metadata.tags = read_string(stream, size).split(" ");
             }
             if((cols & ListingColumn.POSTER) != 0) {
-                var size = stream.read_uint32();
-                var data = new BinaryData();
-                var remaining = (size_t)size;
-                while(data.read_in(stream.read_bytes(remaining), ref remaining));
-                poster = data.to_array();
+                var size = stream.read_uint16();
+                metadata.poster = read_string(stream, size);
             }
             if((cols & ListingColumn.COPYRIGHT) != 0) {
                 var size = stream.read_uint16();
@@ -197,7 +230,7 @@ namespace Pprf.Messages {
                 s += 2 + tag_str.data.length;
             }
             if((cols & ListingColumn.POSTER) != 0) {
-                s += 4 + poster.length;
+                s += 2 + metadata.poster.data.length;
             }
             if((cols & ListingColumn.COPYRIGHT) != 0) {
                 s += 2 + metadata.copyright.data.length;
@@ -237,10 +270,8 @@ namespace Pprf.Messages {
                 stream.put_string(tags);
             }
             if((cols & ListingColumn.POSTER) != 0) {
-                stream.put_uint32(poster.length);
-                if(poster.length > 0) {
-                    stream.write(poster);
-                }
+                stream.put_uint16((uint16)metadata.poster.data.length);
+                stream.put_string(metadata.poster);
             }
             if((cols & ListingColumn.COPYRIGHT) != 0) {
                 stream.put_uint16((uint16)metadata.copyright.data.length);

+ 81 - 0
src/pprf/Messages/GetPublication.vala

@@ -0,0 +1,81 @@
+using Invercargill;
+
+namespace Pprf.Messages {
+
+    public class GetPublication : CollectionMessage {
+
+        public string publication_name { get; set; }
+
+        public GetPublication() {
+            message_type = MessageType.GET_PUBLICATION;
+        }
+
+        public override void deserialise (GLib.DataInputStream stream) throws Error {
+            base.deserialise (stream);
+            
+            // Reserved flags
+            stream.read_uint16();
+
+            var pub_name_len = stream.read_byte();
+            if(pub_name_len > 0) {
+                var data = new uint8[pub_name_len];
+                stream.read(data);
+                publication_name = new BinaryData.from_byte_array(data).to_raw_string();
+            }
+        }
+    
+        public override uint64 calculate_size() {
+            return base.calculate_size() + 
+                2 + // Flags
+                1 + // Publication name length
+                publication_name.data.length;
+        }
+        
+        public override void serialise(DataOutputStream stream) throws Error {
+            base.serialise(stream);
+
+            stream.put_uint16(0);
+            var str_size = publication_name == null ? 0 : publication_name.data.length;
+            stream.put_byte(str_size);
+            if(str_size > 0) {
+                stream.put_string(publication_name);
+            }
+        }
+    }
+
+    public class Publication : Message {
+    
+        public MessageBody ppub_data { get; set; }
+    
+        public Publication() {
+            message_type = MessageType.PUBLICATION;
+        }
+    
+        public override void deserialise (GLib.DataInputStream stream) throws Error {
+            base.deserialise (stream);
+            
+            // Reserved flags
+            stream.read_uint16();
+
+            var data_len = (size_t)stream.read_uint64();
+            ppub_data = new StreamMessageBody(data_len, stream);
+        }
+    
+        public override uint64 calculate_size() {
+            return base.calculate_size() + 
+                2 + // Flags
+                8 + // Data length
+                ppub_data.body_size;
+        }
+        
+        public override void serialise(DataOutputStream stream) throws Error {
+            base.serialise(stream);
+
+            // Reserved flags
+            stream.put_uint16(0);
+
+            stream.put_uint64(ppub_data.body_size);
+            ppub_data.write_to(stream);
+        }
+    }
+}

+ 19 - 2
src/pprf/Messages/Message.vala

@@ -3,7 +3,8 @@ using Invercargill;
 namespace Pprf.Messages {
 
     public errordomain PprfMessageError {
-        INVALID_HEADER
+        INVALID_HEADER,
+        UNRECOGNISED_MESSAGE_TYPE
     }
 
     public class Message {
@@ -44,6 +45,7 @@ namespace Pprf.Messages {
     public abstract class MessageBody {
         public uint64 body_size { get; set; }
         public abstract void write_to(OutputStream out_stream) throws Error;
+        public abstract InputStream as_stream() throws Error;
     }
 
     public class StreamMessageBody : MessageBody {
@@ -51,7 +53,9 @@ namespace Pprf.Messages {
         private DataInputStream stream;
         private uint64 data_read = 0;
 
-        public StreamMessageBody(int size, DataInputStream stream) {
+        public signal void data_written(uint64 written, uint64 total);
+
+        public StreamMessageBody(uint64 size, DataInputStream stream) {
             body_size = size;
             this.stream = stream;
         }
@@ -67,9 +71,14 @@ namespace Pprf.Messages {
                 data_read += (uint64)read;
                 data.length = (int)read;
                 out_stream.write(data);
+                data_written(data_read, body_size);
             }
         }
 
+        public override InputStream as_stream() throws Error {
+            return stream;
+        }
+
     }
 
     public class BytesMessageBody : MessageBody {
@@ -84,6 +93,10 @@ namespace Pprf.Messages {
             out_stream.write(data.get_data());
         }
 
+        public override InputStream as_stream() throws Error {
+            return new MemoryInputStream.from_bytes(data);
+        }
+
     }
 
     public delegate void TrackedBytesMessageBodyCallback(int bytes_written);
@@ -110,6 +123,10 @@ namespace Pprf.Messages {
                 write = write[to_send:];
             }
         }
+
+        public override InputStream as_stream() throws Error {
+            return new MemoryInputStream.from_bytes(data);
+        }
     }
 
     public enum MessageType {

+ 8 - 2
src/pprf/Messages/MessageFactory.vala

@@ -3,7 +3,7 @@ namespace Pprf.Messages {
 
     public class MessageFactory {
 
-        public static Message construct_message(MessageType mtype) {
+        public static Message construct_message(MessageType mtype) throws PprfMessageError {
             switch (mtype) {
                 case MessageType.GET_COLLECTION:
                     return new GetCollection();
@@ -19,6 +19,12 @@ namespace Pprf.Messages {
                     return new FinaliseUpload();
                 case MessageType.PUBLISH:
                     return new RegisterName();
+                case MessageType.COLLECTION_LISTING:
+                    return new CollectionListing();
+                case MessageType.ASSET:
+                    return new Asset();
+                case MessageType.PUBLICATION:
+                    return new Publication();
                 case MessageType.COLLECTION:
                     return new Collection();
                 case MessageType.CONFIRMATION:
@@ -27,7 +33,7 @@ namespace Pprf.Messages {
                     return new Failure();
 
                 default:
-                    assert_not_reached();
+                    throw new PprfMessageError.UNRECOGNISED_MESSAGE_TYPE(@"Unrecognised message type $((int)mtype)");
             }
         }
 

+ 27 - 4
src/tools/pprf/Pprf.vala

@@ -4,13 +4,36 @@ public static int main(string[] args) {
 
     var creds = new Ppub.CollectionMemberCredentials.from_string("PPCLMC\nPKSIG EDXLsUvOZEne+xcv+huvSaqNBs8TTldCv6hd69GdmYw=\nPKENC WF6pIAlSeol1Ikpr2N5iO8fsjoFGf02DGafJ/aBuSB4=\nSKSIG 0njSgyndiE1tND5IGM8rrd/6QQQ284mDYGarkjfK57MQNcuxS85kSd77Fy/6G69Jqo0GzxNOV0K/qF3r0Z2ZjA==\nSKENC R18jlqqbdjmf4lbocONVudFfNoB+mhSe+iffoH8DEN4=");
 
-    var uri = new Ppub.CollectionUri.from_string(args[2]);
-    var collection_id = new Invercargill.BinaryData.from_byte_array(uri.collection_id); //new Invercargill.BinaryData.from_base64("y8ibw54A93LDBKbgWm1EJ/WlbOkGX60DK+qp2lBHpjk=");
+    //  var uri = new Ppub.CollectionUri.from_string(args[2]);
+    var collection_id = new Invercargill.BinaryData.from_base64("y8ibw54A93LDBKbgWm1EJ/WlbOkGX60DK+qp2lBHpjk="); // new Invercargill.BinaryData.from_byte_array(uri.collection_id); //new Invercargill.BinaryData.from_base64("y8ibw54A93LDBKbgWm1EJ/WlbOkGX60DK+qp2lBHpjk=");
     
+    var client = new Pprf.HttpPprfClient("http://localhost:8080/");
+    var results = client.get_listing(collection_id, 0, 10, Pprf.Messages.ListingColumn.TITLE | Pprf.Messages.ListingColumn.AUTHOR | Pprf.Messages.ListingColumn.POSTER);
+    foreach (var result in results.results) {
+        print(@"\t$(result.name): $(result.metadata.title) by $(result.metadata.author)");
+        if(result.metadata.poster != null) {
+            print(@" ($(result.metadata.poster))\n");
+            var save = File.new_for_path("image").create(FileCreateFlags.REPLACE_DESTINATION);
+            var asset = client.get_asset (collection_id, result.name, result.metadata.poster);
+            print(@"Downloading $(result.metadata.poster) ($(asset.mime_type)) $(asset.asset_data.body_size)\n");
+            asset.asset_data.write_to (save);
+
+            print(@"Downloading PPUB\n");
+            var save2 = File.new_for_path("ppub").create(FileCreateFlags.REPLACE_DESTINATION);
+            var pub = client.get_publication(collection_id, result.name);
+            pub.ppub_data.write_to(save2);
+        }
+        else{
+            print("\n");
+        }
+    }
+    return 0;
+
     var upload_file = File.new_for_path(args[1]);
     var file_size = upload_file.query_info("*", FileQueryInfoFlags.NONE).get_size();
-    var client = new Pprf.HttpPprfClient("http://localhost:8080/");
     var file_name = upload_file.get_basename();
+
+
     
     var collection = client.get_collection(collection_id);
     var member = Pprf.MemberIdentity.get_usable_identities(Invercargill.single(creds), collection).first_or_default();
@@ -19,7 +42,7 @@ public static int main(string[] args) {
         return -2;
     }
 
-    print(@"Acting as $(member.name) on $uri\n");
+    //  print(@"Acting as $(member.name) on $uri\n");
 
     print("Regisering name\n");
     var exists = false;