fix typing for mypy

This commit is contained in:
Janik Besendorf
2025-12-20 09:50:55 +01:00
parent 801c464492
commit c779009550
59 changed files with 366 additions and 225 deletions

View File

@@ -20,23 +20,39 @@ class AndroidArtifact(Artifact):
:param binary: whether the dumpsys should be pared as binary or not (bool)
:return: section extracted (string or bytes)
"""
lines = []
in_section = False
delimiter = "------------------------------------------------------------------------------"
delimiter_str = "------------------------------------------------------------------------------"
delimiter_bytes = b"------------------------------------------------------------------------------"
if binary:
delimiter = delimiter.encode("utf-8")
lines_bytes = []
for line in dumpsys.splitlines(): # type: ignore[union-attr]
if line.strip() == separator: # type: ignore[arg-type]
in_section = True
continue
for line in dumpsys.splitlines():
if line.strip() == separator:
in_section = True
continue
if not in_section:
continue
if not in_section:
continue
if line.strip().startswith(delimiter_bytes): # type: ignore[arg-type]
break
if line.strip().startswith(delimiter):
break
lines_bytes.append(line) # type: ignore[arg-type]
lines.append(line)
return b"\n".join(lines_bytes) # type: ignore[return-value,arg-type]
else:
lines_str = []
for line in dumpsys.splitlines(): # type: ignore[union-attr]
if line.strip() == separator: # type: ignore[arg-type]
in_section = True
continue
return b"\n".join(lines) if binary else "\n".join(lines)
if not in_section:
continue
if line.strip().startswith(delimiter_str): # type: ignore[arg-type]
break
lines_str.append(line) # type: ignore[arg-type]
return "\n".join(lines_str) # type: ignore[return-value,arg-type]

View File

@@ -84,7 +84,7 @@ class DumpsysADBArtifact(AndroidArtifact):
return keystore
@staticmethod
def calculate_key_info(user_key: bytes) -> str:
def calculate_key_info(user_key: bytes) -> dict:
if b" " in user_key:
key_base64, user = user_key.split(b" ", 1)
else:

View File

@@ -60,7 +60,8 @@ class GetProp(AndroidArtifact):
if entry["name"] == "ro.build.version.security_patch":
warning_message = warn_android_patch_level(entry["value"], self.log)
self.alertstore.medium(warning_message, "", entry)
if isinstance(warning_message, str):
self.alertstore.medium(warning_message, "", entry)
if not self.indicators:
return

View File

@@ -167,6 +167,8 @@ class CmdAndroidCheckAndroidQF(Command):
if bugreport:
bugreport.close()
return True
def run_backup_cmd(self) -> bool:
try:
backup = self.load_backup()

View File

@@ -9,7 +9,7 @@ import os
import sys
import tarfile
from pathlib import Path
from typing import List, Optional
from typing import List, Optional, cast
from mvt.android.modules.backup.base import BackupModule
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
@@ -93,22 +93,28 @@ class CmdAndroidCheckBackup(Command):
self.__files.append(member.name)
def init(self) -> None:
if not self.target_path:
if not self.target_path: # type: ignore[has-type]
return
if os.path.isfile(self.target_path):
# Type guard: we know it's not None here after the check above
assert self.target_path is not None # type: ignore[has-type]
# Use a different local variable name to avoid any scoping issues
backup_path: str = self.target_path # type: ignore[has-type]
if os.path.isfile(backup_path):
self.__type = "ab"
with open(self.target_path, "rb") as handle:
with open(backup_path, "rb") as handle:
ab_file_bytes = handle.read()
self.from_ab(ab_file_bytes)
elif os.path.isdir(self.target_path):
elif os.path.isdir(backup_path):
self.__type = "folder"
self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
backup_path = Path(backup_path).absolute().as_posix()
self.target_path = backup_path
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
for fname in subfiles:
self.__files.append(
os.path.relpath(os.path.join(root, fname), self.target_path)
os.path.relpath(os.path.join(root, fname), backup_path)
)
else:
log.critical(

View File

@@ -96,6 +96,8 @@ class CmdAndroidCheckBugreport(Command):
if self.__format == "zip":
module.from_zip(self.__zip, self.__files)
else:
if not self.target_path:
raise ValueError("target_path is not set")
module.from_dir(self.target_path, self.__files)
def finish(self) -> None:

View File

@@ -40,7 +40,7 @@ class ChromeHistory(AndroidExtraction):
log=log,
results=results,
)
self.results = []
self.results: list = []
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {

View File

@@ -6,9 +6,10 @@
import logging
from typing import Optional
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
from .base import AndroidExtraction
class SELinuxStatus(AndroidExtraction):
"""This module checks if SELinux is being enforced."""
@@ -33,7 +34,7 @@ class SELinuxStatus(AndroidExtraction):
results=results,
)
self.results = {} if not results else results
self.results: dict = {}
def run(self) -> None:
self._adb_connect()

View File

@@ -112,7 +112,7 @@ class AQFFiles(AndroidQFModule):
if result.get("sha256", "") == "":
continue
ioc_match = self.indicators.check_file_hash(result.get("sha256"))
ioc_match = self.indicators.check_file_hash(result.get("sha256") or "")
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc

View File

@@ -7,9 +7,9 @@ import logging
from typing import Optional
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from mvt.common.module_types import ModuleResults
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFGetProp(GetPropArtifact, AndroidQFModule):
@@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results = []
self.results: list = []
def run(self) -> None:
getprop_files = self._get_files_by_pattern("*/getprop.txt")

View File

@@ -3,15 +3,16 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import datetime
import logging
import os
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleResults
from .base import AndroidQFModule
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
from mvt.common.module_types import ModuleResults
from mvt.common.utils import convert_datetime_to_iso
from .base import AndroidQFModule
class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
@@ -37,11 +38,13 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
results=results,
)
def _get_file_modification_time(self, file_path: str) -> dict:
def _get_file_modification_time(self, file_path: str) -> datetime.datetime:
if self.archive:
file_timetuple = self.archive.getinfo(file_path).date_time
return datetime.datetime(*file_timetuple)
else:
if not self.parent_path:
raise ValueError("parent_path is not set")
file_stat = os.stat(os.path.join(self.parent_path, file_path))
return datetime.datetime.fromtimestamp(file_stat.st_mtime)

View File

@@ -98,7 +98,7 @@ class AQFPackages(AndroidQFModule):
if not self.indicators:
continue
ioc_match = self.indicators.check_app_id(result.get("name"))
ioc_match = self.indicators.check_app_id(result.get("name") or "")
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
@@ -106,7 +106,9 @@ class AQFPackages(AndroidQFModule):
self.alertstore.log_latest()
for package_file in result.get("files", []):
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
ioc_match = self.indicators.check_file_hash(
package_file.get("sha256") or ""
)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc

View File

@@ -7,9 +7,9 @@ import logging
from typing import Optional
from mvt.android.artifacts.settings import Settings as SettingsArtifact
from mvt.common.module_types import ModuleResults
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFSettings(SettingsArtifact, AndroidQFModule):
@@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results = {}
self.results: dict = {}
def run(self) -> None:
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):

View File

@@ -33,8 +33,8 @@ class AndroidQFModule(MVTModule):
log=log,
results=results,
)
self.parent_path = None
self._path: str = target_path
self.parent_path: Optional[str] = None
self._path: Optional[str] = target_path
self.files: List[str] = []
self.archive: Optional[zipfile.ZipFile] = None

View File

@@ -3,8 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import json
import logging
from typing import Optional
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
@@ -32,7 +32,7 @@ class Mounts(MountsArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results = []
self.results: list = []
def run(self) -> None:
"""

