mirror of
https://github.com/mvt-project/mvt.git
synced 2026-04-18 17:56:44 +02:00
Compare commits
23 Commits
todos
...
accessibil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bcbaac3a2 | ||
|
|
f26303c930 | ||
|
|
4edab3c4f8 | ||
|
|
d754f58c1a | ||
|
|
6c537c624e | ||
|
|
fd31f31aae | ||
|
|
9305c655bb | ||
|
|
ec43f93eb9 | ||
|
|
7e398310b6 | ||
|
|
600e6dcf8f | ||
|
|
3d1407b78c | ||
|
|
d41ff6d604 | ||
|
|
ccd563f2ba | ||
|
|
c681d264b3 | ||
|
|
261b0ae000 | ||
|
|
89d30e84f4 | ||
|
|
557d0a0cd6 | ||
|
|
134bfce90f | ||
|
|
0141da4293 | ||
|
|
5cba61b180 | ||
|
|
29475acb47 | ||
|
|
1d5c83582c | ||
|
|
2dd1428787 |
7
.github/workflows/mypy.yml
vendored
7
.github/workflows/mypy.yml
vendored
@@ -7,14 +7,13 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: 3.9
|
||||
cache: 'pip'
|
||||
- name: Checkout
|
||||
uses: actions/checkout@master
|
||||
- name: Install Dependencies
|
||||
run: |
|
||||
pip install mypy
|
||||
|
||||
40
.github/workflows/publish-release-docker.yml
vendored
40
.github/workflows/publish-release-docker.yml
vendored
@@ -4,6 +4,8 @@ name: Create and publish a Docker image
|
||||
# Configures this workflow to run every time a release is published.
|
||||
on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
branches: [main]
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
@@ -23,9 +25,18 @@ jobs:
|
||||
attestations: write
|
||||
id-token: write
|
||||
#
|
||||
strategy:
|
||||
matrix:
|
||||
platform:
|
||||
- dockerfile: "Dockerfile"
|
||||
tag-suffix: ""
|
||||
- dockerfile: "Dockerfile.ios"
|
||||
tag-suffix: "-ios"
|
||||
- dockerfile: "Dockerfile.android"
|
||||
tag-suffix: "-android"
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v6
|
||||
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
|
||||
- name: Log in to the Container registry
|
||||
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
|
||||
@@ -36,26 +47,33 @@ jobs:
|
||||
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta
|
||||
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
flavor: |
|
||||
latest=false
|
||||
tags: |
|
||||
type=raw,value=latest,enable={{ is_default_branch }},suffix=${{ matrix.platform.tag-suffix }}
|
||||
type=raw,enable=${{ github.event_name == 'release' || github.ref_type == 'tag' }},value=stable,suffix=${{ matrix.platform.tag-suffix }}
|
||||
type=raw,enable=${{ github.event_name == 'release' }},value=${{ github.event.release.tag_name }},suffix=${{ matrix.platform.tag-suffix }}
|
||||
type=raw,enable=${{ github.ref_type == 'tag' }},value=${{ github.ref_name }},suffix=${{ matrix.platform.tag-suffix }}
|
||||
type=sha,suffix=${{ matrix.platform.tag-suffix }}
|
||||
type=sha,format=long,suffix=${{ matrix.platform.tag-suffix }}
|
||||
# This step sets up some additional capabilities to generate the provenance and sbom attestations
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
|
||||
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
|
||||
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
|
||||
- name: Build and push Docker image
|
||||
id: push
|
||||
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
file: ${{ matrix.platform.dockerfile }}
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
|
||||
- name: Generate artifact attestation
|
||||
uses: actions/attest-build-provenance@v1
|
||||
with:
|
||||
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
|
||||
subject-digest: ${{ steps.push.outputs.digest }}
|
||||
push-to-registry: true
|
||||
provenance: mode=max
|
||||
sbom: true
|
||||
|
||||
|
||||
5
.github/workflows/ruff.yml
vendored
5
.github/workflows/ruff.yml
vendored
@@ -11,14 +11,13 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: 3.9
|
||||
cache: 'pip'
|
||||
- name: Checkout
|
||||
uses: actions/checkout@master
|
||||
- name: Install Dependencies
|
||||
run: |
|
||||
pip install ruff
|
||||
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -15,9 +15,9 @@ jobs:
|
||||
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install Python dependencies
|
||||
|
||||
4
.github/workflows/update-ios-data.yml
vendored
4
.github/workflows/update-ios-data.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
||||
- name: Run script to fetch latest iOS releases from Apple RSS feed.
|
||||
run: python3 .github/workflows/scripts/update-ios-releases.py
|
||||
- name: Create Pull Request
|
||||
uses: peter-evans/create-pull-request@v5
|
||||
uses: peter-evans/create-pull-request@v8
|
||||
with:
|
||||
title: '[auto] Update iOS releases and versions'
|
||||
commit-message: Add new iOS versions and build numbers
|
||||
@@ -27,4 +27,4 @@ jobs:
|
||||
add-paths: |
|
||||
*.json
|
||||
labels: |
|
||||
automated pr
|
||||
automated pr
|
||||
|
||||
16
Dockerfile
16
Dockerfile
@@ -1,6 +1,6 @@
|
||||
# Base image for building libraries
|
||||
# ---------------------------------
|
||||
FROM ubuntu:22.04 as build-base
|
||||
FROM ubuntu:22.04 AS build-base
|
||||
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
@@ -22,7 +22,7 @@ RUN apt-get update \
|
||||
|
||||
# libplist
|
||||
# --------
|
||||
FROM build-base as build-libplist
|
||||
FROM build-base AS build-libplist
|
||||
|
||||
# Build
|
||||
RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
|
||||
@@ -32,7 +32,7 @@ RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
|
||||
|
||||
# libimobiledevice-glue
|
||||
# ---------------------
|
||||
FROM build-base as build-libimobiledevice-glue
|
||||
FROM build-base AS build-libimobiledevice-glue
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -45,7 +45,7 @@ RUN git clone https://github.com/libimobiledevice/libimobiledevice-glue && cd li
|
||||
|
||||
# libtatsu
|
||||
# --------
|
||||
FROM build-base as build-libtatsu
|
||||
FROM build-base AS build-libtatsu
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -58,7 +58,7 @@ RUN git clone https://github.com/libimobiledevice/libtatsu && cd libtatsu \
|
||||
|
||||
# libusbmuxd
|
||||
# ----------
|
||||
FROM build-base as build-libusbmuxd
|
||||
FROM build-base AS build-libusbmuxd
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -72,7 +72,7 @@ RUN git clone https://github.com/libimobiledevice/libusbmuxd && cd libusbmuxd \
|
||||
|
||||
# libimobiledevice
|
||||
# ----------------
|
||||
FROM build-base as build-libimobiledevice
|
||||
FROM build-base AS build-libimobiledevice
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -88,7 +88,7 @@ RUN git clone https://github.com/libimobiledevice/libimobiledevice && cd libimob
|
||||
|
||||
# usbmuxd
|
||||
# -------
|
||||
FROM build-base as build-usbmuxd
|
||||
FROM build-base AS build-usbmuxd
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -103,7 +103,7 @@ RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
|
||||
|
||||
|
||||
# Create main image
|
||||
FROM ubuntu:24.04 as main
|
||||
FROM ubuntu:24.04 AS main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Create main image
|
||||
FROM python:3.10.14-alpine3.20 as main
|
||||
FROM python:3.10.14-alpine3.20 AS main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Base image for building libraries
|
||||
# ---------------------------------
|
||||
FROM ubuntu:22.04 as build-base
|
||||
FROM ubuntu:22.04 AS build-base
|
||||
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
@@ -22,7 +22,7 @@ RUN apt-get update \
|
||||
|
||||
# libplist
|
||||
# --------
|
||||
FROM build-base as build-libplist
|
||||
FROM build-base AS build-libplist
|
||||
|
||||
# Build
|
||||
RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
|
||||
@@ -32,7 +32,7 @@ RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
|
||||
|
||||
# libimobiledevice-glue
|
||||
# ---------------------
|
||||
FROM build-base as build-libimobiledevice-glue
|
||||
FROM build-base AS build-libimobiledevice-glue
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -45,7 +45,7 @@ RUN git clone https://github.com/libimobiledevice/libimobiledevice-glue && cd li
|
||||
|
||||
# libtatsu
|
||||
# --------
|
||||
FROM build-base as build-libtatsu
|
||||
FROM build-base AS build-libtatsu
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -58,7 +58,7 @@ RUN git clone https://github.com/libimobiledevice/libtatsu && cd libtatsu \
|
||||
|
||||
# libusbmuxd
|
||||
# ----------
|
||||
FROM build-base as build-libusbmuxd
|
||||
FROM build-base AS build-libusbmuxd
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -72,7 +72,7 @@ RUN git clone https://github.com/libimobiledevice/libusbmuxd && cd libusbmuxd \
|
||||
|
||||
# libimobiledevice
|
||||
# ----------------
|
||||
FROM build-base as build-libimobiledevice
|
||||
FROM build-base AS build-libimobiledevice
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -88,7 +88,7 @@ RUN git clone https://github.com/libimobiledevice/libimobiledevice && cd libimob
|
||||
|
||||
# usbmuxd
|
||||
# -------
|
||||
FROM build-base as build-usbmuxd
|
||||
FROM build-base AS build-usbmuxd
|
||||
|
||||
# Install dependencies
|
||||
COPY --from=build-libplist /build /
|
||||
@@ -104,7 +104,7 @@ RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
|
||||
|
||||
# Main image
|
||||
# ----------
|
||||
FROM python:3.10.14-alpine3.20 as main
|
||||
FROM python:3.10.14-alpine3.20 AS main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
|
||||
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ test-requirements:
|
||||
generate-proto-parsers:
|
||||
# Generate python parsers for protobuf files
|
||||
PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
|
||||
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
|
||||
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto2_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
|
||||
|
||||
clean:
|
||||
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
|
||||
# Mobile Verification Toolkit
|
||||
|
||||
> [!IMPORTANT]
|
||||
> Soon we will merge the v3 pull request which will result in breaking changes. If you rely on mvt output in other script make sure to the the branch before we merge. More details: https://github.com/mvt-project/mvt/issues/757
|
||||
|
||||
[](https://pypi.org/project/mvt/)
|
||||
[](https://docs.mvt.re/en/latest/?badge=latest)
|
||||
[](https://github.com/mvt-project/mvt/actions/workflows/tests.yml)
|
||||
|
||||
@@ -2,4 +2,4 @@ mkdocs==1.6.1
|
||||
mkdocs-autorefs==1.4.3
|
||||
mkdocs-material==9.6.20
|
||||
mkdocs-material-extensions==1.3.1
|
||||
mkdocstrings==0.30.1
|
||||
mkdocstrings==1.0.0
|
||||
@@ -17,25 +17,25 @@ classifiers = [
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
dependencies = [
|
||||
"click==8.3.0",
|
||||
"rich==14.1.0",
|
||||
"click==8.3.2",
|
||||
"rich==14.3.3",
|
||||
"tld==0.13.1",
|
||||
"requests==2.32.5",
|
||||
"requests==2.33.1",
|
||||
"simplejson==3.20.2",
|
||||
"packaging==25.0",
|
||||
"packaging==26.0",
|
||||
"appdirs==1.4.4",
|
||||
"iOSbackup==0.9.925",
|
||||
"adb-shell[usb]==0.4.4",
|
||||
"libusb1==3.3.1",
|
||||
"cryptography==46.0.3",
|
||||
"cryptography==46.0.6",
|
||||
"PyYAML>=6.0.2",
|
||||
"pyahocorasick==2.2.0",
|
||||
"betterproto==1.2.5",
|
||||
"pydantic==2.12.3",
|
||||
"pydantic-settings==2.10.1",
|
||||
"betterproto2==0.9.1",
|
||||
"pydantic==2.12.5",
|
||||
"pydantic-settings==2.13.1",
|
||||
"NSKeyedUnArchiver==1.5.2",
|
||||
"python-dateutil==2.9.0.post0",
|
||||
"tzdata==2025.2",
|
||||
"tzdata==2026.1",
|
||||
]
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
@@ -57,7 +57,7 @@ dev = [
|
||||
"stix2>=3.0.1",
|
||||
"ruff>=0.1.6",
|
||||
"mypy>=1.7.1",
|
||||
"betterproto[compiler]",
|
||||
"betterproto2-compiler",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
@@ -81,8 +81,8 @@ addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.ruff]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
ignore = [
|
||||
lint.select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
lint.ignore = [
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
|
||||
@@ -95,10 +95,10 @@ ignore = [
|
||||
# "E203", # whitespace-before-punctuation
|
||||
]
|
||||
|
||||
[tool.ruff.per-file-ignores]
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
|
||||
[tool.ruff.mccabe]
|
||||
[tool.ruff.lint.mccabe]
|
||||
max-complexity = 10
|
||||
|
||||
[tool.setuptools]
|
||||
|
||||
@@ -22,13 +22,13 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
|
||||
def parse(self, content: str) -> None:
|
||||
"""
|
||||
Parse the Dumpsys Accessibility section/
|
||||
Adds results to self.results (List[Dict[str, str]])
|
||||
Parse the Dumpsys Accessibility section.
|
||||
Adds results to self.results (List[Dict[str, Any]])
|
||||
|
||||
:param content: content of the accessibility section (string)
|
||||
"""
|
||||
|
||||
# "Old" syntax
|
||||
# Parse installed services
|
||||
in_services = False
|
||||
for line in content.splitlines():
|
||||
if line.strip().startswith("installed services:"):
|
||||
@@ -39,7 +39,6 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
continue
|
||||
|
||||
if line.strip() == "}":
|
||||
# At end of installed services
|
||||
break
|
||||
|
||||
service = line.split(":")[1].strip()
|
||||
@@ -48,21 +47,66 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
{
|
||||
"package_name": service.split("/")[0],
|
||||
"service": service,
|
||||
"enabled": False,
|
||||
}
|
||||
)
|
||||
|
||||
# "New" syntax - AOSP >= 14 (?)
|
||||
# Looks like:
|
||||
# Enabled services:{{com.azure.authenticator/com.microsoft.brooklyn.module.accessibility.BrooklynAccessibilityService}, {com.agilebits.onepassword/com.agilebits.onepassword.filling.accessibility.FillingAccessibilityService}}
|
||||
# Parse enabled services from both old and new formats.
|
||||
#
|
||||
# Old format (multi-line block):
|
||||
# enabled services: {
|
||||
# 0 : com.example/.MyService
|
||||
# }
|
||||
#
|
||||
# New format (single line, AOSP >= 14):
|
||||
# Enabled services:{{com.example/com.example.MyService}, {com.other/com.other.Svc}}
|
||||
enabled_services = set()
|
||||
|
||||
in_enabled = False
|
||||
for line in content.splitlines():
|
||||
if line.strip().startswith("Enabled services:"):
|
||||
matches = re.finditer(r"{([^{]+?)}", line)
|
||||
stripped = line.strip()
|
||||
|
||||
if in_enabled:
|
||||
if stripped == "}":
|
||||
in_enabled = False
|
||||
continue
|
||||
service = line.split(":")[1].strip()
|
||||
enabled_services.add(service)
|
||||
continue
|
||||
|
||||
if re.match(r"enabled services:\s*\{\s*$", stripped, re.IGNORECASE):
|
||||
# Old multi-line format: "enabled services: {"
|
||||
in_enabled = True
|
||||
continue
|
||||
|
||||
if re.match(r"enabled services:\s*\{", stripped, re.IGNORECASE):
|
||||
# New single-line format: "Enabled services:{{pkg/svc}, {pkg2/svc2}}"
|
||||
matches = re.finditer(r"\{([^{}]+)\}", stripped)
|
||||
for match in matches:
|
||||
# Each match is in format: <package_name>/<service>
|
||||
package_name, _, service = match.group(1).partition("/")
|
||||
enabled_services.add(match.group(1).strip())
|
||||
|
||||
self.results.append(
|
||||
{"package_name": package_name, "service": service}
|
||||
)
|
||||
# Mark installed services that are enabled.
|
||||
# Installed service names may include trailing annotations like
|
||||
# "(A11yTool)" that are absent from the enabled services list,
|
||||
# so strip annotations before comparing.
|
||||
def _strip_annotation(s: str) -> str:
|
||||
return re.sub(r"\s+\(.*?\)\s*$", "", s)
|
||||
|
||||
installed_stripped = {
|
||||
_strip_annotation(r["service"]): r for r in self.results
|
||||
}
|
||||
for enabled in enabled_services:
|
||||
if enabled in installed_stripped:
|
||||
installed_stripped[enabled]["enabled"] = True
|
||||
|
||||
# Add enabled services not found in the installed list
|
||||
for service in enabled_services:
|
||||
if service not in installed_stripped:
|
||||
package_name, _, _ = service.partition("/")
|
||||
self.results.append(
|
||||
{
|
||||
"package_name": package_name,
|
||||
"service": service,
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -186,7 +186,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
package = []
|
||||
|
||||
in_package_list = False
|
||||
for line in content.splitlines():
|
||||
for line in content.split("\n"):
|
||||
if line.startswith("Packages:"):
|
||||
in_package_list = True
|
||||
continue
|
||||
|
||||
@@ -8,7 +8,7 @@ from .artifact import AndroidArtifact
|
||||
|
||||
class Processes(AndroidArtifact):
|
||||
def parse(self, entry: str) -> None:
|
||||
for line in entry.splitlines()[1:]:
|
||||
for line in entry.split("\n")[1:]:
|
||||
proc = line.split()
|
||||
|
||||
# Skip empty lines
|
||||
|
||||
@@ -7,7 +7,7 @@ import datetime
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import pydantic
|
||||
import betterproto
|
||||
import betterproto2
|
||||
from dateutil import parser
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
@@ -124,7 +124,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
"""Parse Android tombstone crash files from a protobuf object."""
|
||||
tombstone_pb = Tombstone().parse(data)
|
||||
tombstone_dict = tombstone_pb.to_dict(
|
||||
betterproto.Casing.SNAKE, include_default_values=True
|
||||
casing=betterproto2.Casing.SNAKE, include_default_values=True
|
||||
)
|
||||
|
||||
# Add some extra metadata
|
||||
@@ -193,7 +193,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
# eg. "Process uptime: 40s"
|
||||
tombstone[destination_key] = int(value_clean.rstrip("s"))
|
||||
elif destination_key == "command_line":
|
||||
# Wrap in list for consistency with protobuf format (repeated string).
|
||||
# XXX: Check if command line should be a single string in a list, or a list of strings.
|
||||
tombstone[destination_key] = [value_clean]
|
||||
else:
|
||||
tombstone[destination_key] = value_clean
|
||||
|
||||
@@ -117,6 +117,8 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose)
|
||||
if from_file:
|
||||
download = DownloadAPKs.from_json(from_file)
|
||||
else:
|
||||
# TODO: Do we actually want to be able to run without storing any
|
||||
# file?
|
||||
if not output:
|
||||
log.critical("You need to specify an output folder with --output!")
|
||||
ctx.exit(1)
|
||||
|
||||
@@ -105,15 +105,15 @@ class AQFFiles(AndroidQFModule):
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
for hash_key in ("sha256", "sha1", "md5"):
|
||||
file_hash = result.get(hash_key, "")
|
||||
if not file_hash:
|
||||
continue
|
||||
ioc = self.indicators.check_file_hash(file_hash)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
break
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_file_hash(result["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
|
||||
def run(self) -> None:
|
||||
if timezone := self._get_device_timezone():
|
||||
@@ -128,7 +128,7 @@ class AQFFiles(AndroidQFModule):
|
||||
data = json.loads(rawdata)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = []
|
||||
for line in rawdata.splitlines():
|
||||
for line in rawdata.split("\n"):
|
||||
if line.strip() == "":
|
||||
continue
|
||||
data.append(json.loads(line))
|
||||
@@ -139,7 +139,7 @@ class AQFFiles(AndroidQFModule):
|
||||
utc_timestamp = datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
# Convert the UTC timestamp to local time on Android device's local timezone
|
||||
# Convert the UTC timestamp to local tiem on Android device's local timezone
|
||||
local_timestamp = utc_timestamp.astimezone(device_timezone)
|
||||
|
||||
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
|
||||
|
||||
@@ -39,7 +39,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
|
||||
self.results[namespace] = {}
|
||||
data = self._get_file_content(setting_file)
|
||||
for line in data.decode("utf-8").splitlines():
|
||||
for line in data.decode("utf-8").split("\n"):
|
||||
line = line.strip()
|
||||
try:
|
||||
key, value = line.split("=", 1)
|
||||
|
||||
@@ -49,9 +49,14 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
|
||||
for result in self.results:
|
||||
self.log.info(
|
||||
'Found installed accessibility service "%s"', result.get("service")
|
||||
'Found installed accessibility service "%s" (enabled: %s)',
|
||||
result.get("service"),
|
||||
result.get("enabled", False),
|
||||
)
|
||||
|
||||
enabled_count = sum(1 for r in self.results if r.get("enabled"))
|
||||
self.log.info(
|
||||
"Identified a total of %d accessibility services", len(self.results)
|
||||
"Identified a total of %d accessibility services, %d enabled",
|
||||
len(self.results),
|
||||
enabled_count,
|
||||
)
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# sources: tombstone.proto
|
||||
# plugin: python-betterproto
|
||||
# plugin: python-betterproto2
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List
|
||||
|
||||
import betterproto
|
||||
import betterproto2
|
||||
|
||||
|
||||
class Architecture(betterproto.Enum):
|
||||
class Architecture(betterproto2.Enum):
|
||||
ARM32 = 0
|
||||
ARM64 = 1
|
||||
X86 = 2
|
||||
@@ -16,12 +15,12 @@ class Architecture(betterproto.Enum):
|
||||
NONE = 5
|
||||
|
||||
|
||||
class MemoryErrorTool(betterproto.Enum):
|
||||
class MemoryErrorTool(betterproto2.Enum):
|
||||
GWP_ASAN = 0
|
||||
SCUDO = 1
|
||||
|
||||
|
||||
class MemoryErrorType(betterproto.Enum):
|
||||
class MemoryErrorType(betterproto2.Enum):
|
||||
UNKNOWN = 0
|
||||
USE_AFTER_FREE = 1
|
||||
DOUBLE_FREE = 2
|
||||
@@ -30,179 +29,179 @@ class MemoryErrorType(betterproto.Enum):
|
||||
BUFFER_UNDERFLOW = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrashDetail(betterproto.Message):
|
||||
@dataclass(eq=False, repr=False)
|
||||
class CrashDetail(betterproto2.Message):
|
||||
"""
|
||||
NOTE TO OEMS: If you add custom fields to this proto, do not use numbers in
|
||||
the reserved range.
|
||||
"""
|
||||
|
||||
name: bytes = betterproto.bytes_field(1)
|
||||
data: bytes = betterproto.bytes_field(2)
|
||||
name: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
|
||||
data: "bytes" = betterproto2.field(2, betterproto2.TYPE_BYTES)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBufferEntry(betterproto.Message):
|
||||
addr: "BacktraceFrame" = betterproto.message_field(1)
|
||||
fp: int = betterproto.uint64_field(2)
|
||||
tag: int = betterproto.uint64_field(3)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class StackHistoryBufferEntry(betterproto2.Message):
|
||||
addr: "BacktraceFrame | None" = betterproto2.field(1, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
fp: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
tag: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBuffer(betterproto.Message):
|
||||
tid: int = betterproto.uint64_field(1)
|
||||
entries: List["StackHistoryBufferEntry"] = betterproto.message_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class StackHistoryBuffer(betterproto2.Message):
|
||||
tid: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
entries: "list[StackHistoryBufferEntry]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tombstone(betterproto.Message):
|
||||
arch: "Architecture" = betterproto.enum_field(1)
|
||||
guest_arch: "Architecture" = betterproto.enum_field(24)
|
||||
build_fingerprint: str = betterproto.string_field(2)
|
||||
revision: str = betterproto.string_field(3)
|
||||
timestamp: str = betterproto.string_field(4)
|
||||
pid: int = betterproto.uint32_field(5)
|
||||
tid: int = betterproto.uint32_field(6)
|
||||
uid: int = betterproto.uint32_field(7)
|
||||
selinux_label: str = betterproto.string_field(8)
|
||||
command_line: List[str] = betterproto.string_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Tombstone(betterproto2.Message):
|
||||
arch: "Architecture" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
|
||||
guest_arch: "Architecture" = betterproto2.field(24, betterproto2.TYPE_ENUM, default_factory=lambda: Architecture(0))
|
||||
build_fingerprint: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
revision: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
|
||||
timestamp: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
pid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT32)
|
||||
tid: "int" = betterproto2.field(6, betterproto2.TYPE_UINT32)
|
||||
uid: "int" = betterproto2.field(7, betterproto2.TYPE_UINT32)
|
||||
selinux_label: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
command_line: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
|
||||
# Process uptime in seconds.
|
||||
process_uptime: int = betterproto.uint32_field(20)
|
||||
signal_info: "Signal" = betterproto.message_field(10)
|
||||
abort_message: str = betterproto.string_field(14)
|
||||
crash_details: List["CrashDetail"] = betterproto.message_field(21)
|
||||
causes: List["Cause"] = betterproto.message_field(15)
|
||||
threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
16, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
process_uptime: "int" = betterproto2.field(20, betterproto2.TYPE_UINT32)
|
||||
signal_info: "Signal | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
abort_message: "str" = betterproto2.field(14, betterproto2.TYPE_STRING)
|
||||
crash_details: "list[CrashDetail]" = betterproto2.field(21, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
causes: "list[Cause]" = betterproto2.field(15, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
threads: "dict[int, Thread]" = betterproto2.field(
|
||||
16, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
|
||||
)
|
||||
guest_threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
25, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
guest_threads: "dict[int, Thread]" = betterproto2.field(
|
||||
25, betterproto2.TYPE_MAP, map_meta=betterproto2.map_meta(betterproto2.TYPE_UINT32, betterproto2.TYPE_MESSAGE)
|
||||
)
|
||||
memory_mappings: List["MemoryMapping"] = betterproto.message_field(17)
|
||||
log_buffers: List["LogBuffer"] = betterproto.message_field(18)
|
||||
open_fds: List["FD"] = betterproto.message_field(19)
|
||||
page_size: int = betterproto.uint32_field(22)
|
||||
has_been_16kb_mode: bool = betterproto.bool_field(23)
|
||||
stack_history_buffer: "StackHistoryBuffer" = betterproto.message_field(26)
|
||||
memory_mappings: "list[MemoryMapping]" = betterproto2.field(17, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
log_buffers: "list[LogBuffer]" = betterproto2.field(18, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
open_fds: "list[FD]" = betterproto2.field(19, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
page_size: "int" = betterproto2.field(22, betterproto2.TYPE_UINT32)
|
||||
has_been_16kb_mode: "bool" = betterproto2.field(23, betterproto2.TYPE_BOOL)
|
||||
stack_history_buffer: "StackHistoryBuffer | None" = betterproto2.field(26, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Signal(betterproto.Message):
|
||||
number: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
code: int = betterproto.int32_field(3)
|
||||
code_name: str = betterproto.string_field(4)
|
||||
has_sender: bool = betterproto.bool_field(5)
|
||||
sender_uid: int = betterproto.int32_field(6)
|
||||
sender_pid: int = betterproto.int32_field(7)
|
||||
has_fault_address: bool = betterproto.bool_field(8)
|
||||
fault_address: int = betterproto.uint64_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Signal(betterproto2.Message):
|
||||
number: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
code: "int" = betterproto2.field(3, betterproto2.TYPE_INT32)
|
||||
code_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
has_sender: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
|
||||
sender_uid: "int" = betterproto2.field(6, betterproto2.TYPE_INT32)
|
||||
sender_pid: "int" = betterproto2.field(7, betterproto2.TYPE_INT32)
|
||||
has_fault_address: "bool" = betterproto2.field(8, betterproto2.TYPE_BOOL)
|
||||
fault_address: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
|
||||
# Note, may or may not contain the dump of the actual memory contents.
|
||||
# Currently, on arm64, we only include metadata, and not the contents.
|
||||
fault_adjacent_metadata: "MemoryDump" = betterproto.message_field(10)
|
||||
fault_adjacent_metadata: "MemoryDump | None" = betterproto2.field(10, betterproto2.TYPE_MESSAGE, optional=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HeapObject(betterproto.Message):
|
||||
address: int = betterproto.uint64_field(1)
|
||||
size: int = betterproto.uint64_field(2)
|
||||
allocation_tid: int = betterproto.uint64_field(3)
|
||||
allocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
deallocation_tid: int = betterproto.uint64_field(5)
|
||||
deallocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(6)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class HeapObject(betterproto2.Message):
|
||||
address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
size: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
allocation_tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
allocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
deallocation_tid: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
|
||||
deallocation_backtrace: "list[BacktraceFrame]" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryError(betterproto.Message):
|
||||
tool: "MemoryErrorTool" = betterproto.enum_field(1)
|
||||
type: "MemoryErrorType" = betterproto.enum_field(2)
|
||||
heap: "HeapObject" = betterproto.message_field(3, group="location")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryError(betterproto2.Message):
|
||||
tool: "MemoryErrorTool" = betterproto2.field(1, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorTool(0))
|
||||
type: "MemoryErrorType" = betterproto2.field(2, betterproto2.TYPE_ENUM, default_factory=lambda: MemoryErrorType(0))
|
||||
heap: "HeapObject | None" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, optional=True, group="location")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Cause(betterproto.Message):
|
||||
human_readable: str = betterproto.string_field(1)
|
||||
memory_error: "MemoryError" = betterproto.message_field(2, group="details")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Cause(betterproto2.Message):
|
||||
human_readable: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
memory_error: "MemoryError | None" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, optional=True, group="details")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Register(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
u64: int = betterproto.uint64_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Register(betterproto2.Message):
|
||||
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
u64: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Thread(betterproto.Message):
|
||||
id: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
registers: List["Register"] = betterproto.message_field(3)
|
||||
backtrace_note: List[str] = betterproto.string_field(7)
|
||||
unreadable_elf_files: List[str] = betterproto.string_field(9)
|
||||
current_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
memory_dump: List["MemoryDump"] = betterproto.message_field(5)
|
||||
tagged_addr_ctrl: int = betterproto.int64_field(6)
|
||||
pac_enabled_keys: int = betterproto.int64_field(8)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class Thread(betterproto2.Message):
|
||||
id: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
registers: "list[Register]" = betterproto2.field(3, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
backtrace_note: "list[str]" = betterproto2.field(7, betterproto2.TYPE_STRING, repeated=True)
|
||||
unreadable_elf_files: "list[str]" = betterproto2.field(9, betterproto2.TYPE_STRING, repeated=True)
|
||||
current_backtrace: "list[BacktraceFrame]" = betterproto2.field(4, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
memory_dump: "list[MemoryDump]" = betterproto2.field(5, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
tagged_addr_ctrl: "int" = betterproto2.field(6, betterproto2.TYPE_INT64)
|
||||
pac_enabled_keys: "int" = betterproto2.field(8, betterproto2.TYPE_INT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BacktraceFrame(betterproto.Message):
|
||||
rel_pc: int = betterproto.uint64_field(1)
|
||||
pc: int = betterproto.uint64_field(2)
|
||||
sp: int = betterproto.uint64_field(3)
|
||||
function_name: str = betterproto.string_field(4)
|
||||
function_offset: int = betterproto.uint64_field(5)
|
||||
file_name: str = betterproto.string_field(6)
|
||||
file_map_offset: int = betterproto.uint64_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class BacktraceFrame(betterproto2.Message):
|
||||
rel_pc: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
pc: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
sp: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
function_name: "str" = betterproto2.field(4, betterproto2.TYPE_STRING)
|
||||
function_offset: "int" = betterproto2.field(5, betterproto2.TYPE_UINT64)
|
||||
file_name: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
|
||||
file_map_offset: "int" = betterproto2.field(7, betterproto2.TYPE_UINT64)
|
||||
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArmMTEMetadata(betterproto.Message):
|
||||
@dataclass(eq=False, repr=False)
|
||||
class ArmMTEMetadata(betterproto2.Message):
|
||||
# One memory tag per granule (e.g. every 16 bytes) of regular memory.
|
||||
memory_tags: bytes = betterproto.bytes_field(1)
|
||||
memory_tags: "bytes" = betterproto2.field(1, betterproto2.TYPE_BYTES)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryDump(betterproto.Message):
|
||||
register_name: str = betterproto.string_field(1)
|
||||
mapping_name: str = betterproto.string_field(2)
|
||||
begin_address: int = betterproto.uint64_field(3)
|
||||
memory: bytes = betterproto.bytes_field(4)
|
||||
arm_mte_metadata: "ArmMTEMetadata" = betterproto.message_field(6, group="metadata")
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryDump(betterproto2.Message):
|
||||
register_name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
mapping_name: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
begin_address: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
memory: "bytes" = betterproto2.field(4, betterproto2.TYPE_BYTES)
|
||||
arm_mte_metadata: "ArmMTEMetadata | None" = betterproto2.field(6, betterproto2.TYPE_MESSAGE, optional=True, group="metadata")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryMapping(betterproto.Message):
|
||||
begin_address: int = betterproto.uint64_field(1)
|
||||
end_address: int = betterproto.uint64_field(2)
|
||||
offset: int = betterproto.uint64_field(3)
|
||||
read: bool = betterproto.bool_field(4)
|
||||
write: bool = betterproto.bool_field(5)
|
||||
execute: bool = betterproto.bool_field(6)
|
||||
mapping_name: str = betterproto.string_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
load_bias: int = betterproto.uint64_field(9)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class MemoryMapping(betterproto2.Message):
|
||||
begin_address: "int" = betterproto2.field(1, betterproto2.TYPE_UINT64)
|
||||
end_address: "int" = betterproto2.field(2, betterproto2.TYPE_UINT64)
|
||||
offset: "int" = betterproto2.field(3, betterproto2.TYPE_UINT64)
|
||||
read: "bool" = betterproto2.field(4, betterproto2.TYPE_BOOL)
|
||||
write: "bool" = betterproto2.field(5, betterproto2.TYPE_BOOL)
|
||||
execute: "bool" = betterproto2.field(6, betterproto2.TYPE_BOOL)
|
||||
mapping_name: "str" = betterproto2.field(7, betterproto2.TYPE_STRING)
|
||||
build_id: "str" = betterproto2.field(8, betterproto2.TYPE_STRING)
|
||||
load_bias: "int" = betterproto2.field(9, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FD(betterproto.Message):
|
||||
fd: int = betterproto.int32_field(1)
|
||||
path: str = betterproto.string_field(2)
|
||||
owner: str = betterproto.string_field(3)
|
||||
tag: int = betterproto.uint64_field(4)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class FD(betterproto2.Message):
|
||||
fd: "int" = betterproto2.field(1, betterproto2.TYPE_INT32)
|
||||
path: "str" = betterproto2.field(2, betterproto2.TYPE_STRING)
|
||||
owner: "str" = betterproto2.field(3, betterproto2.TYPE_STRING)
|
||||
tag: "int" = betterproto2.field(4, betterproto2.TYPE_UINT64)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogBuffer(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
logs: List["LogMessage"] = betterproto.message_field(2)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class LogBuffer(betterproto2.Message):
|
||||
name: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
logs: "list[LogMessage]" = betterproto2.field(2, betterproto2.TYPE_MESSAGE, repeated=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogMessage(betterproto.Message):
|
||||
timestamp: str = betterproto.string_field(1)
|
||||
pid: int = betterproto.uint32_field(2)
|
||||
tid: int = betterproto.uint32_field(3)
|
||||
priority: int = betterproto.uint32_field(4)
|
||||
tag: str = betterproto.string_field(5)
|
||||
message: str = betterproto.string_field(6)
|
||||
@dataclass(eq=False, repr=False)
|
||||
class LogMessage(betterproto2.Message):
|
||||
timestamp: "str" = betterproto2.field(1, betterproto2.TYPE_STRING)
|
||||
pid: "int" = betterproto2.field(2, betterproto2.TYPE_UINT32)
|
||||
tid: "int" = betterproto2.field(3, betterproto2.TYPE_UINT32)
|
||||
priority: "int" = betterproto2.field(4, betterproto2.TYPE_UINT32)
|
||||
tag: "str" = betterproto2.field(5, betterproto2.TYPE_STRING)
|
||||
message: "str" = betterproto2.field(6, betterproto2.TYPE_STRING)
|
||||
|
||||
@@ -222,6 +222,7 @@ class Command:
|
||||
if self.module_name and module.__name__ != self.module_name:
|
||||
continue
|
||||
|
||||
# FIXME: do we need the logger here
|
||||
module_logger = logging.getLogger(module.__module__)
|
||||
|
||||
m = module(
|
||||
|
||||
@@ -52,9 +52,7 @@ class Indicators:
|
||||
if os.path.isfile(path) and path.lower().endswith(".stix2"):
|
||||
self.parse_stix2(path)
|
||||
elif os.path.isdir(path):
|
||||
for file in glob.glob(
|
||||
os.path.join(path, "**", "*.stix2", recursive=True)
|
||||
):
|
||||
for file in glob.glob(os.path.join(path, "**", "*.stix2"), recursive=True):
|
||||
self.parse_stix2(file)
|
||||
else:
|
||||
self.log.error(
|
||||
@@ -102,6 +100,17 @@ class Indicators:
|
||||
key, value = indicator.get("pattern", "").strip("[]").split("=")
|
||||
key = key.strip()
|
||||
|
||||
# Normalize hash algorithm keys so that both the STIX2-spec-compliant
|
||||
# form (e.g. file:hashes.'SHA-256', which requires quotes around
|
||||
# algorithm names that contain hyphens) and the non-standard lowercase
|
||||
# form (e.g. file:hashes.sha256) are accepted. Strip single quotes and
|
||||
# hyphens from the algorithm name only, then lowercase it.
|
||||
for sep in ("hashes.", "cert."):
|
||||
if sep in key:
|
||||
prefix, _, algo = key.partition(sep)
|
||||
key = prefix + sep + algo.replace("'", "").replace("-", "").lower()
|
||||
break
|
||||
|
||||
if key == "domain-name:value":
|
||||
# We force domain names to lower case.
|
||||
self._add_indicator(
|
||||
|
||||
@@ -180,8 +180,10 @@ class IndicatorsUpdates:
|
||||
def _get_remote_file_latest_commit(
|
||||
self, owner: str, repo: str, branch: str, path: str
|
||||
) -> int:
|
||||
# TODO: The branch is currently not taken into consideration.
|
||||
# How do we specify which branch to look up to the API?
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}&sha={branch}"
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
|
||||
@@ -119,9 +119,10 @@ def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool =
|
||||
if from_2001:
|
||||
timestamp = timestamp + 978307200
|
||||
|
||||
# TODO: This is rather ugly. Happens sometimes with invalid timestamps.
|
||||
try:
|
||||
return convert_unix_to_utc_datetime(timestamp)
|
||||
except (OSError, OverflowError, ValueError):
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -907,6 +907,10 @@
|
||||
"version": "15.8.6",
|
||||
"build": "19H402"
|
||||
},
|
||||
{
|
||||
"version": "15.8.7",
|
||||
"build": "19H411"
|
||||
},
|
||||
{
|
||||
"build": "20A362",
|
||||
"version": "16.0"
|
||||
@@ -1020,6 +1024,10 @@
|
||||
"version": "16.7.14",
|
||||
"build": "20H370"
|
||||
},
|
||||
{
|
||||
"version": "16.7.15",
|
||||
"build": "20H380"
|
||||
},
|
||||
{
|
||||
"version": "17.0",
|
||||
"build": "21A327"
|
||||
@@ -1188,6 +1196,10 @@
|
||||
"version": "18.7.6",
|
||||
"build": "22H320"
|
||||
},
|
||||
{
|
||||
"version": "18.7.7",
|
||||
"build": "22H333"
|
||||
},
|
||||
{
|
||||
"version": "26",
|
||||
"build": "23A341"
|
||||
@@ -1215,5 +1227,9 @@
|
||||
{
|
||||
"version": "26.3.1",
|
||||
"build": "23D8133"
|
||||
},
|
||||
{
|
||||
"version": "26.4",
|
||||
"build": "23E246"
|
||||
}
|
||||
]
|
||||
@@ -87,35 +87,6 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_key(d: dict, key: str) -> None:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
@staticmethod
|
||||
def _b64encode_keys(d: dict, keys: list) -> None:
|
||||
for key in keys:
|
||||
if key in d:
|
||||
d[key] = b64encode(d[key])
|
||||
|
||||
def _b64encode_plist_bytes(self, plist: dict) -> None:
|
||||
"""Encode binary plist values to base64 for JSON serialization."""
|
||||
if "SignerCerts" in plist:
|
||||
plist["SignerCerts"] = [b64encode(x) for x in plist["SignerCerts"]]
|
||||
|
||||
self._b64encode_keys(plist, ["PushTokenDataSentToServerKey", "LastPushTokenHash"])
|
||||
|
||||
if "OTAProfileStub" in plist:
|
||||
stub = plist["OTAProfileStub"]
|
||||
if "SignerCerts" in stub:
|
||||
stub["SignerCerts"] = [b64encode(x) for x in stub["SignerCerts"]]
|
||||
if "PayloadContent" in stub:
|
||||
self._b64encode_key(stub["PayloadContent"], "EnrollmentIdentityPersistentID")
|
||||
|
||||
if "PayloadContent" in plist:
|
||||
for entry in plist["PayloadContent"]:
|
||||
self._b64encode_keys(entry, ["PERSISTENT_REF", "IdentityPersistentRef"])
|
||||
|
||||
def run(self) -> None:
|
||||
for conf_file in self._get_backup_files_from_manifest(
|
||||
domain=CONF_PROFILES_DOMAIN
|
||||
@@ -144,7 +115,65 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
except Exception:
|
||||
conf_plist = {}
|
||||
|
||||
self._b64encode_plist_bytes(conf_plist)
|
||||
# TODO: Tidy up the following code hell.
|
||||
|
||||
if "SignerCerts" in conf_plist:
|
||||
conf_plist["SignerCerts"] = [
|
||||
b64encode(x) for x in conf_plist["SignerCerts"]
|
||||
]
|
||||
|
||||
if "OTAProfileStub" in conf_plist:
|
||||
if "SignerCerts" in conf_plist["OTAProfileStub"]:
|
||||
conf_plist["OTAProfileStub"]["SignerCerts"] = [
|
||||
b64encode(x)
|
||||
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
|
||||
]
|
||||
|
||||
if "PayloadContent" in conf_plist["OTAProfileStub"]:
|
||||
if (
|
||||
"EnrollmentIdentityPersistentID"
|
||||
in conf_plist["OTAProfileStub"]["PayloadContent"]
|
||||
):
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
] = b64encode(
|
||||
conf_plist["OTAProfileStub"]["PayloadContent"][
|
||||
"EnrollmentIdentityPersistentID"
|
||||
]
|
||||
)
|
||||
|
||||
if "PushTokenDataSentToServerKey" in conf_plist:
|
||||
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
|
||||
conf_plist["PushTokenDataSentToServerKey"]
|
||||
)
|
||||
|
||||
if "LastPushTokenHash" in conf_plist:
|
||||
conf_plist["LastPushTokenHash"] = b64encode(
|
||||
conf_plist["LastPushTokenHash"]
|
||||
)
|
||||
|
||||
if "PayloadContent" in conf_plist:
|
||||
for content_entry in range(len(conf_plist["PayloadContent"])):
|
||||
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"PERSISTENT_REF"
|
||||
]
|
||||
)
|
||||
|
||||
if (
|
||||
"IdentityPersistentRef"
|
||||
in conf_plist["PayloadContent"][content_entry]
|
||||
):
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
] = b64encode(
|
||||
conf_plist["PayloadContent"][content_entry][
|
||||
"IdentityPersistentRef"
|
||||
]
|
||||
)
|
||||
|
||||
self.results.append(
|
||||
{
|
||||
|
||||
@@ -73,7 +73,7 @@ class ShutdownLog(IOSExtraction):
|
||||
recent_processes = []
|
||||
times_delayed = 0
|
||||
delay = 0.0
|
||||
for line in content.splitlines():
|
||||
for line in content.split("\n"):
|
||||
line = line.strip()
|
||||
|
||||
if line.startswith("remaining client pid:"):
|
||||
|
||||
@@ -11,6 +11,7 @@ from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to
|
||||
from ..base import IOSExtraction
|
||||
|
||||
CHROME_FAVICON_BACKUP_IDS = ["55680ab883d0fdcffd94f959b1632e5fbbb18c5b"]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_FAVICON_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/Favicons",
|
||||
]
|
||||
|
||||
@@ -13,6 +13,7 @@ from ..base import IOSExtraction
|
||||
CHROME_HISTORY_BACKUP_IDS = [
|
||||
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
|
||||
]
|
||||
# TODO: Confirm Chrome database path.
|
||||
CHROME_HISTORY_ROOT_PATHS = [
|
||||
"private/var/mobile/Containers/Data/Application/*/Library/Application Support/Google/Chrome/Default/History", # pylint: disable=line-too-long
|
||||
]
|
||||
|
||||
@@ -123,6 +123,11 @@ class SMS(IOSExtraction):
|
||||
"""
|
||||
)
|
||||
items = list(cur)
|
||||
elif "no such table" in str(exc):
|
||||
self.log.info(
|
||||
"No SMS tables found in the database, skipping: %s", exc
|
||||
)
|
||||
return
|
||||
else:
|
||||
raise exc
|
||||
names = [description[0] for description in cur.description]
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from base64 import b64encode
|
||||
from typing import Optional, Union
|
||||
|
||||
@@ -79,21 +80,29 @@ class SMSAttachments(IOSExtraction):
|
||||
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
attachment.ROWID as "attachment_id",
|
||||
attachment.*,
|
||||
message.service as "service",
|
||||
handle.id as "phone_number"
|
||||
FROM attachment
|
||||
LEFT JOIN message_attachment_join ON
|
||||
message_attachment_join.attachment_id = attachment.ROWID
|
||||
LEFT JOIN message ON
|
||||
message.ROWID = message_attachment_join.message_id
|
||||
LEFT JOIN handle ON handle.ROWID = message.handle_id;
|
||||
"""
|
||||
SELECT
|
||||
attachment.ROWID as "attachment_id",
|
||||
attachment.*,
|
||||
message.service as "service",
|
||||
handle.id as "phone_number"
|
||||
FROM attachment
|
||||
LEFT JOIN message_attachment_join ON
|
||||
message_attachment_join.attachment_id = attachment.ROWID
|
||||
LEFT JOIN message ON
|
||||
message.ROWID = message_attachment_join.message_id
|
||||
LEFT JOIN handle ON handle.ROWID = message.handle_id;
|
||||
"""
|
||||
)
|
||||
)
|
||||
except sqlite3.OperationalError as exc:
|
||||
self.log.info(
|
||||
"No SMS attachment tables found in the database, skipping: %s", exc
|
||||
)
|
||||
cur.close()
|
||||
conn.close()
|
||||
return
|
||||
names = [description[0] for description in cur.description]
|
||||
|
||||
for item in cur:
|
||||
|
||||
@@ -79,55 +79,32 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
cur = conn.cursor()
|
||||
|
||||
try:
|
||||
# FIXME: table contains extra fields with timestamp here
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction,
|
||||
mostRecentUserInteractionTime,
|
||||
mostRecentWebPushInteractionTime
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
)
|
||||
has_extra_timestamps = True
|
||||
except sqlite3.OperationalError:
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
domainID,
|
||||
registrableDomain,
|
||||
lastSeen,
|
||||
hadUserInteraction
|
||||
from ObservedDomains;
|
||||
"""
|
||||
)
|
||||
has_extra_timestamps = False
|
||||
except sqlite3.OperationalError:
|
||||
return
|
||||
return
|
||||
|
||||
for row in cur:
|
||||
result = {
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
if has_extra_timestamps:
|
||||
result["most_recent_user_interaction_time"] = row[4]
|
||||
result["most_recent_user_interaction_time_isodate"] = (
|
||||
convert_unix_to_iso(row[4])
|
||||
)
|
||||
result["most_recent_web_push_interaction_time"] = row[5]
|
||||
result["most_recent_web_push_interaction_time_isodate"] = (
|
||||
convert_unix_to_iso(row[5])
|
||||
)
|
||||
self.results.append(result)
|
||||
self.results.append(
|
||||
{
|
||||
"domain_id": row[0],
|
||||
"registrable_domain": row[1],
|
||||
"last_seen": row[2],
|
||||
"had_user_interaction": bool(row[3]),
|
||||
"last_seen_isodate": convert_unix_to_iso(row[2]),
|
||||
"domain": domain,
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
|
||||
if len(self.results) > 0:
|
||||
self.log.info(
|
||||
|
||||
@@ -76,6 +76,12 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
entry["redirect_destination"]
|
||||
)
|
||||
|
||||
# TODO: Currently not used.
|
||||
# subframe_origins = self._extract_domains(
|
||||
# entry["subframe_under_origin"])
|
||||
# subresource_domains = self._extract_domains(
|
||||
# entry["subresource_under_origin"])
|
||||
|
||||
all_origins = set(
|
||||
[entry["origin"]] + source_domains + destination_domains
|
||||
)
|
||||
|
||||
@@ -311,11 +311,14 @@ class NetBase(IOSExtraction):
|
||||
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
# check_manipulated/find_deleted require "live_isodate" and
|
||||
# "live_proc_id" keys which may be absent in older result formats.
|
||||
if self.results and "live_isodate" in self.results[0]:
|
||||
# Check for manipulated process records.
|
||||
# TODO: Catching KeyError for live_isodate for retro-compatibility.
|
||||
# This is not very good.
|
||||
try:
|
||||
self.check_manipulated()
|
||||
self.find_deleted()
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
@@ -25,6 +25,9 @@ class TestDumpsysAccessibilityArtifact:
|
||||
da.results[0]["service"]
|
||||
== "com.android.settings/com.samsung.android.settings.development.gpuwatch.GPUWatchInterceptor"
|
||||
)
|
||||
# All services are installed but none enabled in this fixture
|
||||
for result in da.results:
|
||||
assert result["enabled"] is False
|
||||
|
||||
def test_parsing_v14_aosp_format(self):
|
||||
da = DumpsysAccessibilityArtifact()
|
||||
@@ -36,7 +39,32 @@ class TestDumpsysAccessibilityArtifact:
|
||||
da.parse(data)
|
||||
assert len(da.results) == 1
|
||||
assert da.results[0]["package_name"] == "com.malware.accessibility"
|
||||
assert da.results[0]["service"] == "com.malware.service.malwareservice"
|
||||
assert (
|
||||
da.results[0]["service"]
|
||||
== "com.malware.accessibility/com.malware.service.malwareservice"
|
||||
)
|
||||
assert da.results[0]["enabled"] is True
|
||||
|
||||
def test_parsing_installed_and_enabled(self):
|
||||
da = DumpsysAccessibilityArtifact()
|
||||
file = get_artifact("android_data/dumpsys_accessibility_enabled.txt")
|
||||
with open(file) as f:
|
||||
data = f.read()
|
||||
|
||||
assert len(da.results) == 0
|
||||
da.parse(data)
|
||||
assert len(da.results) == 5
|
||||
|
||||
enabled = [r for r in da.results if r["enabled"]]
|
||||
assert len(enabled) == 1
|
||||
assert enabled[0]["package_name"] == "com.samsung.accessibility"
|
||||
assert (
|
||||
enabled[0]["service"]
|
||||
== "com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)"
|
||||
)
|
||||
|
||||
not_enabled = [r for r in da.results if not r["enabled"]]
|
||||
assert len(not_enabled) == 4
|
||||
|
||||
def test_ioc_check(self, indicator_file):
|
||||
da = DumpsysAccessibilityArtifact()
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
ACCESSIBILITY MANAGER (dumpsys accessibility)
|
||||
|
||||
currentUserId=0
|
||||
User state[
|
||||
attributes:{id=0, touchExplorationEnabled=false, installedServiceCount=5}
|
||||
installed services: {
|
||||
0 : com.google.android.apps.accessibility.voiceaccess/.JustSpeakService (A11yTool)
|
||||
1 : com.microsoft.appmanager/com.microsoft.mmx.screenmirroringsrc.accessibility.ScreenMirroringAccessibilityService
|
||||
2 : com.samsung.accessibility/.assistantmenu.serviceframework.AssistantMenuService (A11yTool)
|
||||
3 : com.samsung.accessibility/.universalswitch.UniversalSwitchService (A11yTool)
|
||||
4 : com.samsung.android.accessibility.talkback/com.samsung.android.marvin.talkback.TalkBackService (A11yTool)
|
||||
}
|
||||
Bound services:{}
|
||||
Enabled services:{{com.samsung.accessibility/.universalswitch.UniversalSwitchService}}
|
||||
Binding services:{}
|
||||
Crashed services:{}
|
||||
@@ -82,7 +82,7 @@ def generate_test_stix_file(file_path):
|
||||
for h in sha256:
|
||||
i = Indicator(
|
||||
indicator_types=["malicious-activity"],
|
||||
pattern="[file:hashes.sha256='{}']".format(h),
|
||||
pattern="[file:hashes.'SHA-256'='{}']".format(h),
|
||||
pattern_type="stix",
|
||||
)
|
||||
res.append(i)
|
||||
@@ -91,7 +91,7 @@ def generate_test_stix_file(file_path):
|
||||
for h in sha1:
|
||||
i = Indicator(
|
||||
indicator_types=["malicious-activity"],
|
||||
pattern="[file:hashes.sha1='{}']".format(h),
|
||||
pattern="[file:hashes.'SHA-1'='{}']".format(h),
|
||||
pattern_type="stix",
|
||||
)
|
||||
res.append(i)
|
||||
|
||||
@@ -94,6 +94,78 @@ class TestIndicators:
|
||||
)
|
||||
assert ind.check_file_hash("da0611a300a9ce9aa7a09d1212f203fca5856794")
|
||||
|
||||
def test_parse_stix2_hash_key_variants(self, tmp_path):
|
||||
"""STIX2 spec requires single-quoted algorithm names that contain hyphens,
|
||||
e.g. file:hashes.'SHA-256'. Verify MVT accepts both spec-compliant and
|
||||
non-standard lowercase spellings for MD5, SHA-1 and SHA-256."""
|
||||
import json
|
||||
|
||||
sha256_hash = "570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6"
|
||||
sha1_hash = "da0611a300a9ce9aa7a09d1212f203fca5856794"
|
||||
md5_hash = "d41d8cd98f00b204e9800998ecf8427e"
|
||||
|
||||
variants = [
|
||||
# (pattern_key, expected_bucket)
|
||||
("file:hashes.'SHA-256'", "files_sha256"),
|
||||
("file:hashes.SHA-256", "files_sha256"),
|
||||
("file:hashes.SHA256", "files_sha256"),
|
||||
("file:hashes.sha256", "files_sha256"),
|
||||
("file:hashes.'SHA-1'", "files_sha1"),
|
||||
("file:hashes.SHA-1", "files_sha1"),
|
||||
("file:hashes.SHA1", "files_sha1"),
|
||||
("file:hashes.sha1", "files_sha1"),
|
||||
("file:hashes.MD5", "files_md5"),
|
||||
("file:hashes.'MD5'", "files_md5"),
|
||||
("file:hashes.md5", "files_md5"),
|
||||
]
|
||||
|
||||
hash_for = {
|
||||
"files_sha256": sha256_hash,
|
||||
"files_sha1": sha1_hash,
|
||||
"files_md5": md5_hash,
|
||||
}
|
||||
|
||||
for pattern_key, bucket in variants:
|
||||
h = hash_for[bucket]
|
||||
stix = {
|
||||
"type": "bundle",
|
||||
"id": "bundle--test",
|
||||
"objects": [
|
||||
{
|
||||
"type": "malware",
|
||||
"id": "malware--test",
|
||||
"name": "TestMalware",
|
||||
"is_family": False,
|
||||
},
|
||||
{
|
||||
"type": "indicator",
|
||||
"id": "indicator--test",
|
||||
"indicator_types": ["malicious-activity"],
|
||||
"pattern": f"[{pattern_key}='{h}']",
|
||||
"pattern_type": "stix",
|
||||
"valid_from": "2024-01-01T00:00:00Z",
|
||||
},
|
||||
{
|
||||
"type": "relationship",
|
||||
"id": "relationship--test",
|
||||
"relationship_type": "indicates",
|
||||
"source_ref": "indicator--test",
|
||||
"target_ref": "malware--test",
|
||||
},
|
||||
],
|
||||
}
|
||||
stix_file = tmp_path / "test.stix2"
|
||||
stix_file.write_text(json.dumps(stix))
|
||||
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([str(stix_file)], load_default=False)
|
||||
assert len(ind.ioc_collections[0][bucket]) == 1, (
|
||||
f"Pattern key '{pattern_key}' was not parsed into '{bucket}'"
|
||||
)
|
||||
assert ind.check_file_hash(h) is not None, (
|
||||
f"check_file_hash failed for pattern key '{pattern_key}'"
|
||||
)
|
||||
|
||||
def test_check_android_property(self, indicator_file):
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([indicator_file], load_default=False)
|
||||
|
||||
Reference in New Issue
Block a user