CVEs-PoC/scripts/utils.py

from __future__ import annotations

import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import requests

ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = ROOT / "data"
DOCS_DIR = ROOT / "docs"
API_DIR = DOCS_DIR / "api" / "v1"
SNAPSHOT_DIR = API_DIR / "snapshots"
TEMPLATES_DIR = ROOT / "templates"
ASSETS_DIR = DOCS_DIR / "assets"


def ensure_dirs(*paths: Path) -> None:
    for path in paths:
        path.mkdir(parents=True, exist_ok=True)


def load_json(path: Path, default=None):
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def save_json(path: Path, data, *, sort_keys: bool = True) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2, sort_keys=sort_keys)


def fetch_json(url: str, *, timeout: int = 30, headers: Optional[Dict[str, str]] = None):
    response = requests.get(url, timeout=timeout, headers=headers or {})
    response.raise_for_status()
    return response.json()
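
# Illustrative call (placeholder URL): any non-2xx status raises
# requests.HTTPError via raise_for_status, so callers only ever see parsed JSON.
#     feed = fetch_json("https://example.com/feed.json", timeout=10,
#                       headers={"User-Agent": "CVEs-PoC"})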


def today_str() -> str:
    return datetime.now(timezone.utc).date().isoformat()


def slugify(text: str) -> str:
    cleaned = re.sub(r"[^A-Za-z0-9]+", "-", text.strip().lower())
    cleaned = cleaned.strip("-")
    return cleaned or "unknown"
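
# Behaviour of slugify, derived from the regex above:
#     slugify("Hello, World!")  -> "hello-world"
#     slugify("***")            -> "unknown"   (empty result falls back)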


def stable_unique(items: Iterable[str]) -> List[str]:
    seen = set()
    output = []
    for item in items:
        if item and item not in seen:
            seen.add(item)
            output.append(item)
    return output
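
# stable_unique preserves first-seen order and drops empty strings:
#     stable_unique(["b", "a", "b", ""]) -> ["b", "a"]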


def maybe_float(value: str | float | int | None) -> Optional[float]:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None
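
# maybe_float coerces loosely-typed values, returning None on failure:
#     maybe_float("9.8") -> 9.8
#     maybe_float("n/a") -> None
#     maybe_float(None)  -> None
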
# --- PoC data helpers ----------------------------------------------------
CVE_SECTION_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)


def load_poc_index() -> Dict[str, Dict[str, object]]:
    """Load CVE → {desc, poc} mapping from docs/CVE_list.json or markdown files."""
    cve_json = DOCS_DIR / "CVE_list.json"
    if cve_json.exists():
        data = load_json(cve_json, default=[]) or []
        mapping = {}
        for entry in data:
            cve = str(entry.get("cve", "")).upper()
            if not is_valid_cve(cve):
                continue
            mapping[cve] = {
                "desc": entry.get("desc", ""),
                "poc": stable_unique(entry.get("poc", []) or []),
            }
        return mapping
    return build_poc_index_from_markdown()


def build_poc_index_from_markdown() -> Dict[str, Dict[str, object]]:
    mapping: Dict[str, Dict[str, object]] = {}
    for md_path in sorted(ROOT.glob("[12][0-9][0-9][0-9]/CVE-*.md")):
        cve = md_path.stem.upper()
        if not is_valid_cve(cve):
            continue
        desc, poc_links = parse_cve_markdown(md_path)
        mapping[cve] = {"desc": desc, "poc": poc_links}
    return mapping


def parse_cve_markdown(path: Path) -> Tuple[str, List[str]]:
    text = path.read_text(encoding="utf-8")
    sections = parse_sections(text)
    description = normalise_block(sections.get("### Description", ""))
    references = collect_links(sections.get("#### Reference", ""))
    github_links = collect_links(sections.get("#### Github", ""))
    poc_links = stable_unique([*references, *github_links])
    return description, poc_links
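
# parse_cve_markdown assumes the heading layout matched by parse_sections
# below (the exact strings "### Description", "#### Reference", and
# "#### Github"). Illustrative input with placeholder values:
#     ### Description
#     Buffer overflow in ExampleApp allows remote code execution.
#     #### Reference
#     - https://example.com/advisory
#     #### Github
#     - https://github.com/user/example-poc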


def normalise_block(text: str) -> str:
    text = text.replace("\r\n", "\n")
    text = re.sub(r"\n{2,}", "\n", text.strip())
    lines = [line.lstrip("- ").rstrip() for line in text.split("\n")]
    return "\n".join(line for line in lines if line)


def parse_sections(content: str) -> Dict[str, str]:
    sections: Dict[str, str] = {}
    current: Optional[str] = None
    buffer: List[str] = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if line.startswith("### ") or line.startswith("#### "):
            if current is not None:
                sections[current] = "\n".join(buffer).strip()
            current = line
            buffer = []
        else:
            buffer.append(raw_line)
    if current is not None:
        sections[current] = "\n".join(buffer).strip()
    return sections


def collect_links(block: str) -> List[str]:
    links: List[str] = []
    for raw in block.splitlines():
        entry = raw.strip()
        if not entry or "No PoCs" in entry:
            continue
        if entry.startswith("- "):
            entry = entry[2:].strip()
        if entry and entry not in links:
            links.append(entry)
    return links
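
# collect_links deduplicates bullet entries and skips "No PoCs" placeholders
# (placeholder URL below):
#     collect_links("- https://a.example/x\n- No PoCs found\n- https://a.example/x")
#     -> ["https://a.example/x"]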


def is_valid_cve(cve_id: str) -> bool:
    """Return True for identifiers shaped like CVE-YYYY-NNNN (case-insensitive)."""
    return bool(CVE_SECTION_RE.match(cve_id))
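
# CVE_SECTION_RE requires the CVE prefix, a 4-digit year, and a sequence
# number of at least 4 digits:
#     is_valid_cve("CVE-2024-12345") -> True
#     is_valid_cve("CVE-24-1")       -> False
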
# --- Trending PoCs -------------------------------------------------------
TREND_ROW_RE = re.compile(r"^\|\s*(?P<stars>\d+)\s*⭐\s*\|\s*(?P<updated>[^|]+)\|\s*\[(?P<name>[^\]]+)\]\((?P<url>[^)]+)\)\s*\|\s*(?P<desc>.*)\|$")


def parse_trending_from_readme(readme_path: Path) -> List[Dict[str, str]]:
    if not readme_path.exists():
        return []
    results: List[Dict[str, str]] = []
    current_year: Optional[str] = None
    for line in readme_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if line.startswith("## ") and line[3:].strip().isdigit():
            current_year = line[3:].strip()
            continue
        match = TREND_ROW_RE.match(line)
        if match and current_year:
            entry = match.groupdict()
            entry["year"] = current_year
            results.append(entry)
    # Keep deterministic order (README already ordered newest first)
    return results
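
# TREND_ROW_RE captures stars/updated/name/url/desc from README table rows
# such as (illustrative values):
#     | 123 ⭐ | 2025-01-01 | [example-poc](https://github.com/user/example-poc) | Example PoC |
# The nearest preceding "## <year>" heading supplies the "year" field.
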
# --- Misc helpers --------------------------------------------------------


def read_text(path: Path) -> str:
    return path.read_text(encoding="utf-8") if path.exists() else ""


def write_text(path: Path, content: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
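

if __name__ == "__main__":
    # Minimal smoke test (an assumed, illustrative entry point): build the
    # PoC index from docs/CVE_list.json or the per-year markdown files and
    # print a short summary.
    index = load_poc_index()
    print(f"Loaded {len(index)} CVE entries")
    for cve_id in sorted(index)[:5]:
        entry = index[cve_id]
        print(f"  {cve_id}: {len(entry['poc'])} PoC link(s)")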