mirror of
https://github.com/mvt-project/mvt.git
synced 2026-04-19 02:06:42 +02:00
Compare commits
5 Commits
fix/better
...
besendorf-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e2a80ba81 | ||
|
|
4edab3c4f8 | ||
|
|
d754f58c1a | ||
|
|
6c537c624e | ||
|
|
fd31f31aae |
2
.github/workflows/add-issue-to-project.yml
vendored
2
.github/workflows/add-issue-to-project.yml
vendored
@@ -11,7 +11,7 @@ jobs:
|
||||
name: Add issue to project
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/add-to-project@v1
|
||||
- uses: actions/add-to-project@v0.5.0
|
||||
with:
|
||||
# You can target a project in a different organization
|
||||
# to the issue
|
||||
|
||||
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ test-requirements:
|
||||
generate-proto-parsers:
|
||||
# Generate python parsers for protobuf files
|
||||
PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
|
||||
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
|
||||
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto2_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
|
||||
|
||||
clean:
|
||||
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
|
||||
# Mobile Verification Toolkit
|
||||
|
||||
> [!IMPORTANT]
|
||||
> Soon we will merge the v3 pull request which will result in breaking changes. If you rely on mvt output in other script make sure to the the branch before we merge. More details: https://github.com/mvt-project/mvt/issues/757
|
||||
|
||||
[](https://pypi.org/project/mvt/)
|
||||
[](https://docs.mvt.re/en/latest/?badge=latest)
|
||||
[](https://github.com/mvt-project/mvt/actions/workflows/tests.yml)
|
||||
|
||||
@@ -57,7 +57,7 @@ dev = [
|
||||
"stix2>=3.0.1",
|
||||
"ruff>=0.1.6",
|
||||
"mypy>=1.7.1",
|
||||
"betterproto[compiler]",
|
||||
"betterproto2-compiler",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
|
||||
@@ -7,7 +7,7 @@ import datetime
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import pydantic
|
||||
import betterproto
|
||||
import betterproto2
|
||||
from dateutil import parser
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
@@ -124,7 +124,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
"""Parse Android tombstone crash files from a protobuf object."""
|
||||
tombstone_pb = Tombstone().parse(data)
|
||||
tombstone_dict = tombstone_pb.to_dict(
|
||||
betterproto.Casing.SNAKE, include_default_values=True
|
||||
casing=betterproto2.Casing.SNAKE, include_default_values=True
|
||||
)
|
||||
|
||||
# Add some extra metadata
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# sources: tombstone.proto
|
||||
# plugin: python-betterproto
|
||||
# plugin: python-betterproto2
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List
|
||||
|
||||
import betterproto
|
||||
import betterproto2
|
||||
|
||||
|
||||
class Architecture(betterproto.Enum):
|
||||
class Architecture(betterproto2.Enum):
|
||||
ARM32 = 0
|
||||
ARM64 = 1
|
||||
X86 = 2
|
||||
@@ -16,12 +15,12 @@ class Architecture(betterproto.Enum):
|
||||
NONE = 5
|
||||
|
||||
|
||||
class MemoryErrorTool(betterproto.Enum):
|
||||
class MemoryErrorTool(betterproto2.Enum):
|
||||
GWP_ASAN = 0
|
||||
SCUDO = 1
|
||||
|
||||
|
||||
class MemoryErrorType(betterproto.Enum):
|
||||
class MemoryErrorType(betterproto2.Enum):
|
||||
UNKNOWN = 0
|
||||
USE_AFTER_FREE = 1
|
||||
DOUBLE_FREE = 2
|
||||
@@ -30,179 +29,179 @@ class MemoryErrorType(betterproto.Enum):
|
||||
BUFFER_UNDERFLOW = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrashDetail(betterproto.Message):
|
||||
@dataclass(eq=False, repr=False)
|
||||
class CrashDetail(betterproto2.Message):
|
||||
"""
|
||||
NOTE TO OEMS: If you add custom fields to this proto, do not use numbers in
|
||||
the reserved range.
|
||||
"""
|
||||
|
||||
name: bytes = betterproto.bytes_field(1)
|
||||
data: bytes = betterproto.bytes_field(2)
|
||||
name: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
|
||||
data: "bytes" = betterproto2.field(2, betterproto2.TYPE_BYTES)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBufferEntry(betterproto.Message):
|
||||
addr: "BacktraceFrame" = betterproto.message_field(1)
|
||||
fp: int = betterproto.uint64_field(2)
|
||||
tag: int = betterproto.uint64_field(3)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class StackHistoryBufferEntry(betterproto2.Message):
|
||||
addr: "BacktraceFrame | None" = betterproto2.field(1, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
fp: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
tag: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBuffer(betterproto.Message):
|
||||
tid: int = betterproto.uint64_field(1)
|
||||
entries: List["StackHistoryBufferEntry"] = betterproto.message_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class StackHistoryBuffer(betterproto2.Message):
|
||||
tid: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
entries: "list[StackHistoryBufferEntry]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tombstone(betterproto.Message):
|
||||
arch: "Architecture" = betterproto.enum_field(1)
|
||||
guest_arch: "Architecture" = betterproto.enum_field(24)
|
||||
build_fingerprint: str = betterproto.string_field(2)
|
||||
revision: str = betterproto.string_field(3)
|
||||
timestamp: str = betterproto.string_field(4)
|
||||
pid: int = betterproto.uint32_field(5)
|
||||
tid: int = betterproto.uint32_field(6)
|
||||
uid: int = betterproto.uint32_field(7)
|
||||
selinux_label: str = betterproto.string_field(8)
|
||||
command_line: List[str] = betterproto.string_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Tombstone(betterproto2.Message):
|
||||
arch: "Architecture" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
|
||||
guest_arch: "Architecture" = betterproto2.field(24, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
|
||||
build_fingerprint: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
revision: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
|
||||
timestamp: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
pid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT32)
|
||||
tid: "int" = betterproto2.field(6, betterproto2.TYPE_UINT32)
|
||||
uid: "int" = betterproto2.field(7, betterproto2.TYPE_UINT32)
|
||||
selinux_label: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
command_line: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
|
||||
# Process uptime in seconds.
|
||||
process_uptime: int = betterproto.uint32_field(20)
|
||||
signal_info: "Signal" = betterproto.message_field(10)
|
||||
abort_message: str = betterproto.string_field(14)
|
||||
crash_details: List["CrashDetail"] = betterproto.message_field(21)
|
||||
causes: List["Cause"] = betterproto.message_field(15)
|
||||
threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
16, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
process_uptime: "int" = betterproto2.field(20, betterproto2.TYPE_UINT32)
|
||||
signal_info: "Signal | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
abort_message: "str" = betterproto2.field(14, betterproto2.TYPE_STRING)
|
||||
crash_details: "list[CrashDetail]" = betterproto2.field(21, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
causes: "list[Cause]" = betterproto2.field(15, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
threads: "dict[int, Thread]" = betterproto2.field(
|
||||
16, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
|
||||
)
|
||||
guest_threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
25, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
guest_threads: "dict[int, Thread]" = betterproto2.field(
|
||||
25, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
|
||||
)
|
||||
memory_mappings: List["MemoryMapping"] = betterproto.message_field(17)
|
||||
log_buffers: List["LogBuffer"] = betterproto.message_field(18)
|
||||
open_fds: List["FD"] = betterproto.message_field(19)
|
||||
page_size: int = betterproto.uint32_field(22)
|
||||
has_been_16kb_mode: bool = betterproto.bool_field(23)
|
||||
stack_history_buffer: "StackHistoryBuffer" = betterproto.message_field(26)
|
||||
memory_mappings: "list[MemoryMapping]" = betterproto2.field(17, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
log_buffers: "list[LogBuffer]" = betterproto2.field(18, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
open_fds: "list[FD]" = betterproto2.field(19, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
page_size: "int" = betterproto2.field(22, betterproto2.TYPE_UINT32)
|
||||
has_been_16kb_mode: "bool" = betterproto2.field(23, betterproto2.TYPE_BOOL)
|
||||
stack_history_buffer: "StackHistoryBuffer | None" = betterproto2.field(26, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Signal(betterproto.Message):
|
||||
number: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
code: int = betterproto.int32_field(3)
|
||||
code_name: str = betterproto.string_field(4)
|
||||
has_sender: bool = betterproto.bool_field(5)
|
||||
sender_uid: int = betterproto.int32_field(6)
|
||||
sender_pid: int = betterproto.int32_field(7)
|
||||
has_fault_address: bool = betterproto.bool_field(8)
|
||||
fault_address: int = betterproto.uint64_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Signal(betterproto2.Message):
|
||||
number: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
code: "int" = betterproto2.field(3, betterproto2.TYPE_INT32)
|
||||
code_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
has_sender: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
|
||||
sender_uid: "int" = betterproto2.field(6, betterproto2.TYPE_INT32)
|
||||
sender_pid: "int" = betterproto2.field(7, betterproto2.TYPE_INT32)
|
||||
has_fault_address: "bool" = betterproto2.field(8, betterproto2.TYPE_BOOL)
|
||||
fault_address: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
|
||||
# Note, may or may not contain the dump of the actual memory contents.
|
||||
# Currently, on arm64, we only include metadata, and not the contents.
|
||||
fault_adjacent_metadata: "MemoryDump" = betterproto.message_field(10)
|
||||
fault_adjacent_metadata: "MemoryDump | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HeapObject(betterproto.Message):
|
||||
address: int = betterproto.uint64_field(1)
|
||||
size: int = betterproto.uint64_field(2)
|
||||
allocation_tid: int = betterproto.uint64_field(3)
|
||||
allocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
deallocation_tid: int = betterproto.uint64_field(5)
|
||||
deallocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(6)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class HeapObject(betterproto2.Message):
|
||||
address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
size: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
allocation_tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
allocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
deallocation_tid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
|
||||
deallocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryError(betterproto.Message):
|
||||
tool: "MemoryErrorTool" = betterproto.enum_field(1)
|
||||
type: "MemoryErrorType" = betterproto.enum_field(2)
|
||||
heap: "HeapObject" = betterproto.message_field(3, group="location")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryError(betterproto2.Message):
|
||||
tool: "MemoryErrorTool" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorTool(0))
|
||||
type: "MemoryErrorType" = betterproto2.field(2, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorType(0))
|
||||
heap: "HeapObject | None" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, optional=True, group="location")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Cause(betterproto.Message):
|
||||
human_readable: str = betterproto.string_field(1)
|
||||
memory_error: "MemoryError" = betterproto.message_field(2, group="details")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Cause(betterproto2.Message):
|
||||
human_readable: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
memory_error: "MemoryError | None" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, optional=True, group="details")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Register(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
u64: int = betterproto.uint64_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Register(betterproto2.Message):
|
||||
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
u64: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Thread(betterproto.Message):
|
||||
id: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
registers: List["Register"] = betterproto.message_field(3)
|
||||
backtrace_note: List[str] = betterproto.string_field(7)
|
||||
unreadable_elf_files: List[str] = betterproto.string_field(9)
|
||||
current_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
memory_dump: List["MemoryDump"] = betterproto.message_field(5)
|
||||
tagged_addr_ctrl: int = betterproto.int64_field(6)
|
||||
pac_enabled_keys: int = betterproto.int64_field(8)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Thread(betterproto2.Message):
|
||||
id: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
registers: "list[Register]" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
backtrace_note: "list[str]" = betterproto2.field(7, betterproto2.TYPE_STRING, repeated=True)
|
||||
unreadable_elf_files: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
|
||||
current_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
memory_dump: "list[MemoryDump]" = betterproto2.field(5, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
tagged_addr_ctrl: "int" = betterproto2.field(6, betterproto2.TYPE_INT64)
|
||||
pac_enabled_keys: "int" = betterproto2.field(8, betterproto2.TYPE_INT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BacktraceFrame(betterproto.Message):
|
||||
rel_pc: int = betterproto.uint64_field(1)
|
||||
pc: int = betterproto.uint64_field(2)
|
||||
sp: int = betterproto.uint64_field(3)
|
||||
function_name: str = betterproto.string_field(4)
|
||||
function_offset: int = betterproto.uint64_field(5)
|
||||
file_name: str = betterproto.string_field(6)
|
||||
file_map_offset: int = betterproto.uint64_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class BacktraceFrame(betterproto2.Message):
|
||||
rel_pc: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
pc: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
sp: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
function_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
function_offset: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
|
||||
file_name: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
|
||||
file_map_offset: "int" = betterproto2.field(7, betterproto2.TYPE_UINT64)
|
||||
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArmMTEMetadata(betterproto.Message):
|
||||
@dataclass(eq=False, repr=False)
|
||||
class ArmMTEMetadata(betterproto2.Message):
|
||||
# One memory tag per granule (e.g. every 16 bytes) of regular memory.
|
||||
memory_tags: bytes = betterproto.bytes_field(1)
|
||||
memory_tags: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryDump(betterproto.Message):
|
||||
register_name: str = betterproto.string_field(1)
|
||||
mapping_name: str = betterproto.string_field(2)
|
||||
begin_address: int = betterproto.uint64_field(3)
|
||||
memory: bytes = betterproto.bytes_field(4)
|
||||
arm_mte_metadata: "ArmMTEMetadata" = betterproto.message_field(6, group="metadata")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryDump(betterproto2.Message):
|
||||
register_name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
mapping_name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
begin_address: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
memory: "bytes" = betterproto2.field(4, betterproto2.TYPE_BYTES)
|
||||
arm_mte_metadata: "ArmMTEMetadata | None" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, optional=True, group="metadata")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryMapping(betterproto.Message):
|
||||
begin_address: int = betterproto.uint64_field(1)
|
||||
end_address: int = betterproto.uint64_field(2)
|
||||
offset: int = betterproto.uint64_field(3)
|
||||
read: bool = betterproto.bool_field(4)
|
||||
write: bool = betterproto.bool_field(5)
|
||||
execute: bool = betterproto.bool_field(6)
|
||||
mapping_name: str = betterproto.string_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
load_bias: int = betterproto.uint64_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryMapping(betterproto2.Message):
|
||||
begin_address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
end_address: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
offset: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
read: "bool" = betterproto2.field(4, betterproto2.TYPE_BOOL)
|
||||
write: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
|
||||
execute: "bool" = betterproto2.field(6, betterproto2.TYPE_BOOL)
|
||||
mapping_name: "str" = betterproto2.field(7, betterproto2.TYPE_STRING)
|
||||
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
load_bias: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FD(betterproto.Message):
|
||||
fd: int = betterproto.int32_field(1)
|
||||
path: str = betterproto.string_field(2)
|
||||
owner: str = betterproto.string_field(3)
|
||||
tag: int = betterproto.uint64_field(4)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class FD(betterproto2.Message):
|
||||
fd: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
path: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
owner: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
|
||||
tag: "int" = betterproto2.field(4, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogBuffer(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
logs: List["LogMessage"] = betterproto.message_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class LogBuffer(betterproto2.Message):
|
||||
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
logs: "list[LogMessage]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogMessage(betterproto.Message):
|
||||
timestamp: str = betterproto.string_field(1)
|
||||
pid: int = betterproto.uint32_field(2)
|
||||
tid: int = betterproto.uint32_field(3)
|
||||
priority: int = betterproto.uint32_field(4)
|
||||
tag: str = betterproto.string_field(5)
|
||||
message: str = betterproto.string_field(6)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class LogMessage(betterproto2.Message):
|
||||
timestamp: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
pid: "int" = betterproto2.field(2, betterproto2.TYPE_UINT32)
|
||||
tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT32)
|
||||
priority: "int" = betterproto2.field(4, betterproto2.TYPE_UINT32)
|
||||
tag: "str" = betterproto2.field(5, betterproto2.TYPE_STRING)
|
||||
message: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
|
||||
|
||||
@@ -100,6 +100,17 @@ class Indicators:
|
||||
key, value = indicator.get("pattern", "").strip("[]").split("=")
|
||||
key = key.strip()
|
||||
|
||||
# Normalize hash algorithm keys so that both the STIX2-spec-compliant
|
||||
# form (e.g. file:hashes.'SHA-256', which requires quotes around
|
||||
# algorithm names that contain hyphens) and the non-standard lowercase
|
||||
# form (e.g. file:hashes.sha256) are accepted. Strip single quotes and
|
||||
# hyphens from the algorithm name only, then lowercase it.
|
||||
for sep in ("hashes.", "cert."):
|
||||
if sep in key:
|
||||
prefix, _, algo = key.partition(sep)
|
||||
key = prefix + sep + algo.replace("'", "").replace("-", "").lower()
|
||||
break
|
||||
|
||||
if key == "domain-name:value":
|
||||
# We force domain names to lower case.
|
||||
self._add_indicator(
|
||||
|
||||
@@ -123,6 +123,11 @@ class SMS(IOSExtraction):
|
||||
"""
|
||||
)
|
||||
items = list(cur)
|
||||
elif "no such table" in str(exc):
|
||||
self.log.info(
|
||||
"No SMS tables found in the database, skipping: %s", exc
|
||||
)
|
||||
return
|
||||
else:
|
||||
raise exc
|
||||
names = [description[0] for description in cur.description]
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from base64 import b64encode
|
||||
from typing import Optional, Union
|
||||
|
||||
@@ -79,21 +80,29 @@ class SMSAttachments(IOSExtraction):
|
||||
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
attachment.ROWID as "attachment_id",
|
||||
attachment.*,
|
||||
message.service as "service",
|
||||
handle.id as "phone_number"
|
||||
FROM attachment
|
||||
LEFT JOIN message_attachment_join ON
|
||||
message_attachment_join.attachment_id = attachment.ROWID
|
||||
LEFT JOIN message ON
|
||||
message.ROWID = message_attachment_join.message_id
|
||||
LEFT JOIN handle ON handle.ROWID = message.handle_id;
|
||||
"""
|
||||
SELECT
|
||||
attachment.ROWID as "attachment_id",
|
||||
attachment.*,
|
||||
message.service as "service",
|
||||
handle.id as "phone_number"
|
||||
FROM attachment
|
||||
LEFT JOIN message_attachment_join ON
|
||||
message_attachment_join.attachment_id = attachment.ROWID
|
||||
LEFT JOIN message ON
|
||||
message.ROWID = message_attachment_join.message_id
|
||||
LEFT JOIN handle ON handle.ROWID = message.handle_id;
|
||||
"""
|
||||
)
|
||||
)
|
||||
except sqlite3.OperationalError as exc:
|
||||
self.log.info(
|
||||
"No SMS attachment tables found in the database, skipping: %s", exc
|
||||
)
|
||||
cur.close()
|
||||
conn.close()
|
||||
return
|
||||
names = [description[0] for description in cur.description]
|
||||
|
||||
for item in cur:
|
||||
|
||||
@@ -82,7 +82,7 @@ def generate_test_stix_file(file_path):
|
||||
for h in sha256:
|
||||
i = Indicator(
|
||||
indicator_types=["malicious-activity"],
|
||||
pattern="[file:hashes.sha256='{}']".format(h),
|
||||
pattern="[file:hashes.'SHA-256'='{}']".format(h),
|
||||
pattern_type="stix",
|
||||
)
|
||||
res.append(i)
|
||||
@@ -91,7 +91,7 @@ def generate_test_stix_file(file_path):
|
||||
for h in sha1:
|
||||
i = Indicator(
|
||||
indicator_types=["malicious-activity"],
|
||||
pattern="[file:hashes.sha1='{}']".format(h),
|
||||
pattern="[file:hashes.'SHA-1'='{}']".format(h),
|
||||
pattern_type="stix",
|
||||
)
|
||||
res.append(i)
|
||||
|
||||
@@ -94,6 +94,78 @@ class TestIndicators:
|
||||
)
|
||||
assert ind.check_file_hash("da0611a300a9ce9aa7a09d1212f203fca5856794")
|
||||
|
||||
def test_parse_stix2_hash_key_variants(self, tmp_path):
|
||||
"""STIX2 spec requires single-quoted algorithm names that contain hyphens,
|
||||
e.g. file:hashes.'SHA-256'. Verify MVT accepts both spec-compliant and
|
||||
non-standard lowercase spellings for MD5, SHA-1 and SHA-256."""
|
||||
import json
|
||||
|
||||
sha256_hash = "570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6"
|
||||
sha1_hash = "da0611a300a9ce9aa7a09d1212f203fca5856794"
|
||||
md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
|
||||
|
||||
variants = [
|
||||
# (pattern_key, expected_bucket)
|
||||
("file:hashes.'SHA-256'", "files_sha256"),
|
||||
("file:hashes.SHA-256", "files_sha256"),
|
||||
("file:hashes.SHA256", "files_sha256"),
|
||||
("file:hashes.sha256", "files_sha256"),
|
||||
("file:hashes.'SHA-1'", "files_sha1"),
|
||||
("file:hashes.SHA-1", "files_sha1"),
|
||||
("file:hashes.SHA1", "files_sha1"),
|
||||
("file:hashes.sha1", "files_sha1"),
|
||||
("file:hashes.MD5", "files_md5"),
|
||||
("file:hashes.'MD5'", "files_md5"),
|
||||
("file:hashes.md5", "files_md5"),
|
||||
]
|
||||
|
||||
hash_for = {
|
||||
"files_sha256": sha256_hash,
|
||||
"files_sha1": sha1_hash,
|
||||
"files_md5": md5_hash,
|
||||
}
|
||||
|
||||
for pattern_key, bucket in variants:
|
||||
h = hash_for[bucket]
|
||||
stix = {
|
||||
"type": "bundle",
|
||||
"id": "bundle--test",
|
||||
"objects": [
|
||||
{
|
||||
"type": "malware",
|
||||
"id": "malware--test",
|
||||
"name": "TestMalware",
|
||||
"is_family": False,
|
||||
},
|
||||
{
|
||||
"type": "indicator",
|
||||
"id": "indicator--test",
|
||||
"indicator_types": ["malicious-activity"],
|
||||
"pattern": f"[{pattern_key}='{h}']",
|
||||
"pattern_type": "stix",
|
||||
"valid_from": "2024-01-01T00:00:00Z",
|
||||
},
|
||||
{
|
||||
"type": "relationship",
|
||||
"id": "relationship--test",
|
||||
"relationship_type": "indicates",
|
||||
"source_ref": "indicator--test",
|
||||
"target_ref": "malware--test",
|
||||
},
|
||||
],
|
||||
}
|
||||
stix_file = tmp_path / "test.stix2"
|
||||
stix_file.write_text(json.dumps(stix))
|
||||
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([str(stix_file)], load_default=False)
|
||||
assert len(ind.ioc_collections[0][bucket]) == 1, (
|
||||
f"Pattern key '{pattern_key}' was not parsed into '{bucket}'"
|
||||
)
|
||||
assert ind.check_file_hash(h) is not None, (
|
||||
f"check_file_hash failed for pattern key '{pattern_key}'"
|
||||
)
|
||||
|
||||
def test_check_android_property(self, indicator_file):
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([indicator_file], load_default=False)
|
||||
|
||||
Reference in New Issue
Block a user