using Pprf.Messages; using Invercargill; namespace Pprf { public errordomain ClientError { UNEXPECTED_RESPONSE, NO_PEERS } public enum UploadStatus { INITIATING_SESSION, UPLOADING_CHUNK, UPLOADED_CHUNK, UNPUBLISHING, FINALISING_SESSION, COMPLETE } public delegate void UploadProgressDelegate(uint64 bytes_sent, uint64 bytes_total, UploadStatus status); public Client get_client_for_uri(Ppub.CollectionUri uri) throws Error { if(uri.via_server_record == null){ throw new ClientError.NO_PEERS("Decentralised PPCL not yet supported, please specify domain"); } var dns = Ppub.CollectionServerRecord.resolve_records(uri.via_server_record); var collection_id = Wrap.byte_array(uri.collection_id); foreach (var entry in dns) { if(!collection_id.equals(Wrap.array(entry.collection_id))) { continue; } if(entry.uri.get_scheme() == "http" || entry.uri.get_scheme() == "https") { print(@"Creating client: $(entry.uri)\n"); return new HttpPprfClient(entry.uri.to_string() + "/pprf.php"); } } throw new ClientError.NO_PEERS("Can't find any HTTP or HTTPS PPRF servers listed for that domain"); } public abstract class Client { public abstract Message send_message(Message message) throws Error; private void assert_expected_type(Messages.Message message, Type expected) throws ClientError, Messages.PprfFailureError { var type = Type.from_instance(message); if(message is Messages.Failure) { throw message.to_error(); } else if(!type.is_a(expected)) { var expected_type = expected.name().replace("PprfMessages", ""); var got_type = type.name().replace("PprfMessages", ""); throw new ClientError.UNEXPECTED_RESPONSE(@"Expected $expected_type message or Failure message, got $got_type message"); } } public Message send_authenticated_message(AuthenticatedMessage message, MemberIdentity identity) throws Error { var collection = get_collection(identity.collection_id); identity.refresh(collection); message.authenticate(identity); return send_message(message); } public Ppub.Collection get_collection(BinaryData identifier) throws Error { var message = new Messages.GetCollection(); message.collection_id = identifier; var response = send_message(message); assert_expected_type(response, typeof(Messages.Collection)); return ((Messages.Collection)response).collection; } public Messages.CollectionListing get_listing(BinaryData identifier, uint32 skip, uint8 take, uint16 columns, string? tag = null, string? search = null, DateTime? since = null) throws Error { var message = new Messages.GetListing(); message.collection_id = identifier; message.skip = skip; message.take = take; message.columns = columns; message.tag = tag; message.query = search; message.since = since; if(tag != null) { message.flags |= ListingRequestFlags.TAG; } if(search != null) { message.flags |= ListingRequestFlags.SEARCH; } if(since != null) { message.flags |= ListingRequestFlags.SINCE; } var response = send_message(message); assert_expected_type(response, typeof(Messages.CollectionListing)); return (Messages.CollectionListing)response; } public Messages.Asset get_asset(BinaryData identifier, string publication, string asset, uint16 flags = 0) throws Error { var message = new Messages.GetAsset(); message.collection_id = identifier; message.publication_name = publication; message.asset_name = asset; message.flags = flags; var response = send_message(message); assert_expected_type(response, typeof(Messages.Asset)); return ((Messages.Asset)response); } public Messages.Publication get_publication(BinaryData identifier, string publication, uint16 flags = 0) throws Error { var message = new Messages.GetPublication(); message.collection_id = identifier; message.publication_name = publication; message.flags = flags; var response = send_message(message); assert_expected_type(response, typeof(Messages.Publication)); return ((Messages.Publication)response); } public RemotePublication open_publication(BinaryData identifier, string publication, bool autofetch_default = false) throws Error { var truncate_level = autofetch_default ? Messages.PublicationTruncationFlags.TRUNCATE_AFTER_DEFAULT_ASSET : Messages.PublicationTruncationFlags.TRUNCATE_AFTER_METADATA; var result = get_publication(identifier, publication, truncate_level); var stream = result.ppub_data.as_stream(); var data = new ByteComposition(); var size = (size_t)result.ppub_data.body_size; while(data.read_in(stream.read_bytes(size), ref size)); print("Got publication\n"); return new RemotePublication(this, identifier, publication, data); } public void publish(BinaryData collection_id, Ppub.CollectionPublication publication, MemberIdentity identity) throws Error { var message = new Messages.Publish(); message.collection_id = collection_id; message.publication_string = publication.to_string(); var response = send_authenticated_message(message, identity); assert_expected_type(response, typeof(Messages.Confirmation)); } public void unpublish(BinaryData collection_id, string publication_name, MemberIdentity identity) throws Error { var message = new Messages.Unpublish(); message.collection_id = collection_id; message.publication_name = publication_name; var response = send_authenticated_message(message, identity); assert_expected_type(response, typeof(Messages.Confirmation)); } public void register_name(BinaryData collection_id, string name, MemberIdentity identity) throws Error { var message = new Messages.RegisterName(); message.collection_id = collection_id; message.name = name; var response = send_authenticated_message(message, identity); assert_expected_type(response, typeof(Messages.Confirmation)); } public Messages.UploadSession start_upload_session(BinaryData collection_id, uint64 size, MemberIdentity identity) throws Error { var message = new Messages.BeginUpload(); message.collection_id = collection_id; message.file_size = size; message.member_name = identity.name; var response = send_message(message); assert_expected_type(response, typeof(Messages.UploadSession)); return (Messages.UploadSession)response; } public void send_upload_chunk(BinaryData collection_id, Messages.UploadSession session, uint64 offset, Bytes chunk, MemberIdentity identity, Messages.TrackedBytesMessageBodyCallback? callback = null) throws Error { var message = new Messages.Upload(); message.collection_id = collection_id; message.offset = offset; if(callback == null) { message.upload_chunk = new Messages.BytesMessageBody(chunk); } else { message.upload_chunk = new Messages.TrackedBytesMessageBody(chunk, callback); } var checksum = Util.data_checksum(chunk.get_data()); message.authenticate(session.session_authentication, checksum, identity.credentials); var response = send_message(message); assert_expected_type(response, typeof(Messages.Confirmation)); } public void finalise_upload_session(BinaryData collection_id, Messages.UploadSession session, uint8 flags, string destination, uint8[] checksum, MemberIdentity identity) throws Error { var message = new Messages.FinaliseUpload(); message.collection_id = collection_id; message.flags = flags; message.destination = destination; message.authenticate(session.session_authentication, checksum, identity.credentials); var response = send_message(message); assert_expected_type(response, typeof(Messages.Confirmation)); } const uint32 MAX_CHUNK_SIZE = 1073741824; public void upload(BinaryData collection_id, InputStream data, uint64 size, string destination, bool unpublish_before_finalise, MemberIdentity identity, UploadProgressDelegate? progress_cb = null, uint8 flags = 0) throws Error { UploadProgressDelegate cb = () => {}; if(progress_cb != null) { cb = progress_cb; } cb(0, size, UploadStatus.INITIATING_SESSION); var session = start_upload_session(collection_id, size, identity); var checksum = new Checksum(ChecksumType.SHA512); var chunk_size = uint32.min(MAX_CHUNK_SIZE, session.max_chunk_size); uint64 offset = 0; uint64 written = 0; cb(0, size, UploadStatus.UPLOADING_CHUNK); while(offset < size) { var to_read = uint32.min(chunk_size, (uint32)(size - offset)); var chunk = data.read_bytes(to_read); checksum.update(chunk.get_data(), chunk.length); send_upload_chunk(collection_id, session, offset, chunk, identity, (w) => cb(written += w, size, UploadStatus.UPLOADING_CHUNK)); offset += chunk.length; cb(offset, size, UploadStatus.UPLOADED_CHUNK); } size_t dig_len = 64; var digest = new uint8[64]; checksum.get_digest(digest, ref dig_len); digest.length = (int)dig_len; if(unpublish_before_finalise) { cb(offset, size, UploadStatus.UNPUBLISHING); unpublish(collection_id, destination, identity); } cb(offset, size, UploadStatus.FINALISING_SESSION); finalise_upload_session(collection_id, session, flags, destination, digest, identity); cb(offset, size, UploadStatus.COMPLETE); } public void rebuild_index(BinaryData collection_id, MemberIdentity identity) throws Error { var message = new Messages.RebuildIndex(); message.collection_id = collection_id; var response = send_authenticated_message(message, identity); assert_expected_type(response, typeof(Messages.Confirmation)); } } }