From 04c838373973f8bcd1d112f631826573fda55244 Mon Sep 17 00:00:00 2001 From: AFredefon Date: Mon, 23 Feb 2026 02:21:12 +0100 Subject: [PATCH] refactor(modules-sdk): write all events to stdout as JSONL --- .../cargo-fuzzer/src/module/mod.py | 14 +- .../fuzzforge_modules_sdk/api/constants.py | 6 +- .../api/models/__init__.py | 90 +++++++++++++ .../api/{models.py => models/mod.py} | 52 ++----- .../fuzzforge_modules_sdk/api/modules/base.py | 127 ++++++++++++------ 5 files changed, 195 insertions(+), 94 deletions(-) create mode 100644 fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/__init__.py rename fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/{models.py => models/mod.py} (53%) diff --git a/fuzzforge-modules/cargo-fuzzer/src/module/mod.py b/fuzzforge-modules/cargo-fuzzer/src/module/mod.py index 4000c3b..fecb350 100644 --- a/fuzzforge-modules/cargo-fuzzer/src/module/mod.py +++ b/fuzzforge-modules/cargo-fuzzer/src/module/mod.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING import structlog from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS -from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults +from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults, FuzzForgeModuleStatus from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult @@ -79,19 +79,19 @@ class Module(FuzzForgeModule): logger.info("cargo-fuzzer starting", resource_count=len(resources)) # Emit initial progress - self.emit_progress(0, status="initializing", message="Setting up fuzzing environment") + self.emit_progress(0, status=FuzzForgeModuleStatus.INITIALIZING, message="Setting up fuzzing environment") self.emit_event("module_started", resource_count=len(resources)) # Setup the fuzzing environment if not self._setup_environment(resources): - self.emit_progress(100, status="failed", message="Failed to setup 
environment") + self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Failed to setup environment") return FuzzForgeModuleResults.FAILURE # Get list of fuzz targets targets = self._get_fuzz_targets() if not targets: logger.error("no fuzz targets found") - self.emit_progress(100, status="failed", message="No fuzz targets found") + self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="No fuzz targets found") return FuzzForgeModuleResults.FAILURE # Filter targets if specific ones were requested @@ -100,7 +100,7 @@ class Module(FuzzForgeModule): targets = [t for t in targets if t in requested] if not targets: logger.error("none of the requested targets found", requested=list(requested)) - self.emit_progress(100, status="failed", message="Requested targets not found") + self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Requested targets not found") return FuzzForgeModuleResults.FAILURE logger.info("found fuzz targets", targets=targets) @@ -137,7 +137,7 @@ class Module(FuzzForgeModule): progress = int((i / len(targets)) * 100) if not is_continuous else 50 self.emit_progress( progress, - status="running", + status=FuzzForgeModuleStatus.RUNNING, message=progress_msg, current_task=target, metrics={ @@ -177,7 +177,7 @@ class Module(FuzzForgeModule): # Emit final progress self.emit_progress( 100, - status="completed", + status=FuzzForgeModuleStatus.COMPLETED, message=f"Fuzzing completed. 
Found {total_crashes} crashes.", metrics={ "targets_fuzzed": len(self._target_results), diff --git a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/constants.py b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/constants.py index 47f6a35..cbab687 100644 --- a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/constants.py +++ b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/constants.py @@ -1,13 +1,9 @@ from pathlib import Path -PATH_TO_DATA: Path = Path("/data") +PATH_TO_DATA: Path = Path("/fuzzforge") PATH_TO_INPUTS: Path = PATH_TO_DATA.joinpath("input") PATH_TO_INPUT: Path = PATH_TO_INPUTS.joinpath("input.json") PATH_TO_OUTPUTS: Path = PATH_TO_DATA.joinpath("output") PATH_TO_ARTIFACTS: Path = PATH_TO_OUTPUTS.joinpath("artifacts") PATH_TO_RESULTS: Path = PATH_TO_OUTPUTS.joinpath("results.json") PATH_TO_LOGS: Path = PATH_TO_OUTPUTS.joinpath("logs.jsonl") - -# Streaming output paths for real-time progress -PATH_TO_PROGRESS: Path = PATH_TO_OUTPUTS.joinpath("progress.json") -PATH_TO_STREAM: Path = PATH_TO_OUTPUTS.joinpath("stream.jsonl") diff --git a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/__init__.py b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/__init__.py new file mode 100644 index 0000000..c825cbb --- /dev/null +++ b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/__init__.py @@ -0,0 +1,90 @@ +"""FuzzForge modules SDK models. + +This module provides backward-compatible exports for all model types. +For Core SDK compatibility, use imports from `fuzzforge_modules_sdk.api.models.mod`. 
+""" + +from enum import StrEnum +from pathlib import Path # noqa: TC003 (required by pydantic at runtime) + +from pydantic import ConfigDict + +# Re-export from mod.py for Core SDK compatibility +from fuzzforge_modules_sdk.api.models.mod import ( + Base, + FuzzForgeModuleInputBase, + FuzzForgeModuleResource, + FuzzForgeModuleResources, + FuzzForgeModulesSettingsBase, + FuzzForgeModulesSettingsType, +) + + +class FuzzForgeModuleArtifacts(StrEnum): + """Enumeration of artifact types.""" + + #: The artifact is an asset. + ASSET = "asset" + + +class FuzzForgeModuleArtifact(Base): + """An artifact generated by the module during its run.""" + + #: The description of the artifact. + description: str + #: The type of the artifact. + kind: FuzzForgeModuleArtifacts + #: The name of the artifact. + name: str + #: The path to the artifact on disk. + path: Path + + +class FuzzForgeModuleResults(StrEnum): + """Module execution result enumeration.""" + + SUCCESS = "success" + FAILURE = "failure" + + +class FuzzForgeModuleStatus(StrEnum): + """Possible statuses emitted by a running module.""" + + #: Module is setting up its environment. + INITIALIZING = "initializing" + #: Module is actively running. + RUNNING = "running" + #: Module finished successfully. + COMPLETED = "completed" + #: Module encountered an error. + FAILED = "failed" + #: Module was stopped by the orchestrator (SIGTERM). + STOPPED = "stopped" + + +class FuzzForgeModuleOutputBase(Base): + """The (standardized) output of a FuzzForge module.""" + + #: The collection of artifacts generated by the module during its run. + artifacts: list[FuzzForgeModuleArtifact] + #: The path to the logs. + logs: Path + #: The result of the module's run. 
+ result: FuzzForgeModuleResults + + +__all__ = [ + # Core SDK compatible exports + "Base", + "FuzzForgeModuleInputBase", + "FuzzForgeModuleResource", + "FuzzForgeModuleResources", + "FuzzForgeModulesSettingsBase", + "FuzzForgeModulesSettingsType", + # OSS-specific exports (also used in OSS modules) + "FuzzForgeModuleArtifact", + "FuzzForgeModuleArtifacts", + "FuzzForgeModuleOutputBase", + "FuzzForgeModuleResults", + "FuzzForgeModuleStatus", +] diff --git a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models.py b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/mod.py similarity index 53% rename from fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models.py rename to fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/mod.py index 5bcabb8..69d6a09 100644 --- a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models.py +++ b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/models/mod.py @@ -1,3 +1,9 @@ +"""Core module models for FuzzForge modules SDK. + +This module contains the base classes for module settings, inputs, and resources. +These are compatible with the fuzzforge-core SDK structure. +""" + from enum import StrEnum from pathlib import Path # noqa: TC003 (required by pydantic at runtime) from typing import TypeVar @@ -6,27 +12,27 @@ from pydantic import BaseModel, ConfigDict class Base(BaseModel): - """TODO.""" + """Base model for all FuzzForge module types.""" model_config = ConfigDict(extra="forbid") class FuzzForgeModulesSettingsBase(Base): - """TODO.""" + """Base class for module settings.""" FuzzForgeModulesSettingsType = TypeVar("FuzzForgeModulesSettingsType", bound=FuzzForgeModulesSettingsBase) class FuzzForgeModuleResources(StrEnum): - """Enumeration of artifact types.""" + """Enumeration of resource types.""" #: The type of the resource is unknown or irrelevant. 
UNKNOWN = "unknown" class FuzzForgeModuleResource(Base): - """TODO.""" + """A resource provided to a module as input.""" #: The description of the resource. description: str @@ -45,41 +51,3 @@ class FuzzForgeModuleInputBase[FuzzForgeModulesSettingsType: FuzzForgeModulesSet resources: list[FuzzForgeModuleResource] #: The settings of the module. settings: FuzzForgeModulesSettingsType - - -class FuzzForgeModuleArtifacts(StrEnum): - """Enumeration of artifact types.""" - - #: The artifact is an asset. - ASSET = "asset" - - -class FuzzForgeModuleArtifact(Base): - """An artifact generated by the module during its run.""" - - #: The description of the artifact. - description: str - #: The type of the artifact. - kind: FuzzForgeModuleArtifacts - #: The name of the artifact. - name: str - #: The path to the artifact on disk. - path: Path - - -class FuzzForgeModuleResults(StrEnum): - """TODO.""" - - SUCCESS = "success" - FAILURE = "failure" - - -class FuzzForgeModuleOutputBase(Base): - """The (standardized) output of a FuzzForge module.""" - - #: The collection of artifacts generated by the module during its run. - artifacts: list[FuzzForgeModuleArtifacts] - #: The path to the logs. - logs: Path - #: The result of the module's run. 
- result: FuzzForgeModuleResults diff --git a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/modules/base.py b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/modules/base.py index 3cf89f9..90d8fa6 100644 --- a/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/modules/base.py +++ b/fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/modules/base.py @@ -1,5 +1,7 @@ from abc import ABC, abstractmethod import json +import signal +import threading import time from datetime import datetime, timezone from shutil import rmtree @@ -11,9 +13,7 @@ from fuzzforge_modules_sdk.api.constants import ( PATH_TO_ARTIFACTS, PATH_TO_INPUT, PATH_TO_LOGS, - PATH_TO_PROGRESS, PATH_TO_RESULTS, - PATH_TO_STREAM, ) from fuzzforge_modules_sdk.api.exceptions import FuzzForgeModuleError from fuzzforge_modules_sdk.api.models import ( @@ -23,6 +23,7 @@ from fuzzforge_modules_sdk.api.models import ( FuzzForgeModuleOutputBase, FuzzForgeModuleResource, FuzzForgeModuleResults, + FuzzForgeModuleStatus, FuzzForgeModulesSettingsType, ) @@ -52,6 +53,11 @@ class FuzzForgeModule(ABC): #: Custom output data set by the module. __output_data: dict[str, Any] + #: Event set when stop is requested (SIGTERM received). + #: Using :class:`threading.Event` so multi-threaded modules can + #: efficiently wait on it via :meth:`threading.Event.wait`. + __stop_requested: threading.Event + def __init__(self, name: str, version: str) -> None: """Initialize an instance of the class. 
@@ -65,10 +71,10 @@ class FuzzForgeModule(ABC): self.__version = version self.__start_time = time.time() self.__output_data = {} - - # Initialize streaming output files - PATH_TO_PROGRESS.parent.mkdir(exist_ok=True, parents=True) - PATH_TO_STREAM.parent.mkdir(exist_ok=True, parents=True) + self.__stop_requested = threading.Event() + + # Register SIGTERM handler for graceful shutdown + signal.signal(signal.SIGTERM, self._handle_sigterm) @final def get_logger(self) -> BoundLogger: @@ -86,6 +92,58 @@ class FuzzForgeModule(ABC): return self.__version @final + def is_stop_requested(self) -> bool: + """Check if stop was requested (SIGTERM received). + + Long-running modules should check this periodically and exit gracefully + when True. Results will be written automatically on SIGTERM. + + The underlying :class:`threading.Event` can be obtained via + :meth:`stop_event` for modules that need to *wait* on it. + + :returns: True if SIGTERM was received. + + """ + return self.__stop_requested.is_set() + + @final + def stop_event(self) -> threading.Event: + """Return the stop :class:`threading.Event`. + + Multi-threaded modules can use ``self.stop_event().wait(timeout)`` + instead of polling :meth:`is_stop_requested` in a busy-loop. + + :returns: The threading event that is set on SIGTERM. + + """ + return self.__stop_requested + + @final + def _handle_sigterm(self, signum: int, frame: Any) -> None: + """Handle SIGTERM signal for graceful shutdown. + + Sets the stop event and emits a final progress update, then returns. + The normal :meth:`main` lifecycle (run → cleanup → write results) will + complete as usual once :meth:`_run` observes :meth:`is_stop_requested` + and returns, giving the module a chance to do any last-minute work + before the process exits. + + :param signum: Signal number. + :param frame: Current stack frame. 
+ + """ + self.__stop_requested.set() + self.get_logger().info("received SIGTERM, stopping after current operation") + + # Emit final progress update + self.emit_progress( + progress=100, + status=FuzzForgeModuleStatus.STOPPED, + message="Module stopped by orchestrator (SIGTERM)", + ) + + @final + def set_output(self, **kwargs: Any) -> None: """Set custom output data to be included in results.json. @@ -107,63 +165,53 @@ class FuzzForgeModule(ABC): def emit_progress( self, progress: int, - status: str = "running", + status: FuzzForgeModuleStatus | str = FuzzForgeModuleStatus.RUNNING, message: str = "", metrics: dict[str, Any] | None = None, current_task: str = "", ) -> None: - """Emit a progress update to the progress file. + """Emit a structured progress event to stdout (JSONL). - This method writes to /data/output/progress.json which can be polled - by the orchestrator or UI to show real-time progress. + Progress is written as a single JSON line to stdout so that the + orchestrator can capture it via ``kubectl logs`` without requiring + any file-system access inside the container. :param progress: Progress percentage (0-100). - :param status: Current status ("initializing", "running", "completed", "failed"). + :param status: Current module status (enum member or its plain string value). :param message: Human-readable status message. :param metrics: Dictionary of metrics (e.g., {"executions": 1000, "coverage": 50}). :param current_task: Name of the current task being performed. 
""" - elapsed = time.time() - self.__start_time - - progress_data = { - "module": self.__name, - "version": self.__version, - "status": status, - "progress": max(0, min(100, progress)), - "message": message, - "current_task": current_task, - "elapsed_seconds": round(elapsed, 2), - "timestamp": datetime.now(timezone.utc).isoformat(), - "metrics": metrics or {}, - } - - PATH_TO_PROGRESS.write_text(json.dumps(progress_data, indent=2)) + self.emit_event( + "progress", + status=str(status), + progress=max(0, min(100, progress)), + message=message, + current_task=current_task, + metrics=metrics or {}, + ) @final def emit_event(self, event: str, **data: Any) -> None: - """Emit a streaming event to the stream file. + """Emit a structured event to stdout as a single JSONL line. - This method appends to /data/output/stream.jsonl which can be tailed - by the orchestrator or UI for real-time event streaming. + All module events (including progress updates) are written to stdout + so the orchestrator can stream them in real time via ``kubectl logs``. - :param event: Event type (e.g., "crash_found", "target_started", "metrics"). + :param event: Event type (e.g., ``"crash_found"``, ``"target_started"``, + ``"progress"``, ``"metrics"``). :param data: Additional event data as keyword arguments. 
""" - elapsed = time.time() - self.__start_time - event_data = { "timestamp": datetime.now(timezone.utc).isoformat(), - "elapsed_seconds": round(elapsed, 2), + "elapsed_seconds": round(self.get_elapsed_seconds(), 2), "module": self.__name, "event": event, **data, } - - # Append to stream file (create if doesn't exist) - with PATH_TO_STREAM.open("a") as f: - f.write(json.dumps(event_data) + "\n") + print(json.dumps(event_data), flush=True) @final def get_elapsed_seconds(self) -> float: @@ -208,7 +256,7 @@ class FuzzForgeModule(ABC): @final def main(self) -> None: - """TODO.""" + """Execute the module lifecycle: prepare → run → cleanup → write results.""" result = FuzzForgeModuleResults.SUCCESS try: @@ -238,9 +286,8 @@ class FuzzForgeModule(ABC): result=result, **self.__output_data, ) - buffer = output.model_dump_json().encode("utf-8") PATH_TO_RESULTS.parent.mkdir(exist_ok=True, parents=True) - PATH_TO_RESULTS.write_bytes(buffer) + PATH_TO_RESULTS.write_bytes(output.model_dump_json().encode("utf-8")) @classmethod @abstractmethod