Files
CVEs-PoC/scripts/pipeline_outputs.py

221 lines
7.8 KiB
Python

from __future__ import annotations
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from utils import API_DIR, DIFFS_DIR, SNAPSHOT_DIR, TOP_DIR, ensure_dirs, load_json, save_json, today_str
def write_cve_outputs(results: List[Dict], *, base_dir: Path | None = None) -> None:
target_dir = base_dir or API_DIR / "cve"
ensure_dirs(target_dir)
for result in results:
last_updated = result.get("last_updated") or today_str()
output = {
"cve_id": result["cve_id"],
"last_updated": last_updated,
"pocs": [
{
"repo_full_name": poc.get("repo_full_name"),
"repo_url": poc.get("repo_url"),
"is_fork": poc.get("is_fork"),
"parent_repo_url": poc.get("parent_repo_url"),
"stars": poc.get("stars"),
"forks": poc.get("forks"),
"archived": poc.get("archived"),
"pushed_at": poc.get("pushed_at") or poc.get("updated_at"),
"topics": poc.get("topics", []),
"primary_language": poc.get("primary_language"),
"matches": poc.get("matches", []),
"confidence_score": poc.get("confidence_score"),
"confidence_tier": poc.get("confidence_tier"),
}
for poc in result.get("pocs", [])
],
}
save_json(target_dir / f"{result['cve_id']}.json", output)
def build_index(results: List[Dict]) -> Dict:
items: List[Dict] = []
for result in results:
poc_entries = result.get("pocs", [])
high = [p for p in poc_entries if p.get("confidence_tier") == "high"]
medium = [p for p in poc_entries if p.get("confidence_tier") == "medium"]
langs = Counter()
max_score = 0.0
for poc in poc_entries:
lang = poc.get("primary_language")
if lang:
langs[lang] += 1
max_score = max(max_score, poc.get("confidence_score") or 0)
items.append(
{
"cve_id": result["cve_id"],
"poc_count": len(poc_entries),
"high_confidence": len(high),
"medium_confidence": len(medium),
"top_languages": [lang for lang, _ in langs.most_common(3)],
"max_score": max_score,
"last_updated": result.get("last_updated"),
}
)
return {"generated": today_str(), "items": sorted(items, key=lambda r: r["cve_id"], reverse=True)}
def write_index(results: List[Dict]) -> Dict:
ensure_dirs(API_DIR)
payload = build_index(results)
save_json(API_DIR / "index.json", payload)
return payload
def write_top(results: List[Dict], *, limit: int = 100) -> Dict:
ensure_dirs(TOP_DIR)
entries: List[Dict] = []
for result in results:
for poc in result.get("pocs", []):
if poc.get("confidence_tier") not in {"high", "medium"}:
continue
entries.append(
{
"cve_id": result["cve_id"],
"repo_full_name": poc.get("repo_full_name"),
"repo_url": poc.get("repo_url"),
"score": poc.get("confidence_score"),
"tier": poc.get("confidence_tier"),
"stars": poc.get("stars"),
"primary_language": poc.get("primary_language"),
}
)
entries.sort(key=lambda e: (-(e.get("score") or 0), -(e.get("stars") or 0)))
payload = {"generated": today_str(), "items": entries[:limit]}
save_json(TOP_DIR / "today.json", payload)
return payload
def summarise_for_snapshot(results: List[Dict], *, top: Dict | None = None) -> Dict:
summary: Dict[str, Dict[str, Dict]] = {}
for result in results:
repo_map: Dict[str, Dict] = {}
for poc in result.get("pocs", []):
repo_map[poc.get("repo_full_name")] = {
"score": poc.get("confidence_score"),
"tier": poc.get("confidence_tier"),
}
summary[result["cve_id"]] = repo_map
payload = {"generated": today_str(), "entries": summary}
if top:
payload["top"] = top
return payload
def write_snapshot(summary: Dict) -> Path:
ensure_dirs(SNAPSHOT_DIR)
target = SNAPSHOT_DIR / f"{summary['generated']}.json"
save_json(target, summary)
save_json(SNAPSHOT_DIR / "latest.json", summary)
return target
def prune_old_snapshots(days: int = 14) -> None:
if not SNAPSHOT_DIR.exists():
return
cutoff = datetime.utcnow().date() - timedelta(days=days)
for snap in SNAPSHOT_DIR.glob("*.json"):
try:
snap_date = datetime.strptime(snap.stem, "%Y-%m-%d").date()
except ValueError:
continue
if snap_date < cutoff:
snap.unlink(missing_ok=True)
def prune_old_diffs(days: int = 14) -> None:
if not DIFFS_DIR.exists():
return
cutoff = datetime.now().date() - timedelta(days=days)
for diff in DIFFS_DIR.glob("*.json"):
try:
diff_date = datetime.strptime(diff.stem, "%Y-%m-%d").date()
except ValueError:
continue
if diff_date < cutoff:
diff.unlink(missing_ok=True)
def _load_snapshot(path: Path) -> Dict:
return load_json(path, default={}) or {}
def build_diff(prev: Dict, curr: Dict, *, dead_links: List[Dict] | None = None) -> Dict:
prev_entries = prev.get("entries", {})
curr_entries = curr.get("entries", {})
new_high: List[Dict] = []
promoted: List[Dict] = []
demoted: List[Dict] = []
for cve_id, repos in curr_entries.items():
for repo_name, info in repos.items():
tier = info.get("tier")
if tier != "high":
continue
prev_info = (prev_entries.get(cve_id) or {}).get(repo_name)
if not prev_info:
new_high.append({"cve_id": cve_id, "repo_full_name": repo_name, "score": info.get("score")})
elif prev_info.get("tier") != "high":
promoted.append(
{
"cve_id": cve_id,
"repo_full_name": repo_name,
"score": info.get("score"),
"previous_tier": prev_info.get("tier"),
}
)
for cve_id, repos in prev_entries.items():
for repo_name, info in repos.items():
if info.get("tier") != "high":
continue
curr_info = (curr_entries.get(cve_id) or {}).get(repo_name)
if not curr_info or curr_info.get("tier") != "high":
demoted.append(
{
"cve_id": cve_id,
"repo_full_name": repo_name,
"previous_score": info.get("score"),
"previous_tier": info.get("tier"),
"current_tier": curr_info.get("tier") if curr_info else None,
}
)
return {
"generated": curr.get("generated"),
"new_high_conf_pocs": new_high,
"promoted_to_high": promoted,
"demoted_or_removed": demoted,
"dead_links": dead_links or [],
}
def write_diff(diff: Dict) -> Path:
ensure_dirs(DIFFS_DIR)
target = DIFFS_DIR / f"{diff['generated']}.json"
save_json(target, diff)
save_json(DIFFS_DIR / "latest.json", diff)
return target
def latest_snapshots() -> Tuple[Dict, Dict]:
if not SNAPSHOT_DIR.exists():
return {}, {}
snaps = sorted(SNAPSHOT_DIR.glob("*.json"))
if not snaps:
return {}, {}
curr = _load_snapshot(snaps[-1])
prev = _load_snapshot(snaps[-2]) if len(snaps) > 1 else {}
return prev, curr