From b7595b62eb9408b50d19d9386c0a38751cab4646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Thu, 6 Feb 2025 20:06:57 +0100 Subject: [PATCH] Add initial tombstone parser This supports parsing tombstone files from Android bugreports. The parser can load both the legacy text format and the new binary protobuf format. --- .../android/artifacts/tombstone_crashes.py | 260 +++++++++++++++++- src/mvt/android/modules/bugreport/__init__.py | 2 + src/mvt/android/modules/bugreport/base.py | 10 +- .../android/modules/bugreport/tombstones.py | 31 ++- tests/android/test_artifact_tombstones.py | 77 ++++-- ...ombstone_process => tombstone_process.txt} | 0 6 files changed, 338 insertions(+), 42 deletions(-) rename tests/artifacts/android_data/{tombstone_process => tombstone_process.txt} (100%) diff --git a/src/mvt/android/artifacts/tombstone_crashes.py b/src/mvt/android/artifacts/tombstone_crashes.py index f9a5571..62c329a 100644 --- a/src/mvt/android/artifacts/tombstone_crashes.py +++ b/src/mvt/android/artifacts/tombstone_crashes.py @@ -3,11 +3,267 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import datetime +from typing import List, Optional, Union +import pydantic +import betterproto + +from mvt.common.utils import convert_datetime_to_iso +from mvt.android.parsers.proto.tombstone import Tombstone from .artifact import AndroidArtifact +TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***" + +# Map the legacy crash file keys to the new format. 
+TOMBSTONE_TEXT_KEY_MAPPINGS = { + "Build fingerprint": "build_fingerprint", + "Revision": "revision", + "ABI": "arch", + "Timestamp": "timestamp", + "Process uptime": "process_uptime", + "Cmdline": "command_line", + "pid": "pid", + "tid": "tid", + "name": "process_name", + "binary_path": "binary_path", + "uid": "uid", + "signal": "signal_info", + "code": "code", + "Cause": "cause", +} + + +class SignalInfo(pydantic.BaseModel): + code: int + code_name: str + name: str + number: Optional[int] = None + + +class TombstoneCrashResult(pydantic.BaseModel): + """ + MVT Result model for a tombstone crash result. + + Needed for validation and serialization, and consistency between text and protobuf tombstones. + """ + + file_name: str + file_timestamp: str # We store the timestamp as a string to avoid timezone issues + build_fingerprint: str + revision: int + arch: Optional[str] = None + timestamp: str # We store the timestamp as a string to avoid timezone issues + process_uptime: Optional[int] = None + command_line: Optional[List[str]] = None + pid: int + tid: int + process_name: Optional[str] = None + binary_path: Optional[str] = None + selinux_label: Optional[str] = None + uid: Optional[int] = None + signal_info: SignalInfo + cause: Optional[str] = None + extra: Optional[str] = None + + class TombstoneCrashArtifact(AndroidArtifact): - def parse(self, content: bytes) -> None: + """ + Parser for Android tombstone crash files. + + This parser can parse both text and protobuf tombstone crash files. + """ + + def serialize(self, record: dict) -> Union[dict, list]: + return { + "timestamp": record["timestamp"], + "module": self.__class__.__name__, + "event": "Tombstone", + "data": ( + f"Crash in '{record['process_name']}' process running as UID '{record['uid']}' at " + f"{record['timestamp']}. 
Crash type '{record['signal_info']['name']}' with code '{record['signal_info']['code_name']}'" + ), + } + + def check_indicators(self) -> None: + if not self.indicators: + return + + for result in self.results: + ioc = self.indicators.check_process(result["process_name"]) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + if result.get("command_line", []): + command_name = result.get("command_line")[0].split("/")[-1] + ioc = self.indicators.check_process(command_name) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + SUSPICIOUS_UIDS = [ + 0, # root + 1000, # system + 2000, # shell + ] + if result["uid"] in SUSPICIOUS_UIDS: + self.log.warning( + f"Potentially suspicious crash in process '{result['process_name']}' " + f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}" + ) + self.detected.append(result) + + def parse_protobuf( + self, file_name: str, file_timestamp: datetime.datetime, data: bytes + ) -> None: """ - Parse Android tombstone crash files.""" + Parse Android tombstone crash files from a protobuf object. + """ + tombstone_pb = Tombstone().parse(data) + tombstone_dict = tombstone_pb.to_dict(betterproto.Casing.SNAKE) + + # Add some extra metadata + tombstone_dict["timestamp"] = self._parse_timestamp_string( + tombstone_pb.timestamp + ) + tombstone_dict["file_name"] = file_name + tombstone_dict["file_timestamp"] = convert_datetime_to_iso(file_timestamp) + tombstone_dict["process_name"] = self._proccess_name_from_thread(tombstone_dict) + + # Confirm the tombstone is valid, and matches the output model + tombstone = TombstoneCrashResult.model_validate(tombstone_dict) + self.results.append(tombstone.model_dump()) + + def parse( + self, file_name: str, file_timestamp: datetime.datetime, content: bytes + ) -> None: + """ + Parse text Android tombstone crash files. 
+ """ + + # Split the tombstone file into a dictionary + tombstone_dict = { + "file_name": file_name, + "file_timestamp": convert_datetime_to_iso(file_timestamp), + } + lines = content.decode("utf-8").splitlines() + for line in lines: + if not line.strip() or TOMBSTONE_DELIMITER in line: + continue + for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items(): + self._parse_tombstone_line(line, key, destination_key, tombstone_dict) + + # Validate the tombstone and add it to the results + tombstone = TombstoneCrashResult.model_validate(tombstone_dict) + self.results.append(tombstone.model_dump()) + + def _parse_tombstone_line( + self, line: str, key: str, destination_key: str, tombstone: dict + ) -> bool: + if not line.startswith(f"{key}"): + return None + + if key == "pid": + return self._load_pid_line(line, tombstone) + elif key == "signal": + return self._load_signal_line(line, tombstone) + elif key == "Timestamp": + return self._load_timestamp_line(line, tombstone) + else: + return self._load_key_value_line(line, key, destination_key, tombstone) + + def _load_key_value_line( + self, line: str, key: str, destination_key: str, tombstone: dict + ) -> bool: + line_key, value = line.split(":", 1) + if line_key != key: + raise ValueError(f"Expected key {key}, got {line_key}") + + value_clean = value.strip().strip("'") + if destination_key in ["uid", "revision"]: + tombstone[destination_key] = int(value_clean) + elif destination_key == "process_uptime": + # eg. "Process uptime: 40s" + tombstone[destination_key] = int(value_clean.rstrip("s")) + elif destination_key == "command_line": + # XXX: Check if command line should be a single string in a list, or a list of strings. 
+ tombstone[destination_key] = [value_clean] + else: + tombstone[destination_key] = value_clean + return True + + def _load_pid_line(self, line: str, tombstone: dict) -> bool: + pid_part, tid_part, name_part = [part.strip() for part in line.split(",")] + + pid_key, pid_value = pid_part.split(":", 1) + if pid_key != "pid": + raise ValueError(f"Expected key pid, got {pid_key}") + pid_value = int(pid_value.strip()) + + tid_key, tid_value = tid_part.split(":", 1) + if tid_key != "tid": + raise ValueError(f"Expected key tid, got {tid_key}") + tid_value = int(tid_value.strip()) + + name_key, name_value = name_part.split(":", 1) + if name_key != "name": + raise ValueError(f"Expected key name, got {name_key}") + name_value = name_value.strip() + process_name, binary_path = self._parse_process_name(name_value, tombstone) + + tombstone["pid"] = pid_value + tombstone["tid"] = tid_value + tombstone["process_name"] = process_name + tombstone["binary_path"] = binary_path + return True + + def _parse_process_name(self, process_name_part, tombstone: dict) -> bool: + process_name, process_path = process_name_part.split(">>>") + process_name = process_name.strip() + binary_path = process_path.strip().split(" ")[0] + return process_name, binary_path + + def _load_signal_line(self, line: str, tombstone: dict) -> bool: + signal, code, _ = [part.strip() for part in line.split(",", 2)] + signal = signal.split("signal ")[1] + signal_code, signal_name = signal.split(" ") + signal_name = signal_name.strip("()") + + code_part = code.split("code ")[1] + code_number, code_name = code_part.split(" ") + code_name = code_name.strip("()") + + tombstone["signal_info"] = { + "code": int(code_number), + "code_name": code_name, + "name": signal_name, + "number": int(signal_code), + } + return True + + def _load_timestamp_line(self, line: str, tombstone: dict) -> bool: + timestamp = line.split(":", 1)[1].strip() + tombstone["timestamp"] = self._parse_timestamp_string(timestamp) + return True + + 
@staticmethod + def _parse_timestamp_string(timestamp: str) -> str: + timestamp_date, timezone = timestamp.split("+") + # Truncate fractional seconds before parsing (NOTE: the split above assumes a "+HHMM" offset; "-HHMM" raises ValueError) + timestamp_without_micro = timestamp_date.split(".")[0] + "+" + timezone + timestamp_parsed = datetime.datetime.strptime( + timestamp_without_micro, "%Y-%m-%d %H:%M:%S%z" + ) + return convert_datetime_to_iso(timestamp_parsed) + + @staticmethod + def _proccess_name_from_thread(tombstone_dict: dict) -> str: + if tombstone_dict.get("threads"): + for thread in tombstone_dict["threads"].values(): + if thread.get("id") == tombstone_dict["tid"] and thread.get("name"): + return thread["name"] + return "Unknown" diff --git a/src/mvt/android/modules/bugreport/__init__.py b/src/mvt/android/modules/bugreport/__init__.py index 73dd852..6945629 100644 --- a/src/mvt/android/modules/bugreport/__init__.py +++ b/src/mvt/android/modules/bugreport/__init__.py @@ -14,6 +14,7 @@ from .packages import Packages from .platform_compat import PlatformCompat from .receivers import Receivers from .adb_state import DumpsysADBState +from .tombstones import Tombstones BUGREPORT_MODULES = [ Accessibility, @@ -27,4 +28,5 @@ BUGREPORT_MODULES = [ PlatformCompat, Receivers, DumpsysADBState, + Tombstones, ] diff --git a/src/mvt/android/modules/bugreport/base.py b/src/mvt/android/modules/bugreport/base.py index d434116..bf98dca 100644 --- a/src/mvt/android/modules/bugreport/base.py +++ b/src/mvt/android/modules/bugreport/base.py @@ -2,7 +2,7 @@ # Copyright (c) 2021-2023 The MVT Authors. 
# See the file 'LICENSE' for usage and copying permissions, or find a copy at # https://github.com/mvt-project/mvt/blob/main/LICENSE - +import datetime import fnmatch import logging import os @@ -91,3 +91,11 @@ class BugReportModule(MVTModule): return None return self._get_file_content(dumpstate_logs[0]) + + def _get_file_modification_time(self, file_path: str) -> dict: + if self.zip_archive: + file_timetuple = self.zip_archive.getinfo(file_path).date_time + return datetime.datetime(*file_timetuple) + else: + file_stat = os.stat(os.path.join(self.extract_path, file_path)) + return datetime.datetime.fromtimestamp(file_stat.st_mtime) diff --git a/src/mvt/android/modules/bugreport/tombstones.py b/src/mvt/android/modules/bugreport/tombstones.py index 250c472..6447e61 100644 --- a/src/mvt/android/modules/bugreport/tombstones.py +++ b/src/mvt/android/modules/bugreport/tombstones.py @@ -42,18 +42,23 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule): ) return - for tombstone_file in tombstone_files: - if tombstone_file.endswith("*.pb"): - self.log.info("Skipping protobuf tombstone file: %s", tombstone_file) - continue - - print(tombstone_file) + for tombstone_file in sorted(tombstone_files): + tombstone_filename = tombstone_file.split("/")[-1] + modification_time = self._get_file_modification_time(tombstone_file) tombstone_data = self._get_file_content(tombstone_file) - tombstone = self.parse_tombstone(tombstone_data) - print(tombstone) - break - # self.log.info( - # "Extracted a total of %d database connection pool records", - # len(self.results), - # ) + try: + if tombstone_file.endswith(".pb"): + self.parse_protobuf( + tombstone_filename, modification_time, tombstone_data + ) + else: + self.parse(tombstone_filename, modification_time, tombstone_data) + except ValueError as e: + # Catch any exceptions raised during parsing or validation. 
+ self.log.error(f"Error parsing tombstone file {tombstone_file}: {e}") + + self.log.info( + "Extracted a total of %d tombstone files", + len(self.results), + ) diff --git a/tests/android/test_artifact_tombstones.py b/tests/android/test_artifact_tombstones.py index ba6c823..b837d73 100644 --- a/tests/android/test_artifact_tombstones.py +++ b/tests/android/test_artifact_tombstones.py @@ -2,39 +2,64 @@ # Copyright (c) 2021-2023 The MVT Authors. # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import os +import datetime + +import pytest from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact -from mvt.android.parsers.proto.tombstone import Tombstone from ..utils import get_artifact class TestTombstoneCrashArtifact: - # def test_tombtone_process_parsing(self): - # tombstone_artifact = TombstoneCrashArtifact() - # file = get_artifact("android_data/tombstone_process.txt") - # with open(file, "rb") as f: - # data = f.read() - - # tombstone_artifact.parse_text(data) - # assert len(tombstone_artifact.results) == 1 - - # def test_tombtone_kernel_parsing(self): - # tombstone_artifact = TombstoneCrashArtifact() - # file = get_artifact("android_data/tombstone_kernel.txt") - # with open(file, "rb") as f: - # data = f.read() - - # tombstone_artifact.parse_text(data) - # assert len(tombstone_artifact.results) == 1 - - def test_tombstone_pb_process_parsing(self): - file = get_artifact("android_data/tombstone_process.pb") + def test_tombtone_process_parsing(self): + tombstone_artifact = TombstoneCrashArtifact() + artifact_path = "android_data/tombstone_process.txt" + file = get_artifact(artifact_path) with open(file, "rb") as f: data = f.read() - parsed_tombstone = Tombstone().parse(data) - assert parsed_tombstone - assert parsed_tombstone.command_line == ["/vendor/bin/hw/android.hardware.media.c2@1.2-mediatek"] - assert parsed_tombstone.uid == 1046 - assert parsed_tombstone.timestamp == "2023-04-12 
12:32:40.518290770+0200" + # Pass the file name and timestamp to the parse method + file_name = os.path.basename(artifact_path) + file_timestamp = datetime.datetime(2023, 4, 12, 12, 32, 40, 518290) + tombstone_artifact.parse(file_name, file_timestamp, data) + + assert len(tombstone_artifact.results) == 1 + self.validate_tombstone_result(tombstone_artifact.results[0]) + + def test_tombstone_pb_process_parsing(self): + tombstone_artifact = TombstoneCrashArtifact() + artifact_path = "android_data/tombstone_process.pb" + file = get_artifact(artifact_path) + with open(file, "rb") as f: + data = f.read() + + file_name = os.path.basename(artifact_path) + file_timestamp = datetime.datetime(2023, 4, 12, 12, 32, 40, 518290) + tombstone_artifact.parse_protobuf(file_name, file_timestamp, data) + + assert len(tombstone_artifact.results) == 1 + self.validate_tombstone_result(tombstone_artifact.results[0]) + + @pytest.mark.skip(reason="Not implemented yet") + def test_tombtone_kernel_parsing(self): + tombstone_artifact = TombstoneCrashArtifact() + file = get_artifact("android_data/tombstone_kernel.txt") + with open(file, "rb") as f: + data = f.read() + + tombstone_artifact.parse_text(data) + assert len(tombstone_artifact.results) == 1 + + def validate_tombstone_result(self, tombstone_result: dict): + assert tombstone_result.get("command_line") == [ + "/vendor/bin/hw/android.hardware.media.c2@1.2-mediatek" + ] + assert tombstone_result.get("uid") == 1046 + assert tombstone_result.get("pid") == 25541 + assert tombstone_result.get("process_name") == "mtk.ape.decoder" + + # Check if the timestamp is correctly parsed, and converted to UTC + # Original is in +0200: 2023-04-12 12:32:40.518290770+0200, result should be 2023-04-12 10:32:40.000000 (UTC, without an offset suffix) + assert tombstone_result.get("timestamp") == "2023-04-12 10:32:40.000000" diff --git a/tests/artifacts/android_data/tombstone_process b/tests/artifacts/android_data/tombstone_process.txt similarity index 100% rename from 
tests/artifacts/android_data/tombstone_process rename to tests/artifacts/android_data/tombstone_process.txt