From 0fd3c811dbe198f83412f6b9a2a893c45d77a371 Mon Sep 17 00:00:00 2001 From: Karol Mazurek Date: Tue, 10 Jun 2025 19:08:16 +0200 Subject: [PATCH] Add inputStruct dumping functionality to iokit_tracer with output directory support --- X. NU/custom/drivers/iokit_tracer.py | 57 ++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/X. NU/custom/drivers/iokit_tracer.py b/X. NU/custom/drivers/iokit_tracer.py index 3e684c0..ec0c09f 100644 --- a/X. NU/custom/drivers/iokit_tracer.py +++ b/X. NU/custom/drivers/iokit_tracer.py @@ -4,6 +4,8 @@ import lldb import shlex import threading +import os +import re # --- Shared State & Locking --- g_lock = threading.Lock() @@ -40,6 +42,9 @@ def resolve_service_name(frame, service_ptr): if s and not s.startswith('0x'): return s return f"0x{service_ptr:x}" +def sanitize_filename(s): + return re.sub(r'[^A-Za-z0-9._-]', '_', s) + # --- LLDB Hook Functions --- def ioserviceopen_hook(frame, bp_loc, internal_dict): """Hook at IOServiceOpen. Stores args in a pending dictionary for the thread.""" @@ -58,8 +63,11 @@ def ioserviceopen_hook(frame, bp_loc, internal_dict): } return False +corpus_dir = None +corpus_counter = 0 + def ioconnectcallmethod_hook(frame, bp_loc, internal_dict): - """Hook at IOConnectCallMethod. Resolves connection and prints complete trace info.""" + global corpus_counter thread = frame.GetThread() process = thread.GetProcess() connection = frame.FindRegister("x0").GetValueAsUnsigned() @@ -88,6 +96,31 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict): outputStructCnt_ptr = frame.FindRegister("x9").GetValueAsUnsigned() err = lldb.SBError() + # --- Read output scalar count and output struct count for filename --- + outputCnt_val = process.ReadUnsignedFromMemory(outputCnt_ptr, 4, err) if outputCnt_ptr else 0 + outputCnt_str = str(outputCnt_val) if err.Success() else "err" + outputStructCnt_val = process.ReadUnsignedFromMemory(outputStructCnt_ptr, 8, err) if outputStructCnt_ptr else 0 + outputStructCnt_str = str(outputStructCnt_val) if err.Success() else "err" + + # --- Dump inputStruct to corpus directory if requested --- + if corpus_dir and inputStruct_ptr and inputStructCnt > 0: + try: + data = process.ReadMemory(inputStruct_ptr, inputStructCnt, err) + if err.Success(): + service_name = sanitize_filename(info['service_name']) + type_val = info['type'] + fname = ( + f"corpus_{service_name}_{type_val}_{selector}_" + f"{inputCnt}_{inputStructCnt}_{outputCnt_str}_{outputStructCnt_str}_{corpus_counter}.bin" + ) + full_path = os.path.join(corpus_dir, fname) + with open(full_path, "wb") as f: + f.write(data) + print(f"[iokit_tracer] Dumped inputStruct ({inputStructCnt} bytes) to {full_path}") + corpus_counter += 1 + except Exception as e: + print(f"[iokit_tracer] Error dumping inputStruct: {e}") + # Read input scalars from memory input_scalars = [] if input_ptr and inputCnt > 0: @@ -105,7 +138,7 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict): # Build output lines = [ - "\n----------------------------------------------------------------", + "----------------------------------------------------------------", f"PID: {info['pid']} | EXE: {info['exe_path']}", f"SERVICE: {info['service_name']} (Connection: {hex(connection)}) | TYPE: {info['type']}", f"SELECTOR: {selector} (0x{selector:x})", @@ -117,14 +150,32 @@ def ioconnectcallmethod_hook(frame, bp_loc, internal_dict): lines.append("--- OUTPUT ---") lines.append(f"outputCnt: {outputCnt_str} (capacity pointer: {hex(outputCnt_ptr)})") lines.append(f"outputStructCnt: {outputStructCnt_str} (capacity pointer: {hex(outputStructCnt_ptr)})") - lines.append("----------------------------------------------------------------") + lines.append("----------------------------------------------------------------\n") print("\n".join(lines)) return False def trace_iokit(debugger, command, result, internal_dict): """Command to attach or launch and set up the IOKit tracer.""" + global corpus_dir, corpus_counter args = shlex.split(command) pid, path, prog_args = None, None, [] + corpus_dir = None + corpus_counter = 0 + # --- Parse -o/--out flag --- + if "-o" in args: + idx = args.index("-o") + if idx + 1 < len(args): + corpus_dir = args[idx + 1] + del args[idx:idx+2] + elif "--out" in args: + idx = args.index("--out") + if idx + 1 < len(args): + corpus_dir = args[idx + 1] + del args[idx:idx+2] + if corpus_dir: + os.makedirs(corpus_dir, exist_ok=True) + print(f"[iokit_tracer] Will dump inputStructs to: {corpus_dir}") + if "--pid" in args: pid = args[args.index("--pid") + 1] if "--executable_path" in args: path = args[args.index("--executable_path") + 1] if "--" in args: prog_args = args[args.index("--") + 1:]