diff --git a/fuzzforge-mcp/src/fuzzforge_mcp/storage.py b/fuzzforge-mcp/src/fuzzforge_mcp/storage.py index dbf0bce..c84dc41 100644 --- a/fuzzforge-mcp/src/fuzzforge_mcp/storage.py +++ b/fuzzforge-mcp/src/fuzzforge_mcp/storage.py @@ -13,6 +13,7 @@ from __future__ import annotations import json import logging +import mimetypes from datetime import UTC, datetime from pathlib import Path from tarfile import open as Archive # noqa: N812 @@ -90,6 +91,7 @@ class LocalStorage: "# FuzzForge storage - ignore large/temporary files\n" "runs/\n" "output/\n" + "artifacts.json\n" "!config.json\n" ) @@ -273,3 +275,203 @@ class LocalStorage: except Exception as exc: msg = f"Failed to extract results: {exc}" raise StorageError(msg) from exc + + # ------------------------------------------------------------------ + # Artifact tracking + # ------------------------------------------------------------------ + + def _artifacts_path(self, project_path: Path) -> Path: + """Get the path to the artifacts registry file. + + :param project_path: Path to the project directory. + :returns: Path to artifacts.json. + + """ + return self._get_project_path(project_path) / "artifacts.json" + + def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]: + """Load the artifact registry from disk. + + :param project_path: Path to the project directory. + :returns: List of artifact dicts. + + """ + path = self._artifacts_path(project_path) + if path.exists(): + try: + return json.loads(path.read_text()) # type: ignore[no-any-return] + except (json.JSONDecodeError, OSError): + return [] + return [] + + def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None: + """Persist the artifact registry to disk. + + :param project_path: Path to the project directory. + :param artifacts: Full artifact list to write. + + """ + path = self._artifacts_path(project_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(artifacts, indent=2, default=str)) + + def _classify_file(self, file_path: Path) -> str: + """Classify a file into a human-friendly type string. + + :param file_path: Path to the file. + :returns: Type string (e.g. "elf-binary", "text", "directory"). + + """ + mime, _ = mimetypes.guess_type(str(file_path)) + suffix = file_path.suffix.lower() + + # Try reading ELF magic for binaries with no extension + if mime is None and suffix == "": + try: + header = file_path.read_bytes()[:4] + if header == b"\x7fELF": + return "elf-binary" + except OSError: + pass + + if mime: + if "json" in mime: + return "json" + if "text" in mime or "xml" in mime or "yaml" in mime: + return "text" + if "image" in mime: + return "image" + if "octet-stream" in mime: + return "binary" + + type_map: dict[str, str] = { + ".json": "json", + ".sarif": "sarif", + ".md": "markdown", + ".txt": "text", + ".log": "text", + ".csv": "csv", + ".yaml": "yaml", + ".yml": "yaml", + ".xml": "xml", + ".html": "html", + ".elf": "elf-binary", + ".so": "elf-binary", + ".bin": "binary", + ".gz": "archive", + ".tar": "archive", + ".zip": "archive", + } + return type_map.get(suffix, "binary") + + def scan_artifacts( + self, + project_path: Path, + server_name: str, + tool_name: str, + ) -> list[dict[str, Any]]: + """Scan the output directory for new or modified files and register them. + + Compares the current state of .fuzzforge/output/ against the existing + artifact registry and registers any new or modified files. + + :param project_path: Path to the project directory. + :param server_name: Hub server that produced the artifacts. + :param tool_name: Tool that produced the artifacts. + :returns: List of newly registered artifact dicts. + + """ + output_path = self.get_project_output_path(project_path) + if output_path is None or not output_path.exists(): + return [] + + existing = self._load_artifacts(project_path) + known: dict[str, dict[str, Any]] = {a["path"]: a for a in existing} + now = datetime.now(tz=UTC).isoformat() + + new_artifacts: list[dict[str, Any]] = [] + for file_path in output_path.rglob("*"): + if not file_path.is_file(): + continue + + # Use the container-style path (/app/output/...) so it's + # directly usable in subsequent tool calls. + relative = file_path.relative_to(output_path) + container_path = f"/app/output/{relative}" + + stat = file_path.stat() + size = stat.st_size + mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat() + + prev = known.get(container_path) + if prev and prev.get("mtime") == mtime and prev.get("size") == size: + continue # Unchanged — skip + + artifact: dict[str, Any] = { + "path": container_path, + "host_path": str(file_path), + "type": self._classify_file(file_path), + "size": size, + "mtime": mtime, + "source_server": server_name, + "source_tool": tool_name, + "registered_at": now, + } + + if prev: + # Update existing entry in-place + idx = next(i for i, a in enumerate(existing) if a["path"] == container_path) + existing[idx] = artifact + else: + existing.append(artifact) + + new_artifacts.append(artifact) + + if new_artifacts: + self._save_artifacts(project_path, existing) + logger.info( + "Registered %d new artifact(s) from %s:%s", + len(new_artifacts), + server_name, + tool_name, + ) + + return new_artifacts + + def list_artifacts( + self, + project_path: Path, + *, + source: str | None = None, + artifact_type: str | None = None, + ) -> list[dict[str, Any]]: + """List registered artifacts, with optional filters. + + :param project_path: Path to the project directory. + :param source: Filter by source server name. + :param artifact_type: Filter by artifact type (e.g. "elf-binary", "json"). + :returns: List of matching artifact dicts. + + """ + artifacts = self._load_artifacts(project_path) + + if source: + artifacts = [a for a in artifacts if a.get("source_server") == source] + if artifact_type: + artifacts = [a for a in artifacts if a.get("type") == artifact_type] + + return artifacts + + def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None: + """Get a single artifact by its container path. + + :param project_path: Path to the project directory. + :param path: Container path of the artifact (e.g. /app/output/...). + :returns: Artifact dict, or None if not found. + + """ + artifacts = self._load_artifacts(project_path) + for artifact in artifacts: + if artifact["path"] == path: + return artifact + return None diff --git a/fuzzforge-mcp/src/fuzzforge_mcp/tools/projects.py b/fuzzforge-mcp/src/fuzzforge_mcp/tools/projects.py index 52b6c4a..678fd37 100644 --- a/fuzzforge-mcp/src/fuzzforge_mcp/tools/projects.py +++ b/fuzzforge-mcp/src/fuzzforge_mcp/tools/projects.py @@ -146,3 +146,70 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None except Exception as exception: message: str = f"Failed to get execution results: {exception}" raise ToolError(message) from exception + + +@mcp.tool +async def list_artifacts( + source: str | None = None, + artifact_type: str | None = None, +) -> dict[str, Any]: + """List all artifacts produced by hub tools in the current project. + + Artifacts are files created by tool executions in /app/output/. + They are automatically tracked after each execute_hub_tool call. + + :param source: Filter by source server name (e.g. "binwalk-mcp"). + :param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive"). + :return: List of artifacts with path, type, size, and source info. + + """ + storage = get_storage() + project_path: Path = get_project_path() + + try: + artifacts = storage.list_artifacts( + project_path, + source=source, + artifact_type=artifact_type, + ) + + return { + "success": True, + "artifacts": artifacts, + "count": len(artifacts), + } + + except Exception as exception: + message: str = f"Failed to list artifacts: {exception}" + raise ToolError(message) from exception + + +@mcp.tool +async def get_artifact(path: str) -> dict[str, Any]: + """Get metadata for a specific artifact by its container path. + + :param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd). + :return: Artifact metadata including path, type, size, source tool, and timestamps. + + """ + storage = get_storage() + project_path: Path = get_project_path() + + try: + artifact = storage.get_artifact(project_path, path) + + if artifact is None: + return { + "success": False, + "path": path, + "error": "Artifact not found", + } + + return { + "success": True, + "artifact": artifact, + } + + except Exception as exception: + message: str = f"Failed to get artifact: {exception}" + raise ToolError(message) from exception diff --git a/pyproject.toml b/pyproject.toml index 8b483df..2e2e1eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fuzzforge-oss" -version = "1.0.0" +version = "0.8.0" description = "FuzzForge AI - AI-driven security research platform for local execution" readme = "README.md" requires-python = ">=3.14"