Add KEV/EPSS static site generator

This commit is contained in:
0xMarcio
2025-12-17 13:59:41 +01:00
parent b5a98c6151
commit aac323de9b
19 changed files with 1539 additions and 48 deletions
+12
View File
@@ -0,0 +1,12 @@
# Build pipeline
```
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python scripts/fetch_kev.py
python scripts/fetch_epss.py
python scripts/build_site.py
```
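The generated site is written to `docs/`, and the JSON API files to `docs/api/v1/`. Daily snapshots are stored in `docs/api/v1/snapshots/` (only the last 14 days are kept; older snapshots are pruned), and day-over-day diffs are written to `docs/api/v1/diff/`.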
+111
View File
@@ -0,0 +1,111 @@
from __future__ import annotations
import argparse
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple
from utils import API_DIR, SNAPSHOT_DIR, ensure_dirs, load_json, save_json
# Defaults for the command-line options declared in main().
# Snapshots older than this many days are deleted by prune_snapshots().
DEFAULT_LOOKBACK_DAYS = 14
# Minimum EPSS score for a newly listed entry to be reported in "new_high_epss".
DEFAULT_HIGH_EPSS_THRESHOLD = 0.5
# Upper bound on the number of EPSS movers written to a diff.
DEFAULT_MAX_MOVERS = 50
def parse_date(date_str: str) -> date:
    """Convert a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Raises:
        ValueError: if ``date_str`` does not match the ``%Y-%m-%d`` format.
    """
    parsed = datetime.strptime(date_str, "%Y-%m-%d")
    return parsed.date()
def load_snapshot(path: Path) -> Dict:
    """Load a snapshot JSON file, substituting an empty dict for a missing or falsy payload."""
    snapshot = load_json(path, default={})
    if not snapshot:
        return {}
    return snapshot
def diff_lists(prev: List[Dict], curr: List[Dict], key: str = "cve") -> Dict[str, List[Dict]]:
    """Compare two lists of rows by ``key`` and report additions and removals.

    Args:
        prev: Rows from the earlier snapshot.
        curr: Rows from the later snapshot.
        key: Field that identifies a row; every row must contain it.

    Returns:
        ``{"new": [...], "removed": [...]}`` where "new" holds the rows whose
        key only appears in ``curr`` and "removed" those only in ``prev``,
        each ordered by key.
    """
    before: Dict = {}
    for row in prev:
        before[row[key]] = row
    after: Dict = {}
    for row in curr:
        after[row[key]] = row

    added_keys = sorted(set(after) - set(before))
    dropped_keys = sorted(set(before) - set(after))
    return {
        "new": [after[ident] for ident in added_keys],
        "removed": [before[ident] for ident in dropped_keys],
    }
def compute_epss_movers(prev_epss: Dict[str, Dict], curr_epss: Dict[str, Dict], max_items: int) -> List[Dict]:
    """List CVEs whose EPSS score changed between two snapshots.

    Only CVEs present (with a non-empty row) in both mappings are considered,
    and changes smaller than 0.0001 in magnitude are ignored. A missing or
    falsy score counts as 0.

    Returns:
        At most ``max_items`` rows of ``cve``, ``delta`` (rounded to five
        decimals), ``epss`` and ``prev_epss``, ordered by descending delta
        and then by CVE id.
    """
    movers: List[Dict] = []
    for cve_id, current in curr_epss.items():
        previous = prev_epss.get(cve_id)
        if not previous:
            continue
        new_score = current.get("epss")
        old_score = previous.get("epss")
        change = (new_score or 0) - (old_score or 0)
        if abs(change) < 0.0001:
            continue
        movers.append(
            {
                "cve": cve_id,
                "delta": round(change, 5),
                "epss": new_score,
                "prev_epss": old_score,
            }
        )
    ranked = sorted(movers, key=lambda mover: (-mover["delta"], mover["cve"]))
    return ranked[:max_items]
def build_diff(snapshots: List[Path], *, threshold: float, max_movers: int) -> Tuple[Dict, Path | None]:
    """Diff the two most recent snapshots and write the result as JSON.

    Args:
        snapshots: Snapshot files ordered oldest to newest; the last entry is
            treated as the latest, the one before it as the previous.
        threshold: Minimum EPSS for a newly listed row to appear in
            ``new_high_epss``.
        max_movers: Maximum number of EPSS movers to include.

    Returns:
        ``(diff, path)``: the diff payload and the dated file it was written
        to, or ``({}, None)`` when ``snapshots`` is empty. The same payload is
        also written to ``latest.json`` next to the dated file.
    """
    if not snapshots:
        return {}, None

    newest_path = snapshots[-1]
    newest = load_snapshot(newest_path)
    stamp = newest.get("generated") or newest_path.stem
    newest_kev = newest.get("kev_top", [])
    newest_high = newest.get("high_epss", [])

    if len(snapshots) < 2:
        # Nothing to compare against: every current row counts as new.
        previous: Dict = {}
        kev_changes = {"new": newest_kev, "removed": []}
        high_changes = {"new": newest_high, "removed": []}
    else:
        previous = load_snapshot(snapshots[-2])
        kev_changes = diff_lists(previous.get("kev_top", []), newest_kev)
        high_changes = diff_lists(previous.get("high_epss", []), newest_high)

    old_scores = {row["cve"]: row for row in previous.get("high_epss", [])}
    new_scores = {row["cve"]: row for row in newest.get("high_epss", [])}
    movers = compute_epss_movers(old_scores, new_scores, max_movers)

    newly_high = []
    for row in high_changes["new"]:
        if (row.get("epss") or 0) >= threshold:
            newly_high.append(row)

    diff_outputs = {
        "generated": stamp,
        "new_kev_entries": kev_changes["new"],
        "removed_kev_entries": kev_changes["removed"],
        "new_high_epss": newly_high,
        "removed_high_epss": high_changes["removed"],
        "epss_movers": movers,
    }

    diff_dir = API_DIR / "diff"
    target = diff_dir / f"{stamp}.json"
    ensure_dirs(target.parent)
    save_json(target, diff_outputs)
    # Stable pointer to the most recent diff.
    save_json(target.parent / "latest.json", diff_outputs)
    return diff_outputs, target
def prune_snapshots(snapshots: List[Path], *, lookback_days: int) -> None:
    """Delete dated snapshot files older than the lookback window.

    Args:
        snapshots: Candidate snapshot files; a file's date is read from its
            stem, which must look like ``YYYY-MM-DD``.
        lookback_days: Files dated earlier than today (UTC) minus this many
            days are removed.

    Files whose stem is not a date are left untouched rather than aborting
    the whole pruning run with a ``ValueError``.
    """
    # Local import: this module's top-level datetime import does not include timezone.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated, naive datetime.utcnow().
    cutoff = datetime.now(timezone.utc).date() - timedelta(days=lookback_days)
    for snap in snapshots:
        try:
            # Same "%Y-%m-%d" format that parse_date() expects.
            snap_date = datetime.strptime(snap.stem, "%Y-%m-%d").date()
        except ValueError:
            # Not a dated snapshot (for example a "latest.json" pointer): skip it.
            continue
        if snap_date < cutoff:
            snap.unlink(missing_ok=True)
def main() -> int:
    """Command-line entry point: write the latest diff, then prune old snapshots.

    Returns:
        Always 0, used as the process exit status.
    """
    cli = argparse.ArgumentParser(description="Build daily diff JSON from snapshots")
    cli.add_argument(
        "--threshold",
        type=float,
        default=DEFAULT_HIGH_EPSS_THRESHOLD,
        help="High EPSs minimum threshold",
    )
    cli.add_argument(
        "--lookback",
        type=int,
        default=DEFAULT_LOOKBACK_DAYS,
        help="How many days of snapshots to keep",
    )
    cli.add_argument(
        "--max-movers",
        type=int,
        default=DEFAULT_MAX_MOVERS,
        help="Max EPSs movers to keep",
    )
    options = cli.parse_args()

    ensure_dirs(SNAPSHOT_DIR)
    snapshot_files = sorted(SNAPSHOT_DIR.glob("*.json"))
    _, written = build_diff(snapshot_files, threshold=options.threshold, max_movers=options.max_movers)
    message = f"Wrote diff to {written}" if written else "No snapshots available to diff"
    print(message)

    prune_snapshots(snapshot_files, lookback_days=options.lookback)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+299
View File
@@ -0,0 +1,299 @@
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
from utils import (
API_DIR,
DATA_DIR,
DOCS_DIR,
SNAPSHOT_DIR,
load_json,
save_json,
today_str,
ensure_dirs,
load_poc_index,
slugify,
stable_unique,
)
# Default input files; used as defaults for the --kev and --epss options in main().
KEV_PATH = DATA_DIR / "kev.json"
EPSS_PATH = DATA_DIR / "epss.json"
# Number of KEV rows (after sorting by percentile) surfaced in the top list.
DEFAULT_TOP_KEV = 75
# Maximum number of high-EPSS, non-KEV CVEs to keep.
DEFAULT_HIGH_EPSS_LIMIT = 250
# Minimum EPSS score for a CVE to count as "high EPSS".
DEFAULT_HIGH_EPSS_THRESHOLD = 0.5
def load_inputs(kev_path: Path, epss_path: Path) -> Tuple[Dict, Dict]:
    """Load the KEV and EPSS JSON files, using ``{}`` for a missing or falsy payload.

    Returns:
        ``(kev_data, epss_data)``.
    """
    kev_data = load_json(kev_path, default={})
    epss_data = load_json(epss_path, default={})
    return (kev_data or {}), (epss_data or {})
def enrich_kev(kev_items: List[Dict], epss_lookup: Dict[str, Dict], poc_index: Dict[str, Dict]) -> List[Dict]:
    """Merge KEV entries with their EPSS scores and PoC counts.

    Both snake_case keys and the camelCase keys (``cveID``, ``vendorProject``,
    ``dateAdded``, ...) are accepted on input. Entries without a CVE id are
    dropped.

    Returns:
        Normalised rows sorted by descending percentile (missing counts as 0)
        and then by CVE id.
    """

    def pick(record: Dict, primary: str, fallback: str, default=None):
        # Prefer the snake_case field; fall back to the camelCase field.
        return record.get(primary) or record.get(fallback, default)

    rows: List[Dict] = []
    for entry in kev_items:
        raw_id = pick(entry, "cve", "cveID") or ""
        if not raw_id:
            continue
        cve_id = raw_id.upper()
        scores = epss_lookup.get(cve_id, {})
        poc_links = poc_index.get(cve_id, {}).get("poc", [])
        rows.append(
            {
                "cve": cve_id,
                "vendor": pick(entry, "vendor", "vendorProject", ""),
                "product": entry.get("product", ""),
                "date_added": pick(entry, "date_added", "dateAdded"),
                "due_date": pick(entry, "due_date", "dueDate"),
                "short_description": pick(entry, "short_description", "shortDescription", ""),
                "required_action": pick(entry, "required_action", "requiredAction", ""),
                "notes": entry.get("notes", ""),
                "epss": scores.get("epss"),
                "percentile": scores.get("percentile"),
                "poc_count": len(poc_links),
            }
        )
    return sorted(rows, key=lambda row: (-float(row.get("percentile") or 0), row["cve"]))
def build_epss_lookup(epss_items: List[Dict]) -> Dict[str, Dict]:
    """Index EPSS rows by upper-cased CVE id, skipping rows without a CVE id.

    When the same id occurs more than once, the last row wins.
    """
    lookup: Dict[str, Dict] = {}
    for row in epss_items:
        if row.get("cve"):
            lookup[row.get("cve", "").upper()] = row
    return lookup
def build_high_epss_not_in_kev(
    epss_items: List[Dict],
    kev_set: Set[str],
    poc_index: Dict[str, Dict],
    *,
    threshold: float,
    limit: int,
) -> List[Dict]:
    """Select CVEs with a high EPSS score that are not in the KEV set.

    Rows are taken in input order until ``limit`` rows have been collected,
    so the result is only the "top" rows if ``epss_items`` is already ordered
    by descending score (presumably the case for the fetched EPSS file;
    verify against the caller).

    Args:
        epss_items: EPSS rows with ``cve``, ``epss`` and ``percentile`` keys.
        kev_set: Upper-cased CVE ids to exclude.
        poc_index: CVE id to ``{"poc": [...]}`` mapping used for ``poc_count``.
        threshold: Minimum EPSS score; a missing score counts as 0.
        limit: Maximum number of rows to return; non-positive yields ``[]``.

    Returns:
        Rows of ``cve``, ``epss``, ``percentile`` and ``poc_count``.
    """
    output: List[Dict] = []
    if limit <= 0:
        # The original appended one row before checking the limit.
        return output
    for row in epss_items:
        # `or ""` also covers an explicit null id, which `.get("cve", "")` does not.
        cve = str(row.get("cve") or "").upper()
        if not cve or cve in kev_set:
            continue
        epss_score = row.get("epss") or 0.0
        if epss_score < threshold:
            continue
        poc_count = len(poc_index.get(cve, {}).get("poc", []))
        output.append(
            {
                "cve": cve,
                "epss": row.get("epss"),
                "percentile": row.get("percentile"),
                "poc_count": poc_count,
            }
        )
        if len(output) >= limit:
            break
    return output
def build_cve_details(
    kev_enriched: Iterable[Dict],
    high_epss: Iterable[Dict],
    poc_index: Dict[str, Dict],
) -> Dict[str, Dict]:
    """Assemble one detail record per CVE from KEV rows and high-EPSS rows.

    Every record starts from the PoC index (description, links, count). KEV
    rows then supply the ``kev`` block, scores, vendor and product; high-EPSS
    rows only fill in the scores when the record has no EPSS value yet.

    Returns:
        Mapping of CVE id to detail record, in first-seen order.
    """
    details: Dict[str, Dict] = {}

    def detail_for(cve_id: str) -> Dict:
        # Create the skeleton record on first sight of a CVE.
        record = details.get(cve_id)
        if record is None:
            poc_entry = poc_index.get(cve_id, {})
            record = {
                "cve": cve_id,
                "description": poc_entry.get("desc", ""),
                "poc_links": poc_entry.get("poc", []),
                "poc_count": len(poc_entry.get("poc", [])),
                "kev": None,
                "epss": None,
                "percentile": None,
                "vendor": None,
                "product": None,
            }
            details[cve_id] = record
        return record

    kev_fields = ("date_added", "due_date", "short_description", "required_action", "notes")
    for kev_row in kev_enriched:
        record = detail_for(kev_row["cve"])
        record["kev"] = {field: kev_row.get(field) for field in kev_fields}
        for field in ("epss", "percentile", "vendor", "product"):
            record[field] = kev_row.get(field)

    for epss_row in high_epss:
        record = detail_for(epss_row["cve"])
        if record.get("epss") is not None:
            continue
        record["epss"] = epss_row.get("epss")
        record["percentile"] = epss_row.get("percentile")

    return details
def build_vendor_map(details: Dict[str, Dict]) -> Dict[str, Dict]:
    """Group CVE ids by vendor slug.

    Records without a vendor are skipped. The display name stored for a slug
    is the first vendor name that produced it.

    Returns:
        ``{slug: {"vendor": name, "cves": [sorted CVE ids]}}`` ordered by slug.
    """
    grouped: Dict[str, Dict] = {}
    for detail in details.values():
        name = detail.get("vendor")
        if not name:
            continue
        key = slugify(name)
        if key not in grouped:
            grouped[key] = {"vendor": name, "cves": []}
        grouped[key]["cves"].append(detail["cve"])

    ordered: Dict[str, Dict] = {}
    for key in sorted(grouped):
        bucket = grouped[key]
        bucket["cves"].sort()
        ordered[key] = bucket
    return ordered
def truncate_description(text: str, limit: int = 220) -> str:
    """Collapse whitespace and shorten ``text`` to at most ``limit`` characters.

    Args:
        text: Text to shorten; a falsy value yields ``""``.
        limit: Maximum length of the result, including the trailing ``...``.

    Returns:
        The whitespace-normalised text, cut and suffixed with ``...`` when it
        exceeds ``limit``. For limits of three or fewer there is no room for
        text plus an ellipsis, so only (part of) the ellipsis is returned.
    """
    if not text:
        return ""
    collapsed = " ".join(text.split())
    if len(collapsed) <= limit:
        return collapsed
    if limit <= 3:
        # A plain `collapsed[: limit - 3]` would use a zero or negative slice
        # end here and return text longer than the limit.
        return "..."[: max(limit, 0)]
    return collapsed[: limit - 3].rstrip() + "..."
def build_joined(
    kev_data: Dict,
    epss_data: Dict,
    poc_index: Dict[str, Dict],
    *,
    top_kev: int = DEFAULT_TOP_KEV,
    high_epss_threshold: float = DEFAULT_HIGH_EPSS_THRESHOLD,
    high_epss_limit: int = DEFAULT_HIGH_EPSS_LIMIT,
    extra_cves: Iterable[str] | None = None,
) -> Dict:
    """Join KEV, EPSS and PoC data into the payload used by the API and site.

    Args:
        kev_data: Parsed KEV file; rows are read from its ``items`` key.
        epss_data: Parsed EPSS file; rows are read from its ``items`` key.
        poc_index: CVE id to ``{"desc": ..., "poc": [...]}`` mapping.
        top_kev: Number of enriched KEV rows kept in the top list.
        high_epss_threshold: Minimum EPSS for the high-EPSS list.
        high_epss_limit: Maximum size of the high-EPSS list.
        extra_cves: Additional CVE ids that should get a detail record even
            if they are in neither list; blank entries are ignored.

    Returns:
        Dict with ``joined`` (generated date, ``kev_top``, ``high_epss``),
        ``kev_enriched``, ``epss_items``, ``details`` and ``vendors``.
    """
    kev_items = kev_data.get("items") or []
    epss_items = epss_data.get("items") or []

    epss_lookup = build_epss_lookup(epss_items)
    kev_enriched = enrich_kev(kev_items, epss_lookup, poc_index)

    kev_top = kev_enriched[:top_kev]
    kev_set = {row["cve"] for row in kev_enriched}

    high_epss = build_high_epss_not_in_kev(
        epss_items,
        kev_set,
        poc_index,
        threshold=high_epss_threshold,
        limit=high_epss_limit,
    )

    details = build_cve_details(kev_top, high_epss, poc_index)

    if extra_cves:
        # epss_lookup from above is reused; it was previously rebuilt here.
        kev_lookup = {row["cve"]: row for row in kev_enriched}
        extra_set = {cve.upper() for cve in extra_cves if cve}
        for cve in sorted(extra_set):
            if cve in details:
                continue
            poc_entry = poc_index.get(cve, {})
            epss_row = epss_lookup.get(cve, {})
            detail = {
                "cve": cve,
                "description": poc_entry.get("desc", ""),
                "poc_links": poc_entry.get("poc", []),
                "poc_count": len(poc_entry.get("poc", [])),
                "kev": None,
                "epss": epss_row.get("epss"),
                "percentile": epss_row.get("percentile"),
                "vendor": None,
                "product": None,
            }
            kev_row = kev_lookup.get(cve)
            if kev_row:
                detail["kev"] = {
                    "date_added": kev_row.get("date_added"),
                    "due_date": kev_row.get("due_date"),
                    "short_description": kev_row.get("short_description"),
                    "required_action": kev_row.get("required_action"),
                    "notes": kev_row.get("notes"),
                }
                detail["vendor"] = kev_row.get("vendor")
                detail["product"] = kev_row.get("product")
            details[cve] = detail

    vendors = build_vendor_map(details)

    # Add a display summary to the listed rows. kev_top rows are the same
    # dict objects as in kev_enriched, so those rows gain the key there too.
    for collection in (kev_top, high_epss):
        for row in collection:
            desc = poc_index.get(row["cve"], {}).get("desc") or ""
            row["summary"] = truncate_description(desc)

    joined = {
        "generated": today_str(),
        "kev_top": kev_top,
        "high_epss": high_epss,
    }
    return {
        "joined": joined,
        "kev_enriched": kev_enriched,
        "epss_items": epss_items,
        "details": details,
        "vendors": vendors,
    }
def write_api_outputs(payload: Dict, *, api_dir: Path = API_DIR) -> None:
    """Write the joined payload as JSON files under ``api_dir``.

    Produces ``kev.json``, ``epss_top.json``, ``joined_top.json`` and one
    ``cve/<CVE>.json`` file per detail record. The snapshot directory is
    created as well, although nothing is written to it here.
    """
    cve_dir = api_dir / "cve"
    ensure_dirs(api_dir, cve_dir, SNAPSHOT_DIR)

    joined = payload["joined"]
    stamp = joined["generated"]

    save_json(api_dir / "kev.json", {"generated": stamp, "items": payload["kev_enriched"]})
    save_json(api_dir / "epss_top.json", {"generated": stamp, "items": joined["high_epss"]})
    save_json(api_dir / "joined_top.json", joined)

    for cve_id, record in payload["details"].items():
        save_json(cve_dir / f"{cve_id}.json", record)
def main() -> int:
    """Command-line entry point: join KEV, EPSS and PoC data and write the API JSON.

    Returns:
        Always 0, used as the process exit status.
    """
    cli = argparse.ArgumentParser(description="Join KEV and EPSS with PoC data")
    cli.add_argument("--kev", type=Path, default=KEV_PATH, help="Path to KEV JSON")
    cli.add_argument("--epss", type=Path, default=EPSS_PATH, help="Path to EPSS JSON")
    cli.add_argument(
        "--top-kev",
        type=int,
        default=DEFAULT_TOP_KEV,
        help="How many KEV rows to surface on top list",
    )
    cli.add_argument(
        "--high-epss-threshold",
        type=float,
        default=DEFAULT_HIGH_EPSS_THRESHOLD,
        help="Minimum EPSS to include when selecting high EPSs CVEs",
    )
    cli.add_argument(
        "--high-epss-limit",
        type=int,
        default=DEFAULT_HIGH_EPSS_LIMIT,
        help="Maximum number of high EPSs CVEs to keep",
    )
    options = cli.parse_args()

    poc_index = load_poc_index()
    kev_data, epss_data = load_inputs(options.kev, options.epss)
    write_api_outputs(
        build_joined(
            kev_data,
            epss_data,
            poc_index,
            top_kev=options.top_kev,
            high_epss_threshold=options.high_epss_threshold,
            high_epss_limit=options.high_epss_limit,
        )
    )
    print("Generated joined JSON endpoints under docs/api/v1/")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+105
View File
@@ -0,0 +1,105 @@
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Dict, Tuple
from jinja2 import Environment, FileSystemLoader, select_autoescape
from utils import (
API_DIR,
DOCS_DIR,
TEMPLATES_DIR,
ensure_dirs,
load_json,
load_poc_index,
parse_trending_from_readme,
save_json,
)
from build_joined import build_joined, write_api_outputs
from build_diffs import build_diff, prune_snapshots
# Input files, resolved relative to the directory that contains DOCS_DIR.
KEV_DATA = DOCS_DIR.parent / "data" / "kev.json"
EPSS_DATA = DOCS_DIR.parent / "data" / "epss.json"
# README that parse_trending_from_readme() scans for the trending table.
README_PATH = DOCS_DIR.parent / "README.md"
def build_env() -> Environment:
    """Create the Jinja2 environment used to render the site templates.

    Templates are loaded from ``TEMPLATES_DIR``; autoescaping is enabled for
    html and xml templates, and block trimming/left-stripping is turned on.
    """
    return Environment(
        loader=FileSystemLoader(str(TEMPLATES_DIR)),
        autoescape=select_autoescape(["html", "xml"]),
        trim_blocks=True,
        lstrip_blocks=True,
    )
def render(env: Environment, template_name: str, context: Dict, output_path: Path) -> None:
    """Render ``template_name`` with ``context`` and write it to ``output_path`` as UTF-8.

    Missing parent directories of ``output_path`` are created.
    """
    template = env.get_template(template_name)
    rendered = template.render(**context)
    destination_dir = output_path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rendered, encoding="utf-8")
def load_joined() -> Dict:
    """Load the KEV/EPSS inputs, build the joined payload and write the API JSON.

    Returns:
        The payload produced by ``build_joined``.
    """
    # `or {}` guards against a file whose JSON content is falsy (for example
    # `null`), which build_joined would otherwise fail on when calling .get().
    kev = load_json(KEV_DATA, default={}) or {}
    epss = load_json(EPSS_DATA, default={}) or {}
    poc_index = load_poc_index()
    payload = build_joined(kev, epss, poc_index)
    write_api_outputs(payload)
    return payload
def write_snapshot(joined: Dict) -> Path:
    """Store ``joined`` as the snapshot for its ``generated`` date.

    Returns:
        Path of the snapshot file, ``<API_DIR>/snapshots/<generated>.json``.
    """
    snapshot_dir = API_DIR / "snapshots"
    destination = snapshot_dir / f"{joined['generated']}.json"
    ensure_dirs(destination.parent)
    save_json(destination, joined)
    return destination
def build_pages(env: Environment, data: Dict, diff: Dict | None = None) -> None:
    """Render every HTML page of the site under ``DOCS_DIR``.

    Args:
        env: Jinja2 environment providing the templates.
        data: Payload with ``joined``, ``details``, ``vendors`` and
            ``kev_enriched`` keys.
        diff: Diff payload for the dashboard and diffs page; ``None`` is
            rendered as an empty dict.
    """
    joined = data["joined"]
    details = data["details"]
    vendors = data["vendors"]
    trending = parse_trending_from_readme(README_PATH)

    base_ctx = {"generated": joined["generated"]}
    diff_ctx = diff or {}

    def page(template_name: str, extra: Dict, destination: Path) -> None:
        # Every page receives the shared context plus its own values.
        render(env, template_name, {**base_ctx, **extra}, destination)

    page("index.html", {"data": joined, "trending": trending, "diff": diff_ctx}, DOCS_DIR / "index.html")
    page("kev.html", {"kev": data["kev_enriched"]}, DOCS_DIR / "kev" / "index.html")
    page("epss.html", {"epss": joined["high_epss"]}, DOCS_DIR / "epss" / "index.html")
    page("diffs.html", {"diff": diff_ctx}, DOCS_DIR / "diffs" / "index.html")

    for cve_id, record in details.items():
        page("cve.html", {"cve": record}, DOCS_DIR / "cve" / f"{cve_id}.html")

    for slug, vendor in vendors.items():
        vendor_records = [details[cve_id] for cve_id in vendor["cves"] if cve_id in details]
        page("vendor.html", {"vendor": vendor, "cves": vendor_records}, DOCS_DIR / "vendors" / f"{slug}.html")
def main() -> int:
    """Command-line entry point: build the JSON API, snapshot, diff and HTML site.

    The diff settings were previously hard-coded literals; they are now
    options whose defaults come from ``build_diffs``, so running without
    arguments behaves as before.

    Returns:
        Always 0, used as the process exit status.
    """
    # Local import: only build_diff and prune_snapshots are imported at module level.
    from build_diffs import DEFAULT_HIGH_EPSS_THRESHOLD, DEFAULT_LOOKBACK_DAYS, DEFAULT_MAX_MOVERS

    parser = argparse.ArgumentParser(description="Build static site and JSON")
    parser.add_argument(
        "--threshold",
        type=float,
        default=DEFAULT_HIGH_EPSS_THRESHOLD,
        help="Minimum EPSS for an entry to be reported as newly high EPSS in the diff",
    )
    parser.add_argument(
        "--lookback",
        type=int,
        default=DEFAULT_LOOKBACK_DAYS,
        help="How many days of snapshots to keep",
    )
    parser.add_argument(
        "--max-movers",
        type=int,
        default=DEFAULT_MAX_MOVERS,
        help="Maximum number of EPSS movers to include in the diff",
    )
    args = parser.parse_args()

    ensure_dirs(
        DOCS_DIR,
        DOCS_DIR / "cve",
        DOCS_DIR / "vendors",
        DOCS_DIR / "kev",
        DOCS_DIR / "epss",
        DOCS_DIR / "diffs",
    )

    env = build_env()
    data = load_joined()

    # Write the snapshot and build the diff before rendering so the
    # dashboard can show the diff.
    write_snapshot(data["joined"])
    snapshots = sorted((API_DIR / "snapshots").glob("*.json"))
    diff, _ = build_diff(snapshots, threshold=args.threshold, max_movers=args.max_movers)
    prune_snapshots(snapshots, lookback_days=args.lookback)

    build_pages(env, data, diff)

    print("Site generated under docs/")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+93
View File
@@ -0,0 +1,93 @@
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Dict, List
import requests
from utils import DATA_DIR, maybe_float, save_json, today_str
# Endpoint queried by fetch_batch().
API_URL = "https://api.first.org/data/v1/epss"
# Default total number of EPSS rows to fetch (--limit).
DEFAULT_LIMIT = 2000
# Default number of rows requested per API call (--batch-size).
DEFAULT_BATCH = 1000
def fetch_batch(offset: int, limit: int) -> Dict:
    """Request one page of EPSS rows, sorted by descending EPSS, from ``API_URL``.

    Args:
        offset: Value sent as the ``offset`` query parameter.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: if the response status indicates an error.
    """
    reply = requests.get(
        API_URL,
        params={"offset": offset, "limit": limit, "sort": "epss", "order": "desc"},
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()
def normalise_rows(raw_rows: List[Dict]) -> List[Dict]:
    """Normalise raw EPSS API rows.

    Rows without a CVE id (missing, empty or null) are dropped. ``epss`` and
    ``percentile`` are converted with ``maybe_float`` and are ``None`` when
    they cannot be parsed.

    Returns:
        Rows of ``cve`` (upper-cased), ``epss``, ``percentile`` and ``date``.
    """
    normalised = []
    for row in raw_rows:
        # `or ""` matters: str(None) would otherwise produce the bogus id "NONE".
        cve = str(row.get("cve") or "").strip().upper()
        if not cve:
            continue
        normalised.append(
            {
                "cve": cve,
                "epss": maybe_float(row.get("epss")),
                "percentile": maybe_float(row.get("percentile")),
                "date": row.get("date"),
            }
        )
    return normalised
def fetch_epss(limit: int = DEFAULT_LIMIT, batch_size: int = DEFAULT_BATCH) -> Dict:
    """Fetch up to ``limit`` EPSS rows in pages of ``batch_size``.

    Fetching stops early when a page comes back shorter than requested.

    Args:
        limit: Maximum total number of rows to request.
        batch_size: Rows per API call; must be at least 1.

    Returns:
        Dict with ``source``, ``fetched`` (today's date), ``count``,
        ``limit`` and ``items`` sorted by descending EPSS and then CVE id.

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        # A zero page size never advances `offset`, so the loop could not end.
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    rows: List[Dict] = []
    offset = 0
    while offset < limit:
        size = min(batch_size, limit - offset)
        payload = fetch_batch(offset, size)
        data_rows = payload.get("data") or []
        rows.extend(normalise_rows(data_rows))
        if len(data_rows) < size:
            break
        offset += size
    # normalise_rows stores None for unparseable scores; `.get("epss", 0.0)`
    # would return that None and negating it raises TypeError.
    rows.sort(key=lambda row: (-(row.get("epss") or 0.0), row["cve"]))
    return {
        "source": API_URL,
        "fetched": today_str(),
        "count": len(rows),
        "limit": limit,
        "items": rows,
    }
def main() -> int:
    """Command-line entry point: download the EPSS top list and save it as JSON.

    Returns:
        Always 0, used as the process exit status.
    """
    cli = argparse.ArgumentParser(description="Fetch EPSS top list")
    cli.add_argument("--limit", type=int, default=DEFAULT_LIMIT, help="Number of EPSS rows to fetch")
    cli.add_argument(
        "--batch-size",
        type=int,
        default=DEFAULT_BATCH,
        help="Batch size for paginated EPSS API calls",
    )
    cli.add_argument(
        "--output",
        type=Path,
        default=DATA_DIR / "epss.json",
        help="Where to store the downloaded EPSS JSON",
    )
    options = cli.parse_args()

    result = fetch_epss(options.limit, options.batch_size)
    save_json(options.output, result)
    print(f"Saved {result['count']} EPSS rows to {options.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+60
View File
@@ -0,0 +1,60 @@
from __future__ import annotations
import argparse
from pathlib import Path
from utils import DATA_DIR, fetch_json, save_json, today_str
# Default URL of the KEV catalogue JSON; can be overridden with --source.
DEFAULT_SOURCE = "https://raw.githubusercontent.com/cisagov/kev-data/main/known_exploited_vulnerabilities.json"
def fetch_kev(source: str = DEFAULT_SOURCE) -> dict:
    """Download the KEV catalogue and normalise its entries.

    The payload may be a dict carrying the entries under ``vulnerabilities``
    or ``data``, or a bare list of entries. Entries without a ``cveID`` and
    entries that are not objects are skipped; null field values are treated
    as empty strings.

    Args:
        source: URL of the KEV JSON document.

    Returns:
        Dict with ``source``, ``fetched`` (today's date), ``count`` and
        ``items`` sorted by CVE id.

    Raises:
        ValueError: if the payload is neither a list nor a dict with a
            ``vulnerabilities`` or ``data`` key.
    """
    data = fetch_json(source)
    if isinstance(data, list):
        items = data
    elif isinstance(data, dict) and ("vulnerabilities" in data or "data" in data):
        items = data.get("vulnerabilities") or data.get("data") or []
    else:
        # Iterating an unrecognised dict would walk its keys and fail later
        # with an unrelated AttributeError; fail clearly instead.
        raise ValueError(f"Unrecognised KEV payload from {source}")

    def text(entry: dict, key: str) -> str:
        # Tolerate explicit nulls and non-string values in the feed.
        return str(entry.get(key) or "").strip()

    normalised = []
    for entry in items:
        if not isinstance(entry, dict):
            continue
        cve_id = text(entry, "cveID").upper()
        if not cve_id:
            continue
        normalised.append(
            {
                "cve": cve_id,
                "vendor": text(entry, "vendorProject"),
                "product": text(entry, "product"),
                "date_added": entry.get("dateAdded"),
                "due_date": entry.get("dueDate"),
                "short_description": text(entry, "shortDescription"),
                "required_action": text(entry, "requiredAction"),
                "notes": text(entry, "notes"),
            }
        )
    normalised.sort(key=lambda row: row["cve"])
    return {
        "source": source,
        "fetched": today_str(),
        "count": len(normalised),
        "items": normalised,
    }
def main() -> int:
    """Command-line entry point: download the KEV catalogue and save it as JSON.

    Returns:
        Always 0, used as the process exit status.
    """
    cli = argparse.ArgumentParser(description="Fetch CISA KEV catalogue")
    cli.add_argument("--source", default=DEFAULT_SOURCE, help="KEV JSON source URL")
    cli.add_argument(
        "--output",
        type=Path,
        default=DATA_DIR / "kev.json",
        help="Where to store the downloaded KEV JSON",
    )
    options = cli.parse_args()

    result = fetch_kev(options.source)
    save_json(options.output, result)
    print(f"Saved {result['count']} KEV entries to {options.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+201
View File
@@ -0,0 +1,201 @@
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import requests
# Project root: the parent of the directory that contains this module.
ROOT = Path(__file__).resolve().parents[1]
# Fetched input data (kev.json, epss.json).
DATA_DIR = ROOT / "data"
# Output directory of the generated site.
DOCS_DIR = ROOT / "docs"
# JSON API output directory.
API_DIR = DOCS_DIR / "api" / "v1"
# Dated snapshot files.
SNAPSHOT_DIR = API_DIR / "snapshots"
# Jinja2 template directory.
TEMPLATES_DIR = ROOT / "templates"
ASSETS_DIR = DOCS_DIR / "assets"
def ensure_dirs(*paths: Path) -> None:
    """Create each given directory, including missing parents; existing ones are fine."""
    for directory in paths:
        directory.mkdir(parents=True, exist_ok=True)
def load_json(path: Path, default=None):
    """Parse the UTF-8 JSON file at ``path``, or return ``default`` if it does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return default
def save_json(path: Path, data, *, sort_keys: bool = True) -> None:
    """Write ``data`` to ``path`` as indented UTF-8 JSON, creating parent directories.

    Args:
        path: Destination file.
        data: JSON-serialisable object.
        sort_keys: Whether object keys are written in sorted order.

    Raises:
        TypeError: if ``data`` is not JSON-serialisable. The destination file
            is left untouched in that case.
    """
    # Serialise before opening the file: opening in "w" mode truncates it, so
    # a serialisation error afterwards would destroy the previous content.
    serialised = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=sort_keys)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialised)
def fetch_json(url: str, *, timeout: int = 30, headers: Optional[Dict[str, str]] = None):
    """GET ``url`` and return the decoded JSON body.

    Args:
        url: Address to request.
        timeout: Request timeout passed to ``requests.get``.
        headers: Optional request headers; none are sent explicitly when omitted.

    Raises:
        requests.HTTPError: if the response status indicates an error.
    """
    request_headers = headers if headers else {}
    reply = requests.get(url, timeout=timeout, headers=request_headers)
    reply.raise_for_status()
    return reply.json()
def today_str() -> str:
    """Return the current UTC date as an ISO ``YYYY-MM-DD`` string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.date().isoformat()
def slugify(text: str) -> str:
    """Turn ``text`` into a lower-case, hyphen-separated slug.

    Every run of characters other than ASCII letters and digits becomes a
    single hyphen, and leading/trailing hyphens are removed. Returns
    ``"unknown"`` when nothing is left.
    """
    lowered = text.strip().lower()
    slug = re.sub(r"[^A-Za-z0-9]+", "-", lowered).strip("-")
    return slug if slug else "unknown"
def stable_unique(items: Iterable[str]) -> List[str]:
    """Return the truthy items of ``items`` without duplicates, in first-seen order."""
    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(item for item in items if item))
def maybe_float(value: str | float | int | None) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
# --- PoC data helpers ----------------------------------------------------
# Matches a complete CVE identifier, case-insensitively: "CVE-", a four-digit
# year, "-", then a sequence number of at least four digits.
CVE_SECTION_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)
def load_poc_index() -> Dict[str, Dict[str, object]]:
    """Load CVE → {desc, poc} mapping from docs/CVE_list.json or markdown files.

    The JSON file is used when it exists; entries whose ``cve`` fails
    ``is_valid_cve`` are skipped and PoC links are de-duplicated. Otherwise
    the index is built from the per-CVE markdown files.
    """
    cve_json = DOCS_DIR / "CVE_list.json"
    if not cve_json.exists():
        return build_poc_index_from_markdown()

    index: Dict[str, Dict[str, object]] = {}
    for entry in load_json(cve_json, default=[]) or []:
        cve_id = str(entry.get("cve", "")).upper()
        if is_valid_cve(cve_id):
            index[cve_id] = {
                "desc": entry.get("desc", ""),
                "poc": stable_unique(entry.get("poc", []) or []),
            }
    return index
def build_poc_index_from_markdown() -> Dict[str, Dict[str, object]]:
    """Build the CVE → {desc, poc} mapping from ``<year>/CVE-*.md`` files under ``ROOT``.

    Files whose stem fails ``is_valid_cve`` are ignored.
    """
    index: Dict[str, Dict[str, object]] = {}
    candidates = sorted(ROOT.glob("[12][0-9][0-9][0-9]/CVE-*.md"))
    for md_path in candidates:
        cve_id = md_path.stem.upper()
        if is_valid_cve(cve_id):
            description, links = parse_cve_markdown(md_path)
            index[cve_id] = {"desc": description, "poc": links}
    return index
def parse_cve_markdown(path: Path) -> Tuple[str, List[str]]:
    """Extract the description and PoC links from one CVE markdown file.

    Returns:
        ``(description, links)``: the normalised "### Description" section
        and the de-duplicated entries of the "#### Reference" section
        followed by those of the "#### Github" section.
    """
    sections = parse_sections(path.read_text(encoding="utf-8"))
    description = normalise_block(sections.get("### Description", ""))
    reference_links = collect_links(sections.get("#### Reference", ""))
    github_links = collect_links(sections.get("#### Github", ""))
    return description, stable_unique(reference_links + github_links)
def normalise_block(text: str) -> str:
    """Tidy a markdown text block into newline-separated plain lines.

    Surrounding whitespace is removed from every line, a single leading
    ``- `` bullet marker is dropped, and lines that are blank or consist only
    of dashes and spaces are discarded.

    Unlike ``line.lstrip("- ")``, which strips any run of dashes and spaces,
    this keeps leading dashes that belong to the content (``--flag``, ``-1``).
    """
    cleaned: List[str] = []
    for raw_line in text.replace("\r\n", "\n").split("\n"):
        line = raw_line.strip()
        if not line.strip("- "):
            # Blank, a bare bullet, or a rule such as "---": nothing to keep.
            continue
        if line.startswith("- "):
            line = line[2:].lstrip()
        cleaned.append(line)
    return "\n".join(cleaned)
def parse_sections(content: str) -> Dict[str, str]:
    """Split markdown ``content`` into sections keyed by heading line.

    A line that, once stripped, starts with ``"### "`` or ``"#### "`` opens a
    new section; the stripped heading line itself is the key. Text before the
    first heading is discarded, and a repeated heading replaces the earlier
    section.

    Returns:
        Mapping of heading line to the section body with outer whitespace stripped.
    """
    sections: Dict[str, str] = {}
    heading: Optional[str] = None
    body: List[str] = []

    def flush() -> None:
        # Store the body collected so far under the open heading, if any.
        if heading is not None:
            sections[heading] = "\n".join(body).strip()

    for raw_line in content.splitlines():
        stripped = raw_line.strip()
        if stripped.startswith(("### ", "#### ")):
            flush()
            heading = stripped
            body = []
            continue
        body.append(raw_line)
    flush()
    return sections
def collect_links(block: str) -> List[str]:
links: List[str] = []
for raw in block.splitlines():
entry = raw.strip()
if not entry or "No PoCs" in entry:
continue
if entry.startswith("- "):
entry = entry[2:].strip()
if entry and entry not in links:
links.append(entry)
return links
def is_valid_cve(cve_id: str) -> bool:
    """Check that ``cve_id`` has the form ``CVE-<digits>-<digits>``.

    The ``CVE`` prefix is compared case-insensitively, and both numeric parts
    must consist of ASCII digits only. Previously the prefix was not checked
    at all, so ids such as ``FOO-2020-1`` or ``../X-2020-1`` were accepted.
    """
    parts = cve_id.split("-")
    if len(parts) != 3:
        return False
    prefix, year, sequence = parts
    if prefix.upper() != "CVE":
        return False
    # str.isdigit() alone also accepts non-ASCII digit characters.
    return all(part.isascii() and part.isdigit() for part in (year, sequence))
# --- Trending PoCs -------------------------------------------------------
# Matches one markdown table row of the form
#   | <stars> ⭐ | <updated> | [<name>](<url>) | <desc> |
# and captures the named groups stars, updated, name, url and desc.
TREND_ROW_RE = re.compile(r"^\|\s*(?P<stars>\d+)\s*⭐\s*\|\s*(?P<updated>[^|]+)\|\s*\[(?P<name>[^\]]+)\]\((?P<url>[^)]+)\)\s*\|\s*(?P<desc>.*)\|$")
def parse_trending_from_readme(readme_path: Path) -> List[Dict[str, str]]:
    """Extract the trending PoC table rows from the README.

    A ``## <digits>`` heading sets the year for the rows that follow it;
    table rows matching ``TREND_ROW_RE`` before any such heading are ignored.

    Returns:
        One dict per row with the regex groups (``stars``, ``updated``,
        ``name``, ``url``, ``desc``) plus ``year``, in file order; an empty
        list when the README does not exist.
    """
    if not readme_path.exists():
        return []

    rows: List[Dict[str, str]] = []
    year: Optional[str] = None
    for raw_line in readme_path.read_text(encoding="utf-8").splitlines():
        stripped = raw_line.strip()
        heading = stripped[3:].strip() if stripped.startswith("## ") else ""
        if heading.isdigit():
            year = heading
            continue
        hit = TREND_ROW_RE.match(stripped)
        if hit and year:
            row = hit.groupdict()
            row["year"] = year
            rows.append(row)
    # File order is preserved, so the result is deterministic.
    return rows
# --- Misc helpers --------------------------------------------------------
def read_text(path: Path) -> str:
    """Return the UTF-8 content of ``path``, or ``""`` if it does not exist."""
    if path.exists():
        return path.read_text(encoding="utf-8")
    return ""
def write_text(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8, creating missing parent directories."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(content)