Reorganise code for backup modules

This commit is contained in:
tek
2022-03-04 10:10:56 +01:00
parent 639c163297
commit 86c79075ff
4 changed files with 154 additions and 59 deletions

View File

@@ -5,6 +5,9 @@
import logging
import os
import getpass
import io
import tarfile
from pathlib import Path
from zipfile import ZipFile
@@ -17,6 +20,8 @@ from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
from mvt.common.indicators import Indicators, download_indicators_files
from mvt.common.logo import logo
from mvt.common.module import run_module, save_timeline
from mvt.android.parsers.backup import parse_ab_header, parse_backup_file
from mvt.android.parsers.backup import InvalidBackupPassword, AndroidBackupParsingError
from .download_apks import DownloadAPKs
from .lookups.koodous import koodous_lookup
@@ -246,6 +251,44 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
def check_backup(ctx, iocs, output, backup_path, serial):
log.info("Checking ADB backup located at: %s", backup_path)
if os.path.isfile(backup_path):
# AB File
backup_type = "ab"
with open(backup_path, "rb") as handle:
data = handle.read()
header = parse_ab_header(data)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
ctx.exit(1)
password = None
if header["encryption"] != "none":
password = getpass.getpass(prompt="Backup Password: ", stream=None)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
ctx.exit(1)
except AndroidBackupParsingError:
log.critical("Impossible to parse this backup file, please use Android Backup Extractor instead")
ctx.exit(1)
dbytes = io.BytesIO(tardata)
tar = tarfile.open(fileobj=dbytes)
files = []
for member in tar:
files.append(member.name)
elif os.path.isdir(backup_path):
backup_type = "folder"
backup_path = Path(backup_path).absolute().as_posix()
files = []
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
for fname in subfiles:
files.append(os.path.relpath(os.path.join(root, fname), backup_path))
else:
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
ctx.exit(1)
if output and not os.path.exists(output):
try:
os.makedirs(output)
@@ -265,6 +308,11 @@ def check_backup(ctx, iocs, output, backup_path, serial):
if serial:
m.serial = serial
if backup_type == "folder":
m.from_folder(backup_path, files)
else:
m.from_ab(backup_path, tar, files)
run_module(m)

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import fnmatch
from mvt.common.module import MVTModule
class BackupExtraction(MVTModule):
"""This class provides a base for all backup extractios modules"""
ab = None
def from_folder(self, backup_path, files):
"""
Get all the files and list them
"""
self.backup_path = backup_path
self.files = files
def from_ab(self, file_path, tar, files):
"""
Extract the files
"""
self.ab = file_path
self.tar = tar
self.files = files
def _get_files_by_pattern(self, pattern):
return fnmatch.filter(self.files, pattern)
def _get_file_content(self, file_path):
if self.ab:
try:
member = self.tar.getmember(file_path)
except KeyError:
return None
handle = self.tar.extractfile(member)
else:
handle = open(os.path.join(self.backup_path, file_path), "rb")
data = handle.read()
handle.close()
return data

View File

@@ -3,20 +3,18 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import getpass
from mvt.common.module import MVTModule
from mvt.android.modules.backup.base import BackupExtraction
from mvt.common.utils import check_for_links
from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword, AndroidBackupParsingError
from mvt.android.parsers.backup import parse_sms_file
class SMS(MVTModule):
class SMS(BackupExtraction):
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.results = []
def check_indicators(self):
if not self.indicators:
@@ -26,59 +24,13 @@ class SMS(MVTModule):
if "body" not in message:
continue
message_links = check_for_links(message["body"])
if self.indicators.check_domains(message_links):
if self.indicators.check_domains(message["links"]):
self.detected.append(message)
def _process_sms_file(self, file_path):
self.log.info("Processing SMS backup file at %s", file_path)
with open(file_path, "rb") as handle:
data = handle.read()
self.results = parse_sms_file(data)
def run(self):
# FIXME: this should be done in the Module code if there are other modules on backups
if os.path.isfile(self.base_folder):
# ab file
with open(self.base_folder, "rb") as handle:
data = handle.read()
header = parse_ab_header(data)
if not header["backup"]:
self.log.info("Not a valid Android Backup file, quitting...")
return
pwd = None
if header["encryption"] != "none":
pwd = getpass.getpass(prompt="Backup Password: ", stream=None)
try:
messages = parse_sms_backup(data, password=pwd)
except InvalidBackupPassword:
self.log.info("Invalid password, impossible de decrypt the backup, quitting...")
return
except AndroidBackupParsingError:
self.log.info("Impossible to extract data from this Android Backup, please regenerate the backup using the -nocompress option or extract it using Android Backup Extractor instead.")
self.log.info("Quitting...")
return
self.results = messages
else:
app_folder = os.path.join(self.base_folder,
"apps",
"com.android.providers.telephony",
"d_f")
if not os.path.exists(app_folder):
self.log.info("Unable to find the SMS backup folder")
return
for file_name in os.listdir(app_folder):
if not file_name.endswith("_sms_backup"):
continue
file_path = os.path.join(app_folder, file_name)
self._process_sms_file(file_path)
for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"):
self.log.info("Processing SMS backup file at %s", file)
data = self._get_file_content(file)
self.results.extend(parse_sms_file(data))
self.log.info("Extracted a total of %d SMS messages containing links",
len(self.results))

View File

@@ -5,17 +5,25 @@
import logging
import os
import tarfile
import io
from mvt.android.modules.backup.sms import SMS
from mvt.common.module import run_module
from mvt.android.parsers.backup import parse_backup_file
from ..utils import get_android_backup_folder
class TestBackupModule:
def test_module_folder(self):
fpath = get_android_backup_folder()
mod = SMS(base_folder=fpath, log=logging)
backup_path = get_android_backup_folder()
mod = SMS(base_folder=backup_path, log=logging)
files = []
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
for fname in subfiles:
files.append(os.path.relpath(os.path.join(root, fname), backup_path))
mod.from_folder(backup_path, files)
run_module(mod)
assert len(mod.results) == 1
assert len(mod.results[0]["links"]) == 1
@@ -24,6 +32,47 @@ class TestBackupModule:
def test_module_file(self):
fpath = os.path.join(get_android_backup_folder(), "backup.ab")
mod = SMS(base_folder=fpath, log=logging)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data)
dbytes = io.BytesIO(tardata)
tar = tarfile.open(fileobj=dbytes)
files = []
for member in tar:
files.append(member.name)
mod.from_ab(fpath, tar, files)
run_module(mod)
assert len(mod.results) == 1
assert len(mod.results[0]["links"]) == 1
def test_module_file2(self):
fpath = os.path.join(get_android_backup_folder(), "backup2.ab")
mod = SMS(base_folder=fpath, log=logging)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data, password="123456")
dbytes = io.BytesIO(tardata)
tar = tarfile.open(fileobj=dbytes)
files = []
for member in tar:
files.append(member.name)
mod.from_ab(fpath, tar, files)
run_module(mod)
assert len(mod.results) == 1
assert len(mod.results[0]["links"]) == 1
def test_module_file3(self):
fpath = os.path.join(get_android_backup_folder(), "backup3.ab")
mod = SMS(base_folder=fpath, log=logging)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data)
dbytes = io.BytesIO(tardata)
tar = tarfile.open(fileobj=dbytes)
files = []
for member in tar:
files.append(member.name)
mod.from_ab(fpath, tar, files)
run_module(mod)
assert len(mod.results) == 1
assert len(mod.results[0]["links"]) == 1