Merge branch 'main' into fix/enforce-network-access-allowed

This commit is contained in:
besendorf
2026-06-05 20:19:49 +02:00
committed by GitHub
82 changed files with 334 additions and 235 deletions
@@ -10,16 +10,20 @@ from .artifact import AndroidArtifact
class DumpsysAccessibilityArtifact(AndroidArtifact):
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
if self.indicators:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
self.alertstore.medium(
f'Found accessibility service: "{result["service"]}"',
"",
result,
)
continue
def parse(self, content: str) -> None:
"""
+11 -4
View File
@@ -131,10 +131,17 @@ class DumpsysADBArtifact(AndroidArtifact):
)
return
# TODO: Parse AdbDebuggingManager line in output.
start_of_json = content.find(b"\n{") + 2
end_of_json = content.rfind(b"}\n") - 2
json_content = content[start_of_json:end_of_json].rstrip()
start_of_json = content.find(b"\n{")
if start_of_json == -1:
self.log.error("Unable to find ADB manager state in dumpsys output")
return
end_of_json = content.rfind(b"}\n")
if end_of_json == -1 or end_of_json <= start_of_json:
self.log.error("Unable to find complete ADB manager state in dumpsys output")
return
json_content = content[start_of_json + 2 : end_of_json - 2].rstrip()
parsed = self.indented_dump_parser(json_content)
if parsed.get("debugging_manager") is None:
@@ -14,9 +14,12 @@ from .artifact import AndroidArtifact
class DumpsysPackagesArtifact(AndroidArtifact):
def check_indicators(self) -> None:
alerted_root_packages = set()
for result in self.results:
# XXX: De-duplication Package detections
if result["package_name"] in ROOT_PACKAGES:
if result["package_name"] in alerted_root_packages:
continue
alerted_root_packages.add(result["package_name"])
self.alertstore.medium(
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
"",
@@ -188,7 +191,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
package = []
in_package_list = False
for line in content.split("\n"):
for line in content.splitlines():
if line.startswith("Packages:"):
in_package_list = True
continue
+1 -1
View File
@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
class Processes(AndroidArtifact):
def parse(self, entry: str) -> None:
for line in entry.split("\n")[1:]:
for line in entry.splitlines()[1:]:
proc = line.split()
# Skip empty lines
+9 -6
View File
@@ -67,11 +67,14 @@ class Settings(AndroidArtifact):
# Check if one of the dangerous settings is using an unsafe
# value (different than the one specified).
if danger["key"] == key and danger["safe_value"] != value:
self.log.warning(
'Found suspicious "%s" setting "%s = %s" (%s)',
namespace,
key,
value,
danger["description"],
self.alertstore.medium(
f'Found suspicious "{namespace}" setting "{key} = {value}" ({danger["description"]})',
"",
{
"namespace": namespace,
"key": key,
"value": value,
"description": danger["description"],
},
)
break
@@ -101,8 +101,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
continue
if result.get("command_line", []):
command_name = result.get("command_line")[0].split("/")[-1]
command_name = result["command_line"][0]
command_name = result["command_line"][0].split("/")[-1]
ioc_match = self.indicators.check_process(command_name)
if ioc_match:
self.alertstore.critical(
@@ -200,7 +199,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
# eg. "Process uptime: 40s"
tombstone[destination_key] = int(value_clean.rstrip("s"))
elif destination_key == "command_line":
# XXX: Check if command line should be a single string in a list, or a list of strings.
# Wrap in list for consistency with protobuf format (repeated string).
tombstone[destination_key] = [value_clean]
else:
tombstone[destination_key] = value_clean
@@ -262,7 +261,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
@staticmethod
def _parse_timestamp_string(timestamp: str) -> str:
timestamp_parsed = parser.parse(timestamp)
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
# Preserve the source wall-clock time while returning the project-wide ISO format.
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
return convert_datetime_to_iso(local_timestamp)
+7 -1
View File
@@ -355,7 +355,13 @@ def check_intrusion_logs(
@click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
cmd = CmdCheckIOCS(
target_path=folder,
ioc_files=iocs,
module_name=module,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
)
cmd.modules = (
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
)
+15 -15
View File
@@ -41,7 +41,7 @@ class AQFFiles(AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -107,16 +107,16 @@ class AQFFiles(AndroidQFModule):
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
self.alertstore.high(msg, "", result)
if result.get("sha256", "") == "":
continue
ioc_match = self.indicators.check_file_hash(result.get("sha256") or "")
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
# TODO: adds SHA1 and MD5 when available in MVT
for hash_key in ("sha256", "sha1", "md5"):
file_hash = result.get(hash_key, "")
if not file_hash:
continue
ioc_match = self.indicators.check_file_hash(file_hash)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
break
def run(self) -> None:
if timezone := self._get_device_timezone():
@@ -131,7 +131,7 @@ class AQFFiles(AndroidQFModule):
data = json.loads(rawdata)
except json.decoder.JSONDecodeError:
data = []
for line in rawdata.split("\n"):
for line in rawdata.splitlines():
if line.strip() == "":
continue
data.append(json.loads(line))
@@ -142,11 +142,11 @@ class AQFFiles(AndroidQFModule):
utc_timestamp = datetime.datetime.fromtimestamp(
file_data[ts], tz=datetime.timezone.utc
)
# Convert the UTC timestamp to local tiem on Android device's local timezone
# Convert the UTC timestamp to local time on Android device's local timezone
local_timestamp = utc_timestamp.astimezone(device_timezone)
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
# set the timestamp timezone to UTC, to avoid the timezone conversion again.
# Preserve the device-local wall-clock time while using
# the project-wide ISO conversion helper.
local_timestamp = local_timestamp.replace(
tzinfo=datetime.timezone.utc
)
@@ -22,7 +22,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results: list = []
self.results: list = [] if results is None else results
def run(self) -> None:
getprop_files = self._get_files_by_pattern("*/getprop.txt")
@@ -27,7 +27,7 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -30,7 +30,7 @@ class AQFPackages(AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class AQFProcesses(ProcessesArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results: dict = {}
self.results: dict = results if results is not None else {}
def run(self) -> None:
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
@@ -40,7 +40,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
self.results[namespace] = {}
data = self._get_file_content(setting_file)
for line in data.decode("utf-8").split("\n"):
for line in data.decode("utf-8").splitlines():
line = line.strip()
try:
key, value = line.split("=", 1)
+1 -1
View File
@@ -23,7 +23,7 @@ class AndroidQFModule(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+4 -1
View File
@@ -32,7 +32,7 @@ class Mounts(MountsArtifact, AndroidQFModule):
log=log,
results=results,
)
self.results: list = []
self.results: list = [] if results is None else results
def run(self) -> None:
"""
@@ -66,6 +66,9 @@ class Mounts(MountsArtifact, AndroidQFModule):
# AndroidQF format: array of strings like
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
mount_content = "\n".join(json_data)
else:
self.log.error("Expected mounts.json to contain a list of mount lines")
return
self.parse(mount_content)
except Exception as exc:
self.log.error("Failed to parse mount information: %s", exc)
-4
View File
@@ -21,10 +21,6 @@ from .base import AndroidQFModule
class SMS(AndroidQFModule):
"""
This module analyse SMS file in backup
XXX: We should also de-duplicate this AQF module, but first we
need to add tests for loading encrypted SMS backups through the backup
sub-module.
"""
def __init__(
+1 -1
View File
@@ -22,7 +22,7 @@ class BackupModule(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -20,7 +20,7 @@ class SMS(BackupModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -23,7 +23,7 @@ class BugReportModule(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -24,7 +24,7 @@ class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysADBState(DumpsysADBArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -24,7 +24,7 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysGetProp(GetPropArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -23,7 +23,7 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -22,7 +22,7 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -42,8 +42,10 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
)
return
data = data.decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
decoded_data = data.decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(
decoded_data, "DUMP OF SERVICE platform_compat:"
)
self.parse(content)
self.log.info("Found %d uninstalled apps", len(self.results))
@@ -22,7 +22,7 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -24,7 +24,7 @@ class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -23,7 +23,7 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+4 -5
View File
@@ -29,9 +29,6 @@ class InvalidBackupPassword(AndroidBackupParsingError):
pass
# TODO: Need to clean all the following code and conform it to the coding style.
def to_utf8_bytes(input_bytes):
output = []
for byte in input_bytes:
@@ -157,13 +154,13 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
checksum_salt=checksum_salt,
)
# Decrypt and unpad backup data using derivied key.
# Decrypt and unpad backup data using derived key.
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
decryptor = cipher.decryptor()
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
return unpadder.update(decrypted_tar)
return unpadder.update(decrypted_tar) + unpadder.finalize()
def parse_backup_file(data, password=None):
@@ -210,6 +207,8 @@ def parse_tar_for_sms(data):
or member.name.endswith("_mms_backup")
):
dhandler = tar.extractfile(member)
if not dhandler:
continue
res.extend(parse_sms_file(dhandler.read()))
return res
+2 -1
View File
@@ -8,5 +8,6 @@ from .module import MVTModule
class Artifact(MVTModule):
"""Base class for artifacts.
XXX: Inheriting from MVTModule to have the same signature as other modules. Not sure if this is a good idea.
Artifacts share the MVTModule lifecycle so commands can run artifacts and
extraction modules through the same interface.
"""
-1
View File
@@ -273,7 +273,6 @@ class Command:
):
continue
# FIXME: do we need the logger here
module_logger = logging.getLogger(module.__module__)
m = module(
+7 -6
View File
@@ -51,7 +51,7 @@ class MVTModule:
results_path: Optional[str] = None,
module_options: Optional[Dict[str, Any]] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
"""Initialize module.
@@ -71,15 +71,13 @@ class MVTModule:
self.file_path: Optional[str] = file_path
self.target_path: Optional[str] = target_path
self.results_path: Optional[str] = results_path
self.module_options: Optional[Dict[str, Any]] = (
module_options if module_options else {}
)
self.module_options: Dict[str, Any] = module_options if module_options else {}
self.log = log
self.indicators: Optional[Indicators] = None
self.alertstore: AlertStore = AlertStore(log=log)
self.results: ModuleResults = results if results else []
self.results: ModuleResults = results if results is not None else []
self.timeline: ModuleTimeline = []
@classmethod
@@ -109,11 +107,14 @@ class MVTModule:
name = self.get_slug()
if self.results:
converted_results: Any
if isinstance(self.results, dict):
converted_results = self.results
else:
converted_results = [
asdict(result) if is_dataclass(result) else result
asdict(result)
if is_dataclass(result) and not isinstance(result, type)
else result
for result in self.results
]
results_file_name = f"{name}.json"
+4 -1
View File
@@ -16,7 +16,10 @@ from typing import Any, Dict, List, Union
ModuleAtomicResult = Dict[str, Any]
ModuleResults = List[ModuleAtomicResult]
# Extraction modules historically use either a list of records or grouped
# dictionaries keyed by source path. Keep this alias broad until those shapes
# are modeled per module.
ModuleResults = Any
@dataclass
+1 -3
View File
@@ -180,10 +180,8 @@ class IndicatorsUpdates:
def _get_remote_file_latest_commit(
self, owner: str, repo: str, branch: str, path: str
) -> int:
# TODO: The branch is currently not taken into consideration.
# How do we specify which branch to look up to the API?
file_commit_url = (
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
)
try:
res = requests.get(file_commit_url, timeout=5)
+1 -2
View File
@@ -123,10 +123,9 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
if from_2001:
timestamp = timestamp + 978307200
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
try:
return convert_unix_to_utc_datetime(timestamp)
except Exception:
except (OSError, OverflowError, ValueError):
return None
+2 -2
View File
@@ -25,7 +25,7 @@ class BackupInfo(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -36,7 +36,7 @@ class BackupInfo(IOSExtraction):
results=results,
)
self.results: dict = {}
self.results: dict = results if results is not None else {}
def run(self) -> None:
if not self.target_path:
@@ -33,7 +33,7 @@ class ConfigurationProfiles(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -85,6 +85,35 @@ class ConfigurationProfiles(IOSExtraction):
self.alertstore.medium(warning_message, "", result)
continue
@staticmethod
def _b64encode_key(d: dict, key: str) -> None:
if key in d:
d[key] = b64encode(d[key])
@staticmethod
def _b64encode_keys(d: dict, keys: list) -> None:
for key in keys:
if key in d:
d[key] = b64encode(d[key])
def _b64encode_plist_bytes(self, plist: dict) -> None:
"""Encode binary plist values to base64 for JSON serialization."""
if "SignerCerts" in plist:
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
if "OTAProfileStub" in plist:
stub = plist["OTAProfileStub"]
if "SignerCerts" in stub:
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
if "PayloadContent" in stub:
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
if "PayloadContent" in plist:
for entry in plist["PayloadContent"]:
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
def run(self) -> None:
for conf_file in self._get_backup_files_from_manifest(
domain=CONF_PROFILES_DOMAIN
@@ -113,65 +142,7 @@ class ConfigurationProfiles(IOSExtraction):
except Exception:
conf_plist = {}
# TODO: Tidy up the following code hell.
if "SignerCerts" in conf_plist:
conf_plist["SignerCerts"] = [
b64encode(x) for x in conf_plist["SignerCerts"]
]
if "OTAProfileStub" in conf_plist:
if "SignerCerts" in conf_plist["OTAProfileStub"]:
conf_plist["OTAProfileStub"]["SignerCerts"] = [
b64encode(x)
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
]
if "PayloadContent" in conf_plist["OTAProfileStub"]:
if (
"EnrollmentIdentityPersistentID"
in conf_plist["OTAProfileStub"]["PayloadContent"]
):
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
] = b64encode(
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
]
)
if "PushTokenDataSentToServerKey" in conf_plist:
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
conf_plist["PushTokenDataSentToServerKey"]
)
if "LastPushTokenHash" in conf_plist:
conf_plist["LastPushTokenHash"] = b64encode(
conf_plist["LastPushTokenHash"]
)
if "PayloadContent" in conf_plist:
for content_entry in range(len(conf_plist["PayloadContent"])):
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
]
)
if (
"IdentityPersistentRef"
in conf_plist["PayloadContent"][content_entry]
):
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
]
)
self._b64encode_plist_bytes(conf_plist)
self.results.append(
{
+1 -1
View File
@@ -33,7 +33,7 @@ class Manifest(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -34,7 +34,7 @@ class ProfileEvents(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+2 -2
View File
@@ -31,7 +31,7 @@ class IOSExtraction(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -53,7 +53,7 @@ class IOSExtraction(MVTModule):
:param file_path: Path to the malformed database file.
"""
# TODO: Find a better solution.
# SQLite's immutable mode cannot open databases with active WAL files.
if not forced:
# If the database is open, do not use immutable
if os.path.isfile(file_path + "-shm"):
+2 -2
View File
@@ -34,7 +34,7 @@ class Analytics(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -44,7 +44,7 @@ class Analytics(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
self.results: list = [] if results is None else results
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
@@ -30,7 +30,7 @@ class AnalyticsIOSVersions(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+5 -1
View File
@@ -25,7 +25,7 @@ class CacheFiles(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -99,6 +99,10 @@ class CacheFiles(IOSExtraction):
def run(self) -> None:
self.results: dict = {}
if not self.target_path:
self.log.error("No filesystem dump path provided")
return
for root, _, files in os.walk(self.target_path):
for file_name in files:
if file_name != "Cache.db":
+5 -1
View File
@@ -29,7 +29,7 @@ class Filesystem(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -73,6 +73,10 @@ class Filesystem(IOSExtraction):
)
def run(self) -> None:
if not self.target_path:
self.log.error("No filesystem dump path provided")
return
for root, dirs, files in os.walk(self.target_path):
for dir_name in dirs:
try:
+1 -1
View File
@@ -30,7 +30,7 @@ class Netusage(NetBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+2 -2
View File
@@ -31,7 +31,7 @@ class SafariFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -41,7 +41,7 @@ class SafariFavicon(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
self.results: list = [] if results is None else results
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
+2 -2
View File
@@ -30,7 +30,7 @@ class ShutdownLog(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -78,7 +78,7 @@ class ShutdownLog(IOSExtraction):
recent_processes = []
times_delayed = 0
delay = 0.0
for line in content.split("\n"):
for line in content.splitlines():
line = line.strip()
if line.startswith("remaining client pid:"):
+2 -2
View File
@@ -32,7 +32,7 @@ class IOSVersionHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -42,7 +42,7 @@ class IOSVersionHistory(IOSExtraction):
log=log,
results=results,
)
self.results: list = []
self.results: list = [] if results is None else results
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
+1 -1
View File
@@ -34,7 +34,7 @@ class WebkitIndexedDB(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -32,7 +32,7 @@ class WebkitLocalStorage(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -28,7 +28,7 @@ class WebkitSafariViewService(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -41,7 +41,7 @@ class Applications(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -31,7 +31,7 @@ class Calendar(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -27,7 +27,7 @@ class Calls(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: list = [],
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -2
View File
@@ -16,7 +16,6 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
from ..base import IOSExtraction
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
# TODO: Confirm Chrome database path.
CHROME_FAVICON_ROOT_PATHS = [
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
]
@@ -32,7 +31,7 @@ class ChromeFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -2
View File
@@ -18,7 +18,6 @@ from ..base import IOSExtraction
CHROME_HISTORY_BACKUP_IDS = [
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
]
# TODO: Confirm Chrome database path.
CHROME_HISTORY_ROOT_PATHS = [
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
]
@@ -34,7 +33,7 @@ class ChromeHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -29,7 +29,7 @@ class Contacts(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -33,7 +33,7 @@ class FirefoxFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -37,7 +37,7 @@ class FirefoxHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -27,7 +27,7 @@ class GlobalPreferences(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -36,7 +36,7 @@ class IDStatusCache(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -228,7 +228,7 @@ class InteractionC(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -2
View File
@@ -36,7 +36,7 @@ class LocationdClients(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -147,7 +147,6 @@ class LocationdClients(IOSExtraction):
# Some migration information are int and not dicts
if not isinstance(file_plist[key], dict):
continue
# FIXME: unclear key format in iOS 17
result = file_plist[key]
result["package"] = key.rstrip(":")
for timestamp in self.timestamps:
+1 -1
View File
@@ -31,7 +31,7 @@ class Datausage(NetBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -35,7 +35,7 @@ class OSAnalyticsADDaily(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -36,7 +36,7 @@ class SafariBrowserState(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -38,7 +38,7 @@ class SafariHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -37,7 +37,7 @@ class Shortcuts(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -35,7 +35,7 @@ class SMS(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -35,7 +35,7 @@ class SMSAttachments(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+1 -1
View File
@@ -56,7 +56,7 @@ class TCC(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -37,7 +37,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -85,32 +85,56 @@ class WebkitResourceLoadStatistics(IOSExtraction):
try:
try:
# FIXME: table contains extra fields with timestamp here
cur.execute("PRAGMA table_info(ObservedDomains);")
available_columns = {row[1] for row in cur}
required_columns = [
"domainID",
"registrableDomain",
"lastSeen",
"hadUserInteraction",
]
if not set(required_columns).issubset(available_columns):
return
optional_columns = [
column
for column in [
"mostRecentUserInteractionTime",
"mostRecentWebPushInteractionTime",
]
if column in available_columns
]
selected_columns = required_columns + optional_columns
cur.execute(
"""
SELECT
domainID,
registrableDomain,
lastSeen,
hadUserInteraction
from ObservedDomains;
"""
f"SELECT {', '.join(selected_columns)} FROM ObservedDomains;"
)
except sqlite3.OperationalError:
return
for row in cur:
self.results.append(
{
"domain_id": row[0],
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_unix_to_iso(row[2]),
"domain": domain,
"path": path,
}
)
result = {
"domain_id": row[0],
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_unix_to_iso(row[2]),
"domain": domain,
"path": path,
}
for index, column in enumerate(optional_columns, start=4):
field = {
"mostRecentUserInteractionTime": (
"most_recent_user_interaction_time"
),
"mostRecentWebPushInteractionTime": (
"most_recent_web_push_interaction_time"
),
}[column]
timestamp = row[index]
result[field] = timestamp
if timestamp is not None and timestamp >= 0:
result[f"{field}_isodate"] = convert_unix_to_iso(timestamp)
self.results.append(result)
finally:
cur.close()
conn.close()
@@ -39,7 +39,7 @@ class WebkitSessionResourceLog(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -50,7 +50,7 @@ class WebkitSessionResourceLog(IOSExtraction):
results=results,
)
self.results: dict = {}
self.results: dict = results if results is not None else {}
@staticmethod
def _extract_domains(entries):
@@ -77,14 +77,21 @@ class WebkitSessionResourceLog(IOSExtraction):
entry["redirect_destination"]
)
# TODO: Currently not used.
# subframe_origins = self._extract_domains(
# entry["subframe_under_origin"])
# subresource_domains = self._extract_domains(
# entry["subresource_under_origin"])
subframe_origins = self._extract_domains(
entry["subframe_under_origin"]
)
subresource_domains = self._extract_domains(
entry["subresource_under_origin"]
)
all_origins = list(
set([entry["origin"]] + source_domains + destination_domains)
set(
[entry["origin"]]
+ source_domains
+ destination_domains
+ subframe_origins
+ subresource_domains
)
)
ioc_match = self.indicators.check_urls(all_origins)
+1 -1
View File
@@ -33,7 +33,7 @@ class Whatsapp(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
+4 -7
View File
@@ -30,7 +30,7 @@ class NetBase(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[ModuleResults] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -322,14 +322,11 @@ class NetBase(IOSExtraction):
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
def check_indicators(self) -> None:
# Check for manipulated process records.
# TODO: Catching KeyError for live_isodate for retro-compatibility.
# This is not very good.
try:
# check_manipulated/find_deleted require "live_isodate" and
# "live_proc_id" keys which may be absent in older result formats.
if self.results and "live_isodate" in self.results[0]:
self.check_manipulated()
self.find_deleted()
except KeyError:
pass
if not self.indicators:
return
@@ -5,6 +5,7 @@
import logging
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from mvt.common.alerts import AlertLevel
from mvt.common.indicators import Indicators
from ..utils import get_artifact
@@ -38,6 +39,19 @@ class TestDumpsysAccessibilityArtifact:
assert da.results[0]["package_name"] == "com.malware.accessibility"
assert da.results[0]["service"] == "com.malware.service.malwareservice"
def test_accessibility_service_alert(self):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility_v14_or_later.txt")
with open(file) as f:
data = f.read()
da.parse(data)
da.check_indicators()
assert len(da.alertstore.alerts) == 1
assert da.alertstore.alerts[0].level == AlertLevel.MEDIUM
assert da.alertstore.alerts[0].event == da.results[0]
def test_ioc_check(self, indicator_file):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility.txt")
@@ -51,4 +65,12 @@ class TestDumpsysAccessibilityArtifact:
da.indicators = ind
assert len(da.alertstore.alerts) == 0
da.check_indicators()
assert len(da.alertstore.alerts) == 1
assert len(da.alertstore.alerts) == len(da.results)
assert da.alertstore.count(AlertLevel.MEDIUM) == 3
assert da.alertstore.count(AlertLevel.CRITICAL) == 1
critical_alert = next(
alert
for alert in da.alertstore.alerts
if alert.level == AlertLevel.CRITICAL
)
assert critical_alert.event["package_name"] == "com.sec.android.app.camera"
+2 -1
View File
@@ -21,4 +21,5 @@ class TestSettingsModule:
run_module(m)
assert len(m.results) == 1
assert "random" in m.results.keys()
assert len(m.alertstore.alerts) == 0
assert len(m.alertstore.alerts) == 1
assert "samsung_errorlog_agree" in m.alertstore.alerts[0].message
@@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import sqlite3
from mvt.common.module import run_module
from mvt.ios.modules.mixed.webkit_resource_load_statistics import (
WebkitResourceLoadStatistics,
@@ -19,3 +21,50 @@ class TestWebkitResourceLoadStatisticsModule:
assert len(m.results) == 2
assert len(m.timeline) == 2
assert len(m.alertstore.alerts) == 0
results = {result["registrable_domain"]: result for result in m.results}
assert results["google.com"]["most_recent_user_interaction_time"] > 0
assert "most_recent_user_interaction_time_isodate" in results["google.com"]
assert results["gstatic.com"]["most_recent_user_interaction_time"] == -1.0
assert (
"most_recent_user_interaction_time_isodate"
not in results["gstatic.com"]
)
assert all(
"most_recent_web_push_interaction_time" not in result
for result in m.results
)
def test_webkit_full_timestamp_schema(self, tmp_path):
db_path = tmp_path / "observations.db"
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE ObservedDomains (
domainID INTEGER PRIMARY KEY,
registrableDomain TEXT NOT NULL,
lastSeen REAL NOT NULL,
hadUserInteraction INTEGER NOT NULL,
mostRecentUserInteractionTime REAL NOT NULL,
mostRecentWebPushInteractionTime REAL NOT NULL
);
"""
)
conn.execute(
"""
INSERT INTO ObservedDomains VALUES (?, ?, ?, ?, ?, ?);
""",
(1, "example.com", 1634560250.0, 1, 1634560030.0, -1.0),
)
conn.commit()
conn.close()
m = WebkitResourceLoadStatistics(target_path=str(tmp_path))
m._process_observations_db(str(db_path), "", "observations.db")
assert len(m.results) == 1
result = m.results[0]
assert result["most_recent_user_interaction_time"] == 1634560030.0
assert "most_recent_user_interaction_time_isodate" in result
assert result["most_recent_web_push_interaction_time"] == -1.0
assert "most_recent_web_push_interaction_time_isodate" not in result