CI/CD Integration with Ephemeral Deployment Model (#14)

* feat: Complete migration from Prefect to Temporal

BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
-  All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
-  All API endpoints functional
-  End-to-end workflow test passed (72 findings from vulnerable_app)
-  MinIO storage integration working (target upload/download, results)
-  Worker activity discovery working (6 activities registered)
-  Tarball extraction working
-  SARIF report generation working

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18

* feat: Add Python fuzzing vertical with Atheris integration

This commit implements a complete Python fuzzing workflow using Atheris:

## Python Worker (workers/python/)
- Dockerfile with Python 3.11, Atheris, and build tools
- Generic worker.py for dynamic workflow discovery
- requirements.txt with temporalio, boto3, atheris dependencies
- Added to docker-compose.temporal.yaml with dedicated cache volume

## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/)
- Reusable module extending BaseModule
- Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py)
- Recursive search to find targets in nested directories
- Dynamically loads TestOneInput() function
- Configurable max_iterations and timeout
- Real-time stats callback support for live monitoring
- Returns findings as ModuleFinding objects

## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/)
- Temporal workflow for orchestrating fuzzing
- Downloads user code from MinIO
- Executes AtherisFuzzer module
- Uploads results to MinIO
- Cleans up cache after execution
- metadata.yaml with vertical: python for routing

## Test Project (test_projects/python_fuzz_waterfall/)
- Demonstrates stateful waterfall vulnerability
- main.py with check_secret() that leaks progress
- fuzz_target.py with Atheris TestOneInput() harness
- Complete README with usage instructions

## Backend Fixes
- Fixed parameter merging in REST API endpoints (workflows.py)
- Changed workflow parameter passing from positional args to kwargs (manager.py)
- Default parameters now properly merged with user parameters

## Testing
 Worker discovered AtherisFuzzingWorkflow
 Workflow executed end-to-end successfully
 Fuzz target auto-discovered in nested directories
 Atheris ran 100,000 iterations
 Results uploaded and cache cleaned

* chore: Complete Temporal migration with updated CLI/SDK/docs

This commit includes all remaining Temporal migration changes:

## CLI Updates (cli/)
- Updated workflow execution commands for Temporal
- Enhanced error handling and exceptions
- Updated dependencies in uv.lock

## SDK Updates (sdk/)
- Client methods updated for Temporal workflows
- Updated models for new workflow execution
- Updated dependencies in uv.lock

## Documentation Updates (docs/)
- Architecture documentation for Temporal
- Workflow concept documentation
- Resource management documentation (new)
- Debugging guide (new)
- Updated tutorials and how-to guides
- Troubleshooting updates

## README Updates
- Main README with Temporal instructions
- Backend README
- CLI README
- SDK README

## Other
- Updated IMPLEMENTATION_STATUS.md
- Removed old vulnerable_app.tar.gz

These changes complete the Temporal migration and ensure the
CLI/SDK work correctly with the new backend.

* fix: Use positional args instead of kwargs for Temporal workflows

The Temporal Python SDK's start_workflow() method doesn't accept
a 'kwargs' parameter. Workflows must receive parameters as positional
arguments via the 'args' parameter.

Changed from:
  args=workflow_args  # Positional arguments

This fixes the error:
  TypeError: Client.start_workflow() got an unexpected keyword argument 'kwargs'

Workflows now correctly receive parameters in order:
- security_assessment: [target_id, scanner_config, analyzer_config, reporter_config]
- atheris_fuzzing: [target_id, target_file, max_iterations, timeout_seconds]
- rust_test: [target_id, test_message]

* fix: Filter metadata-only parameters from workflow arguments

SecurityAssessmentWorkflow was receiving 7 arguments instead of 2-5.
The issue was that target_path and volume_mode from default_parameters
were being passed to the workflow, when they should only be used by
the system for configuration.

Now filters out metadata-only parameters (target_path, volume_mode)
before passing arguments to workflow execution.

* refactor: Remove Prefect leftovers and volume mounting legacy

Complete cleanup of Prefect migration artifacts:

Backend:
- Delete registry.py and workflow_discovery.py (Prefect-specific files)
- Remove Docker validation from setup.py (no longer needed)
- Remove ResourceLimits and VolumeMount models
- Remove target_path and volume_mode from WorkflowSubmission
- Remove supported_volume_modes from API and discovery
- Clean up metadata.yaml files (remove volume/path fields)
- Simplify parameter filtering in manager.py

SDK:
- Remove volume_mode parameter from client methods
- Remove ResourceLimits and VolumeMount models
- Remove Prefect error patterns from docker_logs.py
- Clean up WorkflowSubmission and WorkflowMetadata models

CLI:
- Remove Volume Modes display from workflow info

All removed features are Prefect-specific or Docker volume mounting
artifacts. Temporal workflows use MinIO storage exclusively.

* feat: Add comprehensive test suite and benchmark infrastructure

- Add 68 unit tests for fuzzer, scanner, and analyzer modules
- Implement pytest-based test infrastructure with fixtures
- Add 6 performance benchmarks with category-specific thresholds
- Configure GitHub Actions for automated testing and benchmarking
- Add test and benchmark documentation

Test coverage:
- AtherisFuzzer: 8 tests
- CargoFuzzer: 14 tests
- FileScanner: 22 tests
- SecurityAnalyzer: 24 tests

All tests passing (68/68)
All benchmarks passing (6/6)

* fix: Resolve all ruff linting violations across codebase

Fixed 27 ruff violations in 12 files:
- Removed unused imports (Depends, Dict, Any, Optional, etc.)
- Fixed undefined workflow_info variable in workflows.py
- Removed dead code with undefined variables in atheris_fuzzer.py
- Changed f-string to regular string where no placeholders used

All files now pass ruff checks for CI/CD compliance.

* fix: Configure CI for unit tests only

- Renamed docker-compose.temporal.yaml → docker-compose.yml for CI compatibility
- Commented out integration-tests job (no integration tests yet)
- Updated test-summary to only depend on lint and unit-tests

CI will now run successfully with 68 unit tests. Integration tests can be added later.

* feat: Add CI/CD integration with ephemeral deployment model

Implements comprehensive CI/CD support for FuzzForge with on-demand worker management:

**Worker Management (v0.7.0)**
- Add WorkerManager for automatic worker lifecycle control
- Auto-start workers from stopped state when workflows execute
- Auto-stop workers after workflow completion
- Health checks and startup timeout handling (90s default)

