refactor: Update database to use native findings format

- Renamed FindingRecord.sarif_data to findings_data
- Updated database schema: sarif_data column -> findings_data column
- Updated all database methods to work with the native format (see the payload sketch after this list):
  - save_findings()
  - get_findings()
  - list_findings()
  - get_all_findings()
  - get_aggregated_stats()
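
  For context, this is roughly the payload shape those methods now persist - a minimal
  sketch inferred from the summary-building code in the diff below. The concrete rule ID
  and module name are hypothetical placeholders, and real payloads may carry more fields:

    # Sketch of a native FuzzForge findings payload (inferred from this diff;
    # rule_id, severity, and found_by.module are the fields the CLI reads).
    findings_data = {
        "findings": [
            {
                "rule_id": "example-rule",                  # hypothetical rule ID
                "severity": "high",                         # critical/high/medium/low/info
                "found_by": {"module": "example-module"},   # hypothetical module name
            }
        ]
    }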

- Updated SQL queries to use native-format JSON paths (see the runnable sketch after this list):
  - Changed from SARIF paths ($.runs[0].results) to native paths ($.findings)
  - Updated severity filtering from SARIF levels (error/warning/note) to native (critical/high/medium/low/info)
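
  For illustration, a self-contained sketch of the new JSON-path aggregation. It assumes
  an SQLite build with the JSON1 functions (standard in recent Python); the table and
  column names come from this commit, while the sample row is made up:

    import json
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE findings (run_id TEXT, findings_data TEXT NOT NULL)")
    conn.execute(
        "INSERT INTO findings VALUES (?, ?)",
        ("run-1", json.dumps({"findings": [{"severity": "high"}, {"severity": "low"}]})),
    )

    # Native path $.findings replaces the old SARIF path $.runs[0].results
    total_issues = conn.execute("""
        SELECT SUM(json_array_length(json_extract(findings_data, '$.findings')))
        FROM findings
        WHERE json_extract(findings_data, '$.findings') IS NOT NULL
    """).fetchone()[0]
    print(total_issues)  # -> 2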

- Updated CLI commands to support both formats during the transition (condensed sketch after this list):
  - get_findings now extracts the summary from both native and SARIF formats
  - show_finding and show_findings_by_rule updated to read the findings_data field
  - Added format detection to handle data from the API (still SARIF) and from the database (native)
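
  Condensed to its core, the detection logic amounts to the following - a simplified
  sketch of the code in the first diff below, not the full summary builder:

    def summarize(findings_data: dict) -> dict:
        """Build a severity summary from either native or SARIF data."""
        if "findings" in findings_data:      # native format (database)
            severities = [f.get("severity", "info")
                          for f in findings_data.get("findings", [])]
        elif "runs" in findings_data:        # SARIF format (API, backward compat)
            runs = findings_data.get("runs", [])
            results = runs[0].get("results", []) if runs else []
            severities = [r.get("level", "note") for r in results]
        else:
            severities = []

        summary = {"total_issues": len(severities), "by_severity": {}}
        for sev in severities:
            summary["by_severity"][sev] = summary["by_severity"].get(sev, 0) + 1
        return summary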

Breaking changes:
- Database schema changed; existing databases will need to be recreated (a hedged rename sketch follows this list)
- FindingRecord.sarif_data renamed to findings_data
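
The supported path is to delete and recreate the project database. As a hedged sketch
only: if the file itself must survive, SQLite 3.25+ can rename the column in place,
but note this does NOT convert stored SARIF payloads to the native format:

    import sqlite3

    # Hypothetical helper, not part of this commit. Renaming keeps old SARIF
    # payloads as-is, so code expecting native "findings" data will skip them.
    def rename_findings_column(db_path: str) -> None:
        with sqlite3.connect(db_path) as conn:
            # Requires SQLite >= 3.25 for RENAME COLUMN
            conn.execute(
                "ALTER TABLE findings RENAME COLUMN sarif_data TO findings_data"
            )
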
Author: tduhamel42
Date: 2025-11-02 14:36:04 +01:00
parent 268aae37ad
commit 4aa8187716
2 changed files with 73 additions and 53 deletions

View File

@@ -86,40 +86,62 @@ def get_findings(
     try:
         db = ensure_project_db()
 
-        # Extract summary from SARIF
-        sarif_data = findings.sarif
-        runs_data = sarif_data.get("runs", [])
+        # Get findings data (API returns .sarif for now, will be native format later)
+        findings_data = findings.sarif
 
         summary = {}
-        if runs_data:
-            results = runs_data[0].get("results", [])
+        # Support both native format and SARIF format
+        if "findings" in findings_data:
+            # Native FuzzForge format
+            findings_list = findings_data.get("findings", [])
             summary = {
-                "total_issues": len(results),
+                "total_issues": len(findings_list),
                 "by_severity": {},
                 "by_rule": {},
-                "tools": []
+                "by_source": {}
             }
 
-            for result in results:
-                level = result.get("level", "note")
-                rule_id = result.get("ruleId", "unknown")
+            for finding in findings_list:
+                severity = finding.get("severity", "info")
+                rule_id = finding.get("rule_id", "unknown")
+                module = finding.get("found_by", {}).get("module", "unknown")
 
-                summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1
+                summary["by_severity"][severity] = summary["by_severity"].get(severity, 0) + 1
                 summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1
+                summary["by_source"][module] = summary["by_source"].get(module, 0) + 1
 
-            # Extract tool info
-            tool = runs_data[0].get("tool", {})
-            driver = tool.get("driver", {})
-            if driver.get("name"):
-                summary["tools"].append({
-                    "name": driver.get("name"),
-                    "version": driver.get("version"),
-                    "rules": len(driver.get("rules", []))
-                })
+        elif "runs" in findings_data:
+            # SARIF format (backward compatibility)
+            runs_data = findings_data.get("runs", [])
+            if runs_data:
+                results = runs_data[0].get("results", [])
+                summary = {
+                    "total_issues": len(results),
+                    "by_severity": {},
+                    "by_rule": {},
+                    "tools": []
+                }
+
+                for result in results:
+                    level = result.get("level", "note")
+                    rule_id = result.get("ruleId", "unknown")
+
+                    summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1
+                    summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1
+
+                # Extract tool info
+                tool = runs_data[0].get("tool", {})
+                driver = tool.get("driver", {})
+                if driver.get("name"):
+                    summary["tools"].append({
+                        "name": driver.get("name"),
+                        "version": driver.get("version"),
+                        "rules": len(driver.get("rules", []))
+                    })
 
         finding_record = FindingRecord(
             run_id=run_id,
-            sarif_data=sarif_data,
+            findings_data=findings_data,
             summary=summary,
             created_at=datetime.now()
         )
@@ -174,9 +196,9 @@ def show_finding(
         with get_client() as client:
             console.print(f"🔍 Fetching findings for run: {run_id}")
             findings = client.get_run_findings(run_id)
-            findings_dict = findings.sarif  # Will become native format
+            findings_dict = findings.sarif  # API still returns .sarif for now
     else:
-        findings_dict = findings_data.sarif_data  # Will become findings_data
+        findings_dict = findings_data.findings_data
 
     # Find the specific finding by unique ID
     # For now, support both SARIF (old) and native format (new)
@@ -239,9 +261,9 @@ def show_findings_by_rule(
         with get_client() as client:
             console.print(f"🔍 Fetching findings for run: {run_id}")
             findings = client.get_run_findings(run_id)
-            findings_dict = findings.sarif
+            findings_dict = findings.sarif  # API still returns .sarif for now
     else:
-        findings_dict = findings_data.sarif_data
+        findings_dict = findings_data.findings_data
 
     # Find all findings matching the rule
     matching_findings = []

View File

@@ -46,7 +46,7 @@ class FindingRecord(BaseModel):
     """Database record for findings"""
     id: Optional[int] = None
     run_id: str
-    sarif_data: Dict[str, Any]
+    findings_data: Dict[str, Any]  # Native FuzzForge format
     summary: Dict[str, Any] = {}
     created_at: datetime
 
@@ -81,7 +81,7 @@ class FuzzForgeDatabase:
                 CREATE TABLE IF NOT EXISTS findings (
                     id INTEGER PRIMARY KEY AUTOINCREMENT,
                     run_id TEXT NOT NULL,
-                    sarif_data TEXT NOT NULL,
+                    findings_data TEXT NOT NULL,
                     summary TEXT DEFAULT '{}',
                     created_at TIMESTAMP NOT NULL,
                     FOREIGN KEY (run_id) REFERENCES runs (run_id)
@@ -292,21 +292,21 @@ class FuzzForgeDatabase:
     # Findings management methods
 
     def save_findings(self, finding: FindingRecord) -> int:
-        """Save findings and return the ID"""
+        """Save findings in native FuzzForge format and return the ID"""
         with self.connection() as conn:
             cursor = conn.execute("""
-                INSERT INTO findings (run_id, sarif_data, summary, created_at)
+                INSERT INTO findings (run_id, findings_data, summary, created_at)
                 VALUES (?, ?, ?, ?)
            """, (
                 finding.run_id,
-                json.dumps(finding.sarif_data),
+                json.dumps(finding.findings_data),
                 json.dumps(finding.summary),
                 finding.created_at
             ))
             return cursor.lastrowid
 
     def get_findings(self, run_id: str) -> Optional[FindingRecord]:
-        """Get findings for a run"""
+        """Get findings for a run in native FuzzForge format"""
         with self.connection() as conn:
             row = conn.execute(
                 "SELECT * FROM findings WHERE run_id = ? ORDER BY created_at DESC LIMIT 1",
@@ -317,14 +317,14 @@ class FuzzForgeDatabase:
             return FindingRecord(
                 id=row["id"],
                 run_id=row["run_id"],
-                sarif_data=json.loads(row["sarif_data"]),
+                findings_data=json.loads(row["findings_data"]),
                 summary=json.loads(row["summary"]),
                 created_at=row["created_at"]
             )
         return None
 
     def list_findings(self, limit: int = 50) -> List[FindingRecord]:
-        """List recent findings"""
+        """List recent findings in native FuzzForge format"""
         with self.connection() as conn:
             rows = conn.execute("""
                 SELECT * FROM findings
@@ -336,7 +336,7 @@ class FuzzForgeDatabase:
                 FindingRecord(
                     id=row["id"],
                     run_id=row["run_id"],
-                    sarif_data=json.loads(row["sarif_data"]),
+                    findings_data=json.loads(row["findings_data"]),
                     summary=json.loads(row["summary"]),
                     created_at=row["created_at"]
                 )
@@ -380,18 +380,17 @@ class FuzzForgeDatabase:
             finding = FindingRecord(
                 id=row["id"],
                 run_id=row["run_id"],
-                sarif_data=json.loads(row["sarif_data"]),
+                findings_data=json.loads(row["findings_data"]),
                 summary=json.loads(row["summary"]),
                 created_at=row["created_at"]
             )
 
-            # Filter by severity if specified
+            # Filter by severity if specified (native format)
             if severity:
                 finding_severities = set()
-                if "runs" in finding.sarif_data:
-                    for run in finding.sarif_data["runs"]:
-                        for result in run.get("results", []):
-                            finding_severities.add(result.get("level", "note").lower())
+                if "findings" in finding.findings_data:
+                    for f in finding.findings_data["findings"]:
+                        finding_severities.add(f.get("severity", "info").lower())
 
                 if not any(sev.lower() in finding_severities for sev in severity):
                     continue
@@ -408,7 +407,7 @@ class FuzzForgeDatabase:
         return self.get_all_findings(workflow=workflow)
 
     def get_aggregated_stats(self) -> Dict[str, Any]:
-        """Get aggregated statistics for all findings using SQL aggregation"""
+        """Get aggregated statistics for all findings using native format and SQL aggregation"""
         with self.connection() as conn:
             # Total findings and runs
             total_findings = conn.execute("SELECT COUNT(*) FROM findings").fetchone()[0]
@@ -429,39 +428,38 @@ class FuzzForgeDatabase:
                 WHERE created_at > datetime('now', '-7 days')
             """).fetchone()[0]
 
-            # Use SQL JSON functions to aggregate severity stats efficiently
+            # Use SQL JSON functions to aggregate severity stats efficiently (native format)
             # This avoids loading all findings into memory
             severity_stats = conn.execute("""
                 SELECT
-                    SUM(json_array_length(json_extract(sarif_data, '$.runs[0].results'))) as total_issues,
+                    SUM(json_array_length(json_extract(findings_data, '$.findings'))) as total_issues,
                     COUNT(*) as finding_count
                 FROM findings
-                WHERE json_extract(sarif_data, '$.runs[0].results') IS NOT NULL
+                WHERE json_extract(findings_data, '$.findings') IS NOT NULL
             """).fetchone()
 
             total_issues = severity_stats["total_issues"] or 0
 
-            # Get severity distribution using SQL
+            # Get severity distribution using native format (critical/high/medium/low/info)
             # Note: This is a simplified version - for full accuracy we'd need JSON parsing
             # But it's much more efficient than loading all data into Python
-            severity_counts = {"error": 0, "warning": 0, "note": 0, "info": 0}
+            severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
 
             # Sample the first N findings for severity distribution
             # This gives a good approximation without loading everything
             sample_findings = conn.execute("""
-                SELECT sarif_data
+                SELECT findings_data
                 FROM findings
                 LIMIT ?
            """, (STATS_SAMPLE_SIZE,)).fetchall()
 
            for row in sample_findings:
                try:
-                    data = json.loads(row["sarif_data"])
-                    if "runs" in data:
-                        for run in data["runs"]:
-                            for result in run.get("results", []):
-                                level = result.get("level", "note").lower()
-                                severity_counts[level] = severity_counts.get(level, 0) + 1
+                    data = json.loads(row["findings_data"])
+                    if "findings" in data:
+                        for finding in data["findings"]:
+                            severity = finding.get("severity", "info").lower()
+                            severity_counts[severity] = severity_counts.get(severity, 0) + 1
                 except (json.JSONDecodeError, KeyError):
                     continue