Compare commits

..

1 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
74dd566ee6 Run CI tests against Python3.14 too 2025-12-19 12:50:27 +01:00
25 changed files with 140 additions and 288 deletions

View File

@@ -1,9 +1,14 @@
PWD = $(shell pwd)
autofix:
ruff format .
ruff check --fix .
check: ruff mypy
ruff:
ruff check .
ruff format --check .
ruff check -q .
mypy:
mypy

View File

@@ -2,61 +2,4 @@
Thank you for your interest in reporting security issues and vulnerabilities! Security research is of utmost importance and we take all reports seriously. If you discover an issue please report it to us right away!
Please DO NOT file a public issue, instead send your report privately to the MVT maintainers at Amnesty International via `security [at] amnesty [dot] tech`.
You can also write PGP-encrypted emails to key `CFBF9698DCA8EB2A80F48ADEA035A030FA04ED13`. The corresponding PGP public key is listed below.
```
-----BEGIN PGP PUBLIC KEY BLOCK-----
mQINBGlFPwsBEADQ+d7SeHrFPYv3wPOjWs2oMpp0DPdfIyGbg+iYWOC36FegZhKY
+WeK96GqJWt8wD6kwFUVwQI795WZrjSd1q4a7wR+kj/h7xlRB6ZfVICA6O5DOOm6
GNMvqy7ESm8g1XZDpb2u1BXmSS9X8f6rjB0e86kYsF1mB5/2USTM63jgDs0GGTkZ
Q1z4Mq4gYyqH32b3gvXkbb68LeQmONUIM3cgmec9q8/pNc1l7fcoLWhOVADRj17Q
plisa/EUf/SYqdtk9w7EHGggNenKNwVM235mkPcMqmE72bTpjT6XCxvZY3ByG5yi
7L+tHJU45ZuXtt62EvX03azxThVfSmH/WbRk8lH8+CW8XMmiWZphG4ydPWqgVKCB
2UOXm+6CQnKA+7Dt1AeK2t5ciATrv9LvwgSxk5WKc3288XFLA6eGMrTdQygYlLjJ
+42RSdK/7fCt/qk4q13oUw8ZTVcCia98uZFi704XuuYTH6NrntIB7j/0oucIS4Y9
cTWNO5LBerez4v8VI4YHcYESPeIWGFkXhvJzo0VMg1zidBLtiPoGF2JKZGwaK7/p
yY1xALskLp4H+5OY4eB1kf8kl4vGsEK8xA/NNzOiapVmwBXpvVvmXIQJE2k+olNf
sAuyB8+aO1Ws7tFYt3D+olC7iaprOdK7uA4GCgmYYhq6QQPg+cxfczgHfwARAQAB
tD1TZWN1cml0eSBMYWIgYXQgQW1uZXN0eSBJbnRlcm5hdGlvbmFsIDxzZWN1cml0
eUBhbW5lc3R5LnRlY2g+iQJRBBMBCAA7FiEEz7+WmNyo6yqA9IreoDWgMPoE7RMF
AmlFPwsCGwMFCwkIBwICIgIGFQoJCAsCBBYCAwECHgcCF4AACgkQoDWgMPoE7RNr
2w//a88uP90uSN6lgeIwKsHr1ri27QIBbzCV6hLN/gZBFR2uaiOn/xfFDbnR0Cjo
5nMCJCT1k4nrPbMTlfmWLCD+YKELBzVqWlw4J2SOg3nznPl2JrL8QBKjwts0sF+h
QbRWDsT54wBZnl6ZJJ79eLShNTokBbKnQ7071dMrENr5e2P2sClQXyiIc51ga4FM
fHyhsx+GsrdiZNd2AH8912ljW1GuEi3epTO7KMZprmr37mjpZSUToiV59Yhl1Gbo
2pixkYJqi62DG02/gTpCjq9NH3cEMxcxjh4E7yCA8ggLG6+IN6woIvPIdOsnQ+Yj
d3H4rMNBjPSKoL+bdHILkCnp5HokcbVjNY3QAyOAF4qWhk4GtgpTshwxUmb4Tbay
tWLJC2bzjuUBxLkGzMVFfU3B96sVS4Fi0sBaEMBtHskl2f45X8LJhSq//Lw/2L/8
34uP/RxDSn+DPvj/yqMpekdCcmeFSTX1A19xkPcc0rVhMRde4VL338R86vzh0gMI
1LySDAhXZyVWzrQ5s3n6N3EvCaHCn3qu7ieyFJifCSR7gZqevCEznMQRVpkMTzUt
rk13Z6NOOb4IlTW7HFoY3omJG8Z5jV4kMIE7n6nb0qpNYQiG+YvjenQ3VrMoISyh
lpS2De8+oOtwrxBVX3+qKWvQqzufeE3416kw2Z+5mxH7bx25Ag0EaUU/CwEQALyZ
b+kwLN1yHObTm2yDBEn5HbCT3H1GremvPNmbAaTnfrjUngoKa8MuWWzbX5ptgmZR
UpYY/ylOYcgGydz58vUNrPlhIZT9UhmiifPgZLEXyd0uFpr/NsbRajHMkK10iEZf
h5bHNobiB7pGCu4Uj9e1cMiIZ4yEaYeyXYUoNHf6ISP39mJhHy6ov5yIpm9q0wzm
tGUQPupxGXmEZlOPr3lxqXQ3Ekdv6cWDY5r/oOq71QJ/HUQ13QUuGFIbhnMbT8zd
zaS6f/v772YKsWPc4NNUhtlf25VnQ4FuUtjCe3p6iYP4OVD8gJm0GvXyvyTuiQbL
CSk/378JiNT7nZzYXxrWchMwvEoMIU55+/UaBc50HI5xvDQ858CX7PYGiimcdsO1
EkQzhVxRfjlILfWrC2lgt+H5qhTn4Fah250Xe1PnLjXGHVUQnY/f3MFeiWQgf92b
02+MfvOeC5OKttP1z5lcx6RFWCIa1E/u8Nj7YrH9hk0ZBRAnBaeAncDFY8dfX2zX
VMoc0dV16gM7RrZ6i7D3CG3eLLkQlX0jbW9dzTuG/3f098EWB1p8vOfS/RbNCBRX
jqGiqacL/aFF3Ci3nQ4O5tSv1XipbgrUhvXnwm9pxrLPS/45iaO59WN4RRGWLLQ7
LHmeBxoa9avv0SdBYUL+eBxY46GXb/j5VLzHYhSnABEBAAGJAjYEGAEIACAWIQTP
v5aY3KjrKoD0it6gNaAw+gTtEwUCaUU/CwIbDAAKCRCgNaAw+gTtEyvsEACnyFFD
alOZTrrJTXNnUejuiExLh+qTO3T91p5bte597jpwCZnYGwkxEfffsqqhlY6ftEOf
d5tNWE5isai4v8XCbplWomz4KBpepxcn2b+9o5dSyr1vohEFuCJziZDsta1J2DX5
IE9U48kTgLDfdIBhuOyHNRkvXRHP2OVLCaiw4d9q+hlrraR8pehHt2BJSxh+QZoe
n0iHvIZCBIUA45zLEGmXFpNTGeEf2dKPp3xOkAXOhAMPptE0V1itkF3R7kEW4aFO
SZo8L3C1aWSz/gQ4/vvW5t1IJxirNMUgTMQFvqEkAwX3fm6GCxlgRSvTTRXdcrS8
6qyFdH1nkCNsavPahN3N2RGGIlWtODEMTO1Hjy0kZtTYdW+JH9sendliCoJES+yN
DjM125SgdAgrqlSYm/g8n9knWpxZv1QM6jU/sVz1J+l6/ixugL2i+CAL2d6uv4tT
QmXnu7Ei4/2kHBUu3Lf59MNgmLHm6F7AhOWErszSeoJKsp+3yA1oTT/npz67sRzY
VVyxz4NBIollna59a1lz0RhlWzNKqNB27jhylyM4ltdzHB7r4VMAVJyttozmIIOC
35ucYxl5BHLuapaRSaYHdUId1LOccYyaOOFF/PSyCu9dKzXk7zEz2HNcIboWSkAE
8ZDExMYM4WVpVCOj+frdsaBvzItHacRWuijtkw==
=JAXX
-----END PGP PUBLIC KEY BLOCK-----
```
Please DO NOT file a public issue, instead send your report privately to *nex [at] nex [dot] sx*. You can also write PGP-encrypted emails to [this key](https://keybase.io/nex/pgp_keys.asc?fingerprint=05216f3b86848a303c2fe37dd166f1667359d880).

View File

@@ -17,17 +17,17 @@ classifiers = [
"Programming Language :: Python",
]
dependencies = [
"click==8.3.0",
"click==8.2.1",
"rich==14.1.0",
"tld==0.13.1",
"requests==2.32.5",
"simplejson==3.20.2",
"simplejson==3.20.1",
"packaging==25.0",
"appdirs==1.4.4",
"iOSbackup==0.9.925",
"adb-shell[usb]==0.4.4",
"libusb1==3.3.1",
"cryptography==46.0.3",
"cryptography==45.0.6",
"PyYAML>=6.0.2",
"pyahocorasick==2.2.0",
"betterproto==1.2.5",
@@ -80,7 +80,7 @@ packages = "src"
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
testpaths = ["tests"]
[tool.ruff]
[tool.ruff.lint]
select = ["C90", "E", "F", "W"] # flake8 default set
ignore = [
"E501", # don't enforce line length violations
@@ -95,10 +95,10 @@ ignore = [
# "E203", # whitespace-before-punctuation
]
[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"] # unused-import
[tool.ruff.mccabe]
[tool.ruff.lint.mccabe]
max-complexity = 10
[tool.setuptools]

View File

@@ -14,23 +14,12 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
"""
def serialize(self, record: dict) -> Union[dict, list]:
action = record.get("action", "update")
package_name = record["package_name"]
vers = record["vers"]
if vers == "0":
data = f"Recorded uninstall of package {package_name} (vers 0)"
elif action == "downgrade":
prev_vers = record.get("previous_vers", "unknown")
data = f"Recorded downgrade of package {package_name} from vers {prev_vers} to vers {vers}"
else:
data = f"Recorded update of package {package_name} with vers {vers}"
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
"event": "battery_daily",
"data": data,
"data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}",
}
def check_indicators(self) -> None:
@@ -47,7 +36,6 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
def parse(self, output: str) -> None:
daily = None
daily_updates = []
package_versions = {} # Track package versions to detect downgrades
for line in output.splitlines():
if line.startswith(" Daily from "):
if len(daily_updates) > 0:
@@ -76,44 +64,15 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
break
if not already_seen:
update_record = {
"action": "update",
"from": daily["from"],
"to": daily["to"],
"package_name": package_name,
"vers": vers_nr,
}
# Check for uninstall (version 0)
if vers_nr == "0":
self.log.warning(
"Detected uninstall of package %s (vers 0) on %s",
package_name,
daily["from"],
)
# Check for downgrade
elif package_name in package_versions:
try:
current_vers = int(vers_nr)
previous_vers = int(package_versions[package_name])
if current_vers < previous_vers:
update_record["action"] = "downgrade"
update_record["previous_vers"] = str(previous_vers)
self.log.warning(
"Detected downgrade of package %s from vers %d to vers %d on %s",
package_name,
previous_vers,
current_vers,
daily["from"],
)
except ValueError:
# If version numbers aren't integers, skip comparison
pass
# Update tracking dictionary
package_versions[package_name] = vers_nr
daily_updates.append(update_record)
daily_updates.append(
{
"action": "update",
"from": daily["from"],
"to": daily["to"],
"package_name": package_name,
"vers": vers_nr,
}
)
if len(daily_updates) > 0:
self.results.extend(daily_updates)

View File

@@ -186,7 +186,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
package = []
in_package_list = False
for line in content.splitlines():
for line in content.split("\n"):
if line.startswith("Packages:"):
in_package_list = True
continue

View File

@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
class Processes(AndroidArtifact):
def parse(self, entry: str) -> None:
for line in entry.splitlines()[1:]:
for line in entry.split("\n")[1:]:
proc = line.split()
# Skip empty lines

View File

@@ -193,7 +193,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
# eg. "Process uptime: 40s"
tombstone[destination_key] = int(value_clean.rstrip("s"))
elif destination_key == "command_line":
# Wrap in list for consistency with protobuf format (repeated string).
# XXX: Check if command line should be a single string in a list, or a list of strings.
tombstone[destination_key] = [value_clean]
else:
tombstone[destination_key] = value_clean

View File

@@ -117,6 +117,8 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose)
if from_file:
download = DownloadAPKs.from_json(from_file)
else:
# TODO: Do we actually want to be able to run without storing any
# file?
if not output:
log.critical("You need to specify an output folder with --output!")
ctx.exit(1)

View File

@@ -105,15 +105,15 @@ class AQFFiles(AndroidQFModule):
)
self.detected.append(result)
for hash_key in ("sha256", "sha1", "md5"):
file_hash = result.get(hash_key, "")
if not file_hash:
continue
ioc = self.indicators.check_file_hash(file_hash)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
break
if result.get("sha256", "") == "":
continue
ioc = self.indicators.check_file_hash(result["sha256"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
# TODO: adds SHA1 and MD5 when available in MVT
def run(self) -> None:
if timezone := self._get_device_timezone():
@@ -128,7 +128,7 @@ class AQFFiles(AndroidQFModule):
data = json.loads(rawdata)
except json.decoder.JSONDecodeError:
data = []
for line in rawdata.splitlines():
for line in rawdata.split("\n"):
if line.strip() == "":
continue
data.append(json.loads(line))
@@ -139,7 +139,7 @@ class AQFFiles(AndroidQFModule):
utc_timestamp = datetime.datetime.fromtimestamp(
file_data[ts], tz=datetime.timezone.utc
)
# Convert the UTC timestamp to local time on Android device's local timezone
# Convert the UTC timestamp to local time on Android device's local timezone
local_timestamp = utc_timestamp.astimezone(device_timezone)
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we

View File

@@ -39,7 +39,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
self.results[namespace] = {}
data = self._get_file_content(setting_file)
for line in data.decode("utf-8").splitlines():
for line in data.decode("utf-8").split("\n"):
line = line.strip()
try:
key, value = line.split("=", 1)

View File

@@ -84,17 +84,13 @@ class BugReportModule(MVTModule):
return self._get_file_content(main_content.decode().strip())
except KeyError:
return None
else:
dumpstate_logs = self._get_files_by_pattern("dumpState_*.log")
if not dumpstate_logs:
return None
dumpstate_logs = self._get_files_by_pattern("dumpState_*.log")
if dumpstate_logs:
return self._get_file_content(dumpstate_logs[0])
dumpsys_files = self._get_files_by_pattern("*/dumpsys.txt")
if dumpsys_files:
return self._get_file_content(dumpsys_files[0])
return None
def _get_file_modification_time(self, file_path: str) -> dict:
if self.zip_archive:
file_timetuple = self.zip_archive.getinfo(file_path).date_time

View File

@@ -34,20 +34,6 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
self.results = results if results else {}
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
receiver_name = self.results[result][0]["receiver"]
# return IoC if the stix2 process name is a substring of the receiver name
ioc = self.indicators.check_receiver_prefix(receiver_name)
if ioc:
self.results[result][0]["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:

View File

@@ -222,6 +222,7 @@ class Command:
if self.module_name and module.__name__ != self.module_name:
continue
# FIXME: do we need the logger here
module_logger = logging.getLogger(module.__module__)
m = module(

View File

@@ -768,30 +768,6 @@ class Indicators:
return None
def check_receiver_prefix(self, receiver_name: str) -> Union[dict, None]:
"""Check the provided receiver name against the list of indicators.
An IoC match is detected when a substring of the receiver matches the indicator
:param receiver_name: Receiver name to check against the list of indicators
:type receiver_name: str
:returns: Indicator details if matched, otherwise None
"""
if not receiver_name:
return None
for ioc in self.get_iocs("app_ids"):
if ioc["value"].lower() in receiver_name.lower():
self.log.warning(
'Found a known suspicious receiver with name "%s" '
'matching indicators from "%s"',
receiver_name,
ioc["name"],
)
return ioc
return None
def check_android_property_name(self, property_name: str) -> Optional[dict]:
"""Check the android property name against the list of indicators.

View File

@@ -180,8 +180,10 @@ class IndicatorsUpdates:
def _get_remote_file_latest_commit(
self, owner: str, repo: str, branch: str, path: str
) -> int:
# TODO: The branch is currently not taken into consideration.
# How do we specify which branch to look up to the API?
file_commit_url = (
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
)
try:
res = requests.get(file_commit_url, timeout=5)

View File

@@ -119,9 +119,10 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
if from_2001:
timestamp = timestamp + 978307200
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
try:
return convert_unix_to_utc_datetime(timestamp)
except (OSError, OverflowError, ValueError):
except Exception:
return None

View File

@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "2.7.0"
MVT_VERSION = "2.6.1"

View File

@@ -631,10 +631,6 @@
"build": "16H81",
"version": "12.5.7"
},
{
"version": "12.5.8",
"build": "16H88"
},
{
"build": "17A577",
"version": "13.0"
@@ -903,10 +899,6 @@
"version": "15.8.5",
"build": "19H394"
},
{
"version": "15.8.6",
"build": "19H402"
},
{
"build": "20A362",
"version": "16.0"
@@ -1016,10 +1008,6 @@
"version": "16.7.12",
"build": "20H364"
},
{
"version": "16.7.14",
"build": "20H370"
},
{
"version": "17.0",
"build": "21A327"
@@ -1176,18 +1164,6 @@
"version": "18.7.3",
"build": "22H217"
},
{
"version": "18.7.4",
"build": "22H218"
},
{
"version": "18.7.5",
"build": "22H311"
},
{
"version": "18.7.6",
"build": "22H320"
},
{
"version": "26",
"build": "23A341"
@@ -1203,17 +1179,5 @@
{
"version": "26.2",
"build": "23C55"
},
{
"version": "26.2.1",
"build": "23C71"
},
{
"version": "26.3",
"build": "23D127"
},
{
"version": "26.3.1",
"build": "23D8133"
}
]

View File

@@ -87,35 +87,6 @@ class ConfigurationProfiles(IOSExtraction):
self.detected.append(result)
continue
@staticmethod
def _b64encode_key(d: dict, key: str) -> None:
if key in d:
d[key] = b64encode(d[key])
@staticmethod
def _b64encode_keys(d: dict, keys: list) -> None:
for key in keys:
if key in d:
d[key] = b64encode(d[key])
def _b64encode_plist_bytes(self, plist: dict) -> None:
"""Encode binary plist values to base64 for JSON serialization."""
if "SignerCerts" in plist:
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
if "OTAProfileStub" in plist:
stub = plist["OTAProfileStub"]
if "SignerCerts" in stub:
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
if "PayloadContent" in stub:
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
if "PayloadContent" in plist:
for entry in plist["PayloadContent"]:
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
def run(self) -> None:
for conf_file in self._get_backup_files_from_manifest(
domain=CONF_PROFILES_DOMAIN
@@ -144,7 +115,65 @@ class ConfigurationProfiles(IOSExtraction):
except Exception:
conf_plist = {}
self._b64encode_plist_bytes(conf_plist)
# TODO: Tidy up the following code hell.
if "SignerCerts" in conf_plist:
conf_plist["SignerCerts"] = [
b64encode(x) for x in conf_plist["SignerCerts"]
]
if "OTAProfileStub" in conf_plist:
if "SignerCerts" in conf_plist["OTAProfileStub"]:
conf_plist["OTAProfileStub"]["SignerCerts"] = [
b64encode(x)
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
]
if "PayloadContent" in conf_plist["OTAProfileStub"]:
if (
"EnrollmentIdentityPersistentID"
in conf_plist["OTAProfileStub"]["PayloadContent"]
):
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
] = b64encode(
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
]
)
if "PushTokenDataSentToServerKey" in conf_plist:
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
conf_plist["PushTokenDataSentToServerKey"]
)
if "LastPushTokenHash" in conf_plist:
conf_plist["LastPushTokenHash"] = b64encode(
conf_plist["LastPushTokenHash"]
)
if "PayloadContent" in conf_plist:
for content_entry in range(len(conf_plist["PayloadContent"])):
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
]
)
if (
"IdentityPersistentRef"
in conf_plist["PayloadContent"][content_entry]
):
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
]
)
self.results.append(
{

View File

@@ -73,7 +73,7 @@ class ShutdownLog(IOSExtraction):
recent_processes = []
times_delayed = 0
delay = 0.0
for line in content.splitlines():
for line in content.split("\n"):
line = line.strip()
if line.startswith("remaining client pid:"):

View File

@@ -11,6 +11,7 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
from ..base import IOSExtraction
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
# TODO: Confirm Chrome database path.
CHROME_FAVICON_ROOT_PATHS = [
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
]

View File

@@ -13,6 +13,7 @@ from ..base import IOSExtraction
CHROME_HISTORY_BACKUP_IDS = [
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
]
# TODO: Confirm Chrome database path.
CHROME_HISTORY_ROOT_PATHS = [
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
]

View File

@@ -79,55 +79,32 @@ class WebkitResourceLoadStatistics(IOSExtraction):
cur = conn.cursor()
try:
# FIXME: table contains extra fields with timestamp here
cur.execute(
"""
SELECT
domainID,
registrableDomain,
lastSeen,
hadUserInteraction,
mostRecentUserInteractionTime,
mostRecentWebPushInteractionTime
hadUserInteraction
from ObservedDomains;
"""
)
has_extra_timestamps = True
except sqlite3.OperationalError:
try:
cur.execute(
"""
SELECT
domainID,
registrableDomain,
lastSeen,
hadUserInteraction
from ObservedDomains;
"""
)
has_extra_timestamps = False
except sqlite3.OperationalError:
return
return
for row in cur:
result = {
"domain_id": row[0],
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_unix_to_iso(row[2]),
"domain": domain,
"path": path,
}
if has_extra_timestamps:
result["most_recent_user_interaction_time"] = row[4]
result["most_recent_user_interaction_time_isodate"] = (
convert_unix_to_iso(row[4])
)
result["most_recent_web_push_interaction_time"] = row[5]
result["most_recent_web_push_interaction_time_isodate"] = (
convert_unix_to_iso(row[5])
)
self.results.append(result)
self.results.append(
{
"domain_id": row[0],
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_unix_to_iso(row[2]),
"domain": domain,
"path": path,
}
)
if len(self.results) > 0:
self.log.info(

View File

@@ -76,6 +76,12 @@ class WebkitSessionResourceLog(IOSExtraction):
entry["redirect_destination"]
)
# TODO: Currently not used.
# subframe_origins = self._extract_domains(
# entry["subframe_under_origin"])
# subresource_domains = self._extract_domains(
# entry["subresource_under_origin"])
all_origins = set(
[entry["origin"]] + source_domains + destination_domains
)

View File

@@ -311,11 +311,14 @@ class NetBase(IOSExtraction):
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
def check_indicators(self) -> None:
# check_manipulated/find_deleted require "live_isodate" and
# "live_proc_id" keys which may be absent in older result formats.
if self.results and "live_isodate" in self.results[0]:
# Check for manipulated process records.
# TODO: Catching KeyError for live_isodate for retro-compatibility.
# This is not very good.
try:
self.check_manipulated()
self.find_deleted()
except KeyError:
pass
if not self.indicators:
return