Compare commits

..

1 Commits

Author SHA1 Message Date
besendorf
d8128238a1 Replace betterproto with betterproto2 in dependencies 2026-04-06 18:22:19 +02:00
15 changed files with 180 additions and 372 deletions

View File

@@ -11,7 +11,7 @@ jobs:
name: Add issue to project
runs-on: ubuntu-latest
steps:
- uses: actions/add-to-project@v0.5.0
- uses: actions/add-to-project@v1
with:
# You can target a project in a different organization
# to the issue

View File

@@ -23,7 +23,7 @@ test-requirements:
generate-proto-parsers:
# Generate python parsers for protobuf files
PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto2_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
clean:
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info

View File

@@ -4,9 +4,6 @@
# Mobile Verification Toolkit
> [!IMPORTANT]
> Soon we will merge the v3 pull request which will result in breaking changes. If you rely on mvt output in other script make sure to the the branch before we merge. More details: https://github.com/mvt-project/mvt/issues/757
[![](https://img.shields.io/pypi/v/mvt)](https://pypi.org/project/mvt/)
[![Documentation Status](https://readthedocs.org/projects/mvt/badge/?version=latest)](https://docs.mvt.re/en/latest/?badge=latest)
[![CI](https://github.com/mvt-project/mvt/actions/workflows/tests.yml/badge.svg)](https://github.com/mvt-project/mvt/actions/workflows/tests.yml)

View File

@@ -17,8 +17,8 @@ classifiers = [
"Programming Language :: Python",
]
dependencies = [
"click==8.3.2",
"rich==14.3.3",
"click==8.3.1",
"rich==14.2.0",
"tld==0.13.1",
"requests==2.33.1",
"simplejson==3.20.2",
@@ -35,7 +35,7 @@ dependencies = [
"pydantic-settings==2.13.1",
"NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0",
"tzdata==2026.1",
"tzdata==2025.3",
]
requires-python = ">= 3.10"
@@ -57,7 +57,7 @@ dev = [
"stix2>=3.0.1",
"ruff>=0.1.6",
"mypy>=1.7.1",
"betterproto2-compiler",
"betterproto[compiler]",
]
[build-system]

View File

@@ -22,13 +22,13 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
def parse(self, content: str) -> None:
"""
Parse the Dumpsys Accessibility section.
Adds results to self.results (List[Dict[str, Any]])
Parse the Dumpsys Accessibility section/
Adds results to self.results (List[Dict[str, str]])
:param content: content of the accessibility section (string)
"""
# Parse installed services
# "Old" syntax
in_services = False
for line in content.splitlines():
if line.strip().startswith("installed services:"):
@@ -39,6 +39,7 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
continue
if line.strip() == "}":
# At end of installed services
break
service = line.split(":")[1].strip()
@@ -47,66 +48,21 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
{
"package_name": service.split("/")[0],
"service": service,
"enabled": False,
}
)
# Parse enabled services from both old and new formats.
#
# Old format (multi-line block):
# enabled services: {
# 0 : com.example/.MyService
# }
#
# New format (single line, AOSP >= 14):
# Enabled services:{{com.example/com.example.MyService}, {com.other/com.other.Svc}}
enabled_services = set()
# "New" syntax - AOSP >= 14 (?)
# Looks like:
# Enabled services:{{com.azure.authenticator/com.microsoft.brooklyn.module.accessibility.BrooklynAccessibilityService}, {com.agilebits.onepassword/com.agilebits.onepassword.filling.accessibility.FillingAccessibilityService}}
in_enabled = False
for line in content.splitlines():
stripped = line.strip()
if line.strip().startswith("Enabled services:"):
matches = re.finditer(r"{([^{]+?)}", line)
if in_enabled:
if stripped == "}":
in_enabled = False
continue
service = line.split(":")[1].strip()
enabled_services.add(service)
continue
if re.match(r"enabled services:\s*\{\s*$", stripped, re.IGNORECASE):
# Old multi-line format: "enabled services: {"
in_enabled = True
continue
if re.match(r"enabled services:\s*\{", stripped, re.IGNORECASE):
# New single-line format: "Enabled services:{{pkg/svc}, {pkg2/svc2}}"
matches = re.finditer(r"\{([^{}]+)\}", stripped)
for match in matches:
enabled_services.add(match.group(1).strip())
# Each match is in format: <package_name>/<service>
package_name, _, service = match.group(1).partition("/")
# Mark installed services that are enabled.
# Installed service names may include trailing annotations like
# "(A11yTool)" that are absent from the enabled services list,
# so strip annotations before comparing.
def _strip_annotation(s: str) -> str:
return re.sub(r"\s+\(.*?\)\s*$", "", s)
installed_stripped = {
_strip_annotation(r["service"]): r for r in self.results
}
for enabled in enabled_services:
if enabled in installed_stripped:
installed_stripped[enabled]["enabled"] = True
# Add enabled services not found in the installed list
for service in enabled_services:
if service not in installed_stripped:
package_name, _, _ = service.partition("/")
self.results.append(
{
"package_name": package_name,
"service": service,
"enabled": True,
}
)
self.results.append(
{"package_name": package_name, "service": service}
)

View File

@@ -7,7 +7,7 @@ import datetime
from typing import List, Optional, Union
import pydantic
import betterproto2
import betterproto
from dateutil import parser
from mvt.common.utils import convert_datetime_to_iso
@@ -124,7 +124,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
"""Parse Android tombstone crash files from a protobuf object."""
tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict(
casing=betterproto2.Casing.SNAKE, include_default_values=True
betterproto.Casing.SNAKE, include_default_values=True
)
# Add some extra metadata

View File

@@ -49,14 +49,9 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
for result in self.results:
self.log.info(
'Found installed accessibility service "%s" (enabled: %s)',
result.get("service"),
result.get("enabled", False),
'Found installed accessibility service "%s"', result.get("service")
)
enabled_count = sum(1 for r in self.results if r.get("enabled"))
self.log.info(
"Identified a total of %d accessibility services, %d enabled",
len(self.results),
enabled_count,
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -1,12 +1,13 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: tombstone.proto
# plugin: python-betterproto2
# plugin: python-betterproto
from dataclasses import dataclass
from typing import Dict, List
import betterproto2
import betterproto
class Architecture(betterproto2.Enum):
class Architecture(betterproto.Enum):
ARM32 = 0
ARM64 = 1
X86 = 2
@@ -15,12 +16,12 @@ class Architecture(betterproto2.Enum):
NONE = 5
class MemoryErrorTool(betterproto2.Enum):
class MemoryErrorTool(betterproto.Enum):
GWP_ASAN = 0
SCUDO = 1
class MemoryErrorType(betterproto2.Enum):
class MemoryErrorType(betterproto.Enum):
UNKNOWN = 0
USE_AFTER_FREE = 1
DOUBLE_FREE = 2
@@ -29,179 +30,179 @@ class MemoryErrorType(betterproto2.Enum):
BUFFER_UNDERFLOW = 5
@dataclass(eq=False, repr=False)
class CrashDetail(betterproto2.Message):
@dataclass
class CrashDetail(betterproto.Message):
"""
NOTE TO OEMS: If you add custom fields to this proto, do not use numbers in
the reserved range.
"""
name: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
data: "bytes" = betterproto2.field(2, betterproto2.TYPE_BYTES)
name: bytes = betterproto.bytes_field(1)
data: bytes = betterproto.bytes_field(2)
@dataclass(eq=False, repr=False)
class StackHistoryBufferEntry(betterproto2.Message):
addr: "BacktraceFrame | None" = betterproto2.field(1, betterproto2.TYPE_MESSAGE, optional=True)
fp: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
tag: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
@dataclass
class StackHistoryBufferEntry(betterproto.Message):
addr: "BacktraceFrame" = betterproto.message_field(1)
fp: int = betterproto.uint64_field(2)
tag: int = betterproto.uint64_field(3)
@dataclass(eq=False, repr=False)
class StackHistoryBuffer(betterproto2.Message):
tid: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
entries: "list[StackHistoryBufferEntry]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
@dataclass
class StackHistoryBuffer(betterproto.Message):
tid: int = betterproto.uint64_field(1)
entries: List["StackHistoryBufferEntry"] = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class Tombstone(betterproto2.Message):
arch: "Architecture" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
guest_arch: "Architecture" = betterproto2.field(24, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
build_fingerprint: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
revision: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
timestamp: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
pid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT32)
tid: "int" = betterproto2.field(6, betterproto2.TYPE_UINT32)
uid: "int" = betterproto2.field(7, betterproto2.TYPE_UINT32)
selinux_label: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
command_line: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
@dataclass
class Tombstone(betterproto.Message):
arch: "Architecture" = betterproto.enum_field(1)
guest_arch: "Architecture" = betterproto.enum_field(24)
build_fingerprint: str = betterproto.string_field(2)
revision: str = betterproto.string_field(3)
timestamp: str = betterproto.string_field(4)
pid: int = betterproto.uint32_field(5)
tid: int = betterproto.uint32_field(6)
uid: int = betterproto.uint32_field(7)
selinux_label: str = betterproto.string_field(8)
command_line: List[str] = betterproto.string_field(9)
# Process uptime in seconds.
process_uptime: "int" = betterproto2.field(20, betterproto2.TYPE_UINT32)
signal_info: "Signal | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
abort_message: "str" = betterproto2.field(14, betterproto2.TYPE_STRING)
crash_details: "list[CrashDetail]" = betterproto2.field(21, betterproto2.TYPE_MESSAGE, repeated=True)
causes: "list[Cause]" = betterproto2.field(15, betterproto2.TYPE_MESSAGE, repeated=True)
threads: "dict[int, Thread]" = betterproto2.field(
16, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
process_uptime: int = betterproto.uint32_field(20)
signal_info: "Signal" = betterproto.message_field(10)
abort_message: str = betterproto.string_field(14)
crash_details: List["CrashDetail"] = betterproto.message_field(21)
causes: List["Cause"] = betterproto.message_field(15)
threads: Dict[int, "Thread"] = betterproto.map_field(
16, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
)
guest_threads: "dict[int, Thread]" = betterproto2.field(
25, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
guest_threads: Dict[int, "Thread"] = betterproto.map_field(
25, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
)
memory_mappings: "list[MemoryMapping]" = betterproto2.field(17, betterproto2.TYPE_MESSAGE, repeated=True)
log_buffers: "list[LogBuffer]" = betterproto2.field(18, betterproto2.TYPE_MESSAGE, repeated=True)
open_fds: "list[FD]" = betterproto2.field(19, betterproto2.TYPE_MESSAGE, repeated=True)
page_size: "int" = betterproto2.field(22, betterproto2.TYPE_UINT32)
has_been_16kb_mode: "bool" = betterproto2.field(23, betterproto2.TYPE_BOOL)
stack_history_buffer: "StackHistoryBuffer | None" = betterproto2.field(26, betterproto2.TYPE_MESSAGE, optional=True)
memory_mappings: List["MemoryMapping"] = betterproto.message_field(17)
log_buffers: List["LogBuffer"] = betterproto.message_field(18)
open_fds: List["FD"] = betterproto.message_field(19)
page_size: int = betterproto.uint32_field(22)
has_been_16kb_mode: bool = betterproto.bool_field(23)
stack_history_buffer: "StackHistoryBuffer" = betterproto.message_field(26)
@dataclass(eq=False, repr=False)
class Signal(betterproto2.Message):
number: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
code: "int" = betterproto2.field(3, betterproto2.TYPE_INT32)
code_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
has_sender: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
sender_uid: "int" = betterproto2.field(6, betterproto2.TYPE_INT32)
sender_pid: "int" = betterproto2.field(7, betterproto2.TYPE_INT32)
has_fault_address: "bool" = betterproto2.field(8, betterproto2.TYPE_BOOL)
fault_address: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
@dataclass
class Signal(betterproto.Message):
number: int = betterproto.int32_field(1)
name: str = betterproto.string_field(2)
code: int = betterproto.int32_field(3)
code_name: str = betterproto.string_field(4)
has_sender: bool = betterproto.bool_field(5)
sender_uid: int = betterproto.int32_field(6)
sender_pid: int = betterproto.int32_field(7)
has_fault_address: bool = betterproto.bool_field(8)
fault_address: int = betterproto.uint64_field(9)
# Note, may or may not contain the dump of the actual memory contents.
# Currently, on arm64, we only include metadata, and not the contents.
fault_adjacent_metadata: "MemoryDump | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
fault_adjacent_metadata: "MemoryDump" = betterproto.message_field(10)
@dataclass(eq=False, repr=False)
class HeapObject(betterproto2.Message):
address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
size: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
allocation_tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
allocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
deallocation_tid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
deallocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, repeated=True)
@dataclass
class HeapObject(betterproto.Message):
address: int = betterproto.uint64_field(1)
size: int = betterproto.uint64_field(2)
allocation_tid: int = betterproto.uint64_field(3)
allocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
deallocation_tid: int = betterproto.uint64_field(5)
deallocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(6)
@dataclass(eq=False, repr=False)
class MemoryError(betterproto2.Message):
tool: "MemoryErrorTool" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorTool(0))
type: "MemoryErrorType" = betterproto2.field(2, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorType(0))
heap: "HeapObject | None" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, optional=True, group="location")
@dataclass
class MemoryError(betterproto.Message):
tool: "MemoryErrorTool" = betterproto.enum_field(1)
type: "MemoryErrorType" = betterproto.enum_field(2)
heap: "HeapObject" = betterproto.message_field(3, group="location")
@dataclass(eq=False, repr=False)
class Cause(betterproto2.Message):
human_readable: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
memory_error: "MemoryError | None" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, optional=True, group="details")
@dataclass
class Cause(betterproto.Message):
human_readable: str = betterproto.string_field(1)
memory_error: "MemoryError" = betterproto.message_field(2, group="details")
@dataclass(eq=False, repr=False)
class Register(betterproto2.Message):
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
u64: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
@dataclass
class Register(betterproto.Message):
name: str = betterproto.string_field(1)
u64: int = betterproto.uint64_field(2)
@dataclass(eq=False, repr=False)
class Thread(betterproto2.Message):
id: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
registers: "list[Register]" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, repeated=True)
backtrace_note: "list[str]" = betterproto2.field(7, betterproto2.TYPE_STRING, repeated=True)
unreadable_elf_files: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
current_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
memory_dump: "list[MemoryDump]" = betterproto2.field(5, betterproto2.TYPE_MESSAGE, repeated=True)
tagged_addr_ctrl: "int" = betterproto2.field(6, betterproto2.TYPE_INT64)
pac_enabled_keys: "int" = betterproto2.field(8, betterproto2.TYPE_INT64)
@dataclass
class Thread(betterproto.Message):
id: int = betterproto.int32_field(1)
name: str = betterproto.string_field(2)
registers: List["Register"] = betterproto.message_field(3)
backtrace_note: List[str] = betterproto.string_field(7)
unreadable_elf_files: List[str] = betterproto.string_field(9)
current_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
memory_dump: List["MemoryDump"] = betterproto.message_field(5)
tagged_addr_ctrl: int = betterproto.int64_field(6)
pac_enabled_keys: int = betterproto.int64_field(8)
@dataclass(eq=False, repr=False)
class BacktraceFrame(betterproto2.Message):
rel_pc: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
pc: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
sp: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
function_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
function_offset: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
file_name: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
file_map_offset: "int" = betterproto2.field(7, betterproto2.TYPE_UINT64)
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
@dataclass
class BacktraceFrame(betterproto.Message):
rel_pc: int = betterproto.uint64_field(1)
pc: int = betterproto.uint64_field(2)
sp: int = betterproto.uint64_field(3)
function_name: str = betterproto.string_field(4)
function_offset: int = betterproto.uint64_field(5)
file_name: str = betterproto.string_field(6)
file_map_offset: int = betterproto.uint64_field(7)
build_id: str = betterproto.string_field(8)
@dataclass(eq=False, repr=False)
class ArmMTEMetadata(betterproto2.Message):
@dataclass
class ArmMTEMetadata(betterproto.Message):
# One memory tag per granule (e.g. every 16 bytes) of regular memory.
memory_tags: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
memory_tags: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class MemoryDump(betterproto2.Message):
register_name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
mapping_name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
begin_address: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
memory: "bytes" = betterproto2.field(4, betterproto2.TYPE_BYTES)
arm_mte_metadata: "ArmMTEMetadata | None" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, optional=True, group="metadata")
@dataclass
class MemoryDump(betterproto.Message):
register_name: str = betterproto.string_field(1)
mapping_name: str = betterproto.string_field(2)
begin_address: int = betterproto.uint64_field(3)
memory: bytes = betterproto.bytes_field(4)
arm_mte_metadata: "ArmMTEMetadata" = betterproto.message_field(6, group="metadata")
@dataclass(eq=False, repr=False)
class MemoryMapping(betterproto2.Message):
begin_address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
end_address: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
offset: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
read: "bool" = betterproto2.field(4, betterproto2.TYPE_BOOL)
write: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
execute: "bool" = betterproto2.field(6, betterproto2.TYPE_BOOL)
mapping_name: "str" = betterproto2.field(7, betterproto2.TYPE_STRING)
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
load_bias: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
@dataclass
class MemoryMapping(betterproto.Message):
begin_address: int = betterproto.uint64_field(1)
end_address: int = betterproto.uint64_field(2)
offset: int = betterproto.uint64_field(3)
read: bool = betterproto.bool_field(4)
write: bool = betterproto.bool_field(5)
execute: bool = betterproto.bool_field(6)
mapping_name: str = betterproto.string_field(7)
build_id: str = betterproto.string_field(8)
load_bias: int = betterproto.uint64_field(9)
@dataclass(eq=False, repr=False)
class FD(betterproto2.Message):
fd: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
path: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
owner: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
tag: "int" = betterproto2.field(4, betterproto2.TYPE_UINT64)
@dataclass
class FD(betterproto.Message):
fd: int = betterproto.int32_field(1)
path: str = betterproto.string_field(2)
owner: str = betterproto.string_field(3)
tag: int = betterproto.uint64_field(4)
@dataclass(eq=False, repr=False)
class LogBuffer(betterproto2.Message):
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
logs: "list[LogMessage]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
@dataclass
class LogBuffer(betterproto.Message):
name: str = betterproto.string_field(1)
logs: List["LogMessage"] = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class LogMessage(betterproto2.Message):
timestamp: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
pid: "int" = betterproto2.field(2, betterproto2.TYPE_UINT32)
tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT32)
priority: "int" = betterproto2.field(4, betterproto2.TYPE_UINT32)
tag: "str" = betterproto2.field(5, betterproto2.TYPE_STRING)
message: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
@dataclass
class LogMessage(betterproto.Message):
timestamp: str = betterproto.string_field(1)
pid: int = betterproto.uint32_field(2)
tid: int = betterproto.uint32_field(3)
priority: int = betterproto.uint32_field(4)
tag: str = betterproto.string_field(5)
message: str = betterproto.string_field(6)

View File

@@ -100,17 +100,6 @@ class Indicators:
key, value = indicator.get("pattern", "").strip("[]").split("=")
key = key.strip()
# Normalize hash algorithm keys so that both the STIX2-spec-compliant
# form (e.g. file:hashes.'SHA-256', which requires quotes around
# algorithm names that contain hyphens) and the non-standard lowercase
# form (e.g. file:hashes.sha256) are accepted. Strip single quotes and
# hyphens from the algorithm name only, then lowercase it.
for sep in ("hashes.", "cert."):
if sep in key:
prefix, _, algo = key.partition(sep)
key = prefix + sep + algo.replace("'", "").replace("-", "").lower()
break
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(

View File

@@ -123,11 +123,6 @@ class SMS(IOSExtraction):
"""
)
items = list(cur)
elif "no such table" in str(exc):
self.log.info(
"No SMS tables found in the database, skipping: %s", exc
)
return
else:
raise exc
names = [description[0] for description in cur.description]

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from base64 import b64encode
from typing import Optional, Union
@@ -80,29 +79,21 @@ class SMSAttachments(IOSExtraction):
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
try:
cur.execute(
"""
SELECT
attachment.ROWID as "attachment_id",
attachment.*,
message.service as "service",
handle.id as "phone_number"
FROM attachment
LEFT JOIN message_attachment_join ON
message_attachment_join.attachment_id = attachment.ROWID
LEFT JOIN message ON
message.ROWID = message_attachment_join.message_id
LEFT JOIN handle ON handle.ROWID = message.handle_id;
cur.execute(
"""
)
except sqlite3.OperationalError as exc:
self.log.info(
"No SMS attachment tables found in the database, skipping: %s", exc
)
cur.close()
conn.close()
return
SELECT
attachment.ROWID as "attachment_id",
attachment.*,
message.service as "service",
handle.id as "phone_number"
FROM attachment
LEFT JOIN message_attachment_join ON
message_attachment_join.attachment_id = attachment.ROWID
LEFT JOIN message ON
message.ROWID = message_attachment_join.message_id
LEFT JOIN handle ON handle.ROWID = message.handle_id;
"""
)
names = [description[0] for description in cur.description]
for item in cur:

View File

@@ -25,9 +25,6 @@ class TestDumpsysAccessibilityArtifact:
da.results[0]["service"]
== "com.android.settings/com.samsung.android.settings.development.gpuwatch.GPUWatchInterceptor"
)
# All services are installed but none enabled in this fixture
for result in da.results:
assert result["enabled"] is False
def test_parsing_v14_aosp_format(self):
da = DumpsysAccessibilityArtifact()
@@ -39,32 +36,7 @@ class TestDumpsysAccessibilityArtifact:
da.parse(data)
assert len(da.results) == 1
assert da.results[0]["package_name"] == "com.malware.accessibility"
assert (
da.results[0]["service"]
== "com.malware.accessibility/com.malware.service.malwareservice"
)
assert da.results[0]["enabled"] is True
def test_parsing_installed_and_enabled(self):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility_enabled.txt")
with open(file) as f:
data = f.read()
assert len(da.results) == 0
da.parse(data)
assert len(da.results) == 5
enabled = [r for r in da.results if r["enabled"]]
assert len(enabled) == 1
assert enabled[0]["package_name"] == "com.samsung.accessibility"
assert (
enabled[0]["service"]
== "com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)"
)
not_enabled = [r for r in da.results if not r["enabled"]]
assert len(not_enabled) == 4
assert da.results[0]["service"] == "com.malware.service.malwareservice"
def test_ioc_check(self, indicator_file):
da = DumpsysAccessibilityArtifact()

View File

@@ -1,16 +0,0 @@
ACCESSIBILITY MANAGER (dumpsys accessibility)
currentUserId=0
User state[
attributes:{id=0, touchExplorationEnabled=false, installedServiceCount=5}
installed services: {
0 : com.google.android.apps.accessibility.voiceaccess/.JustSpeakService (A11yTool)
1 : com.microsoft.appmanager/com.microsoft.mmx.screenmirroringsrc.accessibility.ScreenMirroringAccessibilityService
2 : com.samsung.accessibility/.assistantmenu.serviceframework.AssistantMenuService (A11yTool)
3 : com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)
4 : com.samsung.android.accessibility.talkback/com.samsung.android.marvin.talkback.TalkBackService (A11yTool)
}
Bound services:{}
Enabled services:{{com.samsung.accessibility/.universalswitch.UniversalSwitchService}}
Binding services:{}
Crashed services:{}

View File

@@ -82,7 +82,7 @@ def generate_test_stix_file(file_path):
for h in sha256:
i = Indicator(
indicator_types=["malicious-activity"],
pattern="[file:hashes.'SHA-256'='{}']".format(h),
pattern="[file:hashes.sha256='{}']".format(h),
pattern_type="stix",
)
res.append(i)
@@ -91,7 +91,7 @@ def generate_test_stix_file(file_path):
for h in sha1:
i = Indicator(
indicator_types=["malicious-activity"],
pattern="[file:hashes.'SHA-1'='{}']".format(h),
pattern="[file:hashes.sha1='{}']".format(h),
pattern_type="stix",
)
res.append(i)

View File

@@ -94,78 +94,6 @@ class TestIndicators:
)
assert ind.check_file_hash("da0611a300a9ce9aa7a09d1212f203fca5856794")
def test_parse_stix2_hash_key_variants(self, tmp_path):
"""STIX2 spec requires single-quoted algorithm names that contain hyphens,
e.g. file:hashes.'SHA-256'. Verify MVT accepts both spec-compliant and
non-standard lowercase spellings for MD5, SHA-1 and SHA-256."""
import json
sha256_hash = "570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6"
sha1_hash = "da0611a300a9ce9aa7a09d1212f203fca5856794"
md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
variants = [
# (pattern_key, expected_bucket)
("file:hashes.'SHA-256'", "files_sha256"),
("file:hashes.SHA-256", "files_sha256"),
("file:hashes.SHA256", "files_sha256"),
("file:hashes.sha256", "files_sha256"),
("file:hashes.'SHA-1'", "files_sha1"),
("file:hashes.SHA-1", "files_sha1"),
("file:hashes.SHA1", "files_sha1"),
("file:hashes.sha1", "files_sha1"),
("file:hashes.MD5", "files_md5"),
("file:hashes.'MD5'", "files_md5"),
("file:hashes.md5", "files_md5"),
]
hash_for = {
"files_sha256": sha256_hash,
"files_sha1": sha1_hash,
"files_md5": md5_hash,
}
for pattern_key, bucket in variants:
h = hash_for[bucket]
stix = {
"type": "bundle",
"id": "bundle--test",
"objects": [
{
"type": "malware",
"id": "malware--test",
"name": "TestMalware",
"is_family": False,
},
{
"type": "indicator",
"id": "indicator--test",
"indicator_types": ["malicious-activity"],
"pattern": f"[{pattern_key}='{h}']",
"pattern_type": "stix",
"valid_from": "2024-01-01T00:00:00Z",
},
{
"type": "relationship",
"id": "relationship--test",
"relationship_type": "indicates",
"source_ref": "indicator--test",
"target_ref": "malware--test",
},
],
}
stix_file = tmp_path / "test.stix2"
stix_file.write_text(json.dumps(stix))
ind = Indicators(log=logging)
ind.load_indicators_files([str(stix_file)], load_default=False)
assert len(ind.ioc_collections[0][bucket]) == 1, (
f"Pattern key '{pattern_key}' was not parsed into '{bucket}'"
)
assert ind.check_file_hash(h) is not None, (
f"check_file_hash failed for pattern key '{pattern_key}'"
)
def test_check_android_property(self, indicator_file):
ind = Indicators(log=logging)
ind.load_indicators_files([indicator_file], load_default=False)