mirror of
https://github.com/mvt-project/mvt.git
synced 2026-03-26 13:30:21 +01:00
Compare commits
7 Commits
todos
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6757e212b | ||
|
|
5cba61b180 | ||
|
|
29475acb47 | ||
|
|
1d5c83582c | ||
|
|
85b26c2dbd | ||
|
|
2dd1428787 | ||
|
|
3e212876a7 |
@@ -2,4 +2,4 @@ mkdocs==1.6.1
|
||||
mkdocs-autorefs==1.4.3
|
||||
mkdocs-material==9.6.20
|
||||
mkdocs-material-extensions==1.3.1
|
||||
mkdocstrings==0.30.1
|
||||
mkdocstrings==1.0.0
|
||||
@@ -17,7 +17,7 @@ classifiers = [
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
dependencies = [
|
||||
"click==8.3.0",
|
||||
"click==8.3.1",
|
||||
"rich==14.1.0",
|
||||
"tld==0.13.1",
|
||||
"requests==2.32.5",
|
||||
@@ -27,15 +27,15 @@ dependencies = [
|
||||
"iOSbackup==0.9.925",
|
||||
"adb-shell[usb]==0.4.4",
|
||||
"libusb1==3.3.1",
|
||||
"cryptography==46.0.3",
|
||||
"cryptography==46.0.5",
|
||||
"PyYAML>=6.0.2",
|
||||
"pyahocorasick==2.2.0",
|
||||
"betterproto==1.2.5",
|
||||
"pydantic==2.12.3",
|
||||
"pydantic==2.12.5",
|
||||
"pydantic-settings==2.10.1",
|
||||
"NSKeyedUnArchiver==1.5.2",
|
||||
"python-dateutil==2.9.0.post0",
|
||||
"tzdata==2025.2",
|
||||
"tzdata==2025.3",
|
||||
]
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
|
||||
@@ -186,7 +186,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
package = []
|
||||
|
||||
in_package_list = False
|
||||
for line in content.splitlines():
|
||||
for line in content.split("\n"):
|
||||
if line.startswith("Packages:"):
|
||||
in_package_list = True
|
||||
continue
|
||||
|
||||
@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class Processes(AndroidArtifact):
|
||||
def parse(self, entry: str) -> None:
|
||||
for line in entry.splitlines()[1:]:
|
||||
for line in entry.split("\n")[1:]:
|
||||
proc = line.split()
|
||||
|
||||
# Skip empty lines
|
||||
|
||||
@@ -193,7 +193,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
# eg. "Process uptime: 40s"
|
||||
tombstone[destination_key] = int(value_clean.rstrip("s"))
|
||||
elif destination_key == "command_line":
|
||||
# Wrap in list for consistency with protobuf format (repeated string).
|
||||
# XXX: Check if command line should be a single string in a list, or a list of strings.
|
||||
tombstone[destination_key] = [value_clean]
|
||||
else:
|
||||
tombstone[destination_key] = value_clean
|
||||
|
||||
@@ -117,6 +117,8 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose)
|
||||
if from_file:
|
||||
download = DownloadAPKs.from_json(from_file)
|
||||
else:
|
||||
# TODO: Do we actually want to be able to run without storing any
|
||||
# file?
|
||||
if not output:
|
||||
log.critical("You need to specify an output folder with --output!")
|
||||
ctx.exit(1)
|
||||
|
||||
@@ -105,15 +105,15 @@ class AQFFiles(AndroidQFModule):
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
for hash_key in ("sha256", "sha1", "md5"):
|
||||
file_hash = result.get(hash_key, "")
|
||||
if not file_hash:
|
||||
continue
|
||||
ioc = self.indicators.check_file_hash(file_hash)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
break
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_file_hash(result["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
|
||||
def run(self) -> None:
|
||||
if timezone := self._get_device_timezone():
|
||||
@@ -128,7 +128,7 @@ class AQFFiles(AndroidQFModule):
|
||||
data = json.loads(rawdata)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = []
|
||||
for line in rawdata.splitlines():
|
||||
for line in rawdata.split("\n"):
|
||||
if line.strip() == "":
|
||||
continue
|
||||
data.append(json.loads(line))
|
||||
@@ -139,7 +139,7 @@ class AQFFiles(AndroidQFModule):
|
||||
utc_timestamp = datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
# Convert the UTC timestamp to local time on Android device's local timezone
|
||||
# Convert the UTC timestamp to local tiem on Android device's local timezone
|
||||
local_timestamp = utc_timestamp.astimezone(device_timezone)
|
||||
|
||||
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
|
||||
|
||||
@@ -39,7 +39,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
|
||||
self.results[namespace] = {}
|
||||
data = self._get_file_content(setting_file)
|
||||
for line in data.decode("utf-8").splitlines():
|
||||
for line in data.decode("utf-8").split("\n"):
|
||||
line = line.strip()
|
||||
try:
|
||||
key, value = line.split("=", 1)
|
||||
|
||||
@@ -222,6 +222,7 @@ class Command:
|
||||
if self.module_name and module.__name__ != self.module_name:
|
||||
continue
|
||||
|
||||
# FIXME: do we need the logger here
|
||||
module_logger = logging.getLogger(module.__module__)
|
||||
|
||||
m = module(
|
||||
|
||||
@@ -180,8 +180,10 @@ class IndicatorsUpdates:
|
||||
def _get_remote_file_latest_commit(
|
||||
self, owner: str, repo: str, branch: str, path: str
|
||||
) -> int:
|
||||
# TODO: The branch is currently not taken into consideration.
|
||||
# How do we specify which branch to look up to the API?
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
|
||||
@@ -119,9 +119,10 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
|
||||
if from_2001:
|
||||
timestamp = timestamp + 978307200
|
||||
|
||||
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
|
||||
try:
|
||||
return convert_unix_to_utc_datetime(timestamp)
|
||||
except (OSError, OverflowError, ValueError):
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -87,35 +87,6 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_key(d: dict, key: str) -> None:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_keys(d: dict, keys: list) -> None:
|
||||
for key in keys:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
def _b64encode_plist_bytes(self, plist: dict) -> None:
|
||||
"""Encode binary plist values to base64 for JSON serialization."""
|
||||
if "SignerCerts" in plist:
|
||||
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
|
||||
|
||||
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
|
||||
|
||||
if "OTAProfileStub" in plist:
|
||||
stub = plist["OTAProfileStub"]
|
||||
if "SignerCerts" in stub:
|
||||
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
|
||||
if "PayloadContent" in stub:
|
||||
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
|
||||
|
||||
if "PayloadContent" in plist:
|
||||
for entry in plist["PayloadContent"]:
|
||||
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
|
||||
|
||||
def run(self) -> None:
|
||||
for conf_file in self._get_backup_files_from_manifest(
|
||||
domain=CONF_PROFILES_DOMAIN
|
||||
@@ -144,7 +115,65 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
except Exception:
|
||||
conf_plist = {}
|
||||
|
||||
self._b64encode_plist_bytes(conf_plist)
|
||||
# TODO: Tidy up the following code hell.
|
||||
|
||||
if "SignerCerts" in conf_plist:
|
||||
conf_plist["SignerCerts"] = [
|
||||
b64encode(x) for x in conf_plist["SignerCerts"]
|
||||
]
|
||||
|
||||
if "OTAProfileStub" in conf_plist:
|
||||
if "SignerCerts" in conf_plist["OTAProfileStub"]:
|
||||
conf_plist["OTAProfileStub"]["SignerCerts"] = [
|
||||
b64encode(x)
|
||||
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
|
||||
]
|
||||
|
||||
if "PayloadContent" in conf_plist["OTAProfileStub"]:
|
||||
if (
|
||||
"EnrollmentIdentityPersistentID"
|
||||
in conf_plist["OTAProfileStub"]["PayloadContent"]
|
||||
):
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
] = b64encode(
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
]
|
||||
)
|
||||
|
||||
if "PushTokenDataSentToServerKey" in conf_plist:
|
||||
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
|
||||
conf_plist["PushTokenDataSentToServerKey"]
|
||||
)
|
||||
|
||||
if "LastPushTokenHash" in conf_plist:
|
||||
conf_plist["LastPushTokenHash"] = b64encode(
|
||||
conf_plist["LastPushTokenHash"]
|
||||
)
|
||||
|
||||
if "PayloadContent" in conf_plist:
|
||||
for content_entry in range(len(conf_plist["PayloadContent"])):
|
||||
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
]
|
||||
)
|
||||
|
||||
if (
|
||||
"IdentityPersistentRef"
|
||||
in conf_plist["PayloadContent"][content_entry]
|
||||
):
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
]
|
||||
)
|
||||
|
||||
self.results.append(
|
||||
{
|
||||
|
||||
@@ -73,7 +73,7 @@ class ShutdownLog(IOSExtraction):
|
||||
recent_processes = []
|
||||
times_delayed = 0
|
||||
delay = 0.0
|
||||
for line in content.splitlines():
|
||||
for line in content.split("\n"):
|
||||
line = line.strip()
|
||||
|
||||
if line.startswith("remaining client pid:"):
|
||||
|
||||
@@ -11,6 +11,7 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
|
||||
from ..base import IOSExtraction
|
||||
|
||||
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_FAVICON_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
|
||||
]
|
||||
|
||||
@@ -13,6 +13,7 @@ from ..base import IOSExtraction
|
||||
CHROME_HISTORY_BACKUP_IDS = [
|
||||
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
|
||||
]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_HISTORY_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
|
||||
]
|
||||
|
||||
@@ -79,55 +79,32 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
cur = conn.cursor()
|
||||
|
||||
try:
|
||||
# FIXME: table contains extra fields with timestamp here
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction,
|
||||
mostRecentUserInteractionTime,
|
||||
mostRecentWebPushInteractionTime
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
)
|
||||
has_extra_timestamps = True
|
||||
except sqlite3.OperationalError:
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
)
|
||||
has_extra_timestamps = False
|
||||
except sqlite3.OperationalError:
|
||||
return
|
||||
return
|
||||
|
||||
for row in cur:
|
||||
result = {
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
if has_extra_timestamps:
|
||||
result["most_recent_user_interaction_time"] = row[4]
|
||||
result["most_recent_user_interaction_time_isodate"] = (
|
||||
convert_unix_to_iso(row[4])
|
||||
)
|
||||
result["most_recent_web_push_interaction_time"] = row[5]
|
||||
result["most_recent_web_push_interaction_time_isodate"] = (
|
||||
convert_unix_to_iso(row[5])
|
||||
)
|
||||
self.results.append(result)
|
||||
self.results.append(
|
||||
{
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
|
||||
if len(self.results) > 0:
|
||||
self.log.info(
|
||||
|
||||
@@ -76,6 +76,12 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
entry["redirect_destination"]
|
||||
)
|
||||
|
||||
# TODO: Currently not used.
|
||||
# subframe_origins = self._extract_domains(
|
||||
# entry["subframe_under_origin"])
|
||||
# subresource_domains = self._extract_domains(
|
||||
# entry["subresource_under_origin"])
|
||||
|
||||
all_origins = set(
|
||||
[entry["origin"]] + source_domains + destination_domains
|
||||
)
|
||||
|
||||
@@ -311,11 +311,14 @@ class NetBase(IOSExtraction):
|
||||
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
# check_manipulated/find_deleted require "live_isodate" and
|
||||
# "live_proc_id" keys which may be absent in older result formats.
|
||||
if self.results and "live_isodate" in self.results[0]:
|
||||
# Check for manipulated process records.
|
||||
# TODO: Catching KeyError for live_isodate for retro-compatibility.
|
||||
# This is not very good.
|
||||
try:
|
||||
self.check_manipulated()
|
||||
self.find_deleted()
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user