mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-04-09 14:22:18 +02:00
Merge pull request #53 from FuzzingLabs/feat/artifact-management
feat: implement artifact management tools
This commit is contained in:
@@ -13,6 +13,7 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import mimetypes
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from tarfile import open as Archive # noqa: N812
|
||||
@@ -90,6 +91,7 @@ class LocalStorage:
|
||||
"# FuzzForge storage - ignore large/temporary files\n"
|
||||
"runs/\n"
|
||||
"output/\n"
|
||||
"artifacts.json\n"
|
||||
"!config.json\n"
|
||||
)
|
||||
|
||||
@@ -273,3 +275,203 @@ class LocalStorage:
|
||||
except Exception as exc:
|
||||
msg = f"Failed to extract results: {exc}"
|
||||
raise StorageError(msg) from exc
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Artifact tracking
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _artifacts_path(self, project_path: Path) -> Path:
    """Return the location of the artifact registry file.

    :param project_path: Path to the project directory.
    :returns: Path to ``artifacts.json`` inside the managed project directory.

    """
    project_root = self._get_project_path(project_path)
    return project_root / "artifacts.json"
|
||||
|
||||
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
    """Load the artifact registry from disk.

    A missing, unreadable, or corrupt registry degrades gracefully to an
    empty list so callers never have to special-case a broken file.

    :param project_path: Path to the project directory.
    :returns: List of artifact dicts (empty if no valid registry exists).

    """
    path = self._artifacts_path(project_path)
    try:
        # EAFP: read directly instead of exists()-then-read, which is
        # racy if the file is removed between the check and the read.
        # OSError also covers the file-not-found case.
        data = json.loads(path.read_text())
    except (OSError, json.JSONDecodeError):
        return []
    # The registry could parse but not be a list (e.g. a hand-edited
    # dict); treat any non-list payload as corrupt rather than letting
    # it leak to callers that iterate/append to the result.
    return data if isinstance(data, list) else []
|
||||
|
||||
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
    """Persist the artifact registry to disk.

    :param project_path: Path to the project directory.
    :param artifacts: Full artifact list to write.

    """
    registry = self._artifacts_path(project_path)
    registry.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(artifacts, indent=2, default=str)
    registry.write_text(serialized)
|
||||
|
||||
def _classify_file(self, file_path: Path) -> str:
|
||||
"""Classify a file into a human-friendly type string.
|
||||
|
||||
:param file_path: Path to the file.
|
||||
:returns: Type string (e.g. "elf-binary", "text", "directory").
|
||||
|
||||
"""
|
||||
mime, _ = mimetypes.guess_type(str(file_path))
|
||||
suffix = file_path.suffix.lower()
|
||||
|
||||
# Try reading ELF magic for binaries with no extension
|
||||
if mime is None and suffix == "":
|
||||
try:
|
||||
header = file_path.read_bytes()[:4]
|
||||
if header == b"\x7fELF":
|
||||
return "elf-binary"
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
if mime:
|
||||
if "json" in mime:
|
||||
return "json"
|
||||
if "text" in mime or "xml" in mime or "yaml" in mime:
|
||||
return "text"
|
||||
if "image" in mime:
|
||||
return "image"
|
||||
if "octet-stream" in mime:
|
||||
return "binary"
|
||||
|
||||
type_map: dict[str, str] = {
|
||||
".json": "json",
|
||||
".sarif": "sarif",
|
||||
".md": "markdown",
|
||||
".txt": "text",
|
||||
".log": "text",
|
||||
".csv": "csv",
|
||||
".yaml": "yaml",
|
||||
".yml": "yaml",
|
||||
".xml": "xml",
|
||||
".html": "html",
|
||||
".elf": "elf-binary",
|
||||
".so": "elf-binary",
|
||||
".bin": "binary",
|
||||
".gz": "archive",
|
||||
".tar": "archive",
|
||||
".zip": "archive",
|
||||
}
|
||||
return type_map.get(suffix, "binary")
|
||||
|
||||
def scan_artifacts(
    self,
    project_path: Path,
    server_name: str,
    tool_name: str,
) -> list[dict[str, Any]]:
    """Scan the output directory for new or modified files and register them.

    Compares the current state of .fuzzforge/output/ against the existing
    artifact registry and registers any new or modified files. Files whose
    size and mtime are unchanged are skipped.

    :param project_path: Path to the project directory.
    :param server_name: Hub server that produced the artifacts.
    :param tool_name: Tool that produced the artifacts.
    :returns: List of newly registered (or refreshed) artifact dicts.

    """
    output_path = self.get_project_output_path(project_path)
    if output_path is None or not output_path.exists():
        return []

    existing = self._load_artifacts(project_path)
    # Map container path -> registry position so stale entries are
    # updated in O(1) instead of a linear scan per changed file
    # (previously O(n^2) over the registry).
    index_by_path: dict[str, int] = {a["path"]: i for i, a in enumerate(existing)}
    now = datetime.now(tz=UTC).isoformat()

    new_artifacts: list[dict[str, Any]] = []
    for file_path in output_path.rglob("*"):
        if not file_path.is_file():
            continue

        # Use the container-style path (/app/output/...) so it's
        # directly usable in subsequent tool calls.
        relative = file_path.relative_to(output_path)
        container_path = f"/app/output/{relative}"

        stat = file_path.stat()
        size = stat.st_size
        mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()

        idx = index_by_path.get(container_path)
        prev = existing[idx] if idx is not None else None
        if prev and prev.get("mtime") == mtime and prev.get("size") == size:
            continue  # Unchanged — skip

        artifact: dict[str, Any] = {
            "path": container_path,
            "host_path": str(file_path),
            "type": self._classify_file(file_path),
            "size": size,
            "mtime": mtime,
            "source_server": server_name,
            "source_tool": tool_name,
            "registered_at": now,
        }

        if idx is not None:
            # Refresh the stale entry in-place
            existing[idx] = artifact
        else:
            index_by_path[container_path] = len(existing)
            existing.append(artifact)

        new_artifacts.append(artifact)

    if new_artifacts:
        self._save_artifacts(project_path, existing)
        logger.info(
            "Registered %d new artifact(s) from %s:%s",
            len(new_artifacts),
            server_name,
            tool_name,
        )

    return new_artifacts