View File

@@ -9,7 +9,7 @@ import os
from tarfile import TarFile
from typing import List, Optional
from mvt.common.module import MVTModule, ModuleResults
from mvt.common.module import ModuleResults, MVTModule
class BackupModule(MVTModule):
@@ -32,10 +32,10 @@ class BackupModule(MVTModule):
log=log,
results=results,
)
self.ab = None
self.backup_path = None
self.tar = None
self.files = []
self.ab: Optional[str] = None
self.backup_path: Optional[str] = None
self.tar: Optional[TarFile] = None
self.files: list = []
def from_dir(self, backup_path: Optional[str], files: List[str]) -> None:
self.backup_path = backup_path
@@ -55,12 +55,15 @@ class BackupModule(MVTModule):
return fnmatch.filter(self.files, pattern)
def _get_file_content(self, file_path: str) -> bytes:
handle = None
if self.tar:
try:
member = self.tar.getmember(file_path)
handle = self.tar.extractfile(member)
if not handle:
raise ValueError(f"Could not extract file: {file_path}")
except KeyError:
return None
handle = self.tar.extractfile(member)
raise FileNotFoundError(f"File not found in tar: {file_path}")
elif self.backup_path:
handle = open(os.path.join(self.backup_path, file_path), "rb")
else:

View File

