diff --git a/afstool.py b/afstool.py
index e001ef8..1f6d751 100644
--- a/afstool.py
+++ b/afstool.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+from configparser import ConfigParser
 from datetime import datetime
 import logging
 from math import ceil
@@ -8,13 +9,35 @@
 import re
 import time
 
-__version__ = "0.0.4"
+__version__ = "0.0.5"
 __author__ = "rigodron, algoflash, GGLinnk"
 __license__ = "MIT"
 __status__ = "developpement"
 
 
+# Not tested:
 class AfsInvalidFileLenError(Exception): pass
+class AfsEmptyAfsError(Exception): pass
+class AfsInvalidFilenameDirectoryLengthError(Exception): pass
+class AfsInvalidAfsFolderError(Exception): pass
+
+
+# Tested:
+class AfsInvalidMagicNumberError(Exception): pass
+class AfsInvalidFilesRebuildStrategy(Exception): pass
+class AfsFilenameDirectoryValueError(Exception): pass
+class AfsInvalidFilePathError(Exception): pass
+class AfsInvalidFieldsCountError(Exception): pass
+class AfsIndexValueError(Exception): pass
+class AfsIndexOverflowError(Exception): pass
+class AfsIndexCollisionError(Exception): pass
+class AfsOffsetValueError(Exception): pass
+class AfsOffsetAlignError(Exception): pass
+class AfsOffsetCollisionError(Exception): pass
+class AfsFdOffsetOffsetValueError(Exception): pass
+class AfsFdOffsetValueError(Exception): pass
+class AfsFdLastAttributeTypeValueError(Exception): pass
+class AfsFdOffsetCollisionError(Exception): pass
 
 
 class FilenameResolver:
@@ -27,25 +50,21 @@ class FilenameResolver:
         self.__names_tuples = {}
         self.__load()
     def __load(self):
-        if (self.__sys_path / "filename_resolver.txt").is_file():
-            self.__resolve_buffer = (self.__sys_path / "filename_resolver.txt").read_text()
+        if (self.__sys_path / "filename_resolver.csv").is_file():
+            self.__resolve_buffer = (self.__sys_path / "filename_resolver.csv").read_text()
             for line in self.__resolve_buffer.split('\n'):
                 name_tuple = line.split(self.__separator)
                 self.__names_tuples[name_tuple[1]] = int(name_tuple[0])
     def save(self):
         if len(self.__resolve_buffer) > 0:
-            logging.info("Writting filename_resolver.txt")
-            (self.__sys_path / "filename_resolver.txt").write_text(self.__resolve_buffer[:-1])
+            logging.info(f"Writing {Path('sys/filename_resolver.csv')}")
+            (self.__sys_path / "filename_resolver.csv").write_text(self.__resolve_buffer[:-1])
     # resolve generate a unique filename when unpacking
     def resolve_new(self, fileindex:int, filename:str):
         if filename in self.__names_tuples:
-            if self.__names_tuples[filename] == fileindex:
-                return filename
             i = 1
             new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}"
             while new_filename in self.__names_tuples:
-                if self.__names_tuples[new_filename] == fileindex:
-                    return new_filename
                 i+=1
                 new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}"
             self.__names_tuples[new_filename] = fileindex
@@ -53,6 +72,10 @@ class FilenameResolver:
             return new_filename
         self.__names_tuples[filename] = fileindex
         return filename
+    # Add a new entry, forcing the unpacked_filename
+    def add(self, fileindex:int, unpacked_filename:str):
+        self.__names_tuples[unpacked_filename] = fileindex
+        self.__resolve_buffer += f"{fileindex}{self.__separator}{unpacked_filename}\n"
     # return generated filename if it exist else filename
     def resolve_from_index(self, fileindex:int, filename:str):
         for filename_key, fileindex_value in self.__names_tuples.items():
@@ -67,7 +90,7 @@ class Afs:
     MAGIC_20 = b"AFS\x20"
     ALIGN = 0x800
     HEADER_LEN = 8
-    FILENAMEBLOCK_ENTRY_LEN = 0x30
+    FILENAMEDIRECTORY_ENTRY_LEN = 0x30
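+    # Assumed FD entry layout, inferred from the getters below (0x30 bytes):
+    # 32-byte null-padded filename, 12-byte mtime, 4-byte "FD last" attribute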
     __file_count = None
     __filenamedirectory_offset_offset = None
     __filenamedirectory_offset = None
@@ -87,11 +110,11 @@ class Afs:
     def __get_file_len(self, fileindex:int):
         return int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8], "little")
     def __get_file_name(self, fileindex:int):
-        return self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8")
+        return self.__filenamedirectory[fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN:fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8")
     def __get_file_fdlast(self, fileindex:int):
-        return int.from_bytes(self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48], "little")
+        return int.from_bytes(self.__filenamedirectory[fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+44:fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+48], "little")
     def __get_mtime(self, fileindex:int):
-        mtime_data = self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44]
+        mtime_data = self.__filenamedirectory[fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+32:fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+44]
         year = int.from_bytes(mtime_data[0:2], "little")
         month = int.from_bytes(mtime_data[2:4], "little")
         day = int.from_bytes(mtime_data[4:6], "little")
@@ -102,13 +125,13 @@
     def __patch_file_len(self, fileindex:int, file_len:int):
         # Patch file_len in the FD
         if self.__filenamedirectory:
-            if self.__get_file_len(fileindex) == self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48]:
-                self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48] = file_len.to_bytes(4, "little")
+            if self.__get_file_len(fileindex) == self.__get_file_fdlast(fileindex):
+                self.__filenamedirectory[fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+44:fileindex*Afs.FILENAMEDIRECTORY_ENTRY_LEN+48] = file_len.to_bytes(4, "little")
         # Patch file_len in the TOC
         self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8] = file_len.to_bytes(4, "little")
     def __patch_mtime(self, fileindex:int, mtime):
         mtime = datetime.fromtimestamp(mtime)
-        self.__filenamedirectory[Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+32:Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+44] = \
+        self.__filenamedirectory[Afs.FILENAMEDIRECTORY_ENTRY_LEN*fileindex+32:Afs.FILENAMEDIRECTORY_ENTRY_LEN*fileindex+44] = \
             mtime.year.to_bytes(2,"little")+ \
             mtime.month.to_bytes(2,"little")+ \
             mtime.day.to_bytes(2,"little")+ \
@@ -126,7 +149,7 @@ class Afs:
     def __loadsys_from_afs(self, afs_file, afs_len:int):
         self.__tableofcontent = afs_file.read(Afs.HEADER_LEN)
         if self.__get_magic() not in [Afs.MAGIC_00, Afs.MAGIC_20]:
-            raise Exception("Error - Invalid AFS magic number.")
+            raise AfsInvalidMagicNumberError("Error - Invalid AFS magic number.")
         self.__file_count = self.__get_file_count()
         self.__tableofcontent += afs_file.read(self.__file_count*8)
         tableofcontent_len = len(self.__tableofcontent)
@@ -152,8 +175,9 @@ class Afs:
             offset += block_len
             tmp_block = afs_file.read(block_len)
 
+        # The value found is only an int that may or may not be a valid FD offset, hence this check
        if self.__filenamedirectory_offset is None:
-            raise Exception("Error - Empty AFS.")
+            raise AfsEmptyAfsError("Error - Empty AFS.")
 
         afs_file.seek(self.__filenamedirectory_offset_offset+4)
         self.__filenamedirectory_len = int.from_bytes(afs_file.read(4), "little")
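+        # Assumption from the reads above: the TOC stores the FD offset at
+        # filenamedirectory_offset_offset, with the FD length in the following 4 bytes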
@@ -161,7 +185,7 @@ class Afs:
         # Test if offset of filenamedirectory is valid and if number of entries match between filenamedirectory and tableofcontent
         if self.__filenamedirectory_offset + self.__filenamedirectory_len > afs_len or \
             self.__filenamedirectory_offset < self.__filenamedirectory_offset_offset or \
-            (tableofcontent_len - self.HEADER_LEN) / 8 != self.__filenamedirectory_len / Afs.FILENAMEBLOCK_ENTRY_LEN:
+            (tableofcontent_len - self.HEADER_LEN) / 8 != self.__filenamedirectory_len / Afs.FILENAMEDIRECTORY_ENTRY_LEN:
             self.__clean_filenamedirectory()
             return False
 
@@ -171,7 +195,7 @@ class Afs:
         # Test if filename is correct by very basic pattern matching
         pattern = re.compile(b"^(?=.{32}$)[^\x00]+\x00+$")
         for i in range(0, self.__file_count):
-            if not pattern.fullmatch(self.__filenamedirectory[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32]):
+            if not pattern.fullmatch(self.__filenamedirectory[i*Afs.FILENAMEDIRECTORY_ENTRY_LEN:i*Afs.FILENAMEDIRECTORY_ENTRY_LEN+32]):
                 self.__clean_filenamedirectory()
                 return False
 
@@ -189,13 +213,107 @@ class Afs:
         self.__filenamedirectory_offset = self.__get_filenamedirectory_offset()
         self.__filenamedirectory_len = self.__get_filenamedirectory_len()
         if self.__filenamedirectory_len != len(self.__filenamedirectory):
-            raise Exception("Error - Tableofcontent filenamedirectory length does not match real filenamedirectory length.")
+            raise AfsInvalidFilenameDirectoryLengthError("Error - Tableofcontent filenamedirectory length does not match real filenamedirectory length.")
+    def __print(self, title:str, lines_tuples, columns:list = list(range(0,7)), infos:str = ""):
+        stats_buffer = "#"*100+f"\n# {title}\n"+"#"*100+f"\n{infos}|"+"-"*99+"\n"
+        if 0 in columns: stats_buffer += "| Index    ";
+        if 1 in columns: stats_buffer += "| b offset ";
+        if 2 in columns: stats_buffer += "| e offset ";
+        if 3 in columns: stats_buffer += "| length   ";
+        if 4 in columns: stats_buffer += "| YYYY-mm-dd HH:MM:SS ";
+        if 5 in columns: stats_buffer += "| FD last  ";
+        if 6 in columns: stats_buffer += "| Filename";
+        stats_buffer += "\n|"+"-"*99+"\n"
+        for line in lines_tuples:
+            stats_buffer += line if type(line) == str else "| "+" | ".join(line)+"\n"
+        print(stats_buffer, end='')
+    # end offset not included (0,1) -> len=1
+    def __get_offsets_map(self):
+        # offsets_map is used to check next used offset when updating files
+        # we also check if there is intersect between files
+        offsets_map = [(0, len(self.__tableofcontent))]
+        for i in range(0, self.__file_count):
+            file_offset = self.__get_file_offset(i)
+            offsets_map.append( (file_offset, file_offset + self.__get_file_len(i)) )
+        if self.__filenamedirectory:
+            filenamedirectory_offset = self.__get_filenamedirectory_offset()
+            offsets_map.append( (filenamedirectory_offset, filenamedirectory_offset + self.__get_filenamedirectory_len()) )
+        offsets_map.sort(key=lambda x: x[0])
+
+        # Check if there is problems in file memory mapping
+        last_tuple = (-1, -1)
+        for i, offsets_tuple in enumerate(offsets_map):
+            if offsets_tuple[0] < last_tuple[1]:
+                raise AfsOffsetCollisionError(f"Error - Multiple files use same file offsets ranges.")
+            last_tuple = offsets_tuple
+            offsets_map[i] = offsets_tuple[0]
+        return offsets_map
+    # end offset not included (0,1) -> len=1
+    def __get_formated_map(self):
+        files_map = [("SYS TOC ", "00000000", f"{len(self.__tableofcontent):08x}", f"{len(self.__tableofcontent):08x}", "SYS TOC"+' '*12, "SYS TOC ", "SYS TOC")]
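+        # Each row: index, begin offset, end offset, length, mtime, FD last attribute, filename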
+
+        for i in range(0, self.__file_count):
+            file_offset = self.__get_file_offset(i)
+            file_len = self.__get_file_len(i)
+            file_date = datetime.fromtimestamp(self.__get_mtime(i)).strftime("%Y-%m-%d %H:%M:%S") if self.__filenamedirectory else " "*19
+            filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}"
+            fdlast = f"{self.__get_file_fdlast(i):08x}" if self.__filenamedirectory else " "*8
+            files_map.append((f"{i:08x}", f"{file_offset:08x}", f"{file_offset + file_len:08x}", f"{file_len:08x}", file_date, fdlast, filename))
+
+        if self.__filenamedirectory:
+            files_map.append(("SYS FD  ", f"{self.__filenamedirectory_offset:08x}", \
+                f"{self.__filenamedirectory_offset + len(self.__filenamedirectory):08x}", \
+                f"{len(self.__filenamedirectory):08x}", "SYS FD"+' '*13, "SYS FD  ", "SYS FD"))
+        return files_map
+    def __get_fd_last_attribute_type(self):
+        # Try to get the type of FD last attribute
+        length_type = True
+        offset_length_type = True
+        constant_type = self.__get_file_fdlast(0)
+
+        for i in range(0, self.__file_count):
+            fd_last_attribute = self.__get_file_fdlast(i)
+            if fd_last_attribute != self.__get_file_len(i):
+                length_type = None
+            if fd_last_attribute != int.from_bytes(self.__tableofcontent[8+i*4:8+i*4+4], "little"):
+                offset_length_type = None
+            if fd_last_attribute != constant_type:
+                constant_type = None
+        if length_type: return "length"
+        if offset_length_type: return "offset-length"
+        if constant_type: return f"0x{constant_type:x}"
+        logging.info("Unknown FD last attribute type.")
+        return "unknown"
+    def __write_rebuild_config(self, sys_path:Path, resolver:FilenameResolver):
+        config = ConfigParser(allow_no_value=True) # allow_no_value to allow adding comments
+        config.optionxform = str # makes options case sensitive
+        config.add_section("Default")
+        config.set("Default", "# Documentation available here: https://github.com/Virtual-World-RE/NeoGF/blob/main/README.md#afs_rebuildconf--afs_rebuildcsv")
+        config.set("Default", "AFS_MAGIC", f"0x{self.__get_magic().hex()}")
+        config.set("Default", "files_rebuild_strategy", "mixed")
+        config.set("Default", "filename_directory", "True" if self.__filenamedirectory else "False")
+        if self.__filenamedirectory:
+            config.add_section("FilenameDirectory")
+            config.set("FilenameDirectory", "toc_offset_of_fd_offset", f"0x{self.__filenamedirectory_offset_offset:x}")
+            config.set("FilenameDirectory", "fd_offset", f"0x{self.__filenamedirectory_offset:x}")
+            config.set("FilenameDirectory", "fd_last_attribute_type", self.__get_fd_last_attribute_type())
+        config.write((sys_path / "afs_rebuild.conf").open("w"))
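+        # Example of the resulting afs_rebuild.conf (values illustrative):
+        #   [Default]
+        #   AFS_MAGIC = 0x41465300
+        #   files_rebuild_strategy = mixed
+        #   filename_directory = True
+        #   [FilenameDirectory]
+        #   toc_offset_of_fd_offset = 0x8
+        #   fd_offset = 0x67800
+        #   fd_last_attribute_type = length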
+
+        rebuild_csv = ""
+        # generate and save afs_rebuild.csv
+        for i in range(0, self.__file_count):
+            filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}"
+            unpacked_filename = resolver.resolve_from_index(i, filename) if self.__filenamedirectory else f"{i:08}"
+            rebuild_csv += f"{unpacked_filename}/0x{i:x}/0x{self.__get_file_offset(i):x}/{filename}\n"
+        if len(rebuild_csv) > 0:
+            (sys_path / "afs_rebuild.csv").write_text(rebuild_csv[:-1])
     def unpack(self, afs_path:Path, folder_path:Path):
         sys_path = folder_path / "sys"
         root_path = folder_path / "root"
         sys_path.mkdir(parents=True)
         root_path.mkdir()
+        resolver = None
         with afs_path.open("rb") as afs_file:
             if not self.__loadsys_from_afs(afs_file, afs_path.stat().st_size):
                 logging.info("There is no filename directory. Creating new names and dates for files.")
@@ -221,10 +339,12 @@ class Afs:
             if self.__filenamedirectory:
                 mtime = self.__get_mtime(i)
                 os.utime(root_path / filename, (mtime, mtime))
+        if self.__filenamedirectory: resolver.save()
+        self.__write_rebuild_config(sys_path, resolver)
 
     def pack(self, folder_path:Path, afs_path:Path = None):
-        if afs_path == None:
+        if afs_path is None:
             afs_path = folder_path / Path(folder_path.name).with_suffix(".afs")
         elif afs_path.suffix != ".afs":
             logging.warning("Dest file should have .afs file extension.")
@@ -234,8 +354,7 @@ class Afs:
 
         self.__loadsys_from_folder(sys_path)
 
-        if self.__filenamedirectory:
-            resolver = FilenameResolver(sys_path)
+        resolver = FilenameResolver(sys_path)
 
         offsets_map = self.__get_offsets_map()
         with afs_path.open("wb") as afs_file:
@@ -243,7 +362,8 @@ class Afs:
             for i in range(0, self.__file_count):
                 file_offset = self.__get_file_offset(i)
                 file_len = self.__get_file_len(i)
-                filename = resolver.resolve_from_index(i, self.__get_file_name(i)) if self.__filenamedirectory else f"{i:08}"
+                filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}"
+                filename = resolver.resolve_from_index(i, filename)
                 file_path = root_path / filename
 
                 new_file_len = file_path.stat().st_size
@@ -271,7 +391,197 @@ class Afs:
                 afs_file.seek(0)
                 afs_file.write(self.__tableofcontent)
     def rebuild(self, folder_path:Path):
-        raise Exception("Error - Not implemented yet")
+        config = ConfigParser()
+        root_path = folder_path / "root"
+        sys_path = folder_path / "sys"
+        config.read(sys_path / "afs_rebuild.conf")
+        if config["Default"]["AFS_MAGIC"] not in ["0x41465300", "0x41465320"]:
+            raise AfsInvalidMagicNumberError("Error - Invalid [Default] AFS_MAGIC: must be 0x41465300 or 0x41465320.")
+        if config["Default"]["files_rebuild_strategy"] not in ["index", "offset", "mixed", "auto"]:
+            raise AfsInvalidFilesRebuildStrategy("Error - Invalid [Default] files_rebuild_strategy: must be index, offset, mixed or auto.")
+        if config["Default"]["filename_directory"] not in ["True", "False"]:
+            raise AfsFilenameDirectoryValueError("Error - Invalid [Default] filename_directory: must be True or False.")
+
+        for path in [sys_path / "tableofcontent.bin", sys_path / "filenamedirectory.bin", sys_path / "filename_resolver.csv"]:
+            if path.is_file():
+                logging.info(f"Removing {path}.")
+                path.unlink()
+
+        files_paths = list(root_path.glob("*"))
+        self.__file_count = len(files_paths)
+        max_offset = None
+
+        if config["Default"]["filename_directory"] == "True":
+            if config["FilenameDirectory"]["toc_offset_of_fd_offset"] != "auto":
+                if config["FilenameDirectory"]["toc_offset_of_fd_offset"][:2] != "0x" or len(config["FilenameDirectory"]["toc_offset_of_fd_offset"]) < 3:
+                    raise AfsFdOffsetOffsetValueError("Error - Invalid [FilenameDirectory] toc_offset_of_fd_offset: must use hex format 0xabcdef or auto.")
+                self.__filenamedirectory_offset_offset = int(config["FilenameDirectory"]["toc_offset_of_fd_offset"][2:], 16)
+            else:
+                self.__filenamedirectory_offset_offset = self.__file_count*8 + 8
+            max_offset = int(ceil((self.__filenamedirectory_offset_offset + 8) / Afs.ALIGN)) * Afs.ALIGN # TOC length
+            self.__filenamedirectory_len = self.__file_count * Afs.FILENAMEDIRECTORY_ENTRY_LEN
+
+            if config["FilenameDirectory"]["fd_offset"] != "auto":
+                if config["FilenameDirectory"]["fd_offset"][:2] != "0x" or len(config["FilenameDirectory"]["fd_offset"]) < 3:
+                    raise AfsFdOffsetValueError("Error - Invalid [FilenameDirectory] fd_offset: must use hex format 0xabcdef or auto.")
+                self.__filenamedirectory_offset = int(config["FilenameDirectory"]["fd_offset"][2:], 16)
+
+            if config["FilenameDirectory"]["fd_last_attribute_type"] not in ["length", "offset-length", "unknown"]:
+                if config["FilenameDirectory"]["fd_last_attribute_type"][0:2] != "0x" or len(config["FilenameDirectory"]["fd_last_attribute_type"]) < 3:
+                    raise AfsFdLastAttributeTypeValueError("Error - Invalid [FilenameDirectory] fd_last_attribute_type: must be length, offset-length, 0xabcdef offset or unknown.")
+        else:
+            max_offset = int(ceil((self.__file_count*8 + 8) / Afs.ALIGN)) * Afs.ALIGN # TOC length
+
+        self.__tableofcontent = bytearray.fromhex( config["Default"]["AFS_MAGIC"][2:] ) + self.__file_count.to_bytes(4, "little")
+        files_rebuild_strategy = config["Default"]["files_rebuild_strategy"]
+
+        csv_files_lists = []
+        reserved_indexes = []
+
+        # Parse afs_rebuild.csv and verify entries, retrieving the length of each file
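+        # Expected line format mirrors what unpack() writes (example values illustrative):
+        #   unpacked_filename/index/offset/filename, e.g. "dummy (1).bin/0x2/0x1800/dummy.bin"
+        #   index and offset may also be "auto"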
+        if (sys_path / "afs_rebuild.csv").is_file():
+            for line in (sys_path / "afs_rebuild.csv").read_text().split('\n'):
+                line_splited = line.split('/')
+                if len(line_splited) == 4:
+                    unpacked_filename = line_splited[0]
+
+                    index = None
+                    if files_rebuild_strategy in ["index", "mixed"]:
+                        if line_splited[1] != "auto":
+                            index = line_splited[1]
+                            if index[:2] != "0x" or len(index) < 3:
+                                raise AfsIndexValueError(f"Error - Invalid entry index in afs_rebuild.csv: {index} - \"{line}\"")
+                            index = int(index[2:], 16)
+                            if index >= self.__file_count:
+                                raise AfsIndexOverflowError(f"Error - Invalid entry index in afs_rebuild.csv: 0x{index:x} - \"{line}\" - index must be < files_count.")
+                            if index in reserved_indexes:
+                                raise AfsIndexCollisionError(f"Error - Multiple files using same index: 0x{index:x}")
+                            reserved_indexes.append( index )
+
+                    file_path = root_path / unpacked_filename
+                    if not file_path.is_file():
+                        raise AfsInvalidFilePathError(f"Error - File {file_path} doesn't exist.")
+                    file_length = file_path.stat().st_size
+
+                    offset = None
+                    if files_rebuild_strategy in ["offset", "mixed"]:
+                        if line_splited[2] != "auto":
+                            offset = line_splited[2]
+                            if offset[:2] != "0x" or len(offset) < 3:
+                                raise AfsOffsetValueError(f"Error - Invalid entry offset in afs_rebuild.csv: {offset} - \"{line}\"")
+                            offset = int(offset[2:], 16)
+                            if offset % Afs.ALIGN > 0:
+                                raise AfsOffsetAlignError(f"Error - Invalid entry offset in afs_rebuild.csv: 0x{offset:x} - \"{line}\" - offset must be aligned to 0x800.")
+
+                    csv_files_lists.append( [unpacked_filename, index, offset, line_splited[3], file_length] )
+
+                    files_paths.remove( root_path / unpacked_filename )
+                elif len(line_splited) == 2: # empty block
+                    raise Exception(f"Error - Empty blocks not implemented yet in afs_rebuild.csv: \"{line}\"")
+                else:
+                    raise AfsInvalidFieldsCountError(f"Error - Invalid entry fields count in afs_rebuild.csv: \"{line}\"")
+
+        # We generate the file memory map with offsets:
+        # available_space_ranges is then used to place files whose length fits
+        # max_offset is used here to find memory collisions between files and the next available space
+        available_space_ranges = []
+        if files_rebuild_strategy in ["offset", "mixed"]:
+            # Offsets are sorted before merging to keep the algorithm simple
+            # The TOC block is already present at the start of the mapping
+            for file_tuple in sorted(csv_files_lists, key=lambda x: (x[2] is not None, x[2] or 0)): # "or 0" keeps None offsets comparable
+                offset = file_tuple[2]
+                if offset is None:
+                    continue
+                if offset < max_offset:
+                    raise AfsOffsetCollisionError(f"Error - Offsets collision with offset \"0x{offset:x}\".")
+                elif offset > max_offset:
+                    available_space_ranges.append( [max_offset, offset] )
+                max_offset = int(ceil((offset + file_tuple[4]) / Afs.ALIGN)) * Afs.ALIGN
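+            # e.g. a TOC block ending at 0x800 plus a file pinned at 0x2000 leave
+            # [0x800, 0x2000] available for auto-placed files (illustrative values)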
+
+        for file_path in files_paths:
+            csv_files_lists.append( [file_path.name, None, None, file_path.name, file_path.stat().st_size] )
+
+        # Now csv_files_lists contains all files for the sys rebuild:
+        # "auto"   -> index & offset are None
+        # "index"  -> offset is None
+        # "offset" -> index is None -> available_space_ranges allocates file address space
+        # "mixed"  -> offsets & indexes both considered -> available_space_ranges allocates file address space
+
+        # sort by filename
+        csv_files_lists.sort(key=lambda x: x[3])
+        current_offset = max_offset
+
+        # if index==None -> Assign an index not in reserved_indexes
+        reserved_indexes.sort()
+        next_index = 0
+        for i in range(0, len(csv_files_lists)):
+            if (csv_files_lists[i][1] is None and files_rebuild_strategy in ["index", "mixed"]) or files_rebuild_strategy in ["auto", "offset"]:
+                for j in range(next_index, len(csv_files_lists)):
+                    if j not in reserved_indexes:
+                        next_index = j + 1
+                        csv_files_lists[i][1] = j
+                        break
+        # sort by index
+        csv_files_lists.sort(key=lambda x: x[1])
+
+        # if offset==None -> Assign an offset in available_space_ranges or at the end of file allocated space
+        for i in range(0, len(csv_files_lists)):
+            if (files_rebuild_strategy in ["offset", "mixed"] and csv_files_lists[i][2] is None) or files_rebuild_strategy in ["auto", "index"]:
+                block_len = int(ceil(csv_files_lists[i][4] / Afs.ALIGN)) * Afs.ALIGN
+                for j in range(0, len(available_space_ranges)):
+                    available_block_len = int(ceil((available_space_ranges[j][1] - available_space_ranges[j][0]) / Afs.ALIGN)) * Afs.ALIGN
+                    if block_len <= available_block_len:
+                        csv_files_lists[i][2] = available_space_ranges[j][0]
+                        if block_len == available_block_len:
+                            del available_space_ranges[j]
+                        else:
+                            available_space_ranges[j][0] += block_len
+                        break
+                else:
+                    # The file is bigger than every available range, so place it at current_offset, past the end of allocated space
+                    csv_files_lists[i][2] = current_offset
+                    current_offset += block_len
+
+        if self.__filenamedirectory_offset_offset:
+            self.__filenamedirectory = b""
+            fd_last_attribute_type = config["FilenameDirectory"]["fd_last_attribute_type"]
+            if fd_last_attribute_type[:2] == "0x":
+                fd_last_attribute_type = int(fd_last_attribute_type[2:], 16)
+
+        # csv_files_lists must be sorted by index here
+        # current_offset now holds the FD offset when none was set explicitly
+        resolver = FilenameResolver(sys_path)
+        for i in range(0, len(csv_files_lists)):
+            self.__tableofcontent += csv_files_lists[i][2].to_bytes(4, "little") + csv_files_lists[i][4].to_bytes(4, "little")
+            # unpacked_filename, index, offset, filename, file_length
+            if self.__filenamedirectory_offset_offset:
+                mtime = b"\x00" * 12 # will be patched at next pack
+                fd_last_attribute = None
+                if type(fd_last_attribute_type) == int:
+                    fd_last_attribute = fd_last_attribute_type.to_bytes(4, "little")
+                elif fd_last_attribute_type == "length":
+                    fd_last_attribute = csv_files_lists[i][4].to_bytes(4, "little")
+                elif fd_last_attribute_type == "offset-length":
+                    fd_last_attribute = self.__tableofcontent[8+i*4:8+i*4+4]
+                else: # == unknown
+                    fd_last_attribute = b"\x00"*4
+                self.__filenamedirectory += bytes(csv_files_lists[i][3], "utf-8").ljust(32, b"\x00") + mtime + fd_last_attribute
+            # if unpacked_filename != filename we store it into the resolver
+            if csv_files_lists[i][0] != csv_files_lists[i][3] or not self.__filenamedirectory_offset_offset:
+                resolver.add(i, csv_files_lists[i][0])
+        resolver.save()
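+        # When fd_offset was "auto", current_offset (the end of the last allocated file block) becomes the FD offset below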
+        if self.__filenamedirectory:
+            if not self.__filenamedirectory_offset:
+                self.__filenamedirectory_offset = current_offset
+            elif self.__filenamedirectory_offset < current_offset:
+                raise AfsFdOffsetCollisionError(f"Error - Invalid FD offset 0x{self.__filenamedirectory_offset:x} < last used file block end 0x{current_offset:x}.")
+            self.__tableofcontent = self.__tableofcontent.ljust(self.__filenamedirectory_offset_offset+8, b"\x00") # Add pad if needed
+            self.__tableofcontent[self.__filenamedirectory_offset_offset:self.__filenamedirectory_offset_offset+8] = self.__filenamedirectory_offset.to_bytes(4, "little") + self.__filenamedirectory_len.to_bytes(4, "little")
+
+            logging.info(f"Writing {Path('sys/filenamedirectory.bin')}")
+            (sys_path / "filenamedirectory.bin").write_bytes(self.__filenamedirectory)
+        logging.info(f"Writing {Path('sys/tableofcontent.bin')}")
+        (sys_path / "tableofcontent.bin").write_bytes(self.__tableofcontent)
     def stats(self, path:Path):
         if path.is_file():
             with path.open("rb") as afs_file:
@@ -280,7 +590,7 @@
             self.__loadsys_from_folder(path / "sys")
 
         files_map = self.__get_formated_map()
-        files_map.sort(key=lambda x: x[1]) # sort by offset
+        files_map.sort(key=lambda x: x[1]) # sort by offset (str with fixed len=8)
 
         # Offsets intersect
         dup_offsets_tuples = []
@@ -331,57 +641,6 @@
             self.__print("Files using same filenames:", dup_names_tuples)
         if empty_space_tuples:
             self.__print("Empty blocks between files (filename = name of the previous file):", empty_space_tuples, columns=[1,2,3,6])
-    def __print(self, title:str, lines_tuples, columns:list = list(range(0,7)), infos:str = ""):
-        stats_buffer = "#"*100+f"\n# {title}\n"+"#"*100+f"\n{infos}|"+"-"*99+"\n"
-        if 0 in columns: stats_buffer += "| Index    ";
-        if 1 in columns: stats_buffer += "| b offset ";
-        if 2 in columns: stats_buffer += "| e offset ";
-        if 3 in columns: stats_buffer += "| length   ";
-        if 4 in columns: stats_buffer += "| YYYY-mm-dd HH:MM:SS ";
-        if 5 in columns: stats_buffer += "| FD last  ";
-        if 6 in columns: stats_buffer += "| Filename";
-        stats_buffer += "\n|"+"-"*99+"\n"
-        for line in lines_tuples:
-            stats_buffer += line if type(line) == str else "| "+" | ".join(line)+"\n"
-        print(stats_buffer, end='')
-    # end offset not included (0,1) -> len=1
-    def __get_offsets_map(self):
-        # offsets_map is used to check next used offset when updating files
-        # we also check if there is intersect between files
-        offsets_map = [(0, len(self.__tableofcontent))]
-        for i in range(0, self.__file_count):
-            file_offset = self.__get_file_offset(i)
-            offsets_map.append( (file_offset, file_offset + self.__get_file_len(i)) )
-        if self.__filenamedirectory:
-            filenamedirectory_offset = self.__get_filenamedirectory_offset()
-            offsets_map.append( (filenamedirectory_offset, filenamedirectory_offset + self.__get_filenamedirectory_len()) )
-        offsets_map.sort(key=lambda x: x[0])
-
-        # Check if there is problems in file memory mapping
-        last_tuple = (-1, -1)
-        for i, offsets_tuple in enumerate(offsets_map):
-            if offsets_tuple[0] < last_tuple[1]:
-                raise Exception(f"Error - Multiple files use same file offsets ranges.")
-            last_tuple = offsets_tuple
-            offsets_map[i] = offsets_tuple[0]
-        return offsets_map
-    # end offset not included (0,1) -> len=1
-    def __get_formated_map(self):
-        files_map = [("SYS TOC ", "00000000", f"{len(self.__tableofcontent):08x}", f"{len(self.__tableofcontent):08x}", "SYS TOC"+' '*12, "SYS TOC ", "SYS TOC")]
-
-        for i in range(0, self.__file_count):
-            file_offset = self.__get_file_offset(i)
-            file_len = self.__get_file_len(i)
-            file_date = datetime.fromtimestamp(self.__get_mtime(i)).strftime("%Y-%m-%d %H:%M:%S") if self.__filenamedirectory else " "*19
-            filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}"
-            fdlast = f"{self.__get_file_fdlast(i):08x}" if self.__filenamedirectory else " "*8
-            files_map.append((f"{i:08x}", f"{file_offset:08x}", f"{file_offset + file_len:08x}", f"{file_len:08x}", file_date, fdlast, filename))
-
-        if self.__filenamedirectory:
-            files_map.append(("SYS FD  ", f"{self.__filenamedirectory_offset:08x}", \
-                f"{self.__filenamedirectory_offset + len(self.__filenamedirectory):08x}", \
-                f"{len(self.__filenamedirectory):08x}", "SYS FD"+' '*13, "SYS FD  ", "SYS FD"))
-        return files_map
 
 
 def get_argparser():
@@ -393,10 +652,10 @@ def get_argparser():
     parser.add_argument('output_path', metavar='OUTPUT', help='', nargs='?', default="")
 
     group = parser.add_mutually_exclusive_group(required=True)
-    group.add_argument('-p', '--pack', action='store_true', help="-p source_folder (dest_file.afs): Pack source_folder in new file source_folder.afs or dest_file.afs if specified.")
-    group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder): Unpack the AFS in new folder source_afs or dest_folder if specified.")
+    group.add_argument('-p', '--pack', action='store_true', help="-p source_folder (dest_file.afs): Pack source_folder in new file source_folder.afs or dest_file.afs if specified.")
+    group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder): Unpack the AFS in new folder source_afs or dest_folder if specified.")
     group.add_argument('-s', '--stats', action='store_true', help="-s source_afs.afs or source_folder: Get stats about AFS, files, memory, lengths and offsets.")
-    group.add_argument('-r', '--rebuild', help="-r source_folder: Rebuild AFS tableofcontent (TOC) and filenamedirectory (FD) using rebuild.conf file.")
+    group.add_argument('-r', '--rebuild', action='store_true', help="-r source_folder: Rebuild AFS tableofcontent (TOC) and filenamedirectory (FD) using afs_rebuild.conf file and afs_rebuild.csv.")
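+    # e.g. "afstool.py -r unpacked_folder" regenerates sys/tableofcontent.bin and
+    # sys/filenamedirectory.bin from sys/afs_rebuild.conf and sys/afs_rebuild.csv (illustrative invocation)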
"SYS TOC")] - - for i in range(0, self.__file_count): - file_offset = self.__get_file_offset(i) - file_len = self.__get_file_len(i) - file_date = datetime.fromtimestamp(self.__get_mtime(i)).strftime("%Y-%m-%d %H:%M:%S") if self.__filenamedirectory else " "*19 - filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}" - fdlast = f"{self.__get_file_fdlast(i):08x}" if self.__filenamedirectory else " "*8 - files_map.append((f"{i:08x}", f"{file_offset:08x}", f"{file_offset + file_len:08x}", f"{file_len:08x}", file_date, fdlast, filename)) - - if self.__filenamedirectory: - files_map.append(("SYS FD ", f"{self.__filenamedirectory_offset:08x}", \ - f"{self.__filenamedirectory_offset + len(self.__filenamedirectory):08x}", \ - f"{len(self.__filenamedirectory):08x}", "SYS FD"+' '*13, "SYS FD ", "SYS FD")) - return files_map def get_argparser(): @@ -393,10 +652,10 @@ def get_argparser(): parser.add_argument('output_path', metavar='OUTPUT', help='', nargs='?', default="") group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-p', '--pack', action='store_true', help="-p source_folder (dest_file.afs): Pack source_folder in new file source_folder.afs or dest_file.afs if specified.") - group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder): Unpack the AFS in new folder source_afs or dest_folder if specified.") + group.add_argument('-p', '--pack', action='store_true', help="-p source_folder (dest_file.afs): Pack source_folder in new file source_folder.afs or dest_file.afs if specified.") + group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder): Unpack the AFS in new folder source_afs or dest_folder if specified.") group.add_argument('-s', '--stats', action='store_true', help="-s source_afs.afs or source_folder: Get stats about AFS, files, memory, lengths and offsets.") - group.add_argument('-r', '--rebuild', help="-r source_folder: Rebuild AFS tableofcontent (TOC) and filenamedirectory (FD) using rebuild.conf file.") + group.add_argument('-r', '--rebuild', action='store_true', help="-r source_folder: Rebuild AFS tableofcontent (TOC) and filenamedirectory (FD) using afs_rebuild.conf file and afs_rebuild.csv.") return parser @@ -425,3 +684,8 @@ if __name__ == '__main__': afs.unpack( p_input, p_output ) elif args.stats: afs.stats(p_input) + elif args.rebuild: + if not (p_input / "sys").is_dir(): + raise AfsInvalidAfsFolderError(f"Error - Invalid unpacked AFS: {p_input}.") + logging.info(f"rebuilding {p_input}") + afs.rebuild(p_input)