This commit is contained in:
Karmaz95
2024-03-24 16:07:27 +01:00
parent 0b5b02fdb9
commit 1342b2054a
7 changed files with 1232 additions and 69 deletions

View File

@@ -11,6 +11,10 @@ import json
import sys
import treelib
import ctypes
import stat
import struct
import threading
import time
### --- I. MACH-O --- ###
class MachOProcessor:
@@ -152,6 +156,9 @@ class MachOProcessor:
if args.calc_offset: # Calculate the real address of the Virtual Memory in the file.
snake_instance.printCalcRealAddressFromVM(args.calc_offset)
if args.constructors: # Print constructors
snake_instance.printConstructors()
class SnakeI:
def __init__(self, binaries, file_path):
'''
@@ -160,8 +167,11 @@ class SnakeI:
'''
self.binary = self.parseFatBinary(binaries)
self.file_path = file_path
self.segments_count, self.file_start, self.file_size, self.file_end = self.getSegmentsInfo()
self.load_commands = self.getLoadCommands()
self.endianess = self.getEndianess()
self.format_specifier = '<I' if self.getEndianess() == 'little' else '>I' # For struct.pack
self.reversed_format_specifier = '>I' if self.getEndianess() == 'little' else '<I' # For CS blob which is in Big Endian.
self.fat_offset = self.binary.fat_offset # For various calculations, if ARM64 Mach-O extracted from Universal Binary
self.prot_map = {
0: '---',
@@ -194,6 +204,24 @@ class SnakeI:
}
}
def getSegmentsInfo(self):
''' Helper function for gathering various initialization information about the binary if extracted from FAT. '''
segments_count = 0
for s in self.binary.segments:
segments_count+=1
for s in self.binary.segments:
if s.index == 0:
file_start = s.file_offset + self.binary.fat_offset
elif s.index == segments_count-1:
file_end = s.file_offset + s.file_size + self.binary.fat_offset
pass # self.binary.fat_offset
file_size = file_end - file_start
return segments_count, file_start, file_size, file_end
def mapProtection(self, numeric_protection):
'''Maps numeric protection to its string representation.'''
return self.prot_map.get(numeric_protection, 'Unknown')
@@ -371,7 +399,7 @@ class SnakeI:
''' Printing only exported symbol names. '''
for symbol in self.getExports():
print(symbol.name)
def getChainedFixups(self):
'''Return Chained Fixups information: https://lief-project.github.io/doc/latest/api/python/macho.html#chained-binding-info'''
return self.binary.dyld_chained_fixups
@@ -598,7 +626,7 @@ class SnakeI:
if self.hasSegment('__TEXT'):
for segment in self.binary.segments:
if segment.name == '__TEXT':
vm_base = segment.virtual_address + self.fat_offset
vm_base = segment.virtual_address
return vm_base
def calcRealAddressFromVM(self, vm_offset):
@@ -625,6 +653,11 @@ class SnakeI:
real_offset_hex = hex(real_offset)
print(f'{vm_offset} : {real_offset_hex}')
def printConstructors(self):
''' Print all constructors functions from the binary. '''
for ctor in self.binary.ctor_functions:
print(ctor)
### --- II. CODE SIGNING --- ###
class CodeSigningProcessor:
def __init__(self):
@@ -660,6 +693,12 @@ class CodeSigningProcessor:
if args.sign_binary: # Sign the given binary using specified identity:
snake_instance.signBinary(args.sign_binary)
if args.cs_offset: # Print Code Signature offset
snake_instance.printCodeSignatureOffset()
if args.cs_flags: # Print Code Signature flags
snake_instance.printCodeSignatureFlags()
class SnakeII(SnakeI):
def __init__(self, binaries, file_path):
super().__init__(binaries, file_path)
@@ -724,6 +763,61 @@ class SnakeII(SnakeI):
except Exception as e:
print(f"An error occurred during Code Signing using {security_identity}\n {e}")
def getCodeSignatureOffset(self):
''' Return the file offset of the Code Signature. Takes into account Fat binaries. '''
return self.binary.code_signature.data_offset + self.fat_offset
def printCodeSignatureOffset(self):
print(f'Code Signature offset: {hex(self.getCodeSignatureOffset())}')
def getCodeSignatureSize(self):
''' Return Code Signature size. '''
return self.binary.code_signature.data_size
def extractCodeSignatureBytes(self):
''' Extract the content of the Code Signature as raw bytes. Takes into account Fat binaries. '''
#The self.binary.code_signature.content.tobytes() takes into account Fat binaries, so no need to calculate the offset of valid signature manually.
#cs_offset = self.getCodeSignatureOffset()
#cs_size = self.getCodeSignatureSize()
#cs_bytes = self.extractBytesAtOffset(cs_offset, cs_size)
#self.saveBytesToFile(cs_bytes, 'test.bin')
cs_bytes = self.binary.code_signature.content.tobytes()
return cs_bytes
def findBytes(self, magic, bytes):
''' Find [magic] bytes in a given [bytes]. '''
offset = bytes.find(magic)
return offset
def parseCodeDirectoryBlob(self):
''' Parse Code Directory blob from Code Signature to extract its version and then use AppleStructuresManager to parse the whole structure according to its version. '''
# Extracting version number
CS_MAGIC_CODEDIRECTORY = 0xFADE0C02
cs_magic_codedirectory_as_bytes = struct.pack(self.reversed_format_specifier, CS_MAGIC_CODEDIRECTORY)
cs_blob = self.extractCodeSignatureBytes()
cs_directory_offset = self.findBytes(cs_magic_codedirectory_as_bytes, cs_blob)
version_offset = cs_directory_offset + 8
version_bytes = cs_blob[version_offset:version_offset+4]
version = struct.unpack(self.reversed_format_specifier, version_bytes)[0]
# Extracting size
size_offset = version_offset - 4
size_bytes = cs_blob[size_offset:size_offset+4]
size = struct.unpack(self.reversed_format_specifier, size_bytes)[0]
# Parsing __CodeDirectory
code_directory_struct_instance = AppleStructuresManager.CodeDirectory(version)
code_directory_dict = code_directory_struct_instance.parse(cs_blob[cs_directory_offset:size])
return code_directory_dict
def getCodeSignatureFlags(self):
''' Extract CS flags: https://github.com/apple-oss-distributions/xnu/blob/1031c584a5e37aff177559b9f69dbd3c8c3fd30a/osfmk/kern/cs_blobs.h#L35'''
code_directory_dict = self.parseCodeDirectoryBlob()
return code_directory_dict['flags']
def printCodeSignatureFlags(self):
print(f'CS_FLAGS: {hex(self.getCodeSignatureFlags())}')
### --- III. CHECKSEC --- ###
class ChecksecProcessor:
def __init__(self):
@@ -1826,8 +1920,29 @@ class AMFIProcessor:
if args.kext_exit: # Print kext exitpoint
snake_instance.printKextExitPoint(args.kext_exit)
if args.amfi:
snake_instance.printExports()
if args.mig: # Search for MIG subsystem and prints message handlers
snake_instance.printMIG()
if args.has_suid: # Print file SUID status
snake_instance.printHasSetUID()
if args.has_sgid: # Print file SGID status
snake_instance.printHasSetGID()
if args.has_sticky: # Print file sticky bit status
snake_instance.printStickyBit()
if args.injectable_dyld: # Static check for DYLD_INSERT_LIBRARIES
snake_instance.printCheckDyldInsertLibraries()
if args.test_insert_dylib: # INVASIVE check for DYLD_INSERT_LIBRARIES
snake_instance.printTestDyldInsertLibraries()
if args.test_prune_dyld: # INVASIVE check for DYLD_PRINT_INITIALIZERS (if DEV are cleared)
snake_instance.printTestPruneDyldEnv()
if args.test_dyld_print_to_file: # INVASIVE check for DYLD_PRINT_TO_FILE
snake_instance.printTestDyldPrintToFile()
class SnakeVI(SnakeV):
def __init__(self, binaries, file_path):
@@ -1978,7 +2093,7 @@ class SnakeVI(SnakeV):
# debug +
#Utils.printQuadWordsLittleEndian64(extracted_kmod_info_bytes)
# debug -
kmod_info_as_dict = AppleStructuresManager.parsekmod_info(extracted_kmod_info_bytes)
kmod_info_as_dict = AppleStructuresManager.kmod_info.parse(extracted_kmod_info_bytes)
return kmod_info_as_dict
def printParsedkmod_info(self, kext_name):
@@ -2017,6 +2132,262 @@ class SnakeVI(SnakeV):
kext_exitpoint = hex(self.calcKextEntryPoint(kext_name))
print(f'{kext_name} exitpoint: {kext_exitpoint}')
def parseMIG(self):
''' Search for MIG subsystem messages. I was using this Hopper script as an inspiration: https://github.com/knightsc/hopper/blob/master/scripts/MIG%20Detect.py
Returns a dictionary like: {'_MIG_subsystem_1000': {'_MIG_msg_1000': routine_for_msg}}
'''
va_start = self.getVirtualMemoryStartingAddress()
mig_subsystem_size = ctypes.sizeof(AppleStructuresManager.mig_subsystem)
routine_descriptor_size = ctypes.sizeof(AppleStructuresManager.routine_descriptor)
mig_subsystems = {}
# The MIG should be in __DATA,__const | __DATA_CONST,__const | __CONST,__constdata, but it is not always the case.
# Great example is decompressed kernelcache, there are no __const section. Conclusion, would be to iterate over each segment, but there is a problem with alignment.
for section in self.binary.sections:
if ('const' in section.name):# and 'DATA' in section.segment.name):
section_bytes = section.content.tobytes()
section_size = section.size
alignment = pow(2,section.alignment)
# Loop through section bytes using alignment to speed up
current_offset = 0
while current_offset < section_size:
chunk = section_bytes[current_offset:current_offset+mig_subsystem_size]
mig_subsystem_dict = AppleStructuresManager.mig_subsystem.parse(chunk)
number_of_msgs = mig_subsystem_dict['end'] - mig_subsystem_dict['start']
# Check for possible mig_subsystem structure:
if (number_of_msgs > 0 and
number_of_msgs < 1024 and
mig_subsystem_dict['server'] != 0 and
mig_subsystem_dict['start'] > 0 and
mig_subsystem_dict['end'] > 0 and
mig_subsystem_dict['reserved'] == 0 and
mig_subsystem_dict['routine_0'] == 0):
'''
# print(f'{hex(mig_subsystem_dict["server"])} {hex(mig_subsystem_dict["start"])}')
# At this stage I get 0x8028000000007e74 instead of 0x100007e74 and I do not know why. The same goes for every impl_routine later too...
# I can manually repair it by: & 0xffff | __TEXT
# It is temp fix, there must be a "proper way" - todo
'''
mig_subsystem_dict['server'] = mig_subsystem_dict['server'] & 0xffff | va_start # Fix according to the above comment
mig_subsystem_number = mig_subsystem_dict['start']
subsystem_name = "MIG_subsystem_{0}".format(mig_subsystem_number)
mig_subsystems[subsystem_name] = {}
current_offset += mig_subsystem_size
# If mig_subsystem structure was found, iterate over all routines
msg = 0
while msg < number_of_msgs:
routine_name = "MIG_msg_{0}".format(mig_subsystem_number+msg)
chunk = section_bytes[current_offset:current_offset+routine_descriptor_size]
routine_descriptor_dict = AppleStructuresManager.routine_descriptor.parse(chunk)
if routine_descriptor_dict['impl_routine'] != 0:
routine_descriptor_dict['impl_routine'] = routine_descriptor_dict['impl_routine'] & 0xffff | va_start # Fix like subsystem
mig_subsystems[subsystem_name].update({routine_name: routine_descriptor_dict})
current_offset += routine_descriptor_size
msg += 1
continue # To find more subsystems we continue the parent while without adding below alignment, because we added routine_descriptor_size
current_offset += alignment
return(mig_subsystems)
def printMIG(self):
''' Iterates over each subsystem and its associated messages, printing them in the nice format. '''
mig_subsystems = self.parseMIG()
for subsystem, messages in mig_subsystems.items():
print(subsystem + ":")
for message, details in messages.items():
print(f"- {message}: {hex(details['impl_routine'])}")
def hasSetUID(self):
"""
Check if a file has the SUID (Set User ID) bit set.
Args:
filename (str): Path to the file to be checked.
Returns:
bool: True if SUID bit is set, False otherwise.
"""
st_mode = os.stat(self.file_path).st_mode
return bool(st_mode & stat.S_ISUID)
def hasSetGID(self):
"""
Check if a file has the setgid (Set Group ID) bit set.
Args:
filename (str): Path to the file to be checked.
Returns:
bool: True if setgid bit is set, False otherwise.
"""
st_mode = os.stat(self.file_path).st_mode
return bool(st_mode & stat.S_ISGID)
def hasStickyBit(self):
"""
Check if a file has the sticky bit set.
Args:
filename (str): Path to the file to be checked.
Returns:
bool: True if sticky bit is set, False otherwise.
"""
st_mode = os.stat(self.file_path).st_mode
return bool(st_mode & stat.S_ISVTX)
def printHasSetUID(self):
print(f'SUID: {self.hasSetUID()}')
def printHasSetGID(self):
print(f'SGID: {self.hasSetGID()}')
def printStickyBit(self):
print(f'STICKY: {self.hasStickyBit()}')
def checkDyldInsertLibraries(self):
''' Check if binary is vulnerable to code injection using DYLD_INSERT_LIBRARIES. '''
cs_flags = self.getCodeSignatureFlags()
if cs_flags & 0x12800:
return False
if self.hasSetUID() or self.hasSetGID() or self.hasRestrictSegment():
return False
return True
def printCheckDyldInsertLibraries(self):
#print(f'{self.file_path} injectable DYLD_INSERT_LIBRARIES: {self.checkDyldInsertLibraries()}')
print(f'Injectable DYLD_INSERT_LIBRARIES: {self.checkDyldInsertLibraries()}')
def listenSyslog(self, test_string, test_string_found, stop_event, timeout=2):
''' Function to listen (for 2 seconds by default) to macOS system logs for a specific string. '''
# Run the log command to retrieve system log messages
process = subprocess.Popen(['log', 'stream', '--timeout', str(timeout)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
for line in process.stdout:
if test_string in line:
test_string_found.set()
return
if stop_event.is_set():
return
def testDyldInsertLibraries(self):
''' Checking if DYLD_INSERT_LIBRARIES is allowed.
INVASIVE:
0. Check if /tmp/crimson_stalker.dylib exists.
1. If not - the library /tmp/crimson_stalker.dylib is compiled.
2. Binary is executed with DYLD_INSERT_LIBRARIES=/tmp/crimson_stalker.dylib
3. Library is NOT REMOVED it stays in /tmp/ in case of massive checks with loops.
'''
stalker_path = '/tmp/crimson_stalker.dylib'
env_variable = f'DYLD_INSERT_LIBRARIES={stalker_path}'
test_string = 'crimson_stalker library injected into '
# Compile dylib if not exist:
if not os.path.exists(stalker_path):
file_name_c = '/tmp/crimson_stalker.c'
source_code = SourceCodeManager.crimson_stalker
output_filename = stalker_path
flag_list = ['-dynamiclib']
SourceCodeManager.clangCompilerWrapper(file_name_c, source_code, output_filename, flag_list)
# Create a threading event to signal when the test string is found in syslog or to stop listenSyslog thread (using Event as a flag and .set() as a switch)
test_string_found = threading.Event() # Used in listenSyslog -> when test_string_found.set() is called, it is final check if test_string was found in syslogs.
stop_event = threading.Event() # Used in this function -> when stop_event.set() is called below, it inform listenSyslog to stop.s
# Start listening for syslog messages in a separate thread
syslog_listener_thread = threading.Thread(target=self.listenSyslog, args=(test_string, test_string_found, stop_event))
syslog_listener_thread.start()
# To avoid Race Codition false positives because syslog_listener_thread just started
# We must wait for at least 0.1 for listenSyslog to start reading logs
# Then we can execute the command below without a fear it will be omited in by the syslog_listener_thread.
time.sleep(0.2) # 0.2 here and 2 for the timeout in listenSyslog is enough
# Execute the command and capture stdout and stderr
command = f'{env_variable} {self.file_path}'
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = process.communicate()
# Wait for the subprocess to finish
process.wait()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
# Check if the test string was found in stdout (it should not appear in stderr, but I check for that, you never know :D)
if (test_string in stdout) or (test_string in stderr):
stop_event.set()
return True
# Wait for the thread to finish
syslog_listener_thread.join()
# Check if the test string was found in syslog
if test_string_found.is_set(): #
return True
return False
def printTestDyldInsertLibraries(self):
print(f'DYLD_INSERT_LIBRARIES is allowed: {self.testDyldInsertLibraries()}')
def testPruneDyldEnv(self):
''' Checking if Dyld Environment Variables are cleared (INVASIVE - the binary is executed) '''
env_variable = 'DYLD_PRINT_INITIALIZERS=1'
test_string = 'running initializer '
# Execute the command and capture stdout and stderr
command = f'{env_variable} {self.file_path}'
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = process.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
if test_string in stderr:
return False
return True
def printTestPruneDyldEnv(self):
#print(f'{self.file_path} DEV Pruned: {self.testPruneDyldEnv()}')
print(f'DEV Pruned: {self.testPruneDyldEnv()}')
def testDyldPrintToFile(self):
''' Checking if DYLD_PRINT_TO_FILE Dyld Environment Variables works.
INVASIVE:
1. The binary is executed.
2. The file /tmp/crimson_1029384756_testDyldPrintToFile.txt is created if env works.
3. The file is then removed
'''
test_file_path = '/tmp/crimson_1029384756_testDyldPrintToFile.txt'
env_variable = f'DYLD_PRINT_TO_FILE={test_file_path}'
# Execute the command and capture stdout and stderr
command = f'{env_variable} {self.file_path}'
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = process.communicate()
#process.wait()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
if os.path.exists(test_file_path):
os.remove(test_file_path)
return True
return False
def printTestDyldPrintToFile(self):
print(f'DYLD_PRINT_TO_FILE allowed: {self.testDyldPrintToFile()}')
### --- ARGUMENT PARSER --- ###
class ArgumentParser:
def __init__(self):
@@ -2060,6 +2431,7 @@ class ArgumentParser:
macho_group.add_argument('--info', action='store_true', default=False, help="Print header, load commands, segments, sections, symbols, and strings")
macho_group.add_argument('--dump_data', help="Dump {size} bytes starting from {offset} to a given {filename} (e.g. '0x1234,0x1000,out.bin')", metavar=('offset,size,output_path'), nargs="?")
macho_group.add_argument('--calc_offset', help="Calculate the real address (file on disk) of the given Virtual Memory {vm_offset} (e.g. 0xfffffe000748f580)", metavar='vm_offset')
macho_group.add_argument('--constructors', action='store_true', help="Print binary constructors")
def addCodeSignArgs(self):
codesign_group = self.parser.add_argument_group('CODE SIGNING ARGS')
@@ -2071,6 +2443,8 @@ class ArgumentParser:
codesign_group.add_argument('--extract_certificates', help="Extract Certificates and save them to a given file. To each filename will be added an index at the end: _0 for signing, _1 for intermediate, and _2 for root CA certificate", metavar='certificate_name')
codesign_group.add_argument('--remove_sig', help="Save the new file on a disk with removed signature", metavar='unsigned_binary')
codesign_group.add_argument('--sign_binary', help="Sign binary using specified identity - use : 'security find-identity -v -p codesigning' to get the identity (default: adhoc)", nargs='?', const='adhoc', metavar='adhoc|identity')
codesign_group.add_argument('--cs_offset', action='store_true', help="Print Code Signature file offset")
codesign_group.add_argument('--cs_flags', action='store_true', help="Print Code Signature flags")
def addChecksecArgs(self):
checksec_group = self.parser.add_argument_group('CHECKSEC ARGS')
@@ -2122,11 +2496,16 @@ class ArgumentParser:
dyld_group.add_argument('--dump_prelink_kext', metavar='kext_name', nargs="?", help='Dump prelinked KEXT {kext_name} from decompressed Kernel Cache PRELINK_TEXT segment to a file named: prelinked_{kext_name}.bin')
dyld_group.add_argument('--kext_prelinkinfo', metavar='kext_name', nargs="?", help='Print _Prelink properties from PRELINK_INFO,__info for a give {kext_name}')
dyld_group.add_argument('--kmod_info', metavar='kext_name', help="Parse kmod_info structure for the given {kext_name} from Kernel Cache")
dyld_group.add_argument('--kext_entry', metavar='kext_name', help="Calculate the virtual memory address of the __start (entrpoint) for the given {kext_name} Kernel Extension")
dyld_group.add_argument('--kext_entry', metavar='kext_name', help="Calculate the virtual memory address of the __start (entrypoint) for the given {kext_name} Kernel Extension")
dyld_group.add_argument('--kext_exit', metavar='kext_name', help="Calculate the virtual memory address of the __stop (exitpoint) for the given {kext_name} Kernel Extension")
dyld_group.add_argument('--amfi', help="a")
dyld_group.add_argument('--mig', action='store_true', help="Search for MIG subsystem and prints message handlers")
dyld_group.add_argument('--has_suid', action='store_true', help="Check if the file has SetUID bit set")
dyld_group.add_argument('--has_sgid', action='store_true', help="Check if the file has SetGID bit set")
dyld_group.add_argument('--has_sticky', action='store_true', help="Check if the file has sticky bit set")
dyld_group.add_argument('--injectable_dyld', action='store_true', help="Check if the binary is injectable using DYLD_INSERT_LIBRARIES")
dyld_group.add_argument('--test_insert_dylib', action='store_true', help="Check if it is possible to inject dylib using DYLD_INSERT_LIBRARIES (INVASIVE - the binary is executed)")
dyld_group.add_argument('--test_prune_dyld', action='store_true', help="Check if Dyld Environment Variables are cleared (using DYLD_PRINT_INITIALIZERS=1) (INVASIVE - the binary is executed)")
dyld_group.add_argument('--test_dyld_print_to_file', action='store_true', help="Check if YLD_PRINT_TO_FILE Dyld Environment Variables works (INVASIVE - the binary is executed)")
def parseArgs(self):
return self.parser.parse_args()
@@ -2138,6 +2517,18 @@ class ArgumentParser:
### --- SOURCE CODE --- ###
class SourceCodeManager:
crimson_stalker = r'''
// clang -dynamiclib /tmp/crimson_stalker.c -o /tmp/crimson_stalker.dylib
#include <syslog.h>
#include <stdio.h>
__attribute__((constructor))
void myconstructor(int argc, const char **argv) {
syslog(LOG_ERR, "crimson_stalker library injected into %s\n", argv[0]);
printf("crimson_stalker library injected into %s\n", argv[0]);
}
'''
dylib_hijacking = r'''
// clang -dynamiclib m.c -o m.dylib //-o $PWD/TARGET_DYLIB
#include <syslog.h>
@@ -2155,6 +2546,7 @@ void myconstructor(int argc, const char **argv)
//system("/bin/sh");
}
'''
@staticmethod
def clangCompilerWrapper(file_name_c, source_code, output_filename, flag_list=None):
# Save the source code to a file
@@ -2185,32 +2577,197 @@ class AppleStructuresManager:
("start", ctypes.c_uint64),
("stop", ctypes.c_uint64)
]
def parse(data):
# Create an instance of the kmod_info structure
info = AppleStructuresManager.kmod_info()
# Cast the binary data to the structure
ctypes.memmove(ctypes.byref(info), data, ctypes.sizeof(info))
def parsekmod_info(data):
# Create an instance of the kmod_info structure
info = AppleStructuresManager.kmod_info()
# Cast the binary data to the structure
ctypes.memmove(ctypes.byref(info), data, ctypes.sizeof(info))
# Convert name and version to strings
name = info.name.decode('utf-8').rstrip('\x00')
version = info.version.decode('utf-8').rstrip('\x00')
# Convert name and version to strings
name = info.name.decode('utf-8').rstrip('\x00')
version = info.version.decode('utf-8').rstrip('\x00')
# Return parsed data as a dictionary
return {
"next": info.next,
"info_version": info.info_version,
"id": hex(info.id),
"name": name,
"version": version,
"reference_count": info.reference_count,
"reference_list": hex(info.reference_list),
"address": hex(info.address),
"size": hex(info.size),
"hdr_size": hex(info.hdr_size),
"start": hex(info.start),
"stop": hex(info.stop)
}
# Return parsed data as a dictionary
return {
"next": info.next,
"info_version": info.info_version,
"id": hex(info.id),
"name": name,
"version": version,
"reference_count": info.reference_count,
"reference_list": hex(info.reference_list),
"address": hex(info.address),
"size": hex(info.size),
"hdr_size": hex(info.hdr_size),
"start": hex(info.start),
"stop": hex(info.stop)
}
class mig_subsystem(ctypes.Structure):
''' REF: https://github.com/apple-oss-distributions/xnu/blob/1031c584a5e37aff177559b9f69dbd3c8c3fd30a/osfmk/mach/mig.h#L121C16-L121C29 '''
_pack_ = 1 # Specify the byte order (little-endian)
_fields_ = [
("server", ctypes.c_uint64), # Pointer to demux routine
("start", ctypes.c_uint32), # Min routine number
("end", ctypes.c_uint32), # Max routine number + 1
("maxsize", ctypes.c_uint64), # Max reply message size
("reserved", ctypes.c_uint64), # Reserved for MIG use
("routine_0", ctypes.c_uint64) # Routine descriptor array
]
def parse(data):
# Create an instance of the structure
info = AppleStructuresManager.mig_subsystem()
# Cast the binary data to the structure
ctypes.memmove(ctypes.byref(info), data, ctypes.sizeof(info))
# Return parsed data as a dictionary
return {
"server": info.server,
"start": info.start,
"end": info.end,
"maxsize": info.maxsize,
"reserved": info.reserved,
"routine_0": info.routine_0,
}
class routine_descriptor(ctypes.Structure):
''' REF: https://github.com/apple-oss-distributions/xnu/blob/1031c584a5e37aff177559b9f69dbd3c8c3fd30a/osfmk/mach/mig.h#L105C8-L105C26 '''
_pack_ = 1 # Specify the byte order (little-endian)
_fields_ = [
("impl_routine", ctypes.c_uint64), # Server work func pointer
("stub_routine", ctypes.c_uint64), # Unmarshalling func pointer
("argc", ctypes.c_uint32), # Number of argument words
("descr_count", ctypes.c_uint32), # Number complex descriptors
("arg_descr", ctypes.c_uint64), # Pointer to descriptor array
("max_reply_msg", ctypes.c_uint64) # Max size for reply msg
]
def parse(data):
# Create an instance of the structure
info = AppleStructuresManager.routine_descriptor()
# Cast the binary data to the structure
ctypes.memmove(ctypes.byref(info), data, ctypes.sizeof(info))
# Return parsed data as a dictionary
return {
"impl_routine": info.impl_routine,
"stub_routine": info.stub_routine,
"argc": info.argc,
"descr_count": info.descr_count,
"arg_descr": info.arg_descr,
"max_reply_msg": info.max_reply_msg,
}
class CodeDirectory(ctypes.BigEndianStructure):
''' REF: https://github.com/Karmaz95/Snake_Apple/blob/0b5b02fdb954ca5f63eb240092cf98a68fa4e19f/II.%20Code%20Signing/mac/cs_blobs.h#L212C16-L212C31'''
class v0(ctypes.BigEndianStructure):
_fields_ = [
("magic", ctypes.c_uint32),
("length", ctypes.c_uint32),
("version", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("hashOffset", ctypes.c_uint32),
("identOffset", ctypes.c_uint32),
("nSpecialSlots", ctypes.c_uint32),
("nCodeSlots", ctypes.c_uint32),
("codeLimit", ctypes.c_uint32),
("hashSize", ctypes.c_uint8),
("hashType", ctypes.c_uint8),
("platform", ctypes.c_uint8),
("pageSize", ctypes.c_uint8),
("spare2", ctypes.c_uint32),
]
class v20100(v0):
_fields_ = [
("scatterOffset", ctypes.c_uint32),
]
class v20200(v20100):
_fields_ = [
("teamOffset", ctypes.c_uint32),
]
class v20300(v20200):
_fields_ = [
("spare3", ctypes.c_uint32),
("codeLimit64", ctypes.c_uint64),
]
class v20400(v20300):
_fields_ = [
("execSegBase", ctypes.c_uint64),
("execSegLimit", ctypes.c_uint64),
("execSegFlags", ctypes.c_uint64),
]
class v20500(v20400):
_fields_ = [
("runtime", ctypes.c_uint32),
("preEncryptOffset", ctypes.c_uint32),
]
class v20600(v20500):
_fields_ = [
("linkageHashType", ctypes.c_uint8),
("linkageApplicationType", ctypes.c_uint8),
("linkageApplicationSubType", ctypes.c_uint16),
("linkageOffset", ctypes.c_uint32),
("linkageSize", ctypes.c_uint32),
]
def __init__(self, version):
self.version = version
if version == 0x20100:
self.info = self.v20100()
elif version == 0x20200:
self.info = self.v20200()
elif version == 0x20300:
self.info = self.v20300()
elif version == 0x20400:
self.info = self.v20400()
elif version == 0x20500:
self.info = self.v20500()
elif version == 0x20600:
self.info = self.v20600()
else:
self.info = self.v0()
def parse(self, data):
ctypes.memmove(ctypes.byref(self.info), data, min(ctypes.sizeof(self.info), len(data)))
# Return parsed data as a dictionary
return {
"magic": getattr(self.info, "magic", None),
"length": getattr(self.info, "length", None),
"version": getattr(self.info, "version", None),
"flags": getattr(self.info, "flags", None),
"hashOffset": getattr(self.info, "hashOffset", None),
"identOffset": getattr(self.info, "identOffset", None),
"nSpecialSlots": getattr(self.info, "nSpecialSlots", None),
"nCodeSlots": getattr(self.info, "nCodeSlots", None),
"codeLimit": getattr(self.info, "codeLimit", None),
"hashSize": getattr(self.info, "hashSize", None),
"hashType": getattr(self.info, "hashType", None),
"platform": getattr(self.info, "platform", None),
"pageSize": getattr(self.info, "pageSize", None),
"spare2": getattr(self.info, "spare2", None),
"scatterOffset": getattr(self.info, "scatterOffset", None),
"teamOffset": getattr(self.info, "teamOffset", None),
"spare3": getattr(self.info, "spare3", None),
"codeLimit64": getattr(self.info, "codeLimit64", None),
"execSegBase": getattr(self.info, "execSegBase", None),
"execSegLimit": getattr(self.info, "execSegLimit", None),
"execSegFlags": getattr(self.info, "execSegFlags", None),
"runtime": getattr(self.info, "runtime", None),
"preEncryptOffset": getattr(self.info, "preEncryptOffset", None),
"linkageHashType": getattr(self.info, "linkageHashType", None),
"linkageApplicationType": getattr(self.info, "linkageApplicationType", None),
"linkageApplicationSubType": getattr(self.info, "linkageApplicationSubType", None),
"linkageOffset": getattr(self.info, "linkageOffset", None),
"linkageSize": getattr(self.info, "linkageSize", None),
}
### --- UTILS / DEBUG --- ###
class Utils:
@@ -2242,6 +2799,34 @@ class Utils:
i+=1
print()
def printQuadWordsBigEndian64(byte_string, columns=2):
''' Print Q values from given {byte_string} in {columns} columns (default 2)
0000000000000000 FFFFFFFF00000001
6C7070612E6D6F63 7265766972642E65
'''
# Ensure the byte string length is a multiple of 8
while len(byte_string) % 8 != 0:
byte_string += b'\x00' # Add padding to make it divisible by 8
# Convert the byte string to a list of integers
byte_list = list(byte_string)
# Group the bytes into 8-byte chunks
chunks = [byte_list[i:i+8] for i in range(0, len(byte_list), 8)]
# Print the raw bytes in 64-bit big-endian order
print("Raw bytes (64-bit big-endian):")
i = 1
for chunk in chunks:
chunk_value = int.from_bytes(chunk, byteorder='big') # Changed to 'big'
if i < columns:
print(f"{chunk_value:016X}", end=" ")
else:
print(f"{chunk_value:016X}", end="\n")
i = 0
i += 1
print()
def printRawHex(byte_string):
'''
Print bytes as raw hexes (without endianess).
@@ -2250,7 +2835,6 @@ class Utils:
hex_string = ' '.join(f'{byte:02x}' for byte in byte_string)
print(hex_string)
if __name__ == "__main__":
arg_parser = ArgumentParser()
args = arg_parser.parseArgs()

View File

@@ -0,0 +1,81 @@
# The script is not mine. Here is the source: https://github.com/knightsc/hopper/blob/master/scripts/MIG%20Detect.py
# This script attempts to identify mach_port_subsystem structures in the
# __DATA section of executables or kernels
#
# const struct mach_port_subsystem {
# mig_server_routine_t server; /* Server routine */
# mach_msg_id_t start; /* Min routine number */
# mach_msg_id_t end; /* Max routine number + 1 */
# unsigned int maxsize; /* Max msg size */
# vm_address_t reserved; /* Reserved */
# struct routine_descriptor routine[X]; /* Array of routine descriptors */
# }
#
# struct routine_descriptor {
# mig_impl_routine_t impl_routine; /* Server work func pointer */
# mig_stub_routine_t stub_routine; /* Unmarshalling func pointer */
# unsigned int argc; /* Number of argument words */
# unsigned int descr_count; /* Number complex descriptors */
# routine_arg_descriptor_t arg_descr; /* pointer to descriptor array*/
# unsigned int max_reply_msg; /* Max size for reply msg */
# };
#
# If it finds the mach_port_subsystem structure then it will label the structure as
# well as labelling each MIG msg stub function.
sections = [
('__DATA', '__const'),
('__CONST', '__constdata'),
('__DATA_CONST', '__const'),
]
doc = Document.getCurrentDocument()
for (segname, secname) in sections:
seg = doc.getSegmentByName(segname)
if seg is None:
continue
seclist = seg.getSectionsList()
for sec in seclist:
if sec.getName() != secname:
continue
# Loop through each item in the section
start = sec.getStartingAddress()
end = start + sec.getLength() - 0x28
for addr in range(start, end):
mach_port_subsystem_reserved = seg.readUInt64LE(addr + 0x18)
mach_port_subsystem_routine0_impl_routine = seg.readUInt64LE(addr + 0x20)
mach_port_subsystem_start = seg.readUInt32LE(addr + 0x8)
mach_port_subsystem_end = seg.readUInt32LE(addr + 0xc)
number_of_msgs = mach_port_subsystem_end - mach_port_subsystem_start
# Check if this looks like a mach_port_subsystem structure
if (mach_port_subsystem_reserved == 0 and
mach_port_subsystem_routine0_impl_routine == 0 and
mach_port_subsystem_start != 0 and
number_of_msgs > 0 and
number_of_msgs < 1024):
subsystem_name = "_MIG_subsystem_{0}".format(mach_port_subsystem_start)
doc.log("{0}: MIG Subsystem {1}: {2} messages".format(hex(addr), mach_port_subsystem_start, number_of_msgs))
seg.setNameAtAddress(addr, subsystem_name)
# Loop through the routine_descriptor structs
msg_num = 0
for routine_addr in range(addr + 0x20, addr+0x20+(number_of_msgs*0x28), 0x28):
stub_routine_addr = routine_addr + 0x8
stub_routine = seg.readUInt64LE(stub_routine_addr)
msg = mach_port_subsystem_start + msg_num
if stub_routine == 0:
doc.log("{0}: skip MIG msg {1}".format(hex(stub_routine_addr), msg))
else:
routine_name = "_MIG_msg_{0}".format(msg)
doc.log("{0}: MIG msg {1}".format(hex(stub_routine_addr), msg))
doc.setNameAtAddress(stub_routine, routine_name)
msg_num = msg_num + 1

View File

@@ -17,4 +17,4 @@ set_flags = check_flags(input_value)
if set_flags:
print("Flags set:")
print(*set_flags, sep="\n"
print(*set_flags, sep="\n")