feat: Complete Temporal migration cleanup and fixes

- Remove obsolete docker_logs.py module and container diagnostics from SDK
- Fix security_assessment workflow metadata (vertical: rust -> python)
- Remove all Prefect references from documentation
- Add SDK exception handling test suite
- Clean up old test artifacts
Author: tduhamel42
Date: 2025-10-14 15:02:52 +02:00
parent 60ca088ecf
commit 40d48a8045
27 changed files with 379 additions and 9627 deletions

View File

@@ -26,7 +26,7 @@ What type of security workflow is this?
## Files
Please attach or provide links to your workflow files:
- [ ] `workflow.py` - Main Prefect flow implementation
- [ ] `workflow.py` - Main Temporal workflow implementation
- [ ] `Dockerfile` - Container definition
- [ ] `metadata.yaml` - Workflow metadata
- [ ] Test files or examples

View File

@@ -1,92 +1,44 @@
# FuzzForge AI Architecture
**Last Updated:** 2025-10-01
**Status:** Approved Architecture Plan
**Current Phase:** Migration from Prefect to Temporal with Vertical Workers
**Last Updated:** 2025-10-14
**Status:** Production - Temporal with Vertical Workers
---
## Table of Contents
1. [Executive Summary](#executive-summary)
2. [Current Architecture (Prefect)](#current-architecture-prefect)
3. [Target Architecture (Temporal + Vertical Workers)](#target-architecture-temporal--vertical-workers)
4. [Vertical Worker Model](#vertical-worker-model)
5. [Storage Strategy (MinIO)](#storage-strategy-minio)
6. [Dynamic Workflow Loading](#dynamic-workflow-loading)
7. [Architecture Principles](#architecture-principles)
8. [Component Details](#component-details)
9. [Scaling Strategy](#scaling-strategy)
10. [File Lifecycle Management](#file-lifecycle-management)
11. [Future: Nomad Migration](#future-nomad-migration)
12. [Migration Timeline](#migration-timeline)
13. [Decision Log](#decision-log)
2. [Current Architecture (Temporal + Vertical Workers)](#current-architecture-temporal--vertical-workers)
3. [Vertical Worker Model](#vertical-worker-model)
4. [Storage Strategy (MinIO)](#storage-strategy-minio)
5. [Dynamic Workflow Loading](#dynamic-workflow-loading)
6. [Architecture Principles](#architecture-principles)
7. [Component Details](#component-details)
8. [Scaling Strategy](#scaling-strategy)
9. [File Lifecycle Management](#file-lifecycle-management)
10. [Future: Nomad Migration](#future-nomad-migration)
---
## Executive Summary
### The Decision
### The Architecture
**Replace Prefect with Temporal** using a **vertical worker architecture** where each worker is pre-built with domain-specific security toolchains (Android, Rust, Web, iOS, Blockchain, etc.). Use **MinIO** for unified storage across dev and production environments.
**Temporal orchestration** with a **vertical worker architecture** where each worker is pre-built with domain-specific security toolchains (Android, Rust, Web, iOS, Blockchain, OSS-Fuzz, etc.). Uses **MinIO** for unified S3-compatible storage across dev and production environments.
### Why This Change?
| Aspect | Current (Prefect) | Target (Temporal + Verticals) |
|--------|-------------------|-------------------------------|
| **Services** | 6 (Server, Postgres, Redis, Registry, Docker-proxy, Worker) | 6 (Temporal, MinIO, MinIO-setup, 3+ vertical workers) |
| **Orchestration** | Prefect (complex) | Temporal (simpler, more reliable) |
| **Worker Model** | Ephemeral containers per workflow | Long-lived vertical workers with pre-built toolchains |
| **Storage** | Docker Registry + volume mounts | MinIO (S3-compatible) with caching |
| **Dynamic Workflows** | Build image per workflow | Mount workflow code as volume (no rebuild) |
| **Target Access** | Host volume mounts (/Users, /home) | Upload to MinIO, download to cache |
| **Memory Usage** | ~1.85GB | ~2.3GB (+24%, worth it for benefits) |
### Key Benefits
### Key Architecture Features
1. **Vertical Specialization:** Pre-built toolchains (Android: Frida, apktool; Rust: AFL++, cargo-fuzz)
2. **Zero Startup Overhead:** Long-lived workers (no 5s container spawn per workflow)
2. **Zero Startup Overhead:** Long-lived workers (no container spawn per workflow)
3. **Dynamic Workflows:** Add workflows without rebuilding images (mount as volume)
4. **Unified Storage:** MinIO works identically in dev and prod (no environment-specific code)
4. **Unified Storage:** MinIO works identically in dev and prod
5. **Better Security:** No host filesystem mounts, isolated uploaded targets
6. **Automatic Cleanup:** MinIO lifecycle policies handle file expiration
7. **Marketing Advantage:** Sell "security verticals" not "generic orchestration" (safer Nomad BSL positioning)
8. **Scalability:** Clear path from single-host to multi-host to Nomad cluster
7. **Scalability:** Clear path from single-host to multi-host to Nomad cluster
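
As one concrete illustration of point 6 above, a standard S3 lifecycle rule can expire uploaded targets automatically. This is a minimal sketch using `boto3` against a local MinIO endpoint; the endpoint, credentials, bucket name, prefix, and retention period are illustrative assumptions, not FuzzForge's actual configuration.

```python
import boto3

# Assumed local MinIO endpoint and dev-default credentials (adjust as needed).
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# Expire uploaded targets automatically after 7 days (illustrative values).
s3.put_bucket_lifecycle_configuration(
    Bucket="fuzzforge-targets",  # hypothetical bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-uploaded-targets",
                "Status": "Enabled",
                "Filter": {"Prefix": "targets/"},
                "Expiration": {"Days": 7},
            }
        ]
    },
)
```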
---
## Current Architecture (Prefect)
### Infrastructure Components
```
┌─────────────────────────────────────────────────────────┐
│ Docker Compose Stack (6 services) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Prefect │ │ Postgres │ │ Redis │ │
│ │ Server │ │ (metadata) │ │ (queue) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Registry │ │ Docker Proxy │ │ Prefect │ │
│ │ (images) │ │ (isolation) │ │ Worker │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
```
### Pain Points
- **Complexity:** 6 services to manage, configure, and monitor
- **Registry overhead:** Must push/pull images for every workflow deployment
- **Volume mounting complexity:** job_variables configuration per workflow
- **Dynamic workflows:** Requires rebuilding and pushing Docker images
- **Scalability:** Unclear how to scale beyond single host
- **Resource usage:** ~1.85GB baseline
---
## Target Architecture (Temporal + Vertical Workers)
## Current Architecture (Temporal + Vertical Workers)
### Infrastructure Overview
@@ -155,7 +107,7 @@ Worker-android: ~512MB (varies by toolchain)
Worker-rust: ~512MB
Worker-web: ~512MB
─────────────────────────
Total: ~2.3GB (vs 1.85GB Prefect = +24%)
Total: ~2.3GB
Note: +450MB overhead is worth it for:
- Unified dev/prod architecture
@@ -1030,8 +982,8 @@ job "fuzzforge-worker-android" {
## Decision Log
### 2025-09-30: Initial Architecture Decision
- **Decision:** Migrate from Prefect to Temporal
### 2025-09-30: Architecture Implementation
- **Decision:** Temporal with Vertical Workers
- **Rationale:** Simpler infrastructure, better reliability, clear scaling path
### 2025-10-01: Vertical Worker Model

View File

@@ -84,9 +84,10 @@ docs(readme): update installation instructions
```
backend/toolbox/workflows/your_workflow/
├── __init__.py
├── workflow.py # Main Prefect flow
├── metadata.yaml # Workflow metadata
└── Dockerfile # Container definition
├── workflow.py # Main Temporal workflow
├── activities.py # Workflow activities (optional)
├── metadata.yaml # Workflow metadata (includes vertical field)
└── requirements.txt # Additional dependencies (optional)
```
2. **Register Your Workflow**

File diff suppressed because it is too large

View File

@@ -1,6 +1,6 @@
# FuzzForge AI Module
FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge security platform through natural language. It orchestrates local tooling, registered Agent-to-Agent (A2A) peers, and the Prefect-powered backend while keeping long-running context in memory and project knowledge graphs.
FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge security platform through natural language. It orchestrates local tooling, registered Agent-to-Agent (A2A) peers, and the Temporal-powered backend while keeping long-running context in memory and project knowledge graphs.
## Quick Start
@@ -32,7 +32,7 @@ FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge securi
```bash
fuzzforge ai agent
```
Keep the backend running (Prefect API at `FUZZFORGE_MCP_URL`) so workflow commands succeed.
Keep the backend running (Temporal API at `FUZZFORGE_MCP_URL`) so workflow commands succeed.
## Everyday Workflow
@@ -61,7 +61,7 @@ Inside `fuzzforge ai agent` you can mix slash commands and free-form prompts:
/sendfile SecurityAgent src/report.md "Please review"
You> route_to SecurityAnalyzer: scan ./backend for secrets
You> run fuzzforge workflow static_analysis_scan on ./test_projects/demo
You> search project knowledge for "prefect status" using INSIGHTS
You> search project knowledge for "temporal status" using INSIGHTS
```
Artifacts created during the conversation are served from `.fuzzforge/artifacts/` and exposed through the A2A HTTP API.
@@ -84,7 +84,7 @@ Use these to validate the setup once the agent shell is running:
- `run fuzzforge workflow static_analysis_scan on ./backend with target_branch=main`
- `show findings for that run once it finishes`
- `refresh the project knowledge graph for ./backend`
- `search project knowledge for "prefect readiness" using INSIGHTS`
- `search project knowledge for "temporal readiness" using INSIGHTS`
- `/recall terraform secrets`
- `/memory status`
- `ROUTE_TO SecurityAnalyzer: audit infrastructure_vulnerable`

View File

@@ -841,15 +841,15 @@ class FuzzForgeExecutor:
elif normalised_mode in {"read_write", "readwrite", "rw"}:
normalised_mode = "rw"
else:
# Fall back to Prefect defaults if we can't recognise the input
# Fall back to read-only if we can't recognise the input
normalised_mode = "ro"
# Resolve the target path to an absolute path for Prefect's validation
# Resolve the target path to an absolute path for validation
resolved_path = target_path or "."
try:
resolved_path = str(Path(resolved_path).expanduser().resolve())
except Exception:
# If resolution fails, Prefect will surface the validation error use the raw value
# If resolution fails, use the raw value
resolved_path = target_path
# Ensure configuration objects default to dictionaries instead of None

View File

@@ -1,6 +1,6 @@
name: security_assessment
version: "2.0.0"
vertical: rust
vertical: python
description: "Comprehensive security assessment workflow that scans files, analyzes code for vulnerabilities, and generates SARIF reports"
author: "FuzzForge Team"
tags:

View File

@@ -68,7 +68,7 @@ Response excerpt:
- Call `POST /graph/query` to explore project knowledge.
- Call `POST /project/files` to fetch raw files from the repository.
- Download finished scan summaries with `GET /artifacts/{id}`.
4. The AI module pushes Prefect workflow results into artifacts automatically, so remote agents can poll without re-running scans.
4. The AI module pushes Temporal workflow results into artifacts automatically, so remote agents can poll without re-running scans.
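
As an illustration of step 4, a remote agent can retrieve a finished scan summary with a plain HTTP GET instead of re-running the workflow. This is a minimal sketch using `requests`; the base URL and artifact ID are assumptions to be replaced with your deployment's values.

```python
import requests

BASE_URL = "http://localhost:8080"  # assumed A2A server address
ARTIFACT_ID = "scan-summary-123"    # hypothetical artifact ID

# Fetch the artifact produced by a finished workflow run.
resp = requests.get(f"{BASE_URL}/artifacts/{ARTIFACT_ID}", timeout=10)
resp.raise_for_status()

with open("scan-summary.sarif", "wb") as fh:
    fh.write(resp.content)
```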
## Registration Flow
@@ -129,7 +129,7 @@ sequenceDiagram
participant Remote as Remote Agent
participant HTTP as A2A Server
participant Exec as Executor
participant Workflow as Prefect Backend
participant Workflow as Temporal Backend
Remote->>HTTP: POST / (message with tool request)
HTTP->>Exec: Forward message

View File

@@ -1,6 +1,6 @@
# AI Architecture
FuzzForge AI is the orchestration layer that lets large language models drive the broader security platform. Built on the Google ADK runtime, the module coordinates local tools, remote Agent-to-Agent (A2A) peers, and Prefect-backed workflows while persisting long-running context for every project.
FuzzForge AI is the orchestration layer that lets large language models drive the broader security platform. Built on the Google ADK runtime, the module coordinates local tools, remote Agent-to-Agent (A2A) peers, and Temporal-backed workflows while persisting long-running context for every project.
## System Diagram
@@ -27,7 +27,7 @@ graph TB
Executor --> Prompts[Prompt Templates]
Router --> RemoteAgents[Registered A2A Agents]
MCP --> Prefect[FuzzForge Backend]
MCP --> Temporal[FuzzForge Backend]
Memory --> SessionDB[Session Store]
Memory --> Semantic[Semantic Recall]
Memory --> Graphs[Cognee Graph]
@@ -44,7 +44,7 @@ sequenceDiagram
participant CLI as CLI / HTTP Surface
participant Exec as FuzzForgeExecutor
participant ADK as ADK Runner
participant Prefect as Prefect Backend
participant Temporal as Temporal Backend
participant Cognee as Cognee
participant Artifact as Artifact Cache
@@ -52,8 +52,8 @@ sequenceDiagram
CLI->>Exec: Normalised request + context ID
Exec->>ADK: Tool invocation (LiteLLM)
ADK-->>Exec: Structured response / tool result
Exec->>Prefect: (optional) submit workflow via MCP
Prefect-->>Exec: Run status updates
Exec->>Temporal: (optional) submit workflow via MCP
Temporal-->>Exec: Run status updates
Exec->>Cognee: (optional) knowledge query / ingestion
Cognee-->>Exec: Graph results
Exec->>Artifact: Persist generated files
@@ -69,7 +69,7 @@ sequenceDiagram
## Core Components
- **FuzzForgeAgent** (`ai/src/fuzzforge_ai/agent.py`) assembles the runtime: it loads environment variables, constructs the executor, and builds an ADK `Agent` backed by `LiteLlm`. The singleton accessor `get_fuzzforge_agent()` keeps CLI and server instances aligned and shares the generated agent card.
- **FuzzForgeExecutor** (`ai/src/fuzzforge_ai/agent_executor.py`) is the brain. It registers tools, manages session storage (SQLite or in-memory via `DatabaseSessionService` / `InMemorySessionService`), and coordinates artifact storage. The executor also tracks long-running Prefect workflows inside `pending_runs`, produces `TaskStatusUpdateEvent` objects, and funnels every response through ADKs `Runner` so traces include tool metadata.
- **FuzzForgeExecutor** (`ai/src/fuzzforge_ai/agent_executor.py`) is the brain. It registers tools, manages session storage (SQLite or in-memory via `DatabaseSessionService` / `InMemorySessionService`), and coordinates artifact storage. The executor also tracks long-running Temporal workflows inside `pending_runs`, produces `TaskStatusUpdateEvent` objects, and funnels every response through ADK's `Runner` so traces include tool metadata.
- **Remote agent registry** (`ai/src/fuzzforge_ai/remote_agent.py`) holds metadata for downstream agents and handles capability discovery over HTTP. Auto-registration is configured by `ConfigManager` so known agents attach on startup.
- **Memory services**:
- `FuzzForgeMemoryService` and `HybridMemoryManager` (`ai/src/fuzzforge_ai/memory_service.py`) provide conversation recall and bridge to Cognee datasets when configured.
@@ -77,15 +77,15 @@ sequenceDiagram
## Workflow Automation
The executor wraps Prefect MCP actions exposed by the backend:
The executor wraps Temporal MCP actions exposed by the backend:
| Tool | Source | Purpose |
| --- | --- | --- |
| `list_workflows_mcp` | `ai/src/fuzzforge_ai/agent_executor.py` | Enumerate available scans |
| `submit_security_scan_mcp` | `agent_executor.py` | Launch a scan and persist run metadata |
| `get_run_status_mcp` | `agent_executor.py` | Poll Prefect for status and push task events |
| `get_run_status_mcp` | `agent_executor.py` | Poll Temporal for status and push task events |
| `get_comprehensive_scan_summary` | `agent_executor.py` | Collect findings and bundle artifacts |
| `get_backend_status_mcp` | `agent_executor.py` | Block submissions until Prefect reports `ready` |
| `get_backend_status_mcp` | `agent_executor.py` | Block submissions until Temporal reports `ready` |
The CLI surface mirrors these helpers as natural-language prompts (`You> run fuzzforge workflow …`). ADK's `Runner` handles retries and ensures each tool call yields structured `Event` objects for downstream instrumentation.

View File

@@ -87,7 +87,7 @@ If the Cognee variables are omitted, graph-specific tools remain available but r
FUZZFORGE_MCP_URL=http://localhost:8010/mcp
```
The agent uses this endpoint to list, launch, and monitor Prefect workflows.
The agent uses this endpoint to list, launch, and monitor Temporal workflows.
## Tracing & Observability

View File

@@ -53,7 +53,7 @@ All runs automatically skip `.fuzzforge/**` and `.git/**` to avoid recursive ing
You> refresh the project knowledge graph for ./backend
Assistant> Kicks off `fuzzforge ingest` with recursive scan
You> search project knowledge for "prefect workflow" using INSIGHTS
You> search project knowledge for "temporal workflow" using INSIGHTS
Assistant> Routes to Cognee `search_project_knowledge`
You> ingest_to_dataset("Design doc for new scanner", "insights")
@@ -70,7 +70,7 @@ LLM_PROVIDER=openai
LITELLM_MODEL=gpt-5-mini
OPENAI_API_KEY=sk-your-key
# FuzzForge backend (Prefect-powered)
# FuzzForge backend (Temporal-powered)
FUZZFORGE_MCP_URL=http://localhost:8010/mcp
# Optional: knowledge graph provider

View File

@@ -4,7 +4,7 @@ sidebar_position: 1
# FuzzForge AI Module
FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge security platform through natural language. It orchestrates local tooling, registered Agent-to-Agent (A2A) peers, and the Prefect-powered backend while keeping long-running context in memory and project knowledge graphs.
FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge security platform through natural language. It orchestrates local tooling, registered Agent-to-Agent (A2A) peers, and the Temporal-powered backend while keeping long-running context in memory and project knowledge graphs.
## Quick Start
@@ -36,7 +36,7 @@ FuzzForge AI is the multi-agent layer that lets you operate the FuzzForge securi
```bash
fuzzforge ai agent
```
Keep the backend running (Prefect API at `FUZZFORGE_MCP_URL`) so workflow commands succeed.
Keep the backend running (Temporal API at `FUZZFORGE_MCP_URL`) so workflow commands succeed.
## Everyday Workflow
@@ -65,7 +65,7 @@ Inside `fuzzforge ai agent` you can mix slash commands and free-form prompts:
/sendfile SecurityAgent src/report.md "Please review"
You> route_to SecurityAnalyzer: scan ./backend for secrets
You> run fuzzforge workflow static_analysis_scan on ./test_projects/demo
You> search project knowledge for "prefect status" using INSIGHTS
You> search project knowledge for "temporal status" using INSIGHTS
```
Artifacts created during the conversation are served from `.fuzzforge/artifacts/` and exposed through the A2A HTTP API.
@@ -88,7 +88,7 @@ Use these to validate the setup once the agent shell is running:
- `run fuzzforge workflow static_analysis_scan on ./backend with target_branch=main`
- `show findings for that run once it finishes`
- `refresh the project knowledge graph for ./backend`
- `search project knowledge for "prefect readiness" using INSIGHTS`
- `search project knowledge for "temporal readiness" using INSIGHTS`
- `/recall terraform secrets`
- `/memory status`
- `ROUTE_TO SecurityAnalyzer: audit infrastructure_vulnerable`

View File

@@ -33,7 +33,7 @@ Assistant> Streams the `get_comprehensive_scan_summary` output and attaches the
You> refresh the project knowledge graph for ./backend
Assistant> Launches `fuzzforge ingest --path ./backend --recursive` and reports file counts.
You> search project knowledge for "prefect readiness" using INSIGHTS
You> search project knowledge for "temporal readiness" using INSIGHTS
Assistant> Routes to Cognee via `query_project_knowledge_api` and returns the top matches.
You> recall "api key rotation"
@@ -52,7 +52,7 @@ Assistant> Uploads the file as an artifact and notifies the remote agent.
## Prompt Tips
- Use explicit verbs (`list`, `run`, `search`) to trigger the Prefect workflow helpers.
- Use explicit verbs (`list`, `run`, `search`) to trigger the Temporal workflow helpers.
- Include parameter names inline (`with target_branch=main`) so the executor maps values to MCP tool inputs without additional clarification.
- When referencing prior runs, reuse the assistant's run IDs or ask for "the last run"; the session store tracks them per context ID.
- If Cognee is not configured, graph queries return a friendly notice; set `LLM_COGNEE_*` variables to enable full answers.

View File

@@ -25,9 +25,9 @@ At a glance, FuzzForge is organized into several layers, each with a clear respo
- **Client Layer:** Where users and external systems interact (CLI, API clients, MCP server).
- **API Layer:** The FastAPI backend, which exposes REST endpoints and manages requests.
- **Orchestration Layer:** Prefect server and workers, which schedule and execute workflows.
- **Execution Layer:** Docker Engine and containers, where workflows actually run.
- **Storage Layer:** PostgreSQL database, Docker volumes, and a result cache for persistence.
- **Orchestration Layer:** Temporal server and vertical workers, which schedule and execute workflows.
- **Execution Layer:** Long-lived vertical worker containers with pre-installed toolchains, where workflows run.
- **Storage Layer:** PostgreSQL database, MinIO (S3-compatible storage), and worker cache for persistence.
Here's a simplified view of how these layers fit together:
@@ -46,8 +46,8 @@ graph TB
end
subgraph "Orchestration Layer"
Prefect[Prefect Server]
Workers[Prefect Workers]
Temporal[Temporal Server]
Workers[Vertical Workers]
Scheduler[Workflow Scheduler]
end
@@ -69,9 +69,9 @@ graph TB
FastAPI --> Router
Router --> Middleware
Middleware --> Prefect
Middleware --> Temporal
Prefect --> Workers
Temporal --> Workers
Workers --> Scheduler
Scheduler --> Docker

View File

@@ -363,12 +363,12 @@ class DependencyAnalysisWorkflow:
return sarif_report
```
**Key differences from Prefect:**
- Use `@workflow.defn` class instead of `@flow` function
- Use `@activity.defn` instead of `@task`
- Must call `get_target` activity to download from MinIO with isolation mode
**Key Temporal Workflow Concepts:**
- Use `@workflow.defn` class decorator to define workflows
- Use `@activity.defn` decorator for activity functions
- Call `get_target` activity to download targets from MinIO with workspace isolation
- Use `workflow.execute_activity()` with explicit timeouts and retry policies
- Use `workflow.logger` for logging (appears in Temporal UI)
- Use `workflow.logger` for logging (appears in Temporal UI and backend logs)
- Call `cleanup_cache` activity at end to clean up workspace
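
To make these concepts concrete, here is a minimal sketch of a workflow following this pattern, written against the Temporal Python SDK (`temporalio`). The `get_target` and `cleanup_cache` activity calls follow the conventions above, but their exact signatures, and the `run_analysis` activity, are illustrative assumptions rather than the actual FuzzForge implementations.

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


@workflow.defn
class ExampleScanWorkflow:
    @workflow.run
    async def run(self, target_id: str) -> dict:
        # Download the target from MinIO into an isolated workspace.
        # (Activity signature assumed for illustration.)
        workspace = await workflow.execute_activity(
            "get_target",
            target_id,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        workflow.logger.info("Target staged at %s", workspace)

        # Run the analysis as its own activity with an explicit timeout.
        # ("run_analysis" is a hypothetical activity name.)
        report = await workflow.execute_activity(
            "run_analysis",
            workspace,
            start_to_close_timeout=timedelta(minutes=30),
        )

        # Always release the cached workspace at the end of the run.
        await workflow.execute_activity(
            "cleanup_cache",
            workspace,
            start_to_close_timeout=timedelta(minutes=1),
        )
        return report
```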
---

View File

@@ -204,7 +204,7 @@ curl -X POST http://localhost:8000/workflows/infrastructure_scan/submit \
         │                       │
         ▼                       ▼
┌──────────────────┐    ┌─────────────────┐
│    MCP Tools     │    │     Prefect     │
│    MCP Tools     │    │    Temporal     │
│  - scan submit   │    │    Workflows    │
│  - results       │    │   - Security    │
│  - analysis      │    │   - Fuzzing     │

View File

@@ -1,6 +1,6 @@
# FuzzForge Documentation
Welcome to FuzzForge, a comprehensive security analysis platform built on Prefect 3 that automates security testing workflows. FuzzForge provides 6 production-ready workflows that run static analysis, secret detection, infrastructure scanning, penetration testing, and custom fuzzing campaigns with Docker-based isolation and SARIF-compliant reporting.
Welcome to FuzzForge, a comprehensive security analysis platform built on Temporal that automates security testing workflows. FuzzForge provides production-ready workflows that run static analysis, secret detection, infrastructure scanning, penetration testing, and custom fuzzing campaigns with Docker-based isolation and SARIF-compliant reporting.
## 🚀 Quick Navigation

View File

@@ -1,387 +0,0 @@
"""
Docker log integration for enhanced error reporting.
This module provides functionality to fetch and parse Docker container logs
to provide better context for deployment and workflow execution errors.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
import re
import subprocess
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class ContainerLogEntry:
"""A single log entry from a container."""
timestamp: datetime
level: str
message: str
stream: str # 'stdout' or 'stderr'
raw: str
@dataclass
class ContainerDiagnostics:
"""Complete diagnostics for a container."""
container_id: Optional[str]
status: str
exit_code: Optional[int]
error: Optional[str]
logs: List[ContainerLogEntry]
resource_usage: Dict[str, Any]
volume_mounts: List[Dict[str, str]]
class DockerLogIntegration:
"""
Integration with Docker to fetch container logs and diagnostics.
This class provides methods to fetch container logs, parse common error
patterns, and extract meaningful diagnostic information from Docker
containers related to FuzzForge workflow execution.
"""
def __init__(self):
self.docker_available = self._check_docker_availability()
# Common error patterns in container logs
self.error_patterns = {
'permission_denied': [
r'permission denied',
r'operation not permitted',
r'cannot access.*permission denied'
],
'out_of_memory': [
r'out of memory',
r'oom killed',
r'cannot allocate memory'
],
'image_pull_failed': [
r'failed to pull image',
r'pull access denied',
r'image not found'
],
'volume_mount_failed': [
r'invalid mount config',
r'mount denied',
r'no such file or directory.*mount'
],
'network_error': [
r'network is unreachable',
r'connection refused',
r'timeout.*connect'
]
}
def _check_docker_availability(self) -> bool:
"""Check if Docker is available and accessible."""
try:
result = subprocess.run(['docker', 'version', '--format', 'json'],
capture_output=True, text=True, timeout=5)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
return False
def get_container_logs(self, container_name_or_id: str, tail: int = 100) -> List[ContainerLogEntry]:
"""
Fetch logs from a Docker container.
Args:
container_name_or_id: Container name or ID
tail: Number of log lines to retrieve
Returns:
List of parsed log entries
"""
if not self.docker_available:
logger.warning("Docker not available, cannot fetch container logs")
return []
try:
cmd = ['docker', 'logs', '--timestamps', '--tail', str(tail), container_name_or_id]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
logger.error(f"Failed to fetch logs for container {container_name_or_id}: {result.stderr}")
return []
return self._parse_docker_logs(result.stdout + result.stderr)
except subprocess.TimeoutExpired:
logger.error(f"Timeout fetching logs for container {container_name_or_id}")
return []
except Exception as e:
logger.error(f"Error fetching container logs: {e}")
return []
def _parse_docker_logs(self, raw_logs: str) -> List[ContainerLogEntry]:
"""Parse raw Docker logs into structured entries."""
entries = []
for line in raw_logs.strip().split('\n'):
if not line.strip():
continue
entry = self._parse_log_line(line)
if entry:
entries.append(entry)
return entries
def _parse_log_line(self, line: str) -> Optional[ContainerLogEntry]:
"""Parse a single log line with timestamp."""
# Docker log format: 2023-10-01T12:00:00.000000000Z message
timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\s+(.*)', line)
if timestamp_match:
timestamp_str, message = timestamp_match.groups()
try:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
except ValueError:
timestamp = datetime.now(timezone.utc)
else:
timestamp = datetime.now(timezone.utc)
message = line
# Determine log level from message content
level = self._extract_log_level(message)
# Determine stream (simplified - Docker doesn't clearly separate in combined output)
stream = 'stderr' if any(keyword in message.lower() for keyword in ['error', 'failed', 'exception']) else 'stdout'
return ContainerLogEntry(
timestamp=timestamp,
level=level,
message=message.strip(),
stream=stream,
raw=line
)
def _extract_log_level(self, message: str) -> str:
"""Extract log level from message content."""
message_lower = message.lower()
if any(keyword in message_lower for keyword in ['error', 'failed', 'exception', 'fatal']):
return 'ERROR'
elif any(keyword in message_lower for keyword in ['warning', 'warn']):
return 'WARNING'
elif any(keyword in message_lower for keyword in ['info', 'information']):
return 'INFO'
elif any(keyword in message_lower for keyword in ['debug']):
return 'DEBUG'
else:
return 'INFO'
def get_container_diagnostics(self, container_name_or_id: str) -> ContainerDiagnostics:
"""
Get complete diagnostics for a container including logs, status, and resource usage.
Args:
container_name_or_id: Container name or ID
Returns:
Complete container diagnostics
"""
if not self.docker_available:
return ContainerDiagnostics(
container_id=None,
status="unknown",
exit_code=None,
error="Docker not available",
logs=[],
resource_usage={},
volume_mounts=[]
)
# Get container inspect data
inspect_data = self._get_container_inspect(container_name_or_id)
# Get logs
logs = self.get_container_logs(container_name_or_id)
# Extract key information
if inspect_data:
state = inspect_data.get('State', {})
config = inspect_data.get('Config', {})
host_config = inspect_data.get('HostConfig', {})
status = state.get('Status', 'unknown')
exit_code = state.get('ExitCode')
error = state.get('Error', '')
# Get volume mounts
mounts = inspect_data.get('Mounts', [])
volume_mounts = [
{
'source': mount.get('Source', ''),
'destination': mount.get('Destination', ''),
'mode': mount.get('Mode', ''),
'type': mount.get('Type', '')
}
for mount in mounts
]
# Get resource limits
resource_usage = {
'memory_limit': host_config.get('Memory', 0),
'cpu_limit': host_config.get('CpuQuota', 0),
'cpu_period': host_config.get('CpuPeriod', 0)
}
else:
status = "not_found"
exit_code = None
error = f"Container {container_name_or_id} not found"
volume_mounts = []
resource_usage = {}
return ContainerDiagnostics(
container_id=container_name_or_id,
status=status,
exit_code=exit_code,
error=error,
logs=logs,
resource_usage=resource_usage,
volume_mounts=volume_mounts
)
def _get_container_inspect(self, container_name_or_id: str) -> Optional[Dict[str, Any]]:
"""Get container inspection data."""
try:
cmd = ['docker', 'inspect', container_name_or_id]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode != 0:
return None
data = json.loads(result.stdout)
return data[0] if data else None
except (subprocess.TimeoutExpired, json.JSONDecodeError, Exception) as e:
logger.debug(f"Failed to inspect container {container_name_or_id}: {e}")
return None
def analyze_error_patterns(self, logs: List[ContainerLogEntry]) -> Dict[str, List[str]]:
"""
Analyze logs for common error patterns.
Args:
logs: List of log entries to analyze
Returns:
Dictionary mapping error types to matching log messages
"""
detected_errors = {}
for error_type, patterns in self.error_patterns.items():
matches = []
for log_entry in logs:
for pattern in patterns:
if re.search(pattern, log_entry.message, re.IGNORECASE):
matches.append(log_entry.message)
break # Don't match the same message multiple times
if matches:
detected_errors[error_type] = matches
return detected_errors
def get_container_names_by_label(self, label_filter: str) -> List[str]:
"""
Get container names that match a specific label filter.
Args:
label_filter: Label filter (e.g., "prefect.flow-run-id=12345")
Returns:
List of container names
"""
if not self.docker_available:
return []
try:
cmd = ['docker', 'ps', '-a', '--filter', f'label={label_filter}', '--format', '{{.Names}}']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode != 0:
return []
return [name.strip() for name in result.stdout.strip().split('\n') if name.strip()]
except Exception as e:
logger.debug(f"Failed to get containers by label {label_filter}: {e}")
return []
def suggest_fixes(self, error_analysis: Dict[str, List[str]]) -> List[str]:
"""
Suggest fixes based on detected error patterns.
Args:
error_analysis: Result from analyze_error_patterns()
Returns:
List of suggested fixes
"""
suggestions = []
if 'permission_denied' in error_analysis:
suggestions.extend([
"Check file permissions on the target path",
"Ensure the Docker daemon has access to the mounted volumes",
"Try running with elevated privileges or adjust volume ownership"
])
if 'out_of_memory' in error_analysis:
suggestions.extend([
"Increase memory limits for the workflow",
"Check if the target files are too large for available memory",
"Consider using streaming processing for large datasets"
])
if 'image_pull_failed' in error_analysis:
suggestions.extend([
"Check network connectivity to Docker registry",
"Verify image name and tag are correct",
"Ensure Docker registry credentials are configured"
])
if 'volume_mount_failed' in error_analysis:
suggestions.extend([
"Verify the target path exists and is accessible",
"Check volume mount syntax and permissions",
"Ensure the path is not already in use by another process"
])
if 'network_error' in error_analysis:
suggestions.extend([
"Check network connectivity",
"Verify backend services are running (docker-compose up -d)",
"Check firewall settings and port availability"
])
if not suggestions:
suggestions.append("Review the container logs above for specific error details")
return suggestions
# Global instance for easy access
docker_integration = DockerLogIntegration()

View File

@@ -1,8 +1,9 @@
"""
Enhanced exceptions for FuzzForge SDK with rich context and Docker integration.
Enhanced exceptions for FuzzForge SDK with rich context.
Provides comprehensive error information including container logs, diagnostics,
and actionable suggestions for troubleshooting.
Provides comprehensive error information and actionable suggestions for troubleshooting.
Note: Container diagnostics are not available in Temporal architecture as workflows
run in long-lived worker containers rather than ephemeral per-workflow containers.
"""
# Copyright (c) 2025 FuzzingLabs
#
@@ -21,8 +22,6 @@ import re
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from .docker_logs import docker_integration, ContainerDiagnostics
@dataclass
class ErrorContext:
@@ -31,7 +30,6 @@ class ErrorContext:
request_method: Optional[str] = None
request_data: Optional[Dict[str, Any]] = None
response_data: Optional[Dict[str, Any]] = None
container_diagnostics: Optional[ContainerDiagnostics] = None
suggested_fixes: List[str] = None
error_patterns: Dict[str, List[str]] = None
related_run_id: Optional[str] = None
@@ -62,49 +60,10 @@ class FuzzForgeError(Exception):
self.context = context or ErrorContext()
self.original_exception = original_exception
# Auto-populate container diagnostics if we have a run ID
if self.context.related_run_id and not self.context.container_diagnostics:
self._fetch_container_diagnostics()
def _fetch_container_diagnostics(self):
"""Fetch container diagnostics for the related run."""
if not self.context.related_run_id:
return
try:
# Try to find containers by Prefect run ID label
label_filter = f"prefect.flow-run-id={self.context.related_run_id}"
container_names = docker_integration.get_container_names_by_label(label_filter)
if container_names:
# Use the most recent container
container_name = container_names[0]
diagnostics = docker_integration.get_container_diagnostics(container_name)
# Analyze error patterns in logs
if diagnostics.logs:
error_analysis = docker_integration.analyze_error_patterns(diagnostics.logs)
suggestions = docker_integration.suggest_fixes(error_analysis)
self.context.container_diagnostics = diagnostics
self.context.error_patterns = error_analysis
self.context.suggested_fixes.extend(suggestions)
except Exception:
# Don't fail the main error because of diagnostics issues
pass
def get_summary(self) -> str:
"""Get a summary of the error with key details."""
parts = [self.message]
if self.context.container_diagnostics:
diag = self.context.container_diagnostics
if diag.status != 'running':
parts.append(f"Container status: {diag.status}")
if diag.exit_code is not None:
parts.append(f"Exit code: {diag.exit_code}")
if self.context.error_patterns:
detected = list(self.context.error_patterns.keys())
parts.append(f"Detected issues: {', '.join(detected)}")
@@ -153,18 +112,11 @@ class FuzzForgeHTTPError(FuzzForgeError):
self.response_text = response_text
def get_summary(self) -> str:
base = f"HTTP {self.status_code}: {self.message}"
if self.context.container_diagnostics:
diag = self.context.container_diagnostics
if diag.exit_code is not None and diag.exit_code != 0:
base += f" (Container exit code: {diag.exit_code})"
return base
return f"HTTP {self.status_code}: {self.message}"
class DeploymentError(FuzzForgeHTTPError):
"""Enhanced deployment errors with container diagnostics."""
"""Enhanced deployment errors."""
def __init__(
self,
@@ -181,23 +133,9 @@ class DeploymentError(FuzzForgeHTTPError):
context.workflow_name = workflow_name
# If we have a container name, get its diagnostics immediately
if container_name:
try:
diagnostics = docker_integration.get_container_diagnostics(container_name)
context.container_diagnostics = diagnostics
# Analyze logs for error patterns
if diagnostics.logs:
error_analysis = docker_integration.analyze_error_patterns(diagnostics.logs)
suggestions = docker_integration.suggest_fixes(error_analysis)
context.error_patterns = error_analysis
context.suggested_fixes.extend(suggestions)
except Exception:
# Don't fail on diagnostics
pass
# Note: Container diagnostics are not fetched in Temporal architecture.
# Workflows run in long-lived worker containers, not per-workflow containers.
# The container_name parameter is kept for backward compatibility but not used.
full_message = f"Deployment failed for workflow '{workflow_name}': {message}"
super().__init__(full_message, status_code, response_text, context)
@@ -292,22 +230,9 @@ class ContainerError(FuzzForgeError):
if context is None:
context = ErrorContext()
# Immediately fetch container diagnostics
try:
diagnostics = docker_integration.get_container_diagnostics(container_name)
context.container_diagnostics = diagnostics
# Analyze logs for patterns
if diagnostics.logs:
error_analysis = docker_integration.analyze_error_patterns(diagnostics.logs)
suggestions = docker_integration.suggest_fixes(error_analysis)
context.error_patterns = error_analysis
context.suggested_fixes.extend(suggestions)
except Exception:
# Don't fail on diagnostics
pass
# Note: Container diagnostics are not fetched in Temporal architecture.
# Workflows run in long-lived worker containers, not per-workflow containers.
# The container_name parameter is kept for backward compatibility but not used.
full_message = f"Container error ({container_name}): {message}"
if exit_code is not None:

View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
Quick smoke test for SDK exception handling after exceptions.py modifications.
Tests that removing the container diagnostics integration doesn't break exception flows.
"""
import sys
from pathlib import Path
# Add SDK to path
sdk_path = Path(__file__).parent / "src"
sys.path.insert(0, str(sdk_path))
from fuzzforge_sdk.exceptions import (
FuzzForgeError,
FuzzForgeHTTPError,
WorkflowNotFoundError,
RunNotFoundError,
ErrorContext,
DeploymentError,
WorkflowExecutionError,
ValidationError,
)
def test_basic_import():
"""Test that all exception classes can be imported."""
print("✓ All exception classes imported successfully")
def test_error_context():
"""Test ErrorContext instantiation."""
context = ErrorContext(
url="http://localhost:8000/test",
related_run_id="test-run-123",
workflow_name="test_workflow"
)
assert context.url == "http://localhost:8000/test"
assert context.related_run_id == "test-run-123"
assert context.workflow_name == "test_workflow"
print("✓ ErrorContext instantiation works")
def test_base_exception():
"""Test base FuzzForgeError."""
context = ErrorContext(related_run_id="test-run-456")
error = FuzzForgeError("Test error message", context=context)
assert error.message == "Test error message"
assert error.context.related_run_id == "test-run-456"
print("✓ FuzzForgeError creation works")
def test_http_error():
"""Test HTTP error creation."""
error = FuzzForgeHTTPError(
message="Test HTTP error",
status_code=500,
response_text='{"error": "Internal server error"}'
)
assert error.status_code == 500
assert error.message == "Test HTTP error"
assert error.context.response_data == {"error": "Internal server error"}
print("✓ FuzzForgeHTTPError creation works")
def test_workflow_not_found():
"""Test WorkflowNotFoundError with suggestions."""
error = WorkflowNotFoundError(
workflow_name="nonexistent_workflow",
available_workflows=["security_assessment", "secret_detection"]
)
assert error.workflow_name == "nonexistent_workflow"
assert len(error.context.suggested_fixes) > 0
print("✓ WorkflowNotFoundError with suggestions works")
def test_run_not_found():
"""Test RunNotFoundError."""
error = RunNotFoundError(run_id="missing-run-123")
assert error.run_id == "missing-run-123"
assert error.context.related_run_id == "missing-run-123"
assert len(error.context.suggested_fixes) > 0
print("✓ RunNotFoundError creation works")
def test_deployment_error():
"""Test DeploymentError."""
error = DeploymentError(
workflow_name="test_workflow",
message="Deployment failed",
deployment_id="deploy-123",
container_name="test-container-456" # Kept for backward compatibility
)
assert error.workflow_name == "test_workflow"
assert error.deployment_id == "deploy-123"
print("✓ DeploymentError creation works")
def test_workflow_execution_error():
"""Test WorkflowExecutionError."""
error = WorkflowExecutionError(
workflow_name="security_assessment",
run_id="run-789",
message="Execution timeout"
)
assert error.workflow_name == "security_assessment"
assert error.run_id == "run-789"
assert error.context.related_run_id == "run-789"
print("✓ WorkflowExecutionError creation works")
def test_validation_error():
"""Test ValidationError."""
error = ValidationError(
field_name="target_path",
message="Path does not exist",
provided_value="/nonexistent/path",
expected_format="Valid directory path"
)
assert error.field_name == "target_path"
assert error.provided_value == "/nonexistent/path"
assert len(error.context.suggested_fixes) > 0
print("✓ ValidationError with suggestions works")
def test_exception_string_representation():
"""Test exception summary and string conversion."""
error = FuzzForgeHTTPError(
message="Test error",
status_code=404,
response_text="Not found"
)
summary = error.get_summary()
assert "404" in summary
assert "Test error" in summary
str_repr = str(error)
assert str_repr == summary
print("✓ Exception string representation works")
def test_exception_detailed_info():
"""Test detailed error information."""
context = ErrorContext(
url="http://localhost:8000/test",
workflow_name="test_workflow"
)
error = FuzzForgeError("Test error", context=context)
info = error.get_detailed_info()
assert info["message"] == "Test error"
assert info["type"] == "FuzzForgeError"
assert info["url"] == "http://localhost:8000/test"
assert info["workflow_name"] == "test_workflow"
print("✓ Exception detailed info works")
def main():
"""Run all tests."""
print("\n" + "="*60)
print("SDK Exception Handling Smoke Tests")
print("="*60 + "\n")
tests = [
test_basic_import,
test_error_context,
test_base_exception,
test_http_error,
test_workflow_not_found,
test_run_not_found,
test_deployment_error,
test_workflow_execution_error,
test_validation_error,
test_exception_string_representation,
test_exception_detailed_info,
]
passed = 0
failed = 0
for test_func in tests:
try:
test_func()
passed += 1
except Exception as e:
print(f"{test_func.__name__} FAILED: {e}")
failed += 1
print("\n" + "="*60)
print(f"Results: {passed} passed, {failed} failed")
print("="*60 + "\n")
if failed > 0:
print("❌ SDK exception handling has issues")
return 1
else:
print("✅ SDK exception handling works correctly")
print("✅ The no-op _fetch_container_diagnostics() doesn't break exception flows")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,5 +0,0 @@
{
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
"runs": [],
"version": "2.1.0"
}

View File

@@ -26,3 +26,10 @@ path = "fuzz_targets/fuzz_divide.rs"
test = false
doc = false
bench = false
[[bin]]
name = "fuzz_waterfall"
path = "fuzz_targets/fuzz_waterfall.rs"
test = false
doc = false
bench = false

View File

@@ -41,6 +41,73 @@ pub fn divide_numbers(data: &[u8]) -> Option<i32> {
Some(a / b)
}
/// Waterfall vulnerability: checks secret character by character
/// This is a classic sequential comparison vulnerability that creates
/// distinct code paths for coverage-guided fuzzing to discover.
pub fn check_secret_waterfall(data: &[u8]) -> usize {
const SECRET: &[u8] = b"FUZZINGLABS";
if data.is_empty() {
return 0;
}
let mut matches = 0;
// Check each character sequentially
// Each comparison creates a distinct code path for coverage guidance
for i in 0..std::cmp::min(data.len(), SECRET.len()) {
if data[i] != SECRET[i] {
// Wrong character - stop checking
return matches;
}
matches += 1;
// Add explicit comparisons to help coverage-guided fuzzing
// Each comparison creates a distinct code path for the fuzzer to detect
if matches >= 1 && data[0] == b'F' {
// F
}
if matches >= 2 && data[1] == b'U' {
// FU
}
if matches >= 3 && data[2] == b'Z' {
// FUZ
}
if matches >= 4 && data[3] == b'Z' {
// FUZZ
}
if matches >= 5 && data[4] == b'I' {
// FUZZI
}
if matches >= 6 && data[5] == b'N' {
// FUZZIN
}
if matches >= 7 && data[6] == b'G' {
// FUZZING
}
if matches >= 8 && data[7] == b'L' {
// FUZZINGL
}
if matches >= 9 && data[8] == b'A' {
// FUZZINGLA
}
if matches >= 10 && data[9] == b'B' {
// FUZZINGLAB
}
if matches >= 11 && data[10] == b'S' {
// FUZZINGLABS
}
}
// VULNERABILITY: Panics when complete secret found
if matches == SECRET.len() && data.len() >= SECRET.len() {
panic!("SECRET COMPROMISED! Found: {:?}", &data[..SECRET.len()]);
}
matches
}
#[cfg(test)]
mod tests {
use super::*;
@@ -55,4 +122,17 @@ mod tests {
let data = vec![3, 1, 2, 3, 4];
assert_eq!(process_buffer(&data), vec![3, 1, 2]);
}
#[test]
fn test_waterfall_partial_match() {
assert_eq!(check_secret_waterfall(b"F"), 1);
assert_eq!(check_secret_waterfall(b"FU"), 2);
assert_eq!(check_secret_waterfall(b"FUZZ"), 4);
}
#[test]
#[should_panic(expected = "SECRET COMPROMISED")]
fn test_waterfall_full_match() {
check_secret_waterfall(b"FUZZINGLABS");
}
}

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large