# ppix.py (3.9 KB)
  1. import struct
  2. import typing
  3. class Ppix:
  4. def __init__(self, stream):
  5. self.__stream = stream
  6. stream.seek(0)
  7. if(stream.read(5) != b"PPIX\x00"):
  8. raise Exception("Stream does not begin with PPIX magic number")
  9. self.__publication_index_location, self.__collection_index_location, self.__tag_index_location, self.__word_tree_root_location = struct.unpack("<IIII", stream.read(16))
  10. # Check for extended collection metadata
  11. self.__collection_alternatives_location = 0
  12. self.__extended_collection_data_location_count = 0
  13. if(stream.read(8) == b"ECMDATA\x00"):
  14. self.__extended_collection_data_location_count = struct.unpack("<H", self.__stream.read(2))[0]
  15. if(self.__extended_collection_data_location_count > 0):
  16. self.__collection_alternatives_location = struct.unpack("<I", self.__stream.read(4))[0]
  17. def get_publications_count(self) -> int:
  18. self.__stream.seek(self.__publication_index_location)
  19. return struct.unpack("<I", self.__stream.read(4))[0]
  20. def get_publication_by_id(self, id) -> str:
  21. position = self.__publication_index_location + 4 + (id * 6)
  22. self.__stream.seek(position)
  23. string_location, string_length = struct.unpack("<IH", self.__stream.read(6))
  24. self.__stream.seek(string_location)
  25. return self.__stream.read(string_length).decode("utf-8")
  26. def get_collection_by_id(self, id) -> typing.List[int]:
  27. position = self.__collection_index_location + (id * 6)
  28. self.__stream.seek(position)
  29. collection_location, collection_item_count = struct.unpack("<IH", self.__stream.read(6))
  30. self.__stream.seek(collection_location)
  31. return struct.unpack("<{}".format("I"*collection_item_count), self.__stream.read(collection_item_count * 4))
  32. def get_tags_count(self) -> int:
  33. self.__stream.seek(self.__tag_index_location)
  34. return struct.unpack("<H", self.__stream.read(2))[0]
  35. def get_tags(self):
  36. count = self.get_tags_count()
  37. for i in range(count):
  38. tag_string_length, collection_id = struct.unpack("<BI", self.__stream.read(5))
  39. yield (self.__stream.read(tag_string_length).decode('utf-8'), collection_id)
  40. def get_alternative_locations_count(self) -> int:
  41. if(self.__collection_alternatives_location == 0):
  42. return 0
  43. self.__stream.seek(self.__collection_alternatives_location)
  44. return struct.unpack("<H", self.__stream.read(2))[0]
  45. def get_alternative_locations(self):
  46. if(self.__collection_alternatives_location == 0):
  47. return []
  48. count = self.get_alternative_locations_count()
  49. for i in range(count):
  50. url_length = struct.unpack("<H", self.__stream.read(2))
  51. yield self.__stream.read(url_length).decode('utf-8')
  52. def find_word_matches(self, word):
  53. node = self.__get_word_node_from_string(word)
  54. return node[2] if node != None else None
  55. def __get_word_node_from_string(self, word):
  56. bin_string = self.__string_to_bin(word)
  57. node = self.__read_tree_node(self.__word_tree_root_location)
  58. for bit in bin_string:
  59. if(bit == "0" and node[0] != 0):
  60. node = self.__read_tree_node(node[0])
  61. elif(bit == "1" and node[1] != 0):
  62. node = self.__read_tree_node(node[1])
  63. else:
  64. return None
  65. return node
  66. def __read_tree_node(self, position):
  67. self.__stream.seek(position)
  68. c0, has_col, col, c1 = struct.unpack("<IBII", self.__stream.read(13))
  69. if(has_col != 255):
  70. col == None
  71. return (c0, c1, col)
  72. def __string_to_bin(self, string):
  73. data = string.encode("utf-8")
  74. array = []
  75. for byte in data:
  76. for i in [1,2,4,8,16,32,64,128]:
  77. array.append(byte & i == i)
  78. return "".join(("1" if x else "0" for x in array))