Compare commits

..

1 Commits

Author SHA1 Message Date
Janik Besendorf
39a26d0f0b Replace iOSbackup with iphone_backup_decrypt
Replace the unmaintained iOSbackup dependency with iphone_backup_decrypt
(MIT licensed, actively maintained). This fixes file corruption caused by
iOSbackup truncating files to inaccurate sizes from backup metadata.

The extract-key command and --key-file option are preserved via an
MVTEncryptedBackup subclass that patches the keybag unlock to
capture/reuse the derived PBKDF2 key.

Closes #669
2026-04-12 10:51:56 +02:00
6 changed files with 199 additions and 169 deletions

View File

@@ -24,7 +24,8 @@ dependencies = [
"simplejson==3.20.2", "simplejson==3.20.2",
"packaging==26.0", "packaging==26.0",
"appdirs==1.4.4", "appdirs==1.4.4",
"iOSbackup==0.9.925", "iphone_backup_decrypt==0.9.0",
"pycryptodome>=3.18",
"adb-shell[usb]==0.4.4", "adb-shell[usb]==0.4.4",
"libusb1==3.3.1", "libusb1==3.3.1",
"cryptography==46.0.6", "cryptography==46.0.6",

View File

@@ -22,13 +22,13 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
def parse(self, content: str) -> None: def parse(self, content: str) -> None:
""" """
Parse the Dumpsys Accessibility section. Parse the Dumpsys Accessibility section/
Adds results to self.results (List[Dict[str, Any]]) Adds results to self.results (List[Dict[str, str]])
:param content: content of the accessibility section (string) :param content: content of the accessibility section (string)
""" """
# Parse installed services # "Old" syntax
in_services = False in_services = False
for line in content.splitlines(): for line in content.splitlines():
if line.strip().startswith("installed services:"): if line.strip().startswith("installed services:"):
@@ -39,6 +39,7 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
continue continue
if line.strip() == "}": if line.strip() == "}":
# At end of installed services
break break
service = line.split(":")[1].strip() service = line.split(":")[1].strip()
@@ -47,66 +48,21 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
{ {
"package_name": service.split("/")[0], "package_name": service.split("/")[0],
"service": service, "service": service,
"enabled": False,
} }
) )
# Parse enabled services from both old and new formats. # "New" syntax - AOSP >= 14 (?)
# # Looks like:
# Old format (multi-line block): # Enabled services:{{com.azure.authenticator/com.microsoft.brooklyn.module.accessibility.BrooklynAccessibilityService}, {com.agilebits.onepassword/com.agilebits.onepassword.filling.accessibility.FillingAccessibilityService}}
# enabled services: {
# 0 : com.example/.MyService
# }
#
# New format (single line, AOSP >= 14):
# Enabled services:{{com.example/com.example.MyService}, {com.other/com.other.Svc}}
enabled_services = set()
in_enabled = False
for line in content.splitlines(): for line in content.splitlines():
stripped = line.strip() if line.strip().startswith("Enabled services:"):
matches = re.finditer(r"{([^{]+?)}", line)
if in_enabled:
if stripped == "}":
in_enabled = False
continue
service = line.split(":")[1].strip()
enabled_services.add(service)
continue
if re.match(r"enabled services:\s*\{\s*$", stripped, re.IGNORECASE):
# Old multi-line format: "enabled services: {"
in_enabled = True
continue
if re.match(r"enabled services:\s*\{", stripped, re.IGNORECASE):
# New single-line format: "Enabled services:{{pkg/svc}, {pkg2/svc2}}"
matches = re.finditer(r"\{([^{}]+)\}", stripped)
for match in matches: for match in matches:
enabled_services.add(match.group(1).strip()) # Each match is in format: <package_name>/<service>
package_name, _, service = match.group(1).partition("/")
# Mark installed services that are enabled. self.results.append(
# Installed service names may include trailing annotations like {"package_name": package_name, "service": service}
# "(A11yTool)" that are absent from the enabled services list, )
# so strip annotations before comparing.
def _strip_annotation(s: str) -> str:
return re.sub(r"\s+\(.*?\)\s*$", "", s)
installed_stripped = {
_strip_annotation(r["service"]): r for r in self.results
}
for enabled in enabled_services:
if enabled in installed_stripped:
installed_stripped[enabled]["enabled"] = True
# Add enabled services not found in the installed list
for service in enabled_services:
if service not in installed_stripped:
package_name, _, _ = service.partition("/")
self.results.append(
{
"package_name": package_name,
"service": service,
"enabled": True,
}
)

View File

