refactor(modules-sdk): write all events to stdout as JSONL

This commit is contained in:
AFredefon
2026-02-23 02:21:12 +01:00
parent c6e9557541
commit 04c8383739
5 changed files with 195 additions and 94 deletions

View File

@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults, FuzzForgeModuleStatus
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult
@@ -79,19 +79,19 @@ class Module(FuzzForgeModule):
logger.info("cargo-fuzzer starting", resource_count=len(resources))
# Emit initial progress
self.emit_progress(0, status="initializing", message="Setting up fuzzing environment")
self.emit_progress(0, status=FuzzForgeModuleStatus.INITIALIZING, message="Setting up fuzzing environment")
self.emit_event("module_started", resource_count=len(resources))
# Setup the fuzzing environment
if not self._setup_environment(resources):
self.emit_progress(100, status="failed", message="Failed to setup environment")
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Failed to setup environment")
return FuzzForgeModuleResults.FAILURE
# Get list of fuzz targets
targets = self._get_fuzz_targets()
if not targets:
logger.error("no fuzz targets found")
self.emit_progress(100, status="failed", message="No fuzz targets found")
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="No fuzz targets found")
return FuzzForgeModuleResults.FAILURE
# Filter targets if specific ones were requested
@@ -100,7 +100,7 @@ class Module(FuzzForgeModule):
targets = [t for t in targets if t in requested]
if not targets:
logger.error("none of the requested targets found", requested=list(requested))
self.emit_progress(100, status="failed", message="Requested targets not found")
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Requested targets not found")
return FuzzForgeModuleResults.FAILURE
logger.info("found fuzz targets", targets=targets)
@@ -137,7 +137,7 @@ class Module(FuzzForgeModule):
progress = int((i / len(targets)) * 100) if not is_continuous else 50
self.emit_progress(
progress,
status="running",
status=FuzzForgeModuleStatus.RUNNING,
message=progress_msg,
current_task=target,
metrics={
@@ -177,7 +177,7 @@ class Module(FuzzForgeModule):
# Emit final progress
self.emit_progress(
100,
status="completed",
status=FuzzForgeModuleStatus.COMPLETED,
message=f"Fuzzing completed. Found {total_crashes} crashes.",
metrics={
"targets_fuzzed": len(self._target_results),

View File

@@ -1,13 +1,9 @@
from pathlib import Path
PATH_TO_DATA: Path = Path("/data")
PATH_TO_DATA: Path = Path("/fuzzforge")
PATH_TO_INPUTS: Path = PATH_TO_DATA.joinpath("input")
PATH_TO_INPUT: Path = PATH_TO_INPUTS.joinpath("input.json")
PATH_TO_OUTPUTS: Path = PATH_TO_DATA.joinpath("output")
PATH_TO_ARTIFACTS: Path = PATH_TO_OUTPUTS.joinpath("artifacts")
PATH_TO_RESULTS: Path = PATH_TO_OUTPUTS.joinpath("results.json")
PATH_TO_LOGS: Path = PATH_TO_OUTPUTS.joinpath("logs.jsonl")
# Streaming output paths for real-time progress
PATH_TO_PROGRESS: Path = PATH_TO_OUTPUTS.joinpath("progress.json")
PATH_TO_STREAM: Path = PATH_TO_OUTPUTS.joinpath("stream.jsonl")

View File

@@ -0,0 +1,90 @@
"""FuzzForge modules SDK models.
This module provides backward-compatible exports for all model types.
For Core SDK compatibility, use imports from `fuzzforge_modules_sdk.api.models.mod`.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from pydantic import ConfigDict
# Re-export from mod.py for Core SDK compatibility
from fuzzforge_modules_sdk.api.models.mod import (
Base,
FuzzForgeModuleInputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResources,
FuzzForgeModulesSettingsBase,
FuzzForgeModulesSettingsType,
)
class FuzzForgeModuleArtifacts(StrEnum):
"""Enumeration of artifact types."""
#: The artifact is an asset.
ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
"""An artifact generated by the module during its run."""
#: The description of the artifact.
description: str
#: The type of the artifact.
kind: FuzzForgeModuleArtifacts
#: The name of the artifact.
name: str
#: The path to the artifact on disk.
path: Path
class FuzzForgeModuleResults(StrEnum):
"""Module execution result enumeration."""
SUCCESS = "success"
FAILURE = "failure"
class FuzzForgeModuleStatus(StrEnum):
"""Possible statuses emitted by a running module."""
#: Module is setting up its environment.
INITIALIZING = "initializing"
#: Module is actively running.
RUNNING = "running"
#: Module finished successfully.
COMPLETED = "completed"
#: Module encountered an error.
FAILED = "failed"
#: Module was stopped by the orchestrator (SIGTERM).
STOPPED = "stopped"
class FuzzForgeModuleOutputBase(Base):
"""The (standardized) output of a FuzzForge module."""
#: The collection of artifacts generated by the module during its run.
artifacts: list[FuzzForgeModuleArtifact]
#: The path to the logs.
logs: Path
#: The result of the module's run.
result: FuzzForgeModuleResults
__all__ = [
# Core SDK compatible exports
"Base",
"FuzzForgeModuleInputBase",
"FuzzForgeModuleResource",
"FuzzForgeModuleResources",
"FuzzForgeModulesSettingsBase",
"FuzzForgeModulesSettingsType",
# OSS-specific exports (also used in OSS modules)
"FuzzForgeModuleArtifact",
"FuzzForgeModuleArtifacts",
"FuzzForgeModuleOutputBase",
"FuzzForgeModuleResults",
"FuzzForgeModuleStatus",
]

View File

@@ -1,3 +1,9 @@
"""Core module models for FuzzForge modules SDK.
This module contains the base classes for module settings, inputs, and resources.
These are compatible with the fuzzforge-core SDK structure.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from typing import TypeVar
@@ -6,27 +12,27 @@ from pydantic import BaseModel, ConfigDict
class Base(BaseModel):
"""TODO."""
"""Base model for all FuzzForge module types."""
model_config = ConfigDict(extra="forbid")
class FuzzForgeModulesSettingsBase(Base):
"""TODO."""
"""Base class for module settings."""
FuzzForgeModulesSettingsType = TypeVar("FuzzForgeModulesSettingsType", bound=FuzzForgeModulesSettingsBase)
class FuzzForgeModuleResources(StrEnum):
"""Enumeration of artifact types."""
"""Enumeration of resource types."""
#: The type of the resource is unknown or irrelevant.
UNKNOWN = "unknown"
class FuzzForgeModuleResource(Base):
"""TODO."""
"""A resource provided to a module as input."""
#: The description of the resource.
description: str
@@ -45,41 +51,3 @@ class FuzzForgeModuleInputBase[FuzzForgeModulesSettingsType: FuzzForgeModulesSet
resources: list[FuzzForgeModuleResource]
#: The settings of the module.
settings: FuzzForgeModulesSettingsType
class FuzzForgeModuleArtifacts(StrEnum):
"""Enumeration of artifact types."""
#: The artifact is an asset.
ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
"""An artifact generated by the module during its run."""
#: The description of the artifact.
description: str
#: The type of the artifact.
kind: FuzzForgeModuleArtifacts
#: The name of the artifact.
name: str
#: The path to the artifact on disk.
path: Path
class FuzzForgeModuleResults(StrEnum):
"""TODO."""
SUCCESS = "success"
FAILURE = "failure"
class FuzzForgeModuleOutputBase(Base):
"""The (standardized) output of a FuzzForge module."""
#: The collection of artifacts generated by the module during its run.
artifacts: list[FuzzForgeModuleArtifacts]
#: The path to the logs.
logs: Path
#: The result of the module's run.
result: FuzzForgeModuleResults

View File

@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod
import json
import signal
import threading
import time
from datetime import datetime, timezone
from shutil import rmtree
@@ -11,9 +13,7 @@ from fuzzforge_modules_sdk.api.constants import (
PATH_TO_ARTIFACTS,
PATH_TO_INPUT,
PATH_TO_LOGS,
PATH_TO_PROGRESS,
PATH_TO_RESULTS,
PATH_TO_STREAM,
)
from fuzzforge_modules_sdk.api.exceptions import FuzzForgeModuleError
from fuzzforge_modules_sdk.api.models import (
@@ -23,6 +23,7 @@ from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleOutputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResults,
FuzzForgeModuleStatus,
FuzzForgeModulesSettingsType,
)
@@ -52,6 +53,11 @@ class FuzzForgeModule(ABC):
#: Custom output data set by the module.
__output_data: dict[str, Any]
#: Event set when stop is requested (SIGTERM received).
#: Using :class:`threading.Event` so multi-threaded modules can
#: efficiently wait on it via :pymethod:`threading.Event.wait`.
__stop_requested: threading.Event
def __init__(self, name: str, version: str) -> None:
"""Initialize an instance of the class.
@@ -65,10 +71,10 @@ class FuzzForgeModule(ABC):
self.__version = version
self.__start_time = time.time()
self.__output_data = {}
# Initialize streaming output files
PATH_TO_PROGRESS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_STREAM.parent.mkdir(exist_ok=True, parents=True)
self.__stop_requested = threading.Event()
# Register SIGTERM handler for graceful shutdown
signal.signal(signal.SIGTERM, self._handle_sigterm)
@final
def get_logger(self) -> BoundLogger:
@@ -86,6 +92,58 @@ class FuzzForgeModule(ABC):
return self.__version
@final
def is_stop_requested(self) -> bool:
"""Check if stop was requested (SIGTERM received).
Long-running modules should check this periodically and exit gracefully
when True. Results will be written automatically on SIGTERM.
The underlying :class:`threading.Event` can be obtained via
:meth:`stop_event` for modules that need to *wait* on it.
:returns: True if SIGTERM was received.
"""
return self.__stop_requested.is_set()
@final
def stop_event(self) -> threading.Event:
"""Return the stop :class:`threading.Event`.
Multi-threaded modules can use ``self.stop_event().wait(timeout)``
instead of polling :meth:`is_stop_requested` in a busy-loop.
:returns: The threading event that is set on SIGTERM.
"""
return self.__stop_requested
@final
def _handle_sigterm(self, signum: int, frame: Any) -> None:
"""Handle SIGTERM signal for graceful shutdown.
Sets the stop event and emits a final progress update, then returns.
The normal :meth:`main` lifecycle (run → cleanup → write results) will
complete as usual once :meth:`_run` observes :meth:`is_stop_requested`
and returns, giving the module a chance to do any last-minute work
before the process exits.
:param signum: Signal number.
:param frame: Current stack frame.
"""
self.__stop_requested.set()
self.get_logger().info("received SIGTERM, stopping after current operation")
# Emit final progress update
self.emit_progress(
progress=100,
status=FuzzForgeModuleStatus.STOPPED,
message="Module stopped by orchestrator (SIGTERM)",
)
@final
def set_output(self, **kwargs: Any) -> None:
"""Set custom output data to be included in results.json.
@@ -107,63 +165,53 @@ class FuzzForgeModule(ABC):
def emit_progress(
self,
progress: int,
status: str = "running",
status: FuzzForgeModuleStatus = FuzzForgeModuleStatus.RUNNING,
message: str = "",
metrics: dict[str, Any] | None = None,
current_task: str = "",
) -> None:
"""Emit a progress update to the progress file.
"""Emit a structured progress event to stdout (JSONL).
This method writes to /data/output/progress.json which can be polled
by the orchestrator or UI to show real-time progress.
Progress is written as a single JSON line to stdout so that the
orchestrator can capture it via ``kubectl logs`` without requiring
any file-system access inside the container.
:param progress: Progress percentage (0-100).
:param status: Current status ("initializing", "running", "completed", "failed").
:param status: Current module status.
:param message: Human-readable status message.
:param metrics: Dictionary of metrics (e.g., {"executions": 1000, "coverage": 50}).
:param current_task: Name of the current task being performed.
"""
elapsed = time.time() - self.__start_time
progress_data = {
"module": self.__name,
"version": self.__version,
"status": status,
"progress": max(0, min(100, progress)),
"message": message,
"current_task": current_task,
"elapsed_seconds": round(elapsed, 2),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": metrics or {},
}
PATH_TO_PROGRESS.write_text(json.dumps(progress_data, indent=2))
self.emit_event(
"progress",
status=status.value,
progress=max(0, min(100, progress)),
message=message,
current_task=current_task,
metrics=metrics or {},
)
@final
def emit_event(self, event: str, **data: Any) -> None:
"""Emit a streaming event to the stream file.
"""Emit a structured event to stdout as a single JSONL line.
This method appends to /data/output/stream.jsonl which can be tailed
by the orchestrator or UI for real-time event streaming.
All module events (including progress updates) are written to stdout
so the orchestrator can stream them in real time via ``kubectl logs``.
:param event: Event type (e.g., "crash_found", "target_started", "metrics").
:param event: Event type (e.g., ``"crash_found"``, ``"target_started"``,
``"progress"``, ``"metrics"``).
:param data: Additional event data as keyword arguments.
"""
elapsed = time.time() - self.__start_time
event_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"elapsed_seconds": round(elapsed, 2),
"elapsed_seconds": round(self.get_elapsed_seconds(), 2),
"module": self.__name,
"event": event,
**data,
}
# Append to stream file (create if doesn't exist)
with PATH_TO_STREAM.open("a") as f:
f.write(json.dumps(event_data) + "\n")
print(json.dumps(event_data), flush=True)
@final
def get_elapsed_seconds(self) -> float:
@@ -208,7 +256,7 @@ class FuzzForgeModule(ABC):
@final
def main(self) -> None:
"""TODO."""
"""Execute the module lifecycle: prepare → run → cleanup → write results."""
result = FuzzForgeModuleResults.SUCCESS
try:
@@ -238,9 +286,8 @@ class FuzzForgeModule(ABC):
result=result,
**self.__output_data,
)
buffer = output.model_dump_json().encode("utf-8")
PATH_TO_RESULTS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_RESULTS.write_bytes(buffer)
PATH_TO_RESULTS.write_bytes(output.model_dump_json().encode("utf-8"))
@classmethod
@abstractmethod