Add initial tombstone parser

This supports parsing tombstone files from Android bugreports. The parser
can load both the legacy text format and the new binary protobuf format.
This commit is contained in:
Donncha Ó Cearbhaill
2025-02-06 20:06:57 +01:00
parent 02c02ca15c
commit b7595b62eb
6 changed files with 338 additions and 42 deletions
+258 -2
View File
@@ -3,11 +3,267 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import datetime
from typing import List, Optional, Union
import pydantic
import betterproto
from mvt.common.utils import convert_datetime_to_iso
from mvt.android.parsers.proto.tombstone import Tombstone
from .artifact import AndroidArtifact
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
# Map the legacy crash file keys to the new format.
TOMBSTONE_TEXT_KEY_MAPPINGS = {
"Build fingerprint": "build_fingerprint",
"Revision": "revision",
"ABI": "arch",
"Timestamp": "timestamp",
"Process uptime": "process_uptime",
"Cmdline": "command_line",
"pid": "pid",
"tid": "tid",
"name": "process_name",
"binary_path": "binary_path",
"uid": "uid",
"signal": "signal_info",
"code": "code",
"Cause": "cause",
}
class SignalInfo(pydantic.BaseModel):
code: int
code_name: str
name: str
number: Optional[int] = None
class TombstoneCrashResult(pydantic.BaseModel):
"""
MVT Result model for a tombstone crash result.
Needed for validation and serialization, and consistency between text and protobuf tombstones.
"""
file_name: str
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
build_fingerprint: str
revision: int
arch: Optional[str] = None
timestamp: str # We store the timestamp as a string to avoid timezone issues
process_uptime: Optional[int] = None
command_line: Optional[List[str]] = None
pid: int
tid: int
process_name: Optional[str] = None
binary_path: Optional[str] = None
selinux_label: Optional[str] = None
uid: Optional[int] = None
signal_info: SignalInfo
cause: Optional[str] = None
extra: Optional[str] = None
class TombstoneCrashArtifact(AndroidArtifact):
def parse(self, content: bytes) -> None:
""" "
Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files.
"""
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["timestamp"],
"module": self.__class__.__name__,
"event": "Tombstone",
"data": (
f"Crash in '{record['process_name']}' process running as UID '{record['uid']}' at "
f"{record['timestamp']}. Crash type '{record['signal_info']['name']}' with code '{record['signal_info']['code_name']}'"
),
}
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result["process_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if result.get("command_line", []):
command_name = result.get("command_line")[0].split("/")[-1]
ioc = self.indicators.check_process(command_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
SUSPICIOUS_UIDS = [
0, # root
1000, # system
2000, # shell
]
if result["uid"] in SUSPICIOUS_UIDS:
self.log.warning(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
)
self.detected.append(result)
def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None:
"""
Parse Android tombstone crash files."""
Parse Android tombstone crash files from a protobuf object.
"""
tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict(betterproto.Casing.SNAKE)
# Add some extra metadata
tombstone_dict["timestamp"] = self._parse_timestamp_string(
tombstone_pb.timestamp
)
tombstone_dict["file_name"] = file_name
tombstone_dict["file_timestamp"] = convert_datetime_to_iso(file_timestamp)
tombstone_dict["process_name"] = self._proccess_name_from_thread(tombstone_dict)
# Confirm the tombstone is valid, and matches the output model
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
self.results.append(tombstone.model_dump())
def parse(
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None:
"""
Parse text Android tombstone crash files.
"""
# Split the tombstone file into a dictonary
tombstone_dict = {
"file_name": file_name,
"file_timestamp": convert_datetime_to_iso(file_timestamp),
}
lines = content.decode("utf-8").splitlines()
for line in lines:
if not line.strip() or TOMBSTONE_DELIMITER in line:
continue
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
# Validate the tombstone and add it to the results
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
self.results.append(tombstone.model_dump())
def _parse_tombstone_line(
self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
if not line.startswith(f"{key}"):
return None
if key == "pid":
return self._load_pid_line(line, tombstone)
elif key == "signal":
return self._load_signal_line(line, tombstone)
elif key == "Timestamp":
return self._load_timestamp_line(line, tombstone)
else:
return self._load_key_value_line(line, key, destination_key, tombstone)
def _load_key_value_line(
self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
line_key, value = line.split(":", 1)
if line_key != key:
raise ValueError(f"Expected key {key}, got {line_key}")
value_clean = value.strip().strip("'")
if destination_key in ["uid", "revision"]:
tombstone[destination_key] = int(value_clean)
elif destination_key == "process_uptime":
# eg. "Process uptime: 40s"
tombstone[destination_key] = int(value_clean.rstrip("s"))
elif destination_key == "command_line":
# XXX: Check if command line should be a single string in a list, or a list of strings.
tombstone[destination_key] = [value_clean]
else:
tombstone[destination_key] = value_clean
return True
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
pid_key, pid_value = pid_part.split(":", 1)
if pid_key != "pid":
raise ValueError(f"Expected key pid, got {pid_key}")
pid_value = int(pid_value.strip())
tid_key, tid_value = tid_part.split(":", 1)
if tid_key != "tid":
raise ValueError(f"Expected key tid, got {tid_key}")
tid_value = int(tid_value.strip())
name_key, name_value = name_part.split(":", 1)
if name_key != "name":
raise ValueError(f"Expected key name, got {name_key}")
name_value = name_value.strip()
process_name, binary_path = self._parse_process_name(name_value, tombstone)
tombstone["pid"] = pid_value
tombstone["tid"] = tid_value
tombstone["process_name"] = process_name
tombstone["binary_path"] = binary_path
return True
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
process_name, process_path = process_name_part.split(">>>")
process_name = process_name.strip()
binary_path = process_path.strip().split(" ")[0]
return process_name, binary_path
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
signal, code, _ = [part.strip() for part in line.split(",", 2)]
signal = signal.split("signal ")[1]
signal_code, signal_name = signal.split(" ")
signal_name = signal_name.strip("()")
code_part = code.split("code ")[1]
code_number, code_name = code_part.split(" ")
code_name = code_name.strip("()")
tombstone["signal_info"] = {
"code": int(code_number),
"code_name": code_name,
"name": signal_name,
"number": int(signal_code),
}
return True
def _load_timestamp_line(self, line: str, tombstone: dict) -> bool:
timestamp = line.split(":", 1)[1].strip()
tombstone["timestamp"] = self._parse_timestamp_string(timestamp)
return True
@staticmethod
def _parse_timestamp_string(timestamp: str) -> str:
timestamp_date, timezone = timestamp.split("+")
# Truncate microseconds before parsing
timestamp_without_micro = timestamp_date.split(".")[0] + "+" + timezone
timestamp_parsed = datetime.datetime.strptime(
timestamp_without_micro, "%Y-%m-%d %H:%M:%S%z"
)
return convert_datetime_to_iso(timestamp_parsed)
@staticmethod
def _proccess_name_from_thread(tombstone_dict: dict) -> str:
if tombstone_dict.get("threads"):
for thread in tombstone_dict["threads"].values():
if thread.get("id") == tombstone_dict["tid"] and thread.get("name"):
return thread["name"]
return "Unknown"
@@ -14,6 +14,7 @@ from .packages import Packages
from .platform_compat import PlatformCompat
from .receivers import Receivers
from .adb_state import DumpsysADBState
from .tombstones import Tombstones
BUGREPORT_MODULES = [
Accessibility,
@@ -27,4 +28,5 @@ BUGREPORT_MODULES = [
PlatformCompat,
Receivers,
DumpsysADBState,
Tombstones,
]
+9 -1
View File
@@ -2,7 +2,7 @@
# Copyright (c) 2021-2023 The MVT Authors.
# See the file 'LICENSE' for usage and copying permissions, or find a copy at
# https://github.com/mvt-project/mvt/blob/main/LICENSE
import datetime
import fnmatch
import logging
import os
@@ -91,3 +91,11 @@ class BugReportModule(MVTModule):
return None
return self._get_file_content(dumpstate_logs[0])
def _get_file_modification_time(self, file_path: str) -> dict:
if self.zip_archive:
file_timetuple = self.zip_archive.getinfo(file_path).date_time
return datetime.datetime(*file_timetuple)
else:
file_stat = os.stat(os.path.join(self.extract_path, file_path))
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
+18 -13
View File
@@ -42,18 +42,23 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
)
return
for tombstone_file in tombstone_files:
if tombstone_file.endswith("*.pb"):
self.log.info("Skipping protobuf tombstone file: %s", tombstone_file)
continue
print(tombstone_file)
for tombstone_file in sorted(tombstone_files):
tombstone_filename = tombstone_file.split("/")[-1]
modification_time = self._get_file_modification_time(tombstone_file)
tombstone_data = self._get_file_content(tombstone_file)
tombstone = self.parse_tombstone(tombstone_data)
print(tombstone)
break
# self.log.info(
# "Extracted a total of %d database connection pool records",
# len(self.results),
# )
try:
if tombstone_file.endswith(".pb"):
self.parse_protobuf(
tombstone_filename, modification_time, tombstone_data
)
else:
self.parse(tombstone_filename, modification_time, tombstone_data)
except ValueError as e:
# Catch any exceptions raised during parsing or validation.
self.log.error(f"Error parsing tombstone file {tombstone_file}: {e}")
self.log.info(
"Extracted a total of %d tombstone files",
len(self.results),
)
+51 -26
View File
@@ -2,39 +2,64 @@
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import datetime
import pytest
from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact
from mvt.android.parsers.proto.tombstone import Tombstone
from ..utils import get_artifact
class TestTombstoneCrashArtifact:
# def test_tombtone_process_parsing(self):
# tombstone_artifact = TombstoneCrashArtifact()
# file = get_artifact("android_data/tombstone_process.txt")
# with open(file, "rb") as f:
# data = f.read()
# tombstone_artifact.parse_text(data)
# assert len(tombstone_artifact.results) == 1
# def test_tombtone_kernel_parsing(self):
# tombstone_artifact = TombstoneCrashArtifact()
# file = get_artifact("android_data/tombstone_kernel.txt")
# with open(file, "rb") as f:
# data = f.read()
# tombstone_artifact.parse_text(data)
# assert len(tombstone_artifact.results) == 1
def test_tombstone_pb_process_parsing(self):
file = get_artifact("android_data/tombstone_process.pb")
def test_tombtone_process_parsing(self):
tombstone_artifact = TombstoneCrashArtifact()
artifact_path = "android_data/tombstone_process.txt"
file = get_artifact(artifact_path)
with open(file, "rb") as f:
data = f.read()
parsed_tombstone = Tombstone().parse(data)
assert parsed_tombstone
assert parsed_tombstone.command_line == ["/vendor/bin/hw/android.hardware.media.c2@1.2-mediatek"]
assert parsed_tombstone.uid == 1046
assert parsed_tombstone.timestamp == "2023-04-12 12:32:40.518290770+0200"
# Pass the file name and timestamp to the parse method
file_name = os.path.basename(artifact_path)
file_timestamp = datetime.datetime(2023, 4, 12, 12, 32, 40, 518290)
tombstone_artifact.parse(file_name, file_timestamp, data)
assert len(tombstone_artifact.results) == 1
self.validate_tombstone_result(tombstone_artifact.results[0])
def test_tombstone_pb_process_parsing(self):
tombstone_artifact = TombstoneCrashArtifact()
artifact_path = "android_data/tombstone_process.pb"
file = get_artifact(artifact_path)
with open(file, "rb") as f:
data = f.read()
file_name = os.path.basename(artifact_path)
file_timestamp = datetime.datetime(2023, 4, 12, 12, 32, 40, 518290)
tombstone_artifact.parse_protobuf(file_name, file_timestamp, data)
assert len(tombstone_artifact.results) == 1
self.validate_tombstone_result(tombstone_artifact.results[0])
@pytest.mark.skip(reason="Not implemented yet")
def test_tombtone_kernel_parsing(self):
tombstone_artifact = TombstoneCrashArtifact()
file = get_artifact("android_data/tombstone_kernel.txt")
with open(file, "rb") as f:
data = f.read()
tombstone_artifact.parse_text(data)
assert len(tombstone_artifact.results) == 1
def validate_tombstone_result(self, tombstone_result: dict):
assert tombstone_result.get("command_line") == [
"/vendor/bin/hw/android.hardware.media.c2@1.2-mediatek"
]
assert tombstone_result.get("uid") == 1046
assert tombstone_result.get("pid") == 25541
assert tombstone_result.get("process_name") == "mtk.ape.decoder"
# Check if the timestamp is correctly parsed, and converted to UTC
# Original is in +0200: 2023-04-12 12:32:40.518290770+0200, result should be 2023-04-12 10:32:40.000000+0000
assert tombstone_result.get("timestamp") == "2023-04-12 10:32:40.000000"