|
||||
|
||||
def list_artifacts(
    self,
    project_path: Path,
    *,
    source: str | None = None,
    artifact_type: str | None = None,
) -> list[dict[str, Any]]:
    """List registered artifacts, with optional filters.

    :param project_path: Path to the project directory.
    :param source: Filter by source server name.
    :param artifact_type: Filter by artifact type (e.g. "elf-binary", "json").
    :returns: List of matching artifact dicts.

    """

    def _matches(entry: dict[str, Any]) -> bool:
        # A falsy filter value means "don't filter on this field".
        if source and entry.get("source_server") != source:
            return False
        return not (artifact_type and entry.get("type") != artifact_type)

    return [a for a in self._load_artifacts(project_path) if _matches(a)]
|
||||
|
||||
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
    """Get a single artifact by its container path.

    :param project_path: Path to the project directory.
    :param path: Container path of the artifact (e.g. /app/output/...).
    :returns: Artifact dict, or None if not found.

    """
    matches = (
        entry
        for entry in self._load_artifacts(project_path)
        if entry["path"] == path
    )
    return next(matches, None)
|
||||
|
||||
@@ -146,3 +146,70 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
|
||||
except Exception as exception:
|
||||
message: str = f"Failed to get execution results: {exception}"
|
||||
raise ToolError(message) from exception
|
||||
|
||||
|
||||
@mcp.tool
async def list_artifacts(
    source: str | None = None,
    artifact_type: str | None = None,
) -> dict[str, Any]:
    """List all artifacts produced by hub tools in the current project.

    Artifacts are files created by tool executions in /app/output/.
    They are automatically tracked after each execute_hub_tool call.

    :param source: Filter by source server name (e.g. "binwalk-mcp").
    :param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
    :return: List of artifacts with path, type, size, and source info.

    """
    storage = get_storage()
    project_path: Path = get_project_path()

    try:
        matching = storage.list_artifacts(
            project_path,
            source=source,
            artifact_type=artifact_type,
        )
    except Exception as exception:
        message: str = f"Failed to list artifacts: {exception}"
        raise ToolError(message) from exception

    return {
        "success": True,
        "artifacts": matching,
        "count": len(matching),
    }
|
||||
|
||||
|
||||
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
    """Get metadata for a specific artifact by its container path.

    :param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
    :return: Artifact metadata including path, type, size, source tool, and timestamps.

    """
    storage = get_storage()
    project_path: Path = get_project_path()

    try:
        record = storage.get_artifact(project_path, path)
    except Exception as exception:
        message: str = f"Failed to get artifact: {exception}"
        raise ToolError(message) from exception

    if record is None:
        return {
            "success": False,
            "path": path,
            "error": "Artifact not found",
        }

    return {
        "success": True,
        "artifact": record,
    }
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "fuzzforge-oss"
|
||||
version = "1.0.0"
|
||||
version = "0.8.0"
|
||||
description = "FuzzForge AI - AI-driven security research platform for local execution"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.14"
|
||||
|
||||
Reference in New Issue
Block a user