**CI/CD Features**
- `--fail-on` flag: Fail builds based on SARIF severity levels (error/warning/note/info)
- `--export-sarif` flag: Export findings in SARIF 2.1.0 format
- `--auto-start`/`--auto-stop` flags: Control worker lifecycle
- Exit code propagation: Returns 1 on blocking findings, 0 on success

**Exit Code Fix**
- Add `except typer.Exit: raise` handlers at 3 critical locations
- Move worker cleanup to finally block for guaranteed execution
- Exit codes now propagate correctly even when build fails

**CI Scripts & Examples**
- ci-start.sh: Start FuzzForge services with health checks
- ci-stop.sh: Clean shutdown with volume preservation option
- GitHub Actions workflow example (security-scan.yml)
- GitLab CI pipeline example (.gitlab-ci.example.yml)
- docker-compose.ci.yml: CI-optimized compose file with profiles

**OSS-Fuzz Integration**
- New ossfuzz_campaign workflow for running OSS-Fuzz projects
- OSS-Fuzz worker with Docker-in-Docker support
- Configurable campaign duration and project selection

**Documentation**
- Comprehensive CI/CD integration guide (docs/how-to/cicd-integration.md)
- Updated architecture docs with worker lifecycle details
- Updated workspace isolation documentation
- CLI README with worker management examples

**SDK Enhancements**
- Add get_workflow_worker_info() endpoint
- Worker vertical metadata in workflow responses

**Testing**
- All workflows tested: security_assessment, atheris_fuzzing, secret_detection, cargo_fuzzing
- All monitoring commands tested: stats, crashes, status, finding
- Full CI pipeline simulation verified
- Exit codes verified for success/failure scenarios

Ephemeral CI/CD model: ~3-4GB RAM, ~60-90s startup, runs entirely in CI containers.

* fix: Resolve ruff linting violations in CI/CD code

- Remove unused variables (run_id, defaults, result)
- Remove unused imports
- Fix f-string without placeholders

