diff --git a/src/mvt/android/artifacts/tombstone_crashes.py b/src/mvt/android/artifacts/tombstone_crashes.py index 5193ff8..0b8e522 100644 --- a/src/mvt/android/artifacts/tombstone_crashes.py +++ b/src/mvt/android/artifacts/tombstone_crashes.py @@ -70,7 +70,7 @@ class TombstoneCrashResult(pydantic.BaseModel): class TombstoneCrashArtifact(AndroidArtifact): - """ " + """ Parser for Android tombstone crash files. This parser can parse both text and protobuf tombstone crash files. @@ -121,9 +121,7 @@ class TombstoneCrashArtifact(AndroidArtifact): def parse_protobuf( self, file_name: str, file_timestamp: datetime.datetime, data: bytes ) -> None: - """ - Parse Android tombstone crash files from a protobuf object. - """ + """Parse Android tombstone crash files from a protobuf object.""" tombstone_pb = Tombstone().parse(data) tombstone_dict = tombstone_pb.to_dict( betterproto.Casing.SNAKE, include_default_values=True @@ -144,21 +142,23 @@ class TombstoneCrashArtifact(AndroidArtifact): def parse( self, file_name: str, file_timestamp: datetime.datetime, content: bytes ) -> None: - """ - Parse text Android tombstone crash files. - """ - - # Split the tombstone file into a dictonary + """Parse text Android tombstone crash files.""" tombstone_dict = { "file_name": file_name, "file_timestamp": convert_datetime_to_iso(file_timestamp), } lines = content.decode("utf-8").splitlines() - for line in lines: + for line_num, line in enumerate(lines, 1): if not line.strip() or TOMBSTONE_DELIMITER in line: continue - for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items(): - self._parse_tombstone_line(line, key, destination_key, tombstone_dict) + try: + for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items(): + if self._parse_tombstone_line( + line, key, destination_key, tombstone_dict + ): + break + except Exception as e: + raise ValueError(f"Error parsing line {line_num}: {str(e)}") # Validate the tombstone and add it to the results tombstone = TombstoneCrashResult.model_validate(tombstone_dict) @@ -168,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact): self, line: str, key: str, destination_key: str, tombstone: dict ) -> bool: if not line.startswith(f"{key}"): - return None + return False if key == "pid": return self._load_pid_line(line, tombstone) @@ -200,51 +200,50 @@ class TombstoneCrashArtifact(AndroidArtifact): return True def _load_pid_line(self, line: str, tombstone: dict) -> bool: - pid_part, tid_part, name_part = [part.strip() for part in line.split(",")] + try: + parts = line.split(" >>> ") if " >>> " in line else line.split(">>>") + process_info = parts[0] - pid_key, pid_value = pid_part.split(":", 1) - if pid_key != "pid": - raise ValueError(f"Expected key pid, got {pid_key}") - pid_value = int(pid_value.strip()) + # Parse pid, tid, name from process info + info_parts = [p.strip() for p in process_info.split(",")] + for info in info_parts: + key, value = info.split(":", 1) + key = key.strip() + value = value.strip() - tid_key, tid_value = tid_part.split(":", 1) - if tid_key != "tid": - raise ValueError(f"Expected key tid, got {tid_key}") - tid_value = int(tid_value.strip()) + if key == "pid": + tombstone["pid"] = int(value) + elif key == "tid": + tombstone["tid"] = int(value) + elif key == "name": + tombstone["process_name"] = value - name_key, name_value = name_part.split(":", 1) - if name_key != "name": - raise ValueError(f"Expected key name, got {name_key}") - name_value = name_value.strip() - process_name, binary_path = self._parse_process_name(name_value, tombstone) + # Extract binary path if it exists + if len(parts) > 1: + tombstone["binary_path"] = parts[1].strip().rstrip(" <") - tombstone["pid"] = pid_value - tombstone["tid"] = tid_value - tombstone["process_name"] = process_name - tombstone["binary_path"] = binary_path - return True + return True - def _parse_process_name(self, process_name_part, tombstone: dict) -> bool: - process_name, process_path = process_name_part.split(">>>") - process_name = process_name.strip() - binary_path = process_path.strip().split(" ")[0] - return process_name, binary_path + except Exception as e: + raise ValueError(f"Failed to parse PID line: {str(e)}") def _load_signal_line(self, line: str, tombstone: dict) -> bool: - signal, code, _ = [part.strip() for part in line.split(",", 2)] - signal = signal.split("signal ")[1] - signal_code, signal_name = signal.split(" ") - signal_name = signal_name.strip("()") + signal_part, code_part = map(str.strip, line.split(",")[:2]) - code_part = code.split("code ")[1] - code_number, code_name = code_part.split(" ") - code_name = code_name.strip("()") + def parse_part(part: str, prefix: str) -> tuple[int, str]: + match = part.split(prefix)[1] + number = int(match.split()[0]) + name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN" + return number, name + + signal_number, signal_name = parse_part(signal_part, "signal ") + code_number, code_name = parse_part(code_part, "code ") tombstone["signal_info"] = { - "code": int(code_number), + "code": code_number, "code_name": code_name, "name": signal_name, - "number": int(signal_code), + "number": signal_number, } return True @@ -256,7 +255,6 @@ class TombstoneCrashArtifact(AndroidArtifact): @staticmethod def _parse_timestamp_string(timestamp: str) -> str: timestamp_parsed = parser.parse(timestamp) - # HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion. local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc) return convert_datetime_to_iso(local_timestamp)