diff --git a/docs/android/methodology.md b/docs/android/methodology.md index fc1a6c7..797ada8 100644 --- a/docs/android/methodology.md +++ b/docs/android/methodology.md @@ -38,6 +38,22 @@ By separating artifact collection from forensic analysis, this approach ensures For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf). +### VirusTotal package lookups + +AndroidQF records APK file hashes in `packages.json`. MVT can optionally look up non-system APK hashes on VirusTotal while checking an AndroidQF acquisition: + +```bash +MVT_VT_API_KEY= mvt-android check-androidqf --virustotal /path/to/androidqf-output +``` + +The `--virustotal` option is disabled by default because it sends APK hashes to VirusTotal and requires network access. It uses the `VT_API_KEY` MVT configuration value, which can also be provided through the `MVT_VT_API_KEY` environment variable. + +To avoid exhausting free VirusTotal API quotas, MVT waits 16 seconds between package hash requests by default. Use `--delay` to change the delay, or `--delay 0` to disable throttling: + +```bash +mvt-android check-androidqf --virustotal --delay 30 /path/to/androidqf-output +``` + ## Android Intrusion Logs On devices where the user has opted into Android's [**Advanced Protection Mode**](https://support.google.com/android/answer/16339980) and turned on the optional Intrusion Logging featrue, Android can create and archive structured *Intrusion Logs* in an encrypted format. These logs record DNS queries, outbound network connections, process starts, ADB activity and other security-relevant events, and are a high-fidelity complement to the rest of an AndroidQF acquisition. The logs are generated on-device and encrypted before being stored in the Google account associated with the device. The encryption key is protected by the user device PIN. The intrusion log data is not accessible to Google. diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index d13f86c..1123742 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -23,6 +23,7 @@ from mvt.common.help import ( HELP_MSG_CHECK_BUGREPORT, HELP_MSG_CHECK_IOCS, HELP_MSG_CHECK_INTRUSION_LOGS, + HELP_MSG_DELAY_CHECKS, HELP_MSG_COMPLETION, HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK, HELP_MSG_DISABLE_UPDATE_CHECK, @@ -36,6 +37,7 @@ from mvt.common.help import ( HELP_MSG_STIX2, HELP_MSG_VERBOSE, HELP_MSG_VERSION, + HELP_MSG_VIRUS_TOTAL, ) from mvt.common.logo import logo from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules @@ -310,6 +312,10 @@ def check_backup( help=HELP_MSG_LOAD_MODULE, ) @click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES) +@click.option("--virustotal", "-V", is_flag=True, help=HELP_MSG_VIRUS_TOTAL) +@click.option( + "--delay", "-d", type=click.IntRange(min=0), default=16, help=HELP_MSG_DELAY_CHECKS +) @click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE) @click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD) @click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) @@ -323,6 +329,8 @@ def check_androidqf( module, load_module, hashes, + virustotal, + delay, non_interactive, backup_password, verbose, @@ -340,6 +348,8 @@ def check_androidqf( module_options={ "interactive": not non_interactive, "backup_password": cli_load_android_backup_password(log, backup_password), + "virustotal": virustotal, + "virustotal_delay": delay, }, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], diff --git a/src/mvt/android/modules/androidqf/aqf_packages.py b/src/mvt/android/modules/androidqf/aqf_packages.py index 264fd1e..294ad53 100644 --- a/src/mvt/android/modules/androidqf/aqf_packages.py +++ b/src/mvt/android/modules/androidqf/aqf_packages.py @@ -5,8 +5,11 @@ import json import logging +import time from typing import Optional +from rich.progress import track + from mvt.android.utils import ( BROWSER_INSTALLERS, PLAY_STORE_INSTALLERS, @@ -15,7 +18,8 @@ from mvt.android.utils import ( SYSTEM_UPDATE_PACKAGES, THIRD_PARTY_STORE_INSTALLERS, ) -from mvt.common.module_types import ModuleResults +from mvt.common.module_types import ModuleAtomicResult, ModuleResults +from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup from .base import AndroidQFModule @@ -125,6 +129,65 @@ class AQFPackages(AndroidQFModule): ) break + if self.module_options.get("virustotal", False): + self.check_virustotal( + delay=self.module_options.get("virustotal_delay", 0) + ) + + def check_virustotal(self, delay: int = 0) -> None: + files_by_hash: dict[ + str, list[tuple[ModuleAtomicResult, ModuleAtomicResult]] + ] = {} + for package in self.results: + if package.get("system", False): + continue + + for package_file in package.get("files", []): + file_hash = package_file.get("sha256") + if not file_hash: + continue + + files_by_hash.setdefault(file_hash, []).append((package, package_file)) + + total_hashes = len(files_by_hash) + if total_hashes == 0: + return + + progress_desc = f"Looking up {total_hashes} package files on VirusTotal..." + for index, file_hash in enumerate( + track(files_by_hash, description=progress_desc) + ): + try: + results = virustotal_lookup(file_hash) + except VTNoKey as exc: + self.log.warning("%s", exc) + return + except VTQuotaExceeded as exc: + self.log.warning("Unable to continue VirusTotal lookups: %s", exc) + break + + if index < total_hashes - 1 and delay > 0: + time.sleep(delay) + + if not results: + continue + + attributes = results.get("attributes", {}) + stats = attributes.get("last_analysis_stats", {}) + positives = stats.get("malicious", 0) + total = len(attributes.get("last_analysis_results", {})) + detection = f"{positives}/{total}" + + for package, package_file in files_by_hash[file_hash]: + package_file["virustotal"] = detection + if positives > 0: + self.alertstore.high( + f'VirusTotal flagged package "{package["name"]}" file ' + f'"{package_file["path"]}" with {detection} detections', + "", + package, + ) + def run(self) -> None: packages = self._get_files_by_pattern("*/packages.json") if not packages: diff --git a/src/mvt/common/help.py b/src/mvt/common/help.py index 5a2dc25..90e71ca 100644 --- a/src/mvt/common/help.py +++ b/src/mvt/common/help.py @@ -53,3 +53,5 @@ HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report" HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup" HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF" HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files" +HELP_MSG_VIRUS_TOTAL = "Check package APK hashes on VirusTotal" +HELP_MSG_DELAY_CHECKS = "Delay in seconds between VirusTotal requests" diff --git a/src/mvt/common/virustotal.py b/src/mvt/common/virustotal.py new file mode 100644 index 0000000..9a5df32 --- /dev/null +++ b/src/mvt/common/virustotal.py @@ -0,0 +1,53 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Any, Optional + +import requests + +from .config import settings + +log = logging.getLogger(__name__) + + +class VTNoKey(Exception): + pass + + +class VTQuotaExceeded(Exception): + pass + + +def virustotal_lookup(file_hash: str) -> Optional[dict[str, Any]]: + if not settings.VT_API_KEY: + raise VTNoKey( + "No VirusTotal API key provided: to use VirusTotal lookups please set " + "MVT_VT_API_KEY or VT_API_KEY in the MVT configuration file" + ) + + headers = { + "User-Agent": "VirusTotal", + "Content-Type": "application/json", + "x-apikey": settings.VT_API_KEY, + } + res = requests.get( + f"https://www.virustotal.com/api/v3/files/{file_hash}", + headers=headers, + timeout=settings.NETWORK_TIMEOUT, + ) + + if res.status_code == 200: + report = res.json() + return report["data"] + + if res.status_code == 404: + log.info("Could not find results for file with hash %s", file_hash) + elif res.status_code == 429: + raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key") + else: + raise RuntimeError(f"Unexpected response from VirusTotal: {res.status_code}") + + return None diff --git a/tests/android_androidqf/test_packages.py b/tests/android_androidqf/test_packages.py index ae0ae62..a879e88 100644 --- a/tests/android_androidqf/test_packages.py +++ b/tests/android_androidqf/test_packages.py @@ -7,8 +7,11 @@ import logging from pathlib import Path import pytest +from click.testing import CliRunner +from mvt.android.cli import check_androidqf from mvt.android.modules.androidqf.aqf_packages import AQFPackages +from mvt.android.modules.androidqf import aqf_packages as aqf_packages_module from mvt.common.module import run_module from ..utils import get_android_androidqf, list_files @@ -132,3 +135,55 @@ class TestAndroidqfPackages: possible_detected_app[0].matched_indicator.value == "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730" ) + + def test_virustotal_delays_after_missing_result(self, monkeypatch): + lookups = [] + sleeps = [] + + def fake_virustotal_lookup(file_hash): + lookups.append(file_hash) + if file_hash == "missing_hash": + return None + return { + "attributes": { + "last_analysis_stats": {"malicious": 1}, + "last_analysis_results": {"engine": {}}, + } + } + + monkeypatch.setattr( + aqf_packages_module, "virustotal_lookup", fake_virustotal_lookup + ) + monkeypatch.setattr(aqf_packages_module.time, "sleep", sleeps.append) + + module = AQFPackages( + module_options={"virustotal": True, "virustotal_delay": 16}, + results=[ + { + "name": "org.example", + "installer": "com.android.vending", + "disabled": False, + "system": False, + "files": [ + {"path": "/data/app/missing.apk", "sha256": "missing_hash"}, + {"path": "/data/app/found.apk", "sha256": "found_hash"}, + ], + } + ], + ) + + module.check_indicators() + + assert lookups == ["missing_hash", "found_hash"] + assert sleeps == [16] + assert module.results[0]["files"][1]["virustotal"] == "1/1" + assert len(module.alertstore.alerts) == 1 + + +def test_check_androidqf_rejects_negative_virustotal_delay(data_path): + runner = CliRunner() + + result = runner.invoke(check_androidqf, ["--delay", "-1", data_path]) + + assert result.exit_code == 2 + assert "Invalid value for '--delay'" in result.output