@@ -49,14 +49,9 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
for result in self.results: for result in self.results:
self.log.info( self.log.info(
'Found installed accessibility service "%s" (enabled: %s)', 'Found installed accessibility service "%s"', result.get("service")
result.get("service"),
result.get("enabled", False),
) )
enabled_count = sum(1 for r in self.results if r.get("enabled"))
self.log.info( self.log.info(
"Identified a total of %d accessibility services, %d enabled", "Identified a total of %d accessibility services", len(self.results)
len(self.results),
enabled_count,
) )

View File

@@ -6,17 +6,146 @@
import binascii import binascii
import glob import glob
import logging import logging
import multiprocessing
import os import os
import os.path import os.path
import plistlib
import shutil import shutil
import sqlite3 import sqlite3
import tempfile
from typing import Optional from typing import Optional
from iOSbackup import iOSbackup from iphone_backup_decrypt import EncryptedBackup
from iphone_backup_decrypt import google_iphone_dataprotection
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# Import pbkdf2_hmac from the same source iphone_backup_decrypt uses internally,
# so our key derivation is consistent with theirs.
try:
from fastpbkdf2 import pbkdf2_hmac
except ImportError:
import Crypto.Hash.SHA1
import Crypto.Hash.SHA256
import Crypto.Protocol.KDF
_HASH_FNS = {"sha1": Crypto.Hash.SHA1, "sha256": Crypto.Hash.SHA256}
def pbkdf2_hmac(hash_name, password, salt, iterations, dklen=None):
return Crypto.Protocol.KDF.PBKDF2(
password, salt, dklen, iterations, hmac_hash_module=_HASH_FNS[hash_name]
)
class MVTEncryptedBackup(EncryptedBackup):
"""Extends EncryptedBackup with derived key export/import.
NOTE: This subclass relies on internal APIs of iphone_backup_decrypt
(specifically _read_and_unlock_keybag, _keybag, and the Keybag class
internals). Pinned to iphone_backup_decrypt==0.9.0.
"""
def __init__(self, *, backup_directory, passphrase=None, derived_key=None):
if passphrase:
super().__init__(backup_directory=backup_directory, passphrase=passphrase)
self._derived_key = None # Will be set after keybag unlock
elif derived_key:
self._init_without_passphrase(backup_directory, derived_key)
else:
raise ValueError("Either passphrase or derived_key must be provided")
def _init_without_passphrase(self, backup_directory, derived_key):
"""Replicate parent __init__ state without requiring a passphrase."""
self.decrypted = False
self._backup_directory = os.path.expandvars(backup_directory)
self._passphrase = None
self._manifest_plist_path = os.path.join(
self._backup_directory, "Manifest.plist"
)
self._manifest_plist = None
self._manifest_db_path = os.path.join(self._backup_directory, "Manifest.db")
self._keybag = None
self._unlocked = False
self._temporary_folder = tempfile.mkdtemp()
self._temp_decrypted_manifest_db_path = os.path.join(
self._temporary_folder, "Manifest.db"
)
self._temp_manifest_db_conn = None
self._derived_key = derived_key # 32 raw bytes
def _read_and_unlock_keybag(self):
"""Override to capture derived key on password unlock, or use
a pre-derived key to skip PBKDF2."""
if self._unlocked:
return self._unlocked
with open(self._manifest_plist_path, "rb") as infile:
self._manifest_plist = plistlib.load(infile)
self._keybag = google_iphone_dataprotection.Keybag(
self._manifest_plist["BackupKeyBag"]
)
if self._derived_key:
# Skip PBKDF2, unwrap class keys directly with pre-derived key
self._unlocked = _unlock_keybag_with_derived_key(
self._keybag, self._derived_key
)
else:
# Normal path: full PBKDF2 derivation, capturing the intermediate key
self._unlocked, self._derived_key = _unlock_keybag_and_capture_key(
self._keybag, self._passphrase
)
self._passphrase = None
if not self._unlocked:
raise ValueError("Failed to decrypt keys: incorrect passphrase?")
return True
def get_decryption_key(self):
"""Return derived key as hex string (64 chars / 32 bytes)."""
if self._derived_key is None:
raise ValueError("No derived key available")
return self._derived_key.hex()
def _unlock_keybag_with_derived_key(keybag, passphrase_key):
"""Unlock keybag class keys using a pre-derived passphrase_key,
skipping the expensive PBKDF2 rounds."""
WRAP_PASSPHRASE = 2
for classkey in keybag.classKeys.values():
if b"WPKY" not in classkey:
continue
if classkey[b"WRAP"] & WRAP_PASSPHRASE:
k = google_iphone_dataprotection._AESUnwrap(
passphrase_key, classkey[b"WPKY"]
)
if not k:
return False
classkey[b"KEY"] = k
return True
def _unlock_keybag_and_capture_key(keybag, passphrase):
"""Run full PBKDF2 key derivation and AES unwrap, returning
(success, passphrase_key) so the derived key can be exported."""
passphrase_round1 = pbkdf2_hmac(
"sha256", passphrase, keybag.attrs[b"DPSL"], keybag.attrs[b"DPIC"], 32
)
passphrase_key = pbkdf2_hmac(
"sha1", passphrase_round1, keybag.attrs[b"SALT"], keybag.attrs[b"ITER"], 32
)
WRAP_PASSPHRASE = 2
for classkey in keybag.classKeys.values():
if b"WPKY" not in classkey:
continue
if classkey[b"WRAP"] & WRAP_PASSPHRASE:
k = google_iphone_dataprotection._AESUnwrap(
passphrase_key, classkey[b"WPKY"]
)
if not k:
return False, None
classkey[b"KEY"] = k
return True, passphrase_key
class DecryptBackup: class DecryptBackup:
"""This class provides functions to decrypt an encrypted iTunes backup """This class provides functions to decrypt an encrypted iTunes backup
@@ -55,41 +184,27 @@ class DecryptBackup:
log.critical("The backup does not seem encrypted!") log.critical("The backup does not seem encrypted!")
return False return False
def _process_file(
self, relative_path: str, domain: str, item, file_id: str, item_folder: str
) -> None:
self._backup.getFileDecryptedCopy(
manifestEntry=item, targetName=file_id, targetFolder=item_folder
)
log.info(
"Decrypted file %s [%s] to %s/%s",
relative_path,
domain,
item_folder,
file_id,
)
def process_backup(self) -> None: def process_backup(self) -> None:
if not os.path.exists(self.dest_path): if not os.path.exists(self.dest_path):
os.makedirs(self.dest_path) os.makedirs(self.dest_path)
manifest_path = os.path.join(self.dest_path, "Manifest.db") manifest_path = os.path.join(self.dest_path, "Manifest.db")
# We extract a decrypted Manifest.db. # Extract a decrypted Manifest.db to the destination folder.
self._backup.getManifestDB() self._backup.save_manifest_file(output_filename=manifest_path)
# We store it to the destination folder.
shutil.copy(self._backup.manifestDB, manifest_path)
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for item in self._backup.getBackupFilesList():
try:
file_id = item["backupFile"]
relative_path = item["relativePath"]
domain = item["domain"]
# Iterate over all files in the backup and decrypt them,
# preserving the XX/file_id directory structure that downstream
# modules expect.
with self._backup.manifest_db_cursor() as cur:
cur.execute(
"SELECT fileID, domain, relativePath, file FROM Files WHERE flags=1"
)
for file_id, domain, relative_path, file_bplist in cur:
# This may be a partial backup. Skip files from the manifest # This may be a partial backup. Skip files from the manifest
# which do not exist locally. # which do not exist locally.
source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id) source_file_path = os.path.join(
self.backup_path, file_id[:2], file_id
)
if not os.path.exists(source_file_path): if not os.path.exists(source_file_path):
log.debug( log.debug(
"Skipping file %s. File not found in encrypted backup directory.", "Skipping file %s. File not found in encrypted backup directory.",
@@ -97,24 +212,26 @@ class DecryptBackup:
) )
continue continue
item_folder = os.path.join(self.dest_path, file_id[0:2]) item_folder = os.path.join(self.dest_path, file_id[:2])
if not os.path.exists(item_folder): os.makedirs(item_folder, exist_ok=True)
os.makedirs(item_folder)
# iOSBackup getFileDecryptedCopy() claims to read a "file" try:
# parameter but the code actually is reading the "manifest" key. decrypted = self._backup._decrypt_inner_file(
# Add manifest plist to both keys to handle this. file_id=file_id, file_bplist=file_bplist
item["manifest"] = item["file"] )
with open(
pool.apply_async( os.path.join(item_folder, file_id), "wb"
self._process_file, ) as handle:
args=(relative_path, domain, item, file_id, item_folder), handle.write(decrypted)
) log.info(
except Exception as exc: "Decrypted file %s [%s] to %s/%s",
log.error("Failed to decrypt file %s: %s", relative_path, exc) relative_path,
domain,
pool.close() item_folder,
pool.join() file_id,
)
except Exception as exc:
log.error("Failed to decrypt file %s: %s", relative_path, exc)
# Copying over the root plist files as well. # Copying over the root plist files as well.
for file_name in os.listdir(self.backup_path): for file_name in os.listdir(self.backup_path):
@@ -155,20 +272,23 @@ class DecryptBackup:
return return
try: try:
self._backup = iOSbackup( self._backup = MVTEncryptedBackup(
udid=os.path.basename(self.backup_path), backup_directory=self.backup_path,
cleartextpassword=password, passphrase=password,
backuproot=os.path.dirname(self.backup_path),
) )
# Eagerly trigger keybag unlock so wrong-password errors
# surface here rather than later during process_backup().
self._backup.test_decryption()
except Exception as exc: except Exception as exc:
self._backup = None
if ( if (
isinstance(exc, KeyError) isinstance(exc, ValueError)
and len(exc.args) > 0 and "passphrase" in str(exc).lower()
and exc.args[0] == b"KEY"
): ):
log.critical("Failed to decrypt backup. Password is probably wrong.") log.critical("Failed to decrypt backup. Password is probably wrong.")
elif ( elif (
isinstance(exc, FileNotFoundError) isinstance(exc, FileNotFoundError)
and hasattr(exc, "filename")
and os.path.basename(exc.filename) == "Manifest.plist" and os.path.basename(exc.filename) == "Manifest.plist"
): ):
log.critical( log.critical(
@@ -211,12 +331,14 @@ class DecryptBackup:
try: try:
key_bytes_raw = binascii.unhexlify(key_bytes) key_bytes_raw = binascii.unhexlify(key_bytes)
self._backup = iOSbackup( self._backup = MVTEncryptedBackup(
udid=os.path.basename(self.backup_path), backup_directory=self.backup_path,
derivedkey=key_bytes_raw, derived_key=key_bytes_raw,
backuproot=os.path.dirname(self.backup_path),
) )
# Eagerly trigger keybag unlock so wrong-key errors surface here.
self._backup.test_decryption()
except Exception as exc: except Exception as exc:
self._backup = None
log.exception(exc) log.exception(exc)
log.critical( log.critical(
"Failed to decrypt backup. Did you provide the correct key file?" "Failed to decrypt backup. Did you provide the correct key file?"
@@ -227,7 +349,7 @@ class DecryptBackup:
if not self._backup: if not self._backup:
return return
self._decryption_key = self._backup.getDecryptionKey() self._decryption_key = self._backup.get_decryption_key()
log.info( log.info(
'Derived decryption key for backup at path %s is: "%s"', 'Derived decryption key for backup at path %s is: "%s"',
self.backup_path, self.backup_path,

View File

@@ -25,9 +25,6 @@ class TestDumpsysAccessibilityArtifact:
da.results[0]["service"] da.results[0]["service"]
== "com.android.settings/com.samsung.android.settings.development.gpuwatch.GPUWatchInterceptor" == "com.android.settings/com.samsung.android.settings.development.gpuwatch.GPUWatchInterceptor"
) )
# All services are installed but none enabled in this fixture
for result in da.results:
assert result["enabled"] is False
def test_parsing_v14_aosp_format(self): def test_parsing_v14_aosp_format(self):
da = DumpsysAccessibilityArtifact() da = DumpsysAccessibilityArtifact()
@@ -39,32 +36,7 @@ class TestDumpsysAccessibilityArtifact:
da.parse(data) da.parse(data)
assert len(da.results) == 1 assert len(da.results) == 1
assert da.results[0]["package_name"] == "com.malware.accessibility" assert da.results[0]["package_name"] == "com.malware.accessibility"
assert ( assert da.results[0]["service"] == "com.malware.service.malwareservice"
da.results[0]["service"]
== "com.malware.accessibility/com.malware.service.malwareservice"
)
assert da.results[0]["enabled"] is True
def test_parsing_installed_and_enabled(self):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility_enabled.txt")
with open(file) as f:
data = f.read()
assert len(da.results) == 0
da.parse(data)
assert len(da.results) == 5
enabled = [r for r in da.results if r["enabled"]]
assert len(enabled) == 1
assert enabled[0]["package_name"] == "com.samsung.accessibility"
assert (
enabled[0]["service"]
== "com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)"
)
not_enabled = [r for r in da.results if not r["enabled"]]
assert len(not_enabled) == 4
def test_ioc_check(self, indicator_file): def test_ioc_check(self, indicator_file):
da = DumpsysAccessibilityArtifact() da = DumpsysAccessibilityArtifact()

View File

@@ -1,16 +0,0 @@
ACCESSIBILITY MANAGER (dumpsys accessibility)
currentUserId=0
User state[
attributes:{id=0, touchExplorationEnabled=false, installedServiceCount=5}
installed services: {
0 : com.google.android.apps.accessibility.voiceaccess/.JustSpeakService (A11yTool)
1 : com.microsoft.appmanager/com.microsoft.mmx.screenmirroringsrc.accessibility.ScreenMirroringAccessibilityService
2 : com.samsung.accessibility/.assistantmenu.serviceframework.AssistantMenuService (A11yTool)
3 : com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)
4 : com.samsung.android.accessibility.talkback/com.samsung.android.marvin.talkback.TalkBackService (A11yTool)
}
Bound services:{}
Enabled services:{{com.samsung.accessibility/.universalswitch.UniversalSwitchService}}
Binding services:{}
Crashed services:{}