@@ -6,11 +6,10 @@ import datetime
import fnmatch
import logging
import os
from typing import List, Optional
from zipfile import ZipFile
from mvt.common.module import MVTModule, ModuleResults
from mvt.common.module import ModuleResults, MVTModule
class BugReportModule(MVTModule):
@@ -69,6 +68,8 @@ class BugReportModule(MVTModule):
if self.zip_archive:
handle = self.zip_archive.open(file_path)
else:
if not self.extract_path:
raise ValueError("extract_path is not set")
handle = open(os.path.join(self.extract_path, file_path), "rb")
data = handle.read()
@@ -76,7 +77,7 @@ class BugReportModule(MVTModule):
return data
def _get_dumpstate_file(self) -> bytes:
def _get_dumpstate_file(self) -> Optional[bytes]:
main = self._get_files_by_pattern("main_entry.txt")
if main:
main_content = self._get_file_content(main[0])
@@ -91,10 +92,12 @@ class BugReportModule(MVTModule):
return self._get_file_content(dumpstate_logs[0])
def _get_file_modification_time(self, file_path: str) -> dict:
def _get_file_modification_time(self, file_path: str) -> datetime.datetime:
if self.zip_archive:
file_timetuple = self.zip_archive.getinfo(file_path).date_time
return datetime.datetime(*file_timetuple)
else:
if not self.extract_path:
raise ValueError("extract_path is not set")
file_stat = os.stat(os.path.join(self.extract_path, file_path))
return datetime.datetime.fromtimestamp(file_stat.st_mtime)

View File

@@ -6,9 +6,9 @@
import logging
from typing import Optional
from mvt.common.module_types import ModuleResults
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRESHOLD
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -43,8 +43,9 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
)
return
data = data.decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
content = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE package:"
)
self.parse(content)
for result in self.results:

View File

@@ -154,7 +154,7 @@ class Command:
if not self.results_path:
return
target_path = None
target_path: Optional[str] = None
if self.target_path:
target_path = os.path.abspath(self.target_path)

View File

