Add inputStruct dumping functionality to iokit_tracer with output directory support

This commit is contained in:
Karol Mazurek
2025-06-10 19:08:16 +02:00
parent 1962ab10ef
commit 0fd3c811db

View File

@@ -4,6 +4,8 @@
import lldb
import shlex
import threading
import os
import re
# --- Shared State & Locking ---
g_lock = threading.Lock()
@@ -40,6 +42,9 @@ def resolve_service_name(frame, service_ptr):
if s and not s.startswith('0x'): return s
return f"0x{service_ptr:x}"
def sanitize_filename(s):
return re.sub(r'[^A-Za-z0-9._-]', '_', s)
# --- LLDB Hook Functions ---
def ioserviceopen_hook(frame, bp_loc, internal_dict):
"""Hook at IOServiceOpen. Stores args in a pending dictionary for the thread."""
@@ -58,8 +63,11 @@ def ioserviceopen_hook(frame, bp_loc, internal_dict):
}
return False
corpus_dir = None
corpus_counter = 0
def ioconnectcallmethod_hook(frame, bp_loc, internal_dict):
"""Hook at IOConnectCallMethod. Resolves connection and prints complete trace info."""
global corpus_counter
thread = frame.GetThread()
process = thread.GetProcess()
connection = frame.FindRegister("x0").GetValueAsUnsigned()
@@ -88,6 +96,31 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict):
outputStructCnt_ptr = frame.FindRegister("x9").GetValueAsUnsigned()
err = lldb.SBError()
# --- Read output scalar count and output struct count for filename ---
outputCnt_val = process.ReadUnsignedFromMemory(outputCnt_ptr, 4, err) if outputCnt_ptr else 0
outputCnt_str = str(outputCnt_val) if err.Success() else "err"
outputStructCnt_val = process.ReadUnsignedFromMemory(outputStructCnt_ptr, 8, err) if outputStructCnt_ptr else 0
outputStructCnt_str = str(outputStructCnt_val) if err.Success() else "err"
# --- Dump inputStruct to corpus directory if requested ---
if corpus_dir and inputStruct_ptr and inputStructCnt > 0:
try:
data = process.ReadMemory(inputStruct_ptr, inputStructCnt, err)
if err.Success():
service_name = sanitize_filename(info['service_name'])
type_val = info['type']
fname = (
f"corpus_{service_name}_{type_val}_{selector}_"
f"{inputCnt}_{inputStructCnt}_{outputCnt_str}_{outputStructCnt_str}_{corpus_counter}.bin"
)
full_path = os.path.join(corpus_dir, fname)
with open(full_path, "wb") as f:
f.write(data)
print(f"[iokit_tracer] Dumped inputStruct ({inputStructCnt} bytes) to {full_path}")
corpus_counter += 1
except Exception as e:
print(f"[iokit_tracer] Error dumping inputStruct: {e}")
# Read input scalars from memory
input_scalars = []
if input_ptr and inputCnt > 0:
@@ -105,7 +138,7 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict):
# Build output
lines = [
"\n----------------------------------------------------------------",
"----------------------------------------------------------------",
f"PID: {info['pid']} | EXE: {info['exe_path']}",
f"SERVICE: {info['service_name']} (Connection: {hex(connection)}) | TYPE: {info['type']}",
f"SELECTOR: {selector} (0x{selector:x})",
@@ -117,14 +150,32 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict):
lines.append("--- OUTPUT ---")
lines.append(f"outputCnt: {outputCnt_str} (capacity pointer: {hex(outputCnt_ptr)})")
lines.append(f"outputStructCnt: {outputStructCnt_str} (capacity pointer: {hex(outputStructCnt_ptr)})")
lines.append("----------------------------------------------------------------")
lines.append("----------------------------------------------------------------\n")
print("\n".join(lines))
return False
def trace_iokit(debugger, command, result, internal_dict):
"""Command to attach or launch and set up the IOKit tracer."""
global corpus_dir, corpus_counter
args = shlex.split(command)
pid, path, prog_args = None, None, []
corpus_dir = None
corpus_counter = 0
# --- Parse -o/--out flag ---
if "-o" in args:
idx = args.index("-o")
if idx + 1 < len(args):
corpus_dir = args[idx + 1]
del args[idx:idx+2]
elif "--out" in args:
idx = args.index("--out")
if idx + 1 < len(args):
corpus_dir = args[idx + 1]
del args[idx:idx+2]
if corpus_dir:
os.makedirs(corpus_dir, exist_ok=True)
print(f"[iokit_tracer] Will dump inputStructs to: {corpus_dir}")
if "--pid" in args: pid = args[args.index("--pid") + 1]
if "--executable_path" in args: path = args[args.index("--executable_path") + 1]
if "--" in args: prog_args = args[args.index("--") + 1:]