mirror of
https://github.com/mvt-project/mvt.git
synced 2026-07-02 19:25:48 +02:00
Merge branch 'main' into fix/dumpsys-battery-daily-order
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
name: Mypy
|
||||
on: workflow_dispatch
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
mypy_py3:
|
||||
|
||||
@@ -21,6 +21,24 @@ make check
|
||||
|
||||
Run these tests before making new commits or opening pull requests.
|
||||
|
||||
## Module dependencies
|
||||
|
||||
Modules can require other modules to run first by declaring their classes in
|
||||
`dependencies`. The command runner uses a stable topological ordering, so the
|
||||
existing module list order is preserved wherever dependency constraints allow.
|
||||
|
||||
```python
|
||||
class DependentModule(MVTModule):
|
||||
dependencies = (PrerequisiteModule,)
|
||||
|
||||
def run(self):
|
||||
prerequisite_results = self.get_dependency_results(PrerequisiteModule)
|
||||
```
|
||||
|
||||
Selecting a single module also runs its transitive dependencies. If a dependency
|
||||
is unavailable or the dependency graph contains a cycle, the command logs a
|
||||
warning and does not run any modules.
|
||||
|
||||
## Profiling
|
||||
|
||||
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
|
||||
|
||||
+19
-1
@@ -34,6 +34,25 @@ It is also possible to load STIX2 files automatically from the environment varia
|
||||
export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
|
||||
```
|
||||
|
||||
## Network Access
|
||||
|
||||
When checking URL indicators, MVT follows recognized shortened URLs with an
|
||||
HTTP `HEAD` request. The following environment variables control these
|
||||
requests:
|
||||
|
||||
- `MVT_NETWORK_ACCESS_ALLOWED` enables or disables network requests. It defaults
|
||||
to `true`. Set it to `false` to prevent MVT from attempting to resolve
|
||||
shortened URLs.
|
||||
- `MVT_NETWORK_TIMEOUT` sets the request timeout in seconds. It defaults to
|
||||
`15`.
|
||||
|
||||
For example, to run IOC checks without resolving shortened URLs:
|
||||
|
||||
```bash
|
||||
MVT_NETWORK_ACCESS_ALLOWED=false mvt-ios check-iocs \
|
||||
--iocs ~/iocs/malware.stix2 /path/to/iphone/output/
|
||||
```
|
||||
|
||||
## STIX2 Support
|
||||
|
||||
So far MVT implements only a subset of [STIX2 specifications](https://docs.oasis-open.org/cti/stix/v2.1/csprd01/stix-v2.1-csprd01.html):
|
||||
@@ -55,4 +74,3 @@ You can automaticallly download the latest public indicator files with the comma
|
||||
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -10,16 +10,20 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
if self.indicators:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
self.alertstore.medium(
|
||||
f'Found accessibility service: "{result["service"]}"',
|
||||
"",
|
||||
result,
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, content: str) -> None:
|
||||
"""
|
||||
|
||||
@@ -131,10 +131,17 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
)
|
||||
return
|
||||
|
||||
# TODO: Parse AdbDebuggingManager line in output.
|
||||
start_of_json = content.find(b"\n{") + 2
|
||||
end_of_json = content.rfind(b"}\n") - 2
|
||||
json_content = content[start_of_json:end_of_json].rstrip()
|
||||
start_of_json = content.find(b"\n{")
|
||||
if start_of_json == -1:
|
||||
self.log.error("Unable to find ADB manager state in dumpsys output")
|
||||
return
|
||||
|
||||
end_of_json = content.rfind(b"}\n")
|
||||
if end_of_json == -1 or end_of_json <= start_of_json:
|
||||
self.log.error("Unable to find complete ADB manager state in dumpsys output")
|
||||
return
|
||||
|
||||
json_content = content[start_of_json + 2 : end_of_json - 2].rstrip()
|
||||
|
||||
parsed = self.indented_dump_parser(json_content)
|
||||
if parsed.get("debugging_manager") is None:
|
||||
|
||||
@@ -14,9 +14,12 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
def check_indicators(self) -> None:
|
||||
alerted_root_packages = set()
|
||||
for result in self.results:
|
||||
# XXX: De-duplication Package detections
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
if result["package_name"] in alerted_root_packages:
|
||||
continue
|
||||
alerted_root_packages.add(result["package_name"])
|
||||
self.alertstore.medium(
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
|
||||
"",
|
||||
@@ -188,7 +191,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
package = []
|
||||
|
||||
in_package_list = False
|
||||
for line in content.split("\n"):
|
||||
for line in content.splitlines():
|
||||
if line.startswith("Packages:"):
|
||||
in_package_list = True
|
||||
continue
|
||||
|
||||
@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class Processes(AndroidArtifact):
|
||||
def parse(self, entry: str) -> None:
|
||||
for line in entry.split("\n")[1:]:
|
||||
for line in entry.splitlines()[1:]:
|
||||
proc = line.split()
|
||||
|
||||
# Skip empty lines
|
||||
|
||||
@@ -67,11 +67,14 @@ class Settings(AndroidArtifact):
|
||||
# Check if one of the dangerous settings is using an unsafe
|
||||
# value (different than the one specified).
|
||||
if danger["key"] == key and danger["safe_value"] != value:
|
||||
self.log.warning(
|
||||
'Found suspicious "%s" setting "%s = %s" (%s)',
|
||||
namespace,
|
||||
key,
|
||||
value,
|
||||
danger["description"],
|
||||
self.alertstore.medium(
|
||||
f'Found suspicious "{namespace}" setting "{key} = {value}" ({danger["description"]})',
|
||||
"",
|
||||
{
|
||||
"namespace": namespace,
|
||||
"key": key,
|
||||
"value": value,
|
||||
"description": danger["description"],
|
||||
},
|
||||
)
|
||||
break
|
||||
|
||||
@@ -101,8 +101,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
continue
|
||||
|
||||
if result.get("command_line", []):
|
||||
command_name = result.get("command_line")[0].split("/")[-1]
|
||||
command_name = result["command_line"][0]
|
||||
command_name = result["command_line"][0].split("/")[-1]
|
||||
ioc_match = self.indicators.check_process(command_name)
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
@@ -200,7 +199,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
# eg. "Process uptime: 40s"
|
||||
tombstone[destination_key] = int(value_clean.rstrip("s"))
|
||||
elif destination_key == "command_line":
|
||||
# XXX: Check if command line should be a single string in a list, or a list of strings.
|
||||
# Wrap in list for consistency with protobuf format (repeated string).
|
||||
tombstone[destination_key] = [value_clean]
|
||||
else:
|
||||
tombstone[destination_key] = value_clean
|
||||
@@ -262,7 +261,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
@staticmethod
|
||||
def _parse_timestamp_string(timestamp: str) -> str:
|
||||
timestamp_parsed = parser.parse(timestamp)
|
||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||
# Preserve the source wall-clock time while returning the project-wide ISO format.
|
||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||
return convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
|
||||
@@ -355,7 +355,13 @@ def check_intrusion_logs(
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = (
|
||||
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
|
||||
)
|
||||
|
||||
@@ -41,7 +41,7 @@ class AQFFiles(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -107,16 +107,16 @@ class AQFFiles(AndroidQFModule):
|
||||
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
|
||||
self.alertstore.high(msg, "", result)
|
||||
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_file_hash(result.get("sha256") or "")
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
for hash_key in ("sha256", "sha1", "md5"):
|
||||
file_hash = result.get(hash_key, "")
|
||||
if not file_hash:
|
||||
continue
|
||||
ioc_match = self.indicators.check_file_hash(file_hash)
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
break
|
||||
|
||||
def run(self) -> None:
|
||||
if timezone := self._get_device_timezone():
|
||||
@@ -131,7 +131,7 @@ class AQFFiles(AndroidQFModule):
|
||||
data = json.loads(rawdata)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = []
|
||||
for line in rawdata.split("\n"):
|
||||
for line in rawdata.splitlines():
|
||||
if line.strip() == "":
|
||||
continue
|
||||
data.append(json.loads(line))
|
||||
@@ -142,11 +142,11 @@ class AQFFiles(AndroidQFModule):
|
||||
utc_timestamp = datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
# Convert the UTC timestamp to local tiem on Android device's local timezone
|
||||
# Convert the UTC timestamp to local time on Android device's local timezone
|
||||
local_timestamp = utc_timestamp.astimezone(device_timezone)
|
||||
|
||||
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
|
||||
# set the timestamp timezone to UTC, to avoid the timezone conversion again.
|
||||
# Preserve the device-local wall-clock time while using
|
||||
# the project-wide ISO conversion helper.
|
||||
local_timestamp = local_timestamp.replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def run(self) -> None:
|
||||
getprop_files = self._get_files_by_pattern("*/getprop.txt")
|
||||
|
||||
@@ -27,7 +27,7 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -30,7 +30,7 @@ class AQFPackages(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFProcesses(ProcessesArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
def run(self) -> None:
|
||||
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
|
||||
@@ -40,7 +40,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
|
||||
self.results[namespace] = {}
|
||||
data = self._get_file_content(setting_file)
|
||||
for line in data.decode("utf-8").split("\n"):
|
||||
for line in data.decode("utf-8").splitlines():
|
||||
line = line.strip()
|
||||
try:
|
||||
key, value = line.split("=", 1)
|
||||
|
||||
@@ -23,7 +23,7 @@ class AndroidQFModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -32,7 +32,7 @@ class Mounts(MountsArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
@@ -66,6 +66,9 @@ class Mounts(MountsArtifact, AndroidQFModule):
|
||||
# AndroidQF format: array of strings like
|
||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
||||
mount_content = "\n".join(json_data)
|
||||
else:
|
||||
self.log.error("Expected mounts.json to contain a list of mount lines")
|
||||
return
|
||||
self.parse(mount_content)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to parse mount information: %s", exc)
|
||||
|
||||
@@ -21,10 +21,6 @@ from .base import AndroidQFModule
|
||||
class SMS(AndroidQFModule):
|
||||
"""
|
||||
This module analyse SMS file in backup
|
||||
|
||||
XXX: We should also de-duplicate this AQF module, but first we
|
||||
need to add tests for loading encrypted SMS backups through the backup
|
||||
sub-module.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -22,7 +22,7 @@ class BackupModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -20,7 +20,7 @@ class SMS(BackupModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class BugReportModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysADBState(DumpsysADBArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysGetProp(GetPropArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,8 +42,10 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
)
|
||||
return
|
||||
|
||||
data = data.decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
|
||||
decoded_data = data.decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(
|
||||
decoded_data, "DUMP OF SERVICE platform_compat:"
|
||||
)
|
||||
self.parse(content)
|
||||
|
||||
self.log.info("Found %d uninstalled apps", len(self.results))
|
||||
|
||||
@@ -22,7 +22,7 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -24,7 +24,7 @@ class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -23,7 +23,7 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -264,6 +264,7 @@ SECURITY_EVENT_TAGS = {
|
||||
}
|
||||
|
||||
SECURITY_EVENT_METADATA_KEYS = {
|
||||
"event_id",
|
||||
"event_time",
|
||||
"event_type",
|
||||
"timestamp",
|
||||
@@ -302,10 +303,15 @@ class SecurityEvent(IntrusionLogsModule):
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check security events against indicators of compromise."""
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
# Heuristic alerts are intrinsic to the event, so they run even
|
||||
# when no indicator set is loaded.
|
||||
self._check_security_heuristics(result)
|
||||
|
||||
# The remaining checks match events against loaded indicators.
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
# Check app process start events for suspicious package names
|
||||
if "app_process_start" in result:
|
||||
process_info = result["app_process_start"]
|
||||
@@ -389,60 +395,73 @@ class SecurityEvent(IntrusionLogsModule):
|
||||
matched_indicator=ioc.ioc,
|
||||
)
|
||||
|
||||
# Flag failed cryptographic operations as potentially suspicious
|
||||
if "key_generated" in result:
|
||||
if not result["key_generated"].get("success", True):
|
||||
self.log.warning(
|
||||
"Failed key generation detected for key_id: %s",
|
||||
result["key_generated"].get("key_id", "unknown"),
|
||||
)
|
||||
|
||||
# Flag certificate validation failures
|
||||
if "cert_validation_failure" in result:
|
||||
def _check_security_heuristics(self, result: dict) -> None:
|
||||
"""Raise alerts for events that are intrinsically suspicious,
|
||||
independent of any loaded indicators."""
|
||||
# Flag failed cryptographic operations as potentially suspicious
|
||||
if "key_generated" in result:
|
||||
if not result["key_generated"].get("success", True):
|
||||
self.log.warning(
|
||||
"Certificate validation failure detected: %s",
|
||||
result.get("cert_validation_failure"),
|
||||
"Failed key generation detected for key_id: %s",
|
||||
result["key_generated"].get("key_id", "unknown"),
|
||||
)
|
||||
|
||||
# Flag key integrity violations
|
||||
if "key_integrity_violation" in result:
|
||||
# Flag certificate validation failures
|
||||
if "cert_validation_failure" in result:
|
||||
self.alertstore.medium(
|
||||
"Certificate validation failure detected: "
|
||||
f"{result.get('cert_validation_failure')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag key integrity violations
|
||||
if "key_integrity_violation" in result:
|
||||
self.alertstore.medium(
|
||||
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag successful certificate authority installations (potential
|
||||
# MITM); a missing success field is treated as installed
|
||||
if "cert_authority_installed" in result:
|
||||
cert_info = result["cert_authority_installed"]
|
||||
if cert_info.get("success", True):
|
||||
self.alertstore.medium(
|
||||
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
|
||||
"Certificate authority installed: "
|
||||
f"{cert_info.get('subject', 'unknown')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag certificate authority installations (potential MITM)
|
||||
if "cert_authority_installed" in result:
|
||||
cert_info = result["cert_authority_installed"]
|
||||
else:
|
||||
self.log.warning(
|
||||
"Certificate authority installed: %s (success: %s)",
|
||||
"Failed certificate authority install attempt: %s",
|
||||
cert_info.get("subject", "unknown"),
|
||||
cert_info.get("success", "unknown"),
|
||||
)
|
||||
|
||||
# Flag wipe failures
|
||||
if "wipe_failure" in result:
|
||||
# Flag wipe failures
|
||||
if "wipe_failure" in result:
|
||||
self.alertstore.medium(
|
||||
"Device wipe failure detected",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag crypto self test failures
|
||||
if "crypto_self_test_completed" in result:
|
||||
test_result = result["crypto_self_test_completed"]
|
||||
if isinstance(test_result, dict):
|
||||
success = test_result.get("success", True)
|
||||
else:
|
||||
success = test_result == 1
|
||||
if not success:
|
||||
self.alertstore.medium(
|
||||
"Device wipe failure detected",
|
||||
"Cryptographic self test failed",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag crypto self test failures
|
||||
if "crypto_self_test_completed" in result:
|
||||
test_result = result["crypto_self_test_completed"]
|
||||
if isinstance(test_result, dict):
|
||||
success = test_result.get("success", True)
|
||||
else:
|
||||
success = test_result == 1
|
||||
if not success:
|
||||
self.alertstore.medium(
|
||||
"Cryptographic self test failed",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
"""Serialize a security event record for timeline output."""
|
||||
# Determine the event sub-type
|
||||
|
||||
@@ -29,9 +29,6 @@ class InvalidBackupPassword(AndroidBackupParsingError):
|
||||
pass
|
||||
|
||||
|
||||
# TODO: Need to clean all the following code and conform it to the coding style.
|
||||
|
||||
|
||||
def to_utf8_bytes(input_bytes):
|
||||
output = []
|
||||
for byte in input_bytes:
|
||||
@@ -157,13 +154,13 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
|
||||
checksum_salt=checksum_salt,
|
||||
)
|
||||
|
||||
# Decrypt and unpad backup data using derivied key.
|
||||
# Decrypt and unpad backup data using derived key.
|
||||
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
return unpadder.update(decrypted_tar)
|
||||
return unpadder.update(decrypted_tar) + unpadder.finalize()
|
||||
|
||||
|
||||
def parse_backup_file(data, password=None):
|
||||
@@ -210,6 +207,8 @@ def parse_tar_for_sms(data):
|
||||
or member.name.endswith("_mms_backup")
|
||||
):
|
||||
dhandler = tar.extractfile(member)
|
||||
if not dhandler:
|
||||
continue
|
||||
res.extend(parse_sms_file(dhandler.read()))
|
||||
|
||||
return res
|
||||
|
||||
@@ -8,5 +8,6 @@ from .module import MVTModule
|
||||
class Artifact(MVTModule):
|
||||
"""Base class for artifacts.
|
||||
|
||||
XXX: Inheriting from MVTModule to have the same signature as other modules. Not sure if this is a good idea.
|
||||
Artifacts share the MVTModule lifecycle so commands can run artifacts and
|
||||
extraction modules through the same interface.
|
||||
"""
|
||||
|
||||
+82
-12
@@ -8,6 +8,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from heapq import heappop, heappush
|
||||
from typing import Any, Optional
|
||||
|
||||
from rich.console import Console
|
||||
@@ -18,6 +19,7 @@ from .alerts import AlertLevel, AlertStore
|
||||
from .config import settings
|
||||
from .indicators import Indicators
|
||||
from .module import EncryptedBackupError, MVTModule, run_module, save_timeline
|
||||
from .module_types import ModuleTimeline
|
||||
from .utils import (
|
||||
CustomJSONEncoder,
|
||||
convert_datetime_to_iso,
|
||||
@@ -44,7 +46,7 @@ class Command:
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
self.name = ""
|
||||
self.modules: list[Any] = []
|
||||
self.modules: list[type[MVTModule]] = []
|
||||
|
||||
self.target_path = target_path
|
||||
self.results_path = results_path
|
||||
@@ -63,10 +65,10 @@ class Command:
|
||||
|
||||
# This list will contain all executed modules.
|
||||
# We can use this to reference e.g. self.executed[0].results.
|
||||
self.executed: list[Any] = []
|
||||
self.executed: list[MVTModule] = []
|
||||
self.hashes = hashes
|
||||
self.hash_values: list[dict[str, Any]] = []
|
||||
self.timeline: list[dict[str, Any]] = []
|
||||
self.timeline: ModuleTimeline = []
|
||||
|
||||
# Load IOCs
|
||||
self._create_storage()
|
||||
@@ -258,22 +260,85 @@ class Command:
|
||||
console.print("")
|
||||
console.print(panel)
|
||||
|
||||
def _ordered_modules(self) -> Optional[list[type[MVTModule]]]:
|
||||
"""Return enabled modules in stable topological order."""
|
||||
module_indexes = {module: index for index, module in enumerate(self.modules)}
|
||||
|
||||
if self.module_name:
|
||||
selected = [
|
||||
module for module in self.modules if module.__name__ == self.module_name
|
||||
]
|
||||
else:
|
||||
selected = [module for module in self.modules if module.enabled]
|
||||
|
||||
required = set(selected)
|
||||
pending = list(selected)
|
||||
while pending:
|
||||
module = pending.pop()
|
||||
for dependency in module.dependencies:
|
||||
if dependency not in module_indexes:
|
||||
self.log.warning(
|
||||
"Module %s depends on unavailable module %s. "
|
||||
"No modules will be run.",
|
||||
module.__name__,
|
||||
dependency.__name__,
|
||||
)
|
||||
return None
|
||||
if dependency not in required:
|
||||
required.add(dependency)
|
||||
pending.append(dependency)
|
||||
|
||||
dependents: dict[type[MVTModule], list[type[MVTModule]]] = {
|
||||
module: [] for module in required
|
||||
}
|
||||
indegree = {module: 0 for module in required}
|
||||
for module in required:
|
||||
for dependency in module.dependencies:
|
||||
if dependency not in required:
|
||||
continue
|
||||
dependents[dependency].append(module)
|
||||
indegree[module] += 1
|
||||
|
||||
ready: list[tuple[int, type[MVTModule]]] = []
|
||||
for module, count in indegree.items():
|
||||
if count == 0:
|
||||
heappush(ready, (module_indexes[module], module))
|
||||
|
||||
ordered = []
|
||||
while ready:
|
||||
_, module = heappop(ready)
|
||||
ordered.append(module)
|
||||
for dependent in dependents[module]:
|
||||
indegree[dependent] -= 1
|
||||
if indegree[dependent] == 0:
|
||||
heappush(ready, (module_indexes[dependent], dependent))
|
||||
|
||||
if len(ordered) != len(required):
|
||||
cyclic_modules = sorted(
|
||||
(module.__name__ for module, count in indegree.items() if count > 0)
|
||||
)
|
||||
self.log.warning(
|
||||
"Circular module dependency detected involving: %s. "
|
||||
"No modules will be run.",
|
||||
", ".join(cyclic_modules),
|
||||
)
|
||||
return None
|
||||
|
||||
return ordered
|
||||
|
||||
def run(self) -> None:
|
||||
ordered_modules = self._ordered_modules()
|
||||
if ordered_modules is None:
|
||||
return
|
||||
|
||||
try:
|
||||
self.init()
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
for module in self.modules:
|
||||
if self.module_name and module.__name__ != self.module_name:
|
||||
continue
|
||||
executed_by_type: dict[type[MVTModule], MVTModule] = {}
|
||||
for module in ordered_modules:
|
||||
|
||||
if not module.enabled and not (
|
||||
self.module_name and module.__name__ == self.module_name
|
||||
):
|
||||
continue
|
||||
|
||||
# FIXME: do we need the logger here
|
||||
module_logger = logging.getLogger(module.__module__)
|
||||
|
||||
m = module(
|
||||
@@ -282,6 +347,10 @@ class Command:
|
||||
module_options=self.module_options,
|
||||
log=module_logger,
|
||||
)
|
||||
m.dependency_modules = {
|
||||
dependency: executed_by_type[dependency]
|
||||
for dependency in module.dependencies
|
||||
}
|
||||
|
||||
if self.iocs.total_ioc_count:
|
||||
m.indicators = self.iocs
|
||||
@@ -305,6 +374,7 @@ class Command:
|
||||
return
|
||||
|
||||
self.executed.append(m)
|
||||
executed_by_type[module] = m
|
||||
self.timeline.extend(m.timeline)
|
||||
self.alertstore.extend(m.alertstore.alerts)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import logging
|
||||
import os
|
||||
import re
|
||||
from dataclasses import asdict, is_dataclass
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Dict, Optional, Sequence
|
||||
|
||||
from .alerts import AlertStore
|
||||
from .indicators import Indicators
|
||||
@@ -43,6 +43,7 @@ class MVTModule:
|
||||
|
||||
enabled: bool = True
|
||||
slug: Optional[str] = None
|
||||
dependencies: Sequence[type["MVTModule"]] = ()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -51,7 +52,7 @@ class MVTModule:
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[Dict[str, Any]] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
"""Initialize module.
|
||||
|
||||
@@ -71,16 +72,22 @@ class MVTModule:
|
||||
self.file_path: Optional[str] = file_path
|
||||
self.target_path: Optional[str] = target_path
|
||||
self.results_path: Optional[str] = results_path
|
||||
self.module_options: Optional[Dict[str, Any]] = (
|
||||
module_options if module_options else {}
|
||||
)
|
||||
self.serial: Optional[str] = None
|
||||
self.module_options: Dict[str, Any] = module_options if module_options else {}
|
||||
|
||||
self.log = log
|
||||
self.indicators: Optional[Indicators] = None
|
||||
self.alertstore: AlertStore = AlertStore(log=log)
|
||||
|
||||
self.results: ModuleResults = results if results else []
|
||||
self.results: ModuleResults = results if results is not None else []
|
||||
self.timeline: ModuleTimeline = []
|
||||
self.dependency_modules: Dict[type["MVTModule"], "MVTModule"] = {}
|
||||
|
||||
def get_dependency_results(
|
||||
self, module_class: type["MVTModule"]
|
||||
) -> ModuleResults:
|
||||
"""Return the results produced by a prerequisite module."""
|
||||
return self.dependency_modules[module_class].results
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str, log: logging.Logger):
|
||||
@@ -109,11 +116,14 @@ class MVTModule:
|
||||
name = self.get_slug()
|
||||
|
||||
if self.results:
|
||||
converted_results: Any
|
||||
if isinstance(self.results, dict):
|
||||
converted_results = self.results
|
||||
else:
|
||||
converted_results = [
|
||||
asdict(result) if is_dataclass(result) else result
|
||||
asdict(result)
|
||||
if is_dataclass(result) and not isinstance(result, type)
|
||||
else result
|
||||
for result in self.results
|
||||
]
|
||||
results_file_name = f"{name}.json"
|
||||
|
||||
@@ -16,7 +16,10 @@ from typing import Any, Dict, List, Union
|
||||
ModuleAtomicResult = Dict[str, Any]
|
||||
|
||||
|
||||
ModuleResults = List[ModuleAtomicResult]
|
||||
# Extraction modules historically use either a list of records or grouped
|
||||
# dictionaries keyed by source path. Keep this alias broad until those shapes
|
||||
# are modeled per module.
|
||||
ModuleResults = Any
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -180,10 +180,8 @@ class IndicatorsUpdates:
|
||||
def _get_remote_file_latest_commit(
|
||||
self, owner: str, repo: str, branch: str, path: str
|
||||
) -> int:
|
||||
# TODO: The branch is currently not taken into consideration.
|
||||
# How do we specify which branch to look up to the API?
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
|
||||
)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
|
||||
+15
-1
@@ -3,11 +3,16 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from tld import get_tld
|
||||
|
||||
from .config import settings
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SHORTENER_DOMAINS = [
|
||||
"0rz.tw",
|
||||
"1drv.ms",
|
||||
@@ -375,7 +380,16 @@ class URL:
|
||||
|
||||
def unshorten(self) -> Optional[str]:
|
||||
"""Unshorten the URL by requesting an HTTP HEAD response."""
|
||||
res = requests.head(self.url)
|
||||
|
||||
if settings.NETWORK_ACCESS_ALLOWED is False:
|
||||
log.info(
|
||||
"Network access disabled (MVT_NETWORK_ACCESS_ALLOWED=False), "
|
||||
"skipping unshorten for %s",
|
||||
self.url,
|
||||
)
|
||||
return ""
|
||||
|
||||
res = requests.head(self.url, timeout=settings.NETWORK_TIMEOUT)
|
||||
if str(res.status_code).startswith("30"):
|
||||
return res.headers["Location"]
|
||||
|
||||
|
||||
@@ -123,10 +123,9 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
|
||||
if from_2001:
|
||||
timestamp = timestamp + 978307200
|
||||
|
||||
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
|
||||
try:
|
||||
return convert_unix_to_utc_datetime(timestamp)
|
||||
except Exception:
|
||||
except (OSError, OverflowError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ class BackupInfo(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -36,7 +36,7 @@ class BackupInfo(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
|
||||
@@ -33,7 +33,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -85,6 +85,35 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
self.alertstore.medium(warning_message, "", result)
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_key(d: dict, key: str) -> None:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_keys(d: dict, keys: list) -> None:
|
||||
for key in keys:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
def _b64encode_plist_bytes(self, plist: dict) -> None:
|
||||
"""Encode binary plist values to base64 for JSON serialization."""
|
||||
if "SignerCerts" in plist:
|
||||
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
|
||||
|
||||
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
|
||||
|
||||
if "OTAProfileStub" in plist:
|
||||
stub = plist["OTAProfileStub"]
|
||||
if "SignerCerts" in stub:
|
||||
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
|
||||
if "PayloadContent" in stub:
|
||||
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
|
||||
|
||||
if "PayloadContent" in plist:
|
||||
for entry in plist["PayloadContent"]:
|
||||
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
|
||||
|
||||
def run(self) -> None:
|
||||
for conf_file in self._get_backup_files_from_manifest(
|
||||
domain=CONF_PROFILES_DOMAIN
|
||||
@@ -113,65 +142,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
except Exception:
|
||||
conf_plist = {}
|
||||
|
||||
# TODO: Tidy up the following code hell.
|
||||
|
||||
if "SignerCerts" in conf_plist:
|
||||
conf_plist["SignerCerts"] = [
|
||||
b64encode(x) for x in conf_plist["SignerCerts"]
|
||||
]
|
||||
|
||||
if "OTAProfileStub" in conf_plist:
|
||||
if "SignerCerts" in conf_plist["OTAProfileStub"]:
|
||||
conf_plist["OTAProfileStub"]["SignerCerts"] = [
|
||||
b64encode(x)
|
||||
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
|
||||
]
|
||||
|
||||
if "PayloadContent" in conf_plist["OTAProfileStub"]:
|
||||
if (
|
||||
"EnrollmentIdentityPersistentID"
|
||||
in conf_plist["OTAProfileStub"]["PayloadContent"]
|
||||
):
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
] = b64encode(
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
]
|
||||
)
|
||||
|
||||
if "PushTokenDataSentToServerKey" in conf_plist:
|
||||
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
|
||||
conf_plist["PushTokenDataSentToServerKey"]
|
||||
)
|
||||
|
||||
if "LastPushTokenHash" in conf_plist:
|
||||
conf_plist["LastPushTokenHash"] = b64encode(
|
||||
conf_plist["LastPushTokenHash"]
|
||||
)
|
||||
|
||||
if "PayloadContent" in conf_plist:
|
||||
for content_entry in range(len(conf_plist["PayloadContent"])):
|
||||
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
]
|
||||
)
|
||||
|
||||
if (
|
||||
"IdentityPersistentRef"
|
||||
in conf_plist["PayloadContent"][content_entry]
|
||||
):
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
]
|
||||
)
|
||||
self._b64encode_plist_bytes(conf_plist)
|
||||
|
||||
self.results.append(
|
||||
{
|
||||
|
||||
@@ -33,7 +33,7 @@ class Manifest(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -34,7 +34,7 @@ class ProfileEvents(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class IOSExtraction(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -53,7 +53,7 @@ class IOSExtraction(MVTModule):
|
||||
:param file_path: Path to the malformed database file.
|
||||
|
||||
"""
|
||||
# TODO: Find a better solution.
|
||||
# SQLite's immutable mode cannot open databases with active WAL files.
|
||||
if not forced:
|
||||
# If the database is open, do not use immutable
|
||||
if os.path.isfile(file_path + "-shm"):
|
||||
|
||||
@@ -34,7 +34,7 @@ class Analytics(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -44,7 +44,7 @@ class Analytics(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -30,7 +30,7 @@ class AnalyticsIOSVersions(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -25,7 +25,7 @@ class CacheFiles(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -99,6 +99,10 @@ class CacheFiles(IOSExtraction):
|
||||
|
||||
def run(self) -> None:
|
||||
self.results: dict = {}
|
||||
if not self.target_path:
|
||||
self.log.error("No filesystem dump path provided")
|
||||
return
|
||||
|
||||
for root, _, files in os.walk(self.target_path):
|
||||
for file_name in files:
|
||||
if file_name != "Cache.db":
|
||||
|
||||
@@ -29,7 +29,7 @@ class Filesystem(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -73,6 +73,10 @@ class Filesystem(IOSExtraction):
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
self.log.error("No filesystem dump path provided")
|
||||
return
|
||||
|
||||
for root, dirs, files in os.walk(self.target_path):
|
||||
for dir_name in dirs:
|
||||
try:
|
||||
|
||||
@@ -30,7 +30,7 @@ class Netusage(NetBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class SafariFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -41,7 +41,7 @@ class SafariFavicon(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -30,7 +30,7 @@ class ShutdownLog(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -78,7 +78,7 @@ class ShutdownLog(IOSExtraction):
|
||||
recent_processes = []
|
||||
times_delayed = 0
|
||||
delay = 0.0
|
||||
for line in content.split("\n"):
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
|
||||
if line.startswith("remaining client pid:"):
|
||||
|
||||
@@ -32,7 +32,7 @@ class IOSVersionHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,7 +42,7 @@ class IOSVersionHistory(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
self.results: list = [] if results is None else results
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -34,7 +34,7 @@ class WebkitIndexedDB(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -32,7 +32,7 @@ class WebkitLocalStorage(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -28,7 +28,7 @@ class WebkitSafariViewService(WebkitBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -41,7 +41,7 @@ class Applications(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -31,7 +31,7 @@ class Calendar(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -27,7 +27,7 @@ class Calls(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: list = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -16,7 +16,6 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
|
||||
from ..base import IOSExtraction
|
||||
|
||||
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_FAVICON_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
|
||||
]
|
||||
@@ -32,7 +31,7 @@ class ChromeFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -18,7 +18,6 @@ from ..base import IOSExtraction
|
||||
CHROME_HISTORY_BACKUP_IDS = [
|
||||
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
|
||||
]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_HISTORY_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
|
||||
]
|
||||
@@ -34,7 +33,7 @@ class ChromeHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -29,7 +29,7 @@ class Contacts(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -33,7 +33,7 @@ class FirefoxFavicon(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class FirefoxHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -27,7 +27,7 @@ class GlobalPreferences(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class IDStatusCache(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -228,7 +228,7 @@ class InteractionC(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class LocationdClients(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -147,7 +147,6 @@ class LocationdClients(IOSExtraction):
|
||||
# Some migration information are int and not dicts
|
||||
if not isinstance(file_plist[key], dict):
|
||||
continue
|
||||
# FIXME: unclear key format in iOS 17
|
||||
result = file_plist[key]
|
||||
result["package"] = key.rstrip(":")
|
||||
for timestamp in self.timestamps:
|
||||
|
||||
@@ -31,7 +31,7 @@ class Datausage(NetBase):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class OSAnalyticsADDaily(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -36,7 +36,7 @@ class SafariBrowserState(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -38,7 +38,7 @@ class SafariHistory(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class Shortcuts(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class SMS(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -35,7 +35,7 @@ class SMSAttachments(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -56,7 +56,7 @@ class TCC(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -37,7 +37,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -85,32 +85,56 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
|
||||
try:
|
||||
try:
|
||||
# FIXME: table contains extra fields with timestamp here
|
||||
cur.execute("PRAGMA table_info(ObservedDomains);")
|
||||
available_columns = {row[1] for row in cur}
|
||||
required_columns = [
|
||||
"domainID",
|
||||
"registrableDomain",
|
||||
"lastSeen",
|
||||
"hadUserInteraction",
|
||||
]
|
||||
if not set(required_columns).issubset(available_columns):
|
||||
return
|
||||
|
||||
optional_columns = [
|
||||
column
|
||||
for column in [
|
||||
"mostRecentUserInteractionTime",
|
||||
"mostRecentWebPushInteractionTime",
|
||||
]
|
||||
if column in available_columns
|
||||
]
|
||||
selected_columns = required_columns + optional_columns
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
f"SELECT {', '.join(selected_columns)} FROM ObservedDomains;"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
return
|
||||
|
||||
for row in cur:
|
||||
self.results.append(
|
||||
{
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
result = {
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
for index, column in enumerate(optional_columns, start=4):
|
||||
field = {
|
||||
"mostRecentUserInteractionTime": (
|
||||
"most_recent_user_interaction_time"
|
||||
),
|
||||
"mostRecentWebPushInteractionTime": (
|
||||
"most_recent_web_push_interaction_time"
|
||||
),
|
||||
}[column]
|
||||
timestamp = row[index]
|
||||
result[field] = timestamp
|
||||
if timestamp is not None and timestamp >= 0:
|
||||
result[f"{field}_isodate"] = convert_unix_to_iso(timestamp)
|
||||
self.results.append(result)
|
||||
finally:
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
@@ -39,7 +39,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -50,7 +50,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results: dict = {}
|
||||
self.results: dict = results if results is not None else {}
|
||||
|
||||
@staticmethod
|
||||
def _extract_domains(entries):
|
||||
@@ -77,14 +77,21 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
entry["redirect_destination"]
|
||||
)
|
||||
|
||||
# TODO: Currently not used.
|
||||
# subframe_origins = self._extract_domains(
|
||||
# entry["subframe_under_origin"])
|
||||
# subresource_domains = self._extract_domains(
|
||||
# entry["subresource_under_origin"])
|
||||
subframe_origins = self._extract_domains(
|
||||
entry["subframe_under_origin"]
|
||||
)
|
||||
subresource_domains = self._extract_domains(
|
||||
entry["subresource_under_origin"]
|
||||
)
|
||||
|
||||
all_origins = list(
|
||||
set([entry["origin"]] + source_domains + destination_domains)
|
||||
set(
|
||||
[entry["origin"]]
|
||||
+ source_domains
|
||||
+ destination_domains
|
||||
+ subframe_origins
|
||||
+ subresource_domains
|
||||
)
|
||||
)
|
||||
|
||||
ioc_match = self.indicators.check_urls(all_origins)
|
||||
|
||||
@@ -33,7 +33,7 @@ class Whatsapp(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -30,7 +30,7 @@ class NetBase(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[ModuleResults] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -322,14 +322,11 @@ class NetBase(IOSExtraction):
|
||||
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
# Check for manipulated process records.
|
||||
# TODO: Catching KeyError for live_isodate for retro-compatibility.
|
||||
# This is not very good.
|
||||
try:
|
||||
# check_manipulated/find_deleted require "live_isodate" and
|
||||
# "live_proc_id" keys which may be absent in older result formats.
|
||||
if self.results and "live_isodate" in self.results[0]:
|
||||
self.check_manipulated()
|
||||
self.find_deleted()
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
import logging
|
||||
|
||||
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
|
||||
from mvt.common.alerts import AlertLevel
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from ..utils import get_artifact
|
||||
@@ -38,6 +39,19 @@ class TestDumpsysAccessibilityArtifact:
|
||||
assert da.results[0]["package_name"] == "com.malware.accessibility"
|
||||
assert da.results[0]["service"] == "com.malware.service.malwareservice"
|
||||
|
||||
def test_accessibility_service_alert(self):
|
||||
da = DumpsysAccessibilityArtifact()
|
||||
file = get_artifact("android_data/dumpsys_accessibility_v14_or_later.txt")
|
||||
with open(file) as f:
|
||||
data = f.read()
|
||||
da.parse(data)
|
||||
|
||||
da.check_indicators()
|
||||
|
||||
assert len(da.alertstore.alerts) == 1
|
||||
assert da.alertstore.alerts[0].level == AlertLevel.MEDIUM
|
||||
assert da.alertstore.alerts[0].event == da.results[0]
|
||||
|
||||
def test_ioc_check(self, indicator_file):
|
||||
da = DumpsysAccessibilityArtifact()
|
||||
file = get_artifact("android_data/dumpsys_accessibility.txt")
|
||||
@@ -51,4 +65,12 @@ class TestDumpsysAccessibilityArtifact:
|
||||
da.indicators = ind
|
||||
assert len(da.alertstore.alerts) == 0
|
||||
da.check_indicators()
|
||||
assert len(da.alertstore.alerts) == 1
|
||||
assert len(da.alertstore.alerts) == len(da.results)
|
||||
assert da.alertstore.count(AlertLevel.MEDIUM) == 3
|
||||
assert da.alertstore.count(AlertLevel.CRITICAL) == 1
|
||||
critical_alert = next(
|
||||
alert
|
||||
for alert in da.alertstore.alerts
|
||||
if alert.level == AlertLevel.CRITICAL
|
||||
)
|
||||
assert critical_alert.event["package_name"] == "com.sec.android.app.camera"
|
||||
|
||||
@@ -6,12 +6,14 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mvt.android.cli import check_intrusion_logs
|
||||
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
|
||||
from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule
|
||||
from mvt.android.modules.intrusion_logs.security_event import SecurityEvent
|
||||
from mvt.common.alerts import AlertLevel
|
||||
|
||||
|
||||
def _write_ndjson(path, records):
|
||||
@@ -143,6 +145,58 @@ def test_check_intrusion_logs_parses_core_and_unknown_security_events(
|
||||
assert "Please open an issue on GitHub" in caplog.text
|
||||
|
||||
|
||||
def test_check_intrusion_logs_treats_event_id_as_security_event_metadata(
|
||||
tmp_path, caplog
|
||||
):
|
||||
_write_ndjson(
|
||||
tmp_path / "intrusion.txt",
|
||||
[
|
||||
{
|
||||
"security_event": {
|
||||
"event_id": 191,
|
||||
"event_time": 1_700_000_002_000_000_000,
|
||||
"keyguard_dismiss_auth_attempt": {
|
||||
"success": True,
|
||||
"method_strength": 0,
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"security_event": {
|
||||
"event_id": 192,
|
||||
"event_time": 1_700_000_003_000_000_000,
|
||||
"keyguard_dismissed": {},
|
||||
}
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
|
||||
cmd.run()
|
||||
|
||||
security_module = next(
|
||||
module for module in cmd.executed if isinstance(module, SecurityEvent)
|
||||
)
|
||||
assert security_module.event_type_counts == {
|
||||
"keyguard_dismiss_auth_attempt": 1,
|
||||
"keyguard_dismissed": 1,
|
||||
}
|
||||
assert [event["event_id"] for event in security_module.results] == [191, 192]
|
||||
|
||||
keyguard_events = {
|
||||
event["event"]: event
|
||||
for event in cmd.timeline
|
||||
if event["event"]
|
||||
in {"keyguard_dismiss_auth_attempt", "keyguard_dismissed"}
|
||||
}
|
||||
assert "Auth attempt: Success" in keyguard_events[
|
||||
"keyguard_dismiss_auth_attempt"
|
||||
]["data"]
|
||||
assert keyguard_events["keyguard_dismissed"]["data"] == "Keyguard dismissed"
|
||||
assert "unknown intrusion logging security event type(s): event_id" not in caplog.text
|
||||
|
||||
|
||||
def test_check_intrusion_logs_cli_lists_modules(tmp_path):
|
||||
_write_ndjson(tmp_path / "intrusion.txt", [])
|
||||
|
||||
@@ -152,3 +206,82 @@ def test_check_intrusion_logs_cli_lists_modules(tmp_path):
|
||||
assert "DnsEvent" in result.output
|
||||
assert "ConnectEvent" in result.output
|
||||
assert "SecurityEvent" in result.output
|
||||
|
||||
|
||||
def _run_security_heuristics(results):
|
||||
# No indicators loaded: heuristic alerts must still fire.
|
||||
module = SecurityEvent(results=results)
|
||||
module.check_indicators()
|
||||
return module.alertstore.alerts
|
||||
|
||||
|
||||
def test_cert_authority_installed_raises_medium_alert_without_indicators():
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_authority_installed": {
|
||||
"subject": "CN=Unexpected Root CA",
|
||||
"success": True,
|
||||
},
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 1
|
||||
assert alerts[0].level == AlertLevel.MEDIUM
|
||||
assert "Certificate authority installed" in alerts[0].message
|
||||
assert "Unexpected Root CA" in alerts[0].message
|
||||
|
||||
|
||||
# Exported logs encode success as a JSON bool, raw SecurityLog as int 0/1.
|
||||
@pytest.mark.parametrize("success", [False, 0])
|
||||
def test_failed_cert_authority_install_does_not_alert(success, caplog):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_authority_installed": {
|
||||
"subject": "CN=Unexpected Root CA",
|
||||
"success": success,
|
||||
},
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert alerts == []
|
||||
assert "Failed certificate authority install attempt" in caplog.text
|
||||
assert "Unexpected Root CA" in caplog.text
|
||||
|
||||
|
||||
def test_cert_validation_failure_raises_medium_alert_without_indicators():
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_validation_failure": "chain validation failed",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 1
|
||||
assert alerts[0].level == AlertLevel.MEDIUM
|
||||
assert "Certificate validation failure" in alerts[0].message
|
||||
|
||||
|
||||
def test_security_heuristics_fire_when_no_indicators_loaded():
|
||||
# check_indicators() previously returned early with no indicators loaded,
|
||||
# so none of the heuristic alerts fired on a default run.
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{"timestamp": "2024-01-01 00:00:00.000", "wipe_failure": {"reason": "x"}},
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"key_integrity_violation": {"key_id": "k1"},
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 2
|
||||
assert all(alert.level == AlertLevel.MEDIUM for alert in alerts)
|
||||
|
||||
@@ -21,4 +21,5 @@ class TestSettingsModule:
|
||||
run_module(m)
|
||||
assert len(m.results) == 1
|
||||
assert "random" in m.results.keys()
|
||||
assert len(m.alertstore.alerts) == 0
|
||||
assert len(m.alertstore.alerts) == 1
|
||||
assert "samsung_errorlog_agree" in m.alertstore.alerts[0].message
|
||||
|
||||
@@ -4,11 +4,59 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class RecordingModule(MVTModule):
|
||||
run_order: list[str] = []
|
||||
|
||||
def run(self):
|
||||
self.run_order.append(self.__class__.__name__)
|
||||
|
||||
def check_indicators(self):
|
||||
pass
|
||||
|
||||
|
||||
class FirstModule(RecordingModule):
|
||||
def run(self):
|
||||
super().run()
|
||||
self.results = ["first"]
|
||||
|
||||
|
||||
class SecondModule(RecordingModule):
|
||||
dependencies = (FirstModule,)
|
||||
|
||||
def run(self):
|
||||
super().run()
|
||||
self.results = self.get_dependency_results(FirstModule) + ["second"]
|
||||
|
||||
|
||||
class ThirdModule(RecordingModule):
|
||||
dependencies = (SecondModule,)
|
||||
|
||||
|
||||
class IndependentModule(RecordingModule):
|
||||
pass
|
||||
|
||||
|
||||
class RecordingCommand(Command):
|
||||
def init(self):
|
||||
self.initialized = True
|
||||
|
||||
def module_init(self, module):
|
||||
pass
|
||||
|
||||
def finish(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestCommand:
|
||||
def setup_method(self):
|
||||
RecordingModule.run_order = []
|
||||
|
||||
def test_store_alerts_handles_bytes(self, tmp_path):
|
||||
cmd = Command(results_path=str(tmp_path))
|
||||
cmd.alertstore.medium(
|
||||
@@ -21,3 +69,66 @@ class TestCommand:
|
||||
|
||||
alerts = json.loads((tmp_path / "alerts.json").read_text())
|
||||
assert alerts[0]["event"]["payload"] == "\\xa8\\xa9"
|
||||
|
||||
def test_modules_run_in_stable_topological_order(self):
|
||||
cmd = RecordingCommand()
|
||||
cmd.modules = [ThirdModule, IndependentModule, SecondModule, FirstModule]
|
||||
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == [
|
||||
"IndependentModule",
|
||||
"FirstModule",
|
||||
"SecondModule",
|
||||
"ThirdModule",
|
||||
]
|
||||
second = next(module for module in cmd.executed if isinstance(module, SecondModule))
|
||||
assert second.results == ["first", "second"]
|
||||
|
||||
def test_selected_module_runs_transitive_dependencies(self):
|
||||
cmd = RecordingCommand(module_name="ThirdModule")
|
||||
cmd.modules = [ThirdModule, SecondModule, FirstModule, IndependentModule]
|
||||
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == [
|
||||
"FirstModule",
|
||||
"SecondModule",
|
||||
"ThirdModule",
|
||||
]
|
||||
|
||||
def test_circular_dependency_warns_and_stops(self, caplog):
|
||||
class CircularOne(RecordingModule):
|
||||
pass
|
||||
|
||||
class CircularTwo(RecordingModule):
|
||||
dependencies = (CircularOne,)
|
||||
|
||||
CircularOne.dependencies = (CircularTwo,)
|
||||
|
||||
cmd = RecordingCommand()
|
||||
cmd.modules = [CircularOne, CircularTwo]
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == []
|
||||
assert not hasattr(cmd, "initialized")
|
||||
assert "Circular module dependency detected" in caplog.text
|
||||
|
||||
def test_unavailable_dependency_warns_and_stops(self, caplog):
|
||||
class UnavailableModule(RecordingModule):
|
||||
pass
|
||||
|
||||
class DependentModule(RecordingModule):
|
||||
dependencies = (UnavailableModule,)
|
||||
|
||||
cmd = RecordingCommand()
|
||||
cmd.modules = [DependentModule]
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == []
|
||||
assert not hasattr(cmd, "initialized")
|
||||
assert "depends on unavailable module UnavailableModule" in caplog.text
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import sqlite3
|
||||
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.mixed.webkit_resource_load_statistics import (
|
||||
WebkitResourceLoadStatistics,
|
||||
@@ -19,3 +21,50 @@ class TestWebkitResourceLoadStatisticsModule:
|
||||
assert len(m.results) == 2
|
||||
assert len(m.timeline) == 2
|
||||
assert len(m.alertstore.alerts) == 0
|
||||
|
||||
results = {result["registrable_domain"]: result for result in m.results}
|
||||
assert results["google.com"]["most_recent_user_interaction_time"] > 0
|
||||
assert "most_recent_user_interaction_time_isodate" in results["google.com"]
|
||||
assert results["gstatic.com"]["most_recent_user_interaction_time"] == -1.0
|
||||
assert (
|
||||
"most_recent_user_interaction_time_isodate"
|
||||
not in results["gstatic.com"]
|
||||
)
|
||||
assert all(
|
||||
"most_recent_web_push_interaction_time" not in result
|
||||
for result in m.results
|
||||
)
|
||||
|
||||
def test_webkit_full_timestamp_schema(self, tmp_path):
|
||||
db_path = tmp_path / "observations.db"
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE ObservedDomains (
|
||||
domainID INTEGER PRIMARY KEY,
|
||||
registrableDomain TEXT NOT NULL,
|
||||
lastSeen REAL NOT NULL,
|
||||
hadUserInteraction INTEGER NOT NULL,
|
||||
mostRecentUserInteractionTime REAL NOT NULL,
|
||||
mostRecentWebPushInteractionTime REAL NOT NULL
|
||||
);
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO ObservedDomains VALUES (?, ?, ?, ?, ?, ?);
|
||||
""",
|
||||
(1, "example.com", 1634560250.0, 1, 1634560030.0, -1.0),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
m = WebkitResourceLoadStatistics(target_path=str(tmp_path))
|
||||
m._process_observations_db(str(db_path), "", "observations.db")
|
||||
|
||||
assert len(m.results) == 1
|
||||
result = m.results[0]
|
||||
assert result["most_recent_user_interaction_time"] == 1634560030.0
|
||||
assert "most_recent_user_interaction_time_isodate" in result
|
||||
assert result["most_recent_web_push_interaction_time"] == -1.0
|
||||
assert "most_recent_web_push_interaction_time_isodate" not in result
|
||||
|
||||
Reference in New Issue
Block a user