All CI/CD integration files now pass ruff checks.
This commit is contained in:
tduhamel42
2025-10-14 10:13:45 +02:00
committed by GitHub
parent 987c49569c
commit 60ca088ecf
167 changed files with 26101 additions and 5703 deletions
+119
View File
@@ -0,0 +1,119 @@
# FuzzForge Test Suite
Comprehensive test infrastructure for FuzzForge modules and workflows.
## Directory Structure
```
tests/
├── conftest.py # Shared pytest fixtures
├── unit/ # Fast, isolated unit tests
│ ├── test_modules/ # Module-specific tests
│ │ ├── test_cargo_fuzzer.py
│ │ └── test_atheris_fuzzer.py
│ ├── test_workflows/ # Workflow tests
│ └── test_api/ # API endpoint tests
├── integration/ # Integration tests (requires Docker)
└── fixtures/ # Test data and projects
├── test_projects/ # Vulnerable projects for testing
└── expected_results/ # Expected output for validation
```
## Running Tests
### All Tests
```bash
cd backend
pytest tests/ -v
```
### Unit Tests Only (Fast)
```bash
pytest tests/unit/ -v
```
### Integration Tests (Requires Docker)
```bash
# Start services
docker-compose up -d
# Run integration tests
pytest tests/integration/ -v
# Cleanup
docker-compose down
```
### With Coverage
```bash
pytest tests/ --cov=toolbox/modules --cov=src --cov-report=html
```
### Parallel Execution
```bash
pytest tests/unit/ -n auto
```
## Available Fixtures
### Workspace Fixtures
- `temp_workspace`: Empty temporary workspace
- `python_test_workspace`: Python project with vulnerabilities
- `rust_test_workspace`: Rust project with fuzz targets
### Module Fixtures
- `atheris_fuzzer`: AtherisFuzzer instance
- `cargo_fuzzer`: CargoFuzzer instance
- `file_scanner`: FileScanner instance
### Configuration Fixtures
- `atheris_config`: Default Atheris configuration
- `cargo_fuzz_config`: Default cargo-fuzz configuration
- `gitleaks_config`: Default Gitleaks configuration
### Mock Fixtures
- `mock_stats_callback`: Mock stats callback for fuzzing
- `mock_temporal_context`: Mock Temporal activity context
## Writing Tests
### Unit Test Example
```python
import pytest
@pytest.mark.asyncio
async def test_module_execution(cargo_fuzzer, rust_test_workspace, cargo_fuzz_config):
"""Test module execution"""
result = await cargo_fuzzer.execute(cargo_fuzz_config, rust_test_workspace)
assert result.status == "success"
assert result.execution_time > 0
```
### Integration Test Example
```python
@pytest.mark.integration
async def test_end_to_end_workflow():
"""Test complete workflow execution"""
# Test full workflow with real services
pass
```
## CI/CD Integration
Tests run automatically on:
- **Push to main/develop**: Full test suite
- **Pull requests**: Full test suite + coverage
- **Nightly**: Extended integration tests
See `.github/workflows/test.yml` for configuration.
## Code Coverage
Target coverage: **80%+** for core modules
View coverage report:
```bash
pytest tests/ --cov --cov-report=html
open htmlcov/index.html
```
+211
View File
@@ -11,9 +11,220 @@
import sys
from pathlib import Path
from typing import Dict, Any
import pytest
# Ensure project root is on sys.path so `src` is importable
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# Add toolbox to path for module imports
TOOLBOX = ROOT / "toolbox"
if str(TOOLBOX) not in sys.path:
sys.path.insert(0, str(TOOLBOX))
# ============================================================================
# Workspace Fixtures
# ============================================================================
@pytest.fixture
def temp_workspace(tmp_path):
"""Create a temporary workspace directory for testing"""
workspace = tmp_path / "workspace"
workspace.mkdir()
return workspace
@pytest.fixture
def python_test_workspace(temp_workspace):
"""Create a Python test workspace with sample files"""
# Create a simple Python project structure
(temp_workspace / "main.py").write_text("""
def process_data(data):
# Intentional bug: no bounds checking
return data[0:100]
def divide(a, b):
# Division by zero vulnerability
return a / b
""")
(temp_workspace / "config.py").write_text("""
# Hardcoded secrets for testing
API_KEY = "sk_test_1234567890abcdef"
DATABASE_URL = "postgresql://admin:password123@localhost/db"
AWS_SECRET = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
""")
return temp_workspace
@pytest.fixture
def rust_test_workspace(temp_workspace):
"""Create a Rust test workspace with fuzz targets"""
# Create Cargo.toml
(temp_workspace / "Cargo.toml").write_text("""[package]
name = "test_project"
version = "0.1.0"
edition = "2021"
[dependencies]
""")
# Create src/lib.rs
src_dir = temp_workspace / "src"
src_dir.mkdir()
(src_dir / "lib.rs").write_text("""
pub fn process_buffer(data: &[u8]) -> Vec<u8> {
if data.len() < 4 {
return Vec::new();
}
// Vulnerability: bounds checking issue
let size = data[0] as usize;
let mut result = Vec::new();
for i in 0..size {
result.push(data[i]);
}
result
}
""")
# Create fuzz directory structure
fuzz_dir = temp_workspace / "fuzz"
fuzz_dir.mkdir()
(fuzz_dir / "Cargo.toml").write_text("""[package]
name = "test_project-fuzz"
version = "0.0.0"
edition = "2021"
[dependencies]
libfuzzer-sys = "0.4"
[dependencies.test_project]
path = ".."
[[bin]]
name = "fuzz_target_1"
path = "fuzz_targets/fuzz_target_1.rs"
""")
fuzz_targets_dir = fuzz_dir / "fuzz_targets"
fuzz_targets_dir.mkdir()
(fuzz_targets_dir / "fuzz_target_1.rs").write_text("""#![no_main]
use libfuzzer_sys::fuzz_target;
use test_project::process_buffer;
fuzz_target!(|data: &[u8]| {
let _ = process_buffer(data);
});
""")
return temp_workspace
# ============================================================================
# Module Configuration Fixtures
# ============================================================================
@pytest.fixture
def atheris_config():
"""Default Atheris fuzzer configuration"""
return {
"target_file": "auto-discover",
"max_iterations": 1000,
"timeout_seconds": 10,
"corpus_dir": None
}
@pytest.fixture
def cargo_fuzz_config():
"""Default cargo-fuzz configuration"""
return {
"target_name": None,
"max_iterations": 1000,
"timeout_seconds": 10,
"sanitizer": "address"
}
@pytest.fixture
def gitleaks_config():
"""Default Gitleaks configuration"""
return {
"config_path": None,
"scan_uncommitted": True
}
@pytest.fixture
def file_scanner_config():
"""Default file scanner configuration"""
return {
"scan_patterns": ["*.py", "*.rs", "*.js"],
"exclude_patterns": ["*.test.*", "*.spec.*"],
"max_file_size": 1048576 # 1MB
}
# ============================================================================
# Module Instance Fixtures
# ============================================================================
@pytest.fixture
def atheris_fuzzer():
"""Create an AtherisFuzzer instance"""
from modules.fuzzer.atheris_fuzzer import AtherisFuzzer
return AtherisFuzzer()
@pytest.fixture
def cargo_fuzzer():
"""Create a CargoFuzzer instance"""
from modules.fuzzer.cargo_fuzzer import CargoFuzzer
return CargoFuzzer()
@pytest.fixture
def file_scanner():
"""Create a FileScanner instance"""
from modules.scanner.file_scanner import FileScanner
return FileScanner()
# ============================================================================
# Mock Fixtures
# ============================================================================
@pytest.fixture
def mock_stats_callback():
"""Mock stats callback for fuzzing"""
stats_received = []
async def callback(stats: Dict[str, Any]):
stats_received.append(stats)
callback.stats_received = stats_received
return callback
@pytest.fixture
def mock_temporal_context():
"""Mock Temporal activity context"""
class MockActivityInfo:
def __init__(self):
self.workflow_id = "test-workflow-123"
self.activity_id = "test-activity-1"
self.attempt = 1
class MockContext:
def __init__(self):
self.info = MockActivityInfo()
return MockContext()
View File
@@ -1,82 +0,0 @@
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
from datetime import datetime, timezone, timedelta
from src.services.prefect_stats_monitor import PrefectStatsMonitor
from src.api import fuzzing
class FakeLog:
def __init__(self, message: str):
self.message = message
class FakeClient:
def __init__(self, logs):
self._logs = logs
async def read_logs(self, log_filter=None, limit=100, sort="TIMESTAMP_ASC"):
return self._logs
class FakeTaskRun:
def __init__(self):
self.id = "task-1"
self.start_time = datetime.now(timezone.utc) - timedelta(seconds=5)
def test_parse_stats_from_log_fuzzing():
mon = PrefectStatsMonitor()
msg = (
"INFO LIVE_STATS extra={'stats_type': 'fuzzing_live_update', "
"'executions': 42, 'executions_per_sec': 3.14, 'crashes': 1, 'unique_crashes': 1, 'corpus_size': 9}"
)
stats = mon._parse_stats_from_log(msg)
assert stats is not None
assert stats["stats_type"] == "fuzzing_live_update"
assert stats["executions"] == 42
def test_extract_stats_updates_and_broadcasts():
mon = PrefectStatsMonitor()
run_id = "run-123"
workflow = "wf"
fuzzing.initialize_fuzzing_tracking(run_id, workflow)
# Prepare a fake websocket to capture messages
sent = []
class FakeWS:
async def send_text(self, text: str):
sent.append(text)
fuzzing.active_connections[run_id] = [FakeWS()]
# Craft a log line the parser understands
msg = (
"INFO LIVE_STATS extra={'stats_type': 'fuzzing_live_update', "
"'executions': 10, 'executions_per_sec': 1.5, 'crashes': 0, 'unique_crashes': 0, 'corpus_size': 2}"
)
fake_client = FakeClient([FakeLog(msg)])
task_run = FakeTaskRun()
asyncio.run(mon._extract_stats_from_task(fake_client, run_id, task_run, workflow))
# Verify stats updated
stats = fuzzing.fuzzing_stats[run_id]
assert stats.executions == 10
assert stats.executions_per_sec == 1.5
# Verify a message was sent to WebSocket
assert sent, "Expected a stats_update message to be sent"
View File
@@ -0,0 +1,177 @@
"""
Unit tests for AtherisFuzzer module
"""
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
class TestAtherisFuzzerMetadata:
"""Test AtherisFuzzer metadata"""
async def test_metadata_structure(self, atheris_fuzzer):
"""Test that module metadata is properly defined"""
metadata = atheris_fuzzer.get_metadata()
assert metadata.name == "atheris_fuzzer"
assert metadata.category == "fuzzer"
assert "fuzzing" in metadata.tags
assert "python" in metadata.tags
@pytest.mark.asyncio
class TestAtherisFuzzerConfigValidation:
"""Test configuration validation"""
async def test_valid_config(self, atheris_fuzzer, atheris_config):
"""Test validation of valid configuration"""
assert atheris_fuzzer.validate_config(atheris_config) is True
async def test_invalid_max_iterations(self, atheris_fuzzer):
"""Test validation fails with invalid max_iterations"""
config = {
"target_file": "fuzz_target.py",
"max_iterations": -1,
"timeout_seconds": 10
}
with pytest.raises(ValueError, match="max_iterations"):
atheris_fuzzer.validate_config(config)
async def test_invalid_timeout(self, atheris_fuzzer):
"""Test validation fails with invalid timeout"""
config = {
"target_file": "fuzz_target.py",
"max_iterations": 1000,
"timeout_seconds": 0
}
with pytest.raises(ValueError, match="timeout_seconds"):
atheris_fuzzer.validate_config(config)
@pytest.mark.asyncio
class TestAtherisFuzzerDiscovery:
"""Test fuzz target discovery"""
async def test_auto_discover(self, atheris_fuzzer, python_test_workspace):
"""Test auto-discovery of Python fuzz targets"""
# Create a fuzz target file
(python_test_workspace / "fuzz_target.py").write_text("""
import atheris
import sys
def TestOneInput(data):
pass
if __name__ == "__main__":
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
""")
# Pass None for auto-discovery
target = atheris_fuzzer._discover_target(python_test_workspace, None)
assert target is not None
assert "fuzz_target.py" in str(target)
@pytest.mark.asyncio
class TestAtherisFuzzerExecution:
"""Test fuzzer execution logic"""
async def test_execution_creates_result(self, atheris_fuzzer, python_test_workspace, atheris_config):
"""Test that execution returns a ModuleResult"""
# Create a simple fuzz target
(python_test_workspace / "fuzz_target.py").write_text("""
import atheris
import sys
def TestOneInput(data):
if len(data) > 0:
pass
if __name__ == "__main__":
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
""")
# Use a very short timeout for testing
test_config = {
"target_file": "fuzz_target.py",
"max_iterations": 10,
"timeout_seconds": 1
}
# Mock the fuzzing subprocess to avoid actual execution
with patch.object(atheris_fuzzer, '_run_fuzzing', new_callable=AsyncMock, return_value=([], {"total_executions": 10})):
result = await atheris_fuzzer.execute(test_config, python_test_workspace)
assert result.module == "atheris_fuzzer"
assert result.status in ["success", "partial", "failed"]
assert isinstance(result.execution_time, float)
@pytest.mark.asyncio
class TestAtherisFuzzerStatsCallback:
"""Test stats callback functionality"""
async def test_stats_callback_invoked(self, atheris_fuzzer, python_test_workspace, atheris_config, mock_stats_callback):
"""Test that stats callback is invoked during fuzzing"""
(python_test_workspace / "fuzz_target.py").write_text("""
import atheris
import sys
def TestOneInput(data):
pass
if __name__ == "__main__":
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
""")
# Mock fuzzing to simulate stats
async def mock_run_fuzzing(test_one_input, target_path, workspace, max_iterations, timeout_seconds, stats_callback):
if stats_callback:
await stats_callback({
"total_execs": 100,
"execs_per_sec": 10.0,
"crashes": 0,
"coverage": 5,
"corpus_size": 2,
"elapsed_time": 10
})
return
with patch.object(atheris_fuzzer, '_run_fuzzing', side_effect=mock_run_fuzzing):
with patch.object(atheris_fuzzer, '_load_target_module', return_value=lambda x: None):
# Put stats_callback in config dict, not as kwarg
atheris_config["target_file"] = "fuzz_target.py"
atheris_config["stats_callback"] = mock_stats_callback
await atheris_fuzzer.execute(atheris_config, python_test_workspace)
# Verify callback was invoked
assert len(mock_stats_callback.stats_received) > 0
@pytest.mark.asyncio
class TestAtherisFuzzerFindingGeneration:
"""Test finding generation from crashes"""
async def test_create_crash_finding(self, atheris_fuzzer):
"""Test crash finding creation"""
finding = atheris_fuzzer.create_finding(
title="Crash: Exception in TestOneInput",
description="IndexError: list index out of range",
severity="high",
category="crash",
file_path="fuzz_target.py",
metadata={
"crash_type": "IndexError",
"stack_trace": "Traceback..."
}
)
assert finding.title == "Crash: Exception in TestOneInput"
assert finding.severity == "high"
assert finding.category == "crash"
assert "IndexError" in finding.metadata["crash_type"]
@@ -0,0 +1,177 @@
"""
Unit tests for CargoFuzzer module
"""
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
class TestCargoFuzzerMetadata:
"""Test CargoFuzzer metadata"""
async def test_metadata_structure(self, cargo_fuzzer):
"""Test that module metadata is properly defined"""
metadata = cargo_fuzzer.get_metadata()
assert metadata.name == "cargo_fuzz"
assert metadata.version == "0.11.2"
assert metadata.category == "fuzzer"
assert "fuzzing" in metadata.tags
assert "rust" in metadata.tags
@pytest.mark.asyncio
class TestCargoFuzzerConfigValidation:
"""Test configuration validation"""
async def test_valid_config(self, cargo_fuzzer, cargo_fuzz_config):
"""Test validation of valid configuration"""
assert cargo_fuzzer.validate_config(cargo_fuzz_config) is True
async def test_invalid_max_iterations(self, cargo_fuzzer):
"""Test validation fails with invalid max_iterations"""
config = {
"max_iterations": -1,
"timeout_seconds": 10,
"sanitizer": "address"
}
with pytest.raises(ValueError, match="max_iterations"):
cargo_fuzzer.validate_config(config)
async def test_invalid_timeout(self, cargo_fuzzer):
"""Test validation fails with invalid timeout"""
config = {
"max_iterations": 1000,
"timeout_seconds": 0,
"sanitizer": "address"
}
with pytest.raises(ValueError, match="timeout_seconds"):
cargo_fuzzer.validate_config(config)
async def test_invalid_sanitizer(self, cargo_fuzzer):
"""Test validation fails with invalid sanitizer"""
config = {
"max_iterations": 1000,
"timeout_seconds": 10,
"sanitizer": "invalid_sanitizer"
}
with pytest.raises(ValueError, match="sanitizer"):
cargo_fuzzer.validate_config(config)
@pytest.mark.asyncio
class TestCargoFuzzerWorkspaceValidation:
"""Test workspace validation"""
async def test_valid_workspace(self, cargo_fuzzer, rust_test_workspace):
"""Test validation of valid workspace"""
assert cargo_fuzzer.validate_workspace(rust_test_workspace) is True
async def test_nonexistent_workspace(self, cargo_fuzzer, tmp_path):
"""Test validation fails with nonexistent workspace"""
nonexistent = tmp_path / "does_not_exist"
with pytest.raises(ValueError, match="does not exist"):
cargo_fuzzer.validate_workspace(nonexistent)
async def test_workspace_is_file(self, cargo_fuzzer, tmp_path):
"""Test validation fails when workspace is a file"""
file_path = tmp_path / "file.txt"
file_path.write_text("test")
with pytest.raises(ValueError, match="not a directory"):
cargo_fuzzer.validate_workspace(file_path)
@pytest.mark.asyncio
class TestCargoFuzzerDiscovery:
"""Test fuzz target discovery"""
async def test_discover_targets(self, cargo_fuzzer, rust_test_workspace):
"""Test discovery of fuzz targets"""
targets = await cargo_fuzzer._discover_fuzz_targets(rust_test_workspace)
assert len(targets) == 1
assert "fuzz_target_1" in targets
async def test_no_fuzz_directory(self, cargo_fuzzer, temp_workspace):
"""Test discovery with no fuzz directory"""
targets = await cargo_fuzzer._discover_fuzz_targets(temp_workspace)
assert targets == []
@pytest.mark.asyncio
class TestCargoFuzzerExecution:
"""Test fuzzer execution logic"""
async def test_execution_creates_result(self, cargo_fuzzer, rust_test_workspace, cargo_fuzz_config):
"""Test that execution returns a ModuleResult"""
# Mock the build and run methods to avoid actual fuzzing
with patch.object(cargo_fuzzer, '_build_fuzz_target', new_callable=AsyncMock, return_value=True):
with patch.object(cargo_fuzzer, '_run_fuzzing', new_callable=AsyncMock, return_value=([], {"total_executions": 0, "crashes_found": 0})):
with patch.object(cargo_fuzzer, '_parse_crash_artifacts', new_callable=AsyncMock, return_value=[]):
result = await cargo_fuzzer.execute(cargo_fuzz_config, rust_test_workspace)
assert result.module == "cargo_fuzz"
assert result.status == "success"
assert isinstance(result.execution_time, float)
assert result.execution_time >= 0
async def test_execution_with_no_targets(self, cargo_fuzzer, temp_workspace, cargo_fuzz_config):
"""Test execution fails gracefully with no fuzz targets"""
result = await cargo_fuzzer.execute(cargo_fuzz_config, temp_workspace)
assert result.status == "failed"
assert "No fuzz targets found" in result.error
@pytest.mark.asyncio
class TestCargoFuzzerStatsCallback:
"""Test stats callback functionality"""
async def test_stats_callback_invoked(self, cargo_fuzzer, rust_test_workspace, cargo_fuzz_config, mock_stats_callback):
"""Test that stats callback is invoked during fuzzing"""
# Mock build/run to simulate stats generation
async def mock_run_fuzzing(workspace, target, config, callback):
# Simulate stats callback
if callback:
await callback({
"total_execs": 1000,
"execs_per_sec": 100.0,
"crashes": 0,
"coverage": 10,
"corpus_size": 5,
"elapsed_time": 10
})
return [], {"total_executions": 1000}
with patch.object(cargo_fuzzer, '_build_fuzz_target', new_callable=AsyncMock, return_value=True):
with patch.object(cargo_fuzzer, '_run_fuzzing', side_effect=mock_run_fuzzing):
with patch.object(cargo_fuzzer, '_parse_crash_artifacts', new_callable=AsyncMock, return_value=[]):
await cargo_fuzzer.execute(cargo_fuzz_config, rust_test_workspace, stats_callback=mock_stats_callback)
# Verify callback was invoked
assert len(mock_stats_callback.stats_received) > 0
assert mock_stats_callback.stats_received[0]["total_execs"] == 1000
@pytest.mark.asyncio
class TestCargoFuzzerFindingGeneration:
"""Test finding generation from crashes"""
async def test_create_finding_from_crash(self, cargo_fuzzer):
"""Test finding creation"""
finding = cargo_fuzzer.create_finding(
title="Crash: Segmentation Fault",
description="Test crash",
severity="critical",
category="crash",
file_path="fuzz/fuzz_targets/fuzz_target_1.rs",
metadata={"crash_type": "SIGSEGV"}
)
assert finding.title == "Crash: Segmentation Fault"
assert finding.severity == "critical"
assert finding.category == "crash"
assert finding.file_path == "fuzz/fuzz_targets/fuzz_target_1.rs"
assert finding.metadata["crash_type"] == "SIGSEGV"
@@ -0,0 +1,349 @@
"""
Unit tests for FileScanner module
"""
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[3] / "toolbox"))
@pytest.mark.asyncio
class TestFileScannerMetadata:
"""Test FileScanner metadata"""
async def test_metadata_structure(self, file_scanner):
"""Test that metadata has correct structure"""
metadata = file_scanner.get_metadata()
assert metadata.name == "file_scanner"
assert metadata.version == "1.0.0"
assert metadata.category == "scanner"
assert "files" in metadata.tags
assert "enumeration" in metadata.tags
assert metadata.requires_workspace is True
@pytest.mark.asyncio
class TestFileScannerConfigValidation:
"""Test configuration validation"""
async def test_valid_config(self, file_scanner):
"""Test that valid config passes validation"""
config = {
"patterns": ["*.py", "*.js"],
"max_file_size": 1048576,
"check_sensitive": True,
"calculate_hashes": False
}
assert file_scanner.validate_config(config) is True
async def test_default_config(self, file_scanner):
"""Test that empty config uses defaults"""
config = {}
assert file_scanner.validate_config(config) is True
async def test_invalid_patterns_type(self, file_scanner):
"""Test that non-list patterns raises error"""
config = {"patterns": "*.py"}
with pytest.raises(ValueError, match="patterns must be a list"):
file_scanner.validate_config(config)
async def test_invalid_max_file_size(self, file_scanner):
"""Test that invalid max_file_size raises error"""
config = {"max_file_size": -1}
with pytest.raises(ValueError, match="max_file_size must be a positive integer"):
file_scanner.validate_config(config)
async def test_invalid_max_file_size_type(self, file_scanner):
"""Test that non-integer max_file_size raises error"""
config = {"max_file_size": "large"}
with pytest.raises(ValueError, match="max_file_size must be a positive integer"):
file_scanner.validate_config(config)
@pytest.mark.asyncio
class TestFileScannerExecution:
"""Test scanner execution"""
async def test_scan_python_files(self, file_scanner, python_test_workspace):
"""Test scanning Python files"""
config = {
"patterns": ["*.py"],
"check_sensitive": False,
"calculate_hashes": False
}
result = await file_scanner.execute(config, python_test_workspace)
assert result.module == "file_scanner"
assert result.status == "success"
assert len(result.findings) > 0
# Check that Python files were found
python_files = [f for f in result.findings if f.file_path.endswith('.py')]
assert len(python_files) > 0
async def test_scan_all_files(self, file_scanner, python_test_workspace):
"""Test scanning all files with wildcard"""
config = {
"patterns": ["*"],
"check_sensitive": False,
"calculate_hashes": False
}
result = await file_scanner.execute(config, python_test_workspace)
assert result.status == "success"
assert len(result.findings) > 0
assert result.summary["total_files"] > 0
async def test_scan_with_multiple_patterns(self, file_scanner, python_test_workspace):
"""Test scanning with multiple patterns"""
config = {
"patterns": ["*.py", "*.txt"],
"check_sensitive": False,
"calculate_hashes": False
}
result = await file_scanner.execute(config, python_test_workspace)
assert result.status == "success"
assert len(result.findings) > 0
async def test_empty_workspace(self, file_scanner, temp_workspace):
"""Test scanning empty workspace"""
config = {
"patterns": ["*.py"],
"check_sensitive": False
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
assert len(result.findings) == 0
assert result.summary["total_files"] == 0
@pytest.mark.asyncio
class TestFileScannerSensitiveDetection:
"""Test sensitive file detection"""
async def test_detect_env_file(self, file_scanner, temp_workspace):
"""Test detection of .env file"""
# Create .env file
(temp_workspace / ".env").write_text("API_KEY=secret123")
config = {
"patterns": ["*"],
"check_sensitive": True,
"calculate_hashes": False
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
# Check for sensitive file finding
sensitive_findings = [f for f in result.findings if f.category == "sensitive_file"]
assert len(sensitive_findings) > 0
assert any(".env" in f.title for f in sensitive_findings)
async def test_detect_private_key(self, file_scanner, temp_workspace):
"""Test detection of private key file"""
# Create private key file
(temp_workspace / "id_rsa").write_text("-----BEGIN RSA PRIVATE KEY-----")
config = {
"patterns": ["*"],
"check_sensitive": True
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
sensitive_findings = [f for f in result.findings if f.category == "sensitive_file"]
assert len(sensitive_findings) > 0
async def test_no_sensitive_detection_when_disabled(self, file_scanner, temp_workspace):
"""Test that sensitive detection can be disabled"""
(temp_workspace / ".env").write_text("API_KEY=secret123")
config = {
"patterns": ["*"],
"check_sensitive": False
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
sensitive_findings = [f for f in result.findings if f.category == "sensitive_file"]
assert len(sensitive_findings) == 0
@pytest.mark.asyncio
class TestFileScannerHashing:
"""Test file hashing functionality"""
async def test_hash_calculation(self, file_scanner, temp_workspace):
"""Test SHA256 hash calculation"""
# Create test file
test_file = temp_workspace / "test.txt"
test_file.write_text("Hello World")
config = {
"patterns": ["*.txt"],
"calculate_hashes": True
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
# Find the test.txt finding
txt_findings = [f for f in result.findings if "test.txt" in f.file_path]
assert len(txt_findings) > 0
# Check that hash was calculated
finding = txt_findings[0]
assert finding.metadata.get("file_hash") is not None
assert len(finding.metadata["file_hash"]) == 64 # SHA256 hex length
async def test_no_hash_when_disabled(self, file_scanner, temp_workspace):
"""Test that hashing can be disabled"""
test_file = temp_workspace / "test.txt"
test_file.write_text("Hello World")
config = {
"patterns": ["*.txt"],
"calculate_hashes": False
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
txt_findings = [f for f in result.findings if "test.txt" in f.file_path]
if len(txt_findings) > 0:
finding = txt_findings[0]
assert finding.metadata.get("file_hash") is None
@pytest.mark.asyncio
class TestFileScannerFileTypes:
"""Test file type detection"""
async def test_detect_python_type(self, file_scanner, temp_workspace):
"""Test detection of Python file type"""
(temp_workspace / "script.py").write_text("print('hello')")
config = {"patterns": ["*.py"]}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
py_findings = [f for f in result.findings if "script.py" in f.file_path]
assert len(py_findings) > 0
assert "python" in py_findings[0].metadata["file_type"]
async def test_detect_javascript_type(self, file_scanner, temp_workspace):
"""Test detection of JavaScript file type"""
(temp_workspace / "app.js").write_text("console.log('hello')")
config = {"patterns": ["*.js"]}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
js_findings = [f for f in result.findings if "app.js" in f.file_path]
assert len(js_findings) > 0
assert "javascript" in js_findings[0].metadata["file_type"]
async def test_file_type_summary(self, file_scanner, temp_workspace):
"""Test that file type summary is generated"""
(temp_workspace / "script.py").write_text("print('hello')")
(temp_workspace / "app.js").write_text("console.log('hello')")
(temp_workspace / "readme.txt").write_text("Documentation")
config = {"patterns": ["*"]}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
assert "file_types" in result.summary
assert len(result.summary["file_types"]) > 0
@pytest.mark.asyncio
class TestFileScannerSizeLimits:
"""Test file size handling"""
async def test_skip_large_files(self, file_scanner, temp_workspace):
"""Test that large files are skipped"""
# Create a "large" file
large_file = temp_workspace / "large.txt"
large_file.write_text("x" * 1000)
config = {
"patterns": ["*.txt"],
"max_file_size": 500 # Set limit smaller than file
}
result = await file_scanner.execute(config, temp_workspace)
# Should succeed but skip the large file
assert result.status == "success"
# The file should still be counted but not have a detailed finding
assert result.summary["total_files"] > 0
async def test_process_small_files(self, file_scanner, temp_workspace):
"""Test that small files are processed"""
small_file = temp_workspace / "small.txt"
small_file.write_text("small content")
config = {
"patterns": ["*.txt"],
"max_file_size": 1048576 # 1MB
}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
txt_findings = [f for f in result.findings if "small.txt" in f.file_path]
assert len(txt_findings) > 0
@pytest.mark.asyncio
class TestFileScannerSummary:
"""Test result summary generation"""
async def test_summary_structure(self, file_scanner, python_test_workspace):
"""Test that summary has correct structure"""
config = {"patterns": ["*"]}
result = await file_scanner.execute(config, python_test_workspace)
assert result.status == "success"
assert "total_files" in result.summary
assert "total_size_bytes" in result.summary
assert "file_types" in result.summary
assert "patterns_scanned" in result.summary
assert isinstance(result.summary["total_files"], int)
assert isinstance(result.summary["total_size_bytes"], int)
assert isinstance(result.summary["file_types"], dict)
assert isinstance(result.summary["patterns_scanned"], list)
async def test_summary_counts(self, file_scanner, temp_workspace):
"""Test that summary counts are accurate"""
# Create known files
(temp_workspace / "file1.py").write_text("content1")
(temp_workspace / "file2.py").write_text("content2")
(temp_workspace / "file3.txt").write_text("content3")
config = {"patterns": ["*"]}
result = await file_scanner.execute(config, temp_workspace)
assert result.status == "success"
assert result.summary["total_files"] == 3
assert result.summary["total_size_bytes"] > 0
@@ -0,0 +1,493 @@
"""
Unit tests for SecurityAnalyzer module
"""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3] / "toolbox"))
from modules.analyzer.security_analyzer import SecurityAnalyzer
@pytest.fixture
def security_analyzer():
"""Create SecurityAnalyzer instance"""
return SecurityAnalyzer()
@pytest.mark.asyncio
class TestSecurityAnalyzerMetadata:
"""Test SecurityAnalyzer metadata"""
async def test_metadata_structure(self, security_analyzer):
"""Test that metadata has correct structure"""
metadata = security_analyzer.get_metadata()
assert metadata.name == "security_analyzer"
assert metadata.version == "1.0.0"
assert metadata.category == "analyzer"
assert "security" in metadata.tags
assert "vulnerabilities" in metadata.tags
assert metadata.requires_workspace is True
@pytest.mark.asyncio
class TestSecurityAnalyzerConfigValidation:
"""Test configuration validation"""
async def test_valid_config(self, security_analyzer):
"""Test that valid config passes validation"""
config = {
"file_extensions": [".py", ".js"],
"check_secrets": True,
"check_sql": True,
"check_dangerous_functions": True
}
assert security_analyzer.validate_config(config) is True
async def test_default_config(self, security_analyzer):
"""Test that empty config uses defaults"""
config = {}
assert security_analyzer.validate_config(config) is True
async def test_invalid_extensions_type(self, security_analyzer):
"""Test that non-list extensions raises error"""
config = {"file_extensions": ".py"}
with pytest.raises(ValueError, match="file_extensions must be a list"):
security_analyzer.validate_config(config)
@pytest.mark.asyncio
class TestSecurityAnalyzerSecretDetection:
"""Test hardcoded secret detection"""
async def test_detect_api_key(self, security_analyzer, temp_workspace):
"""Test detection of hardcoded API key"""
code_file = temp_workspace / "config.py"
code_file.write_text("""
# Configuration file
api_key = "apikey_live_abcdefghijklmnopqrstuvwxyzabcdefghijk"
database_url = "postgresql://localhost/db"
""")
config = {
"file_extensions": [".py"],
"check_secrets": True,
"check_sql": False,
"check_dangerous_functions": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
secret_findings = [f for f in result.findings if f.category == "hardcoded_secret"]
assert len(secret_findings) > 0
assert any("API Key" in f.title for f in secret_findings)
async def test_detect_password(self, security_analyzer, temp_workspace):
"""Test detection of hardcoded password"""
code_file = temp_workspace / "auth.py"
code_file.write_text("""
def connect():
password = "mySecretP@ssw0rd"
return connect_db(password)
""")
config = {
"file_extensions": [".py"],
"check_secrets": True,
"check_sql": False,
"check_dangerous_functions": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
secret_findings = [f for f in result.findings if f.category == "hardcoded_secret"]
assert len(secret_findings) > 0
async def test_detect_aws_credentials(self, security_analyzer, temp_workspace):
"""Test detection of AWS credentials"""
code_file = temp_workspace / "aws_config.py"
code_file.write_text("""
aws_access_key = "AKIAIOSFODNN7REALKEY"
aws_secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYREALKEY"
""")
config = {
"file_extensions": [".py"],
"check_secrets": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
aws_findings = [f for f in result.findings if "AWS" in f.title]
assert len(aws_findings) >= 2 # Both access key and secret key
async def test_no_secret_detection_when_disabled(self, security_analyzer, temp_workspace):
"""Test that secret detection can be disabled"""
code_file = temp_workspace / "config.py"
code_file.write_text('api_key = "sk_live_1234567890abcdef"')
config = {
"file_extensions": [".py"],
"check_secrets": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
secret_findings = [f for f in result.findings if f.category == "hardcoded_secret"]
assert len(secret_findings) == 0
@pytest.mark.asyncio
class TestSecurityAnalyzerSQLInjection:
"""Test SQL injection detection"""
async def test_detect_string_concatenation(self, security_analyzer, temp_workspace):
"""Test detection of SQL string concatenation"""
code_file = temp_workspace / "db.py"
code_file.write_text("""
def get_user(user_id):
query = "SELECT * FROM users WHERE id = " + user_id
return execute(query)
""")
config = {
"file_extensions": [".py"],
"check_secrets": False,
"check_sql": True,
"check_dangerous_functions": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
sql_findings = [f for f in result.findings if f.category == "sql_injection"]
assert len(sql_findings) > 0
async def test_detect_f_string_sql(self, security_analyzer, temp_workspace):
"""Test detection of f-string in SQL"""
code_file = temp_workspace / "db.py"
code_file.write_text("""
def get_user(name):
query = f"SELECT * FROM users WHERE name = '{name}'"
return execute(query)
""")
config = {
"file_extensions": [".py"],
"check_sql": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
sql_findings = [f for f in result.findings if f.category == "sql_injection"]
assert len(sql_findings) > 0
async def test_detect_dynamic_query_building(self, security_analyzer, temp_workspace):
"""Test detection of dynamic query building"""
code_file = temp_workspace / "queries.py"
code_file.write_text("""
def search(keyword):
query = "SELECT * FROM products WHERE name LIKE " + keyword
execute(query + " ORDER BY price")
""")
config = {
"file_extensions": [".py"],
"check_sql": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
sql_findings = [f for f in result.findings if f.category == "sql_injection"]
assert len(sql_findings) > 0
async def test_no_sql_detection_when_disabled(self, security_analyzer, temp_workspace):
"""Test that SQL detection can be disabled"""
code_file = temp_workspace / "db.py"
code_file.write_text('query = "SELECT * FROM users WHERE id = " + user_id')
config = {
"file_extensions": [".py"],
"check_sql": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
sql_findings = [f for f in result.findings if f.category == "sql_injection"]
assert len(sql_findings) == 0
@pytest.mark.asyncio
class TestSecurityAnalyzerDangerousFunctions:
"""Test dangerous function detection"""
async def test_detect_eval(self, security_analyzer, temp_workspace):
"""Test detection of eval() usage"""
code_file = temp_workspace / "dangerous.py"
code_file.write_text("""
def process_input(user_input):
result = eval(user_input)
return result
""")
config = {
"file_extensions": [".py"],
"check_secrets": False,
"check_sql": False,
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
assert any("eval" in f.title.lower() for f in dangerous_findings)
async def test_detect_exec(self, security_analyzer, temp_workspace):
"""Test detection of exec() usage"""
code_file = temp_workspace / "runner.py"
code_file.write_text("""
def run_code(code):
exec(code)
""")
config = {
"file_extensions": [".py"],
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
async def test_detect_os_system(self, security_analyzer, temp_workspace):
"""Test detection of os.system() usage"""
code_file = temp_workspace / "commands.py"
code_file.write_text("""
import os
def run_command(cmd):
os.system(cmd)
""")
config = {
"file_extensions": [".py"],
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
assert any("os.system" in f.title for f in dangerous_findings)
async def test_detect_pickle_loads(self, security_analyzer, temp_workspace):
"""Test detection of pickle.loads() usage"""
code_file = temp_workspace / "serializer.py"
code_file.write_text("""
import pickle
def deserialize(data):
return pickle.loads(data)
""")
config = {
"file_extensions": [".py"],
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
async def test_detect_javascript_eval(self, security_analyzer, temp_workspace):
"""Test detection of eval() in JavaScript"""
code_file = temp_workspace / "app.js"
code_file.write_text("""
function processInput(userInput) {
return eval(userInput);
}
""")
config = {
"file_extensions": [".js"],
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
async def test_detect_innerHTML(self, security_analyzer, temp_workspace):
"""Test detection of innerHTML (XSS risk)"""
code_file = temp_workspace / "dom.js"
code_file.write_text("""
function updateContent(html) {
document.getElementById("content").innerHTML = html;
}
""")
config = {
"file_extensions": [".js"],
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) > 0
async def test_no_dangerous_detection_when_disabled(self, security_analyzer, temp_workspace):
"""Test that dangerous function detection can be disabled"""
code_file = temp_workspace / "code.py"
code_file.write_text('result = eval(user_input)')
config = {
"file_extensions": [".py"],
"check_dangerous_functions": False
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(dangerous_findings) == 0
@pytest.mark.asyncio
class TestSecurityAnalyzerMultipleIssues:
"""Test detection of multiple issues in same file"""
async def test_detect_multiple_vulnerabilities(self, security_analyzer, temp_workspace):
"""Test detection of multiple vulnerability types"""
code_file = temp_workspace / "vulnerable.py"
code_file.write_text("""
import os
# Hardcoded credentials
api_key = "apikey_live_abcdefghijklmnopqrstuvwxyzabcdef"
password = "MySecureP@ssw0rd"
def process_query(user_input):
# SQL injection
query = "SELECT * FROM users WHERE name = " + user_input
# Dangerous function
result = eval(user_input)
# Command injection
os.system(user_input)
return result
""")
config = {
"file_extensions": [".py"],
"check_secrets": True,
"check_sql": True,
"check_dangerous_functions": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
# Should find multiple types of issues
secret_findings = [f for f in result.findings if f.category == "hardcoded_secret"]
sql_findings = [f for f in result.findings if f.category == "sql_injection"]
dangerous_findings = [f for f in result.findings if f.category == "dangerous_function"]
assert len(secret_findings) > 0
assert len(sql_findings) > 0
assert len(dangerous_findings) > 0
@pytest.mark.asyncio
class TestSecurityAnalyzerSummary:
"""Test result summary generation"""
async def test_summary_structure(self, security_analyzer, temp_workspace):
"""Test that summary has correct structure"""
(temp_workspace / "test.py").write_text("print('hello')")
config = {"file_extensions": [".py"]}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
assert "files_analyzed" in result.summary
assert "total_findings" in result.summary
assert "extensions_scanned" in result.summary
assert isinstance(result.summary["files_analyzed"], int)
assert isinstance(result.summary["total_findings"], int)
assert isinstance(result.summary["extensions_scanned"], list)
async def test_empty_workspace(self, security_analyzer, temp_workspace):
"""Test analyzing empty workspace"""
config = {"file_extensions": [".py"]}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "partial" # No files found
assert result.summary["files_analyzed"] == 0
async def test_analyze_multiple_file_types(self, security_analyzer, temp_workspace):
"""Test analyzing multiple file types"""
(temp_workspace / "app.py").write_text("print('hello')")
(temp_workspace / "script.js").write_text("console.log('hello')")
(temp_workspace / "index.php").write_text("<?php echo 'hello'; ?>")
config = {"file_extensions": [".py", ".js", ".php"]}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
assert result.summary["files_analyzed"] == 3
@pytest.mark.asyncio
class TestSecurityAnalyzerFalsePositives:
"""Test false positive filtering"""
async def test_skip_test_secrets(self, security_analyzer, temp_workspace):
"""Test that test/example secrets are filtered"""
code_file = temp_workspace / "test_config.py"
code_file.write_text("""
# Test configuration - should be filtered
api_key = "test_key_example"
password = "dummy_password_123"
token = "sample_token_placeholder"
""")
config = {
"file_extensions": [".py"],
"check_secrets": True
}
result = await security_analyzer.execute(config, temp_workspace)
assert result.status == "success"
# These should be filtered as false positives
secret_findings = [f for f in result.findings if f.category == "hardcoded_secret"]
# Should have fewer or no findings due to false positive filtering
assert len(secret_findings) == 0 or all(
not any(fp in f.description.lower() for fp in ['test', 'example', 'dummy', 'sample'])
for f in secret_findings
)