Client.vala 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. using Pprf.Messages;
  2. using Invercargill;
  3. namespace Pprf {
  /**
   * Errors raised by the client layer itself, as opposed to failures
   * reported by the remote server.
   */
  public errordomain ClientError {
      /** The server answered with a message type other than the one expected for the request. */
      UNEXPECTED_RESPONSE,

      /** No usable server could be determined for the requested collection. */
      NO_PEERS
  }
  /**
   * Stages reported to an UploadProgressDelegate while Client.upload() runs.
   */
  public enum UploadStatus {
      /** Reported once, before the upload session is requested. */
      INITIATING_SESSION,

      /** Reported before the first chunk and while a chunk is being sent. */
      UPLOADING_CHUNK,

      /** Reported after each chunk has been confirmed. */
      UPLOADED_CHUNK,

      /** Reported before the destination is unpublished (only when that was requested). */
      UNPUBLISHING,

      /** Reported before the session is finalised. */
      FINALISING_SESSION,

      /** Reported after the session has been finalised. */
      COMPLETE
  }
  /**
   * Progress callback invoked by Client.upload().
   *
   * @param bytes_sent number of bytes accounted for so far
   * @param bytes_total total size of the upload, as passed to upload()
   * @param status the stage the upload is currently in
   */
  public delegate void UploadProgressDelegate(
      uint64 bytes_sent,
      uint64 bytes_total,
      UploadStatus status
  );
  /**
   * Creates a client for the server that hosts the collection named by //uri//.
   *
   * The server records for uri.via_server_record are resolved and scanned in
   * order; the first record whose collection id equals the one in //uri// and
   * whose URI scheme is "http" or "https" is used. The client endpoint is that
   * record's URI with "/pprf.php" appended.
   *
   * @param uri collection URI; must carry a via_server_record
   * @return a new HttpPprfClient for the first matching record
   * @throws ClientError.NO_PEERS when //uri// has no server record, or no
   *         matching HTTP/HTTPS record is found
   * @throws Error anything raised while resolving the server records
   */
  public Client get_client_for_uri(Ppub.CollectionUri uri) throws Error {
      if(uri.via_server_record == null) {
          throw new ClientError.NO_PEERS("Decentralised PPCL not yet supported, please specify domain");
      }

      var records = Ppub.CollectionServerRecord.resolve_records(uri.via_server_record);
      var wanted_id = Wrap.byte_array(uri.collection_id);

      foreach (var entry in records) {
          // Records for other collections are ignored.
          if(!wanted_id.equals(Wrap.array(entry.collection_id))) {
              continue;
          }

          var scheme = entry.uri.get_scheme();
          if(scheme == "http" || scheme == "https") {
              // print() is printf-style: the URI may contain '%' escapes, so it
              // must be passed as an argument, never as the format string.
              print("%s", @"Creating client: $(entry.uri)\n");
              return new HttpPprfClient(entry.uri.to_string() + "/pprf.php");
          }
      }

      throw new ClientError.NO_PEERS("Can't find any HTTP or HTTPS PPRF servers listed for that domain");
  }
  /**
   * Abstract PPRF client.
   *
   * Subclasses provide the transport by implementing send_message(). Every
   * other operation builds a request message, sends it through send_message()
   * (directly or via send_authenticated_message()) and verifies that the
   * response has the expected message type.
   */
  public abstract class Client {

      /** Local upper bound on the size of one upload chunk, in bytes (1 GiB). */
      const uint32 MAX_CHUNK_SIZE = 1073741824;

      /**
       * Sends //message// to the server and returns its response.
       *
       * @param message the request to send
       * @return the response message
       * @throws Error on failure; the exact errors depend on the implementation
       */
      public abstract Message send_message(Message message) throws Error;

      /**
       * Verifies that //message// is of type //expected//.
       *
       * @param message the response to check
       * @param expected the message type the caller requires
       * @throws Messages.PprfFailureError when //message// is a Failure message
       *         (the error is produced by message.to_error())
       * @throws ClientError.UNEXPECTED_RESPONSE when //message// is neither a
       *         Failure nor an instance of //expected//
       */
      private void assert_expected_type(Messages.Message message, Type expected) throws ClientError, Messages.PprfFailureError {
          if(message is Messages.Failure) {
              throw message.to_error();
          }

          var actual = Type.from_instance(message);
          if(actual.is_a(expected)) {
              return;
          }

          // Strip the "PprfMessages" prefix from the type names for readability.
          var expected_name = expected.name().replace("PprfMessages", "");
          var actual_name = actual.name().replace("PprfMessages", "");
          throw new ClientError.UNEXPECTED_RESPONSE(@"Expected $expected_name message or Failure message, got $actual_name message");
      }

      /**
       * Authenticates //message// with //identity// and sends it.
       *
       * The collection named by identity.collection_id is fetched first and
       * used to refresh //identity// before the message is authenticated.
       *
       * @param message the request to authenticate and send
       * @param identity the member identity to authenticate with
       * @return the server's response (its type is not checked here)
       * @throws Error on any failure while fetching, refreshing, authenticating or sending
       */
      public Message send_authenticated_message(AuthenticatedMessage message, MemberIdentity identity) throws Error {
          var collection = get_collection(identity.collection_id);
          identity.refresh(collection);
          message.authenticate(identity);
          return send_message(message);
      }

      /**
       * Fetches the collection with the given identifier.
       *
       * @param identifier collection id
       * @return the collection carried in the server's Collection response
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public Ppub.Collection get_collection(BinaryData identifier) throws Error {
          var request = new Messages.GetCollection();
          request.collection_id = identifier;

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.Collection));
          return ((Messages.Collection)response).collection;
      }

      /**
       * Requests a listing of a collection.
       *
       * //skip//, //take// and //columns// are copied to the request unchanged.
       * Each of //tag//, //search// and //since// that is non-null additionally
       * sets the corresponding ListingRequestFlags bit on the request.
       *
       * @param identifier collection id
       * @param skip value for the request's skip field
       * @param take value for the request's take field
       * @param columns value for the request's columns field
       * @param tag optional tag filter
       * @param search optional search query
       * @param since optional date filter
       * @return the server's CollectionListing response
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public Messages.CollectionListing get_listing(BinaryData identifier, uint32 skip, uint8 take, uint16 columns, string? tag = null, string? search = null, DateTime? since = null) throws Error {
          var request = new Messages.GetListing();
          request.collection_id = identifier;
          request.skip = skip;
          request.take = take;
          request.columns = columns;

          request.tag = tag;
          if(tag != null) {
              request.flags |= ListingRequestFlags.TAG;
          }

          request.query = search;
          if(search != null) {
              request.flags |= ListingRequestFlags.SEARCH;
          }

          request.since = since;
          if(since != null) {
              request.flags |= ListingRequestFlags.SINCE;
          }

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.CollectionListing));
          return (Messages.CollectionListing)response;
      }

      /**
       * Requests a single asset of a publication.
       *
       * @param identifier collection id
       * @param publication name of the publication
       * @param asset name of the asset within the publication
       * @param flags request flags, passed through unchanged
       * @return the server's Asset response
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public Messages.Asset get_asset(BinaryData identifier, string publication, string asset, uint16 flags = 0) throws Error {
          var request = new Messages.GetAsset();
          request.collection_id = identifier;
          request.publication_name = publication;
          request.asset_name = asset;
          request.flags = flags;

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.Asset));
          return (Messages.Asset)response;
      }

      /**
       * Requests a publication.
       *
       * @param identifier collection id
       * @param publication name of the publication
       * @param flags request flags, passed through unchanged
       * @return the server's Publication response
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public Messages.Publication get_publication(BinaryData identifier, string publication, uint16 flags = 0) throws Error {
          var request = new Messages.GetPublication();
          request.collection_id = identifier;
          request.publication_name = publication;
          request.flags = flags;

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.Publication));
          return (Messages.Publication)response;
      }

      /**
       * Opens a publication as a RemotePublication backed by this client.
       *
       * A truncated publication is requested: TRUNCATE_AFTER_DEFAULT_ASSET when
       * //autofetch_default// is true, TRUNCATE_AFTER_METADATA otherwise. The
       * response body is read into a ByteComposition that is handed to the
       * RemotePublication.
       *
       * @param identifier collection id
       * @param publication name of the publication
       * @param autofetch_default whether to request the truncation level that
       *        extends past the default asset
       * @return a RemotePublication for the named publication
       * @throws Error on transport failure, server Failure, unexpected response
       *         type, or failure reading the response body
       */
      public RemotePublication open_publication(BinaryData identifier, string publication, bool autofetch_default = false) throws Error {
          var truncate_level = autofetch_default
              ? Messages.PublicationTruncationFlags.TRUNCATE_AFTER_DEFAULT_ASSET
              : Messages.PublicationTruncationFlags.TRUNCATE_AFTER_METADATA;
          var result = get_publication(identifier, publication, truncate_level);

          var stream = result.ppub_data.as_stream();
          var data = new ByteComposition();
          var size = (size_t)result.ppub_data.body_size;

          // Keep feeding the composition until read_in() reports false.
          // NOTE(review): presumably read_in() decrements `size` and returns
          // whether more data is wanted - confirm against ByteComposition.
          while(data.read_in(stream.read_bytes(size), ref size)) {
          }

          print("Got publication\n");
          return new RemotePublication(this, identifier, publication, data);
      }

      /**
       * Publishes //publication// to a collection.
       *
       * @param collection_id collection id
       * @param publication the publication entry; sent in its to_string() form
       * @param identity member identity used to authenticate the request
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void publish(BinaryData collection_id, Ppub.CollectionPublication publication, MemberIdentity identity) throws Error {
          var request = new Messages.Publish();
          request.collection_id = collection_id;
          request.publication_string = publication.to_string();

          var response = send_authenticated_message(request, identity);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }

      /**
       * Removes a publication from a collection.
       *
       * @param collection_id collection id
       * @param publication_name name of the publication to unpublish
       * @param identity member identity used to authenticate the request
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void unpublish(BinaryData collection_id, string publication_name, MemberIdentity identity) throws Error {
          var request = new Messages.Unpublish();
          request.collection_id = collection_id;
          request.publication_name = publication_name;

          var response = send_authenticated_message(request, identity);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }

      /**
       * Sends an authenticated RegisterName request for a collection.
       *
       * @param collection_id collection id
       * @param name the name to register
       * @param identity member identity used to authenticate the request
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void register_name(BinaryData collection_id, string name, MemberIdentity identity) throws Error {
          var request = new Messages.RegisterName();
          request.collection_id = collection_id;
          request.name = name;

          var response = send_authenticated_message(request, identity);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }

      /**
       * Asks the server to begin an upload session.
       *
       * The request carries identity.name as the member name and is sent
       * through send_message(), i.e. without message authentication.
       *
       * @param collection_id collection id
       * @param size total size of the file that will be uploaded, in bytes
       * @param identity member identity whose name is sent with the request
       * @return the server's UploadSession response
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public Messages.UploadSession start_upload_session(BinaryData collection_id, uint64 size, MemberIdentity identity) throws Error {
          var request = new Messages.BeginUpload();
          request.collection_id = collection_id;
          request.file_size = size;
          request.member_name = identity.name;

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.UploadSession));
          return (Messages.UploadSession)response;
      }

      /**
       * Uploads one chunk within an upload session.
       *
       * The request is authenticated with the session's authentication data,
       * a checksum of the chunk (Util.data_checksum) and the identity's credentials.
       *
       * @param collection_id collection id
       * @param session the session returned by start_upload_session()
       * @param offset value for the request's offset field
       * @param chunk the bytes to send
       * @param identity member identity whose credentials authenticate the chunk
       * @param callback optional callback; when given, the chunk is wrapped in
       *        a TrackedBytesMessageBody constructed with it
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void send_upload_chunk(BinaryData collection_id, Messages.UploadSession session, uint64 offset, Bytes chunk, MemberIdentity identity, Messages.TrackedBytesMessageBodyCallback? callback = null) throws Error {
          var request = new Messages.Upload();
          request.collection_id = collection_id;
          request.offset = offset;

          if(callback != null) {
              request.upload_chunk = new Messages.TrackedBytesMessageBody(chunk, callback);
          }
          else {
              request.upload_chunk = new Messages.BytesMessageBody(chunk);
          }

          var chunk_checksum = Util.data_checksum(chunk.get_data());
          request.authenticate(session.session_authentication, chunk_checksum, identity.credentials);

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }

      /**
       * Finalises an upload session.
       *
       * @param collection_id collection id
       * @param session the session returned by start_upload_session()
       * @param flags request flags, passed through unchanged
       * @param destination value for the request's destination field
       * @param checksum checksum used to authenticate the request; upload()
       *        passes the SHA-512 digest of everything it sent
       * @param identity member identity whose credentials authenticate the request
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void finalise_upload_session(BinaryData collection_id, Messages.UploadSession session, uint8 flags, string destination, uint8[] checksum, MemberIdentity identity) throws Error {
          var request = new Messages.FinaliseUpload();
          request.collection_id = collection_id;
          request.flags = flags;
          request.destination = destination;
          request.authenticate(session.session_authentication, checksum, identity.credentials);

          var response = send_message(request);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }

      /**
       * Uploads //size// bytes read from //data// and finalises them at //destination//.
       *
       * Steps: start a session, send the data in chunks no larger than the
       * smaller of MAX_CHUNK_SIZE and the session's max_chunk_size while
       * accumulating a SHA-512 digest, optionally unpublish //destination//,
       * then finalise the session with the digest.
       *
       * @param collection_id collection id
       * @param data stream to read the upload from
       * @param size number of bytes to read from //data// and upload
       * @param destination destination passed to the finalise request (and to
       *        unpublish() when //unpublish_before_finalise// is set)
       * @param unpublish_before_finalise whether to unpublish //destination//
       *        after the chunks are sent and before finalising
       * @param identity member identity used for the session and requests
       * @param progress_cb optional progress callback
       * @param flags flags passed to the finalise request
       * @throws ClientError.UNEXPECTED_RESPONSE when the session reports a
       *         maximum chunk size of zero although data has to be sent
       * @throws IOError.FAILED when //data// ends before //size// bytes were read
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void upload(BinaryData collection_id, InputStream data, uint64 size, string destination, bool unpublish_before_finalise, MemberIdentity identity, UploadProgressDelegate? progress_cb = null, uint8 flags = 0) throws Error {
          // Fall back to a no-op so the rest of the method can call cb unconditionally.
          UploadProgressDelegate cb = () => {};
          if(progress_cb != null) {
              cb = progress_cb;
          }

          cb(0, size, UploadStatus.INITIATING_SESSION);
          var session = start_upload_session(collection_id, size, identity);

          var checksum = new Checksum(ChecksumType.SHA512);
          var chunk_size = uint32.min(MAX_CHUNK_SIZE, session.max_chunk_size);
          uint64 offset = 0;
          uint64 written = 0;

          cb(0, size, UploadStatus.UPLOADING_CHUNK);
          while(offset < size) {
              // Take the minimum in 64 bits and only then narrow: casting the
              // remaining byte count to uint32 first would truncate it (to 0
              // for exact multiples of 4 GiB, which would stall the loop).
              var to_read = (uint32)uint64.min(chunk_size, size - offset);
              if(to_read == 0) {
                  // Only reachable when chunk_size is 0; no progress is possible.
                  throw new ClientError.UNEXPECTED_RESPONSE("Server reported a maximum chunk size of zero");
              }

              var chunk = data.read_bytes(to_read);
              if(chunk.length == 0) {
                  // A zero-length read for a non-zero request means end of
                  // stream; without this check the loop would never terminate.
                  throw new IOError.FAILED(@"Upload source ended after $offset of $size bytes");
              }

              checksum.update(chunk.get_data(), chunk.length);
              send_upload_chunk(collection_id, session, offset, chunk, identity, (w) => cb(written += w, size, UploadStatus.UPLOADING_CHUNK));

              // read_bytes() may return fewer bytes than requested, so advance
              // by what was actually read.
              offset += chunk.length;
              cb(offset, size, UploadStatus.UPLOADED_CHUNK);
          }

          // SHA-512 produces a 64 byte digest; trim to the length actually reported.
          size_t dig_len = 64;
          var digest = new uint8[64];
          checksum.get_digest(digest, ref dig_len);
          digest.length = (int)dig_len;

          if(unpublish_before_finalise) {
              cb(offset, size, UploadStatus.UNPUBLISHING);
              unpublish(collection_id, destination, identity);
          }

          cb(offset, size, UploadStatus.FINALISING_SESSION);
          finalise_upload_session(collection_id, session, flags, destination, digest, identity);
          cb(offset, size, UploadStatus.COMPLETE);
      }

      /**
       * Sends an authenticated RebuildIndex request for a collection.
       *
       * @param collection_id collection id
       * @param identity member identity used to authenticate the request
       * @throws Error on transport failure, server Failure, or unexpected response type
       */
      public void rebuild_index(BinaryData collection_id, MemberIdentity identity) throws Error {
          var request = new Messages.RebuildIndex();
          request.collection_id = collection_id;

          var response = send_authenticated_message(request, identity);
          assert_expected_type(response, typeof(Messages.Confirmation));
      }
  }
  205. }