Refresh KEV and EPSS pages for cleaner signal

This commit is contained in:
0xMarcio
2025-12-17 18:24:55 +01:00
parent 1ae3d7e711
commit 0f18fc5736
21 changed files with 14828 additions and 2663 deletions
+32 -4
View File
@@ -8,8 +8,9 @@ from typing import Dict, List, Tuple
from utils import API_DIR, SNAPSHOT_DIR, ensure_dirs, load_json, save_json
# Default for --lookback: how many days of snapshots to keep.
DEFAULT_LOOKBACK_DAYS = 14
# Default for --threshold: the minimum EPSS score treated as "high".
# NOTE(review): assigned twice in this view; the later value (0.05) is the one
# that takes effect. The 0.5 line is presumably the pre-change value — confirm.
DEFAULT_HIGH_EPSS_THRESHOLD = 0.5
DEFAULT_HIGH_EPSS_THRESHOLD = 0.05
# Default for --max-movers: cap on the number of EPSS movers kept.
DEFAULT_MAX_MOVERS = 50
# Default for --recent-days: days of KEV entries to surface as new.
DEFAULT_RECENT_KEV_DAYS = 30
def parse_date(date_str: str) -> date:
@@ -42,7 +43,31 @@ def compute_epss_movers(prev_epss: Dict[str, Dict], curr_epss: Dict[str, Dict],
return deltas[:max_items]
def build_diff(snapshots: List[Path], *, threshold: float, max_movers: int) -> Tuple[Dict, Path | None]:
def filter_recent_kev(kev_items: List[Dict], *, recent_days: int) -> List[Dict]:
    """Return the KEV rows added within the last ``recent_days`` days, newest first.

    Each row's added date is read from ``date_added`` (falling back to
    ``dateAdded``) and parsed with ``parse_date``. Rows without a date, or
    whose date makes ``parse_date`` raise ``ValueError``, are skipped.

    Args:
        kev_items: KEV rows as dictionaries.
        recent_days: Window size in days, counted back from today's UTC date.

    Returns:
        The rows dated on or after the cutoff, ordered by added date and then
        by ``percentile`` (missing or falsy values count as 0), both descending.
    """
    # Imported locally so the block does not depend on the file-level import list.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12; the timezone-aware
    # call yields the same UTC calendar date.
    cutoff = datetime.now(timezone.utc).date() - timedelta(days=recent_days)

    fresh: List[Tuple[date, Dict]] = []
    for row in kev_items:
        date_str = row.get("date_added") or row.get("dateAdded")
        if not date_str:
            continue
        try:
            added = parse_date(date_str)
        except ValueError:
            # Best effort: an unparseable date just excludes the row.
            continue
        if added >= cutoff:
            fresh.append((added, row))

    # Newest first; ties on the date are broken by the higher percentile.
    fresh.sort(key=lambda item: (item[0], item[1].get("percentile") or 0), reverse=True)
    return [row for _, row in fresh]
def build_diff(
snapshots: List[Path],
kev_full: List[Dict] | None = None,
*,
threshold: float,
max_movers: int,
recent_days: int,
) -> Tuple[Dict, Path | None]:
if not snapshots:
return {}, None
latest_path = snapshots[-1]
@@ -62,9 +87,11 @@ def build_diff(snapshots: List[Path], *, threshold: float, max_movers: int) -> T
curr_epss_lookup = {row["cve"]: row for row in latest.get("high_epss", [])}
epss_movers = compute_epss_movers(prev_epss_lookup, curr_epss_lookup, max_movers)
kev_recent = filter_recent_kev(kev_full or latest.get("kev_top", []), recent_days=recent_days)
diff_outputs = {
"generated": latest_date,
"new_kev_entries": kev_diff["new"],
"new_kev_entries": kev_recent,
"removed_kev_entries": kev_diff["removed"],
"new_high_epss": [row for row in high_epss_diff["new"] if (row.get("epss") or 0) >= threshold],
"removed_high_epss": high_epss_diff["removed"],
@@ -93,11 +120,12 @@ def main() -> int:
parser.add_argument("--threshold", type=float, default=DEFAULT_HIGH_EPSS_THRESHOLD, help="High EPSs minimum threshold")
parser.add_argument("--lookback", type=int, default=DEFAULT_LOOKBACK_DAYS, help="How many days of snapshots to keep")
parser.add_argument("--max-movers", type=int, default=DEFAULT_MAX_MOVERS, help="Max EPSs movers to keep")
parser.add_argument("--recent-days", type=int, default=DEFAULT_RECENT_KEV_DAYS, help="Days of KEV entries to surface as new")
args = parser.parse_args()
ensure_dirs(SNAPSHOT_DIR)
snapshots = sorted(SNAPSHOT_DIR.glob("*.json"))
diff, target = build_diff(snapshots, threshold=args.threshold, max_movers=args.max_movers)
diff, target = build_diff(snapshots, kev_full=None, threshold=args.threshold, max_movers=args.max_movers, recent_days=args.recent_days)
if target:
print(f"Wrote diff to {target}")
else:
+40 -22
View File
@@ -22,8 +22,8 @@ KEV_PATH = DATA_DIR / "kev.json"
EPSS_PATH = DATA_DIR / "epss.json"
# NOTE(review): the code consuming these defaults is outside this view; the
# names suggest a cap on KEV entries, a cap on high-EPSS rows and a minimum
# EPSS score — confirm against the callers.
DEFAULT_TOP_KEV = 75
# NOTE(review): the next two names are each assigned twice in this view; the
# later values (50 and 0.05) are the ones that take effect. The 250 / 0.5
# lines are presumably the pre-change values — confirm.
DEFAULT_HIGH_EPSS_LIMIT = 250
DEFAULT_HIGH_EPSS_THRESHOLD = 0.5
DEFAULT_HIGH_EPSS_LIMIT = 50
DEFAULT_HIGH_EPSS_THRESHOLD = 0.05
def load_inputs(kev_path: Path, epss_path: Path) -> Tuple[Dict, Dict]:
@@ -72,26 +72,44 @@ def build_high_epss_not_in_kev(
threshold: float,
limit: int,
) -> List[Dict]:
output: List[Dict] = []
for row in epss_items:
cve = row.get("cve", "").upper()
if not cve or cve in kev_set:
continue
epss_score = row.get("epss") or 0.0
if epss_score < threshold:
continue
poc_count = len(poc_index.get(cve, {}).get("poc", []))
output.append(
{
"cve": cve,
"epss": row.get("epss"),
"percentile": row.get("percentile"),
"poc_count": poc_count,
}
)
if len(output) >= limit:
break
return output
ranked = sorted(
(
row
for row in epss_items
if row.get("cve")
and row.get("cve", "").upper() not in kev_set
and (row.get("epss") is not None)
),
key=lambda row: (-float(row.get("epss") or 0), row.get("cve", "")),
)
def build_rows(source: List[Dict]) -> List[Dict]:
    """Project EPSS rows into output records, honouring the threshold and limit.

    Reads ``threshold``, ``limit`` and ``poc_index`` from the enclosing scope.
    Rows with an empty ``cve`` or an ``epss`` below ``threshold`` (missing or
    falsy scores count as 0.0) are dropped. Collection stops as soon as the
    number of records reaches ``limit``.

    NOTE(review): the threshold check runs on every call, so a second pass
    over the same rows cannot admit anything the first pass rejected.
    """
    collected: List[Dict] = []
    for entry in source:
        cve_id = entry.get("cve", "").upper()
        # Short-circuit keeps the score comparison from running on rows without a CVE id.
        if not cve_id or (entry.get("epss") or 0.0) < threshold:
            continue
        known_pocs = poc_index.get(cve_id, {}).get("poc", [])
        record = {
            "cve": cve_id,
            "epss": entry.get("epss"),
            "percentile": entry.get("percentile"),
            "poc_count": len(known_pocs),
        }
        collected.append(record)
        # The limit is checked only after appending, as in the original.
        if len(collected) >= limit:
            break
    return collected
rows = build_rows(ranked)
if not rows and threshold > 0:
# If the threshold is too strict for a given day, fall back to the top ranked set.
rows = build_rows([dict(row, epss=row.get("epss", 0) or 0) for row in ranked[:limit]])
return rows
def build_cve_details(
+14 -2
View File
@@ -60,9 +60,15 @@ def build_pages(env: Environment, data: Dict, diff: Dict | None = None, html_mod
details = data["details"]
vendors = data["vendors"]
trending = parse_trending_from_readme(README_PATH)
recent_kev = (diff or {}).get("new_kev_entries") or []
metrics = {
"kev_total": len(data["kev_enriched"]),
"high_epss_count": len(joined["high_epss"]),
"recent_kev_count": len(recent_kev),
}
if html_mode in {"summary", "all"}:
common_ctx = {"generated": joined["generated"]}
common_ctx = {"generated": joined["generated"], "metrics": metrics, "recent_kev": recent_kev}
render(
env,
"index.html",
@@ -99,7 +105,13 @@ def main() -> int:
# snapshot + diff before rendering so dashboard can show it
snapshot_path = write_snapshot(data["joined"])
snapshots = sorted((API_DIR / "snapshots").glob("*.json"))
diff, target = build_diff(snapshots, threshold=0.5, max_movers=50)
diff, target = build_diff(
snapshots,
kev_full=data["kev_enriched"],
threshold=0.05,
max_movers=50,
recent_days=30,
)
prune_snapshots(snapshots, lookback_days=14)
if args.html_mode != "none":