Files
fuzzforge_ai/workers/ossfuzz/activities.py
tduhamel42 9488ccf8f1 fix: Resolve critical bugs - file handle leaks and IndexError issues
Fixed multiple critical bugs identified during comprehensive code audit:

**Critical Fixes:**
- Fix file handle leaks in SDK client upload methods (sync and async)
  - Use context managers to ensure file handles are properly closed
  - Affects: sdk/src/fuzzforge_sdk/client.py lines 397, 484

**High Priority Fixes:**
- Fix IndexError in OSS-Fuzz stats parsing when accessing array elements
  - Add bounds checking before accessing parts[i+1]
  - Affects: workers/ossfuzz/activities.py lines 372-376

- Fix IndexError in exception handling URL parsing
  - Add empty string validation before splitting URL segments
  - Prevents crash when parsing malformed URLs
  - Affects: sdk/src/fuzzforge_sdk/exceptions.py lines 419-426

**Medium Priority Fixes:**
- Fix IndexError in Android workflow SARIF report parsing
  - Check if runs list is empty before accessing first element
  - Affects: backend/toolbox/workflows/android_static_analysis/workflow.py line 270

All fixes follow defensive programming practices with proper bounds checking
and resource management using context managers.
2025-11-14 10:51:38 +01:00

414 lines
14 KiB
Python

