WIP: Addition of a timer to virustotal checks (#593)

* Add delay option to virustotal checks (#408)

* Fix missing delay argument

* Fix VirusTotal delay handling

* Fix mypy type for VirusTotal package map

---------

Co-authored-by: Janik Besendorf <janik@besendorf.org>
This commit is contained in:
Nimrod B.
2026-07-01 12:36:56 +02:00
committed by GitHub
parent a18e632ec8
commit f5b0a3cd91
6 changed files with 200 additions and 1 deletions
+16
View File
@@ -38,6 +38,22 @@ By separating artifact collection from forensic analysis, this approach ensures
For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf).
### VirusTotal package lookups
AndroidQF records APK file hashes in `packages.json`. MVT can optionally look up non-system APK hashes on VirusTotal while checking an AndroidQF acquisition:
```bash
MVT_VT_API_KEY=<key> mvt-android check-androidqf --virustotal /path/to/androidqf-output
```
The `--virustotal` option is disabled by default because it sends APK hashes to VirusTotal and requires network access. It uses the `VT_API_KEY` MVT configuration value, which can also be provided through the `MVT_VT_API_KEY` environment variable.
To avoid exhausting free VirusTotal API quotas, MVT waits 16 seconds between package hash requests by default. Use `--delay` to change the delay, or `--delay 0` to disable throttling:
```bash
mvt-android check-androidqf --virustotal --delay 30 /path/to/androidqf-output
```
## Android Intrusion Logs
On devices where the user has opted into Android's [**Advanced Protection Mode**](https://support.google.com/android/answer/16339980) and turned on the optional Intrusion Logging featrue, Android can create and archive structured *Intrusion Logs* in an encrypted format. These logs record DNS queries, outbound network connections, process starts, ADB activity and other security-relevant events, and are a high-fidelity complement to the rest of an AndroidQF acquisition. The logs are generated on-device and encrypted before being stored in the Google account associated with the device. The encryption key is protected by the user device PIN. The intrusion log data is not accessible to Google.
+10
View File
@@ -23,6 +23,7 @@ from mvt.common.help import (
HELP_MSG_CHECK_BUGREPORT,
HELP_MSG_CHECK_IOCS,
HELP_MSG_CHECK_INTRUSION_LOGS,
HELP_MSG_DELAY_CHECKS,
HELP_MSG_COMPLETION,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
HELP_MSG_DISABLE_UPDATE_CHECK,
@@ -36,6 +37,7 @@ from mvt.common.help import (
HELP_MSG_STIX2,
HELP_MSG_VERBOSE,
HELP_MSG_VERSION,
HELP_MSG_VIRUS_TOTAL,
)
from mvt.common.logo import logo
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
@@ -310,6 +312,10 @@ def check_backup(
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--virustotal", "-V", is_flag=True, help=HELP_MSG_VIRUS_TOTAL)
@click.option(
"--delay", "-d", type=click.IntRange(min=0), default=16, help=HELP_MSG_DELAY_CHECKS
)
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@@ -323,6 +329,8 @@ def check_androidqf(
module,
load_module,
hashes,
virustotal,
delay,
non_interactive,
backup_password,
verbose,
@@ -340,6 +348,8 @@ def check_androidqf(
module_options={
"interactive": not non_interactive,
"backup_password": cli_load_android_backup_password(log, backup_password),
"virustotal": virustotal,
"virustotal_delay": delay,
},
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
@@ -5,8 +5,11 @@
import json
import logging
import time
from typing import Optional
from rich.progress import track
from mvt.android.utils import (
BROWSER_INSTALLERS,
PLAY_STORE_INSTALLERS,
@@ -15,7 +18,8 @@ from mvt.android.utils import (
SYSTEM_UPDATE_PACKAGES,
THIRD_PARTY_STORE_INSTALLERS,
)
from mvt.common.module_types import ModuleResults
from mvt.common.module_types import ModuleAtomicResult, ModuleResults
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
from .base import AndroidQFModule
@@ -125,6 +129,65 @@ class AQFPackages(AndroidQFModule):
)
break
if self.module_options.get("virustotal", False):
self.check_virustotal(
delay=self.module_options.get("virustotal_delay", 0)
)
def check_virustotal(self, delay: int = 0) -> None:
files_by_hash: dict[
str, list[tuple[ModuleAtomicResult, ModuleAtomicResult]]
] = {}
for package in self.results:
if package.get("system", False):
continue
for package_file in package.get("files", []):
file_hash = package_file.get("sha256")
if not file_hash:
continue
files_by_hash.setdefault(file_hash, []).append((package, package_file))
total_hashes = len(files_by_hash)
if total_hashes == 0:
return
progress_desc = f"Looking up {total_hashes} package files on VirusTotal..."
for index, file_hash in enumerate(
track(files_by_hash, description=progress_desc)
):
try:
results = virustotal_lookup(file_hash)
except VTNoKey as exc:
self.log.warning("%s", exc)
return
except VTQuotaExceeded as exc:
self.log.warning("Unable to continue VirusTotal lookups: %s", exc)
break
if index < total_hashes - 1 and delay > 0:
time.sleep(delay)
if not results:
continue
attributes = results.get("attributes", {})
stats = attributes.get("last_analysis_stats", {})
positives = stats.get("malicious", 0)
total = len(attributes.get("last_analysis_results", {}))
detection = f"{positives}/{total}"
for package, package_file in files_by_hash[file_hash]:
package_file["virustotal"] = detection
if positives > 0:
self.alertstore.high(
f'VirusTotal flagged package "{package["name"]}" file '
f'"{package_file["path"]}" with {detection} detections',
"",
package,
)
def run(self) -> None:
packages = self._get_files_by_pattern("*/packages.json")
if not packages:
+2
View File
@@ -53,3 +53,5 @@ HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report"
HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup"
HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF"
HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files"
HELP_MSG_VIRUS_TOTAL = "Check package APK hashes on VirusTotal"
HELP_MSG_DELAY_CHECKS = "Delay in seconds between VirusTotal requests"
+53
View File
@@ -0,0 +1,53 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Any, Optional
import requests
from .config import settings
log = logging.getLogger(__name__)
class VTNoKey(Exception):
pass
class VTQuotaExceeded(Exception):
pass
def virustotal_lookup(file_hash: str) -> Optional[dict[str, Any]]:
if not settings.VT_API_KEY:
raise VTNoKey(
"No VirusTotal API key provided: to use VirusTotal lookups please set "
"MVT_VT_API_KEY or VT_API_KEY in the MVT configuration file"
)
headers = {
"User-Agent": "VirusTotal",
"Content-Type": "application/json",
"x-apikey": settings.VT_API_KEY,
}
res = requests.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers=headers,
timeout=settings.NETWORK_TIMEOUT,
)
if res.status_code == 200:
report = res.json()
return report["data"]
if res.status_code == 404:
log.info("Could not find results for file with hash %s", file_hash)
elif res.status_code == 429:
raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key")
else:
raise RuntimeError(f"Unexpected response from VirusTotal: {res.status_code}")
return None
+55
View File
@@ -7,8 +7,11 @@ import logging
from pathlib import Path
import pytest
from click.testing import CliRunner
from mvt.android.cli import check_androidqf
from mvt.android.modules.androidqf.aqf_packages import AQFPackages
from mvt.android.modules.androidqf import aqf_packages as aqf_packages_module
from mvt.common.module import run_module
from ..utils import get_android_androidqf, list_files
@@ -132,3 +135,55 @@ class TestAndroidqfPackages:
possible_detected_app[0].matched_indicator.value
== "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730"
)
def test_virustotal_delays_after_missing_result(self, monkeypatch):
lookups = []
sleeps = []
def fake_virustotal_lookup(file_hash):
lookups.append(file_hash)
if file_hash == "missing_hash":
return None
return {
"attributes": {
"last_analysis_stats": {"malicious": 1},
"last_analysis_results": {"engine": {}},
}
}
monkeypatch.setattr(
aqf_packages_module, "virustotal_lookup", fake_virustotal_lookup
)
monkeypatch.setattr(aqf_packages_module.time, "sleep", sleeps.append)
module = AQFPackages(
module_options={"virustotal": True, "virustotal_delay": 16},
results=[
{
"name": "org.example",
"installer": "com.android.vending",
"disabled": False,
"system": False,
"files": [
{"path": "/data/app/missing.apk", "sha256": "missing_hash"},
{"path": "/data/app/found.apk", "sha256": "found_hash"},
],
}
],
)
module.check_indicators()
assert lookups == ["missing_hash", "found_hash"]
assert sleeps == [16]
assert module.results[0]["files"][1]["virustotal"] == "1/1"
assert len(module.alertstore.alerts) == 1
def test_check_androidqf_rejects_negative_virustotal_delay(data_path):
runner = CliRunner()
result = runner.invoke(check_androidqf, ["--delay", "-1", data_path])
assert result.exit_code == 2
assert "Invalid value for '--delay'" in result.output