mirror of
				https://github.com/Virtual-World-RE/NeoGF.git
				synced 2025-10-31 18:30:31 +01:00 
			
		
		
		
	Update afstool.py
added stats added pack update using next file offset as max length except if last file and no FD
This commit is contained in:
		
							
								
								
									
										496
									
								
								afstool.py
									
									
									
									
									
								
							
							
						
						
									
										496
									
								
								afstool.py
									
									
									
									
									
								
							| @@ -2,16 +2,65 @@ | |||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| import logging | import logging | ||||||
|  | from math import ceil, floor | ||||||
| import os | import os | ||||||
| import re | import re | ||||||
| import time | import time | ||||||
|  |  | ||||||
| __version__ = "0.0.2" |  | ||||||
|  | __version__ = "0.0.3" | ||||||
| __author__ = "rigodron, algoflash, GGLinnk" | __author__ = "rigodron, algoflash, GGLinnk" | ||||||
| __license__ = "MIT" | __license__ = "MIT" | ||||||
| __status__ = "developpement" | __status__ = "developpement" | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class AfsInvalidFileLenError(Exception): pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class FilenameResolver: | ||||||
|  |     __sys_path = None | ||||||
|  |     __names_tuples = None | ||||||
|  |     __resolve_buffer = "" | ||||||
|  |     __separator = '/' | ||||||
|  |     def __init__(self, sys_path:Path): | ||||||
|  |         self.__sys_path = sys_path | ||||||
|  |         self.__names_tuples = {} | ||||||
|  |         self.__load() | ||||||
|  |     def __load(self): | ||||||
|  |         if (self.__sys_path / "filename_resolver.txt").is_file(): | ||||||
|  |             self.__resolve_buffer = (self.__sys_path / "filename_resolver.txt").read_text() | ||||||
|  |             for line in self.__resolve_buffer.split('\n'): | ||||||
|  |                 name_tuple = line.split(self.__separator) | ||||||
|  |                 self.__names_tuples[name_tuple[2]] = (int(name_tuple[0]), name_tuple[1]) | ||||||
|  |     def save(self): | ||||||
|  |         if len(self.__resolve_buffer) > 0: | ||||||
|  |             logging.info("Writting filename_resolver.txt") | ||||||
|  |             (self.__sys_path / "filename_resolver.txt").write_text(self.__resolve_buffer[:-1]) | ||||||
|  |     # resolve generate a unique filename when unpacking | ||||||
|  |     def resolve_new(self, fileindex:int, filename:str): | ||||||
|  |         if filename in self.__names_tuples: | ||||||
|  |             if self.__names_tuples[filename][0] == fileindex: | ||||||
|  |                 return filename | ||||||
|  |             i = 1 | ||||||
|  |             new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}" | ||||||
|  |             while new_filename in self.__names_tuples: | ||||||
|  |                 if self.__names_tuples[new_filename][0] == fileindex: | ||||||
|  |                     return new_filename | ||||||
|  |                 i+=1 | ||||||
|  |                 new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}" | ||||||
|  |             self.__names_tuples[new_filename] = (fileindex, filename) | ||||||
|  |             self.__resolve_buffer += f"{fileindex}{self.__separator}{filename}{self.__separator}{new_filename}\n" | ||||||
|  |             return new_filename | ||||||
|  |         self.__names_tuples[filename] = (fileindex, filename) | ||||||
|  |         return filename | ||||||
|  |     # return generated filename if it exist else filename | ||||||
|  |     def resolve_from_index(self, fileindex:int, filename:str): | ||||||
|  |         for filename_key, name_tuple in self.__names_tuples.items(): | ||||||
|  |             if name_tuple[0] == fileindex: | ||||||
|  |                 return filename_key | ||||||
|  |         return filename | ||||||
|  |  | ||||||
|  |  | ||||||
| # http://wiki.xentax.com/index.php/GRAF:AFS_AFS | # http://wiki.xentax.com/index.php/GRAF:AFS_AFS | ||||||
| class Afs: | class Afs: | ||||||
|     MAGIC_00 = b"AFS\x00" |     MAGIC_00 = b"AFS\x00" | ||||||
| @@ -19,150 +68,319 @@ class Afs: | |||||||
|     ALIGN = 0x800 |     ALIGN = 0x800 | ||||||
|     HEADER_LEN = 8 |     HEADER_LEN = 8 | ||||||
|     FILENAMEBLOCK_ENTRY_LEN = 0x30 |     FILENAMEBLOCK_ENTRY_LEN = 0x30 | ||||||
|     __len = None |     __file_count = None | ||||||
|     __file_number = None |     __filenamedirectory_offset_offset = None | ||||||
|     __filenameblock_offset_offset = None |     __filenamedirectory_offset = None | ||||||
|     __filenameblock_offset = None |     __filenamedirectory_len = None | ||||||
|     __filenameblock_len = None |     __filenamedirectory = None | ||||||
|     __filenameblock = None |  | ||||||
|     __tableofcontent = None |     __tableofcontent = None | ||||||
|     def unpack(self, afs_path:Path, folder_path:Path): |     def __get_magic(self): | ||||||
|         sys_path = folder_path / "sys" |         return bytes(self.__tableofcontent[0:4]) | ||||||
|         root_path = folder_path / "root" |     def __get_file_count(self): | ||||||
|         sys_path.mkdir(parents=True, exist_ok=True) |         return int.from_bytes(self.__tableofcontent[4:8], "little") | ||||||
|         root_path.mkdir(exist_ok=True) |     def __get_filenamedirectory_offset(self): | ||||||
|         self.__len = afs_path.stat().st_size |         return int.from_bytes(self.__tableofcontent[self.__filenamedirectory_offset_offset:self.__filenamedirectory_offset_offset+4], "little") | ||||||
|         with afs_path.open("rb") as afs_file: |     def __get_filenamedirectory_len(self): | ||||||
|             self.__tableofcontent = afs_file.read(Afs.HEADER_LEN) |         return int.from_bytes(self.__tableofcontent[self.__filenamedirectory_offset_offset+4:self.__filenamedirectory_offset_offset+8], "little") | ||||||
|             if self.__get_magic() not in [Afs.MAGIC_00, Afs.MAGIC_20]: |     def __get_file_offset(self, fileindex:int): | ||||||
|                 raise Exception("Invalid AFS magic number.") |         return int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+fileindex*8:Afs.HEADER_LEN+fileindex*8+4], "little") | ||||||
|             self.__file_number = int.from_bytes(self.__tableofcontent[4:8], "little") |     def __get_file_len(self, fileindex:int): | ||||||
|             self.__tableofcontent += afs_file.read(self.__file_number*8) |         return int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8], "little") | ||||||
|  |     def __get_file_name(self, fileindex:int): | ||||||
|             (self.__filenameblock_offset_offset, self.__filenameblock_offset) = self.__get_next_uint32(afs_file, Afs.HEADER_LEN+self.__file_number*8) |         return self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8") | ||||||
|  |     def __get_file_fdlast(self, fileindex:int): | ||||||
|             afs_file.seek(self.__filenameblock_offset_offset+4) |         return int.from_bytes(self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48], "little") | ||||||
|             self.__filenameblock_len = int.from_bytes(afs_file.read(4), "little") |     def __get_mtime(self, fileindex:int): | ||||||
|              |         mtime_data = self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44] | ||||||
|             if not self.__load_filenameblock(afs_file): |         year   = int.from_bytes(mtime_data[0:2], "little") | ||||||
|                 logging.info("There is no filename block. Creating new names and dates for files.") |         month  = int.from_bytes(mtime_data[2:4], "little") | ||||||
|             else: |         day    = int.from_bytes(mtime_data[4:6], "little") | ||||||
|                 logging.debug(f"Filenameblock_offset:0x{self.__filenameblock_offset:x}, filenameblock_len:0x{self.__filenameblock_len:x}.") |         hour   = int.from_bytes(mtime_data[6:8], "little") | ||||||
|                 afs_file.seek(len(self.__tableofcontent)) |         minute = int.from_bytes(mtime_data[8:10], "little") | ||||||
|                 self.__tableofcontent += afs_file.read(self.__filenameblock_offset_offset+8 - len(self.__tableofcontent)) |         second = int.from_bytes(mtime_data[10:12], "little") | ||||||
|                  |         return time.mktime(datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second).timetuple()) | ||||||
|                 logging.info("Writting sys/filenameblock.bin") |     def __patch_file_len(self, fileindex:int, file_len:int): | ||||||
|                 (sys_path / "filenameblock.bin").write_bytes(self.__filenameblock) |         # Patch file_len in the FD | ||||||
|             logging.info("Writting sys/tableofcontent.bin") |         if self.__filenamedirectory: | ||||||
|             (sys_path / "tableofcontent.bin").write_bytes(self.__tableofcontent) |             if self.__get_file_len(fileindex) == self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48]: | ||||||
|             self.__extract_files(root_path, afs_file) |                 self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48] = file_len.to_bytes(4, "little") | ||||||
|     def pack(self, folder_path:Path, afs_path:Path = None): |         # Patch file_len in the TOC | ||||||
|         if afs_path == None: |         self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8] = file_len.to_bytes(4, "little") | ||||||
|             afs_path = folder_path / Path(folder_path.name).with_suffix(".afs") |     def __patch_mtime(self, fileindex:int, mtime): | ||||||
|         sys_path = folder_path / "sys" |         mtime = datetime.fromtimestamp(mtime) | ||||||
|         root_path = folder_path / "root" |         self.__filenamedirectory[Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+32:Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+44] = \ | ||||||
|         with afs_path.open("wb") as afs_file, (sys_path / "tableofcontent.bin").open("rb") as tableofcontent_file: |             mtime.year.to_bytes(2,"little")+ \ | ||||||
|             logging.debug(f"Writting {sys_path}/tableofcontent.bin in AFS.") |             mtime.month.to_bytes(2,"little")+ \ | ||||||
|             self.__tableofcontent = tableofcontent_file.read() |             mtime.day.to_bytes(2,"little")+ \ | ||||||
|             self.__file_number = int.from_bytes(self.__tableofcontent[4:8], "little") |             mtime.hour.to_bytes(2,"little")+ \ | ||||||
|             afs_file.write(self.__pad(self.__tableofcontent)) |             mtime.minute.to_bytes(2,"little")+\ | ||||||
|             if (sys_path / "filenameblock.bin").is_file(): |             mtime.second.to_bytes(2,"little") | ||||||
|                 (self.__filenameblock_offset_offset, self.__filenameblock_offset) = self.__get_next_uint32(tableofcontent_file, Afs.HEADER_LEN+self.__file_number*8, (sys_path / "tableofcontent.bin").stat().st_size) |  | ||||||
|                 self.__filenameblock_len = int.from_bytes(self.__tableofcontent[self.__filenameblock_offset_offset+4:self.__filenameblock_offset_offset+8], "little") |  | ||||||
|                 self.__filenameblock = (sys_path / "filenameblock.bin").read_bytes() |  | ||||||
|             for i in range(0, self.__file_number): |  | ||||||
|                 file_offset = int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+i*8:Afs.HEADER_LEN+i*8+4], "little") |  | ||||||
|                 file_len    = int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+i*8+4:Afs.HEADER_LEN+i*8+8], "little") |  | ||||||
|                 if self.__filenameblock != None : |  | ||||||
|                     filename = self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8") |  | ||||||
|                 else: |  | ||||||
|                     filename = f"{i:08}" |  | ||||||
|                 logging.debug(f"Packing {root_path/filename} 0x{file_offset:x}:0x{file_offset+file_len:x} in iso.") |  | ||||||
|                 afs_file.seek(file_offset) |  | ||||||
|                 afs_file.write(self.__pad((root_path / filename).read_bytes())) |  | ||||||
|             if self.__filenameblock != None: |  | ||||||
|                 afs_file.write(self.__pad(self.__filenameblock)) |  | ||||||
|     def rebuild(self, folder_path:Path): |  | ||||||
|         raise Exception("Not implemented yet") |  | ||||||
|     def list(self, path:Path): |  | ||||||
|         raise Exception("Not implemented yet") |  | ||||||
|         if path.is_file(): |  | ||||||
|             self.__list_afs(path) |  | ||||||
|         else: |  | ||||||
|             self.__list_afsdir(path) |  | ||||||
|     def __pad(self, data:bytes): |     def __pad(self, data:bytes): | ||||||
|         if len(data) % self.ALIGN != 0: |         if len(data) % self.ALIGN != 0: | ||||||
|             data += b"\x00" * (self.ALIGN - (len(data) % self.ALIGN)) |             data += b"\x00" * (self.ALIGN - (len(data) % self.ALIGN)) | ||||||
|         return data |         return data | ||||||
|     def __extract_files(self, root_path:Path, afs_file): |     def __clean_filenamedirectory(self): | ||||||
|         logging.info(f"Extracting {self.__file_number} files.") |         self.__filenamedirectory = None | ||||||
|         for i in range(0, self.__file_number): |         self.__filenamedirectory_offset = None | ||||||
|             file_offset = int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+i*8:Afs.HEADER_LEN+i*8+4], "little") |         self.__filenamedirectory_len = None | ||||||
|             file_len    = int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+i*8+4:Afs.HEADER_LEN+i*8+8], "little") |     def __loadsys_from_afs(self, afs_file, afs_len:int): | ||||||
|             if self.__filenameblock != None : |             self.__tableofcontent = afs_file.read(Afs.HEADER_LEN) | ||||||
|                 filename    = self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8") |             if self.__get_magic() not in [Afs.MAGIC_00, Afs.MAGIC_20]: | ||||||
|                 year        = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+32:i*Afs.FILENAMEBLOCK_ENTRY_LEN+34], "little") |                 raise Exception("Invalid AFS magic number.") | ||||||
|                 month       = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+34:i*Afs.FILENAMEBLOCK_ENTRY_LEN+36], "little") |             self.__file_count = self.__get_file_count() | ||||||
|                 day         = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+36:i*Afs.FILENAMEBLOCK_ENTRY_LEN+38], "little") |             self.__tableofcontent += afs_file.read(self.__file_count*8) | ||||||
|                 hour        = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+38:i*Afs.FILENAMEBLOCK_ENTRY_LEN+40], "little") |             tableofcontent_len = len(self.__tableofcontent) | ||||||
|                 minute      = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+40:i*Afs.FILENAMEBLOCK_ENTRY_LEN+42], "little") |  | ||||||
|                 second      = int.from_bytes(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN+42:i*Afs.FILENAMEBLOCK_ENTRY_LEN+44], "little") |  | ||||||
|                 mtime = time.mktime(datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second).timetuple()) |  | ||||||
|             else: |  | ||||||
|                 filename = f"{i:08}" |  | ||||||
|  |  | ||||||
|             logging.debug(f"Writting {root_path/filename} 0x{file_offset:x}:0x{file_offset+file_len:x}") |             offset = tableofcontent_len | ||||||
|  |  | ||||||
|  |             tmp_block = int.from_bytes(afs_file.read(4), "little") | ||||||
|  |             if tmp_block != 0: | ||||||
|  |                 self.__filenamedirectory_offset_offset = offset | ||||||
|  |                 self.__filenamedirectory_offset = tmp_block | ||||||
|  |             else: | ||||||
|  |                 # If filenamedirectory_offset is not directly after the files offsets and lens | ||||||
|  |                 # --> we search the next uint32 != 0 | ||||||
|  |                 offset += 4 | ||||||
|  |                 block_len = 0x800 | ||||||
|  |                 tmp_block = afs_file.read(block_len) | ||||||
|  |                 while tmp_block: | ||||||
|  |                     match = re.search(b"^(?:\x00{4})*(?!\x00{4})(.{4})", tmp_block) # match next uint32 | ||||||
|  |                     if match: | ||||||
|  |                         self.__filenamedirectory_offset_offset = offset + match.start(1) | ||||||
|  |                         self.__filenamedirectory_offset = int.from_bytes(match[1], "little") | ||||||
|  |                         break | ||||||
|  |                     offset += block_len | ||||||
|  |                     tmp_block = afs_file.read(block_len) | ||||||
|  |  | ||||||
|  |             if self.__filenamedirectory_offset is None: | ||||||
|  |                 raise Exception("Error - Empty AFS.") | ||||||
|  |  | ||||||
|  |             afs_file.seek(self.__filenamedirectory_offset_offset+4) | ||||||
|  |             self.__filenamedirectory_len = int.from_bytes(afs_file.read(4), "little") | ||||||
|  |  | ||||||
|  |             # Test if offset of filenamedirectory is valid and if number of entries match between filenamedirectory and tableofcontent | ||||||
|  |             if self.__filenamedirectory_offset + self.__filenamedirectory_len > afs_len or \ | ||||||
|  |                self.__filenamedirectory_offset < self.__filenamedirectory_offset_offset or \ | ||||||
|  |                (tableofcontent_len - self.HEADER_LEN) / 8 != self.__filenamedirectory_len / Afs.FILENAMEBLOCK_ENTRY_LEN: | ||||||
|  |                 self.__clean_filenamedirectory() | ||||||
|  |                 return False | ||||||
|  |  | ||||||
|  |             afs_file.seek(self.__filenamedirectory_offset) | ||||||
|  |             self.__filenamedirectory = afs_file.read(self.__filenamedirectory_len) | ||||||
|  |  | ||||||
|  |             # Test if filename is correct by very basic pattern matching | ||||||
|  |             pattern = re.compile(b"^(?=.{32}$)[^\x00]+\x00+$") | ||||||
|  |             for i in range(0, self.__file_count): | ||||||
|  |                 if not pattern.fullmatch(self.__filenamedirectory[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32]): | ||||||
|  |                     self.__clean_filenamedirectory() | ||||||
|  |                     return False | ||||||
|  |  | ||||||
|  |             afs_file.seek(tableofcontent_len) | ||||||
|  |             self.__tableofcontent += afs_file.read(self.__filenamedirectory_offset_offset+8 - tableofcontent_len) | ||||||
|  |             return True | ||||||
|  |     def __loadsys_from_folder(self, sys_path:Path): | ||||||
|  |         self.__tableofcontent = bytearray( (sys_path / "tableofcontent.bin").read_bytes() ) | ||||||
|  |         self.__file_count = self.__get_file_count() | ||||||
|  |  | ||||||
|  |         # If there is a filenamedirectory we load it | ||||||
|  |         if (sys_path / "filenamedirectory.bin").is_file(): | ||||||
|  |             self.__filenamedirectory = bytearray((sys_path / "filenamedirectory.bin").read_bytes()) | ||||||
|  |             self.__filenamedirectory_offset_offset = len(self.__tableofcontent) - 8 | ||||||
|  |             self.__filenamedirectory_offset = self.__get_filenamedirectory_offset() | ||||||
|  |             self.__filenamedirectory_len = self.__get_filenamedirectory_len() | ||||||
|  |             if self.__filenamedirectory_len != len(self.__filenamedirectory): | ||||||
|  |                 raise Exception("Error - Tableofcontent filenamedirectory length does not match real filenamedirectory length.") | ||||||
|  |     def unpack(self, afs_path:Path, folder_path:Path): | ||||||
|  |         sys_path = folder_path / "sys" | ||||||
|  |         root_path = folder_path / "root" | ||||||
|  |         sys_path.mkdir(parents=True) | ||||||
|  |         root_path.mkdir() | ||||||
|  |  | ||||||
|  |         with afs_path.open("rb") as afs_file: | ||||||
|  |             if not self.__loadsys_from_afs(afs_file, afs_path.stat().st_size): | ||||||
|  |                 logging.info("There is no filename directory. Creating new names and dates for files.") | ||||||
|  |             else: | ||||||
|  |                 logging.debug(f"filenamedirectory_offset:0x{self.__filenamedirectory_offset:x}, filenamedirectory_len:0x{self.__filenamedirectory_len:x}.") | ||||||
|  |                 logging.info("Writting sys/filenamedirectory.bin") | ||||||
|  |                 (sys_path / "filenamedirectory.bin").write_bytes(self.__filenamedirectory) | ||||||
|  |                 resolver = FilenameResolver(sys_path) | ||||||
|  |  | ||||||
|  |             logging.info("Writting sys/tableofcontent.bin") | ||||||
|  |             (sys_path / "tableofcontent.bin").write_bytes(self.__tableofcontent) | ||||||
|  |  | ||||||
|  |             logging.info(f"Extracting {self.__file_count} files.") | ||||||
|  |             for i in range(0, self.__file_count): | ||||||
|  |                 file_offset = self.__get_file_offset(i) | ||||||
|  |                 file_len    = self.__get_file_len(i) | ||||||
|  |                 filename    = resolver.resolve_new(i, self.__get_file_name(i)) if self.__filenamedirectory else f"{i:08}" | ||||||
|  |                  | ||||||
|  |                 logging.debug(f"Writting {root_path / filename} 0x{file_offset:x}:0x{file_offset + file_len:x}") | ||||||
|                 afs_file.seek(file_offset) |                 afs_file.seek(file_offset) | ||||||
|                 (root_path / filename).write_bytes(afs_file.read(file_len)) |                 (root_path / filename).write_bytes(afs_file.read(file_len)) | ||||||
|             if self.__filenameblock != None : |  | ||||||
|                 os.utime(root_path/filename, (mtime, mtime)) |  | ||||||
|     def __load_filenameblock(self, afs_file): |  | ||||||
|         if self.__filenameblock_offset + self.__filenameblock_len > self.__len or \ |  | ||||||
|            self.__filenameblock_offset < self.__filenameblock_offset_offset or \ |  | ||||||
|            (len(self.__tableofcontent) - self.HEADER_LEN) / 8 != self.__filenameblock_len / Afs.FILENAMEBLOCK_ENTRY_LEN: |  | ||||||
|             self.__clean_filenameblock() |  | ||||||
|             return False |  | ||||||
|         afs_file.seek(self.__filenameblock_offset) |  | ||||||
|         self.__filenameblock = afs_file.read(self.__filenameblock_len) |  | ||||||
|  |  | ||||||
|         pattern = re.compile(b"^(?=.{32}$)[^\x00]+\x00+$") |                 if self.__filenamedirectory: | ||||||
|         for i in range(0, self.__file_number): |                     mtime = self.__get_mtime(i) | ||||||
|             if not pattern.fullmatch(self.__filenameblock[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32]): |                     os.utime(root_path / filename, (mtime, mtime)) | ||||||
|                 self.__clean_filenameblock() |             if self.__filenamedirectory: | ||||||
|                 return False |                 resolver.save() | ||||||
|         return True |     def pack(self, folder_path:Path, afs_path:Path = None): | ||||||
|     def __clean_filenameblock(self): |         if afs_path == None: | ||||||
|         self.__filenameblock = None |             afs_path = folder_path / Path(folder_path.name).with_suffix(".afs") | ||||||
|         self.__filenameblock_offset = None |         elif afs_path.suffix != ".afs": | ||||||
|         self.__filenameblock_len = None |             logging.warning("Dest file should have .afs file extension.") | ||||||
|     def __get_magic(self): |  | ||||||
|         return self.__tableofcontent[0:4] |         sys_path = folder_path / "sys" | ||||||
|     # return a tuple with (offset:int, value:int) |         root_path = folder_path / "root" | ||||||
|     def __get_next_uint32(self, file, offset:int, file_len=None): |  | ||||||
|         if file_len == None: |         self.__loadsys_from_folder(sys_path) | ||||||
|             file_len = self.__len |  | ||||||
|         file.seek(offset) |         if self.__filenamedirectory: | ||||||
|         next_uint32 = int.from_bytes(file.read(4), "little") |             resolver = FilenameResolver(sys_path) | ||||||
|         if next_uint32 != 0: |  | ||||||
|             return (offset, next_uint32) |         offsets_map = self.__get_offsets_map() | ||||||
|         offset += 4 |         with afs_path.open("wb") as afs_file: | ||||||
|         # If filename_block_offset is not directly after the files offsets and lens |             # We update files | ||||||
|         # --> we search the next uint32 != 0 |             for i in range(0, self.__file_count): | ||||||
|         while offset < file_len: |                 file_offset = self.__get_file_offset(i) | ||||||
|             max_size = 0x800 |                 file_len    = self.__get_file_len(i) | ||||||
|             if offset + max_size > file_len: |                 filename    = resolver.resolve_from_index(i, self.__get_file_name(i)) if self.__filenamedirectory else f"{i:08}" | ||||||
|                 max_size = file_len - offset |  | ||||||
|             tmp_block = file.read(max_size) |                 file_path = root_path / filename | ||||||
|             for i in range(0, max_size, 4): |                 new_file_len = file_path.stat().st_size | ||||||
|                 next_uint32 = int.from_bytes(tmp_block[i:i+4], "little") |                  | ||||||
|                 if next_uint32 != 0: |                 if new_file_len != file_len: | ||||||
|                     return (offset+i, next_uint32) |                     next_offset = None | ||||||
|             offset += max_size |                     # If no FD, we can raise AFS length without constraint | ||||||
|         raise Exception("Empty AFS file.") |                     if offsets_map.index(file_offset) + 1 < len(offsets_map): | ||||||
|     def __list_afs(self, afs_path:Path): |                         next_offset = offsets_map[offsets_map.index(file_offset)+1] | ||||||
|         pass |                     if next_offset: | ||||||
|     def __list_afsdir(self, root_dir:Path): |                         if file_offset + new_file_len > next_offset: | ||||||
|         pass |                             raise AfsInvalidFileLenError(f"File {file_path} as a new file_len (0x{new_file_len:x}) > next file offset (0x{next_offset:x}). "\ | ||||||
|  |                                 "This means that we have to rebuild the AFS using -r and changing offset of all next files and this could lead to bugs if the main dol use AFS relative file offsets.") | ||||||
|  |                     self.__patch_file_len(i, new_file_len) | ||||||
|  |                 # If there is a filenamedirectory we update mtime : | ||||||
|  |                 if self.__filenamedirectory: | ||||||
|  |                     self.__patch_mtime(i, round(file_path.stat().st_mtime)) | ||||||
|  |                 logging.debug(f"Packing {file_path} 0x{file_offset:x}:0x{file_offset+new_file_len:x} in AFS.") | ||||||
|  |                 afs_file.seek(file_offset) | ||||||
|  |                 afs_file.write(self.__pad(file_path.read_bytes())) | ||||||
|  |             if self.__filenamedirectory: | ||||||
|  |                 afs_file.seek(self.__filenamedirectory_offset) | ||||||
|  |                 afs_file.write(self.__pad(self.__filenamedirectory)) | ||||||
|  |             logging.debug(f"Packing {sys_path}/tableofcontent.bin at the beginning of the AFS.") | ||||||
|  |             afs_file.seek(0) | ||||||
|  |             afs_file.write(self.__tableofcontent) | ||||||
|  |     def rebuild(self, folder_path:Path): | ||||||
|  |         raise Exception("Not implemented yet") | ||||||
|  |     def stats(self, path:Path): | ||||||
|  |         if path.is_file(): | ||||||
|  |             with path.open("rb") as afs_file: | ||||||
|  |                 self.__loadsys_from_afs(afs_file, path.stat().st_size) | ||||||
|  |         else: | ||||||
|  |             self.__loadsys_from_folder(path / "sys") | ||||||
|  |  | ||||||
|  |         files_map = self.__get_formated_map() | ||||||
|  |         files_map.sort(key=lambda x: x[1]) # sort by offset | ||||||
|  |  | ||||||
|  |         # Offsets intersect | ||||||
|  |         dup_offsets_tuples = [] | ||||||
|  |         last_tuple = (-1, "-1", "0") # empty space search init | ||||||
|  |         new_set = True | ||||||
|  |         # Filenames duplicates | ||||||
|  |         dup_names_dict = {} # tmp dict for grouping by filename | ||||||
|  |         dup_names_tuples = [] | ||||||
|  |         # For empty blocks | ||||||
|  |         empty_space_tuples = [] | ||||||
|  |         for file_tuple in files_map: | ||||||
|  |             # Filenames duplicates | ||||||
|  |             if not file_tuple[6] in dup_names_dict: | ||||||
|  |                 dup_names_dict[file_tuple[6]] = [file_tuple] | ||||||
|  |             else: | ||||||
|  |                 dup_names_dict[file_tuple[6]].append(file_tuple) | ||||||
|  |             # Offsets intersect | ||||||
|  |             if file_tuple[1] < last_tuple[1]: | ||||||
|  |                 if new_set: | ||||||
|  |                     dup_offsets_tuples.append("Files sharing same offsets:\n") | ||||||
|  |                     new_set = False | ||||||
|  |                 dup_offsets_tuples.append(file_tuple) | ||||||
|  |             else: | ||||||
|  |                 new_set = True | ||||||
|  |             # Empty blocks | ||||||
|  |             last_block_end = ceil(int(last_tuple[2], base=16) / Afs.ALIGN) * Afs.ALIGN | ||||||
|  |             if int(file_tuple[1], base=16) - last_block_end >= Afs.ALIGN: | ||||||
|  |                 empty_space_tuples.append( (last_tuple[2], file_tuple[1], f"{int(file_tuple[1], base=16) - int(last_tuple[2], base=16):08x}", file_tuple[6]) ) | ||||||
|  |             last_tuple = file_tuple | ||||||
|  |  | ||||||
|  |         for filename in dup_names_dict: | ||||||
|  |             if len(dup_names_dict[filename]) > 1: | ||||||
|  |                 dup_names_tuples += ["Files sharing same name:\n"] + [file_tuple for file_tuple in dup_names_dict[filename]] | ||||||
|  |  | ||||||
|  |         dup_offsets = "Yes" if len(dup_offsets_tuples) > 1 else "No" | ||||||
|  |         dup_names   = "Yes" if len(dup_names_tuples) > 1 else "No" | ||||||
|  |         empty_space = "Yes" if len(empty_space_tuples) > 1 else "No" | ||||||
|  |  | ||||||
|  |         files_info = f"AFS Magic/Version                : {str(self.__get_magic())[2:-1]}\n"+\ | ||||||
|  |             f"TOC offset of the FD offset      : 0x{self.__filenamedirectory_offset_offset:x}\n"+\ | ||||||
|  |             f"Multiple files using same offsets: {dup_offsets}\n"+\ | ||||||
|  |             f"Multiple files using same name   : {dup_names}\n"+\ | ||||||
|  |             f"Empty blocks                     : {empty_space}\n" | ||||||
|  |         self.__print("Global infos and AFS space mapping:", files_map, infos=files_info) | ||||||
|  |         if dup_offsets_tuples: | ||||||
|  |             self.__print("Files sharing same AFS offsets:", dup_offsets_tuples) | ||||||
|  |         if dup_names_tuples: | ||||||
|  |             self.__print("Files using same filenames:", dup_names_tuples) | ||||||
|  |         if empty_space_tuples: | ||||||
|  |             self.__print("Empty blocks between files (filename = name of the previous file):", empty_space_tuples, columns=[1,2,3,6]) | ||||||
|  |     def __print(self, title:str, lines_tuples, columns:list = list(range(0,7)), infos:str = ""): | ||||||
|  |         stats_buffer = "#"*100+f"\n# {title}\n"+"#"*100+f"\n{infos}|"+"-"*99+"\n" | ||||||
|  |         if 0 in columns: stats_buffer += "| Index    "; | ||||||
|  |         if 1 in columns: stats_buffer += "| b offset "; | ||||||
|  |         if 2 in columns: stats_buffer += "| e offset "; | ||||||
|  |         if 3 in columns: stats_buffer += "| length   "; | ||||||
|  |         if 4 in columns: stats_buffer += "| YYYY-mm-dd HH:MM:SS "; | ||||||
|  |         if 5 in columns: stats_buffer += "| FD last  "; | ||||||
|  |         if 6 in columns: stats_buffer += "| Filename"; | ||||||
|  |         stats_buffer += "\n|"+"-"*99+"\n" | ||||||
|  |         for line in lines_tuples: | ||||||
|  |             stats_buffer += line if type(line) == str else "| "+" | ".join(line)+"\n" | ||||||
|  |         print(stats_buffer, end='') | ||||||
|  |     # end offset not included (0,1) -> len=1 | ||||||
|  |     def __get_offsets_map(self): | ||||||
|  |         # offsets_map is used to check next used offset when updating files | ||||||
|  |         # we also check if there is intersect between files | ||||||
|  |         offsets_map = [(0, len(self.__tableofcontent))] | ||||||
|  |         for i in range(0, self.__file_count): | ||||||
|  |             file_offset = self.__get_file_offset(i) | ||||||
|  |             offsets_map.append( (file_offset, file_offset + self.__get_file_len(i)) ) | ||||||
|  |         if self.__filenamedirectory: | ||||||
|  |             filenamedirectory_offset = self.__get_filenamedirectory_offset() | ||||||
|  |             offsets_map.append( (filenamedirectory_offset, filenamedirectory_offset + self.__get_filenamedirectory_len()) ) | ||||||
|  |         offsets_map.sort(key=lambda x: x[0]) | ||||||
|  |  | ||||||
|  |         # Check if there is problems in file memory mapping | ||||||
|  |         last_tuple = (-1, -1) | ||||||
|  |         for i, offsets_tuple in enumerate(offsets_map): | ||||||
|  |             if offsets_tuple[0] < last_tuple[1]: | ||||||
|  |                 raise Exception(f"Error - Multiple files use same file offsets ranges.") | ||||||
|  |             last_tuple = offsets_tuple | ||||||
|  |             offsets_map[i] = offsets_tuple[0] | ||||||
|  |         return offsets_map | ||||||
|  |     # end offset not included (0,1) -> len=1 | ||||||
|  |     def __get_formated_map(self): | ||||||
|  |         files_map = [("SYS TOC ", "00000000", f"{len(self.__tableofcontent):08x}", f"{len(self.__tableofcontent):08x}", "SYS TOC"+' '*12, "SYS TOC ", "SYS TOC")] | ||||||
|  |  | ||||||
|  |         for i in range(0, self.__file_count): | ||||||
|  |             file_offset = self.__get_file_offset(i) | ||||||
|  |             file_len    = self.__get_file_len(i) | ||||||
|  |             file_date   = datetime.fromtimestamp(self.__get_mtime(i)).strftime("%Y-%m-%d %H:%M:%S") if self.__filenamedirectory else " "*19 | ||||||
|  |             filename    = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}" | ||||||
|  |             files_map.append((f"{i:08x}", f"{file_offset:08x}", f"{file_offset + file_len:08x}", f"{file_len:08x}", file_date, f"{self.__get_file_fdlast(i):08x}", filename)) | ||||||
|  |  | ||||||
|  |         if self.__filenamedirectory: | ||||||
|  |             files_map.append(("SYS FD  ", f"{self.__filenamedirectory_offset:08x}", \ | ||||||
|  |                 f"{self.__filenamedirectory_offset + len(self.__filenamedirectory):08x}", \ | ||||||
|  |                 f"{len(self.__filenamedirectory):08x}", "SYS FD"+' '*13, "SYS FD  ", "SYS FD")) | ||||||
|  |         return files_map | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_argparser(): | def get_argparser(): | ||||||
| @@ -176,7 +394,7 @@ def get_argparser(): | |||||||
|     group = parser.add_mutually_exclusive_group(required=True) |     group = parser.add_mutually_exclusive_group(required=True) | ||||||
|     group.add_argument('-p', '--pack',   action='store_true', help="-p source_folder (dest_file.afs) : Pack source_folder in new file source_folder.afs or dest_file.afs if specified.") |     group.add_argument('-p', '--pack',   action='store_true', help="-p source_folder (dest_file.afs) : Pack source_folder in new file source_folder.afs or dest_file.afs if specified.") | ||||||
|     group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder) : Unpack the AFS in new folder source_afs or dest_folder if specified.") |     group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder) : Unpack the AFS in new folder source_afs or dest_folder if specified.") | ||||||
|     group.add_argument('-l', '--list',   action='store_true', help="-l source_afs.afs or source_folder : List AFS files, length and offsets.") |     group.add_argument('-s', '--stats',   action='store_true', help="-s source_afs.afs or source_folder : Get stats about AFS, files, memory, lengths and offsets.") | ||||||
|     group.add_argument('-r', '--rebuild', help="-r source_folder fndo_offset : Rebuild AFS tableofcontent (TOC) and filenamedirectory (FND) using filenamedirectory_offset_offset=fndo_offset (the offset in the TOC).") |     group.add_argument('-r', '--rebuild', help="-r source_folder fndo_offset : Rebuild AFS tableofcontent (TOC) and filenamedirectory (FND) using filenamedirectory_offset_offset=fndo_offset (the offset in the TOC).") | ||||||
|     return parser |     return parser | ||||||
|  |  | ||||||
| @@ -204,5 +422,5 @@ if __name__ == '__main__': | |||||||
|             p_output = p_input.parent / p_input.stem |             p_output = p_input.parent / p_input.stem | ||||||
|         logging.info(f"unpacking AFS {p_input} in {p_output}") |         logging.info(f"unpacking AFS {p_input} in {p_output}") | ||||||
|         afs.unpack( p_input, p_output ) |         afs.unpack( p_input, p_output ) | ||||||
|     elif args.list: |     elif args.stats: | ||||||
|         afs.list(p_input) |         afs.stats(p_input) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user