@@ -1,8 +1,8 @@
import os
import yaml
import json
import os
from typing import Optional, Tuple, Type
from typing import Tuple, Type, Optional
import yaml
from appdirs import user_config_dir
from pydantic import AnyHttpUrl, Field
from pydantic_settings import (
@@ -22,51 +22,51 @@ class MVTSettings(BaseSettings):
env_prefix="MVT_",
env_nested_delimiter="_",
extra="ignore",
nested_model_default_partial_updates=True,
)
# Allow to decided if want to load environment variables
load_env: bool = Field(True, exclude=True)
# General settings
PYPI_UPDATE_URL: AnyHttpUrl = Field(
"https://pypi.org/pypi/mvt/json",
validate_default=False,
PYPI_UPDATE_URL: str = Field(
default="https://pypi.org/pypi/mvt/json",
)
NETWORK_ACCESS_ALLOWED: bool = True
NETWORK_TIMEOUT: int = 15
# Command default settings, all can be specified by MVT_ prefixed environment variables too.
IOS_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt iOS backups"
default=None, description="Default password to use to decrypt iOS backups"
)
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt Android backups"
default=None, description="Default password to use to decrypt Android backups"
)
STIX2: Optional[str] = Field(
None, description="List of directories where STIX2 files are stored"
default=None, description="List of directories where STIX2 files are stored"
)
VT_API_KEY: Optional[str] = Field(
None, description="API key to use for VirusTotal lookups"
default=None, description="API key to use for VirusTotal lookups"
)
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
PROFILE: bool = Field(
default=False, description="Profile the execution of MVT modules"
)
HASH_FILES: bool = Field(default=False, description="Should MVT hash output files")
@classmethod
def settings_customise_sources(
cls,
settings_cls: Type[BaseSettings],
init_settings: InitSettingsSource,
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]:
sources = (
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
yaml_source = YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH)
sources: Tuple[PydanticBaseSettingsSource, ...] = (
yaml_source,
init_settings,
)
# Load env variables if enabled
if init_settings.init_kwargs.get("load_env", True):
sources = (env_settings,) + sources
# Always load env variables by default
sources = (env_settings,) + sources
return sources
def save_settings(
@@ -94,11 +94,11 @@ class MVTSettings(BaseSettings):
Afterwards we load the settings again, this time including the env variables.
"""
# Set invalid env prefix to avoid loading env variables.
settings = MVTSettings(load_env=False)
settings = cls(load_env=False)
settings.save_settings()
# Load the settings again with any ENV variables.
settings = MVTSettings(load_env=True)
settings = cls(load_env=True)
return settings

View File

@@ -7,15 +7,15 @@ import glob
import json
import logging
import os
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Optional
from dataclasses import dataclass
import ahocorasick
from appdirs import user_data_dir
from .url import URL
from .config import settings
from .url import URL
MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
@@ -68,7 +68,7 @@ class Indicators:
self.parse_stix2(path)
elif os.path.isdir(path):
for file in glob.glob(
os.path.join(path, "**", "*.stix2", recursive=True)
os.path.join(path, "**", "*.stix2"), recursive=True
):
self.parse_stix2(file)
else:
@@ -350,7 +350,7 @@ class Indicators:
@lru_cache()
def get_ioc_matcher(
self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None
self, ioc_type: Optional[str] = None, ioc_list: Optional[List[Indicator]] = None
) -> ahocorasick.Automaton:
"""
Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
@@ -370,9 +370,9 @@ class Indicators:
"""
automaton = ahocorasick.Automaton()
if ioc_type:
iocs = self.get_iocs(ioc_type)
iocs: Iterator[Indicator] = self.get_iocs(ioc_type)
elif ioc_list:
iocs = ioc_list
iocs = iter(ioc_list)
else:
raise ValueError("Must provide either ioc_type or ioc_list")

View File

@@ -146,7 +146,10 @@ class MVTModule:
for record in timeline:
timeline_set.add(
json.dumps(
asdict(record) if is_dataclass(record) else record, sort_keys=True
asdict(record)
if is_dataclass(record) and not isinstance(record, type)
else record,
sort_keys=True,
)
)
@@ -161,9 +164,9 @@ class MVTModule:
record: ModuleSerializedResult = self.serialize(result)
if record:
if isinstance(record, list):
self.timeline.extend(record)
self.timeline.extend(record) # type: ignore[arg-type]
else:
self.timeline.append(record)
self.timeline.append(record) # type: ignore[arg-type]
# De-duplicate timeline entries.
self.timeline = self._deduplicate_timeline(self.timeline)

View File

@@ -3,15 +3,18 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .indicators import Indicator
from dataclasses import dataclass
from typing import List, Union, Optional
from typing import Any, Dict, List, Optional, Union
from .indicators import Indicator
@dataclass
class ModuleAtomicResult:
timestamp: Optional[str]
matched_indicator: Optional[Indicator]
# ModuleAtomicResult is a flexible dictionary that can contain any data.
# Common fields include:
# - timestamp: Optional[str] - timestamp string
# - isodate: Optional[str] - ISO formatted date string
# - matched_indicator: Optional[Indicator] - indicator that matched this result
# - Any other module-specific fields
ModuleAtomicResult = Dict[str, Any]
ModuleResults = List[ModuleAtomicResult]
@@ -26,4 +29,7 @@ class ModuleAtomicTimeline:
ModuleTimeline = List[ModuleAtomicTimeline]
ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline]
# ModuleSerializedResult can be a proper timeline object or a plain dict for compatibility
ModuleSerializedResult = Union[
ModuleAtomicTimeline, ModuleTimeline, Dict[str, Any], List[Dict[str, Any]]
]

View File

@@ -12,9 +12,9 @@ import requests
import yaml
from packaging import version
from .config import settings
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
from .version import MVT_VERSION
from .config import settings
log = logging.getLogger(__name__)
@@ -25,7 +25,7 @@ INDICATORS_CHECK_FREQUENCY = 12
class MVTUpdates:
def check(self) -> str:
try:
res = requests.get(settings.PYPI_UPDATE_URL, timeout=5)
res = requests.get(str(settings.PYPI_UPDATE_URL), timeout=5)
except requests.exceptions.RequestException as e:
log.error("Failed to check for updates, skipping updates: %s", e)
return ""

View File

@@ -338,11 +338,12 @@ class URL:
:rtype: str
"""
return (
get_tld(self.url, as_object=True, fix_protocol=True)
.parsed_url.netloc.lower()
.lstrip("www.")
)
tld_obj = get_tld(self.url, as_object=True, fix_protocol=True)
if isinstance(tld_obj, str):
return tld_obj
if tld_obj is None:
return ""
return tld_obj.parsed_url.netloc.lower().lstrip("www.")
def get_top_level(self) -> str:
"""Get only the top-level domain from a URL.
@@ -351,7 +352,12 @@ class URL:
:rtype: str
"""
return get_tld(self.url, as_object=True, fix_protocol=True).fld.lower()
tld_obj = get_tld(self.url, as_object=True, fix_protocol=True)
if isinstance(tld_obj, str):
return tld_obj
if tld_obj is None:
return ""
return tld_obj.fld.lower()
def check_if_shortened(self) -> bool:
"""Check if the URL is among list of shortener services.

View File

@@ -58,6 +58,7 @@ class DecryptBackup:
def _process_file(
self, relative_path: str, domain: str, item, file_id: str, item_folder: str
) -> None:
assert self._backup is not None
self._backup.getFileDecryptedCopy(
manifestEntry=item, targetName=file_id, targetFolder=item_folder
)
@@ -70,6 +71,9 @@ class DecryptBackup:
)
def process_backup(self) -> None:
assert self._backup is not None
assert self.dest_path is not None
if not os.path.exists(self.dest_path):
os.makedirs(self.dest_path)
@@ -97,7 +101,7 @@ class DecryptBackup:
)
continue
item_folder = os.path.join(self.dest_path, file_id[0:2])
item_folder = os.path.join(self.dest_path, file_id[0:2]) # type: ignore[arg-type]
if not os.path.exists(item_folder):
os.makedirs(item_folder)

View File

@@ -36,9 +36,11 @@ class BackupInfo(IOSExtraction):
results=results,
)
self.results = {}
self.results: dict = {}
def run(self) -> None:
if not self.target_path:
raise DatabaseNotFoundError("target_path is not set")
info_path = os.path.join(self.target_path, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError(

View File

@@ -9,12 +9,12 @@ import plistlib
from base64 import b64encode
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@@ -72,12 +72,10 @@ class ConfigurationProfiles(IOSExtraction):
result["plist"]["PayloadUUID"]
)
if ioc_match:
warning_message = (
f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"',
)
warning_message = f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"'
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), warning_message, "", result
warning_message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
continue
@@ -85,10 +83,8 @@ class ConfigurationProfiles(IOSExtraction):
# Highlight suspicious configuration profiles which may be used
# to hide notifications.
if payload_content["PayloadType"] in ["com.apple.notificationsettings"]:
warning_message = (
f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}',
)
self.alertstore.medum(self.get_slug(), warning_message, "", result)
warning_message = f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}'
self.alertstore.medium(warning_message, "", result)
self.alertstore.log_latest()
continue

View File

@@ -11,13 +11,13 @@ import plistlib
from typing import Optional
from mvt.common.module import DatabaseNotFoundError
from mvt.common.url import URL
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.url import URL
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
from ..base import IOSExtraction
@@ -66,7 +66,7 @@ class Manifest(IOSExtraction):
return convert_unix_to_iso(timestamp_or_unix_time_int)
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
records: list = []
if "modified" not in record or "status_changed" not in record:
return records
@@ -103,7 +103,9 @@ class Manifest(IOSExtraction):
ioc_match = self.indicators.check_file_path("/" + result["relative_path"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
self.alertstore.high(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
rel_path = result["relative_path"].lower()
@@ -118,13 +120,15 @@ class Manifest(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f'Found mention of domain "{ioc_match.ioc.value}" in a backup file with path: {rel_path}',
"",
result,
matched_indicator=ioc_match.ioc,
)
def run(self) -> None:
if not self.target_path:
raise DatabaseNotFoundError("target_path is not set")
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
if not os.path.isfile(manifest_db_path):
raise DatabaseNotFoundError("unable to find backup's Manifest.db")

View File

@@ -7,12 +7,12 @@ import logging
import plistlib
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@@ -58,29 +58,31 @@ class ProfileEvents(IOSExtraction):
def check_indicators(self) -> None:
for result in self.results:
message = f'On {result.get("timestamp")} process "{result.get("process")}" started operation "{result.get("operation")}" of profile "{result.get("profile_id")}"'
self.alertstore.low(
self.get_slug(), message, result.get("timestamp"), result
)
self.alertstore.low(message, result.get("timestamp") or "", result)
self.alertstore.log_latest()
if not self.indicators:
return
for result in self.results:
ioc_match = self.indicators.check_process(result.get("process"))
ioc_match = self.indicators.check_process(result.get("process") or "")
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
ioc_match = self.indicators.check_profile(result.get("profile_id"))
ioc_match = self.indicators.check_profile(result.get("profile_id") or "")
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
@staticmethod
def parse_profile_events(file_data: bytes) -> list:
results = []
results: list = []
events_plist = plistlib.loads(file_data)

View File

@@ -11,8 +11,12 @@ import sqlite3
import subprocess
from typing import Iterator, Optional, Union
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError
from mvt.common.module import MVTModule, ModuleResults
from mvt.common.module import (
DatabaseCorruptedError,
DatabaseNotFoundError,
ModuleResults,
MVTModule,
)
class IOSExtraction(MVTModule):
@@ -110,6 +114,8 @@ class IOSExtraction(MVTModule):
(Default value = None)
"""
if not self.target_path:
raise DatabaseNotFoundError("target_path is not set")
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
if not os.path.exists(manifest_db_path):
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
@@ -146,6 +152,8 @@ class IOSExtraction(MVTModule):
}
def _get_backup_file_from_id(self, file_id: str) -> Union[str, None]:
if not self.target_path:
return None
file_path = os.path.join(self.target_path, file_id[0:2], file_id)
if os.path.exists(file_path):
return file_path
@@ -153,6 +161,8 @@ class IOSExtraction(MVTModule):
return None
def _get_fs_files_from_patterns(self, root_paths: list) -> Iterator[str]:
if not self.target_path:
return
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.target_path, root_path)):
if not os.path.exists(found_path):
@@ -174,9 +184,10 @@ class IOSExtraction(MVTModule):
:param backup_ids: Default value = None)
"""
file_path = None
file_path: Optional[str] = None
# First we check if the was an explicit file path specified.
if not self.file_path:
# Type narrowing: we know self.file_path is None here, work with local file_path
# If not, we first try with backups.
# We construct the path to the file according to the iTunes backup
# folder structure, if we have a valid ID.
@@ -198,8 +209,9 @@ class IOSExtraction(MVTModule):
# If we do not find any, we fail.
if file_path:
self.file_path = file_path
self.file_path = file_path # type: str
else:
raise DatabaseNotFoundError("unable to find the module's database file")
assert self.file_path is not None
self._recover_sqlite_db_if_needed(self.file_path)

View File

@@ -9,12 +9,12 @@ import plistlib
import sqlite3
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -44,6 +44,7 @@ class Analytics(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
@@ -64,13 +65,11 @@ class Analytics(IOSExtraction):
ioc_match = self.indicators.check_process(value)
if ioc_match:
warning_message = (
f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}',
)
warning_message = f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}'
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), warning_message, "", new_result
warning_message, "", new_result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
continue
@@ -80,7 +79,10 @@ class Analytics(IOSExtraction):
new_result = copy.copy(result)
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", new_result
ioc_match.message,
"",
new_result,
matched_indicator=ioc_match.ioc,
)
def _extract_analytics_data(self):

View File

@@ -10,9 +10,10 @@ from typing import Optional
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -95,7 +96,7 @@ class CacheFiles(IOSExtraction):
)
def run(self) -> None:
self.results = {}
self.results: dict = {}
for root, _, files in os.walk(self.target_path):
for file_name in files:
if file_name != "Cache.db":

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -41,6 +41,7 @@ class SafariFavicon(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
@@ -61,7 +62,9 @@ class SafariFavicon(IOSExtraction):
ioc_match = self.indicators.check_url(result["icon_url"])
if ioc_match:
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
def _process_favicon_db(self, file_path):

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -57,7 +57,9 @@ class ShutdownLog(IOSExtraction):
for result in self.results:
ioc_match = self.indicators.check_file_path(result["client"])
if ioc_match:
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
continue
@@ -66,10 +68,10 @@ class ShutdownLog(IOSExtraction):
if ioc.value in parts:
result["matched_indicator"] = ioc
self.alertstore.critical(
self.get_slug(),
f'Found mention of a known malicious process "{ioc.value}" in shutdown.log',
"",
result,
matched_indicator=ioc,
)
self.alertstore.log_latest()
continue
@@ -135,5 +137,8 @@ class ShutdownLog(IOSExtraction):
def run(self) -> None:
self._find_ios_database(root_paths=SHUTDOWN_LOG_PATH)
self.log.info("Found shutdown log at path: %s", self.file_path)
if not self.file_path:
return
with open(self.file_path, "r", encoding="utf-8") as handle:
self.process_shutdownlog(handle.read())

View File

@@ -8,12 +8,12 @@ import json
import logging
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@@ -42,6 +42,7 @@ class IOSVersionHistory(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {

View File

@@ -21,7 +21,9 @@ class WebkitBase(IOSExtraction):
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def _process_webkit_folder(self, root_paths):

View File

@@ -11,10 +11,14 @@ from datetime import datetime, timezone
from typing import Any, Dict, Optional
from mvt.common.module import DatabaseNotFoundError
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
from mvt.ios.modules.base import IOSExtraction
from mvt.common.module import ModuleResults, ModuleAtomicResult, ModuleSerializedResult
from ..base import IOSExtraction
APPLICATIONS_DB_PATH = [
"private/var/containers/Bundle/Application/*/iTunesMetadata.plist"
@@ -63,7 +67,6 @@ class Applications(IOSExtraction):
if self.indicators:
if "softwareVersionBundleId" not in result:
self.alertstore.high(
self.get_slug(),
"Suspicious application identified without softwareVersionBundleId",
"",
result,
@@ -76,10 +79,10 @@ class Applications(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(),
f"Malicious application {result['softwareVersionBundleId']} identified",
"",
result,
matched_indicator=ioc_match.ioc,
)
continue
@@ -89,10 +92,10 @@ class Applications(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(),
f"Malicious application {result['softwareVersionBundleId']} identified",
"",
result,
matched_indicator=ioc_match.ioc,
)
continue
@@ -102,7 +105,6 @@ class Applications(IOSExtraction):
not in KNOWN_APP_INSTALLERS
):
self.alertstore.medium(
self.get_slug(),
f"Suspicious app not installed from the App Store or MDM: {result['softwareVersionBundleId']}",
"",
result,
@@ -157,6 +159,8 @@ class Applications(IOSExtraction):
def run(self) -> None:
if self.is_backup:
if not self.target_path:
return
plist_path = os.path.join(self.target_path, "Info.plist")
if not os.path.isfile(plist_path):
raise DatabaseNotFoundError("Impossible to find Info.plist file")

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -73,14 +73,13 @@ class Calendar(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
# Custom check for Quadream exploit
if result["summary"] == "Meeting" and result["description"] == "Notes":
self.alertstore.high(
self.get_slug(),
f"Potential Quadream exploit event identified: {result['uuid']}",
"",
result,

View File

@@ -6,8 +6,8 @@
import logging
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -22,9 +22,9 @@ class Calls(IOSExtraction):
def __init__(
self,
file_path: str = None,
target_path: str = None,
results_path: str = None,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: list = [],
@@ -53,6 +53,8 @@ class Calls(IOSExtraction):
)
self.log.info("Found Calls database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from ..base import IOSExtraction
@@ -62,7 +62,9 @@ class ChromeFavicon(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def run(self) -> None:
@@ -71,6 +73,8 @@ class ChromeFavicon(IOSExtraction):
)
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
# Fetch icon cache

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from ..base import IOSExtraction
@@ -63,7 +63,9 @@ class ChromeHistory(IOSExtraction):
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -71,6 +73,8 @@ class ChromeHistory(IOSExtraction):
)
self.log.info("Found Chrome history database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -8,6 +8,7 @@ import sqlite3
from typing import Optional
from mvt.common.module_types import ModuleResults
from ..base import IOSExtraction
CONTACTS_BACKUP_IDS = [
@@ -45,6 +46,8 @@ class Contacts(IOSExtraction):
)
self.log.info("Found Contacts database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
try:

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@@ -64,7 +64,9 @@ class FirefoxFavicon(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -72,6 +74,8 @@ class FirefoxFavicon(IOSExtraction):
)
self.log.info("Found Firefox favicon database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@@ -64,7 +64,9 @@ class FirefoxHistory(IOSExtraction):
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -72,6 +74,8 @@ class FirefoxHistory(IOSExtraction):
)
self.log.info("Found Firefox history database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -42,9 +42,9 @@ class GlobalPreferences(IOSExtraction):
for entry in self.results:
if entry["entry"] == "LDMGlobalEnabled":
if entry["value"]:
self.alertstore.info("Lockdown mode enabled", "", None)
self.alertstore.info("Lockdown mode enabled", "", entry)
else:
self.alertstore.low("Lockdown mode disabled", "", None)
self.alertstore.low("Lockdown mode disabled", "", entry)
continue
def process_file(self, file_path: str) -> None:
@@ -61,6 +61,8 @@ class GlobalPreferences(IOSExtraction):
)
self.log.info("Found Global Preference database at path: %s", self.file_path)
if not self.file_path:
return
self.process_file(self.file_path)
self.log.info("Extracted a total of %d Global Preferences", len(self.results))

View File

@@ -8,12 +8,12 @@ import logging
import plistlib
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -67,13 +67,12 @@ class IDStatusCache(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
if "\\x00\\x00" in result.get("user", ""):
self.alertstore.high(
self.get_slug(),
f"Found an ID Status Cache entry with suspicious patterns: {result.get('user')}",
"",
result,

View File

@@ -7,12 +7,12 @@ import logging
import sqlite3
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -285,6 +285,8 @@ class InteractionC(IOSExtraction):
)
self.log.info("Found InteractionC database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()

View File

@@ -8,12 +8,12 @@ import logging
import plistlib
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -86,7 +86,6 @@ class LocationdClients(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious process name in LocationD entry {result['package']}",
"",
result,
@@ -99,7 +98,6 @@ class LocationdClients(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious process name in LocationD entry {result['package']}",
"",
result,
@@ -111,8 +109,7 @@ class LocationdClients(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['BundlePath']}",
f"Found a known malicious domain in LocationD entry {result['package']}",
"",
result,
)
@@ -124,7 +121,6 @@ class LocationdClients(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['Executable']}",
"",
result,
@@ -141,7 +137,6 @@ class LocationdClients(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['Registered']}",
"",
result,

View File

@@ -7,12 +7,12 @@ import logging
import plistlib
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@@ -65,7 +65,9 @@ class OSAnalyticsADDaily(IOSExtraction):
ioc_match = self.indicators.check_process(result["package"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -76,6 +78,8 @@ class OSAnalyticsADDaily(IOSExtraction):
"Found com.apple.osanalytics.addaily plist at path: %s", self.file_path
)
if not self.file_path:
return
with open(self.file_path, "rb") as handle:
file_plist = plistlib.load(handle)

View File

@@ -10,12 +10,12 @@ import plistlib
import sqlite3
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
from ..base import IOSExtraction
@@ -67,7 +67,7 @@ class SafariBrowserState(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
@@ -80,7 +80,10 @@ class SafariBrowserState(IOSExtraction):
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
ioc_match.message,
"",
result,
matched_indicator=ioc_match.ioc,
)
def _process_browser_state_db(self, db_path):

View File

@@ -7,13 +7,13 @@ import logging
import os
from typing import Optional
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso
from ..base import IOSExtraction
@@ -117,7 +117,9 @@ class SafariHistory(IOSExtraction):
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def _process_history_db(self, history_path):
self._recover_sqlite_db_if_needed(history_path)

View File

@@ -10,12 +10,12 @@ import plistlib
import sqlite3
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@@ -80,7 +80,9 @@ class Shortcuts(IOSExtraction):
ioc_match = self.indicators.check_urls(result["action_urls"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -88,6 +90,8 @@ class Shortcuts(IOSExtraction):
)
self.log.info("Found Shortcuts database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
conn.text_factory = bytes
cur = conn.cursor()

View File

@@ -8,12 +8,12 @@ import sqlite3
from base64 import b64encode
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@@ -95,12 +95,17 @@ class SMS(IOSExtraction):
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
if not self.file_path:
return
try:
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
@@ -118,6 +123,7 @@ class SMS(IOSExtraction):
except sqlite3.DatabaseError as exc:
conn.close()
if "database disk image is malformed" in str(exc):
assert self.file_path is not None
self._recover_sqlite_db_if_needed(self.file_path, forced=True)
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()

View File

@@ -7,12 +7,12 @@ import logging
from base64 import b64encode
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@@ -65,9 +65,7 @@ class SMSAttachments(IOSExtraction):
ioc_match = self.indicators.check_file_path(attachment["filename"])
if ioc_match:
attachment["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(), ioc_match.message, "", attachment
)
self.alertstore.high(ioc_match.message, "", attachment)
if (
attachment["filename"].startswith("/var/tmp/")
@@ -85,6 +83,8 @@ class SMSAttachments(IOSExtraction):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -8,12 +8,12 @@ import os
import sqlite3
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@@ -69,7 +69,9 @@ class WebkitResourceLoadStatistics(IOSExtraction):
ioc_match = self.indicators.check_url(result["registrable_domain"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
def _process_observations_db(self, db_path: str, domain: str, path: str) -> None:

View File

@@ -8,8 +8,8 @@ import os
import plistlib
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleResults
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@@ -50,7 +50,7 @@ class WebkitSessionResourceLog(IOSExtraction):
results=results,
)
self.results = {} if not results else results
self.results: dict = {}
@staticmethod
def _extract_domains(entries):
@@ -83,15 +83,15 @@ class WebkitSessionResourceLog(IOSExtraction):
# subresource_domains = self._extract_domains(
# entry["subresource_under_origin"])
all_origins = set(
[entry["origin"]] + source_domains + destination_domains
all_origins = list(
set([entry["origin"]] + source_domains + destination_domains)
)
ioc_match = self.indicators.check_urls(all_origins)
if ioc_match:
entry["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", entry
ioc_match.message, "", entry, matched_indicator=ioc_match.ioc
)
redirect_path = ""
@@ -114,7 +114,6 @@ class WebkitSessionResourceLog(IOSExtraction):
redirect_path += ", ".join(destination_domains)
self.alertstore.high(
self.get_slug(),
f"Found HTTP redirect between suspicious domains: {redirect_path}",
"",
entry,
@@ -190,6 +189,8 @@ class WebkitSessionResourceLog(IOSExtraction):
self.log.info(
"Found Safari browsing session resource log at path: %s", log_path
)
if not self.target_path:
continue
key = os.path.relpath(log_path, self.target_path)
self.results[key] = self._extract_browsing_stats(log_path)

View File

@@ -6,12 +6,12 @@
import logging
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@@ -65,7 +65,9 @@ class Whatsapp(IOSExtraction):
ioc_match = self.indicators.check_urls(result.get("links", []))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def run(self) -> None:
self._find_ios_database(
@@ -73,6 +75,8 @@ class Whatsapp(IOSExtraction):
)
self.log.info("Found WhatsApp database at path: %s", self.file_path)
if not self.file_path:
return
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
@@ -102,7 +106,9 @@ class Whatsapp(IOSExtraction):
for index, value in enumerate(message_row):
message[names[index]] = value
message["isodate"] = convert_mactime_to_iso(message.get("ZMESSAGEDATE"))
message["isodate"] = convert_mactime_to_iso(
message.get("ZMESSAGEDATE") or 0
)
message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else ""
# Extract links from the WhatsApp message. URLs can be stored in

View File

@@ -9,8 +9,10 @@ from typing import Dict, Optional
import packaging
IPHONE_MODELS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_models.json"))
IPHONE_IOS_VERSIONS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_versions.json"))
IPHONE_MODELS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_models.json") or b"[]")
IPHONE_IOS_VERSIONS = json.loads(
pkgutil.get_data("mvt", "ios/data/ios_versions.json") or b"[]"
)
def get_device_desc_from_id(identifier: str, devices_list: list = IPHONE_MODELS) -> str: