From 0a890b6b50ef692fa73b9e4fc65d9e2f1dba0a7f Mon Sep 17 00:00:00 2001 From: tmpz23 <28760271+tmpz23@users.noreply.github.com> Date: Mon, 22 Nov 2021 08:09:08 +0100 Subject: [PATCH] Update pzztool.py --- pzztool.py | 190 ++++++++++++++++++++++++++--------------------------- 1 file changed, 94 insertions(+), 96 deletions(-) diff --git a/pzztool.py b/pzztool.py index 89d87c4..1918d6e 100644 --- a/pzztool.py +++ b/pzztool.py @@ -24,15 +24,21 @@ TPL_MAGIC_NUMBER = b"\x00\x20\xAF\x30" # http://virtualre.rf.gd/index.php/TPL_(F def get_file_ext(file_content: bytes): - if file_content[0:4] == TPL_MAGIC_NUMBER: + if file_content.startswith(TPL_MAGIC_NUMBER): return ".tpl" # Par défaut return ".dat" +# Non implémenté : pour supprimer le pad à la fin des fichiers unpack +# Les fichiers sans pad se terminent éventuellement par des b"\x00" +# ce qui impose de connaître le format de fichier pour implémenter cette fonction +def remove_padding(file_content: bytearray): + return file_content + # return file_content.rstrip(b'\x00') + def bytes_align(bout: bytes): - while len(bout) % CHUNK_SIZE > 0: - bout.extend(b"\x00") + return bout.ljust(CHUNK_SIZE * ceil(len(bout) / CHUNK_SIZE), b"\x00") def pzz_decompress(compressed_bytes: bytes): @@ -40,10 +46,10 @@ def pzz_decompress(compressed_bytes: bytes): compressed_bytes_size = len(compressed_bytes) // 2 * 2 cb = 0 # Control bytes - cb_bit = -1 + cb_bit = -1 # rotations de 15 à 0 pour le flag de compression i = 0 while i < compressed_bytes_size: - if cb_bit < 0: + if cb_bit < 0: # tous les cb = compressed_bytes[i + 1] cb |= compressed_bytes[i + 0] << 8 cb_bit = 15 @@ -53,10 +59,10 @@ def pzz_decompress(compressed_bytes: bytes): compress_flag = cb & (1 << cb_bit) cb_bit -= 1 - # logging.debug(compress_flag) if compress_flag: c = compressed_bytes[i + 1] c |= compressed_bytes[i + 0] << 8 + offset = (c & 0x7FF) * 2 if offset == 0: break # End of the compressed data @@ -71,86 +77,86 @@ def pzz_decompress(compressed_bytes: bytes): for j in range(count): uncompressed_bytes.append(uncompressed_bytes[index + j]) else: - uncompressed_bytes.extend(compressed_bytes[i: i + 2]) + uncompressed_bytes += compressed_bytes[i: i+2] i += 2 return uncompressed_bytes -def pzz_compress(b: bytes): - bout = bytearray() - size_b = len(b) // 2 * 2 +def pzz_compress(uncompressed_bytes: bytes): + compressed_bytes = bytearray(2) + size_uncompressed_bytes = len(uncompressed_bytes) // 2 * 2 cb = 0 # Control bytes - cb_bit = 15 + cb_bit = 15 # rotations de 15 à 0 pour le flag de compression cb_pos = 0 - bout.extend(b"\x00\x00") i = 0 - while i < size_b: - start = max(i - 0x7FF * 2, 0) + while i < size_uncompressed_bytes: + start = max(i - 4094, 0) # start = 2 si i = 4096 (0x800*2) count_r = 0 max_i = -1 - tmp = b[i: i + 2] - init_count = len(tmp) + + ####################################################### + # start : contient l'index .. (en cours de rédaction) + ####################################################### while True: - start = b.find(tmp, start, i + 1) - if start != -1 and start % 2 != 0: + # start = index première occurence de uncompressed_bytes[i:i+2] entre start et i+1 + # on regarde maxi dans les 4094 derniers octets + start = uncompressed_bytes.find(uncompressed_bytes[i: i+2], start, i+1) + + # si les 2 octets étudiés n'apparaissent pas dans les 4094 derniers octets + if start == -1: + break + + # si la première occurence n'est pas à un index multiple de 2, on l'ignore + if start % 2 != 0: start += 1 continue - if start != -1: - count = init_count - while i < size_b - count \ - and count < 0xFFFF * 2 \ - and b[start + count] == b[i + count] \ - and b[start + count + 1] == b[i + count + 1]: - count += 2 - if count_r < count: - count_r = count - max_i = start - start += 2 - else: - break + count = 2 + while i < size_uncompressed_bytes - count and \ + count < 0xFFFF * 2 and \ + uncompressed_bytes[start+count] == uncompressed_bytes[i+count] and \ + uncompressed_bytes[start+count+1] == uncompressed_bytes[i+count+1]: + count += 2 + if count_r < count: + count_r = count + max_i = start + start += 2 start = max_i + ####################################################### + # + ####################################################### compress_flag = 0 if count_r >= 4: compress_flag = 1 - offset = i - start - offset //= 2 + offset = (i - start) // 2 count_r //= 2 c = offset if count_r <= 0x1F: c |= count_r << 11 - bout.append((c >> 8)) - bout.append(c & 0xFF) + compressed_bytes += c.to_bytes(2, "big") else: - bout.append((c >> 8)) - bout.append(c & 0xFF) - bout.append((count_r >> 8)) - bout.append(count_r & 0xFF) + compressed_bytes += c.to_bytes(2, "big") + count_r.to_bytes(2, "big") i += count_r * 2 else: - bout.extend(b[i: i + 2]) + compressed_bytes += uncompressed_bytes[i: i+2] i += 2 cb |= (compress_flag << cb_bit) cb_bit -= 1 if cb_bit < 0: - bout[cb_pos + 1] = cb & 0xFF - bout[cb_pos + 0] = cb >> 8 - cb = 0x0000 + compressed_bytes[cb_pos:cb_pos + 2] = cb.to_bytes(2, "big") + cb = 0 cb_bit = 15 - cb_pos = len(bout) - bout.extend(b"\x00\x00") + cb_pos = len(compressed_bytes) + compressed_bytes += b"\x00\x00" cb |= (1 << cb_bit) - bout[cb_pos + 1] = cb & 0xFF - bout[cb_pos + 0] = cb >> 8 - bout.extend(b"\x00\x00") + compressed_bytes[cb_pos:cb_pos + 2] = cb.to_bytes(2, "big") + compressed_bytes += b"\x00\x00" - bytes_align(bout) - - return bout + return bytes_align(compressed_bytes) def pzz_unpack(pzz_path: Path, dest_folder: Path, auto_decompress: bool = False): @@ -168,7 +174,7 @@ def pzz_unpack(pzz_path: Path, dest_folder: Path, auto_decompress: bool = False) logging.info(f" unpacking {pzz_path} in folder {unpacked_pzz_path}") unpacked_pzz_path.mkdir(exist_ok=True) - with open(pzz_path, "rb") as pzz_file: + with pzz_path.open("rb") as pzz_file: # file_count reçoit le nombre de fichiers présent dans le PZZ : # On lit les 4 premiers octets (uint32 big-endian) file_count, = unpack(">I", pzz_file.read(4)) @@ -216,6 +222,8 @@ def pzz_unpack(pzz_path: Path, dest_folder: Path, auto_decompress: bool = False) else: file_content = pzz_file.read(file_len) + file_content = remove_padding(bytearray(file_content)) + if not auto_decompress and compression_status != 'U': file_path = file_path.with_suffix(".pzzp") else: @@ -237,59 +245,53 @@ def pzz_pack(src_path: Path, dest_file: Path, auto_compress: bool = False): # On récupère les fichiers du dossier à compresser src_files = listdir(src_path) - # On récupère le nombre total de fichiers - file_count = len(src_files) - if auto_compress: logging.info(f" pzz({src_path}) in pzz {dest_file}") else: logging.info(f" packing {src_path} in pzz {dest_file}") - logging.debug(f" -> {file_count} files to pack") + logging.debug(f" -> {len(src_files)} files to pack") with dest_file.open("wb") as pzz_file: - # On écrit file_count au début de header - pzz_file.write(file_count.to_bytes(4, byteorder='big')) - # On se place à la fin du header PZZ pzz_file.seek(CHUNK_SIZE) - file_descriptors = [] + # On récupère le nombre total de fichiers pour le mettre au début du header + header_bytes = len(src_files).to_bytes(4, byteorder='big') + # On écrit tous les fichiers à la suite du header for src_file_name in src_files: is_compressed = Path(src_file_name).suffix == ".pzzp" compression_status = src_file_name[3:4] - with (src_path / src_file_name).open("rb") as src_file: - src_file = src_file.read() + src_file = (src_path / src_file_name).read_bytes() - # Le fichier doit être compressé avant d'être pack - if compression_status == 'C' and not is_compressed and auto_compress: - src_file = pzz_compress(src_file) - # Le fichier doit être décompressé avant d'être pack - elif compression_status == 'U' and is_compressed and auto_compress: - src_file = pzz_decompress(src_file) # padding à gérer + # Le fichier doit être compressé avant d'être pack + if compression_status == 'C' and not is_compressed and auto_compress: + src_file = pzz_compress(src_file) + # Le fichier doit être décompressé avant d'être pack + elif compression_status == 'U' and is_compressed and auto_compress: + src_file = pzz_decompress(src_file) # padding à gérer - # on ajoute le padding pour correspondre à un multiple de CHUNK_SIZE - if compression_status == 'U': - if (len(src_file) % CHUNK_SIZE) > 0: - src_file.extend(b"\x00" * (CHUNK_SIZE - (len(src_file) % CHUNK_SIZE))) + """ + # on ajoute le padding pour correspondre à un multiple de CHUNK_SIZE + if compression_status == 'U': + if (len(src_file) % CHUNK_SIZE) > 0: + src_file.extend(b"\x00" * (CHUNK_SIZE - (len(src_file) % CHUNK_SIZE))) + """ - # file_descriptor = arrondi supérieur de la taille / CHUNK_SIZE - file_descriptor = ceil(len(src_file) / CHUNK_SIZE) + # file_descriptor = arrondi supérieur de la taille / CHUNK_SIZE + file_descriptor = ceil(len(src_file) / CHUNK_SIZE) - # On ajoute le flag de compression au file_descriptor - if compression_status == 'C': - file_descriptor |= BIT_COMPRESSION_FLAG + # On ajoute le flag de compression au file_descriptor + if compression_status == 'C': + file_descriptor |= BIT_COMPRESSION_FLAG - file_descriptors.append(file_descriptor) - pzz_file.write(src_file) + header_bytes += file_descriptor.to_bytes(4, byteorder='big') + pzz_file.write(src_file) - pzz_file.seek(4) - # On écrit les file_descriptor dans le header du PZZ pour chaque fichier - tmp = bytearray() - for file_descriptor in file_descriptors: - tmp.extend(file_descriptor.to_bytes(4, byteorder='big')) - pzz_file.write(tmp) + pzz_file.seek(0) + # On écrit le header + pzz_file.write(header_bytes) def unpzz(src_path: Path, dest_file: Path): @@ -303,10 +305,10 @@ def pzz(src_path: Path, dest_file: Path): def get_argparser(): import argparse parser = argparse.ArgumentParser(description='PZZ (de)compressor & unpacker - [GameCube] Gotcha Force v' + __version__) - parser.add_argument('--version', action='version', version='%(prog)s ' + __version__) + parser.add_argument('--version', action='version', version='%(prog)s ' + __version__) parser.add_argument('-v', '--verbose', action='store_true', help='verbose mode') - parser.add_argument('-di', '--disable-ignore', action='store_true', help="Disable .pzzp or .pzz file extension verification.") - parser.add_argument('input_path', metavar='INPUT', help='') + parser.add_argument('-di', '--disable-ignore', action='store_true', help="Disable .pzzp or .pzz file extension verification.") + parser.add_argument('input_path', metavar='INPUT', help='') parser.add_argument('output_path', metavar='OUTPUT', help='', nargs='?', default="") group = parser.add_mutually_exclusive_group(required=True) @@ -369,10 +371,8 @@ if __name__ == '__main__': logging.warning(f"Ignored - {filename} - bad extension - musn't be a pzzp") shutil.copy(p_input / filename, p_output / filename) continue - logging.info(f"Compressing {filename}") - with open(p_input / filename, 'rb') as uncompressed, open(p_output / (Path(filename).stem + ".pzzp"), 'wb') as recompressed: - recompressed.write(pzz_compress(uncompressed.read())) + (p_output / (Path(filename).stem + ".pzzp")).write_bytes(pzz_compress((p_input / filename).read_bytes())) elif args.batch_decompress: logging.info("### Batch Decompress") p_output.mkdir(exist_ok=True) @@ -382,12 +382,10 @@ if __name__ == '__main__': logging.warning(f"Ignored - {filename} - bad extension - must be a pzzp") shutil.copy(p_input / filename, p_output / filename) continue - logging.info(f"Decompressing {filename}") - with open(p_input / filename, 'rb') as compressed: - uncompressed_content = pzz_decompress(compressed.read()) - with open(p_output / Path(filename).with_suffix(get_file_ext(uncompressed_content)), 'wb') as uncompressed : - uncompressed.write(uncompressed_content) + uncompressed_content = pzz_decompress((p_input / filename).read_bytes()) + uncompressed_path = p_output / Path(filename).with_suffix(get_file_ext(uncompressed_content)) + uncompressed_path.write_bytes(uncompressed_content) elif args.pack: logging.info("### Pack") pzz_pack(p_input, p_output)