mirror of
https://github.com/mvt-project/mvt.git
synced 2026-06-07 23:43:55 +02:00
Replace split("\n") with splitlines() for platform compatibility and other todos and chores (#746)
* Replace split("\n") with splitlines() for platform compatibility
* Remove dead commented-out code in webkit_session_resource_log
* Remove stale FIXME comment in command.py
* Narrow bare except to specific exception types in convert_mactime_to_datetime
* Fix typo in aqf_files.py comment
* Refactor b64 encoding in configuration_profiles into helper methods
* Pass branch parameter to GitHub commits API in update checker
* Replace bare KeyError catch with explicit key check in net_base
* Remove confirmed Chrome database path TODOs
Backup IDs verified via SHA-1 of AppDomain-com.google.chrome.ios paths.
* Extract additional timestamps from WebKit ObservedDomains table
Query mostRecentUserInteractionTime and mostRecentWebPushInteractionTime
with fallback to the original 4-column query for older iOS versions.
* Clarify command_line list format matches protobuf schema in tombstone parser
* Support SHA1 and MD5 hash matching in AQF files module
* Remove resolved TODO about --output requirement in download-apks
* Clean up code TODOs and type checks
* Fix WebKit timestamp schema handling
This commit is contained in:
@@ -131,10 +131,17 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
)
|
||||
return
|
||||
|
||||
# TODO: Parse AdbDebuggingManager line in output.
|
||||
start_of_json = content.find(b"\n{") + 2
|
||||
end_of_json = content.rfind(b"}\n") - 2
|
||||
json_content = content[start_of_json:end_of_json].rstrip()
|
||||
start_of_json = content.find(b"\n{")
|
||||
if start_of_json == -1:
|
||||
self.log.error("Unable to find ADB manager state in dumpsys output")
|
||||
return
|
||||
|
||||
end_of_json = content.rfind(b"}\n")
|
||||
if end_of_json == -1 or end_of_json <= start_of_json:
|
||||
self.log.error("Unable to find complete ADB manager state in dumpsys output")
|
||||
return
|
||||
|
||||
json_content = content[start_of_json + 2 : end_of_json - 2].rstrip()
|
||||
|
||||
parsed = self.indented_dump_parser(json_content)
|
||||
if parsed.get("debugging_manager") is None:
|
||||
|
||||
@@ -14,9 +14,12 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
def check_indicators(self) -> None:
|
||||
alerted_root_packages = set()
|
||||
for result in self.results:
|
||||
# XXX: De-duplication Package detections
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
if result["package_name"] in alerted_root_packages:
|
||||
continue
|
||||
alerted_root_packages.add(result["package_name"])
|
||||
self.alertstore.medium(
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
|
||||
"",
|
||||
@@ -188,7 +191,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
package = []
|
||||
|
||||
in_package_list = False
|
||||
for line in content.split("\n"):
|
||||
for line in content.splitlines():
|
||||
if line.startswith("Packages:"):
|
||||
in_package_list = True
|
||||
continue
|
||||
|
||||
@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class Processes(AndroidArtifact):
|
||||
def parse(self, entry: str) -> None:
|
||||
for line in entry.split("\n")[1:]:
|
||||
for line in entry.splitlines()[1:]:
|
||||
proc = line.split()
|
||||
|
||||
# Skip empty lines
|
||||
|
||||
@@ -67,11 +67,14 @@ class Settings(AndroidArtifact):
|
||||
# Check if one of the dangerous settings is using an unsafe
|
||||
# value (different than the one specified).
|
||||
if danger["key"] == key and danger["safe_value"] != value:
|
||||
self.log.warning(
|
||||
'Found suspicious "%s" setting "%s = %s" (%s)',
|
||||
namespace,
|
||||
key,
|
||||
value,
|
||||
danger["description"],
|
||||
self.alertstore.medium(
|
||||
f'Found suspicious "{namespace}" setting "{key} = {value}" ({danger["description"]})',
|
||||
"",
|
||||
{
|
||||
"namespace": namespace,
|
||||
"key": key,
|
||||
"value": value,
|
||||
"description": danger["description"],
|
||||
},
|
||||
)
|
||||
break
|
||||
|
||||
@@ -101,8 +101,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
continue
|
||||
|
||||
if result.get("command_line", []):
|
||||
command_name = result.get("command_line")[0].split("/")[-1]
|
||||
command_name = result["command_line"][0]
|
||||
command_name = result["command_line"][0].split("/")[-1]
|
||||
ioc_match = self.indicators.check_process(command_name)
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
@@ -200,7 +199,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
# eg. "Process uptime: 40s"
|
||||
tombstone[destination_key] = int(value_clean.rstrip("s"))
|
||||
elif destination_key == "command_line":
|
||||
# XXX: Check if command line should be a single string in a list, or a list of strings.
|
||||
# Wrap in list for consistency with protobuf format (repeated string).
|
||||
tombstone[destination_key] = [value_clean]
|
||||
else:
|
||||
tombstone[destination_key] = value_clean
|
||||
@@ -262,7 +261,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
@staticmethod
|
||||
def _parse_timestamp_string(timestamp: str) -> str:
|
||||
timestamp_parsed = parser.parse(timestamp)
|
||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||
# Preserve the source wall-clock time while returning the project-wide ISO format.
|
||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||
return convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
|
||||
@@ -355,7 +355,13 @@ def check_intrusion_logs(
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = (
|
||||
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
|
||||
)
|
||||
|
||||
@@ -41,7 +41,7 @@ class AQFFiles(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -107,16 +107,16 @@ class AQFFiles(AndroidQFModule):
|
||||
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
|
||||
self.alertstore.high(msg, "", result)
|
||||
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_file_hash(result.get("sha256") or "")
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
for hash_key in ("sha256", "sha1", "md5"):
|
||||
file_hash = result.get(hash_key, "")
|
||||
if not file_hash:
|
||||
continue
|
||||
ioc_match = self.indicators.check_file_hash(file_hash)
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
break
|
||||
|
||||
def run(self) -> None:
|
||||
if timezone := self._get_device_timezone():
|
||||
@@ -131,7 +131,7 @@ class AQFFiles(AndroidQFModule):
|
||||
data = json.loads(rawdata)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = []
|
||||
for line in rawdata.split("\n"):
|
||||
for line in rawdata.splitlines():
|
||||
if line.strip() == "":
|
||||
continue
|
||||
data.append(json.loads(line))
|
||||
@@ -142,11 +142,11 @@ class AQFFiles(AndroidQFModule):
|
||||
utc_timestamp = datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
# Convert the UTC timestamp to local tiem on Android device's local timezone
|
||||
# Convert the UTC timestamp to local time on Android device's local timezone
|
||||
local_timestamp = utc_timestamp.astimezone(device_timezone)
|
||||
|
||||
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
|
||||
# set the timestamp timezone to UTC, to avoid the timezone conversion again.
|
||||
# Preserve the device-local wall-clock time while using
|
||||
# the project-wide ISO conversion helper.
|
||||
local_timestamp = local_timestamp.replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def run(self) -> None:
|
||||
getprop_files = self._get_files_by_pattern("*/getprop.txt")
|
||||
|
||||
@@ -27,7 +27,7 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -30,7 +30,7 @@ class AQFPackages(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFProcesses(ProcessesArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
def run(self) -> None:
|
||||
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
|
||||
@@ -40,7 +40,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
|
||||
self.results[namespace] = {}
|
||||
data = self._get_file_content(setting_file)
|
||||
for line in data.decode("utf-8").split("\n"):
|
||||
for line in data.decode("utf-8").splitlines():
|
||||
line = line.strip()
|
||||
try:
|
||||
key, value = line.split("=", 1)
|
||||
|
||||
@@ -23,7 +23,7 @@ class AndroidQFModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -32,7 +32,7 @@ class Mounts(MountsArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
@@ -66,6 +66,9 @@ class Mounts(MountsArtifact, AndroidQFModule):
|
||||
# AndroidQF format: array of strings like
|
||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
||||
mount_content = "\n".join(json_data)
|
||||
else:
|
||||
self.log.error("Expected mounts.json to contain a list of mount lines")
|
||||
return
|
||||
self.parse(mount_content)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to parse mount information: %s", exc)
|
||||
|
||||
@@ -21,10 +21,6 @@ from .base import AndroidQFModule
|
||||
class SMS(AndroidQFModule):
|
||||
"""
|
||||
This module analyse SMS file in backup
|
||||
|
||||
XXX: We should also de-duplicate this AQF module, but first we
|
||||
need to add tests for loading encrypted SMS backups through the backup
|
||||
sub-module.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -22,7 +22,7 @@ class BackupModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -20,7 +20,7 @@ class SMS(BackupModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class BugReportModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysADBState(DumpsysADBArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysGetProp(GetPropArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,8 +42,10 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
)
|
||||
return
|
||||
|
||||
data = data.decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
|
||||
decoded_data = data.decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(
|
||||
decoded_data, "DUMP OF SERVICE platform_compat:"
|
||||
)
|
||||
self.parse(content)
|
||||
|
||||
self.log.info("Found %d uninstalled apps", len(self.results))
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -29,9 +29,6 @@ class InvalidBackupPassword(AndroidBackupParsingError):
|
||||
pass
|
||||
|
||||
|
||||
# TODO: Need to clean all the following code and conform it to the coding style.
|
||||
|
||||
|
||||
def to_utf8_bytes(input_bytes):
|
||||
output = []
|
||||
for byte in input_bytes:
|
||||
@@ -157,13 +154,13 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
|
||||
checksum_salt=checksum_salt,
|
||||
)
|
||||
|
||||
# Decrypt and unpad backup data using derivied key.
|
||||
# Decrypt and unpad backup data using derived key.
|
||||
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
return unpadder.update(decrypted_tar)
|
||||
return unpadder.update(decrypted_tar) + unpadder.finalize()
|
||||
|
||||
|
||||
def parse_backup_file(data, password=None):
|
||||
@@ -210,6 +207,8 @@ def parse_tar_for_sms(data):
|
||||
or member.name.endswith("_mms_backup")
|
||||
):
|
||||
dhandler = tar.extractfile(member)
|
||||
if not dhandler:
|
||||
continue
|
||||
res.extend(parse_sms_file(dhandler.read()))
|
||||
|
||||
return res
|
||||
|
||||
@@ -8,5 +8,6 @@ from .module import MVTModule
|
||||
class Artifact(MVTModule):
|
||||
"""Base class for artifacts.
|
||||
|
||||
XXX: Inheriting from MVTModule to have the same signature as other modules. Not sure if this is a good idea.
|
||||
Artifacts share the MVTModule lifecycle so commands can run artifacts and
|
||||
extraction modules through the same interface.
|
||||
"""
|
||||
|
||||
@@ -273,7 +273,6 @@ class Command:
|
||||
):
|
||||
continue
|
||||
|
||||
# FIXME: do we need the logger here
|
||||
module_logger = logging.getLogger(module.__module__)
|
||||
|
||||
m = module(
|
||||
|
||||
@@ -51,7 +51,7 @@ class MVTModule:
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[Dict[str, Any]] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
"""Initialize module.
|
||||
|
||||
@@ -71,15 +71,13 @@ class MVTModule:
|
||||
self.file_path: Optional[str] = file_path
|
||||
self.target_path: Optional[str] = target_path
|
||||
self.results_path: Optional[str] = results_path
|
||||
self.module_options: Optional[Dict[str, Any]] = (
|
||||
module_options if module_options else {}
|
||||
)
|
||||
self.module_options: Dict[str, Any] = module_options if module_options else {}
|
||||
|
||||
self.log = log
|
||||
self.indicators: Optional[Indicators] = None
|
||||
self.alertstore: AlertStore = AlertStore(log=log)
|
||||
|
||||
self.results: ModuleResults = results if results else []
|
||||
self.results: ModuleResults = results if results is not None else []
|
||||
self.timeline: ModuleTimeline = []
|
||||
|
||||
@classmethod
|
||||
@@ -109,11 +107,14 @@ class MVTModule:
|
||||
name = self.get_slug()
|
||||
|
||||
if self.results:
|
||||
converted_results: Any
|
||||
if isinstance(self.results, dict):
|
||||
converted_results = self.results
|
||||
else:
|
||||
converted_results = [
|
||||
asdict(result) if is_dataclass(result) else result
|
||||
asdict(result)
|
||||
if is_dataclass(result) and not isinstance(result, type)
|
||||
else result
|
||||
for result in self.results
|
||||
]
|
||||
results_file_name = f"{name}.json"
|
||||
|
||||
@@ -16,7 +16,10 @@ from typing import Any, Dict, List, Union
|
||||
ModuleAtomicResult = Dict[str, Any]
|
||||
|
||||
|
||||
ModuleResults = List[ModuleAtomicResult]
|
||||
# Extraction modules historically use either a list of records or grouped
|
||||
# dictionaries keyed by source path. Keep this alias broad until those shapes
|
||||
# are modeled per module.
|
||||
ModuleResults = Any
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -180,10 +180,8 @@ class IndicatorsUpdates:
|
||||
def _get_remote_file_latest_commit(
|
||||
self, owner: str, repo: str, branch: str, path: str
|
||||
) -> int:
|
||||
# TODO: The branch is currently not taken into consideration.
|
||||
# How do we specify which branch to look up to the API?
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
|
||||
)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
|
||||
@@ -123,10 +123,9 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
|
||||
if from_2001:
|
||||
timestamp = timestamp + 978307200
|
||||
|
||||
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
|
||||
try:
|
||||
return convert_unix_to_utc_datetime(timestamp)
|
||||
except Exception:
|
||||
except (OSError, OverflowError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ class BackupInfo(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -36,7 +36,7 @@ class BackupInfo(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
|
||||
@@ -33,7 +33,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -85,6 +85,35 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
self.alertstore.medium(warning_message, "", result)
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_key(d: dict, key: str) -> None:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_keys(d: dict, keys: list) -> None:
|
||||
for key in keys:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
def _b64encode_plist_bytes(self, plist: dict) -> None:
|
||||
"""Encode binary plist values to base64 for JSON serialization."""
|
||||
if "SignerCerts" in plist:
|
||||
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
|
||||
|
||||
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
|
||||
|
||||
if "OTAProfileStub" in plist:
|
||||
stub = plist["OTAProfileStub"]
|
||||
if "SignerCerts" in stub:
|
||||
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
|
||||
if "PayloadContent" in stub:
|
||||
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
|
||||
|
||||
if "PayloadContent" in plist:
|
||||
for entry in plist["PayloadContent"]:
|
||||
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
|
||||
|
||||
def run(self) -> None:
|
||||
for conf_file in self._get_backup_files_from_manifest(
|
||||
domain=CONF_PROFILES_DOMAIN
|
||||
@@ -113,65 +142,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
except Exception:
|
||||
conf_plist = {}
|
||||
|
||||
# TODO: Tidy up the following code hell.
|
||||
|
||||
if "SignerCerts" in conf_plist:
|
||||
conf_plist["SignerCerts"] = [
|
||||
b64encode(x) for x in conf_plist["SignerCerts"]
|
||||
]
|
||||
|
||||
if "OTAProfileStub" in conf_plist:
|
||||
if "SignerCerts" in conf_plist["OTAProfileStub"]:
|
||||
conf_plist["OTAProfileStub"]["SignerCerts"] = [
|
||||
b64encode(x)
|
||||
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
|
||||
]
|
||||
|
||||
if "PayloadContent" in conf_plist["OTAProfileStub"]:
|
||||
if (
|
||||
"EnrollmentIdentityPersistentID"
|
||||
in conf_plist["OTAProfileStub"]["PayloadContent"]
|
||||
):
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
] = b64encode(
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
]
|
||||
)
|
||||
|
||||
if "PushTokenDataSentToServerKey" in conf_plist:
|
||||
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
|
||||
conf_plist["PushTokenDataSentToServerKey"]
|
||||
)
|
||||
|
||||
if "LastPushTokenHash" in conf_plist:
|
||||
conf_plist["LastPushTokenHash"] = b64encode(
|
||||
conf_plist["LastPushTokenHash"]
|
||||
)
|
||||
|
||||
if "PayloadContent" in conf_plist:
|
||||
for content_entry in range(len(conf_plist["PayloadContent"])):
|
||||
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
]
|
||||
)
|
||||
|
||||
if (
|
||||
"IdentityPersistentRef"
|
||||
in conf_plist["PayloadContent"][content_entry]
|
||||
):
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
]
|
||||
)
|
||||
self._b64encode_plist_bytes(conf_plist)
|
||||
|
||||
self.results.append(
|
||||
{
|
||||
|
||||
@@ -33,7 +33,7 @@ class Manifest(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -34,7 +34,7 @@ class ProfileEvents(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class IOSExtraction(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -53,7 +53,7 @@ class IOSExtraction(MVTModule):
|
||||
:param file_path: Path to the malformed database file.
|
||||
|
||||
"""
|
||||
# TODO: Find a better solution.
|
||||
# SQLite's immutable mode cannot open databases with active WAL files.
|
||||
if not forced:
|
||||
# If the database is open, do not use immutable
|
||||
if os.path.isfile(file_path + "-shm"):
|
||||
|
||||
@@ -34,7 +34,7 @@ class Analytics(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -44,7 +44,7 @@ class Analytics(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -30,7 +30,7 @@ class AnalyticsIOSVersions(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -25,7 +25,7 @@ class CacheFiles(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -99,6 +99,10 @@ class CacheFiles(IOSExtraction):
|
||||
|
||||
def run(self) -> None:
|
||||
self.results: dict = {}
|
||||
if not self.target_path:
|
||||
self.log.error("No filesystem dump path provided")
|
||||
return
|
||||
|
||||
for root, _, files in os.walk(self.target_path):
|
||||
for file_name in files:
|
||||
if file_name != "Cache.db":
|
||||
|
||||
@@ -29,7 +29,7 @@ class Filesystem(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -73,6 +73,10 @@ class Filesystem(IOSExtraction):
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
self.log.error("No filesystem dump path provided")
|
||||
return
|
||||
|
||||
for root, dirs, files in os.walk(self.target_path):
|
||||
for dir_name in dirs:
|
||||
try:
|
||||
|
||||
@@ -30,7 +30,7 @@ class Netusage(NetBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class SafariFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -41,7 +41,7 @@ class SafariFavicon(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -30,7 +30,7 @@ class ShutdownLog(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -78,7 +78,7 @@ class ShutdownLog(IOSExtraction):
|
||||
recent_processes = []
|
||||
times_delayed = 0
|
||||
delay = 0.0
|
||||
for line in content.split("\n"):
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
|
||||
if line.startswith("remaining client pid:"):
|
||||
|
||||
@@ -32,7 +32,7 @@ class IOSVersionHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,7 +42,7 @@ class IOSVersionHistory(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -34,7 +34,7 @@ class WebkitIndexedDB(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -32,7 +32,7 @@ class WebkitLocalStorage(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -28,7 +28,7 @@ class WebkitSafariViewService(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -41,7 +41,7 @@ class Applications(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class Calendar(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -27,7 +27,7 @@ class Calls(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: list = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -16,7 +16,6 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
|
||||
from ..base import IOSExtraction
|
||||
|
||||
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_FAVICON_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
|
||||
]
|
||||
@@ -32,7 +31,7 @@ class ChromeFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -18,7 +18,6 @@ from ..base import IOSExtraction
|
||||
CHROME_HISTORY_BACKUP_IDS = [
|
||||
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
|
||||
]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_HISTORY_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
|
||||
]
|
||||
@@ -34,7 +33,7 @@ class ChromeHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -29,7 +29,7 @@ class Contacts(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -33,7 +33,7 @@ class FirefoxFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class FirefoxHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -27,7 +27,7 @@ class GlobalPreferences(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class IDStatusCache(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -228,7 +228,7 @@ class InteractionC(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class LocationdClients(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -147,7 +147,6 @@ class LocationdClients(IOSExtraction):
|
||||
# Some migration information are int and not dicts
|
||||
if not isinstance(file_plist[key], dict):
|
||||
continue
|
||||
# FIXME: unclear key format in iOS 17
|
||||
result = file_plist[key]
|
||||
result["package"] = key.rstrip(":")
|
||||
for timestamp in self.timestamps:
|
||||
|
||||
@@ -31,7 +31,7 @@ class Datausage(NetBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class OSAnalyticsADDaily(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class SafariBrowserState(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -38,7 +38,7 @@ class SafariHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class Shortcuts(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class SMS(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class SMSAttachments(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -56,7 +56,7 @@ class TCC(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -85,32 +85,56 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
|
||||
try:
|
||||
try:
|
||||
# FIXME: table contains extra fields with timestamp here
|
||||
cur.execute("PRAGMA table_info(ObservedDomains);")
|
||||
available_columns = {row[1] for row in cur}
|
||||
required_columns = [
|
||||
"domainID",
|
||||
"registrableDomain",
|
||||
"lastSeen",
|
||||
"hadUserInteraction",
|
||||
]
|
||||
if not set(required_columns).issubset(available_columns):
|
||||
return
|
||||
|
||||
optional_columns = [
|
||||
column
|
||||
for column in [
|
||||
"mostRecentUserInteractionTime",
|
||||
"mostRecentWebPushInteractionTime",
|
||||
]
|
||||
if column in available_columns
|
||||
]
|
||||
selected_columns = required_columns + optional_columns
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
f"SELECT {', '.join(selected_columns)} FROM ObservedDomains;"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
return
|
||||
|
||||
for row in cur:
|
||||
self.results.append(
|
||||
{
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
result = {
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
for index, column in enumerate(optional_columns, start=4):
|
||||
field = {
|
||||
"mostRecentUserInteractionTime": (
|
||||
"most_recent_user_interaction_time"
|
||||
),
|
||||
"mostRecentWebPushInteractionTime": (
|
||||
"most_recent_web_push_interaction_time"
|
||||
),
|
||||
}[column]
|
||||
timestamp = row[index]
|
||||
result[field] = timestamp
|
||||
if timestamp is not None and timestamp >= 0:
|
||||
result[f"{field}_isodate"] = convert_unix_to_iso(timestamp)
|
||||
self.results.append(result)
|
||||
finally:
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
@@ -39,7 +39,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -50,7 +50,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
@staticmethod
|
||||
def _extract_domains(entries):
|
||||
@@ -77,14 +77,21 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
entry["redirect_destination"]
|
||||
)
|
||||
|
||||
# TODO: Currently not used.
|
||||
# subframe_origins = self._extract_domains(
|
||||
# entry["subframe_under_origin"])
|
||||
# subresource_domains = self._extract_domains(
|
||||
# entry["subresource_under_origin"])
|
||||
subframe_origins = self._extract_domains(
|
||||
entry["subframe_under_origin"]
|
||||
)
|
||||
subresource_domains = self._extract_domains(
|
||||
entry["subresource_under_origin"]
|
||||
)
|
||||
|
||||
all_origins = list(
|
||||
set([entry["origin"]] + source_domains + destination_domains)
|
||||
set(
|
||||
[entry["origin"]]
|
||||
+ source_domains
|
||||
+ destination_domains
|
||||
+ subframe_origins
|
||||
+ subresource_domains
|
||||
)
|
||||
)
|
||||
|
||||
ioc_match = self.indicators.check_urls(all_origins)
|
||||
|
||||
@@ -33,7 +33,7 @@ class Whatsapp(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -30,7 +30,7 @@ class NetBase(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -322,14 +322,11 @@ class NetBase(IOSExtraction):
|
||||
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
# Check for manipulated process records.
|
||||
# TODO: Catching KeyError for live_isodate for retro-compatibility.
|
||||
# This is not very good.
|
||||
try:
|
||||
# check_manipulated/find_deleted require "live_isodate" and
|
||||
# "live_proc_id" keys which may be absent in older result formats.
|
||||
if self.results and "live_isodate" in self.results[0]:
|
||||
self.check_manipulated()
|
||||
self.find_deleted()
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
@@ -21,4 +21,5 @@ class TestSettingsModule:
|
||||
run_module(m)
|
||||
assert len(m.results) == 1
|
||||
assert "random" in m.results.keys()
|
||||
assert len(m.alertstore.alerts) == 0
|
||||
assert len(m.alertstore.alerts) == 1
|
||||
assert "samsung_errorlog_agree" in m.alertstore.alerts[0].message
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import sqlite3
|
||||
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.mixed.webkit_resource_load_statistics import (
|
||||
WebkitResourceLoadStatistics,
|
||||
@@ -19,3 +21,50 @@ class TestWebkitResourceLoadStatisticsModule:
|
||||
assert len(m.results) == 2
|
||||
assert len(m.timeline) == 2
|
||||
assert len(m.alertstore.alerts) == 0
|
||||
|
||||
results = {result["registrable_domain"]: result for result in m.results}
|
||||
assert results["google.com"]["most_recent_user_interaction_time"] > 0
|
||||
assert "most_recent_user_interaction_time_isodate" in results["google.com"]
|
||||
assert results["gstatic.com"]["most_recent_user_interaction_time"] == -1.0
|
||||
assert (
|
||||
"most_recent_user_interaction_time_isodate"
|
||||
not in results["gstatic.com"]
|
||||
)
|
||||
assert all(
|
||||
"most_recent_web_push_interaction_time" not in result
|
||||
for result in m.results
|
||||
)
|
||||
|
||||
def test_webkit_full_timestamp_schema(self, tmp_path):
|
||||
db_path = tmp_path / "observations.db"
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE ObservedDomains (
|
||||
domainID INTEGER PRIMARY KEY,
|
||||
registrableDomain TEXT NOT NULL,
|
||||
lastSeen REAL NOT NULL,
|
||||
hadUserInteraction INTEGER NOT NULL,
|
||||
mostRecentUserInteractionTime REAL NOT NULL,
|
||||
mostRecentWebPushInteractionTime REAL NOT NULL
|
||||
);
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO ObservedDomains VALUES (?, ?, ?, ?, ?, ?);
|
||||
""",
|
||||
(1, "example.com", 1634560250.0, 1, 1634560030.0, -1.0),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
m = WebkitResourceLoadStatistics(target_path=str(tmp_path))
|
||||
m._process_observations_db(str(db_path), "", "observations.db")
|
||||
|
||||
assert len(m.results) == 1
|
||||
result = m.results[0]
|
||||
assert result["most_recent_user_interaction_time"] == 1634560030.0
|
||||
assert "most_recent_user_interaction_time_isodate" in result
|
||||
assert result["most_recent_web_push_interaction_time"] == -1.0
|
||||
assert "most_recent_web_push_interaction_time_isodate" not in result
|
||||
|
||||
Reference in New Issue
Block a user