From 84dc13144d2f680743aa400c8789d832cacbd5ab Mon Sep 17 00:00:00 2001 From: tek Date: Tue, 1 Aug 2023 11:58:20 +0200 Subject: [PATCH] Refactor DumpsysAppOps --- mvt/android/artifacts/dumpsys_appops.py | 149 ++++++++++++++++++ mvt/android/modules/adb/dumpsys_appops.py | 47 +----- .../modules/androidqf/dumpsys_appops.py | 66 ++------ mvt/android/modules/bugreport/appops.py | 67 +------- mvt/android/parsers/__init__.py | 1 - mvt/android/parsers/dumpsys.py | 101 ------------ tests/android/test_artifact_dumpsys_appops.py | 47 ++++++ tests/android/test_dumpsys_parser.py | 18 --- 8 files changed, 216 insertions(+), 280 deletions(-) create mode 100644 mvt/android/artifacts/dumpsys_appops.py create mode 100644 tests/android/test_artifact_dumpsys_appops.py diff --git a/mvt/android/artifacts/dumpsys_appops.py b/mvt/android/artifacts/dumpsys_appops.py new file mode 100644 index 0000000..24fb8b5 --- /dev/null +++ b/mvt/android/artifacts/dumpsys_appops.py @@ -0,0 +1,149 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 Claudio Guarnieri. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ +from datetime import datetime +from typing import Any, Dict, List, Union + +from mvt.common.utils import convert_datetime_to_iso + +from .artifact import AndroidArtifact + + +class DumpsysAppops(AndroidArtifact): + """ + Parser for dumpsys app ops info + """ + + def serialize(self, record: dict) -> Union[dict, list]: + records = [] + for perm in record["permissions"]: + if "entries" not in perm: + continue + + for entry in perm["entries"]: + if "timestamp" in entry: + records.append( + { + "timestamp": entry["timestamp"], + "module": self.__class__.__name__, + "event": entry["access"], + "data": f"{record['package_name']} access to " + f"{perm['name']}: {entry['access']}", + } + ) + + return records + + def check_indicators(self) -> None: + for result in self.results: + if self.indicators: + ioc = self.indicators.check_app_id(result.get("package_name")) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + for perm in result["permissions"]: + if ( + perm["name"] == "REQUEST_INSTALL_PACKAGES" + and perm["access"] == "allow" + ): + self.log.info( + "Package %s with REQUEST_INSTALL_PACKAGES " "permission", + result["package_name"], + ) + + def parse(self, output: str) -> List[Dict[str, Any]]: + self.results: List[Dict[str, Any]] = [] + perm = {} + package = {} + entry = {} + uid = None + in_packages = False + + for line in output.splitlines(): + if line.startswith(" Uid 0:"): + in_packages = True + + if not in_packages: + continue + + if line.startswith(" Uid "): + uid = line[6:-1] + if entry: + perm["entries"].append(entry) + entry = {} + if package: + if perm: + package["permissions"].append(perm) + + perm = {} + self.results.append(package) + package = {} + continue + + if line.startswith(" Package "): + if entry: + perm["entries"].append(entry) + entry = {} + + if package: + if perm: + package["permissions"].append(perm) + + perm = {} + self.results.append(package) + + package = { + "package_name": line[12:-1], + "permissions": [], + "uid": uid, + } + continue + + if package and line.startswith(" ") and line[6] != " ": + if entry: + perm["entries"].append(entry) + entry = {} + if perm: + package["permissions"].append(perm) + perm = {} + + perm["name"] = line.split()[0] + perm["entries"] = [] + if len(line.split()) > 1: + perm["access"] = line.split()[1][1:-2] + + continue + + if line.startswith(" "): + # Permission entry like: + # Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms) + if entry: + perm["entries"].append(entry) + entry = {} + + entry["access"] = line.split(":")[0].strip() + entry["type"] = line[line.find("[") + 1 : line.find("]")] + + try: + entry["timestamp"] = convert_datetime_to_iso( + datetime.strptime( + line[line.find("]") + 1 : line.find("(")].strip(), + "%Y-%m-%d %H:%M:%S.%f", + ) + ) + except ValueError: + # Invalid date format + pass + + if line.strip() == "": + break + + if entry: + perm["entries"].append(entry) + if perm: + package["permissions"].append(perm) + if package: + self.results.append(package) diff --git a/mvt/android/modules/adb/dumpsys_appops.py b/mvt/android/modules/adb/dumpsys_appops.py index aaaf537..882f1b9 100644 --- a/mvt/android/modules/adb/dumpsys_appops.py +++ b/mvt/android/modules/adb/dumpsys_appops.py @@ -4,14 +4,14 @@ # https://license.mvt.re/1.1/ import logging -from typing import Optional, Union +from typing import Optional -from mvt.android.parsers.dumpsys import parse_dumpsys_appops +from mvt.android.artifacts.dumpsys_appops import DumpsysAppops as DAO from .base import AndroidExtraction -class DumpsysAppOps(AndroidExtraction): +class DumpsysAppOps(DAO, AndroidExtraction): """This module extracts records from App-op Manager.""" slug = "dumpsys_appops" @@ -34,51 +34,12 @@ class DumpsysAppOps(AndroidExtraction): results=results, ) - def serialize(self, record: dict) -> Union[dict, list]: - records = [] - for perm in record["permissions"]: - if "entries" not in perm: - continue - - for entry in perm["entries"]: - if "timestamp" in entry: - records.append( - { - "timestamp": entry["timestamp"], - "module": self.__class__.__name__, - "event": entry["access"], - "data": f"{record['package_name']} access to " - f"{perm['name']}: {entry['access']}", - } - ) - - return records - - def check_indicators(self) -> None: - for result in self.results: - if self.indicators: - ioc = self.indicators.check_app_id(result.get("package_name")) - if ioc: - result["matched_indicator"] = ioc - self.detected.append(result) - continue - - for perm in result["permissions"]: - if ( - perm["name"] == "REQUEST_INSTALL_PACKAGES" - and perm["access"] == "allow" - ): - self.log.info( - "Package %s with REQUEST_INSTALL_PACKAGES " "permission", - result["package_name"], - ) - def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys appops") self._adb_disconnect() - self.results = parse_dumpsys_appops(output) + self.parse(output) self.log.info( "Extracted a total of %d records from app-ops manager", len(self.results) diff --git a/mvt/android/modules/androidqf/dumpsys_appops.py b/mvt/android/modules/androidqf/dumpsys_appops.py index 3f5efc9..9e6bf44 100644 --- a/mvt/android/modules/androidqf/dumpsys_appops.py +++ b/mvt/android/modules/androidqf/dumpsys_appops.py @@ -4,14 +4,14 @@ # https://license.mvt.re/1.1/ import logging -from typing import Optional, Union +from typing import Optional -from mvt.android.parsers import parse_dumpsys_appops +from mvt.android.artifacts.dumpsys_appops import DumpsysAppops as DAO from .base import AndroidQFModule -class DumpsysAppops(AndroidQFModule): +class DumpsysAppops(DAO, AndroidQFModule): def __init__( self, file_path: Optional[str] = None, @@ -30,65 +30,17 @@ class DumpsysAppops(AndroidQFModule): results=results, ) - def serialize(self, record: dict) -> Union[dict, list]: - records = [] - for perm in record["permissions"]: - if "entries" not in perm: - continue - - for entry in perm["entries"]: - if "timestamp" in entry: - records.append( - { - "timestamp": entry["timestamp"], - "module": self.__class__.__name__, - "event": entry["access"], - "data": f"{record['package_name']} access to " - f"{perm['name']} : {entry['access']}", - } - ) - - return records - - def check_indicators(self) -> None: - for result in self.results: - if self.indicators: - ioc = self.indicators.check_app_id(result.get("package_name")) - if ioc: - result["matched_indicator"] = ioc - self.detected.append(result) - continue - - for perm in result["permissions"]: - if ( - perm["name"] == "REQUEST_INSTALL_PACKAGES" - and perm["access"] == "allow" - ): - self.log.info( - "Package %s with REQUEST_INSTALL_PACKAGES permission", - result["package_name"], - ) - def run(self) -> None: dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt") if not dumpsys_file: return - lines = [] - in_package = False + # Extract section data = self._get_file_content(dumpsys_file[0]) - for line in data.decode("utf-8").split("\n"): - if line.startswith("DUMP OF SERVICE appops:"): - in_package = True - continue + section = self.extract_dumpsys_section( + data.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:" + ) - if in_package: - if line.startswith( - "-------------------------------------------------------------------------------" - ): # pylint: disable=line-too-long - break - - lines.append(line.rstrip()) - - self.results = parse_dumpsys_appops("\n".join(lines)) + # Parse it + self.parse(section) self.log.info("Identified %d applications in AppOps Manager", len(self.results)) diff --git a/mvt/android/modules/bugreport/appops.py b/mvt/android/modules/bugreport/appops.py index 0beb022..e5ef0ae 100644 --- a/mvt/android/modules/bugreport/appops.py +++ b/mvt/android/modules/bugreport/appops.py @@ -4,14 +4,14 @@ # https://license.mvt.re/1.1/ import logging -from typing import Optional, Union +from typing import Optional -from mvt.android.parsers import parse_dumpsys_appops +from mvt.android.artifacts.dumpsys_appops import DumpsysAppops from .base import BugReportModule -class Appops(BugReportModule): +class Appops(DumpsysAppops, BugReportModule): """This module extracts information on package from App-Ops Manager.""" def __init__( @@ -32,45 +32,6 @@ class Appops(BugReportModule): results=results, ) - def serialize(self, record: dict) -> Union[dict, list]: - records = [] - for perm in record["permissions"]: - if "entries" not in perm: - continue - - for entry in perm["entries"]: - if "timestamp" in entry: - records.append( - { - "timestamp": entry["timestamp"], - "module": self.__class__.__name__, - "event": entry["access"], - "data": f"{record['package_name']} access to " - f"{perm['name']}: {entry['access']}", - } - ) - - return records - - def check_indicators(self) -> None: - for result in self.results: - if self.indicators: - ioc = self.indicators.check_app_id(result.get("package_name")) - if ioc: - result["matched_indicator"] = ioc - self.detected.append(result) - continue - - for perm in result["permissions"]: - if ( - perm["name"] == "REQUEST_INSTALL_PACKAGES" - and perm["access"] == "allow" - ): - self.log.info( - "Package %s with REQUEST_INSTALL_PACKAGES permission", - result["package_name"], - ) - def run(self) -> None: content = self._get_dumpstate_file() if not content: @@ -80,24 +41,10 @@ class Appops(BugReportModule): ) return - lines = [] - in_appops = False - for line in content.decode(errors="ignore").splitlines(): - if line.strip() == "DUMP OF SERVICE appops:": - in_appops = True - continue - - if not in_appops: - continue - - if line.strip().startswith( - "------------------------------------------------------------------------------" - ): # pylint: disable=line-too-long - break - - lines.append(line) - - self.results = parse_dumpsys_appops("\n".join(lines)) + section = self.extract_dumpsys_section( + content.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:" + ) + self.parse(section) self.log.info( "Identified a total of %d packages in App-Ops Manager", len(self.results) diff --git a/mvt/android/parsers/__init__.py b/mvt/android/parsers/__init__.py index 9e48ae9..dd1afcb 100644 --- a/mvt/android/parsers/__init__.py +++ b/mvt/android/parsers/__init__.py @@ -4,7 +4,6 @@ # https://license.mvt.re/1.1/ from .dumpsys import ( - parse_dumpsys_appops, parse_dumpsys_battery_daily, parse_dumpsys_battery_history, parse_dumpsys_receiver_resolver_table, diff --git a/mvt/android/parsers/dumpsys.py b/mvt/android/parsers/dumpsys.py index 8720194..0b4b156 100644 --- a/mvt/android/parsers/dumpsys.py +++ b/mvt/android/parsers/dumpsys.py @@ -4,11 +4,8 @@ # https://license.mvt.re/1.1/ import re -from datetime import datetime from typing import Any, Dict, List -from mvt.common.utils import convert_datetime_to_iso - def parse_dumpsys_battery_daily(output: str) -> list: results = [] @@ -176,104 +173,6 @@ def parse_dumpsys_receiver_resolver_table(output: str) -> Dict[str, Any]: return results -def parse_dumpsys_appops(output: str) -> List[Dict[str, Any]]: - results = [] - perm = {} - package = {} - entry = {} - uid = None - in_packages = False - - for line in output.splitlines(): - if line.startswith(" Uid 0:"): - in_packages = True - - if not in_packages: - continue - - if line.startswith(" Uid "): - uid = line[6:-1] - if entry: - perm["entries"].append(entry) - entry = {} - - if package: - if perm: - package["permissions"].append(perm) - - perm = {} - results.append(package) - package = {} - continue - - if line.startswith(" Package "): - if entry: - perm["entries"].append(entry) - entry = {} - - if package: - if perm: - package["permissions"].append(perm) - - perm = {} - results.append(package) - - package = { - "package_name": line[12:-1], - "permissions": [], - "uid": uid, - } - continue - - if package and line.startswith(" ") and line[6] != " ": - if entry: - perm["entries"].append(entry) - entry = {} - if perm: - package["permissions"].append(perm) - perm = {} - - perm["name"] = line.split()[0] - perm["entries"] = [] - if len(line.split()) > 1: - perm["access"] = line.split()[1][1:-2] - - continue - - if line.startswith(" "): - # Permission entry like: - # Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms) - if entry: - perm["entries"].append(entry) - entry = {} - - entry["access"] = line.split(":")[0].strip() - entry["type"] = line[line.find("[") + 1 : line.find("]")] - - try: - entry["timestamp"] = convert_datetime_to_iso( - datetime.strptime( - line[line.find("]") + 1 : line.find("(")].strip(), - "%Y-%m-%d %H:%M:%S.%f", - ) - ) - except ValueError: - # Invalid date format - pass - - if line.strip() == "": - break - - if entry: - perm["entries"].append(entry) - if perm: - package["permissions"].append(perm) - if package: - results.append(package) - - return results - - def parse_dumpsys_package_for_details(output: str) -> Dict[str, Any]: """ Parse one entry of a dumpsys package information diff --git a/tests/android/test_artifact_dumpsys_appops.py b/tests/android/test_artifact_dumpsys_appops.py new file mode 100644 index 0000000..489c2d9 --- /dev/null +++ b/tests/android/test_artifact_dumpsys_appops.py @@ -0,0 +1,47 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 Claudio Guarnieri. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ +import logging + +from mvt.android.artifacts.dumpsys_appops import DumpsysAppops +from mvt.common.indicators import Indicators + +from ..utils import get_artifact + + +class TestDumpsysAppopsArtifact: + def test_parsing(self): + da = DumpsysAppops() + da.log = logging + file = get_artifact("android_data/dumpsys_appops.txt") + with open(file) as f: + data = f.read() + + assert len(da.results) == 0 + da.parse(data) + assert len(da.results) == 13 + assert da.results[0]["package_name"] == "com.android.phone" + assert da.results[0]["uid"] == "0" + assert len(da.results[0]["permissions"]) == 1 + assert da.results[0]["permissions"][0]["name"] == "MANAGE_IPSEC_TUNNELS" + assert da.results[0]["permissions"][0]["access"] == "allow" + assert da.results[6]["package_name"] == "com.sec.factory.camera" + assert len(da.results[6]["permissions"][1]["entries"]) == 1 + assert len(da.results[11]["permissions"]) == 4 + + def test_ioc_check(self, indicator_file): + da = DumpsysAppops() + da.log = logging + file = get_artifact("android_data/dumpsys_appops.txt") + with open(file) as f: + data = f.read() + da.parse(data) + + ind = Indicators(log=logging.getLogger()) + ind.parse_stix2(indicator_file) + ind.ioc_collections[0]["app_ids"].append("com.facebook.katana") + da.indicators = ind + assert len(da.detected) == 0 + da.check_indicators() + assert len(da.detected) == 1 diff --git a/tests/android/test_dumpsys_parser.py b/tests/android/test_dumpsys_parser.py index c8e33b4..ce3604e 100644 --- a/tests/android/test_dumpsys_parser.py +++ b/tests/android/test_dumpsys_parser.py @@ -4,7 +4,6 @@ # https://license.mvt.re/1.1/ from mvt.android.parsers.dumpsys import ( - parse_dumpsys_appops, parse_dumpsys_battery_history, parse_dumpsys_packages, ) @@ -13,23 +12,6 @@ from ..utils import get_artifact class TestDumpsysParsing: - def test_appops_parsing(self): - file = get_artifact("android_data/dumpsys_appops.txt") - with open(file) as f: - data = f.read() - - res = parse_dumpsys_appops(data) - - assert len(res) == 13 - assert res[0]["package_name"] == "com.android.phone" - assert res[0]["uid"] == "0" - assert len(res[0]["permissions"]) == 1 - assert res[0]["permissions"][0]["name"] == "MANAGE_IPSEC_TUNNELS" - assert res[0]["permissions"][0]["access"] == "allow" - assert res[6]["package_name"] == "com.sec.factory.camera" - assert len(res[6]["permissions"][1]["entries"]) == 1 - assert len(res[11]["permissions"]) == 4 - def test_battery_history_parsing(self): file = get_artifact("android_data/dumpsys_battery.txt") with open(file) as f: