Merge branch 'main' into feature/tombstone-parser

This commit is contained in:
Donncha Ó Cearbhaill
2025-02-03 18:44:00 +01:00
45 changed files with 683 additions and 107 deletions
+40 -7
View File
@@ -11,6 +11,10 @@ from mvt.common.utils import convert_datetime_to_iso
from .artifact import AndroidArtifact
RISKY_PERMISSIONS = ["REQUEST_INSTALL_PACKAGES"]
RISKY_PACKAGES = ["com.android.shell"]
class DumpsysAppopsArtifact(AndroidArtifact):
"""
Parser for dumpsys app ops info
@@ -45,15 +49,39 @@ class DumpsysAppopsArtifact(AndroidArtifact):
self.detected.append(result)
continue
detected_permissions = []
for perm in result["permissions"]:
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
perm["name"] in RISKY_PERMISSIONS
# and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
result["package_name"],
)
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Package '%s' had risky permission '%s' set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
entry["timestamp"],
)
elif result["package_name"] in RISKY_PACKAGES:
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Risky package '%s' had '%s' permission set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
entry["timestamp"],
)
if detected_permissions:
# We clean the result to only include the risky permission, otherwise the timeline
# will be polluted with all the other irrelevant permissions
cleaned_result = result.copy()
cleaned_result["permissions"] = detected_permissions
self.detected.append(cleaned_result)
def parse(self, output: str) -> None:
self.results: List[Dict[str, Any]] = []
@@ -121,11 +149,16 @@ class DumpsysAppopsArtifact(AndroidArtifact):
if line.startswith(" "):
# Permission entry like:
# Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms)
access_type = line.split(":")[0].strip()
if access_type not in ["Access", "Reject"]:
# Skipping invalid access type. Some entries are not in the format we expect
continue
if entry:
perm["entries"].append(entry)
entry = {}
entry["access"] = line.split(":")[0].strip()
entry["access"] = access_type
entry["type"] = line[line.find("[") + 1 : line.find("]")]
try:
@@ -16,8 +16,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
'Found an installed package related to rooting/jailbreaking: "%s"',
result["package_name"],
)
self.detected.append(result)
@@ -0,0 +1,42 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .artifact import AndroidArtifact
class DumpsysPlatformCompatArtifact(AndroidArtifact):
"""
Parser for uninstalled apps listed in platform_compat section.
"""
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, data: str) -> None:
for line in data.splitlines():
if not line.startswith("ChangeId(168419799; name=DOWNSCALED;"):
continue
if line.strip() == "":
break
# Look for rawOverrides field
if "rawOverrides={" in line:
# Extract the content inside the braces for rawOverrides
overrides_field = line.split("rawOverrides={", 1)[1].split("};", 1)[0]
for entry in overrides_field.split(", "):
# Extract app name
uninstall_app = entry.split("=")[0].strip()
self.results.append({"package_name": uninstall_app})
+5
View File
@@ -16,6 +16,11 @@ ANDROID_DANGEROUS_SETTINGS = [
"key": "package_verifier_enable",
"safe_value": "1",
},
{
"description": "disabled APK package verification",
"key": "package_verifier_state",
"safe_value": "1",
},
{
"description": "disabled Google Play Protect",
"key": "package_verifier_user_consent",
+1 -2
View File
@@ -326,8 +326,7 @@ class AndroidExtraction(MVTModule):
if not header["backup"]:
self.log.error(
"Extracting SMS via Android backup failed. "
"No valid backup data found."
"Extracting SMS via Android backup failed. No valid backup data found."
)
return None
+1 -2
View File
@@ -75,8 +75,7 @@ class Packages(AndroidExtraction):
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
'Found an installed package related to rooting/jailbreaking: "%s"',
result["package_name"],
)
self.detected.append(result)
+1 -1
View File
@@ -70,7 +70,7 @@ class SMS(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"sms_{record['direction']}",
"data": f"{record.get('address', 'unknown source')}: \"{body}\"",
"data": f'{record.get("address", "unknown source")}: "{body}"',
}
def check_indicators(self) -> None:
@@ -14,6 +14,7 @@ from .dumpsys_receivers import DumpsysReceivers
from .dumpsys_adb import DumpsysADBState
from .getprop import Getprop
from .packages import Packages
from .dumpsys_platform_compat import DumpsysPlatformCompat
from .processes import Processes
from .settings import Settings
from .sms import SMS
@@ -29,6 +30,7 @@ ANDROIDQF_MODULES = [
DumpsysBatteryHistory,
DumpsysADBState,
Packages,
DumpsysPlatformCompat,
Processes,
Getprop,
Settings,
@@ -0,0 +1,44 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
from .base import AndroidQFModule
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, AndroidQFModule):
"""This module extracts details on uninstalled apps."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
self.parse(content)
self.log.info("Found %d uninstalled apps", len(self.results))
+1 -1
View File
@@ -74,7 +74,7 @@ class Files(AndroidQFModule):
for result in self.results:
ioc = self.indicators.check_file_path(result["path"])
if ioc:
result["matched_indicator"] == ioc
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@@ -44,8 +44,7 @@ class Packages(AndroidQFModule):
for result in self.results:
if result["name"] in ROOT_PACKAGES:
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
'Found an installed package related to rooting/jailbreaking: "%s"',
result["name"],
)
self.detected.append(result)
+9 -8
View File
@@ -3,10 +3,11 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
from rich.prompt import Prompt
from mvt.common.config import settings
MVT_ANDROID_BACKUP_PASSWORD = "MVT_ANDROID_BACKUP_PASSWORD"
@@ -16,24 +17,24 @@ def cli_load_android_backup_password(log, backup_password):
Used in MVT CLI command parsers.
"""
password_from_env = os.environ.get(MVT_ANDROID_BACKUP_PASSWORD, None)
password_from_env_or_config = settings.ANDROID_BACKUP_PASSWORD
if backup_password:
log.info(
"Your password may be visible in the process table because it "
"was supplied on the command line!"
)
if password_from_env:
if password_from_env_or_config:
log.info(
"Ignoring %s environment variable, using --backup-password argument instead",
MVT_ANDROID_BACKUP_PASSWORD,
"MVT_ANDROID_BACKUP_PASSWORD",
)
return backup_password
elif password_from_env:
elif password_from_env_or_config:
log.info(
"Using backup password from %s environment variable",
MVT_ANDROID_BACKUP_PASSWORD,
"Using backup password from %s environment variable or config file",
"MVT_ANDROID_BACKUP_PASSWORD",
)
return password_from_env
return password_from_env_or_config
def prompt_or_load_android_backup_password(log, module_options):
@@ -11,6 +11,7 @@ from .battery_history import BatteryHistory
from .dbinfo import DBInfo
from .getprop import Getprop
from .packages import Packages
from .platform_compat import PlatformCompat
from .receivers import Receivers
from .adb_state import DumpsysADBState
@@ -23,6 +24,7 @@ BUGREPORT_MODULES = [
DBInfo,
Getprop,
Packages,
PlatformCompat,
Receivers,
DumpsysADBState,
]
@@ -0,0 +1,48 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
from mvt.android.modules.bugreport.base import BugReportModule
class PlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
"""This module extracts details on uninstalled apps."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
data = self._get_dumpstate_file()
if not data:
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
data = data.decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
self.parse(content)
self.log.info("Found %d uninstalled apps", len(self.results))
+4 -3
View File
@@ -17,6 +17,7 @@ from mvt.common.utils import (
generate_hashes_from_path,
get_sha256_from_file_path,
)
from mvt.common.config import settings
from mvt.common.version import MVT_VERSION
@@ -81,7 +82,7 @@ class Command:
os.path.join(self.results_path, "command.log")
)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - " "%(levelname)s - %(message)s"
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
@@ -132,7 +133,7 @@ class Command:
if ioc_file_path and ioc_file_path not in info["ioc_files"]:
info["ioc_files"].append(ioc_file_path)
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
if self.target_path and (settings.HASH_FILES or self.hashes):
self.generate_hashes()
info["hashes"] = self.hash_values
@@ -141,7 +142,7 @@ class Command:
with open(info_path, "w+", encoding="utf-8") as handle:
json.dump(info, handle, indent=4)
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
if self.target_path and (settings.HASH_FILES or self.hashes):
info_hash = get_sha256_from_file_path(info_path)
self.log.info('Reference hash of the info.json file: "%s"', info_hash)
+105
View File
@@ -0,0 +1,105 @@
import os
import yaml
import json
from typing import Tuple, Type, Optional
from appdirs import user_config_dir
from pydantic import AnyHttpUrl, Field
from pydantic_settings import (
BaseSettings,
InitSettingsSource,
PydanticBaseSettingsSource,
SettingsConfigDict,
YamlConfigSettingsSource,
)
MVT_CONFIG_FOLDER = user_config_dir("mvt")
MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml")
class MVTSettings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="MVT_",
env_nested_delimiter="_",
extra="ignore",
nested_model_default_partial_updates=True,
)
# Allow to decided if want to load environment variables
load_env: bool = Field(True, exclude=True)
# General settings
PYPI_UPDATE_URL: AnyHttpUrl = Field(
"https://pypi.org/pypi/mvt/json",
validate_default=False,
)
NETWORK_ACCESS_ALLOWED: bool = True
NETWORK_TIMEOUT: int = 15
# Command default settings, all can be specified by MVT_ prefixed environment variables too.
IOS_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt iOS backups"
)
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt Android backups"
)
STIX2: Optional[str] = Field(
None, description="List of directories where STIX2 files are stored"
)
VT_API_KEY: Optional[str] = Field(
None, description="API key to use for VirusTotal lookups"
)
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
@classmethod
def settings_customise_sources(
cls,
settings_cls: Type[BaseSettings],
init_settings: InitSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]:
sources = (
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
init_settings,
)
# Load env variables if enabled
if init_settings.init_kwargs.get("load_env", True):
sources = (env_settings,) + sources
return sources
def save_settings(
self,
) -> None:
"""
Save the current settings to a file.
"""
if not os.path.isdir(MVT_CONFIG_FOLDER):
os.makedirs(MVT_CONFIG_FOLDER)
# Dump the settings to the YAML file
model_serializable = json.loads(self.model_dump_json(exclude_defaults=True))
with open(MVT_CONFIG_PATH, "w") as config_file:
config_file.write(yaml.dump(model_serializable, default_flow_style=False))
@classmethod
def initialise(cls) -> "MVTSettings":
"""
Initialise the settings file.
We first initialise the settings (without env variable) and then persist
them to file. This way we can update the config file with the default values.
Afterwards we load the settings again, this time including the env variables.
"""
# Set invalid env prefix to avoid loading env variables.
settings = MVTSettings(load_env=False)
settings.save_settings()
# Load the settings again with any ENV variables.
settings = MVTSettings(load_env=True)
return settings
settings = MVTSettings.initialise()
+5 -5
View File
@@ -14,6 +14,7 @@ import ahocorasick
from appdirs import user_data_dir
from .url import URL
from .config import settings
MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
@@ -41,12 +42,12 @@ class Indicators:
def _check_stix2_env_variable(self) -> None:
"""
Checks if a variable MVT_STIX2 contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
Checks if MVT_STIX2 setting or environment variable contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
"""
if "MVT_STIX2" not in os.environ:
if not settings.STIX2:
return
paths = os.environ["MVT_STIX2"].split(":")
paths = settings.STIX2.split(":")
for path in paths:
if os.path.isfile(path) and path.lower().endswith(".stix2"):
self.parse_stix2(path)
@@ -383,8 +384,7 @@ class Indicators:
for ioc in self.get_iocs("urls"):
if ioc["value"] == url:
self.log.warning(
"Found a known suspicious URL %s "
'matching indicator "%s" from "%s"',
'Found a known suspicious URL %s matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
+2 -1
View File
@@ -14,6 +14,7 @@ from packaging import version
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
from .version import MVT_VERSION
from .config import settings
log = logging.getLogger(__name__)
@@ -23,7 +24,7 @@ INDICATORS_CHECK_FREQUENCY = 12
class MVTUpdates:
def check(self) -> str:
res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15)
res = requests.get(settings.PYPI_UPDATE_URL, timeout=15)
data = res.json()
latest_version = data.get("info", {}).get("version", "")
+2 -1
View File
@@ -13,6 +13,7 @@ import re
from typing import Any, Iterator, Union
from rich.logging import RichHandler
from mvt.common.config import settings
class CustomJSONEncoder(json.JSONEncoder):
@@ -256,7 +257,7 @@ def set_verbose_logging(verbose: bool = False):
def exec_or_profile(module, globals, locals):
"""Hook for profiling MVT modules"""
if int(os.environ.get("MVT_PROFILE", False)):
if settings.PROFILE:
cProfile.runctx(module, globals, locals)
else:
exec(module, globals, locals)
+1 -1
View File
@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "2.5.4"
MVT_VERSION = "2.6.0"
+3 -4
View File
@@ -100,7 +100,7 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
if key_file:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info(
"Ignoring %s environment variable, using --key-file" "'%s' instead",
"Ignoring %s environment variable, using --key-file'%s' instead",
MVT_IOS_BACKUP_PASSWORD,
key_file,
)
@@ -114,7 +114,7 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info(
"Ignoring %s environment variable, using --password" "argument instead",
"Ignoring %s environment variable, using --passwordargument instead",
MVT_IOS_BACKUP_PASSWORD,
)
@@ -168,8 +168,7 @@ def extract_key(password, key_file, backup_path):
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info(
"Ignoring %s environment variable, using --password "
"argument instead",
"Ignoring %s environment variable, using --password argument instead",
MVT_IOS_BACKUP_PASSWORD,
)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
+20
View File
@@ -1083,5 +1083,25 @@
{
"version": "18.0.1",
"build": "22A3370"
},
{
"version": "18.1",
"build": "22B83"
},
{
"version": "18.1.1",
"build": "22B91"
},
{
"version": "18.2",
"build": "22C152"
},
{
"version": "18.2.1",
"build": "22C161"
},
{
"version": "18.3",
"build": "22D63"
}
]
+1 -1
View File
@@ -41,7 +41,7 @@ class BackupInfo(IOSExtraction):
info_path = os.path.join(self.target_path, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError(
"No Info.plist at backup path, unable to extract device " "information"
"No Info.plist at backup path, unable to extract device information"
)
with open(info_path, "rb") as handle:
+1 -2
View File
@@ -110,8 +110,7 @@ class Manifest(IOSExtraction):
ioc = self.indicators.check_url(part)
if ioc:
self.log.warning(
'Found mention of domain "%s" in a backup file with '
"path: %s",
'Found mention of domain "%s" in a backup file with path: %s',
ioc["value"],
rel_path,
)
+1 -1
View File
@@ -74,7 +74,7 @@ class IOSExtraction(MVTModule):
if not shutil.which("sqlite3"):
raise DatabaseCorruptedError(
"failed to recover without sqlite3 binary: please install " "sqlite3!"
"failed to recover without sqlite3 binary: please install sqlite3!"
)
if '"' in file_path:
raise DatabaseCorruptedError(
+10 -6
View File
@@ -17,6 +17,12 @@ from mvt.ios.modules.base import IOSExtraction
APPLICATIONS_DB_PATH = [
"private/var/containers/Bundle/Application/*/iTunesMetadata.plist"
]
KNOWN_APP_INSTALLERS = [
"com.apple.AppStore",
"com.apple.AppStore.ProductPageExtension",
"com.apple.dmd",
"dmd",
]
class Applications(IOSExtraction):
@@ -80,12 +86,10 @@ class Applications(IOSExtraction):
self.detected.append(result)
continue
# Some apps installed from apple store with sourceApp "com.apple.AppStore.ProductPageExtension"
if result.get("sourceApp", "com.apple.AppStore") not in [
"com.apple.AppStore",
"com.apple.AppStore.ProductPageExtension",
"com.apple.dmd",
"dmd",
]:
if (
result.get("sourceApp", "com.apple.AppStore")
not in KNOWN_APP_INSTALLERS
):
self.log.warning(
"Suspicious app not installed from the App Store or MDM: %s",
result["softwareVersionBundleId"],
+1 -1
View File
@@ -43,7 +43,7 @@ class SMS(IOSExtraction):
def serialize(self, record: dict) -> Union[dict, list]:
text = record["text"].replace("\n", "\\n")
sms_data = f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})"
sms_data = f'{record["service"]}: {record["guid"]} "{text}" from {record["phone_number"]} ({record["account"]})'
records = [
{
"timestamp": record["isodate"],
@@ -100,7 +100,7 @@ class WebkitSessionResourceLog(IOSExtraction):
redirect_path += ", ".join(source_domains)
redirect_path += " -> "
redirect_path += f"ORIGIN: \"{entry['origin']}\""
redirect_path += f'ORIGIN: "{entry["origin"]}"'
if len(destination_domains) > 0:
redirect_path += " -> "
+68 -42
View File
@@ -38,44 +38,70 @@ class NetBase(IOSExtraction):
def _extract_net_data(self):
conn = sqlite3.connect(self.file_path)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
try:
cur.execute(
"""
SELECT
ZPROCESS.ZFIRSTTIMESTAMP,
ZPROCESS.ZTIMESTAMP,
ZPROCESS.ZPROCNAME,
ZPROCESS.ZBUNDLENAME,
ZPROCESS.Z_PK AS ZPROCESS_PK,
ZLIVEUSAGE.ZWIFIIN,
ZLIVEUSAGE.ZWIFIOUT,
ZLIVEUSAGE.ZWWANIN,
ZLIVEUSAGE.ZWWANOUT,
ZLIVEUSAGE.Z_PK AS ZLIVEUSAGE_PK,
ZLIVEUSAGE.ZHASPROCESS,
ZLIVEUSAGE.ZTIMESTAMP AS ZL_TIMESTAMP
FROM ZLIVEUSAGE
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
UNION
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
NULL, NULL, NULL, NULL, NULL, NULL, NULL
FROM ZPROCESS WHERE Z_PK NOT IN
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
"""
SELECT
ZPROCESS.ZFIRSTTIMESTAMP,
ZPROCESS.ZTIMESTAMP,
ZPROCESS.ZPROCNAME,
ZPROCESS.ZBUNDLENAME,
ZPROCESS.Z_PK,
ZLIVEUSAGE.ZWIFIIN,
ZLIVEUSAGE.ZWIFIOUT,
ZLIVEUSAGE.ZWWANIN,
ZLIVEUSAGE.ZWWANOUT,
ZLIVEUSAGE.Z_PK,
ZLIVEUSAGE.ZHASPROCESS,
ZLIVEUSAGE.ZTIMESTAMP
FROM ZLIVEUSAGE
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
UNION
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
NULL, NULL, NULL, NULL, NULL, NULL, NULL
FROM ZPROCESS WHERE Z_PK NOT IN
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
"""
)
)
except sqlite3.OperationalError:
# Recent phones don't have ZWIFIIN and ZWIFIOUT columns
cur.execute(
"""
SELECT
ZPROCESS.ZFIRSTTIMESTAMP,
ZPROCESS.ZTIMESTAMP,
ZPROCESS.ZPROCNAME,
ZPROCESS.ZBUNDLENAME,
ZPROCESS.Z_PK AS ZPROCESS_PK,
ZLIVEUSAGE.ZWWANIN,
ZLIVEUSAGE.ZWWANOUT,
ZLIVEUSAGE.Z_PK AS ZLIVEUSAGE_PK,
ZLIVEUSAGE.ZHASPROCESS,
ZLIVEUSAGE.ZTIMESTAMP AS ZL_TIMESTAMP
FROM ZLIVEUSAGE
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
UNION
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
NULL, NULL, NULL, NULL, NULL
FROM ZPROCESS WHERE Z_PK NOT IN
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
"""
)
for row in cur:
# ZPROCESS records can be missing after the JOIN.
# Handle NULL timestamps.
if row[0] and row[1]:
first_isodate = convert_mactime_to_iso(row[0])
isodate = convert_mactime_to_iso(row[1])
if row["ZFIRSTTIMESTAMP"] and row["ZTIMESTAMP"]:
first_isodate = convert_mactime_to_iso(row["ZFIRSTTIMESTAMP"])
isodate = convert_mactime_to_iso(row["ZTIMESTAMP"])
else:
first_isodate = row[0]
isodate = row[1]
first_isodate = row["ZFIRSTTIMESTAMP"]
isodate = row["ZTIMESTAMP"]
if row[11]:
live_timestamp = convert_mactime_to_iso(row[11])
if row["ZL_TIMESTAMP"]:
live_timestamp = convert_mactime_to_iso(row["ZL_TIMESTAMP"])
else:
live_timestamp = ""
@@ -83,16 +109,18 @@ class NetBase(IOSExtraction):
{
"first_isodate": first_isodate,
"isodate": isodate,
"proc_name": row[2],
"bundle_id": row[3],
"proc_id": row[4],
"wifi_in": row[5],
"wifi_out": row[6],
"wwan_in": row[7],
"wwan_out": row[8],
"live_id": row[9],
"live_proc_id": row[10],
"live_isodate": live_timestamp if row[11] else first_isodate,
"proc_name": row["ZPROCNAME"],
"bundle_id": row["ZBUNDLENAME"],
"proc_id": row["ZPROCESS_PK"],
"wifi_in": row["ZWIFIIN"] if "ZWIFIIN" in row.keys() else None,
"wifi_out": row["ZWIFIOUT"] if "ZWIFIOUT" in row.keys() else None,
"wwan_in": row["ZWWANIN"],
"wwan_out": row["ZWWANOUT"],
"live_id": row["ZLIVEUSAGE_PK"],
"live_proc_id": row["ZHASPROCESS"],
"live_isodate": live_timestamp
if row["ZL_TIMESTAMP"]
else first_isodate,
}
)
@@ -108,8 +136,6 @@ class NetBase(IOSExtraction):
)
record_data_usage = (
record_data + " "
f"WIFI IN: {record['wifi_in']}, "
f"WIFI OUT: {record['wifi_out']} - "
f"WWAN IN: {record['wwan_in']}, "
f"WWAN OUT: {record['wwan_out']}"
)