Refactor DumpsysAppOps

This commit is contained in:
tek
2023-08-01 11:58:20 +02:00
parent 6356a4ff87
commit 84dc13144d
8 changed files with 216 additions and 280 deletions

View File

@@ -0,0 +1,149 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from datetime import datetime
from typing import Any, Dict, List, Union
from mvt.common.utils import convert_datetime_to_iso
from .artifact import AndroidArtifact
class DumpsysAppops(AndroidArtifact):
"""
Parser for dumpsys app ops info
"""
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
result["package_name"],
)
def parse(self, output: str) -> List[Dict[str, Any]]:
self.results: List[Dict[str, Any]] = []
perm = {}
package = {}
entry = {}
uid = None
in_packages = False
for line in output.splitlines():
if line.startswith(" Uid 0:"):
in_packages = True
if not in_packages:
continue
if line.startswith(" Uid "):
uid = line[6:-1]
if entry:
perm["entries"].append(entry)
entry = {}
if package:
if perm:
package["permissions"].append(perm)
perm = {}
self.results.append(package)
package = {}
continue
if line.startswith(" Package "):
if entry:
perm["entries"].append(entry)
entry = {}
if package:
if perm:
package["permissions"].append(perm)
perm = {}
self.results.append(package)
package = {
"package_name": line[12:-1],
"permissions": [],
"uid": uid,
}
continue
if package and line.startswith(" ") and line[6] != " ":
if entry:
perm["entries"].append(entry)
entry = {}
if perm:
package["permissions"].append(perm)
perm = {}
perm["name"] = line.split()[0]
perm["entries"] = []
if len(line.split()) > 1:
perm["access"] = line.split()[1][1:-2]
continue
if line.startswith(" "):
# Permission entry like:
# Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms)
if entry:
perm["entries"].append(entry)
entry = {}
entry["access"] = line.split(":")[0].strip()
entry["type"] = line[line.find("[") + 1 : line.find("]")]
try:
entry["timestamp"] = convert_datetime_to_iso(
datetime.strptime(
line[line.find("]") + 1 : line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f",
)
)
except ValueError:
# Invalid date format
pass
if line.strip() == "":
break
if entry:
perm["entries"].append(entry)
if perm:
package["permissions"].append(perm)
if package:
self.results.append(package)

View File

@@ -4,14 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.android.parsers.dumpsys import parse_dumpsys_appops
from mvt.android.artifacts.dumpsys_appops import DumpsysAppops as DAO
from .base import AndroidExtraction
class DumpsysAppOps(AndroidExtraction):
class DumpsysAppOps(DAO, AndroidExtraction):
"""This module extracts records from App-op Manager."""
slug = "dumpsys_appops"
@@ -34,51 +34,12 @@ class DumpsysAppOps(AndroidExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
result["package_name"],
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys appops")
self._adb_disconnect()
self.results = parse_dumpsys_appops(output)
self.parse(output)
self.log.info(
"Extracted a total of %d records from app-ops manager", len(self.results)

View File

@@ -4,14 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.android.parsers import parse_dumpsys_appops
from mvt.android.artifacts.dumpsys_appops import DumpsysAppops as DAO
from .base import AndroidQFModule
class DumpsysAppops(AndroidQFModule):
class DumpsysAppops(DAO, AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
@@ -30,65 +30,17 @@ class DumpsysAppops(AndroidQFModule):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']} : {entry['access']}",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"],
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
lines = []
in_package = False
# Extract section
data = self._get_file_content(dumpsys_file[0])
for line in data.decode("utf-8").split("\n"):
if line.startswith("DUMP OF SERVICE appops:"):
in_package = True
continue
section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:"
)
if in_package:
if line.startswith(
"-------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_appops("\n".join(lines))
# Parse it
self.parse(section)
self.log.info("Identified %d applications in AppOps Manager", len(self.results))

View File

@@ -4,14 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.android.parsers import parse_dumpsys_appops
from mvt.android.artifacts.dumpsys_appops import DumpsysAppops
from .base import BugReportModule
class Appops(BugReportModule):
class Appops(DumpsysAppops, BugReportModule):
"""This module extracts information on package from App-Ops Manager."""
def __init__(
@@ -32,45 +32,6 @@ class Appops(BugReportModule):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"],
)
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
@@ -80,24 +41,10 @@ class Appops(BugReportModule):
)
return
lines = []
in_appops = False
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE appops:":
in_appops = True
continue
if not in_appops:
continue
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_appops("\n".join(lines))
section = self.extract_dumpsys_section(
content.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:"
)
self.parse(section)
self.log.info(
"Identified a total of %d packages in App-Ops Manager", len(self.results)

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
from .dumpsys import (
parse_dumpsys_appops,
parse_dumpsys_battery_daily,
parse_dumpsys_battery_history,
parse_dumpsys_receiver_resolver_table,

View File

@@ -4,11 +4,8 @@
# https://license.mvt.re/1.1/
import re
from datetime import datetime
from typing import Any, Dict, List
from mvt.common.utils import convert_datetime_to_iso
def parse_dumpsys_battery_daily(output: str) -> list:
results = []
@@ -176,104 +173,6 @@ def parse_dumpsys_receiver_resolver_table(output: str) -> Dict[str, Any]:
return results
def parse_dumpsys_appops(output: str) -> List[Dict[str, Any]]:
results = []
perm = {}
package = {}
entry = {}
uid = None
in_packages = False
for line in output.splitlines():
if line.startswith(" Uid 0:"):
in_packages = True
if not in_packages:
continue
if line.startswith(" Uid "):
uid = line[6:-1]
if entry:
perm["entries"].append(entry)
entry = {}
if package:
if perm:
package["permissions"].append(perm)
perm = {}
results.append(package)
package = {}
continue
if line.startswith(" Package "):
if entry:
perm["entries"].append(entry)
entry = {}
if package:
if perm:
package["permissions"].append(perm)
perm = {}
results.append(package)
package = {
"package_name": line[12:-1],
"permissions": [],
"uid": uid,
}
continue
if package and line.startswith(" ") and line[6] != " ":
if entry:
perm["entries"].append(entry)
entry = {}
if perm:
package["permissions"].append(perm)
perm = {}
perm["name"] = line.split()[0]
perm["entries"] = []
if len(line.split()) > 1:
perm["access"] = line.split()[1][1:-2]
continue
if line.startswith(" "):
# Permission entry like:
# Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms)
if entry:
perm["entries"].append(entry)
entry = {}
entry["access"] = line.split(":")[0].strip()
entry["type"] = line[line.find("[") + 1 : line.find("]")]
try:
entry["timestamp"] = convert_datetime_to_iso(
datetime.strptime(
line[line.find("]") + 1 : line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f",
)
)
except ValueError:
# Invalid date format
pass
if line.strip() == "":
break
if entry:
perm["entries"].append(entry)
if perm:
package["permissions"].append(perm)
if package:
results.append(package)
return results
def parse_dumpsys_package_for_details(output: str) -> Dict[str, Any]:
"""
Parse one entry of a dumpsys package information

View File

@@ -0,0 +1,47 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.artifacts.dumpsys_appops import DumpsysAppops
from mvt.common.indicators import Indicators
from ..utils import get_artifact
class TestDumpsysAppopsArtifact:
def test_parsing(self):
da = DumpsysAppops()
da.log = logging
file = get_artifact("android_data/dumpsys_appops.txt")
with open(file) as f:
data = f.read()
assert len(da.results) == 0
da.parse(data)
assert len(da.results) == 13
assert da.results[0]["package_name"] == "com.android.phone"
assert da.results[0]["uid"] == "0"
assert len(da.results[0]["permissions"]) == 1
assert da.results[0]["permissions"][0]["name"] == "MANAGE_IPSEC_TUNNELS"
assert da.results[0]["permissions"][0]["access"] == "allow"
assert da.results[6]["package_name"] == "com.sec.factory.camera"
assert len(da.results[6]["permissions"][1]["entries"]) == 1
assert len(da.results[11]["permissions"]) == 4
def test_ioc_check(self, indicator_file):
da = DumpsysAppops()
da.log = logging
file = get_artifact("android_data/dumpsys_appops.txt")
with open(file) as f:
data = f.read()
da.parse(data)
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["app_ids"].append("com.facebook.katana")
da.indicators = ind
assert len(da.detected) == 0
da.check_indicators()
assert len(da.detected) == 1

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
from mvt.android.parsers.dumpsys import (
parse_dumpsys_appops,
parse_dumpsys_battery_history,
parse_dumpsys_packages,
)
@@ -13,23 +12,6 @@ from ..utils import get_artifact
class TestDumpsysParsing:
def test_appops_parsing(self):
file = get_artifact("android_data/dumpsys_appops.txt")
with open(file) as f:
data = f.read()
res = parse_dumpsys_appops(data)
assert len(res) == 13
assert res[0]["package_name"] == "com.android.phone"
assert res[0]["uid"] == "0"
assert len(res[0]["permissions"]) == 1
assert res[0]["permissions"][0]["name"] == "MANAGE_IPSEC_TUNNELS"
assert res[0]["permissions"][0]["access"] == "allow"
assert res[6]["package_name"] == "com.sec.factory.camera"
assert len(res[6]["permissions"][1]["entries"]) == 1
assert len(res[11]["permissions"]) == 4
def test_battery_history_parsing(self):
file = get_artifact("android_data/dumpsys_battery.txt")
with open(file) as f: