Simplify site to search + trending

This commit is contained in:
0xMarcio
2025-12-18 07:13:08 +01:00
parent 8f76f94f78
commit 5c78e585a1
35 changed files with 286 additions and 4247 deletions
+1 -1
View File
@@ -7,7 +7,7 @@ pip install -r requirements.txt
python scripts/fetch_kev.py
python scripts/fetch_epss.py
python scripts/build_site.py
python scripts/build_all.py # new PoC discovery + scoring pipeline
python scripts/build_all.py
```
Outputs land in `docs/` and JSON under `docs/api/v1/`. Snapshots live in `docs/api/v1/snapshots/` (last 14 days) and diffs under `docs/api/v1/diffs/`.
+1 -5
View File
@@ -19,7 +19,6 @@ from pipeline_outputs import (
write_top,
)
from poc_pipeline import PoCPipeline, build_scope, persist_evidence
from site_renderer import SiteRenderer
from utils import API_DIR, DOCS_DIR, load_json
@@ -106,10 +105,7 @@ def main(argv: List[str] | None = None) -> int:
prune_old_snapshots()
prune_old_diffs()
renderer = SiteRenderer(results=results, index_payload=index_payload, top_payload=top_payload, diff_payload=diff_payload)
renderer.build()
print(f"Generated site under {DOCS_DIR}")
print(f"Wrote pipeline outputs under {DOCS_DIR}")
print(f"Wrote latest snapshot to {snapshot_path}")
return 0
+172 -141
View File
@@ -1,31 +1,22 @@
from __future__ import annotations
import argparse
from datetime import timedelta
from pathlib import Path
import json
import re
from typing import Dict, Tuple
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse
from jinja2 import Environment, FileSystemLoader, select_autoescape
from utils import (
API_DIR,
DOCS_DIR,
TEMPLATES_DIR,
ensure_dirs,
load_json,
load_poc_index,
parse_trending_from_readme,
save_json,
)
from utils import DOCS_DIR, TEMPLATES_DIR, ensure_dirs, load_blacklist, parse_trending_from_readme, is_blacklisted_repo
from build_joined import build_joined, write_api_outputs
from build_diffs import build_diff, prune_snapshots
KEV_DATA = DOCS_DIR.parent / "data" / "kev.json"
EPSS_DATA = DOCS_DIR.parent / "data" / "epss.json"
README_PATH = DOCS_DIR.parent / "README.md"
TRENDING_WINDOW = timedelta(days=4)
ROOT = DOCS_DIR.parent
README_PATH = ROOT / "README.md"
CVE_OUTPUT = DOCS_DIR / "CVE_list.json"
REMOVED_OUTPUT = DOCS_DIR / "CVE_blacklist_removed.json"
TRENDING_OUTPUT = DOCS_DIR / "trending_poc.json"
def build_env() -> Environment:
@@ -42,160 +33,200 @@ def render(env: Environment, template_name: str, context: Dict, output_path: Pat
output_path.write_text(html, encoding="utf-8")
def load_joined() -> Dict:
kev = load_json(KEV_DATA, default={})
epss = load_json(EPSS_DATA, default={})
poc_index = load_poc_index()
payload = build_joined(kev, epss, poc_index)
write_api_outputs(payload)
return payload
def normalise_block(text: str) -> str:
text = text.replace("\r\n", "\n")
text = re.sub(r"\n{2,}", "\n", text.strip())
lines = [line.lstrip("- ").rstrip() for line in text.split("\n")]
return "\n".join(line for line in lines if line)
def write_snapshot(joined: Dict) -> Path:
snapshot_path = API_DIR / "snapshots" / f"{joined['generated']}.json"
ensure_dirs(snapshot_path.parent)
save_json(snapshot_path, joined)
return snapshot_path
def parse_sections(content: str) -> Dict[str, str]:
sections: Dict[str, str] = {}
current_header: Optional[str] = None
buffer: List[str] = []
for line in content.splitlines():
header = line.strip()
if header.startswith("### ") or header.startswith("#### "):
if current_header is not None:
sections[current_header] = "\n".join(buffer).strip()
current_header = header
buffer = []
else:
buffer.append(line)
if current_header is not None:
sections[current_header] = "\n".join(buffer).strip()
return sections
def _parse_year(row: dict) -> int | None:
def repo_from_url(url: str) -> str:
try:
return int(row.get("year"))
except (TypeError, ValueError):
return None
parsed = urlparse(url)
host = (parsed.netloc or "").lower()
if host and "github" not in host:
return ""
path = parsed.path or url
except Exception:
path = url
parts = path.strip("/").split("/")
if len(parts) >= 2:
return parts[1].lower()
return (parts[-1] if parts else "").lower()
def _age_from_label(label: str) -> timedelta | None:
text = (label or "").strip().lower()
if text == "just now":
return timedelta()
match = re.match(r"(?P<value>\d+)\s+(?P<unit>minute|minutes|hour|hours|day|days)\s+ago", text)
if not match:
return None
value = int(match.group("value"))
unit = match.group("unit")
if unit.startswith("minute"):
return timedelta(minutes=value)
if unit.startswith("hour"):
return timedelta(hours=value)
return timedelta(days=value)
def _is_current_year_name(name: str, year: int) -> bool:
return bool(re.search(rf"cve-{year}-\d+", name or "", re.IGNORECASE))
def select_trending(readme_rows: list[dict]) -> list[dict]:
"""Pick up to 20 entries from the newest year table, filtered to last 4 days, with descriptions, matching the current year."""
if not readme_rows:
return []
years = [yr for yr in (_parse_year(row) for row in readme_rows) if yr is not None]
if not years:
return []
latest_year = max(years)
filtered: list[tuple[dict, timedelta]] = []
for row in readme_rows:
if _parse_year(row) != latest_year:
def is_blacklisted(url: str, blacklist: List[str]) -> bool:
repo = repo_from_url(url)
if not repo:
return False
for entry in blacklist:
slug = entry.lower()
if not slug:
continue
if not _is_current_year_name(row.get("name", ""), latest_year):
if slug.endswith("*"):
if repo.startswith(slug[:-1]):
return True
elif repo == slug:
return True
return False
def collect_links(block: str, *, blacklist: Optional[List[str]] = None, removed: Optional[List[str]] = None) -> List[str]:
links: List[str] = []
blacklist = blacklist or []
if removed is None:
removed = []
for raw in block.splitlines():
entry = raw.strip()
if not entry or "No PoCs" in entry:
continue
if not (row.get("desc") or "").strip():
if entry.startswith("- "):
entry = entry[2:].strip()
if not entry:
continue
age = _age_from_label(row.get("updated", ""))
if age is None or age > TRENDING_WINDOW:
if is_blacklisted(entry, blacklist):
removed.append(entry)
continue
filtered.append((row, age))
# Sort by freshness then stars
filtered.sort(key=lambda pair: (pair[1], -int(pair[0].get("stars") or 0)))
selected: list[dict] = []
for row, _age in filtered[:20]:
try:
stars = int(row.get("stars") or 0)
except (TypeError, ValueError):
stars = 0
selected.append(
{
"stars": stars,
"updated": (row.get("updated") or "").strip(),
"name": (row.get("name") or "").strip(),
"url": (row.get("url") or "").strip(),
"desc": (row.get("desc") or "").strip(),
"year": latest_year,
}
)
return selected
if entry not in links:
links.append(entry)
return links
def build_pages(env: Environment, data: Dict, diff: Dict | None = None, html_mode: str = "summary") -> None:
joined = data["joined"]
details = data["details"]
vendors = data["vendors"]
def build_cve_list(blacklist: List[str]) -> Dict[str, object]:
cve_entries = []
removed_by_cve: Dict[str, List[str]] = {}
removed_seen: set[str] = set()
trending_raw = parse_trending_from_readme(README_PATH)
trending = select_trending(trending_raw)
recent_kev = (diff or {}).get("new_kev_entries") or []
metrics = {
"kev_total": len(data["kev_enriched"]),
"high_epss_count": len(joined["high_epss"]),
"recent_kev_count": len(recent_kev),
for md_path in sorted(ROOT.glob("[12][0-9][0-9][0-9]/CVE-*.md")):
content = md_path.read_text(encoding="utf-8")
sections = parse_sections(content)
description = normalise_block(sections.get("### Description", ""))
removed_links: List[str] = []
references = collect_links(sections.get("#### Reference", ""), blacklist=blacklist, removed=removed_links)
github_links = collect_links(sections.get("#### Github", ""), blacklist=blacklist, removed=removed_links)
poc_entries: List[str] = []
seen = set()
for link in references + github_links:
if link not in seen:
poc_entries.append(link)
seen.add(link)
cve_id = md_path.stem
if removed_links:
removed_by_cve[cve_id] = sorted(set(removed_links))
removed_seen.update(removed_links)
if not poc_entries:
continue
cve_entries.append({
"cve": cve_id,
"desc": description,
"poc": poc_entries,
})
return {
"entries": cve_entries,
"removed": {
"removed": sorted(removed_seen),
"by_cve": removed_by_cve,
},
}
if html_mode in {"summary", "all"}:
common_ctx = {"generated": joined["generated"], "metrics": metrics, "recent_kev": recent_kev}
render(
env,
"index.html",
{**common_ctx, "data": joined, "trending": trending, "diff": diff or {}},
DOCS_DIR / "index.html",
)
render(env, "kev.html", {**common_ctx, "kev": data["kev_enriched"]}, DOCS_DIR / "kev" / "index.html")
render(env, "epss.html", {**common_ctx, "epss": joined["high_epss"]}, DOCS_DIR / "epss" / "index.html")
render(env, "diffs.html", {**common_ctx, "diff": diff or {}}, DOCS_DIR / "diffs" / "index.html")
if html_mode == "all":
common_ctx = {"generated": joined["generated"]}
for cve, detail in details.items():
render(env, "cve.html", {**common_ctx, "cve": detail}, DOCS_DIR / "cve" / f"{cve}.html")
def build_trending(blacklist: List[str]) -> List[Dict[str, object]]:
rows = parse_trending_from_readme(README_PATH)
if not rows:
return []
for slug, vendor in vendors.items():
cve_details = [details[cve] for cve in vendor["cves"] if cve in details]
render(env, "vendor.html", {**common_ctx, "vendor": vendor, "cves": cve_details}, DOCS_DIR / "vendors" / f"{slug}.html")
by_year: Dict[int, List[Dict[str, object]]] = {}
for row in rows:
year_text = row.get("year") or ""
if not str(year_text).isdigit():
continue
year = int(year_text)
url = (row.get("url") or "").strip()
if url and is_blacklisted_repo(url, blacklist):
continue
stars_text = str(row.get("stars") or "").strip()
stars = int(re.sub(r"\D", "", stars_text) or 0)
item = {
"year": year,
"stars": stars,
"updated": (row.get("updated") or "").strip(),
"name": (row.get("name") or "").strip(),
"url": url,
"desc": (row.get("desc") or "").strip(),
}
by_year.setdefault(year, []).append(item)
if not by_year:
return []
current_year = datetime.now(timezone.utc).year
target_year = current_year if current_year in by_year else max(by_year)
return by_year.get(target_year, [])
def write_json(path: Path, data, *, indent: Optional[int] = None) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
json.dump(data, handle, ensure_ascii=False, indent=indent)
def main() -> int:
parser = argparse.ArgumentParser(description="Build static site and JSON")
parser = argparse.ArgumentParser(description="Build CVE PoC site")
parser.add_argument(
"--html-mode",
choices=["none", "summary", "all"],
default="none",
help="Render no HTML, summary pages only, or all pages including per-CVE.",
default="summary",
help="Render HTML or skip it.",
)
args = parser.parse_args()
ensure_dirs(DOCS_DIR, DOCS_DIR / "kev", DOCS_DIR / "epss", DOCS_DIR / "diffs")
ensure_dirs(DOCS_DIR)
blacklist = load_blacklist()
data = load_joined()
# snapshot + diff before rendering so dashboard can show it
snapshot_path = write_snapshot(data["joined"])
snapshots = sorted((API_DIR / "snapshots").glob("*.json"))
diff, target = build_diff(
snapshots,
kev_full=data["kev_enriched"],
threshold=0.05,
max_movers=50,
recent_days=30,
cve_payload = build_cve_list(blacklist)
write_json(CVE_OUTPUT, cve_payload["entries"])
write_json(REMOVED_OUTPUT, cve_payload["removed"], indent=2)
trending_items = build_trending(blacklist)
write_json(
TRENDING_OUTPUT,
{
"generated": datetime.now(timezone.utc).isoformat(),
"items": trending_items,
},
indent=2,
)
prune_snapshots(snapshots, lookback_days=14)
if args.html_mode != "none":
env = build_env()
build_pages(env, data, diff, html_mode=args.html_mode)
render(env, "index.html", {"trending": trending_items}, DOCS_DIR / "index.html")
# build daily diff after snapshot is written
print("Site generated under docs/")
return 0
-99
View File
@@ -1,99 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict, List
from jinja2 import Environment, FileSystemLoader, select_autoescape
from utils import DOCS_DIR, TEMPLATES_DIR, ensure_dirs
def build_env() -> Environment:
loader = FileSystemLoader(str(TEMPLATES_DIR))
env = Environment(loader=loader, autoescape=select_autoescape(["html", "xml"]))
env.trim_blocks = True
env.lstrip_blocks = True
return env
class SiteRenderer:
def __init__(
self,
*,
results: List[Dict],
index_payload: Dict,
top_payload: Dict,
diff_payload: Dict | None = None,
) -> None:
self.results = []
for result in results:
visible = [p for p in result.get("pocs", []) if p.get("confidence_tier") in {"high", "medium"}]
if not visible:
visible = result.get("pocs", [])
self.results.append({**result, "visible_pocs": visible})
self.index_payload = index_payload
self.top_payload = top_payload
self.diff_payload = diff_payload or {}
self.env = build_env()
ensure_dirs(
DOCS_DIR,
DOCS_DIR / "pocs",
DOCS_DIR / "cve",
DOCS_DIR / "diffs",
DOCS_DIR / "assets",
)
def render(self, template_name: str, context: Dict, target: Path) -> None:
html = self.env.get_template(template_name).render(**context)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(html, encoding="utf-8")
def build(self) -> None:
generated = self.index_payload.get("generated")
summary = {
"generated": generated,
"total_cves": len(self.index_payload.get("items", [])),
"total_pocs": sum(item.get("poc_count", 0) for item in self.index_payload.get("items", [])),
"high_total": sum(item.get("high_confidence", 0) for item in self.index_payload.get("items", [])),
"medium_total": sum(item.get("medium_confidence", 0) for item in self.index_payload.get("items", [])),
}
self.render(
"pipeline_index.html",
{
"summary": summary,
"top": self.top_payload.get("items", [])[:25],
"diff": self.diff_payload or {},
},
DOCS_DIR / "index.html",
)
self.render(
"pipeline_pocs.html",
{
"generated": generated,
"index": self.index_payload.get("items", []),
"top": self.top_payload.get("items", [])[:100],
},
DOCS_DIR / "pocs" / "index.html",
)
for result in self.results:
self.render(
"pipeline_cve.html",
{"cve": result, "generated": generated},
DOCS_DIR / "cve" / f"{result['cve_id']}.html",
)
if self.diff_payload:
diff_date = self.diff_payload.get("generated")
self.render(
"pipeline_diff.html",
{"diff": self.diff_payload, "generated": generated},
DOCS_DIR / "diffs" / "index.html",
)
if diff_date:
self.render(
"pipeline_diff.html",
{"diff": self.diff_payload, "generated": generated},
DOCS_DIR / "diffs" / f"{diff_date}.html",
)