"""
OSS-Fuzz Campaign Activities
Activities for running OSS-Fuzz campaigns using Google's infrastructure.
"""
import logging
import os
import subprocess
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import yaml
from temporalio import activity
logger = logging.getLogger(__name__)
# Paths
OSS_FUZZ_REPO = Path("/opt/oss-fuzz")
CACHE_DIR = Path(os.getenv("CACHE_DIR", "/cache"))
@activity.defn(name="load_ossfuzz_project")
async def load_ossfuzz_project_activity(project_name: str) -> Dict[str, Any]:
"""
Load OSS-Fuzz project configuration from project.yaml.
Args:
project_name: Name of the OSS-Fuzz project (e.g., "curl", "sqlite3")
Returns:
Dictionary with project config, paths, and metadata
"""
logger.info(f"Loading OSS-Fuzz project: {project_name}")
# Update OSS-Fuzz repo if it exists, clone if not
if OSS_FUZZ_REPO.exists():
logger.info("Updating OSS-Fuzz repository...")
subprocess.run(
["git", "-C", str(OSS_FUZZ_REPO), "pull", "--depth=1"],
check=False # Don't fail if already up to date
)
else:
logger.info("Cloning OSS-Fuzz repository...")
subprocess.run(
[
"git", "clone", "--depth=1",
"https://github.com/google/oss-fuzz.git",
str(OSS_FUZZ_REPO)
],
check=True
)
# Find project directory
project_path = OSS_FUZZ_REPO / "projects" / project_name
if not project_path.exists():
raise ValueError(
f"Project '{project_name}' not found in OSS-Fuzz. "
f"Available projects: https://github.com/google/oss-fuzz/tree/master/projects"
)
# Read project.yaml
config_file = project_path / "project.yaml"
if not config_file.exists():
raise ValueError(f"No project.yaml found for project '{project_name}'")
with open(config_file) as f:
config = yaml.safe_load(f)
# Add paths
config["project_name"] = project_name
config["project_path"] = str(project_path)
config["dockerfile_path"] = str(project_path / "Dockerfile")
config["build_script_path"] = str(project_path / "build.sh")
# Validate required fields
if not config.get("language"):
logger.warning(f"No language specified in project.yaml for {project_name}")
logger.info(
f"✓ Loaded project {project_name}: "
f"language={config.get('language', 'unknown')}, "
f"engines={config.get('fuzzing_engines', [])}, "
f"sanitizers={config.get('sanitizers', [])}"
)
return config
@activity.defn(name="build_ossfuzz_project")
async def build_ossfuzz_project_activity(
project_name: str,
project_config: Dict[str, Any],
sanitizer: Optional[str] = None,
engine: Optional[str] = None
) -> Dict[str, Any]:
"""
Build OSS-Fuzz project directly using build.sh (no Docker-in-Docker).
Args:
project_name: Name of the project
project_config: Configuration from project.yaml
sanitizer: Override sanitizer (default: first from project.yaml)
engine: Override engine (default: first from project.yaml)
Returns:
Dictionary with build results and discovered fuzz targets
"""
logger.info(f"Building OSS-Fuzz project: {project_name}")
# Determine sanitizer and engine
sanitizers = project_config.get("sanitizers", ["address"])
engines = project_config.get("fuzzing_engines", ["libfuzzer"])
use_sanitizer = sanitizer if sanitizer else sanitizers[0]
use_engine = engine if engine else engines[0]
logger.info(f"Building with sanitizer={use_sanitizer}, engine={use_engine}")
# Setup directories
src_dir = Path("/src")
out_dir = Path("/out")
src_dir.mkdir(exist_ok=True)
out_dir.mkdir(exist_ok=True)
# Clean previous build artifacts
for item in out_dir.glob("*"):
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
# Copy project files from OSS-Fuzz repo to /src
project_path = Path(project_config["project_path"])
build_script = project_path / "build.sh"
if not build_script.exists():
raise Exception(f"build.sh not found for project {project_name}")
logger.info(f"Copying project files from {project_path} to {src_dir}")
# Copy build.sh
shutil.copy2(build_script, src_dir / "build.sh")
os.chmod(src_dir / "build.sh", 0o755)
# Copy any fuzzer source files (*.cc, *.c, *.cpp files)
for pattern in ["*.cc", "*.c", "*.cpp", "*.h", "*.hh", "*.hpp"]:
for src_file in project_path.glob(pattern):
dest_file = src_dir / src_file.name
shutil.copy2(src_file, dest_file)
logger.info(f"Copied: {src_file.name}")
# Clone project source code to subdirectory
main_repo = project_config.get("main_repo")
work_dir = src_dir
if main_repo:
logger.info(f"Cloning project source from {main_repo}")
project_src_dir = src_dir / project_name
# Remove existing directory if present
if project_src_dir.exists():
shutil.rmtree(project_src_dir)
clone_cmd = ["git", "clone", "--depth=1", main_repo, str(project_src_dir)]
result = subprocess.run(clone_cmd, capture_output=True, text=True, timeout=600)
if result.returncode != 0:
logger.warning(f"Failed to clone {main_repo}: {result.stderr}")
logger.info("Continuing without cloning (build.sh may download source)")
else:
# Copy build.sh into the project source directory
shutil.copy2(src_dir / "build.sh", project_src_dir / "build.sh")
os.chmod(project_src_dir / "build.sh", 0o755)
# build.sh should run from within the project directory
work_dir = project_src_dir
logger.info(f"Build will run from: {work_dir}")
else:
logger.info("No main_repo in project.yaml, build.sh will download source")
# Set OSS-Fuzz environment variables
build_env = os.environ.copy()
build_env.update({
"SRC": str(src_dir),
"OUT": str(out_dir),
"FUZZING_ENGINE": use_engine,
"SANITIZER": use_sanitizer,
"ARCHITECTURE": "x86_64",
# Use clang's built-in libfuzzer instead of separate library
"LIB_FUZZING_ENGINE": "-fsanitize=fuzzer",
})
# Set sanitizer flags
if use_sanitizer == "address":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=address"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=address"
elif use_sanitizer == "memory":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=memory"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=memory"
elif use_sanitizer == "undefined":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=undefined"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=undefined"
# Execute build.sh from the work directory
logger.info(f"Executing build.sh in {work_dir}")
build_cmd = ["bash", "./build.sh"]
result = subprocess.run(
build_cmd,
cwd=str(work_dir),
env=build_env,
capture_output=True,
text=True,
timeout=1800 # 30 minutes max build time
)
if result.returncode != 0:
logger.error(f"Build failed:\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}")
raise Exception(f"Build failed for {project_name}: {result.stderr}")
logger.info("✓ Build completed successfully")
logger.info(f"Build output:\n{result.stdout[-2000:]}") # Last 2000 chars
# Discover fuzz targets in /out
fuzz_targets = []
for file in out_dir.glob("*"):
if file.is_file() and os.access(file, os.X_OK):
# Check if it's a fuzz target (executable, not .so/.a/.o)
if file.suffix not in ['.so', '.a', '.o', '.zip']:
fuzz_targets.append(str(file))
logger.info(f"Found fuzz target: {file.name}")
if not fuzz_targets:
logger.warning(f"No fuzz targets found in {out_dir}")
logger.info(f"Directory contents: {list(out_dir.glob('*'))}")
return {
"fuzz_targets": fuzz_targets,
"build_log": result.stdout[-5000:], # Last 5000 chars
"sanitizer_used": use_sanitizer,
"engine_used": use_engine,
"out_dir": str(out_dir)
}
@activity.defn(name="fuzz_target")
async def fuzz_target_activity(
target_path: str,
engine: str,
duration_seconds: int,
corpus_dir: Optional[str] = None,
dict_file: Optional[str] = None
) -> Dict[str, Any]:
"""
Run fuzzing on a target with specified engine.
Args:
target_path: Path to fuzz target executable
engine: Fuzzing engine (libfuzzer, afl, honggfuzz)
duration_seconds: How long to fuzz
corpus_dir: Optional corpus directory
dict_file: Optional dictionary file
Returns:
Dictionary with fuzzing stats and results
"""
logger.info(f"Fuzzing {Path(target_path).name} with {engine} for {duration_seconds}s")
# Prepare corpus directory
if not corpus_dir:
corpus_dir = str(CACHE_DIR / "corpus" / Path(target_path).stem)
Path(corpus_dir).mkdir(parents=True, exist_ok=True)
output_dir = CACHE_DIR / "output" / Path(target_path).stem
output_dir.mkdir(parents=True, exist_ok=True)
start_time = datetime.now()
try:
if engine == "libfuzzer":
cmd = [
target_path,
corpus_dir,
f"-max_total_time={duration_seconds}",
"-print_final_stats=1",
f"-artifact_prefix={output_dir}/"
]
if dict_file:
cmd.append(f"-dict={dict_file}")
elif engine == "afl":
cmd = [
"afl-fuzz",
"-i", corpus_dir if Path(corpus_dir).glob("*") else "-", # Empty corpus OK
"-o", str(output_dir),
"-t", "1000", # Timeout per execution
"-m", "none", # No memory limit
"--", target_path, "@@"
]
elif engine == "honggfuzz":
cmd = [
"honggfuzz",
f"--run_time={duration_seconds}",
"-i", corpus_dir,
"-o", str(output_dir),
"--", target_path
]
else:
raise ValueError(f"Unsupported fuzzing engine: {engine}")
logger.info(f"Starting fuzzer: {' '.join(cmd[:5])}...")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=duration_seconds + 120 # Add 2 minute buffer
)
end_time = datetime.now()
elapsed = (end_time - start_time).total_seconds()
# Parse stats from output
stats = parse_fuzzing_stats(result.stdout, result.stderr, engine)
stats["elapsed_time"] = elapsed
stats["target_name"] = Path(target_path).name
stats["engine"] = engine
# Find crashes
crashes = find_crashes(output_dir)
stats["crashes"] = len(crashes)
stats["crash_files"] = crashes
# Collect new corpus files
new_corpus = collect_corpus(corpus_dir)
stats["corpus_size"] = len(new_corpus)
stats["corpus_files"] = new_corpus
logger.info(
f"✓ Fuzzing completed: {stats.get('total_executions', 0)} execs, "
f"{len(crashes)} crashes"
)
return stats
except subprocess.TimeoutExpired:
logger.warning(f"Fuzzing timed out after {duration_seconds}s")
return {
"target_name": Path(target_path).name,
"engine": engine,
"status": "timeout",
"elapsed_time": duration_seconds
}
def parse_fuzzing_stats(stdout: str, stderr: str, engine: str) -> Dict[str, Any]:
"""Parse fuzzing statistics from output"""
stats = {}
if engine == "libfuzzer":
# Parse libFuzzer stats
for line in (stdout + stderr).split('\n'):
if "#" in line and "NEW" in line:
# Example: #8192 NEW cov: 1234 ft: 5678 corp: 89/10KB
parts = line.split()
for i, part in enumerate(parts):
if part.startswith("cov:") and i+1 < len(parts):
stats["coverage"] = int(parts[i+1])
elif part.startswith("corp:") and i+1 < len(parts):
stats["corpus_entries"] = int(parts[i+1].split('/')[0])
elif part.startswith("exec/s:") and i+1 < len(parts):
stats["executions_per_sec"] = float(parts[i+1])
elif part.startswith("#"):
stats["total_executions"] = int(part[1:])
elif engine == "afl":
# Parse AFL stats (would need to read fuzzer_stats file)
pass
elif engine == "honggfuzz":
# Parse Honggfuzz stats
pass
return stats
def find_crashes(output_dir: Path) -> List[str]:
"""Find crash files in output directory"""
crashes = []
# libFuzzer crash files start with "crash-" or "leak-"
for pattern in ["crash-*", "leak-*", "timeout-*"]:
crashes.extend([str(f) for f in output_dir.glob(pattern)])
# AFL crashes in crashes/ subdirectory
crashes_dir = output_dir / "crashes"
if crashes_dir.exists():
crashes.extend([str(f) for f in crashes_dir.glob("*") if f.is_file()])
return crashes
def collect_corpus(corpus_dir: str) -> List[str]:
"""Collect corpus files"""
corpus_path = Path(corpus_dir)
if not corpus_path.exists():
return []
return [str(f) for f in corpus_path.glob("*") if f.is_file()]