Compare commits

...

1 Commits

Author SHA1 Message Date
Janik Besendorf
4ec46cf53f fix tombstone unpack parsing bug 2025-10-22 21:51:25 +02:00

View File

@@ -70,7 +70,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
class TombstoneCrashArtifact(AndroidArtifact): class TombstoneCrashArtifact(AndroidArtifact):
""" " """
Parser for Android tombstone crash files. Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files. This parser can parse both text and protobuf tombstone crash files.
@@ -121,9 +121,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse_protobuf( def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None: ) -> None:
""" """Parse Android tombstone crash files from a protobuf object."""
Parse Android tombstone crash files from a protobuf object.
"""
tombstone_pb = Tombstone().parse(data) tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict( tombstone_dict = tombstone_pb.to_dict(
betterproto.Casing.SNAKE, include_default_values=True betterproto.Casing.SNAKE, include_default_values=True
@@ -144,21 +142,23 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse( def parse(
self, file_name: str, file_timestamp: datetime.datetime, content: bytes self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None: ) -> None:
""" """Parse text Android tombstone crash files."""
Parse text Android tombstone crash files.
"""
# Split the tombstone file into a dictonary
tombstone_dict = { tombstone_dict = {
"file_name": file_name, "file_name": file_name,
"file_timestamp": convert_datetime_to_iso(file_timestamp), "file_timestamp": convert_datetime_to_iso(file_timestamp),
} }
lines = content.decode("utf-8").splitlines() lines = content.decode("utf-8").splitlines()
for line in lines: for line_num, line in enumerate(lines, 1):
if not line.strip() or TOMBSTONE_DELIMITER in line: if not line.strip() or TOMBSTONE_DELIMITER in line:
continue continue
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items(): try:
self._parse_tombstone_line(line, key, destination_key, tombstone_dict) for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
if self._parse_tombstone_line(
line, key, destination_key, tombstone_dict
):
break
except Exception as e:
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
# Validate the tombstone and add it to the results # Validate the tombstone and add it to the results
tombstone = TombstoneCrashResult.model_validate(tombstone_dict) tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
@@ -168,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
self, line: str, key: str, destination_key: str, tombstone: dict self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool: ) -> bool:
if not line.startswith(f"{key}"): if not line.startswith(f"{key}"):
return None return False
if key == "pid": if key == "pid":
return self._load_pid_line(line, tombstone) return self._load_pid_line(line, tombstone)
@@ -200,51 +200,50 @@ class TombstoneCrashArtifact(AndroidArtifact):
return True return True
def _load_pid_line(self, line: str, tombstone: dict) -> bool: def _load_pid_line(self, line: str, tombstone: dict) -> bool:
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")] try:
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
process_info = parts[0]
pid_key, pid_value = pid_part.split(":", 1) # Parse pid, tid, name from process info
if pid_key != "pid": info_parts = [p.strip() for p in process_info.split(",")]
raise ValueError(f"Expected key pid, got {pid_key}") for info in info_parts:
pid_value = int(pid_value.strip()) key, value = info.split(":", 1)
key = key.strip()
value = value.strip()
tid_key, tid_value = tid_part.split(":", 1) if key == "pid":
if tid_key != "tid": tombstone["pid"] = int(value)
raise ValueError(f"Expected key tid, got {tid_key}") elif key == "tid":
tid_value = int(tid_value.strip()) tombstone["tid"] = int(value)
elif key == "name":
tombstone["process_name"] = value
name_key, name_value = name_part.split(":", 1) # Extract binary path if it exists
if name_key != "name": if len(parts) > 1:
raise ValueError(f"Expected key name, got {name_key}") tombstone["binary_path"] = parts[1].strip().rstrip(" <")
name_value = name_value.strip()
process_name, binary_path = self._parse_process_name(name_value, tombstone)
tombstone["pid"] = pid_value return True
tombstone["tid"] = tid_value
tombstone["process_name"] = process_name
tombstone["binary_path"] = binary_path
return True
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool: except Exception as e:
process_name, process_path = process_name_part.split(">>>") raise ValueError(f"Failed to parse PID line: {str(e)}")
process_name = process_name.strip()
binary_path = process_path.strip().split(" ")[0]
return process_name, binary_path
def _load_signal_line(self, line: str, tombstone: dict) -> bool: def _load_signal_line(self, line: str, tombstone: dict) -> bool:
signal, code, _ = [part.strip() for part in line.split(",", 2)] signal_part, code_part = map(str.strip, line.split(",")[:2])
signal = signal.split("signal ")[1]
signal_code, signal_name = signal.split(" ")
signal_name = signal_name.strip("()")
code_part = code.split("code ")[1] def parse_part(part: str, prefix: str) -> tuple[int, str]:
code_number, code_name = code_part.split(" ") match = part.split(prefix)[1]
code_name = code_name.strip("()") number = int(match.split()[0])
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
return number, name
signal_number, signal_name = parse_part(signal_part, "signal ")
code_number, code_name = parse_part(code_part, "code ")
tombstone["signal_info"] = { tombstone["signal_info"] = {
"code": int(code_number), "code": code_number,
"code_name": code_name, "code_name": code_name,
"name": signal_name, "name": signal_name,
"number": int(signal_code), "number": signal_number,
} }
return True return True
@@ -256,7 +255,6 @@ class TombstoneCrashArtifact(AndroidArtifact):
@staticmethod @staticmethod
def _parse_timestamp_string(timestamp: str) -> str: def _parse_timestamp_string(timestamp: str) -> str:
timestamp_parsed = parser.parse(timestamp) timestamp_parsed = parser.parse(timestamp)
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion. # HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc) local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
return convert_datetime_to_iso(local_timestamp) return convert_datetime_to_iso(local_timestamp)