"""
CVE and exploit search engine for NeuroSploitv2.
Extracts software versions from HTTP responses, queries NVD for known CVEs,
and searches GitHub for public exploit code. Fully async, self-contained.
"""
import asyncio
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
try:
import aiohttp
except ImportError:
aiohttp = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# ── Dataclasses ───────────────────────────────────────────────────────────
@dataclass
class VersionInfo:
software: str
version: str
source: str # "server_header", "body", "meta_generator", etc.
@dataclass
class CVEResult:
cve_id: str
cvss_score: float
severity: str
description: str
cwe_id: str
affected_versions: str
published_date: str
@dataclass
class ExploitResult:
source: str # "github" or "exploitdb"
url: str
description: str
stars: int
language: str
@dataclass
class CVEFinding:
version_info: VersionInfo
cves: List[CVEResult] = field(default_factory=list)
exploits: List[ExploitResult] = field(default_factory=list)
# ── Regex patterns ────────────────────────────────────────────────────────
_SERVER_TOKEN_RE = re.compile(r"([A-Za-z][\w\.\-]*)/(\d+(?:\.\d+)+)")
_META_GENERATOR_RE = re.compile(
r']+name=["\']generator["\'][^>]+content=["\']([^"\']+)["\']', re.I)
_JS_LIB_RE = re.compile(
r"(jquery|react|angular|vue|bootstrap|lodash|moment|backbone)"
r"[\-@/]?(\d+(?:\.\d+)+)", re.I)
_WP_VERSION_RE = re.compile(r'content=["\']WordPress\s+([\d.]+)', re.I)
_DRUPAL_VERSION_RE = re.compile(r'Drupal\s+([\d.]+)', re.I)
_JOOMLA_VERSION_RE = re.compile(
r']+content=["\']Joomla!\s*-?\s*([\d.]+)', re.I)
_GENERIC_VERSION_RE = re.compile(
r"\b([A-Z][A-Za-z\-]+)\s+(?:version\s+)?v?(\d+\.\d+(?:\.\d+)?)\b")
_NVD_RPM_NO_KEY = 6
_NVD_RPM_WITH_KEY = 50
_REQUEST_TIMEOUT = 10
# ── CVEHunter ─────────────────────────────────────────────────────────────
class CVEHunter:
"""Async CVE and exploit search engine."""
def __init__(self, session=None, nvd_api_key=None, github_token=None):
self._external_session = session is not None
self._session = session
self._nvd_api_key = nvd_api_key
self._github_token = github_token
rpm = _NVD_RPM_WITH_KEY if nvd_api_key else _NVD_RPM_NO_KEY
self._nvd_min_interval = 60.0 / rpm
self._nvd_last_request: float = 0.0
async def _get_session(self) -> "aiohttp.ClientSession":
if aiohttp is None:
raise RuntimeError("aiohttp is required but not installed")
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT))
return self._session
async def close(self):
if not self._external_session and self._session and not self._session.closed:
await self._session.close()
# ── Version extraction ────────────────────────────────────────────
async def extract_versions(self, headers: Dict[str, str], body: str,
technologies: Optional[List[str]] = None) -> List[VersionInfo]:
seen: set[Tuple[str, str]] = set()
results: List[VersionInfo] = []
def _add(sw: str, ver: str, src: str):
key = (sw.lower(), ver)
if key not in seen:
seen.add(key)
results.append(VersionInfo(software=sw, version=ver, source=src))
# Server header
server = headers.get("server") or headers.get("Server") or ""
for m in _SERVER_TOKEN_RE.finditer(server):
_add(m.group(1), m.group(2), "server_header")
# X-Powered-By
xpb = headers.get("x-powered-by") or headers.get("X-Powered-By") or ""
for m in _SERVER_TOKEN_RE.finditer(xpb):
_add(m.group(1), m.group(2), "x_powered_by")
if xpb and not _SERVER_TOKEN_RE.search(xpb):
parts = xpb.strip().split("/", 1)
if len(parts) == 2 and re.match(r"\d", parts[1]):
_add(parts[0].strip(), parts[1].strip(), "x_powered_by")
# Meta generator tags
for m in _META_GENERATOR_RE.finditer(body):
gp = m.group(1).strip().rsplit(" ", 1)
if len(gp) == 2 and re.match(r"\d", gp[1]):
_add(gp[0], gp[1], "meta_generator")
# CMS-specific patterns
for m in _WP_VERSION_RE.finditer(body):
_add("WordPress", m.group(1), "body")
for m in _DRUPAL_VERSION_RE.finditer(body):
_add("Drupal", m.group(1), "body")
for m in _JOOMLA_VERSION_RE.finditer(body):
_add("Joomla", m.group(1), "body")
# JS libraries (jquery, react, angular, etc.)
for m in _JS_LIB_RE.finditer(body):
_add(m.group(1), m.group(2), "body")
# Generic "SoftwareName version X.Y.Z"
for m in _GENERIC_VERSION_RE.finditer(body):
_add(m.group(1), m.group(2), "body")
# Supplied technology list
for tech in (technologies or []):
tp = re.split(r"[\s/]+", tech.strip(), maxsplit=1)
if len(tp) == 2 and re.match(r"\d", tp[1]):
_add(tp[0], tp[1], "technology_list")
return [v for v in results if v.version]
# ── NVD search ────────────────────────────────────────────────────
async def _nvd_rate_limit(self):
elapsed = time.monotonic() - self._nvd_last_request
if elapsed < self._nvd_min_interval:
await asyncio.sleep(self._nvd_min_interval - elapsed)
self._nvd_last_request = time.monotonic()
async def search_nvd(self, software: str, version: str) -> List[CVEResult]:
"""Query NVD 2.0 API for CVEs matching software + version."""
session = await self._get_session()
await self._nvd_rate_limit()
params = {"keywordSearch": f"{software} {version}"}
hdrs: Dict[str, str] = {}
if self._nvd_api_key:
hdrs["apiKey"] = self._nvd_api_key
results: List[CVEResult] = []
try:
async with session.get("https://services.nvd.nist.gov/rest/json/cves/2.0",
params=params, headers=hdrs) as resp:
if resp.status == 403:
logger.warning("NVD rate limit hit (403). Backing off.")
await asyncio.sleep(30)
return results
if resp.status != 200:
logger.warning("NVD returned %d for %s %s", resp.status, software, version)
return results
data = await resp.json(content_type=None)
except asyncio.TimeoutError:
logger.warning("NVD request timed out for %s %s", software, version)
return results
except Exception as exc:
logger.warning("NVD request failed for %s %s: %s", software, version, exc)
return results
seen_ids: set[str] = set()
for item in data.get("vulnerabilities", []):
cve = item.get("cve", {})
cve_id = cve.get("id", "")
if not cve_id or cve_id in seen_ids:
continue
seen_ids.add(cve_id)
# CVSS: prefer v3.1 → v3.0 → v2
cvss_score, severity = 0.0, "UNKNOWN"
for mk in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
ml = cve.get("metrics", {}).get(mk, [])
if ml:
cd = ml[0].get("cvssData", {})
cvss_score = cd.get("baseScore", 0.0)
severity = cd.get("baseSeverity", "UNKNOWN")
break
# English description
desc = next((d["value"] for d in cve.get("descriptions", [])
if d.get("lang") == "en"), "")
# CWE ID
cwe_id = ""
for w in cve.get("weaknesses", []):
for wd in w.get("description", []):
if wd.get("value", "").startswith("CWE-"):
cwe_id = wd["value"]
break
if cwe_id:
break
# Affected version ranges from configurations
vparts: List[str] = []
for cfg in cve.get("configurations", []):
for node in cfg.get("nodes", []):
for cm in node.get("cpeMatch", []):
vs = cm.get("versionStartIncluding", "")
ve = cm.get("versionEndIncluding", "")
vee = cm.get("versionEndExcluding", "")
if vs and ve: vparts.append(f"{vs}-{ve}")
elif vs and vee: vparts.append(f"{vs}-<{vee}")
elif ve: vparts.append(f"<={ve}")
elif vee: vparts.append(f"<{vee}")
results.append(CVEResult(
cve_id=cve_id, cvss_score=cvss_score, severity=severity.upper(),
description=desc[:500], cwe_id=cwe_id,
affected_versions=", ".join(vparts[:5]),
published_date=cve.get("published", "")[:10],
))
results.sort(key=lambda c: c.cvss_score, reverse=True)
return results
# ── GitHub exploit search ─────────────────────────────────────────
async def search_github_exploits(self, cve_id: str) -> List[ExploitResult]:
"""Search GitHub for public exploit repos matching a CVE ID."""
session = await self._get_session()
params = {"q": cve_id, "sort": "stars", "order": "desc", "per_page": "10"}
hdrs = {"Accept": "application/vnd.github.v3+json"}
if self._github_token:
hdrs["Authorization"] = f"token {self._github_token}"
results: List[ExploitResult] = []
try:
async with session.get("https://api.github.com/search/repositories",
params=params, headers=hdrs) as resp:
if resp.status != 200:
logger.warning("GitHub search returned %d for %s", resp.status, cve_id)
return results
data = await resp.json(content_type=None)
except asyncio.TimeoutError:
logger.warning("GitHub search timed out for %s", cve_id)
return results
except Exception as exc:
logger.warning("GitHub search failed for %s: %s", cve_id, exc)
return results
for repo in data.get("items", []):
results.append(ExploitResult(
source="github", url=repo.get("html_url", ""),
description=(repo.get("description") or "")[:300],
stars=repo.get("stargazers_count", 0),
language=repo.get("language") or "Unknown",
))
results.sort(key=lambda e: e.stars, reverse=True)
return results
# ── Full pipeline ─────────────────────────────────────────────────
async def hunt(self, headers: Dict[str, str], body: str,
technologies: Optional[List[str]] = None) -> List[CVEFinding]:
"""
Full pipeline: extract versions -> NVD lookup -> GitHub exploit search.
Returns findings sorted by highest CVSS score descending.
"""
versions = await self.extract_versions(headers, body, technologies or [])
if not versions:
logger.info("No software versions detected; nothing to hunt.")
return []
logger.info("Detected %d software versions, searching CVEs...", len(versions))
findings: List[CVEFinding] = []
seen_cves: set[str] = set()
for vi in versions:
cves = await self.search_nvd(vi.software, vi.version)
unique = [c for c in cves if c.cve_id not in seen_cves]
seen_cves.update(c.cve_id for c in unique)
if not unique:
continue
exploits: List[ExploitResult] = []
for c in unique:
exploits.extend(await self.search_github_exploits(c.cve_id))
findings.append(CVEFinding(version_info=vi, cves=unique, exploits=exploits))
findings.sort(key=lambda f: max((c.cvss_score for c in f.cves), default=0.0),
reverse=True)
logger.info("CVE hunt complete: %d findings, %d CVEs, %d exploits",
len(findings), sum(len(f.cves) for f in findings),
sum(len(f.exploits) for f in findings))
return findings