Compare commits

..

1 Commits

Author SHA1 Message Date
tmarschutz
5da3f1e071 first commit 2025-10-03 11:45:17 +02:00
10575 changed files with 1386590 additions and 206 deletions

View File

@@ -6,7 +6,7 @@
<p align="center"><strong>AI-powered workflow automation and AI Agents for AppSec, Fuzzing & Offensive Security</strong></p>
<p align="center">
<a href="https://discord.gg/8XEX33UUwZ/"><img src="https://img.shields.io/discord/1420767905255133267?logo=discord&label=Discord" alt="Discord"></a>
<a href="https://discord.com/invite/acqv9FVG"><img src="https://img.shields.io/discord/1420767905255133267?logo=discord&label=Discord" alt="Discord"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-BSL%20%2B%20Apache-orange" alt="License: BSL + Apache"></a>
<a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.11%2B-blue" alt="Python 3.11+"/></a>
<a href="https://fuzzforge.ai"><img src="https://img.shields.io/badge/Website-fuzzforge.ai-blue" alt="Website"/></a>
@@ -176,7 +176,7 @@ _AI agents automatically analyzing code and providing security insights_
- 🌐 [Website](https://fuzzforge.ai)
- 📖 [Documentation](https://docs.fuzzforge.ai)
- 💬 [Community Discord](https://discord.gg/8XEX33UUwZ)
- 💬 [Community Discord](https://discord.com/invite/acqv9FVG)
- 🎓 [FuzzingLabs Academy](https://academy.fuzzinglabs.com/?coupon=GITHUB_FUZZFORGE)
---
@@ -205,7 +205,7 @@ Planned features and improvements:
- ☁️ Multi-tenant SaaS platform with team collaboration
- 📊 Advanced reporting & analytics
👉 Follow updates in the [GitHub issues](../../issues) and [Discord](https://discord.gg/8XEX33UUwZ)
👉 Follow updates in the [GitHub issues](../../issues) and [Discord](https://discord.com/invite/acqv9FVG).
---

View File

@@ -65,7 +65,7 @@ def create_a2a_app():
port = int(os.getenv('FUZZFORGE_PORT', 10100))
# Get the FuzzForge agent
fuzzforge = get_fuzzforge_agent(auto_start_server=False)
fuzzforge = get_fuzzforge_agent()
# Print ASCII banner
print("\033[95m") # Purple color

View File

@@ -15,12 +15,8 @@ The core agent that combines all components
import os
import threading
import time
import socket
import asyncio
from pathlib import Path
from typing import Dict, Any, List, Optional
from typing import Dict, Any, List
from google.adk import Agent
from google.adk.models.lite_llm import LiteLlm
from .agent_card import get_fuzzforge_agent_card
@@ -47,19 +43,11 @@ class FuzzForgeAgent:
model: str = None,
cognee_url: str = None,
port: int = 10100,
auto_start_server: Optional[bool] = None,
):
"""Initialize FuzzForge agent with configuration"""
self.model = model or os.getenv('LITELLM_MODEL', 'gpt-4o-mini')
self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL')
self.port = int(os.getenv('FUZZFORGE_PORT', port))
self._auto_start_server = (
auto_start_server
if auto_start_server is not None
else os.getenv('FUZZFORGE_AUTO_A2A_SERVER', '1') not in {'0', 'false', 'False'}
)
self._uvicorn_server = None
self._a2a_server_thread: Optional[threading.Thread] = None
self.port = port
# Initialize ADK Memory Service for conversational memory
memory_type = os.getenv('MEMORY_SERVICE', 'inmemory')
@@ -87,9 +75,6 @@ class FuzzForgeAgent:
# Create the ADK agent (for A2A server mode)
self.adk_agent = self._create_adk_agent()
if self._auto_start_server:
self._ensure_a2a_server_running()
def _create_adk_agent(self) -> Agent:
"""Create the ADK agent for A2A server mode"""
@@ -134,85 +119,15 @@ When responding to requests:
async def cleanup(self):
"""Clean up resources"""
await self._stop_a2a_server()
await self.executor.cleanup()
def _ensure_a2a_server_running(self):
"""Start the A2A server in the background if it's not already running."""
if self._a2a_server_thread and self._a2a_server_thread.is_alive():
return
try:
from uvicorn import Config, Server
from .a2a_server import create_a2a_app as create_custom_a2a_app
except ImportError as exc:
if os.getenv('FUZZFORGE_DEBUG', '0') == '1':
print(f"[DEBUG] Unable to start A2A server automatically: {exc}")
return
app = create_custom_a2a_app(
self.adk_agent,
port=self.port,
executor=self.executor,
)
log_level = os.getenv('FUZZFORGE_UVICORN_LOG_LEVEL', 'error')
config = Config(app=app, host='127.0.0.1', port=self.port, log_level=log_level, loop='asyncio')
server = Server(config=config)
self._uvicorn_server = server
def _run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def _serve():
await server.serve()
try:
loop.run_until_complete(_serve())
finally:
loop.close()
thread = threading.Thread(target=_run_server, name='FuzzForgeA2AServer', daemon=True)
thread.start()
self._a2a_server_thread = thread
# Give the server a moment to bind to the port for downstream agents
for _ in range(50):
if server.should_exit:
break
try:
with socket.create_connection(('127.0.0.1', self.port), timeout=0.1):
if os.getenv('FUZZFORGE_DEBUG', '0') == '1':
print(f"[DEBUG] Auto-started A2A server on http://127.0.0.1:{self.port}")
break
except OSError:
time.sleep(0.1)
async def _stop_a2a_server(self):
"""Shut down the background A2A server if we started one."""
server = self._uvicorn_server
if server is None:
return
server.should_exit = True
if self._a2a_server_thread and self._a2a_server_thread.is_alive():
# Allow server loop to exit gracefully without blocking event loop
try:
await asyncio.wait_for(asyncio.to_thread(self._a2a_server_thread.join, 5), timeout=6)
except (asyncio.TimeoutError, RuntimeError):
pass
self._uvicorn_server = None
self._a2a_server_thread = None
# Create a singleton instance for import
_instance = None
def get_fuzzforge_agent(auto_start_server: Optional[bool] = None) -> FuzzForgeAgent:
def get_fuzzforge_agent() -> FuzzForgeAgent:
"""Get the singleton FuzzForge agent instance"""
global _instance
if _instance is None:
_instance = FuzzForgeAgent(auto_start_server=auto_start_server)
_instance = FuzzForgeAgent()
return _instance

View File

@@ -16,7 +16,7 @@ import base64
import time
import uuid
import json
from typing import Dict, Any, List, Union, Optional
from typing import Dict, Any, List, Union
from datetime import datetime
import os
import warnings
@@ -93,8 +93,7 @@ class FuzzForgeExecutor:
self._background_tasks: set[asyncio.Task] = set()
self.pending_runs: Dict[str, Dict[str, Any]] = {}
self.session_metadata: Dict[str, Dict[str, Any]] = {}
self._project_root = self._detect_project_root()
self._artifact_cache_dir = self._resolve_artifact_cache_dir()
self._artifact_cache_dir = Path(os.getenv('FUZZFORGE_ARTIFACT_DIR', Path.cwd() / '.fuzzforge' / 'artifacts'))
self._knowledge_integration = None
# Initialize Cognee service if available
@@ -195,38 +194,6 @@ class FuzzForgeExecutor:
if self.debug:
print(f"[DEBUG] Auto-registration error for {url}: {e}")
def _detect_project_root(self) -> Optional[Path]:
"""Locate the active FuzzForge project root directory if available."""
env_root = os.getenv('FUZZFORGE_PROJECT_DIR')
if env_root:
candidate = Path(env_root).expanduser().resolve()
if candidate.joinpath('.fuzzforge').is_dir():
return candidate
try:
config = ProjectConfigManager()
return config.config_path.parent.resolve()
except Exception:
pass
current = Path.cwd().resolve()
for path in (current,) + tuple(current.parents):
if path.joinpath('.fuzzforge').is_dir():
return path
return None
def _resolve_artifact_cache_dir(self) -> Path:
"""Determine the artifact cache directory, prioritizing project context."""
env_dir = os.getenv('FUZZFORGE_ARTIFACT_DIR')
if env_dir:
return Path(env_dir).expanduser().resolve()
project_root = self._project_root
if project_root:
return (project_root / '.fuzzforge' / 'artifacts').resolve()
return (Path.cwd() / '.fuzzforge' / 'artifacts').resolve()
def _create_artifact_service(self):
"""Create artifact service based on configuration"""
artifact_storage = os.getenv('ARTIFACT_STORAGE', 'inmemory')
@@ -821,39 +788,6 @@ class FuzzForgeExecutor:
tools.append(FunctionTool(send_file_to_agent))
async def send_code_snippet_to_agent(
agent_name: str,
code: str,
filename: str = "",
note: str = "",
tool_context: ToolContext | None = None,
) -> str:
"""Create an artifact from raw code and send it to a registered agent."""
if not agent_name:
return "agent_name is required"
if not code or not code.strip():
return "code is required"
session = None
context_id = None
if tool_context and getattr(tool_context, "invocation_context", None):
invocation = tool_context.invocation_context
session = invocation.session
context_id = self.session_lookup.get(getattr(session, 'id', None))
target_filename = filename or "snippet.rs"
snippet_note = note or "Please analyse the provided code snippet."
return await self.delegate_code_snippet_to_agent(
agent_name,
target_filename,
code,
note=snippet_note,
session=session,
context_id=context_id,
)
tools.append(FunctionTool(send_code_snippet_to_agent))
if self.debug:
print("[DEBUG] Added Cognee project integration tools")
@@ -1952,14 +1886,11 @@ Be concise and intelligent in your responses."""
async def create_project_file_artifact_api(self, file_path: str) -> Dict[str, Any]:
try:
config = ProjectConfigManager(self._project_root) if self._project_root else ProjectConfigManager()
config = ProjectConfigManager()
if not config.is_initialized():
return {"error": "Project not initialized. Run 'fuzzforge init' first."}
project_root = self._project_root or config.config_path.parent.resolve()
if self._project_root is None:
self._project_root = project_root
self._artifact_cache_dir = self._resolve_artifact_cache_dir()
project_root = config.config_path.parent.resolve()
requested_file = (project_root / file_path).resolve()
try:
@@ -2170,45 +2101,6 @@ Be concise and intelligent in your responses."""
await self._append_external_event(session, agent_name, response_text)
return response_text
async def delegate_code_snippet_to_agent(
self,
agent_name: str,
filename: str,
code: str,
note: str = "",
session: Any = None,
context_id: str | None = None,
) -> str:
try:
if not code or not code.strip():
return "No code snippet provided for delegation."
cache_dir = self._prepare_artifact_cache_dir()
artifact_id = uuid.uuid4().hex
# Normalise filename and ensure extension
safe_filename = (filename or "snippet.rs").strip()
if not safe_filename:
safe_filename = "snippet.rs"
if "." not in safe_filename:
safe_filename = f"{safe_filename}.rs"
snippet_dir = cache_dir / artifact_id
snippet_dir.mkdir(parents=True, exist_ok=True)
file_path = snippet_dir / safe_filename
file_path.write_text(code, encoding="utf-8")
message_note = note or f"Please analyse the code snippet {safe_filename}."
return await self.delegate_file_to_agent(
agent_name,
str(file_path),
message_note,
session=session,
context_id=context_id,
)
except Exception as exc:
return f"Failed to delegate code snippet: {exc}"
async def delegate_file_to_agent(
self,
agent_name: str,

View File

@@ -0,0 +1,25 @@
"""
Android Security Modules
This package contains modules for android static code analysis and security testing.
Available modules:
- MobSF: Mobile Security Framework
- Jadx: Dex to Java decompiler
- OpenGrep: Open-source pattern-based static analysis tool
"""
from typing import List, Type
from ..base import BaseModule
# Module registry for automatic discovery
ANDROID_MODULES: List[Type[BaseModule]] = []
def register_module(module_class: Type[BaseModule]):
"""Register a android security module"""
ANDROID_MODULES.append(module_class)
return module_class
def get_available_modules() -> List[Type[BaseModule]]:
"""Get all available android modules"""
return ANDROID_MODULES.copy()

View File

@@ -0,0 +1,15 @@
rules:
- id: clipboard-sensitive-data
severity: WARNING
languages: [java]
message: "Sensitive data may be copied to the clipboard."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: security
area: clipboard
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$CLIPBOARD.setPrimaryClip($CLIP)"

View File

@@ -0,0 +1,23 @@
rules:
- id: hardcoded-secrets
severity: WARNING
languages: [java]
message: "Possible hardcoded secret found in variable '$NAME'."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M2
category: secrets
verification-level: [L1]
paths:
include:
- "**/*.java"
patterns:
- pattern-either:
- pattern: 'String $NAME = "$VAL";'
- pattern: 'final String $NAME = "$VAL";'
- pattern: 'private String $NAME = "$VAL";'
- pattern: 'public static String $NAME = "$VAL";'
- pattern: 'static final String $NAME = "$VAL";'
- pattern-regex: "$NAME =~ /(?i).*(api|key|token|secret|pass|auth|session|bearer|access|private).*/"

View File

@@ -0,0 +1,18 @@
rules:
- id: insecure-data-storage
severity: WARNING
languages: [java]
message: "Potential insecure data storage (external storage)."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M2
category: security
area: storage
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern-either:
- pattern: "$CTX.openFileOutput($NAME, $MODE)"
- pattern: "Environment.getExternalStorageDirectory()"

View File

@@ -0,0 +1,16 @@
rules:
- id: insecure-deeplink
severity: WARNING
languages: [xml]
message: "Potential insecure deeplink found in intent-filter."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<intent-filter>

View File

@@ -0,0 +1,21 @@
rules:
- id: insecure-logging
severity: WARNING
languages: [java]
message: "Sensitive data logged via Android Log API."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M2
category: logging
verification-level: [L1]
paths:
include:
- "**/*.java"
patterns:
- pattern-either:
- pattern: "Log.d($TAG, $MSG)"
- pattern: "Log.e($TAG, $MSG)"
- pattern: "System.out.println($MSG)"
- pattern-regex: "$MSG =~ /(?i).*(password|token|secret|api|auth|session).*/"

View File

@@ -0,0 +1,15 @@
rules:
- id: intent-redirection
severity: WARNING
languages: [java]
message: "Potential intent redirection: using getIntent().getExtras() without validation."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: intent
area: intercomponent
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$ACT.getIntent().getExtras()"

View File

@@ -0,0 +1,18 @@
rules:
- id: sensitive-data-in-shared-preferences
severity: WARNING
languages: [java]
message: "Sensitive data may be stored in SharedPreferences. Please review the key '$KEY'."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M2
category: security
area: storage
verification-level: [L1]
paths:
include:
- "**/*.java"
patterns:
- pattern: "$EDITOR.putString($KEY, $VAL);"
- pattern-regex: "$KEY =~ /(?i).*(username|password|pass|token|auth_token|api_key|secret|sessionid|email).*/"

View File

@@ -0,0 +1,21 @@
rules:
- id: sqlite-injection
severity: ERROR
languages: [java]
message: "Possible SQL injection: concatenated input in rawQuery or execSQL."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M7
category: injection
area: database
verification-level: [L1]
paths:
include:
- "**/*.java"
patterns:
- pattern-either:
- pattern: "$DB.rawQuery($QUERY, ...)"
- pattern: "$DB.execSQL($QUERY)"
- pattern-regex: "$QUERY =~ /.*\".*\".*\\+.*/"

View File

@@ -0,0 +1,16 @@
rules:
- id: vulnerable-activity
severity: WARNING
languages: [xml]
message: "Activity exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<activity android:exported="true"

View File

@@ -0,0 +1,16 @@
rules:
- id: vulnerable-content-provider
severity: WARNING
languages: [xml]
message: "ContentProvider exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<provider android:exported="true"

View File

@@ -0,0 +1,16 @@
rules:
- id: vulnerable-service
severity: WARNING
languages: [xml]
message: "Service exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<service android:exported="true"

View File

@@ -0,0 +1,16 @@
rules:
- id: webview-javascript-enabled
severity: ERROR
languages: [java]
message: "WebView with JavaScript enabled can be dangerous if loading untrusted content."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M7
category: webview
area: ui
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$W.getSettings().setJavaScriptEnabled(true)"

View File

@@ -0,0 +1,16 @@
rules:
- id: webview-load-arbitrary-url
severity: WARNING
languages: [java]
message: "Loading unvalidated URL in WebView may cause open redirect or XSS."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M7
category: webview
area: ui
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$W.loadUrl($URL)"

View File

@@ -0,0 +1,197 @@
"""Jadx APK Decompilation Module"""
import asyncio
import shutil
from pathlib import Path
from typing import Dict, Any
import logging
from ..base import BaseModule, ModuleMetadata, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class JadxModule(BaseModule):
"""Module responsible for decompiling APK files with Jadx"""
def get_metadata(self) -> ModuleMetadata:
return ModuleMetadata(
name="jadx",
version="1.5.0",
description="Android APK decompilation using Jadx",
author="FuzzForge Team",
category="android",
tags=["android", "jadx", "decompilation", "reverse"],
input_schema={
"type": "object",
"properties": {
"apk_path": {
"type": "string",
"description": "Path to the APK to decompile (absolute or relative to workspace)",
},
"output_dir": {
"type": "string",
"description": "Directory (relative to workspace) where Jadx output should be written",
"default": "jadx_output",
},
"overwrite": {
"type": "boolean",
"description": "Overwrite existing output directory if present",
"default": True,
},
"threads": {
"type": "integer",
"description": "Number of Jadx decompilation threads",
"default": 4,
},
"decompiler_args": {
"type": "array",
"items": {"type": "string"},
"description": "Additional arguments passed directly to Jadx",
},
},
"required": ["apk_path"],
},
output_schema={
"type": "object",
"properties": {
"output_dir": {"type": "string"},
"source_dir": {"type": "string"},
"resource_dir": {"type": "string"},
},
},
)
def validate_config(self, config: Dict[str, Any]) -> bool:
apk_path = config.get("apk_path")
if not apk_path:
raise ValueError("'apk_path' must be provided for Jadx decompilation")
threads = config.get("threads", 4)
if not isinstance(threads, int) or threads < 1 or threads > 32:
raise ValueError("threads must be between 1 and 32")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
self.start_timer()
try:
self.validate_config(config)
workspace = workspace.resolve()
if not workspace.exists():
raise ValueError(f"Workspace does not exist: {workspace}")
apk_path = Path(config["apk_path"])
if not apk_path.is_absolute():
apk_path = (workspace / apk_path).resolve()
if not apk_path.exists():
raise ValueError(f"APK not found: {apk_path}")
if apk_path.is_dir():
raise ValueError(f"APK path must be a file, not a directory: {apk_path}")
output_dir = Path(config.get("output_dir", "jadx_output"))
if not output_dir.is_absolute():
output_dir = (workspace / output_dir).resolve()
if output_dir.exists():
if config.get("overwrite", True):
shutil.rmtree(output_dir)
else:
raise ValueError(
f"Output directory already exists: {output_dir}. Set overwrite=true to replace it."
)
output_dir.mkdir(parents=True, exist_ok=True)
threads = str(config.get("threads", 4))
extra_args = config.get("decompiler_args", []) or []
cmd = [
"jadx",
"--threads-count",
threads,
"--deobf",
"--output-dir",
str(output_dir),
]
cmd.extend(extra_args)
cmd.append(str(apk_path))
logger.info("Running Jadx decompilation: %s", " ".join(cmd))
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
stdout, stderr = await process.communicate()
stdout_str = stdout.decode(errors="ignore") if stdout else ""
stderr_str = stderr.decode(errors="ignore") if stderr else ""
if stdout_str:
logger.debug("Jadx stdout: %s", stdout_str[:200])
if stderr_str:
logger.debug("Jadx stderr: %s", stderr_str[:200])
if process.returncode != 0:
error_output = stderr_str or stdout_str or "No error output"
raise RuntimeError(
f"Jadx failed with exit code {process.returncode}: {error_output[:500]}"
)
logger.debug("Jadx stdout: %s", stdout.decode(errors="ignore")[:200])
source_dir = output_dir / "sources"
resource_dir = output_dir / "resources"
if not source_dir.exists():
logger.warning("Jadx sources directory not found at expected path: %s", source_dir)
else:
sample_files = []
for idx, file_path in enumerate(source_dir.rglob("*.java")):
sample_files.append(str(file_path))
if idx >= 4:
break
logger.info("Sample Jadx Java files: %s", sample_files or "<none>")
java_files = 0
if source_dir.exists():
java_files = sum(1 for _ in source_dir.rglob("*.java"))
summary = {
"output_dir": str(output_dir),
"source_dir": str(source_dir if source_dir.exists() else output_dir),
"resource_dir": str(resource_dir if resource_dir.exists() else output_dir),
"java_files": java_files,
}
metadata = {
"apk_path": str(apk_path),
"output_dir": str(output_dir),
"source_dir": summary["source_dir"],
"resource_dir": summary["resource_dir"],
"threads": threads,
}
return self.create_result(
findings=[],
status="success",
summary=summary,
metadata=metadata,
)
except Exception as exc:
logger.error("Jadx module failed: %s", exc)
return self.create_result(
findings=[],
status="failed",
error=str(exc),
)

View File

@@ -0,0 +1,293 @@
from pathlib import Path
from typing import Dict, Any
from toolbox.modules.base import BaseModule, ModuleResult, ModuleMetadata, ModuleFinding
import requests
import os
import time
import json
from collections import Counter
"""
TODO:
* Configure workspace storage for apk and reports
* Think about mobsf repo implementation inside workflow
* Curl mobsf pdf report
* Save Json mobsf report
* Export Web server interface from the Workflow docker
"""
class MobSFModule(BaseModule):
def __init__(self):
self.mobsf_url = "http://localhost:8877"
self.file_path = ""
self.api_key = ""
self.scan_id = None
self.scan_hash = ""
self.report_file = ""
self._metadata = self.get_metadata()
self.start_timer() # <-- Add this line
def upload_file(self):
"""
Upload file to MobSF VM
Returns scan hash if upload succeeded
"""
# Ensure file_path is set and valid
if not self.file_path or not os.path.isfile(self.file_path):
raise ValueError("Invalid or missing file_path for upload.")
# Don't set Content-Type manually - let requests handle it
# MobSF expects API key in X-Mobsf-Api-Key header
headers = {'X-Mobsf-Api-Key': self.api_key}
# Keep the file open during the entire request
with open(self.file_path, 'rb') as f:
f.seek(0)
# Extract just the filename from the full path
filename = os.path.basename(self.file_path)
files = {'file': (filename, f, 'application/vnd.android.package-archive')}
# Make the request while the file is still open
response = requests.post(f"{self.mobsf_url}/api/v1/upload", files=files, headers=headers)
if response.status_code == 200:
resp_json = response.json()
if resp_json.get('hash'):
print("[+] Upload succeeded, scan hash:", resp_json['hash'])
return resp_json['hash']
else:
raise Exception(f"File upload failed: {resp_json}")
else:
raise Exception(f"Failed to upload file: {response.text}")
def start_scan(self, re_scan: int = 0, max_attempts: int = 10, delay: int = 3):
"""
Scan file that is already uploaded. Retries if scan is not ready.
Returns scan result or raises Exception after max_attempts.
"""
print("[+] Starting scan for hash", self.scan_hash)
data = {'hash': self.scan_hash}
headers = {'X-Mobsf-Api-Key': self.api_key}
response = requests.post(f"{self.mobsf_url}/api/v1/scan", data=data, headers=headers)
if response.status_code == 200:
try:
result = response.json()
# Heuristic: check for expected keys in result
if result:
print("[+] Scan succeeded for hash", self.scan_hash)
return result
except Exception as e:
print(f"Error parsing scan result: {e}")
def get_json_results(self):
"""
Retrieve JSON results for the scanned file
"""
headers = {'X-Mobsf-Api-Key': self.api_key}
data = {'hash': self.scan_hash}
response = requests.post(f"{self.mobsf_url}/api/v1/report_json", data=data, headers=headers)
if response.status_code == 200:
f = open('dump.json', 'w').write(json.dumps(response.json(), indent=2))
print("[+] Retrieved JSON results")
return response.json()
else:
raise Exception(f"Failed to retrieve JSON results: {response.text}")
def create_summary(self, findings):
"""
Summarize findings by severity.
Returns a dict like {'high': 3, 'info': 2, ...}
"""
severity_counter = Counter()
for finding in findings:
sev = getattr(finding, "severity", None)
if sev is None and isinstance(finding, dict):
sev = finding.get("severity")
if sev:
severity_counter[sev] += 1
res = dict(severity_counter)
print("Total Findings:", len(findings))
print("Severity counts:")
print(res)
return res
def parse_json_results(self):
if self.report_file=="" or not os.path.isfile(self.report_file):
raise ValueError("Invalid or missing report_file for parsing.")
f = open(self.report_file, 'r')
data = json.load(f)
findings = []
# Check specific sections
sections_to_parse = ['permissions', 'manifest_analysis', 'code_analysis', 'behaviour']
for section_name in sections_to_parse:
if section_name in data:
section = data[section_name]
#Permissions
if section_name == 'permissions':
for name, attrs in section.items():
findings.append(self.create_finding(
title=name,
description=attrs.get('description'),
severity=attrs.get('status'),
category="permission",
metadata={
'info': attrs.get('info'),
}
))
#Manifest Analysis
elif section_name == 'manifest_analysis':
findings_list = section.get('manifest_findings', [])
for x in findings_list:
findings.append(self.create_finding(
title=attrs.get('title') or attrs.get('name') or "unknown",
description=attrs.get('description', "No description"),
severity=attrs.get('severity', "unknown"),
category=section_name,
metadata={
'tag': attrs.get('rule')
}))
#Code Analysis
elif section_name == 'code_analysis':
findings_list = section.get('findings', [])
for name, attrs in findings_list.items():
metadata = attrs.get('metadata', {})
findings.append(self.create_finding(
title=name,
description=metadata.get('description'),
severity=metadata.get('severity'),
category="code_analysis",
metadata={
'cwe': metadata.get('cwe'),
'owasp': metadata.get('owasp'),
'files': attrs.get('file')
}))
#Behaviour
elif section_name == 'behaviour':
finding = []
for key, value in data['behaviour'].items():
metadata = value.get('metadata', {})
findings.append(self.create_finding(
title="behaviour_"+metadata.get('label')[0],
description=metadata.get('description'),
severity=metadata.get('severity'),
category="behaviour",
metadata={
'file': value.get('files', {})
}
))
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
findings = []
#Checking that mobsf server is reachable
self.mobsf_url = config.get("mobsf_url", "")
self.file_path = config.get("file_path", "")
# Get API key from config first, fallback to environment variable
self.api_key = config.get("api_key", "") or os.environ.get("MOBSF_API_KEY", "")
#Checking that the file to scan exists
file_path = config.get("file_path", None)
if not file_path or not os.path.isfile(file_path):
raise ValueError(f"Invalid or missing file_path in configuration: {file_path}")
try:
self.scan_hash = self.upload_file()
except Exception as e:
raise Exception(f"Failed to upload file to MobSF: {e}")
if self.scan_hash=="":
raise Exception("scan_hash not returned after upload.")
try:
scan_result = self.start_scan()
except Exception as e:
raise Exception(f"Failed to scan file in MobSF: {e}")
# Parse scan_result and convert to findings
# This is a placeholder; actual parsing logic will depend on MobSF's JSON structure
# Here we just create a dummy finding for illustration
try:
json_data = self.get_json_results()
except json.JSONDecodeError:
return self.create_result(
findings=[],
status="failed",
summary={"error": "Invalid JSON output from MOBSF"},
metadata={"engine": "mobsf", "file_scanned": file_path, "mobsf_url": root_uri}
)
self.report_file = 'dump.json'
findings = self.parse_json_results()
"""
findings.append(ModuleFinding(
title="MobSF Finding",
description="Finding generated by the MobSF module",
severity="medium",
category="mobsf",
metadata={"scan_result": scan_result}
))
"""
tmp_summary = self.create_summary(findings)
summary = {
"total_findings": len(findings),
"dangerous_severity": tmp_summary.get('dangerous', 0),
"warning_severity": tmp_summary.get('warning', 0),
"high_severity": tmp_summary.get('high', 0),
"medium_severity": tmp_summary.get('medium', 0),
"low_severity": tmp_summary.get('low', 0),
"info_severity": tmp_summary.get('info', 0),
}
metadata={"engine": "mobsf", "file_scanned": file_path, "mobsf_url": self.mobsf_url}#Add: "json_report": str(json_output_path
return self.create_result(findings=findings, status="success",summary=summary, metadata=metadata)
return ModuleResult(findings=findings, status="success",summary=summary, metadata=metadata)
def get_metadata(self) -> ModuleMetadata:
return ModuleMetadata(
name="Mobile Security Framework (MobSF)",
version="1.0.0",
description="Integrates MobSF for mobile app security scanning",
author="FuzzForge Team",
category="scanner",
tags=["mobsf", "mobile", "sast", "scanner"]
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""
Config pattern:
**config
findings: []
"tool_name": "FuzzForge Hello World",
"tool_version": "1.0.0",
"mobsf_uri": "(default: http://localhost:8000)",
"file_path": "(path to the APK or IPA file to scan)"
"""
if "mobsf_url" in config and not isinstance(config["mobsf_url"], str):
return False
# Check that mobsf_url does not render 404 when curling /
if "file_path" in config and not isinstance(config["file_path"], str):
return False
return True
if __name__ == "__main__":
import asyncio
module = MobSFModule()
config = {
"mobsf_url": "http://localhost:8877",
"file_path": "./toolbox/modules/android/beetlebug.apk",
}
workspace = Path("./toolbox/modules/android/")
result = asyncio.run(module.execute(config, workspace))
print(result)

View File

@@ -0,0 +1,411 @@
"""
OpenGrep Static Analysis Module
This module uses OpenGrep (open-source version of Semgrep) for pattern-based
static analysis across multiple programming languages.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Dict, Any, List
import subprocess
import logging
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class OpenGrepModule(BaseModule):
"""OpenGrep static analysis module"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="opengrep",
version="1.45.0",
description="Open-source pattern-based static analysis tool for security vulnerabilities",
author="FuzzForge Team",
category="static_analysis",
tags=["sast", "pattern-matching", "multi-language", "security"],
input_schema={
"type": "object",
"properties": {
"config": {
"type": "string",
"enum": ["auto", "p/security-audit", "p/owasp-top-ten", "p/cwe-top-25"],
"default": "auto",
"description": "Rule configuration to use"
},
"custom_rules_path": {
"type": "string",
"description": "Path to a directory containing custom OpenGrep rules"
},
"languages": {
"type": "array",
"items": {"type": "string"},
"description": "Specific languages to analyze"
},
"include_patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to include"
},
"exclude_patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to exclude"
},
"max_target_bytes": {
"type": "integer",
"default": 1000000,
"description": "Maximum file size to analyze (bytes)"
},
"timeout": {
"type": "integer",
"default": 300,
"description": "Analysis timeout in seconds"
},
"severity": {
"type": "array",
"items": {"type": "string", "enum": ["ERROR", "WARNING", "INFO"]},
"default": ["ERROR", "WARNING", "INFO"],
"description": "Minimum severity levels to report"
},
"confidence": {
"type": "array",
"items": {"type": "string", "enum": ["HIGH", "MEDIUM", "LOW"]},
"default": ["HIGH", "MEDIUM", "LOW"],
"description": "Minimum confidence levels to report"
}
}
},
output_schema={
"type": "object",
"properties": {
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"rule_id": {"type": "string"},
"severity": {"type": "string"},
"confidence": {"type": "string"},
"file_path": {"type": "string"},
"line_number": {"type": "integer"}
}
}
}
}
}
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate configuration"""
timeout = config.get("timeout", 300)
if not isinstance(timeout, int) or timeout < 30 or timeout > 3600:
raise ValueError("Timeout must be between 30 and 3600 seconds")
max_bytes = config.get("max_target_bytes", 1000000)
if not isinstance(max_bytes, int) or max_bytes < 1000 or max_bytes > 10000000:
raise ValueError("max_target_bytes must be between 1000 and 10000000")
custom_rules_path = config.get("custom_rules_path")
if custom_rules_path:
if not Path(custom_rules_path).is_dir():
raise ValueError(f"Custom rules path must be a valid directory: {custom_rules_path}")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""Execute OpenGrep static analysis"""
self.start_timer()
try:
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
logger.info(f"Running OpenGrep analysis on {workspace}")
# Build opengrep command
cmd = ["opengrep", "scan", "--json"]
# Add configuration
custom_rules_path = config.get("custom_rules_path")
use_custom_rules = False
if custom_rules_path:
cmd.extend(["--config", custom_rules_path])
use_custom_rules = True
else:
config_type = config.get("config", "auto")
if config_type == "auto":
cmd.extend(["--config", "auto"])
else:
cmd.extend(["--config", config_type])
# Add timeout
cmd.extend(["--timeout", str(config.get("timeout", 300))])
# Add max target bytes
cmd.extend(["--max-target-bytes", str(config.get("max_target_bytes", 1000000))])
# Add languages if specified (but NOT when using custom rules, as rules define their own languages)
if config.get("languages") and not use_custom_rules:
langs = ",".join(config["languages"])
cmd.extend(["--lang", langs])
# Add include patterns
if config.get("include_patterns"):
for pattern in config["include_patterns"]:
cmd.extend(["--include", pattern])
# Add exclude patterns
if config.get("exclude_patterns"):
for pattern in config["exclude_patterns"]:
cmd.extend(["--exclude", pattern])
# Add severity filter only if a single level is requested.
severity_levels = config.get("severity", ["ERROR", "WARNING", "INFO"])
if severity_levels and len(severity_levels) == 1:
cmd.extend(["--severity", severity_levels[0]])
# Add confidence filter (if supported in this version)
confidence_levels = config.get("confidence", ["HIGH", "MEDIUM"])
if confidence_levels and len(confidence_levels) < 3: # Only if not all levels
# Note: confidence filtering might need to be done post-processing
pass
# Disable metrics collection
cmd.append("--disable-version-check")
cmd.append("--no-git-ignore")
# Add target directory
cmd.append(str(workspace))
logger.debug(f"Running command: {' '.join(cmd)}")
# Run OpenGrep
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=workspace
)
stdout, stderr = await process.communicate()
# Parse results
findings = []
if process.returncode in [0, 1]: # 0 = no findings, 1 = findings found
findings = self._parse_opengrep_output(stdout.decode(), workspace, config)
else:
error_msg = stderr.decode()
logger.error(f"OpenGrep failed: {error_msg}")
return self.create_result(
findings=[],
status="failed",
error=f"OpenGrep execution failed: {error_msg}"
)
# Create summary
summary = self._create_summary(findings)
logger.info(f"OpenGrep found {len(findings)} potential issues")
return self.create_result(
findings=findings,
status="success",
summary=summary
)
except Exception as e:
logger.error(f"OpenGrep module failed: {e}")
return self.create_result(
findings=[],
status="failed",
error=str(e)
)
def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, Any]) -> List[ModuleFinding]:
"""Parse OpenGrep JSON output into findings"""
findings = []
if not output.strip():
return findings
try:
data = json.loads(output)
results = data.get("results", [])
logger.debug(f"OpenGrep returned {len(results)} raw results")
# Get filtering criteria
allowed_severities = set(config.get("severity", ["ERROR", "WARNING", "INFO"]))
allowed_confidences = set(config.get("confidence", ["HIGH", "MEDIUM", "LOW"]))
for result in results:
# Extract basic info
rule_id = result.get("check_id", "unknown")
message = result.get("message", "")
extra = result.get("extra", {})
severity = extra.get("severity", "INFO").upper()
# File location info
path_info = result.get("path", "")
start_line = result.get("start", {}).get("line", 0)
end_line = result.get("end", {}).get("line", 0)
start_col = result.get("start", {}).get("col", 0)
end_col = result.get("end", {}).get("col", 0)
# Code snippet
lines = extra.get("lines", "")
# Metadata
rule_metadata = extra.get("metadata", {})
cwe = rule_metadata.get("cwe", [])
owasp = rule_metadata.get("owasp", [])
confidence = extra.get("confidence", rule_metadata.get("confidence", "MEDIUM")).upper()
# Apply severity filter
if severity not in allowed_severities:
continue
# Apply confidence filter
if confidence not in allowed_confidences:
continue
# Make file path relative to workspace
if path_info:
try:
rel_path = Path(path_info).relative_to(workspace)
path_info = str(rel_path)
except ValueError:
pass
# Map severity to our standard levels
finding_severity = self._map_severity(severity)
# Create finding
finding = self.create_finding(
title=f"Security issue: {rule_id}",
description=message or f"OpenGrep rule {rule_id} triggered",
severity=finding_severity,
category=self._get_category(rule_id, extra),
file_path=path_info if path_info else None,
line_start=start_line if start_line > 0 else None,
line_end=end_line if end_line > 0 and end_line != start_line else None,
code_snippet=lines.strip() if lines else None,
recommendation=self._get_recommendation(rule_id, extra),
metadata={
"rule_id": rule_id,
"opengrep_severity": severity,
"confidence": confidence,
"cwe": cwe,
"owasp": owasp,
"fix": extra.get("fix", ""),
"impact": extra.get("impact", ""),
"likelihood": extra.get("likelihood", ""),
"references": extra.get("references", [])
}
)
findings.append(finding)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse OpenGrep output: {e}. Output snippet: {output[:200]}...")
except Exception as e:
logger.warning(f"Error processing OpenGrep results: {e}")
return findings
def _map_severity(self, opengrep_severity: str) -> str:
"""Map OpenGrep severity to our standard severity levels"""
severity_map = {
"ERROR": "high",
"WARNING": "medium",
"INFO": "low"
}
return severity_map.get(opengrep_severity.upper(), "medium")
def _get_category(self, rule_id: str, extra: Dict[str, Any]) -> str:
"""Determine finding category based on rule and metadata"""
rule_metadata = extra.get("metadata", {})
cwe_list = rule_metadata.get("cwe", [])
owasp_list = rule_metadata.get("owasp", [])
# Check for common security categories
if any("injection" in rule_id.lower() for x in [rule_id]):
return "injection"
elif any("xss" in rule_id.lower() for x in [rule_id]):
return "xss"
elif any("csrf" in rule_id.lower() for x in [rule_id]):
return "csrf"
elif any("auth" in rule_id.lower() for x in [rule_id]):
return "authentication"
elif any("crypto" in rule_id.lower() for x in [rule_id]):
return "cryptography"
elif cwe_list:
return f"cwe-{cwe_list[0]}"
elif owasp_list:
return f"owasp-{owasp_list[0].replace(' ', '-').lower()}"
else:
return "security"
def _get_recommendation(self, rule_id: str, extra: Dict[str, Any]) -> str:
"""Generate recommendation based on rule and metadata"""
fix_suggestion = extra.get("fix", "")
if fix_suggestion:
return fix_suggestion
# Generic recommendations based on rule type
if "injection" in rule_id.lower():
return "Use parameterized queries or prepared statements to prevent injection attacks."
elif "xss" in rule_id.lower():
return "Properly encode/escape user input before displaying it in web pages."
elif "crypto" in rule_id.lower():
return "Use cryptographically secure algorithms and proper key management."
elif "hardcode" in rule_id.lower():
return "Remove hardcoded secrets and use secure configuration management."
else:
return "Review this security issue and apply appropriate fixes based on your security requirements."
def _create_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
"""Create analysis summary"""
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
category_counts = {}
rule_counts = {}
for finding in findings:
# Count by severity
severity_counts[finding.severity] += 1
# Count by category
category = finding.category
category_counts[category] = category_counts.get(category, 0) + 1
# Count by rule
rule_id = finding.metadata.get("rule_id", "unknown")
rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1
return {
"total_findings": len(findings),
"severity_counts": severity_counts,
"category_counts": category_counts,
"top_rules": dict(sorted(rule_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
}

View File

@@ -0,0 +1,59 @@
FROM prefecthq/prefect:3-python3.11
WORKDIR /app
# Install system dependencies for MobSF and Jadx
RUN apt-get update && apt-get install -y \
git \
default-jdk \
wget \
unzip \
xfonts-75dpi \
xfonts-base \
&& rm -rf /var/lib/apt/lists/* \
&& wget https://github.com/wkhtmltopdf/packaging/releases/download/0.12.6.1-3/wkhtmltox_0.12.6.1-3.bookworm_amd64.deb \
&& apt-get update \
&& apt-get install -y ./wkhtmltox_0.12.6.1-3.bookworm_amd64.deb \
&& rm wkhtmltox_0.12.6.1-3.bookworm_amd64.deb \
&& rm -rf /var/lib/apt/lists/*
# Install Jadx
RUN wget https://github.com/skylot/jadx/releases/download/v1.5.0/jadx-1.5.0.zip -O /tmp/jadx.zip \
&& unzip /tmp/jadx.zip -d /opt/jadx \
&& rm /tmp/jadx.zip \
&& ln -s /opt/jadx/bin/jadx /usr/local/bin/jadx
# The upstream OpenGrep CLI is not yet published on PyPI. Use semgrep (the
# engine that OpenGrep builds upon) and expose it under the `opengrep` name so
# the workflow module can invoke it transparently.
RUN pip install --no-cache-dir semgrep==1.45.0 \
&& ln -sf /usr/local/bin/semgrep /usr/local/bin/opengrep
# Clone and setup MobSF
RUN git clone https://github.com/MobSF/Mobile-Security-Framework-MobSF.git /app/mobsf \
&& cd /app/mobsf \
&& git checkout v3.9.7 \
&& ./setup.sh
# Force rebuild after this point
ARG CACHEBUST=2
# Copy the entire toolbox directory structure
COPY . /app/toolbox
# Copy Android custom rules to a well-known location
COPY ./modules/android/custom_rules /app/custom_opengrep_rules
ENV PYTHONPATH=/app/toolbox:$PYTHONPATH
ENV MOBSF_PORT=8877
# Create startup script to launch MobSF in background and then Prefect
RUN echo '#!/bin/bash\n\
cd /app/mobsf && ./run.sh 127.0.0.1:8877 &\n\
echo "Waiting for MobSF to start..."\n\
sleep 10\n\
echo "Starting Prefect engine..."\n\
exec python -m prefect.engine\n\
' > /app/start.sh && chmod +x /app/start.sh
CMD ["/app/start.sh"]

View File

@@ -0,0 +1,16 @@
# Use existing image with MobSF already installed
FROM localhost:5001/fuzzforge/android_static_analysis:latest
# Install unzip and Jadx
RUN apt-get update && apt-get install -y unzip && rm -rf /var/lib/apt/lists/* \
&& wget https://github.com/skylot/jadx/releases/download/v1.5.0/jadx-1.5.0.zip \
&& unzip -o jadx-1.5.0.zip -d /opt/jadx \
&& rm jadx-1.5.0.zip \
&& chmod +x /opt/jadx/bin/jadx \
&& ln -sf /opt/jadx/bin/jadx /usr/local/bin/jadx
# Copy updated toolbox files
COPY . /app/toolbox
# Copy Android custom rules
COPY ./modules/android/custom_rules /app/custom_opengrep_rules

View File

@@ -0,0 +1,6 @@
"""
Android Static Analysis Security Testing (SAST) Workflow
This package contains the Android SAST workflow that combines
multiple static analysis tools optimized for Java code security.
"""

View File

@@ -0,0 +1,135 @@
name: android_static_analysis
version: "1.0.0"
description: "Perform static analysis on Android applications using OpenGrep and MobSF."
author: "FuzzForge Team"
category: "specialized"
tags:
- "android"
- "static-analysis"
- "security"
- "opengrep"
- "semgrep"
- "mobsf"
supported_volume_modes:
- "ro"
- "rw"
default_volume_mode: "ro"
default_target_path: "/workspace/android_test"
requirements:
tools:
- "opengrep"
- "mobsf"
- "sarif_reporter"
resources:
memory: "2Gi"
cpu: "2000m"
timeout: 3600
environment:
python: "3.11"
has_docker: true
default_parameters:
target_path: "/workspace/android_test"
volume_mode: "ro"
apk_path: ""
opengrep_config: {}
custom_rules_path: "/app/custom_opengrep_rules"
reporter_config: {}
parameters:
type: object
properties:
target_path:
type: string
default: "/workspace/android_test"
description: "Path to the decompiled Android source code for OpenGrep analysis."
volume_mode:
type: string
enum: ["ro", "rw"]
default: "ro"
description: "Volume mount mode for the attached workspace."
apk_path:
type: string
default: ""
description: "Path to the APK file for MobSF analysis (relative to workspace parent or absolute). If empty, MobSF analysis will be skipped."
opengrep_config:
type: object
description: "Configuration object forwarded to the OpenGrep module."
properties:
config:
type: string
enum: ["auto", "p/security-audit", "p/owasp-top-ten", "p/cwe-top-25"]
description: "Preset OpenGrep ruleset to run."
custom_rules_path:
type: string
description: "Directory that contains custom OpenGrep rules."
languages:
type: array
items:
type: string
description: "Restrict analysis to specific languages."
include_patterns:
type: array
items:
type: string
description: "File patterns to include in the scan."
exclude_patterns:
type: array
items:
type: string
description: "File patterns to exclude from the scan."
max_target_bytes:
type: integer
description: "Maximum file size to analyze (bytes)."
timeout:
type: integer
description: "Analysis timeout in seconds."
severity:
type: array
items:
type: string
enum: ["ERROR", "WARNING", "INFO"]
description: "Severities to include in the results."
confidence:
type: array
items:
type: string
enum: ["HIGH", "MEDIUM", "LOW"]
description: "Confidence levels to include in the results."
custom_rules_path:
type:
- string
- "null"
default: "/app/custom_opengrep_rules"
description: "Optional in-container path pointing to custom OpenGrep rules."
reporter_config:
type: object
description: "Configuration overrides for the SARIF reporter."
properties:
include_code_flows:
type: boolean
description: "Include code flow information in the SARIF output."
logical_id:
type: string
description: "Custom identifier to attach to the generated SARIF report."
output_schema:
type: object
properties:
sarif:
type: object
description: "SARIF-formatted findings produced by the workflow."
summary:
type: object
description: "Summary information about the analysis execution."
properties:
total_findings:
type: integer
severity_counts:
type: object
tool_metadata:
type: object

View File

@@ -0,0 +1,2 @@
requests
pydantic

View File

@@ -0,0 +1,280 @@
"""
Android Static Analysis Workflow - Analyze APKs using Jadx, MobSF, and OpenGrep
"""
import sys
import os
import logging
import subprocess
import time
import signal
from pathlib import Path
from typing import Dict, Any
from prefect import flow, task
# S'assurer que /app est dans le PYTHONPATH (exécutions Docker)
sys.path.insert(0, "/app")
# Import des modules internes
from toolbox.modules.android.jadx import JadxModule
from toolbox.modules.android.opengrep import OpenGrepModule
from toolbox.modules.reporter import SARIFReporter
from toolbox.modules.android.mobsf import MobSFModule
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---------------------- TASKS ---------------------- #
@task(name="jadx_decompilation")
async def run_jadx_task(workspace: Path, config: Dict[str, Any]) -> Dict[str, Any]:
print("Running Jadx APK decompilation")
print(f" APK file: {config.get('apk_path')}")
print(f" Output dir: {config.get('output_dir')}")
module = JadxModule()
result = await module.execute(config, workspace)
print(f"Jadx completed: {result.status}")
if result.error:
print(f"Jadx error: {result.error}")
if result.status == "success":
print(f"Jadx decompiled {result.summary.get('java_files', 0)} Java files")
print(f"Source dir: {result.summary.get('source_dir')}")
return result.dict()
@task(name="opengrep_analysis")
async def run_opengrep_task(workspace: Path, config: Dict[str, Any]) -> Dict[str, Any]:
print("Running OpenGrep static analysis")
print(f" Workspace: {workspace}")
print(f" Config: {config}")
module = OpenGrepModule()
result = await module.execute(config, workspace)
print(f"OpenGrep completed: {result.status}")
print(f"OpenGrep findings count: {len(result.findings)}")
print(f"OpenGrep summary: {result.summary}")
return result.dict()
@task(name="mobsf_analysis")
async def run_mobsf_task(workspace: Path, config: Dict[str, Any]) -> Dict[str, Any]:
print("Running MobSF static analysis")
print(f" APK file: {config.get('file_path')}")
print(f" MobSF URL: {config.get('mobsf_url')}")
module = MobSFModule()
result = await module.execute(config, workspace)
print(f"MobSF scan completed: {result.status}")
print(f"MobSF findings count: {len(result.findings)}")
return result.dict()
@task(name="android_report_generation")
async def generate_android_sarif_report(
opengrep_result: Dict[str, Any],
mobsf_result: Dict[str, Any],
config: Dict[str, Any],
workspace: Path
) -> Dict[str, Any]:
logger.info("Generating SARIF report for Android scan")
reporter = SARIFReporter()
all_findings = []
all_findings.extend(opengrep_result.get("findings", []))
# Add MobSF findings if available
if mobsf_result:
all_findings.extend(mobsf_result.get("findings", []))
reporter_config = {
**(config or {}),
"findings": all_findings,
"tool_name": "FuzzForge Android Static Analysis",
"tool_version": "1.0.0",
}
result = await reporter.execute(reporter_config, workspace)
# Le reporter renvoie typiquement {"sarif": {...}} dans result.dict()
return result.dict().get("sarif", {})
# ---------------------- FLOW ---------------------- #
@flow(name="android_static_analysis", log_prints=True)
async def main_flow(
target_path: str = os.getenv("FF_TARGET_PATH", "/workspace/android_test"),
volume_mode: str = "ro",
apk_path: str = "",
opengrep_config: Dict[str, Any] = {},
custom_rules_path: str = None,
reporter_config: Dict[str, Any] = {},
) -> Dict[str, Any]:
"""
Android static analysis workflow using OpenGrep and MobSF.
Args:
target_path: Path to decompiled source code (for OpenGrep analysis)
volume_mode: Volume mount mode (ro/rw)
apk_path: Path to APK file for MobSF analysis (relative to workspace or absolute)
opengrep_config: Configuration for OpenGrep module
custom_rules_path: Path to custom OpenGrep rules
reporter_config: Configuration for SARIF reporter
"""
print("📱 Starting Android Static Analysis Workflow")
print(f"Workspace: {target_path} (mode: {volume_mode})")
workspace = Path(target_path)
# Start MobSF server in background if APK analysis is needed
mobsf_process = None
if apk_path:
print("🚀 Starting MobSF server in background...")
try:
mobsf_process = subprocess.Popen(
["bash", "-c", "cd /app/mobsf && ./run.sh 127.0.0.1:8877"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
print("⏳ Waiting for MobSF to initialize (45 seconds)...")
time.sleep(45)
print("✅ MobSF should be ready now")
# Retrieve MobSF API key from secret file
print("🔑 Retrieving MobSF API key...")
try:
secret_file = Path("/root/.MobSF/secret")
if secret_file.exists():
secret = secret_file.read_text().strip()
if secret:
# API key is SHA256 hash of the secret file contents
import hashlib
api_key = hashlib.sha256(secret.encode()).hexdigest()
os.environ["MOBSF_API_KEY"] = api_key
print(f"✅ MobSF API key retrieved")
else:
print("⚠️ API key file is empty")
else:
print(f"⚠️ API key file not found at {secret_file}")
except Exception as e:
print(f"⚠️ Error retrieving API key: {e}")
except Exception as e:
print(f"⚠️ Failed to start MobSF: {e}")
mobsf_process = None
# Resolve APK path if provided
# Note: target_path gets mounted as /workspace/ in the execution container
# So all paths should be relative to /workspace/
apk_file_path = None
if apk_path:
apk_path_obj = Path(apk_path)
if apk_path_obj.is_absolute():
apk_file_path = str(apk_path_obj)
else:
# Relative paths are relative to /workspace/ (the mounted target directory)
apk_file_path = f"/workspace/{apk_path}"
print(f"APK path resolved to: {apk_file_path}")
print(f"Checking if APK exists in target: {(Path(target_path) / apk_path).exists()}")
# Set default Android-specific configuration if not provided
if not opengrep_config:
opengrep_config = {
"languages": ["java", "kotlin"], # Focus on Android languages
}
# Use custom Android rules if available, otherwise use custom_rules_path param
if custom_rules_path:
opengrep_config["custom_rules_path"] = custom_rules_path
elif "custom_rules_path" not in opengrep_config:
# Default to custom Android security rules
opengrep_config["custom_rules_path"] = "/app/custom_opengrep_rules"
try:
# --- Phase 1 : Jadx Decompilation ---
jadx_result = None
actual_workspace = workspace
if apk_file_path:
print(f"Phase 1: Jadx decompilation of APK: {apk_file_path}")
jadx_config = {
"apk_path": apk_file_path,
"output_dir": "jadx_output",
"overwrite": True,
"threads": 4,
}
jadx_result = await run_jadx_task(workspace, jadx_config)
if jadx_result.get("status") == "success":
# Use Jadx source output as workspace for OpenGrep
source_dir = jadx_result.get("summary", {}).get("source_dir")
if source_dir:
actual_workspace = Path(source_dir)
print(f"✅ Jadx decompiled {jadx_result.get('summary', {}).get('java_files', 0)} Java files")
print(f" OpenGrep will analyze: {source_dir}")
else:
print(f"⚠️ Jadx failed: {jadx_result.get('error', 'unknown error')}")
else:
print("Phase 1: Jadx decompilation skipped (no APK provided)")
# --- Phase 2 : OpenGrep ---
print("Phase 2: OpenGrep analysis on source code")
print(f"Using config: {opengrep_config}")
opengrep_result = await run_opengrep_task(actual_workspace, opengrep_config)
# --- Phase 3 : MobSF ---
mobsf_result = None
if apk_file_path:
print(f"Phase 3: MobSF analysis on APK: {apk_file_path}")
mobsf_config = {
"mobsf_url": "http://localhost:8877",
"file_path": apk_file_path,
"api_key": os.environ.get("MOBSF_API_KEY", "")
}
print(f"Using MobSF config (api_key={mobsf_config['api_key'][:10]}...): {mobsf_config}")
mobsf_result = await run_mobsf_task(workspace, mobsf_config)
print(f"MobSF result: {mobsf_result}")
else:
print(f"Phase 3: MobSF analysis skipped (apk_path='{apk_path}' empty)")
# --- Phase 4 : Rapport SARIF ---
print("Phase 4: SARIF report generation")
sarif_report = await generate_android_sarif_report(
opengrep_result, mobsf_result, reporter_config or {}, workspace
)
findings = sarif_report.get("runs", [{}])[0].get("results", []) if sarif_report else []
print(f"✅ Workflow complete with {len(findings)} findings")
return sarif_report
except Exception as e:
logger.error(f"Workflow failed: {e}")
print(f"❌ Workflow failed: {e}")
# Retourner un squelette SARIF minimal en cas d'échec
return {
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
"version": "2.1.0",
"runs": [
{
"tool": {"driver": {"name": "FuzzForge Android Static Analysis"}},
"results": [],
"invocations": [
{
"executionSuccessful": False,
"exitCode": 1,
"exitCodeDescription": str(e),
}
],
}
],
}
finally:
# Cleanup: Stop MobSF if it was started
if mobsf_process:
print("🛑 Stopping MobSF server...")
try:
mobsf_process.terminate()
mobsf_process.wait(timeout=5)
print("✅ MobSF stopped")
except Exception as e:
print(f"⚠️ Error stopping MobSF: {e}")
try:
mobsf_process.kill()
except:
pass

View File

@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
# Import each workflow individually to handle failures gracefully
security_assessment_flow = None
secret_detection_flow = None
android_static_analysis_flow = None
# Try to import each workflow individually
try:
@@ -42,6 +43,11 @@ try:
except ImportError as e:
logger.warning(f"Failed to import secret_detection_scan workflow: {e}")
try:
from .android_static_analysis.workflow import main_flow as android_static_analysis_flow
except ImportError as e:
logger.warning(f"Failed to import android_static_analysis workflow: {e}")
# Manual registry - developers add workflows here after creation
# Only include workflows that were successfully imported
@@ -70,6 +76,17 @@ if secret_detection_flow is not None:
"tags": ["secrets", "credentials", "detection", "trufflehog", "gitleaks", "comprehensive"]
}
if android_static_analysis_flow is not None:
WORKFLOW_REGISTRY["android_static_analysis"] = {
"flow": android_static_analysis_flow,
"module_path": "toolbox.workflows.android_static_analysis.workflow",
"function_name": "main_flow",
"description": "Perform static analysis on Android applications using OpenGrep",
"version": "1.0.0",
"author": "FuzzForge Team",
"tags": ["android", "static-analysis", "security", "opengrep", "semgrep"]
}
#
# To add a new workflow, follow this pattern:
#

View File

@@ -46,7 +46,7 @@ services:
environment:
PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
PREFECT_SERVER_API_HOST: 0.0.0.0
PREFECT_API_URL: http://prefect-server:4200/api
PREFECT_API_URL: http://localhost:4200/api
PREFECT_MESSAGING_BROKER: prefect_redis.messaging
PREFECT_MESSAGING_CACHE: prefect_redis.messaging
PREFECT_REDIS_MESSAGING_HOST: redis

View File

@@ -0,0 +1,25 @@
# android_test
FuzzForge security testing project.
## Quick Start
```bash
# List available workflows
fuzzforge workflows
# Submit a workflow for analysis
fuzzforge workflow <workflow-name> /path/to/target
# Monitor run progress
fuzzforge monitor live <run-id>
# View findings
fuzzforge finding <run-id>
```
## Project Structure
- `.fuzzforge/` - Project data and configuration
- `.fuzzforge/config.yaml` - Project configuration
- `.fuzzforge/findings.db` - Local database for runs and findings

Binary file not shown.

View File

@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
android:versionCode="1"
android:versionName="1.0"
android:compileSdkVersion="30"
android:compileSdkVersionCodename="11"
package="app.beetlebug"
platformBuildVersionCode="30"
platformBuildVersionName="11">
<uses-sdk
android:minSdkVersion="23"
android:targetSdkVersion="30"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.USE_FINGERPRINT"/>
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.USE_BIOMETRIC"/>
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<application
android:theme="@style/Theme.Beetlebug"
android:label="@string/app_name"
android:icon="@mipmap/ic_launcher"
android:debuggable="true"
android:allowBackup="true"
android:supportsRtl="true"
android:extractNativeLibs="false"
android:roundIcon="@mipmap/ic_launcher_round"
android:appComponentFactory="androidx.core.app.CoreComponentFactory">
<activity
android:name="app.beetlebug.ctf.DisplayXSS"
android:exported="false"/>
<activity
android:name="app.beetlebug.ctf.BinaryPatchActivity"
android:exported="false"/>
<activity
android:name="app.beetlebug.ctf.b33tleAdministrator"
android:exported="true"/>
<activity
android:name="app.beetlebug.ctf.VulnerableWebView"
android:exported="true"/>
<activity
android:name="app.beetlebug.ctf.VulnerableClipboardActivity"
android:exported="false"/>
<activity
android:name="app.beetlebug.ctf.InsecureContentProvider"
android:exported="false"/>
<provider
android:name="app.beetlebug.handlers.VulnerableContentProvider"
android:enabled="true"
android:exported="true"
android:authorities="app.beetlebug.provider"/>
<activity
android:name="app.beetlebug.ctf.WebViewXSSActivity"
android:exported="false">
<intent-filter>
<action android:name="android.intent.action.VIEW"/>
</intent-filter>
</activity>
<activity android:name="app.beetlebug.ctf.DeeplinkAccountActivity">
<intent-filter>
<action android:name="android.intent.action.VIEW"/>
<category android:name="android.intent.category.DEFAULT"/>
<category android:name="android.intent.category.BROWSABLE"/>
<data
android:scheme="https"
android:host="beetlebug.com"
android:pathPrefix="/account"/>
</intent-filter>
</activity>
<activity android:name="app.beetlebug.ctf.FirebaseDatabaseActivity"/>
<activity android:name="app.beetlebug.ctf.SQLInjectionActivity"/>
<activity android:name="app.beetlebug.user.PlayerStats"/>
<activity
android:name="app.beetlebug.ctf.WebViewURLActivity"
android:exported="true"/>
<activity android:name="app.beetlebug.ctf.EmbeddedSecretSourceCode"/>
<activity android:name="app.beetlebug.ctf.EmbeddedSecretStrings"/>
<activity android:name="app.beetlebug.ctf.InsecureLoggingActivity"/>
<activity android:name="app.beetlebug.user.UserSignUp"/>
<activity android:name="app.beetlebug.Walkthrough">
<intent-filter>
<action android:name="android.intent.action.MAIN"/>
<category android:name="android.intent.category.LAUNCHER"/>
</intent-filter>
</activity>
<activity
android:name="app.beetlebug.ctf.InsecureStorageExternal"
android:exported="false"/>
<activity
android:name="app.beetlebug.FlagCaptured"
android:exported="false"
android:screenOrientation="portrait"/>
<activity
android:name="app.beetlebug.ctf.BiometricActivityDeeplink"
android:exported="true"/>
<activity android:name="app.beetlebug.ctf.VulnerableServiceActivity"/>
<activity android:name="app.beetlebug.ctf.InsecureStorageSQLite"/>
<activity android:name="app.beetlebug.ctf.InsecureStorageSharedPref"/>
<activity
android:name="app.beetlebug.ctf.VulnerableActivityIntent"
android:exported="false"/>
<activity
android:name="app.beetlebug.FlagsOverview"
android:screenOrientation="portrait"/>
<activity
android:name="app.beetlebug.MainActivity"
android:exported="false"/>
<service
android:name="app.beetlebug.handlers.VulnerableService"
android:protectionLevel="dangerous"
android:enabled="true"
android:exported="true"/>
<service
android:name="com.google.firebase.components.ComponentDiscoveryService"
android:exported="false"
android:directBootAware="true">
<meta-data
android:name="com.google.firebase.components:com.google.firebase.database.DatabaseRegistrar"
android:value="com.google.firebase.components.ComponentRegistrar"/>
</service>
<provider
android:name="com.google.firebase.provider.FirebaseInitProvider"
android:exported="false"
android:authorities="app.beetlebug.firebaseinitprovider"
android:initOrder="100"
android:directBootAware="true"/>
<activity
android:theme="@android:style/Theme.Translucent.NoTitleBar"
android:name="com.google.android.gms.common.api.GoogleApiActivity"
android:exported="false"/>
<meta-data
android:name="com.google.android.gms.version"
android:value="@integer/google_play_services_version"/>
</application>
</manifest>

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
implementation-class=com.google.android.gms.StrictVersionMatcherPlugin

View File

@@ -0,0 +1 @@
implementation-class=com.google.gms.googleservices.GoogleServicesPlugin

View File

@@ -0,0 +1,9 @@
<html>
<head>
<script type="text/javascript">
document.write("token :" + Android.getUserToken());
</script>
</head>
</html>

View File

@@ -0,0 +1,23 @@
<html>
<head>
<style>
.center {
margin: auto;
width: 50%;
padding: 10px;
}
</style>
</head>
<body>
<div class="center">
<img src="https://hafiz.ng/wp-content/uploads/2022/03/pwn.png" />
</div>
</body>
</html>

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

View File

@@ -0,0 +1,79 @@
<html>
<head>
<style>
* {box-sizing: border-box}
/* Add padding to containers */
.container {
padding: 16px;
}
/* Full-width input fields */
input[type=text], input[type=password] {
width: 100%;
padding: 15px;
margin: 5px 0 22px 0;
display: inline-block;
border: 1px solid black;
border-radius: 50px;
background: #f1f1f1;
}
input[type=text]:focus, input[type=password]:focus {
background-color: #ddd;
outline: none;
}
/* Overwrite default styles of hr */
hr {
border: 1px solid #f1f1f1;
margin-bottom: 25px;
}
/* Set a style for the submit/register button */
.registerbtn {
background-color: #4CAF50;
color: white;
border: 1.5px solid black;
padding: 16px 20px;
margin: 8px 0;
border-radius: 50px;
cursor: pointer;
width: 100%;
opacity: 0.9;
}
.registerbtn:hover {
opacity:1;
}
/* Add a blue text color to links */
a {
color: dodgerblue;
}
</style>
</head>
<form action="register">
<div class="container">
<h1>Register</h1>
<p>Please fill in this form to create an account.</p>
<em>0x71342e2</em>
<hr>
<label for="email"><b>Username</b></label>
<input type="text" placeholder="Enter username" name="email" id="email" required>
<label for="psw"><b>Password</b></label>
<input type="password" placeholder="Enter password" name="psw" id="psw" required>
<label for="psw-repeat"><b>Repeat Password</b></label>
<input type="password" placeholder="Repeat password" name="psw-repeat" id="psw-repeat" required>
<hr>
<button type="submit" class="registerbtn">Register</button>
</div>
</form>
</html>

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,3 @@
version=16.0.0
client=firebase-annotations
firebase-annotations_client=16.0.0

View File

@@ -0,0 +1,3 @@
version=16.0.0-beta04
client=firebase-appcheck-interop
firebase-appcheck-interop_client=16.0.0-beta04

View File

@@ -0,0 +1,3 @@
version=19.0.2
client=firebase-auth-interop
firebase-auth-interop_client=19.0.2

View File

@@ -0,0 +1,3 @@
version=20.0.0
client=firebase-common
firebase-common_client=20.0.0

View File

@@ -0,0 +1,3 @@
version=17.0.0
client=firebase-components
firebase-components_client=17.0.0

View File

@@ -0,0 +1,3 @@
version=18.0.0
client=firebase-database-collection
firebase-database-collection_client=18.0.0

View File

@@ -0,0 +1,3 @@
version=20.0.3
client=firebase-database
firebase-database_client=20.0.3

Some files were not shown because too many files have changed in this diff Show More