mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-12 16:42:45 +00:00
Close open archive (zip/tar) file handles
This commit is contained in:
@@ -159,6 +159,9 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
finally:
|
||||
if bugreport:
|
||||
bugreport.close()
|
||||
|
||||
def run_backup_cmd(self) -> bool:
|
||||
try:
|
||||
@@ -183,6 +186,10 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
finally:
|
||||
if backup:
|
||||
backup.close()
|
||||
|
||||
|
||||
def finish(self) -> None:
|
||||
"""
|
||||
|
||||
@@ -56,12 +56,12 @@ class CmdAndroidCheckBackup(Command):
|
||||
self.name = "check-backup"
|
||||
self.modules = BACKUP_MODULES
|
||||
|
||||
self.backup_type: str = ""
|
||||
self.backup_archive: Optional[tarfile.TarFile] = None
|
||||
self.backup_files: List[str] = []
|
||||
self.__type: str = ""
|
||||
self.__tar: Optional[tarfile.TarFile] = None
|
||||
self.__files: List[str] = []
|
||||
|
||||
def from_ab(self, ab_file_bytes: bytes) -> None:
|
||||
self.backup_type = "ab"
|
||||
self.__type = "ab"
|
||||
header = parse_ab_header(ab_file_bytes)
|
||||
if not header["backup"]:
|
||||
log.critical("Invalid backup format, file should be in .ab format")
|
||||
@@ -84,26 +84,26 @@ class CmdAndroidCheckBackup(Command):
|
||||
sys.exit(1)
|
||||
|
||||
dbytes = io.BytesIO(tardata)
|
||||
self.backup_archive = tarfile.open(fileobj=dbytes)
|
||||
for member in self.backup_archive:
|
||||
self.backup_files.append(member.name)
|
||||
self.__tar = tarfile.open(fileobj=dbytes)
|
||||
for member in self.__tar:
|
||||
self.__files.append(member.name)
|
||||
|
||||
def init(self) -> None:
|
||||
if not self.target_path:
|
||||
return
|
||||
|
||||
if os.path.isfile(self.target_path):
|
||||
self.backup_type = "ab"
|
||||
self.__type = "ab"
|
||||
with open(self.target_path, "rb") as handle:
|
||||
ab_file_bytes = handle.read()
|
||||
self.from_ab(ab_file_bytes)
|
||||
|
||||
elif os.path.isdir(self.target_path):
|
||||
self.backup_type = "folder"
|
||||
self.__type = "folder"
|
||||
self.target_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for fname in subfiles:
|
||||
self.backup_files.append(
|
||||
self.__files.append(
|
||||
os.path.relpath(os.path.join(root, fname), self.target_path)
|
||||
)
|
||||
else:
|
||||
@@ -114,7 +114,11 @@ class CmdAndroidCheckBackup(Command):
|
||||
sys.exit(1)
|
||||
|
||||
def module_init(self, module: BackupModule) -> None: # type: ignore[override]
|
||||
if self.backup_type == "folder":
|
||||
module.from_dir(self.target_path, self.backup_files)
|
||||
if self.__type == "folder":
|
||||
module.from_dir(self.target_path, self.__files)
|
||||
else:
|
||||
module.from_ab(self.target_path, self.backup_archive, self.backup_files)
|
||||
module.from_ab(self.target_path, self.__tar, self.__files)
|
||||
|
||||
def finish(self) -> None:
|
||||
if self.__tar:
|
||||
self.__tar.close()
|
||||
|
||||
Reference in New Issue
Block a user