Mirror of https://github.com/CyberSecurityUP/NeuroSploit.git (synced 2026-03-31 00:20:44 +02:00)
## AI-Everywhere Auto Pentest

- Pre-stream AI master planning (`_ai_master_plan`) runs before parallel streams
- Stream 1: AI recon analysis (Phase 9: hidden endpoint probing, priority routing)
- Stream 2: AI payload generation (replaces hardcoded payloads with context-aware AI)
- Stream 3: AI tool output analysis (real findings vs noise classification)
- 4 new prompt builders in `ai_prompts.py` (master_plan, junior_ai_test, tool_analysis, recon_analysis)

## LLM-as-VulnEngine: AI Deep Testing

- New `_ai_deep_test()` iterative loop: OBSERVE→PLAN→EXECUTE→ANALYZE→ADAPT (3 iterations max); a schematic sketch follows this changelog
- AI-first for top 15 injection types, hardcoded fallback for rest
- Per-endpoint AI testing in Phase C instead of single `_ai_dynamic_test()`
- New system prompt context: deep_testing + iterative_testing
- Token budget adaptive: 15 normal, 5 when <50k tokens remain

## Container Fix (Critical)

- Fixed `ENTRYPOINT ["/bin/bash", "-c"]` → `CMD ["bash"]` in Dockerfile.kali
- Root cause: Docker ran `/bin/bash -c "sleep" "infinity"` → missing operand → container exit
- All Kali sandbox tools (nuclei, naabu, etc.) now start and execute correctly

## Deep Recon Overhaul

- JS analysis: 10→30 files, 11 regex patterns, source map parsing, parameter extraction
- Sitemaps: recursive index following (depth 3), 8 candidates, 500 URL cap
- API discovery: 7→20 Swagger/OpenAPI paths, 1→6 GraphQL paths, request body schema extraction
- Framework detection: 9 frameworks (WordPress, Laravel, Django, Spring, Express, ASP.NET, Rails, Next.js, Flask)
- 40+ common hidden/sensitive paths checked (.env, .git, /actuator, /debug, etc.)
- API pattern fuzzing: infers endpoints from discovered patterns, batch existence checks
- HTTP method discovery via OPTIONS probing
- URL normalization and deduplication

## Frontend Fixes

- Elapsed time now works for completed scans (computed from started_at→completed_at)
- Container telemetry: exit -1 shows "ERR" (yellow), duration shows "N/A" on failure
- HTML report rewrite: professional pentest report with cover page, risk gauge, ToC, per-finding cards, print CSS

## Other

- Updated rebuild.sh summary and validation
- Bug bounty training datasets added

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
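The iterative deep-testing loop named above lives in `_ai_deep_test()`, which is not part of the file shown below; the sketch here is only a schematic illustration of the OBSERVE→PLAN→EXECUTE→ANALYZE→ADAPT cycle with the limits quoted in this changelog. The helper callables (`observe`, `plan`, `execute`, `analyze`, `adapt`) and their signatures are assumptions, not the project's API.

```python
# Schematic sketch only — not the actual _ai_deep_test() implementation.
# The helper callables and their signatures are hypothetical.
MAX_ITERATIONS = 3            # "3 iterations max"
LOW_TOKEN_THRESHOLD = 50_000  # budget drops when fewer tokens remain


def payload_budget(tokens_remaining: int) -> int:
    """Adaptive budget: 15 normally, 5 when <50k tokens remain."""
    return 5 if tokens_remaining < LOW_TOKEN_THRESHOLD else 15


def deep_test(endpoint, observe, plan, execute, analyze, adapt, tokens_remaining):
    findings = []
    context = observe(endpoint)                                     # OBSERVE
    for _ in range(MAX_ITERATIONS):
        payloads = plan(context, payload_budget(tokens_remaining))  # PLAN
        responses = [execute(endpoint, p) for p in payloads]        # EXECUTE
        new_findings = analyze(responses)                           # ANALYZE
        if not new_findings:
            break
        findings.extend(new_findings)
        context = adapt(context, new_findings)                      # ADAPT
    return findings
```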
978 lines
39 KiB
Python
"""
|
|
Advanced reconnaissance module for NeuroSploitv2.
|
|
|
|
Performs deep JS analysis, sitemap/robots parsing, API enumeration,
|
|
source map parsing, framework-specific discovery, path fuzzing,
|
|
and technology fingerprinting using async HTTP requests.
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import aiohttp
|
|
HAS_AIOHTTP = True
|
|
except ImportError:
|
|
HAS_AIOHTTP = False
|
|
|
|
try:
|
|
from xml.etree import ElementTree as ET
|
|
except ImportError:
|
|
ET = None
|
|
|
|
REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10) if HAS_AIOHTTP else None
|
|
MAX_JS_FILES = 30
|
|
MAX_JS_SIZE = 1024 * 1024 # 1 MB
|
|
MAX_SITEMAP_URLS = 500
|
|
MAX_SITEMAP_DEPTH = 3 # Recursive sitemap index depth
|
|
MAX_ENDPOINTS = 2000 # Global cap to prevent memory bloat
|
|
|
|
# --- Regex patterns for JS analysis ---
|
|
|
|
RE_API_ENDPOINT = re.compile(r'["\'](/api/v?\d*/[a-zA-Z0-9_/\-{}]+)["\']')
|
|
RE_RELATIVE_PATH = re.compile(r'["\'](/[a-zA-Z0-9_\-]+(?:/[a-zA-Z0-9_\-{}]+){1,6})["\']')
|
|
RE_FETCH_URL = re.compile(r'fetch\(\s*["\']([^"\']+)["\']')
|
|
RE_AXIOS_URL = re.compile(r'axios\.(?:get|post|put|patch|delete|request)\(\s*["\']([^"\']+)["\']')
|
|
RE_AJAX_URL = re.compile(r'\$\.ajax\(\s*\{[^}]*url\s*:\s*["\']([^"\']+)["\']', re.DOTALL)
|
|
RE_XHR_URL = re.compile(r'\.open\(\s*["\'][A-Z]+["\']\s*,\s*["\']([^"\']+)["\']')
|
|
RE_TEMPLATE_LITERAL = re.compile(r'`(/[a-zA-Z0-9_/\-]+\$\{[^}]+\}[a-zA-Z0-9_/\-]*)`')
|
|
RE_WINDOW_LOCATION = re.compile(r'(?:window\.location|location\.href)\s*=\s*["\']([^"\']+)["\']')
|
|
RE_FORM_ACTION = re.compile(r'action\s*[:=]\s*["\']([^"\']+)["\']')
|
|
RE_HREF_PATTERN = re.compile(r'href\s*[:=]\s*["\']([^"\']+)["\']')
|
|
|
|
RE_API_KEY = re.compile(
|
|
r'(?:sk-[a-zA-Z0-9]{20,}|pk_(?:live|test)_[a-zA-Z0-9]{20,}'
|
|
r'|AKIA[0-9A-Z]{16}'
|
|
r'|ghp_[a-zA-Z0-9]{36}'
|
|
r'|glpat-[a-zA-Z0-9\-]{20,}'
|
|
r'|eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,})'
|
|
)
|
|
|
|
RE_INTERNAL_URL = re.compile(
|
|
r'https?://(?:localhost|127\.0\.0\.1|10\.\d+\.\d+\.\d+|192\.168\.\d+\.\d+|172\.(?:1[6-9]|2\d|3[01])\.\d+\.\d+)[^\s"\']*'
|
|
)
|
|
|
|
RE_REACT_ROUTE = re.compile(r'path\s*[:=]\s*["\'](/[^"\']*)["\']')
|
|
RE_ANGULAR_ROUTE = re.compile(r'path\s*:\s*["\']([^"\']+)["\']')
|
|
RE_VUE_ROUTE = re.compile(r'path\s*:\s*["\'](/[^"\']*)["\']')
|
|
RE_NEXTJS_PAGE = re.compile(r'"(/[a-zA-Z0-9_/\[\]\-]+)"')
|
|
|
|
# Source map patterns
|
|
RE_SOURCEMAP_URL = re.compile(r'//[#@]\s*sourceMappingURL\s*=\s*(\S+)')
|
|
RE_SOURCEMAP_ROUTES = re.compile(r'(?:pages|routes|views)/([a-zA-Z0-9_/\[\]\-]+)\.(?:tsx?|jsx?|vue|svelte)')
|
|
|
|
# GraphQL patterns
|
|
RE_GQL_QUERY = re.compile(r'(?:query|mutation|subscription)\s+(\w+)')
|
|
RE_GQL_FIELD = re.compile(r'gql\s*`[^`]*`', re.DOTALL)
|
|
|
|
# Parameter patterns in JS
|
|
RE_URL_PARAM = re.compile(r'[?&]([a-zA-Z0-9_]+)=')
|
|
RE_BODY_PARAM = re.compile(r'(?:body|data|params)\s*[:=]\s*\{([^}]+)\}', re.DOTALL)
|
|
RE_JSON_KEY = re.compile(r'["\']([a-zA-Z_][a-zA-Z0-9_]*)["\']')
|
|
|
|
|
|
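# Illustrative example (not from the original source): given a JS bundle line
#   fetch("/api/v1/users?id=1")
# RE_FETCH_URL captures "/api/v1/users?id=1" and RE_URL_PARAM captures "id";
# _extract_from_js() below combines the two to attach parameters to endpoints.
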
@dataclass
class JSAnalysisResult:
    """Results from JavaScript file analysis."""
    endpoints: List[str] = field(default_factory=list)
    api_keys: List[str] = field(default_factory=list)
    internal_urls: List[str] = field(default_factory=list)
    secrets: List[str] = field(default_factory=list)
    parameters: Dict[str, List[str]] = field(default_factory=dict)
    source_map_routes: List[str] = field(default_factory=list)


@dataclass
class APISchema:
    """Parsed API schema from Swagger/OpenAPI or GraphQL introspection."""
    endpoints: List[Dict] = field(default_factory=list)
    version: str = ""
    source: str = ""


@dataclass
class EndpointInfo:
    """Rich endpoint descriptor with method and parameter hints."""
    url: str
    method: str = "GET"
    params: List[str] = field(default_factory=list)
    source: str = ""  # How this endpoint was discovered
    priority: int = 5  # 1-10, higher = more interesting


def _normalize_url(url: str) -> str:
    """Canonicalize a URL for deduplication."""
    parsed = urlparse(url)
    path = parsed.path.rstrip("/") or "/"
    # Normalize double slashes
    while "//" in path:
        path = path.replace("//", "/")
    # Sort query parameters
    if parsed.query:
        params = parse_qs(parsed.query, keep_blank_values=True)
        sorted_query = urlencode(sorted(params.items()), doseq=True)
        return f"{parsed.scheme}://{parsed.netloc}{path}?{sorted_query}"
    return f"{parsed.scheme}://{parsed.netloc}{path}"


class DeepRecon:
    """Advanced reconnaissance: JS analysis, sitemap, robots, API enum, fingerprinting."""

    def __init__(self, session: Optional["aiohttp.ClientSession"] = None):
        self._external_session = session is not None
        self._session = session
        self._seen_urls: Set[str] = set()

    async def _get_session(self) -> "aiohttp.ClientSession":
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(timeout=REQUEST_TIMEOUT)
            self._external_session = False
        return self._session

    async def close(self):
        if not self._external_session and self._session and not self._session.closed:
            await self._session.close()

    async def _fetch(self, url: str, max_size: int = 0) -> Optional[str]:
        """Fetch URL text with optional size limit. Returns None on any error."""
        try:
            session = await self._get_session()
            async with session.get(url, ssl=False, allow_redirects=True) as resp:
                if resp.status != 200:
                    return None
                if max_size:
                    chunk = await resp.content.read(max_size)
                    return chunk.decode("utf-8", errors="replace")
                return await resp.text()
        except Exception:
            return None

    async def _head_check(self, url: str) -> Optional[int]:
        """Quick HEAD request to check if a URL exists. Returns status or None."""
        try:
            session = await self._get_session()
            async with session.head(url, ssl=False, allow_redirects=True, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                return resp.status
        except Exception:
            return None

    async def _check_url_alive(self, url: str, accept_codes: Set[int] = None) -> bool:
        """Check if URL returns an acceptable status code."""
        if accept_codes is None:
            accept_codes = {200, 201, 301, 302, 307, 308, 401, 403}
        status = await self._head_check(url)
        return status is not None and status in accept_codes

    # ------------------------------------------------------------------
    # JS file analysis (enhanced)
    # ------------------------------------------------------------------

    async def crawl_js_files(self, base_url: str, js_urls: List[str]) -> JSAnalysisResult:
        """Fetch and analyse JavaScript files for endpoints, keys, and secrets."""
        result = JSAnalysisResult()
        urls_to_scan = list(dict.fromkeys(js_urls))[:MAX_JS_FILES]

        tasks = [self._fetch(urljoin(base_url, u), max_size=MAX_JS_SIZE) for u in urls_to_scan]
        bodies = await asyncio.gather(*tasks, return_exceptions=True)

        # Also try to fetch source maps in parallel
        sourcemap_tasks = []
        sourcemap_base_urls = []
        for url, body in zip(urls_to_scan, bodies):
            if not isinstance(body, str):
                continue
            sm = RE_SOURCEMAP_URL.search(body)
            if sm:
                sm_url = sm.group(1)
                if not sm_url.startswith("data:"):
                    full_url = urljoin(urljoin(base_url, url), sm_url)
                    sourcemap_tasks.append(self._fetch(full_url, max_size=MAX_JS_SIZE * 2))
                    sourcemap_base_urls.append(full_url)

        sourcemap_bodies = []
        if sourcemap_tasks:
            sourcemap_bodies = await asyncio.gather(*sourcemap_tasks, return_exceptions=True)

        seen_endpoints: set = set()
        seen_params: Dict[str, Set[str]] = {}

        for body in bodies:
            if not isinstance(body, str):
                continue
            self._extract_from_js(body, seen_endpoints, seen_params, result)

        # Parse source maps for original file paths → route discovery
        for sm_body in sourcemap_bodies:
            if not isinstance(sm_body, str):
                continue
            try:
                sm_data = json.loads(sm_body)
                sources = sm_data.get("sources", [])
                for src in sources:
                    m = RE_SOURCEMAP_ROUTES.search(src)
                    if m:
                        route = "/" + m.group(1).replace("[", "{").replace("]", "}")
                        result.source_map_routes.append(route)
                        seen_endpoints.add(route)
            except (json.JSONDecodeError, ValueError):
                # Not valid JSON source map — might still contain paths
                for m in RE_SOURCEMAP_ROUTES.finditer(sm_body):
                    route = "/" + m.group(1).replace("[", "{").replace("]", "}")
                    result.source_map_routes.append(route)
                    seen_endpoints.add(route)

        # Resolve endpoints relative to base_url
        for ep in sorted(seen_endpoints):
            if ep.startswith("http"):
                resolved = ep
            elif ep.startswith("/"):
                resolved = urljoin(base_url, ep)
            else:
                continue
            normalized = _normalize_url(resolved)
            if normalized not in self._seen_urls:
                self._seen_urls.add(normalized)
                result.endpoints.append(resolved)

        # Convert param sets
        for endpoint, params in seen_params.items():
            result.parameters[endpoint] = sorted(params)

        return result

    def _extract_from_js(
        self, body: str, seen_endpoints: set, seen_params: Dict[str, Set[str]],
        result: JSAnalysisResult,
    ):
        """Extract endpoints, params, keys, and internal URLs from a JS body."""
        # API endpoint patterns (expanded)
        for regex in (RE_API_ENDPOINT, RE_RELATIVE_PATH, RE_FETCH_URL, RE_AXIOS_URL,
                      RE_AJAX_URL, RE_XHR_URL, RE_TEMPLATE_LITERAL, RE_WINDOW_LOCATION,
                      RE_FORM_ACTION, RE_HREF_PATTERN):
            for m in regex.finditer(body):
                ep = m.group(1) if regex.groups else m.group(0)
                # Filter out obvious non-endpoints
                if self._is_valid_endpoint(ep):
                    seen_endpoints.add(ep)

        # Route definitions (React Router, Angular, Vue Router, Next.js)
        for regex in (RE_REACT_ROUTE, RE_ANGULAR_ROUTE, RE_VUE_ROUTE, RE_NEXTJS_PAGE):
            for m in regex.finditer(body):
                route = m.group(1)
                if route.startswith("/") and len(route) < 200:
                    seen_endpoints.add(route)

        # Extract URL parameters
        for m in RE_URL_PARAM.finditer(body):
            param_name = m.group(1)
            # Find the URL this param belongs to (rough heuristic)
            start = max(0, m.start() - 200)
            context = body[start:m.start()]
            for ep_regex in (RE_FETCH_URL, RE_API_ENDPOINT):
                ep_match = ep_regex.search(context)
                if ep_match:
                    ep = ep_match.group(1) if ep_regex.groups else ep_match.group(0)
                    if ep not in seen_params:
                        seen_params[ep] = set()
                    seen_params[ep].add(param_name)

        # Extract JSON body parameters
        for m in RE_BODY_PARAM.finditer(body):
            block = m.group(1)
            for key_m in RE_JSON_KEY.finditer(block):
                key = key_m.group(1)
                if len(key) <= 50 and not key.startswith("__"):
                    if "_body_params" not in seen_params:
                        seen_params["_body_params"] = set()
                    seen_params["_body_params"].add(key)

        # API keys / tokens
        for m in RE_API_KEY.finditer(body):
            val = m.group(0)
            if val not in result.api_keys:
                result.api_keys.append(val)
                result.secrets.append(val)

        # Internal / private URLs
        for m in RE_INTERNAL_URL.finditer(body):
            val = m.group(0)
            if val not in result.internal_urls:
                result.internal_urls.append(val)

    @staticmethod
    def _is_valid_endpoint(ep: str) -> bool:
        """Filter out non-endpoint matches (CSS, images, data URIs, etc.)."""
        if not ep or len(ep) > 500:
            return False
        if ep.startswith(("data:", "javascript:", "mailto:", "tel:", "#", "blob:")):
            return False
        # Skip common static assets
        SKIP_EXT = ('.css', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico', '.woff',
                    '.woff2', '.ttf', '.eot', '.mp4', '.mp3', '.webp', '.avif',
                    '.map', '.ts', '.tsx', '.jsx', '.scss', '.less', '.pdf')
        lower = ep.lower()
        if any(lower.endswith(ext) for ext in SKIP_EXT):
            return False
        # Must look like a path
        if ep.startswith("/") or ep.startswith("http"):
            return True
        return False

    # ------------------------------------------------------------------
    # Sitemap parsing (enhanced with recursive index following)
    # ------------------------------------------------------------------

    async def parse_sitemap(self, target: str) -> List[str]:
        """Fetch and parse sitemap XML files for URLs. Follows sitemap indexes recursively."""
        target = target.rstrip("/")
        candidates = [
            f"{target}/sitemap.xml",
            f"{target}/sitemap_index.xml",
            f"{target}/sitemap1.xml",
            f"{target}/sitemap-index.xml",
            f"{target}/sitemaps.xml",
            f"{target}/post-sitemap.xml",
            f"{target}/page-sitemap.xml",
            f"{target}/category-sitemap.xml",
        ]

        # Also check robots.txt for sitemap directives
        robots_body = await self._fetch(f"{target}/robots.txt")
        if robots_body:
            for line in robots_body.splitlines():
                line = line.strip()
                if line.lower().startswith("sitemap:"):
                    sm_url = line.split(":", 1)[1].strip()
                    if sm_url and sm_url not in candidates:
                        candidates.append(sm_url)

        urls: set = set()
        visited_sitemaps: set = set()

        async def _parse_one(sitemap_url: str, depth: int = 0):
            if depth > MAX_SITEMAP_DEPTH or sitemap_url in visited_sitemaps:
                return
            if len(urls) >= MAX_SITEMAP_URLS:
                return
            visited_sitemaps.add(sitemap_url)

            body = await self._fetch(sitemap_url)
            if not body or ET is None:
                return
            try:
                root = ET.fromstring(body)
            except ET.ParseError:
                return

            sub_sitemaps = []
            for elem in root.iter():
                tag = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
                if tag == "loc" and elem.text:
                    loc = elem.text.strip()
                    # Check if this is a sub-sitemap
                    if loc.endswith(".xml") or "sitemap" in loc.lower():
                        sub_sitemaps.append(loc)
                    else:
                        urls.add(loc)
                        if len(urls) >= MAX_SITEMAP_URLS:
                            return

            # Recursively follow sub-sitemaps
            for sub in sub_sitemaps[:10]:  # Limit sub-sitemap recursion
                await _parse_one(sub, depth + 1)

        # Parse all candidate sitemaps
        for sitemap_url in candidates:
            if len(urls) >= MAX_SITEMAP_URLS:
                break
            await _parse_one(sitemap_url)

        return sorted(urls)[:MAX_SITEMAP_URLS]

    # ------------------------------------------------------------------
    # Robots.txt parsing (enhanced with Sitemap extraction)
    # ------------------------------------------------------------------

    async def parse_robots(self, target: str) -> Tuple[List[str], List[str]]:
        """Parse robots.txt. Returns (paths, sitemap_urls)."""
        target = target.rstrip("/")
        body = await self._fetch(f"{target}/robots.txt")
        if not body:
            return [], []

        paths: set = set()
        sitemaps: list = []

        for line in body.splitlines():
            line = line.strip()
            if line.startswith("#") or ":" not in line:
                continue
            directive, _, value = line.partition(":")
            directive = directive.strip().lower()
            value = value.strip()
            if directive in ("disallow", "allow") and value and value != "/":
                resolved = urljoin(target + "/", value)
                paths.add(resolved)
            elif directive == "sitemap" and value:
                sitemaps.append(value)

        return sorted(paths), sitemaps

    # ------------------------------------------------------------------
    # API enumeration (Swagger / OpenAPI / GraphQL / WADL / AsyncAPI)
    # ------------------------------------------------------------------

    _API_DOC_PATHS = [
        "/swagger.json",
        "/openapi.json",
        "/api-docs",
        "/v2/api-docs",
        "/v3/api-docs",
        "/swagger/v1/swagger.json",
        "/swagger/v2/swagger.json",
        "/.well-known/openapi",
        "/api/swagger.json",
        "/api/openapi.json",
        "/api/v1/swagger.json",
        "/api/v1/openapi.json",
        "/api/docs",
        "/docs/api",
        "/doc.json",
        "/public/swagger.json",
        "/swagger-ui/swagger.json",
        "/api-docs.json",
        "/api/api-docs",
        "/_api/docs",
    ]

    _GRAPHQL_PATHS = [
        "/graphql",
        "/graphiql",
        "/api/graphql",
        "/v1/graphql",
        "/gql",
        "/query",
    ]

    async def enumerate_api(self, target: str, technologies: List[str]) -> APISchema:
        """Discover and parse API documentation (OpenAPI/Swagger, GraphQL, WADL)."""
        target = target.rstrip("/")
        schema = APISchema()

        # Try OpenAPI / Swagger endpoints (parallel batch)
        api_tasks = [self._fetch(f"{target}{path}") for path in self._API_DOC_PATHS]
        api_results = await asyncio.gather(*api_tasks, return_exceptions=True)

        for path, body in zip(self._API_DOC_PATHS, api_results):
            if not isinstance(body, str):
                continue
            try:
                doc = json.loads(body)
            except (json.JSONDecodeError, ValueError):
                continue

            if "paths" in doc or "openapi" in doc or "swagger" in doc:
                schema.version = doc.get("openapi", doc.get("info", {}).get("version", ""))
                schema.source = path
                for route, methods in doc.get("paths", {}).items():
                    if not isinstance(methods, dict):
                        continue
                    for method, detail in methods.items():
                        if method.lower() in ("get", "post", "put", "patch", "delete", "options", "head"):
                            params = []
                            if isinstance(detail, dict):
                                for p in detail.get("parameters", []):
                                    if isinstance(p, dict):
                                        params.append(p.get("name", ""))
                                # Also extract request body schema params
                                req_body = detail.get("requestBody", {})
                                if isinstance(req_body, dict):
                                    content = req_body.get("content", {})
                                    for ct, ct_detail in content.items():
                                        if isinstance(ct_detail, dict):
                                            props = ct_detail.get("schema", {}).get("properties", {})
                                            if isinstance(props, dict):
                                                params.extend(props.keys())
                            schema.endpoints.append({
                                "url": route,
                                "method": method.upper(),
                                "params": [p for p in params if p],
                            })
                logger.info(f"[DeepRecon] Found API schema at {path}: {len(schema.endpoints)} endpoints")
                return schema

        # GraphQL introspection (try multiple paths)
        for gql_path in self._GRAPHQL_PATHS:
            introspection = await self._graphql_introspect(f"{target}{gql_path}")
            if introspection:
                return introspection

        return schema

    async def _graphql_introspect(self, gql_url: str) -> Optional[APISchema]:
        """Attempt GraphQL introspection query at a specific URL."""
        query = '{"query":"{ __schema { queryType { name } mutationType { name } types { name kind fields { name args { name type { name } } } } } }"}'
        try:
            session = await self._get_session()
            headers = {"Content-Type": "application/json"}
            async with session.post(
                gql_url, data=query, headers=headers, ssl=False,
                timeout=aiohttp.ClientTimeout(total=8),
            ) as resp:
                if resp.status != 200:
                    return None
                data = await resp.json()
        except Exception:
            return None

        if "data" not in data or "__schema" not in data.get("data", {}):
            return None

        parsed_url = urlparse(gql_url)
        source_path = parsed_url.path

        schema = APISchema(version="graphql", source=source_path)
        for type_info in data["data"]["__schema"].get("types", []):
            type_name = type_info.get("name", "")
            if type_name.startswith("__") or type_info.get("kind") in ("SCALAR", "ENUM", "INPUT_OBJECT"):
                continue
            for fld in type_info.get("fields", []) or []:
                params = [a["name"] for a in fld.get("args", []) if isinstance(a, dict)]
                schema.endpoints.append({
                    "url": f"/{type_name}/{fld['name']}",
                    "method": "QUERY",
                    "params": params,
                })
        return schema if schema.endpoints else None

    # ------------------------------------------------------------------
    # Framework-specific endpoint discovery
    # ------------------------------------------------------------------

    _FRAMEWORK_PATHS: Dict[str, List[str]] = {
        "wordpress": [
            "/wp-admin/", "/wp-login.php", "/wp-json/wp/v2/posts",
            "/wp-json/wp/v2/users", "/wp-json/wp/v2/pages",
            "/wp-json/wp/v2/categories", "/wp-json/wp/v2/comments",
            "/wp-json/wp/v2/media", "/wp-json/wp/v2/tags",
            "/wp-json/", "/wp-content/uploads/",
            "/wp-cron.php", "/xmlrpc.php", "/?rest_route=/wp/v2/users",
            "/wp-admin/admin-ajax.php", "/wp-admin/load-scripts.php",
            "/wp-includes/wlwmanifest.xml",
        ],
        "laravel": [
            "/api/user", "/api/login", "/api/register",
            "/sanctum/csrf-cookie", "/telescope",
            "/horizon", "/nova-api/", "/_debugbar/open",
            "/storage/logs/laravel.log", "/env",
        ],
        "django": [
            "/admin/", "/admin/login/", "/api/",
            "/__debug__/", "/static/admin/",
            "/accounts/login/", "/accounts/signup/",
            "/api/v1/", "/api/v2/",
        ],
        "spring": [
            "/actuator", "/actuator/health", "/actuator/env",
            "/actuator/beans", "/actuator/mappings", "/actuator/info",
            "/actuator/configprops", "/actuator/metrics",
            "/swagger-ui.html", "/swagger-ui/index.html",
            "/api-docs", "/v3/api-docs",
        ],
        "express": [
            "/api/", "/api/v1/", "/api/health",
            "/api/status", "/auth/login", "/auth/register",
            "/graphql",
        ],
        "aspnet": [
            "/_blazor", "/swagger", "/swagger/index.html",
            "/api/values", "/api/health",
            "/Identity/Account/Login", "/Identity/Account/Register",
        ],
        "rails": [
            "/rails/info", "/rails/mailers",
            "/api/v1/", "/admin/",
            "/users/sign_in", "/users/sign_up",
            "/assets/application.js",
        ],
        "nextjs": [
            "/_next/data/", "/api/", "/api/auth/session",
            "/api/auth/signin", "/api/auth/providers",
            "/_next/static/chunks/",
        ],
        "flask": [
            "/api/", "/api/v1/", "/admin/",
            "/static/", "/auth/login", "/auth/register",
            "/swagger.json",
        ],
    }

    # Common hidden paths to check regardless of framework
    _COMMON_HIDDEN_PATHS = [
        "/.env", "/.git/config", "/.git/HEAD",
        "/backup/", "/backups/", "/backup.sql", "/backup.zip",
        "/config.json", "/config.yaml", "/config.yml",
        "/debug/", "/debug/vars", "/debug/pprof",
        "/internal/", "/internal/health", "/internal/status",
        "/metrics", "/prometheus", "/health", "/healthz", "/ready",
        "/status", "/ping", "/version", "/info",
        "/.well-known/security.txt", "/security.txt",
        "/crossdomain.xml", "/clientaccesspolicy.xml",
        "/server-status", "/server-info",
        "/phpinfo.php", "/info.php",
        "/web.config", "/WEB-INF/web.xml",
        "/console/", "/manage/", "/management/",
        "/api/debug", "/api/config",
        "/trace", "/jolokia/",
        "/cgi-bin/", "/fcgi-bin/",
        "/.htaccess", "/.htpasswd",
    ]

    async def discover_framework_endpoints(
        self, target: str, technologies: List[str]
    ) -> List[EndpointInfo]:
        """Probe framework-specific endpoints based on detected technologies."""
        target = target.rstrip("/")
        tech_lower = [t.lower() for t in technologies]
        endpoints: List[EndpointInfo] = []
        urls_to_check: List[Tuple[str, str, int]] = []  # (url, source, priority)

        # Match frameworks by technology signatures
        fw_matches = set()
        for fw_name, keywords in {
            "wordpress": ["wordpress", "wp-", "woocommerce"],
            "laravel": ["laravel", "php", "lumen"],
            "django": ["django", "python", "wagtail"],
            "spring": ["spring", "java", "tomcat", "wildfly", "jetty"],
            "express": ["express", "node", "koa", "fastify"],
            "aspnet": ["asp.net", ".net", "blazor", "iis"],
            "rails": ["ruby", "rails", "rack"],
            "nextjs": ["next.js", "nextjs", "react", "vercel"],
            "flask": ["flask", "python", "gunicorn", "werkzeug"],
        }.items():
            for kw in keywords:
                for tech in tech_lower:
                    if kw in tech:
                        fw_matches.add(fw_name)
                        break

        # Add framework-specific paths
        for fw in fw_matches:
            for path in self._FRAMEWORK_PATHS.get(fw, []):
                urls_to_check.append((f"{target}{path}", f"framework:{fw}", 7))

        # Always check common hidden paths
        for path in self._COMMON_HIDDEN_PATHS:
            urls_to_check.append((f"{target}{path}", "common_hidden", 6))

        # Batch check existence (parallel HEAD requests)
        check_tasks = [self._check_url_alive(url) for url, _, _ in urls_to_check]
        results = await asyncio.gather(*check_tasks, return_exceptions=True)

        for (url, source, priority), alive in zip(urls_to_check, results):
            if alive is True:
                endpoints.append(EndpointInfo(
                    url=url, method="GET", source=source, priority=priority,
                ))

        logger.info(f"[DeepRecon] Framework discovery: {len(endpoints)}/{len(urls_to_check)} alive")
        return endpoints

    # ------------------------------------------------------------------
    # Path pattern fuzzing
    # ------------------------------------------------------------------

    async def fuzz_api_patterns(
        self, target: str, known_endpoints: List[str]
    ) -> List[EndpointInfo]:
        """Infer and test related endpoints from discovered patterns."""
        target = target.rstrip("/")
        target_parsed = urlparse(target)
        target_origin = f"{target_parsed.scheme}://{target_parsed.netloc}"

        inferred: Set[str] = set()

        # Extract API path patterns
        api_bases: Set[str] = set()
        api_resources: Set[str] = set()

        for ep in known_endpoints:
            parsed = urlparse(ep)
            path = parsed.path
            # Identify API base paths like /api/v1, /api/v2
            m = re.match(r'(/api(?:/v\d+)?)', path)
            if m:
                api_bases.add(m.group(1))
                # Extract resource name
                rest = path[len(m.group(1)):]
                parts = [p for p in rest.split("/") if p and not p.isdigit() and not re.match(r'^[0-9a-f-]{8,}$', p)]
                if parts:
                    api_resources.add(parts[0])

        # Common REST resource names to try
        COMMON_RESOURCES = [
            "users", "user", "auth", "login", "register", "logout",
            "profile", "settings", "admin", "posts", "articles",
            "comments", "categories", "tags", "search", "upload",
            "files", "images", "media", "notifications", "messages",
            "products", "orders", "payments", "invoices", "customers",
            "dashboard", "reports", "analytics", "logs", "events",
            "webhooks", "tokens", "sessions", "roles", "permissions",
            "config", "health", "status", "version", "docs",
        ]

        # Common REST sub-patterns
        CRUD_SUFFIXES = [
            "", "/1", "/me", "/all", "/list", "/search",
            "/count", "/export", "/import", "/bulk",
        ]

        for base in api_bases:
            # Try common resources under each API base
            for resource in COMMON_RESOURCES:
                if resource not in api_resources:
                    inferred.add(f"{target_origin}{base}/{resource}")
            # Try CRUD variants for known resources
            for resource in api_resources:
                for suffix in CRUD_SUFFIXES:
                    inferred.add(f"{target_origin}{base}/{resource}{suffix}")

        # Remove already-known endpoints
        known_normalized = {_normalize_url(ep) for ep in known_endpoints}
        inferred = {url for url in inferred if _normalize_url(url) not in known_normalized}

        # Batch check (parallel, capped)
        to_check = sorted(inferred)[:100]
        check_tasks = [self._check_url_alive(url) for url in to_check]
        results = await asyncio.gather(*check_tasks, return_exceptions=True)

        discovered = []
        for url, alive in zip(to_check, results):
            if alive is True:
                discovered.append(EndpointInfo(
                    url=url, method="GET", source="api_fuzzing", priority=6,
                ))

        logger.info(f"[DeepRecon] API fuzzing: {len(discovered)}/{len(to_check)} alive")
        return discovered

    # ------------------------------------------------------------------
    # Multi-method discovery
    # ------------------------------------------------------------------

    async def discover_methods(
        self, target: str, endpoints: List[str], sample_size: int = 20
    ) -> Dict[str, List[str]]:
        """Test which HTTP methods each endpoint accepts (OPTIONS + probing)."""
        results: Dict[str, List[str]] = {}
        sampled = endpoints[:sample_size]

        async def _check_options(url: str) -> Tuple[str, List[str]]:
            try:
                session = await self._get_session()
                async with session.options(
                    url, ssl=False, timeout=aiohttp.ClientTimeout(total=5)
                ) as resp:
                    allow = resp.headers.get("Allow", "")
                    if allow:
                        return url, [m.strip().upper() for m in allow.split(",")]
                    # Also check Access-Control-Allow-Methods
                    cors = resp.headers.get("Access-Control-Allow-Methods", "")
                    if cors:
                        return url, [m.strip().upper() for m in cors.split(",")]
            except Exception:
                pass
            return url, []

        tasks = [_check_options(url) for url in sampled]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        for resp in responses:
            if isinstance(resp, tuple):
                url, methods = resp
                if methods:
                    results[url] = methods

        return results

    # ------------------------------------------------------------------
    # Deep technology fingerprinting
    # ------------------------------------------------------------------

    _FINGERPRINT_FILES = [
        "/readme.txt", "/README.md", "/CHANGELOG.md", "/CHANGES.txt",
        "/package.json", "/composer.json", "/Gemfile.lock",
        "/requirements.txt", "/go.mod", "/pom.xml", "/build.gradle",
    ]

    _WP_PROBES = [
        "/wp-links-opml.php",
        "/wp-includes/js/wp-embed.min.js",
    ]

    _DRUPAL_PROBES = [
        "/CHANGELOG.txt",
        "/core/CHANGELOG.txt",
    ]

    RE_VERSION = re.compile(r'["\']?version["\']?\s*[:=]\s*["\']?(\d+\.\d+[\w.\-]*)')
    RE_WP_VER = re.compile(r'ver=(\d+\.\d+[\w.\-]*)')
    RE_DRUPAL_VER = re.compile(r'Drupal\s+(\d+\.\d+[\w.\-]*)')

    async def deep_fingerprint(
        self, target: str, headers: Dict, body: str
    ) -> List[Dict]:
        """Detect software and versions from well-known files and probes."""
        target = target.rstrip("/")
        results: List[Dict] = []
        seen: set = set()

        def _add(software: str, version: str, source: str):
            key = (software.lower(), version)
            if key not in seen:
                seen.add(key)
                results.append({"software": software, "version": version, "source": source})

        # Generic version files
        tasks = {path: self._fetch(f"{target}{path}") for path in self._FINGERPRINT_FILES}
        bodies = dict(zip(tasks.keys(), await asyncio.gather(*tasks.values(), return_exceptions=True)))

        for path, content in bodies.items():
            if not isinstance(content, str):
                continue
            if path.endswith(".json"):
                try:
                    doc = json.loads(content)
                    name = doc.get("name", "unknown")
                    ver = doc.get("version", "")
                    if ver:
                        _add(name, ver, path)
                except (json.JSONDecodeError, ValueError):
                    pass
            elif path == "/go.mod":
                m = re.search(r'^module\s+(\S+)', content, re.MULTILINE)
                if m:
                    _add(m.group(1), "go-module", path)
                for dep_m in re.finditer(r'^\s+(\S+)\s+(v[\d.]+)', content, re.MULTILINE):
                    _add(dep_m.group(1), dep_m.group(2), path)
            elif path == "/requirements.txt":
                for dep_m in re.finditer(r'^([a-zA-Z0-9_\-]+)==([\d.]+)', content, re.MULTILINE):
                    _add(dep_m.group(1), dep_m.group(2), path)
            elif path == "/Gemfile.lock":
                for dep_m in re.finditer(r'^\s{4}([a-z_\-]+)\s+\(([\d.]+)\)', content, re.MULTILINE):
                    _add(dep_m.group(1), dep_m.group(2), path)
            else:
                m = self.RE_VERSION.search(content)
                if m:
                    _add("unknown", m.group(1), path)

        # WordPress probes
        for wp_path in self._WP_PROBES:
            content = await self._fetch(f"{target}{wp_path}")
            if not content:
                continue
            m = self.RE_WP_VER.search(content)
            if m:
                _add("WordPress", m.group(1), wp_path)
            elif "WordPress" in content or "wp-" in content:
                _add("WordPress", "unknown", wp_path)

        # Drupal probes
        for dp_path in self._DRUPAL_PROBES:
            content = await self._fetch(f"{target}{dp_path}")
            if not content:
                continue
            m = self.RE_DRUPAL_VER.search(content)
            if m:
                _add("Drupal", m.group(1), dp_path)

        return results

    # ------------------------------------------------------------------
    # Comprehensive recon pipeline
    # ------------------------------------------------------------------

    async def full_recon(
        self, target: str, technologies: List[str],
        js_urls: List[str], known_endpoints: List[str],
    ) -> Dict:
        """Run ALL recon phases and return aggregated results."""
        results: Dict = {
            "sitemap_urls": [],
            "robots_paths": [],
            "js_analysis": None,
            "api_schema": None,
            "framework_endpoints": [],
            "fuzzed_endpoints": [],
            "method_map": {},
            "fingerprints": [],
            "all_endpoints": [],
        }

        # Run independent phases in parallel
        sitemap_task = self.parse_sitemap(target)
        robots_task = self.parse_robots(target)
        js_task = self.crawl_js_files(target, js_urls) if js_urls else asyncio.sleep(0)
        api_task = self.enumerate_api(target, technologies)
        fw_task = self.discover_framework_endpoints(target, technologies)

        sitemap_result, robots_result, js_result, api_result, fw_result = \
            await asyncio.gather(sitemap_task, robots_task, js_task, api_task, fw_task,
                                 return_exceptions=True)

        if isinstance(sitemap_result, list):
            results["sitemap_urls"] = sitemap_result
        if isinstance(robots_result, tuple):
            results["robots_paths"] = robots_result[0]
        if isinstance(js_result, JSAnalysisResult):
            results["js_analysis"] = js_result
        if isinstance(api_result, APISchema):
            results["api_schema"] = api_result
        if isinstance(fw_result, list):
            results["framework_endpoints"] = fw_result

        # Aggregate all discovered endpoints
        all_eps = set(known_endpoints)
        all_eps.update(results["sitemap_urls"])
        all_eps.update(results["robots_paths"])
        if results["js_analysis"]:
            all_eps.update(results["js_analysis"].endpoints)
        if results["api_schema"]:
            for ep in results["api_schema"].endpoints:
                url = ep.get("url", "")
                if url.startswith("/"):
                    all_eps.add(urljoin(target, url))
                elif url.startswith("http"):
                    all_eps.add(url)
        for fw_ep in results["framework_endpoints"]:
            all_eps.add(fw_ep.url)

        # Now run API fuzzing with ALL known endpoints
        try:
            fuzzed = await self.fuzz_api_patterns(target, sorted(all_eps))
            if isinstance(fuzzed, list):
                results["fuzzed_endpoints"] = fuzzed
                for ep in fuzzed:
                    all_eps.add(ep.url)
        except Exception as e:
            logger.warning(f"[DeepRecon] API fuzzing error: {e}")

        # Discover methods for a sample
        try:
            methods = await self.discover_methods(target, sorted(all_eps))
            results["method_map"] = methods
        except Exception as e:
            logger.warning(f"[DeepRecon] Method discovery error: {e}")

        results["all_endpoints"] = sorted(all_eps)[:MAX_ENDPOINTS]
        logger.info(f"[DeepRecon] Total endpoints discovered: {len(results['all_endpoints'])}")

        return results
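

# ----------------------------------------------------------------------
# Illustrative usage sketch — not part of the original module. The target
# URL, technology list, JS URLs, and known endpoints below are
# placeholders; aiohttp must be installed for the HTTP phases to run.
# ----------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        recon = DeepRecon()
        try:
            results = await recon.full_recon(
                target="https://example.com",
                technologies=["WordPress", "PHP"],
                js_urls=["/static/app.js"],
                known_endpoints=["https://example.com/api/v1/users"],
            )
            print(f"Discovered {len(results['all_endpoints'])} endpoints")
        finally:
            await recon.close()

    asyncio.run(_demo())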