mirror of
https://github.com/Virtual-World-RE/NeoGF.git
synced 2024-12-26 17:51:34 +01:00
Update afstest.py
This commit is contained in:
parent
9d432a824e
commit
db1c0ca6a0
677
afstest.py
677
afstest.py
|
@ -1,12 +1,10 @@
|
|||
#!/usr/bin/env python3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import logging
|
||||
from math import ceil, floor
|
||||
from afstool import AfsInvalidFileLenError, Afs, FilenameResolver
|
||||
from math import floor, ceil
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from time import time
|
||||
|
||||
__version__ = "0.0.3"
|
||||
__author__ = "rigodron, algoflash, GGLinnk"
|
||||
|
@ -14,413 +12,288 @@ __license__ = "MIT"
|
|||
__status__ = "developpement"
|
||||
|
||||
|
||||
class AfsInvalidFileLenError(Exception): pass
|
||||
##################################################
|
||||
# Set afss_path with your AFSs folder
|
||||
# Set afspacker_path with the path of AFSPacker.exe
|
||||
##################################################
|
||||
afss_path = Path("afs")
|
||||
afspacker_path = Path("../_autres/_soft/AFSPacker.exe")
|
||||
|
||||
afspacker_unpack_path = Path("afspacker_unpack")
|
||||
# Created tmp paths
|
||||
unpack_path = Path("unpack")
|
||||
unpack2_path = Path("unpack2")
|
||||
repack_path = Path("repack")
|
||||
|
||||
|
||||
class FilenameResolver:
|
||||
__sys_path = None
|
||||
__names_tuples = None
|
||||
__resolve_buffer = ""
|
||||
__separator = '/'
|
||||
def __init__(self, sys_path:Path):
|
||||
self.__sys_path = sys_path
|
||||
self.__names_tuples = {}
|
||||
self.__load()
|
||||
def __load(self):
|
||||
if (self.__sys_path / "filename_resolver.txt").is_file():
|
||||
self.__resolve_buffer = (self.__sys_path / "filename_resolver.txt").read_text()
|
||||
for line in self.__resolve_buffer.split('\n'):
|
||||
name_tuple = line.split(self.__separator)
|
||||
self.__names_tuples[name_tuple[2]] = (int(name_tuple[0]), name_tuple[1])
|
||||
def save(self):
|
||||
if len(self.__resolve_buffer) > 0:
|
||||
logging.info("Writting filename_resolver.txt")
|
||||
(self.__sys_path / "filename_resolver.txt").write_text(self.__resolve_buffer[:-1])
|
||||
# resolve generate a unique filename when unpacking
|
||||
def resolve_new(self, fileindex:int, filename:str):
|
||||
if filename in self.__names_tuples:
|
||||
if self.__names_tuples[filename][0] == fileindex:
|
||||
return filename
|
||||
i = 1
|
||||
new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}"
|
||||
while new_filename in self.__names_tuples:
|
||||
if self.__names_tuples[new_filename][0] == fileindex:
|
||||
return new_filename
|
||||
i+=1
|
||||
new_filename = f"{Path(filename).stem} ({i}){Path(filename).suffix}"
|
||||
self.__names_tuples[new_filename] = (fileindex, filename)
|
||||
self.__resolve_buffer += f"{fileindex}{self.__separator}{filename}{self.__separator}{new_filename}\n"
|
||||
return new_filename
|
||||
self.__names_tuples[filename] = (fileindex, filename)
|
||||
return filename
|
||||
# return generated filename if it exist else filename
|
||||
def resolve_from_index(self, fileindex:int, filename:str):
|
||||
for filename_key, name_tuple in self.__names_tuples.items():
|
||||
if name_tuple[0] == fileindex:
|
||||
return filename_key
|
||||
return filename
|
||||
class AfsTest(Afs):
|
||||
# return a list of tuples with (offset, resolved filename)
|
||||
def get_range(self, folder_path:Path):
|
||||
sys_path = folder_path / "sys"
|
||||
self._Afs__loadsys_from_folder(sys_path)
|
||||
resolver = FilenameResolver(sys_path)
|
||||
|
||||
offsets_names_map = [(0, "SYS TOC")]
|
||||
for i in range(0, self._Afs__file_count):
|
||||
filename = resolver.resolve_from_index(i, self._Afs__get_file_name(i)) if self._Afs__filenamedirectory else f"{i:08}"
|
||||
offsets_names_map.append( (self._Afs__get_file_offset(i), filename) )
|
||||
if self._Afs__filenamedirectory:
|
||||
offsets_names_map.append( (self._Afs__get_filenamedirectory_offset(), "SYS FD") )
|
||||
return offsets_names_map
|
||||
|
||||
|
||||
# http://wiki.xentax.com/index.php/GRAF:AFS_AFS
|
||||
class Afs:
|
||||
MAGIC_00 = b"AFS\x00"
|
||||
MAGIC_20 = b"AFS\x20"
|
||||
ALIGN = 0x800
|
||||
HEADER_LEN = 8
|
||||
FILENAMEBLOCK_ENTRY_LEN = 0x30
|
||||
__file_count = None
|
||||
__filenamedirectory_offset_offset = None
|
||||
__filenamedirectory_offset = None
|
||||
__filenamedirectory_len = None
|
||||
__filenamedirectory = None
|
||||
__tableofcontent = None
|
||||
def __get_magic(self):
|
||||
return bytes(self.__tableofcontent[0:4])
|
||||
def __get_file_count(self):
|
||||
return int.from_bytes(self.__tableofcontent[4:8], "little")
|
||||
def __get_filenamedirectory_offset(self):
|
||||
return int.from_bytes(self.__tableofcontent[self.__filenamedirectory_offset_offset:self.__filenamedirectory_offset_offset+4], "little")
|
||||
def __get_filenamedirectory_len(self):
|
||||
return int.from_bytes(self.__tableofcontent[self.__filenamedirectory_offset_offset+4:self.__filenamedirectory_offset_offset+8], "little")
|
||||
def __get_file_offset(self, fileindex:int):
|
||||
return int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+fileindex*8:Afs.HEADER_LEN+fileindex*8+4], "little")
|
||||
def __get_file_len(self, fileindex:int):
|
||||
return int.from_bytes(self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8], "little")
|
||||
def __get_file_name(self, fileindex:int):
|
||||
return self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32].split(b"\x00")[0].decode("utf-8")
|
||||
def __get_file_fdlast(self, fileindex:int):
|
||||
return int.from_bytes(self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48], "little")
|
||||
def __get_mtime(self, fileindex:int):
|
||||
mtime_data = self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+32:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44]
|
||||
year = int.from_bytes(mtime_data[0:2], "little")
|
||||
month = int.from_bytes(mtime_data[2:4], "little")
|
||||
day = int.from_bytes(mtime_data[4:6], "little")
|
||||
hour = int.from_bytes(mtime_data[6:8], "little")
|
||||
minute = int.from_bytes(mtime_data[8:10], "little")
|
||||
second = int.from_bytes(mtime_data[10:12], "little")
|
||||
return time.mktime(datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second).timetuple())
|
||||
def __patch_file_len(self, fileindex:int, file_len:int):
|
||||
# Patch file_len in the FD
|
||||
if self.__filenamedirectory:
|
||||
if self.__get_file_len(fileindex) == self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48]:
|
||||
self.__filenamedirectory[fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+44:fileindex*Afs.FILENAMEBLOCK_ENTRY_LEN+48] = file_len.to_bytes(4, "little")
|
||||
# Patch file_len in the TOC
|
||||
self.__tableofcontent[Afs.HEADER_LEN+fileindex*8+4:Afs.HEADER_LEN+fileindex*8+8] = file_len.to_bytes(4, "little")
|
||||
def __patch_mtime(self, fileindex:int, mtime):
|
||||
mtime = datetime.fromtimestamp(mtime)
|
||||
self.__filenamedirectory[Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+32:Afs.FILENAMEBLOCK_ENTRY_LEN*fileindex+44] = \
|
||||
mtime.year.to_bytes(2,"little")+ \
|
||||
mtime.month.to_bytes(2,"little")+ \
|
||||
mtime.day.to_bytes(2,"little")+ \
|
||||
mtime.hour.to_bytes(2,"little")+ \
|
||||
mtime.minute.to_bytes(2,"little")+\
|
||||
mtime.second.to_bytes(2,"little")
|
||||
def __pad(self, data:bytes):
|
||||
if len(data) % self.ALIGN != 0:
|
||||
data += b"\x00" * (self.ALIGN - (len(data) % self.ALIGN))
|
||||
return data
|
||||
def __clean_filenamedirectory(self):
|
||||
self.__filenamedirectory = None
|
||||
self.__filenamedirectory_offset = None
|
||||
self.__filenamedirectory_len = None
|
||||
def __loadsys_from_afs(self, afs_file, afs_len:int):
|
||||
self.__tableofcontent = afs_file.read(Afs.HEADER_LEN)
|
||||
if self.__get_magic() not in [Afs.MAGIC_00, Afs.MAGIC_20]:
|
||||
raise Exception("Invalid AFS magic number.")
|
||||
self.__file_count = self.__get_file_count()
|
||||
self.__tableofcontent += afs_file.read(self.__file_count*8)
|
||||
tableofcontent_len = len(self.__tableofcontent)
|
||||
def print_paths_differences(folder1_paths:list, folder2_paths:list):
|
||||
backup_paths = folder2_paths.copy()
|
||||
for path in folder1_paths:
|
||||
if path in folder2_paths:
|
||||
folder2_paths.remove(path)
|
||||
for path in backup_paths:
|
||||
if path in folder1_paths:
|
||||
folder1_paths.remove(path)
|
||||
print("folder1 diff :")
|
||||
for path in folder1_paths:
|
||||
print(path)
|
||||
print("folder2 diff :")
|
||||
for path in folder2_paths:
|
||||
print(path)
|
||||
|
||||
offset = tableofcontent_len
|
||||
|
||||
tmp_block = int.from_bytes(afs_file.read(4), "little")
|
||||
if tmp_block != 0:
|
||||
self.__filenamedirectory_offset_offset = offset
|
||||
self.__filenamedirectory_offset = tmp_block
|
||||
else:
|
||||
# If filenamedirectory_offset is not directly after the files offsets and lens
|
||||
# --> we search the next uint32 != 0
|
||||
offset += 4
|
||||
block_len = 0x800
|
||||
tmp_block = afs_file.read(block_len)
|
||||
while tmp_block:
|
||||
match = re.search(b"^(?:\x00{4})*(?!\x00{4})(.{4})", tmp_block) # match next uint32
|
||||
if match:
|
||||
self.__filenamedirectory_offset_offset = offset + match.start(1)
|
||||
self.__filenamedirectory_offset = int.from_bytes(match[1], "little")
|
||||
break
|
||||
offset += block_len
|
||||
tmp_block = afs_file.read(block_len)
|
||||
|
||||
if self.__filenamedirectory_offset is None:
|
||||
raise Exception("Error - Empty AFS.")
|
||||
|
||||
afs_file.seek(self.__filenamedirectory_offset_offset+4)
|
||||
self.__filenamedirectory_len = int.from_bytes(afs_file.read(4), "little")
|
||||
|
||||
# Test if offset of filenamedirectory is valid and if number of entries match between filenamedirectory and tableofcontent
|
||||
if self.__filenamedirectory_offset + self.__filenamedirectory_len > afs_len or \
|
||||
self.__filenamedirectory_offset < self.__filenamedirectory_offset_offset or \
|
||||
(tableofcontent_len - self.HEADER_LEN) / 8 != self.__filenamedirectory_len / Afs.FILENAMEBLOCK_ENTRY_LEN:
|
||||
self.__clean_filenamedirectory()
|
||||
# compare two files
|
||||
def compare_files(file1_path:Path, file2_path:Path):
|
||||
CLUSTER_LEN = 131072
|
||||
with file1_path.open("rb") as file1, file2_path.open("rb") as file2:
|
||||
# Init
|
||||
bytes1 = file1.read(CLUSTER_LEN)
|
||||
bytes2 = file2.read(CLUSTER_LEN)
|
||||
while bytes1 or bytes2: # continue if bytes1 and bytes2 have a len > 0
|
||||
if bytes1 != bytes2:
|
||||
return False
|
||||
|
||||
afs_file.seek(self.__filenamedirectory_offset)
|
||||
self.__filenamedirectory = afs_file.read(self.__filenamedirectory_len)
|
||||
|
||||
# Test if filename is correct by very basic pattern matching
|
||||
pattern = re.compile(b"^(?=.{32}$)[^\x00]+\x00+$")
|
||||
for i in range(0, self.__file_count):
|
||||
if not pattern.fullmatch(self.__filenamedirectory[i*Afs.FILENAMEBLOCK_ENTRY_LEN:i*Afs.FILENAMEBLOCK_ENTRY_LEN+32]):
|
||||
self.__clean_filenamedirectory()
|
||||
return False
|
||||
|
||||
afs_file.seek(tableofcontent_len)
|
||||
self.__tableofcontent += afs_file.read(self.__filenamedirectory_offset_offset+8 - tableofcontent_len)
|
||||
return True
|
||||
def __loadsys_from_folder(self, sys_path:Path):
|
||||
self.__tableofcontent = bytearray( (sys_path / "tableofcontent.bin").read_bytes() )
|
||||
self.__file_count = self.__get_file_count()
|
||||
|
||||
# If there is a filenamedirectory we load it
|
||||
if (sys_path / "filenamedirectory.bin").is_file():
|
||||
self.__filenamedirectory = bytearray((sys_path / "filenamedirectory.bin").read_bytes())
|
||||
self.__filenamedirectory_offset_offset = len(self.__tableofcontent) - 8
|
||||
self.__filenamedirectory_offset = self.__get_filenamedirectory_offset()
|
||||
self.__filenamedirectory_len = self.__get_filenamedirectory_len()
|
||||
if self.__filenamedirectory_len != len(self.__filenamedirectory):
|
||||
raise Exception("Error - Tableofcontent filenamedirectory length does not match real filenamedirectory length.")
|
||||
def unpack(self, afs_path:Path, folder_path:Path):
|
||||
sys_path = folder_path / "sys"
|
||||
root_path = folder_path / "root"
|
||||
sys_path.mkdir(parents=True)
|
||||
root_path.mkdir()
|
||||
|
||||
with afs_path.open("rb") as afs_file:
|
||||
if not self.__loadsys_from_afs(afs_file, afs_path.stat().st_size):
|
||||
logging.info("There is no filename directory. Creating new names and dates for files.")
|
||||
else:
|
||||
logging.debug(f"filenamedirectory_offset:0x{self.__filenamedirectory_offset:x}, filenamedirectory_len:0x{self.__filenamedirectory_len:x}.")
|
||||
logging.info("Writting sys/filenamedirectory.bin")
|
||||
(sys_path / "filenamedirectory.bin").write_bytes(self.__filenamedirectory)
|
||||
resolver = FilenameResolver(sys_path)
|
||||
|
||||
logging.info("Writting sys/tableofcontent.bin")
|
||||
(sys_path / "tableofcontent.bin").write_bytes(self.__tableofcontent)
|
||||
|
||||
logging.info(f"Extracting {self.__file_count} files.")
|
||||
for i in range(0, self.__file_count):
|
||||
file_offset = self.__get_file_offset(i)
|
||||
file_len = self.__get_file_len(i)
|
||||
filename = resolver.resolve_new(i, self.__get_file_name(i)) if self.__filenamedirectory else f"{i:08}"
|
||||
|
||||
logging.debug(f"Writting {root_path / filename} 0x{file_offset:x}:0x{file_offset + file_len:x}")
|
||||
afs_file.seek(file_offset)
|
||||
(root_path / filename).write_bytes(afs_file.read(file_len))
|
||||
|
||||
if self.__filenamedirectory:
|
||||
mtime = self.__get_mtime(i)
|
||||
os.utime(root_path / filename, (mtime, mtime))
|
||||
if self.__filenamedirectory:
|
||||
resolver.save()
|
||||
def pack(self, folder_path:Path, afs_path:Path = None):
|
||||
if afs_path == None:
|
||||
afs_path = folder_path / Path(folder_path.name).with_suffix(".afs")
|
||||
elif afs_path.suffix != ".afs":
|
||||
logging.warning("Dest file should have .afs file extension.")
|
||||
|
||||
sys_path = folder_path / "sys"
|
||||
root_path = folder_path / "root"
|
||||
|
||||
self.__loadsys_from_folder(sys_path)
|
||||
|
||||
if self.__filenamedirectory:
|
||||
resolver = FilenameResolver(sys_path)
|
||||
|
||||
offsets_map = self.__get_offsets_map()
|
||||
with afs_path.open("wb") as afs_file:
|
||||
# We update files
|
||||
for i in range(0, self.__file_count):
|
||||
file_offset = self.__get_file_offset(i)
|
||||
file_len = self.__get_file_len(i)
|
||||
filename = resolver.resolve_from_index(i, self.__get_file_name(i)) if self.__filenamedirectory else f"{i:08}"
|
||||
|
||||
file_path = root_path / filename
|
||||
new_file_len = file_path.stat().st_size
|
||||
|
||||
if new_file_len != file_len:
|
||||
next_offset = None
|
||||
# If no FD, we can raise AFS length without constraint
|
||||
if offsets_map.index(file_offset) + 1 < len(offsets_map):
|
||||
next_offset = offsets_map[offsets_map.index(file_offset)+1]
|
||||
if next_offset:
|
||||
if file_offset + new_file_len > next_offset:
|
||||
raise AfsInvalidFileLenError(f"File {file_path} as a new file_len (0x{new_file_len:x}) > next file offset (0x{next_offset:x}). "\
|
||||
"This means that we have to rebuild the AFS using -r and changing offset of all next files and this could lead to bugs if the main dol use AFS relative file offsets.")
|
||||
self.__patch_file_len(i, new_file_len)
|
||||
# If there is a filenamedirectory we update mtime :
|
||||
if self.__filenamedirectory:
|
||||
self.__patch_mtime(i, round(file_path.stat().st_mtime))
|
||||
logging.debug(f"Packing {file_path} 0x{file_offset:x}:0x{file_offset+new_file_len:x} in AFS.")
|
||||
afs_file.seek(file_offset)
|
||||
afs_file.write(self.__pad(file_path.read_bytes()))
|
||||
if self.__filenamedirectory:
|
||||
afs_file.seek(self.__filenamedirectory_offset)
|
||||
afs_file.write(self.__pad(self.__filenamedirectory))
|
||||
logging.debug(f"Packing {sys_path}/tableofcontent.bin at the beginning of the AFS.")
|
||||
afs_file.seek(0)
|
||||
afs_file.write(self.__tableofcontent)
|
||||
def rebuild(self, folder_path:Path):
|
||||
raise Exception("Not implemented yet")
|
||||
def stats(self, path:Path):
|
||||
if path.is_file():
|
||||
with path.open("rb") as afs_file:
|
||||
self.__loadsys_from_afs(afs_file, path.stat().st_size)
|
||||
else:
|
||||
self.__loadsys_from_folder(path / "sys")
|
||||
|
||||
files_map = self.__get_formated_map()
|
||||
files_map.sort(key=lambda x: x[1]) # sort by offset
|
||||
|
||||
# Offsets intersect
|
||||
dup_offsets_tuples = []
|
||||
last_tuple = (-1, "-1", "0") # empty space search init
|
||||
new_set = True
|
||||
# Filenames duplicates
|
||||
dup_names_dict = {} # tmp dict for grouping by filename
|
||||
dup_names_tuples = []
|
||||
# For empty blocks
|
||||
empty_space_tuples = []
|
||||
for file_tuple in files_map:
|
||||
# Filenames duplicates
|
||||
if not file_tuple[6] in dup_names_dict:
|
||||
dup_names_dict[file_tuple[6]] = [file_tuple]
|
||||
else:
|
||||
dup_names_dict[file_tuple[6]].append(file_tuple)
|
||||
# Offsets intersect
|
||||
if file_tuple[1] < last_tuple[1]:
|
||||
if new_set:
|
||||
dup_offsets_tuples.append("Files sharing same offsets:\n")
|
||||
new_set = False
|
||||
dup_offsets_tuples.append(file_tuple)
|
||||
else:
|
||||
new_set = True
|
||||
# Empty blocks
|
||||
last_block_end = ceil(int(last_tuple[2], base=16) / Afs.ALIGN) * Afs.ALIGN
|
||||
if int(file_tuple[1], base=16) - last_block_end >= Afs.ALIGN:
|
||||
empty_space_tuples.append( (last_tuple[2], file_tuple[1], f"{int(file_tuple[1], base=16) - int(last_tuple[2], base=16):08x}", file_tuple[6]) )
|
||||
last_tuple = file_tuple
|
||||
|
||||
for filename in dup_names_dict:
|
||||
if len(dup_names_dict[filename]) > 1:
|
||||
dup_names_tuples += ["Files sharing same name:\n"] + [file_tuple for file_tuple in dup_names_dict[filename]]
|
||||
|
||||
dup_offsets = "Yes" if len(dup_offsets_tuples) > 1 else "No"
|
||||
dup_names = "Yes" if len(dup_names_tuples) > 1 else "No"
|
||||
empty_space = "Yes" if len(empty_space_tuples) > 1 else "No"
|
||||
|
||||
files_info = f"AFS Magic/Version : {str(self.__get_magic())[2:-1]}\n"+\
|
||||
f"TOC offset of the FD offset : 0x{self.__filenamedirectory_offset_offset:x}\n"+\
|
||||
f"Multiple files using same offsets: {dup_offsets}\n"+\
|
||||
f"Multiple files using same name : {dup_names}\n"+\
|
||||
f"Empty blocks : {empty_space}\n"
|
||||
self.__print("Global infos and AFS space mapping:", files_map, infos=files_info)
|
||||
if dup_offsets_tuples:
|
||||
self.__print("Files sharing same AFS offsets:", dup_offsets_tuples)
|
||||
if dup_names_tuples:
|
||||
self.__print("Files using same filenames:", dup_names_tuples)
|
||||
if empty_space_tuples:
|
||||
self.__print("Empty blocks between files (filename = name of the previous file):", empty_space_tuples, columns=[1,2,3,6])
|
||||
def __print(self, title:str, lines_tuples, columns:list = list(range(0,7)), infos:str = ""):
|
||||
stats_buffer = "#"*100+f"\n# {title}\n"+"#"*100+f"\n{infos}|"+"-"*99+"\n"
|
||||
if 0 in columns: stats_buffer += "| Index ";
|
||||
if 1 in columns: stats_buffer += "| b offset ";
|
||||
if 2 in columns: stats_buffer += "| e offset ";
|
||||
if 3 in columns: stats_buffer += "| length ";
|
||||
if 4 in columns: stats_buffer += "| YYYY-mm-dd HH:MM:SS ";
|
||||
if 5 in columns: stats_buffer += "| FD last ";
|
||||
if 6 in columns: stats_buffer += "| Filename";
|
||||
stats_buffer += "\n|"+"-"*99+"\n"
|
||||
for line in lines_tuples:
|
||||
stats_buffer += line if type(line) == str else "| "+" | ".join(line)+"\n"
|
||||
print(stats_buffer, end='')
|
||||
# end offset not included (0,1) -> len=1
|
||||
def __get_offsets_map(self):
|
||||
# offsets_map is used to check next used offset when updating files
|
||||
# we also check if there is intersect between files
|
||||
offsets_map = [(0, len(self.__tableofcontent))]
|
||||
for i in range(0, self.__file_count):
|
||||
file_offset = self.__get_file_offset(i)
|
||||
offsets_map.append( (file_offset, file_offset + self.__get_file_len(i)) )
|
||||
if self.__filenamedirectory:
|
||||
filenamedirectory_offset = self.__get_filenamedirectory_offset()
|
||||
offsets_map.append( (filenamedirectory_offset, filenamedirectory_offset + self.__get_filenamedirectory_len()) )
|
||||
offsets_map.sort(key=lambda x: x[0])
|
||||
|
||||
# Check if there is problems in file memory mapping
|
||||
last_tuple = (-1, -1)
|
||||
for i, offsets_tuple in enumerate(offsets_map):
|
||||
if offsets_tuple[0] < last_tuple[1]:
|
||||
raise Exception(f"Error - Multiple files use same file offsets ranges.")
|
||||
last_tuple = offsets_tuple
|
||||
offsets_map[i] = offsets_tuple[0]
|
||||
return offsets_map
|
||||
# end offset not included (0,1) -> len=1
|
||||
def __get_formated_map(self):
|
||||
files_map = [("SYS TOC ", "00000000", f"{len(self.__tableofcontent):08x}", f"{len(self.__tableofcontent):08x}", "SYS TOC"+' '*12, "SYS TOC ", "SYS TOC")]
|
||||
|
||||
for i in range(0, self.__file_count):
|
||||
file_offset = self.__get_file_offset(i)
|
||||
file_len = self.__get_file_len(i)
|
||||
file_date = datetime.fromtimestamp(self.__get_mtime(i)).strftime("%Y-%m-%d %H:%M:%S") if self.__filenamedirectory else " "*19
|
||||
filename = self.__get_file_name(i) if self.__filenamedirectory else f"{i:08}"
|
||||
files_map.append((f"{i:08x}", f"{file_offset:08x}", f"{file_offset + file_len:08x}", f"{file_len:08x}", file_date, f"{self.__get_file_fdlast(i):08x}", filename))
|
||||
|
||||
if self.__filenamedirectory:
|
||||
files_map.append(("SYS FD ", f"{self.__filenamedirectory_offset:08x}", \
|
||||
f"{self.__filenamedirectory_offset + len(self.__filenamedirectory):08x}", \
|
||||
f"{len(self.__filenamedirectory):08x}", "SYS FD"+' '*13, "SYS FD ", "SYS FD"))
|
||||
return files_map
|
||||
bytes1 = file1.read(CLUSTER_LEN)
|
||||
bytes2 = file2.read(CLUSTER_LEN)
|
||||
return True
|
||||
|
||||
|
||||
def get_argparser():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='AFS packer & unpacker - [GameCube] v' + __version__)
|
||||
parser.add_argument('--version', action='version', version='%(prog)s ' + __version__)
|
||||
parser.add_argument('-v', '--verbose', action='store_true', help='verbose mode')
|
||||
parser.add_argument('input_path', metavar='INPUT', help='')
|
||||
parser.add_argument('output_path', metavar='OUTPUT', help='', nargs='?', default="")
|
||||
# compare two folder
|
||||
# -> raise an exception if there is a difference in paths or in file content
|
||||
def compare_folders(folder1: Path, folder2: Path, compare_mtime=False):
|
||||
folder1_tmp_paths = list(folder1.glob("*"))
|
||||
folder1_file_count = len(folder1_tmp_paths)
|
||||
print(f"compare \"{folder1}\" - \"{folder2}\" ({folder1_file_count} files)")
|
||||
if folder1_file_count == 0:
|
||||
raise Exception(f"ERROR - EMPTY FOLDER: {folder1}")
|
||||
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument('-p', '--pack', action='store_true', help="-p source_folder (dest_file.afs) : Pack source_folder in new file source_folder.afs or dest_file.afs if specified.")
|
||||
group.add_argument('-u', '--unpack', action='store_true', help="-u source_afs.afs (dest_folder) : Unpack the AFS in new folder source_afs or dest_folder if specified.")
|
||||
group.add_argument('-s', '--stats', action='store_true', help="-s source_afs.afs or source_folder : Get stats about AFS, files, memory, lengths and offsets.")
|
||||
group.add_argument('-r', '--rebuild', help="-r source_folder fndo_offset : Rebuild AFS tableofcontent (TOC) and filenamedirectory (FND) using filenamedirectory_offset_offset=fndo_offset (the offset in the TOC).")
|
||||
return parser
|
||||
len1 = len(folder1.parts)
|
||||
len2 = len(folder2.parts)
|
||||
# 1. Compare names in filesystems
|
||||
folder1_paths = [Path(*path.parts[len1:]) for path in folder1_tmp_paths]
|
||||
folder2_paths = [Path(*path.parts[len2:]) for path in folder2.glob("*")]
|
||||
if folder1_paths != folder2_paths:
|
||||
print_paths_differences(folder1_paths, folder2_paths)
|
||||
raise Exception(f"Folders \"{folder1}\" and \"{folder2}\" are different (not the same folders or files names).")
|
||||
# 2. Compare files content
|
||||
for path1 in folder1_tmp_paths:
|
||||
path2 = folder2 / Path(*path1.parts[len1:])
|
||||
if compare_mtime:
|
||||
# When using FD Date it floor to second
|
||||
if round(path1.stat().st_mtime) != round(path2.stat().st_mtime):
|
||||
raise Exception(f"\"{path1}\" and \"{path2}\" mtime (update time) are different:\n {round(path1.stat().st_mtime)}-{round(path2.stat().st_mtime)}")
|
||||
if not compare_files(path1, path2):
|
||||
raise Exception(f"\"{path1}\" and \"{path2}\" are different.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
|
||||
args = get_argparser().parse_args()
|
||||
# compare two AFS
|
||||
# -> raise an exception if there is a difference in:
|
||||
# -paths
|
||||
# -files content
|
||||
# -mtime if there is a filename directory
|
||||
def compare_unpacked_AFS(folder1: Path, folder2: Path):
|
||||
compare_mtime = False
|
||||
if (folder1 / "sys" / "filenamedirectory.bin").is_file():
|
||||
compare_mtime = True
|
||||
compare_folders(folder1 / "root", folder2 / "root", compare_mtime)
|
||||
|
||||
p_input = Path(args.input_path)
|
||||
p_output = Path(args.output_path)
|
||||
|
||||
afs = Afs()
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
def patch_all_bytes(file_path:Path, max_len:int = None):
|
||||
file_data = bytearray(file_path.read_bytes())
|
||||
if max_len == None:
|
||||
max_len = len(file_data)
|
||||
elif max_len < len(file_data):
|
||||
file_data = file_data[:max_len]
|
||||
for i in range(0, len(file_data)):
|
||||
file_data[i] = (file_data[i] + 1) % 255
|
||||
if max_len > len(file_data):
|
||||
file_data.extend(b"\x01"*(max_len - len(file_data)))
|
||||
file_path.write_bytes(file_data)
|
||||
|
||||
if args.pack:
|
||||
logging.info("### Pack in new AFS")
|
||||
if(p_output == Path(".")):
|
||||
p_output = Path(p_input.with_suffix(".afs"))
|
||||
logging.info(f"packing folder {p_input} in {p_output}")
|
||||
afs.pack( p_input, p_output )
|
||||
elif args.unpack:
|
||||
logging.info("### Unpack AFS in new folder")
|
||||
if p_output == Path("."):
|
||||
p_output = p_input.parent / p_input.stem
|
||||
logging.info(f"unpacking AFS {p_input} in {p_output}")
|
||||
afs.unpack( p_input, p_output )
|
||||
elif args.stats:
|
||||
afs.stats(p_input)
|
||||
|
||||
# if not bool_len: patch all files with max len
|
||||
# if bool_len: patch first file found with max len + 1
|
||||
def patch_unpackedfiles_in_folder(folder_path:Path, bool_len:bool = False):
|
||||
for afsfolder_path in folder_path.glob("*"):
|
||||
print(f"Patching {afsfolder_path}...")
|
||||
afs_test = AfsTest()
|
||||
offsets_names_map = afs_test.get_range(afsfolder_path)
|
||||
|
||||
for file_path in afsfolder_path.glob("root/*"):
|
||||
max_len = None
|
||||
# Search by resolved name and get begin offset of next file / SYS File
|
||||
for i in range(0, len(offsets_names_map)):
|
||||
if offsets_names_map[i][1] == file_path.name:
|
||||
if i+1 < len(offsets_names_map):
|
||||
max_len = offsets_names_map[i+1][0] - offsets_names_map[i][0]
|
||||
if bool_len:
|
||||
max_len += 1
|
||||
# else there is no limit because last file
|
||||
else:
|
||||
max_len = file_path.stat().st_size + Afs.ALIGN
|
||||
break
|
||||
patch_all_bytes(file_path, max_len)
|
||||
if bool_len:
|
||||
break
|
||||
|
||||
|
||||
##################################################
|
||||
# afstool.py commands wrappers
|
||||
##################################################
|
||||
def afspacker_extract(afs_path:Path, folder_path:Path):
|
||||
print(f"AFSPacker.exe : Extracting \"{afs_path}\" in \"{folder_path}\"")
|
||||
if os.system(f"{afspacker_path} -e \"{afs_path}\" \"{folder_path}\" > NUL") != 0:
|
||||
raise Exception("Error while unpacking with AFSPacker.exe")
|
||||
def afstool_pack(folder_path:Path, afs_path:Path):
|
||||
if os.system(f"python afstool.py -p \"{folder_path}\" \"{afs_path}\"") != 0:
|
||||
raise Exception("Error while (re)packing.")
|
||||
def afstool_unpack(afs_path:Path, folder_path:Path):
|
||||
if os.system(f"python afstool.py -u \"{afs_path}\" \"{folder_path}\"") != 0:
|
||||
raise Exception("Error while unpacking.")
|
||||
|
||||
|
||||
def repack_unpack2_compare():
|
||||
repack_path.mkdir()
|
||||
unpack2_path.mkdir()
|
||||
unpack_paths = list(unpack_path.glob("*"))
|
||||
|
||||
# repack unpack_path repack_path
|
||||
for folder_path in unpack_paths:
|
||||
afstool_pack(folder_path, repack_path / Path(folder_path.stem).with_suffix('.afs'))
|
||||
|
||||
# unpack repack_path unpack2_path
|
||||
for afs_path in repack_path.glob("*"):
|
||||
afstool_unpack(afs_path, unpack2_path / afs_path.stem)
|
||||
|
||||
shutil.rmtree(repack_path)
|
||||
|
||||
# compare unpack_path unpack2_path
|
||||
for folder_path in unpack_paths:
|
||||
compare_unpacked_AFS(folder_path, unpack2_path / folder_path.name)
|
||||
|
||||
shutil.rmtree(unpack2_path)
|
||||
|
||||
|
||||
TEST_COUNT = 6
|
||||
|
||||
start = time()
|
||||
print("###############################################################################")
|
||||
print("# Checking tests folder")
|
||||
print("###############################################################################")
|
||||
# Check if tests folders exist
|
||||
if unpack_path.is_dir() or unpack2_path.is_dir() or repack_path.is_dir():
|
||||
raise Exception(f"Error - Please remove:\n-{unpack_path}\n-{unpack2_path}\n-{repack_path}")
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 1/{TEST_COUNT}")
|
||||
print("# Comparing afss_path->unpack->[unpack_path] AFS with [afspacker_unpack_path].")
|
||||
print("###############################################################################")
|
||||
|
||||
unpack_path.mkdir()
|
||||
afss_paths = list(afss_path.glob("*"))
|
||||
|
||||
if not afspacker_unpack_path.is_dir():
|
||||
afspacker_unpack_path.mkdir()
|
||||
# AFSPacker.exe unpack afss_path in afspacker_unpack_path
|
||||
for afs_path in afss_paths:
|
||||
afspacker_extract(afs_path, afspacker_unpack_path / afs_path.stem)
|
||||
|
||||
# unpack afss_path unpack_path
|
||||
for afs_path in afss_paths:
|
||||
afstool_unpack(afs_path, unpack_path / afs_path.stem)
|
||||
|
||||
# compare unpack_path afspacker_unpack_path
|
||||
# AFSPacker don't store the date present in the filename directory in the file metadatas and update every dates when packing
|
||||
for folder_path in unpack_path.glob("*"):
|
||||
compare_folders(folder_path / "root", afspacker_unpack_path / folder_path.stem)
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 2/{TEST_COUNT}")
|
||||
print("# Comparing unpack_path->pack->[repack_path] AFS with [afss_path].")
|
||||
print("###############################################################################")
|
||||
repack_path.mkdir()
|
||||
# repack unpack_path repack_path
|
||||
for folder_path in unpack_path.glob("*"):
|
||||
afstool_pack(folder_path, repack_path / Path(folder_path.stem).with_suffix('.afs'))
|
||||
|
||||
# compare repack_path afss_path
|
||||
compare_folders(afss_path, repack_path)
|
||||
|
||||
shutil.rmtree(repack_path)
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 3/{TEST_COUNT}")
|
||||
print("# Comparing [unpack_path]->patch->pack->unpack->[unpack2_path].")
|
||||
print("###############################################################################")
|
||||
# Patch unpack files whithout changing their len
|
||||
for folder_path in unpack_path.glob("*"):
|
||||
print(f"Patching {folder_path}...")
|
||||
for file_path in folder_path.glob("root/*"):
|
||||
patch_all_bytes(file_path)
|
||||
|
||||
repack_unpack2_compare()
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 4/{TEST_COUNT}")
|
||||
print("# Comparing [unpack_path]->patch(max_size)->pack->unpack->[unpack2_path].")
|
||||
print("###############################################################################")
|
||||
# Patch unpack files changing len to max
|
||||
patch_unpackedfiles_in_folder(unpack_path)
|
||||
|
||||
repack_unpack2_compare()
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 5/{TEST_COUNT}")
|
||||
print("# Testing exception unpack_path->patch(max_size+1)->[pack]->repack_path.")
|
||||
print("###############################################################################")
|
||||
# Patch unpack files with 1 byte in a new used block in the first file
|
||||
patch_unpackedfiles_in_folder(unpack_path, True)
|
||||
|
||||
repack_path.mkdir()
|
||||
|
||||
# repack unpack_path repack_path
|
||||
for folder_path in unpack_path.glob("*"):
|
||||
try:
|
||||
afs = Afs()
|
||||
afs.pack(folder_path, repack_path / Path(folder_path.stem).with_suffix('.afs'))
|
||||
raise Exception(f"Error - Invalid file len check. Must raise an exception.")
|
||||
except AfsInvalidFileLenError:
|
||||
print(f"Correct AfsInvalidFileLenError - {folder_path}")
|
||||
|
||||
shutil.rmtree(repack_path)
|
||||
|
||||
print("###############################################################################")
|
||||
print(f"# TEST 6/{TEST_COUNT}")
|
||||
print("# Comparing [unpack_path]->patch(blocks - 1)->pack->unpack->[unpack2_path].")
|
||||
print("###############################################################################")
|
||||
# Patch unpack files with 1 block less
|
||||
for file_path in unpack_path.glob("*/root/*"):
|
||||
patch_all_bytes(file_path, file_path.stat().st_size - Afs.ALIGN)
|
||||
|
||||
repack_unpack2_compare()
|
||||
|
||||
print("###############################################################################")
|
||||
print("# Cleaning test folders.")
|
||||
print("###############################################################################")
|
||||
# Remove tests folders
|
||||
shutil.rmtree(unpack_path)
|
||||
|
||||
end = time()
|
||||
print("###############################################################################")
|
||||
print(f"# All tests are OK - elapsed time : {end - start}")
|
||||
print("###############################################################################")
|
||||
|
|
Loading…
Reference in New Issue
Block a user