mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-12 22:12:45 +00:00
346 lines
13 KiB
Python
346 lines
13 KiB
Python
"""
|
|
NeuroSploit v3 - Settings API Endpoints
|
|
"""
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional, Dict
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, delete, text
|
|
from pydantic import BaseModel
|
|
|
|
from backend.db.database import get_db, engine
|
|
from backend.models import Scan, Target, Endpoint, Vulnerability, VulnerabilityTest, Report
|
|
|
|
router = APIRouter()
|
|
|
|
# Path to .env file (project root)
|
|
ENV_FILE_PATH = Path(__file__).parent.parent.parent.parent / ".env"
|
|
|
|
|
|
def _update_env_file(updates: Dict[str, str]) -> bool:
|
|
"""
|
|
Update key=value pairs in the .env file without breaking formatting.
|
|
- If the key exists (even commented out), update its value
|
|
- If the key doesn't exist, append it
|
|
- Preserves comments and blank lines
|
|
"""
|
|
if not ENV_FILE_PATH.exists():
|
|
return False
|
|
|
|
try:
|
|
lines = ENV_FILE_PATH.read_text().splitlines()
|
|
updated_keys = set()
|
|
|
|
new_lines = []
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
matched = False
|
|
|
|
for key, value in updates.items():
|
|
# Match: KEY=..., # KEY=..., #KEY=...
|
|
pattern = rf'^#?\s*{re.escape(key)}\s*='
|
|
if re.match(pattern, stripped):
|
|
# Replace with uncommented key=value
|
|
new_lines.append(f"{key}={value}")
|
|
updated_keys.add(key)
|
|
matched = True
|
|
break
|
|
|
|
if not matched:
|
|
new_lines.append(line)
|
|
|
|
# Append any keys that weren't found in existing file
|
|
for key, value in updates.items():
|
|
if key not in updated_keys:
|
|
new_lines.append(f"{key}={value}")
|
|
|
|
# Write back with trailing newline
|
|
ENV_FILE_PATH.write_text("\n".join(new_lines) + "\n")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Warning: Failed to update .env file: {e}")
|
|
return False
|
|
|
|
|
|
class SettingsUpdate(BaseModel):
|
|
"""Settings update schema"""
|
|
llm_provider: Optional[str] = None
|
|
anthropic_api_key: Optional[str] = None
|
|
openai_api_key: Optional[str] = None
|
|
openrouter_api_key: Optional[str] = None
|
|
max_concurrent_scans: Optional[int] = None
|
|
aggressive_mode: Optional[bool] = None
|
|
default_scan_type: Optional[str] = None
|
|
recon_enabled_by_default: Optional[bool] = None
|
|
enable_model_routing: Optional[bool] = None
|
|
enable_knowledge_augmentation: Optional[bool] = None
|
|
enable_browser_validation: Optional[bool] = None
|
|
max_output_tokens: Optional[int] = None
|
|
|
|
|
|
class SettingsResponse(BaseModel):
|
|
"""Settings response schema"""
|
|
llm_provider: str = "claude"
|
|
has_anthropic_key: bool = False
|
|
has_openai_key: bool = False
|
|
has_openrouter_key: bool = False
|
|
max_concurrent_scans: int = 3
|
|
aggressive_mode: bool = False
|
|
default_scan_type: str = "full"
|
|
recon_enabled_by_default: bool = True
|
|
enable_model_routing: bool = False
|
|
enable_knowledge_augmentation: bool = False
|
|
enable_browser_validation: bool = False
|
|
max_output_tokens: Optional[int] = None
|
|
|
|
|
|
def _load_settings_from_env() -> dict:
|
|
"""
|
|
Load settings from environment variables / .env file on startup.
|
|
This ensures settings persist across server restarts and browser sessions.
|
|
"""
|
|
from dotenv import load_dotenv
|
|
# Re-read .env file to pick up disk-persisted values
|
|
if ENV_FILE_PATH.exists():
|
|
load_dotenv(ENV_FILE_PATH, override=True)
|
|
|
|
def _env_bool(key: str, default: bool = False) -> bool:
|
|
val = os.getenv(key, "").strip().lower()
|
|
if val in ("true", "1", "yes"):
|
|
return True
|
|
if val in ("false", "0", "no"):
|
|
return False
|
|
return default
|
|
|
|
def _env_int(key: str, default=None):
|
|
val = os.getenv(key, "").strip()
|
|
if val:
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
pass
|
|
return default
|
|
|
|
# Detect provider from which keys are set
|
|
provider = "claude"
|
|
if os.getenv("ANTHROPIC_API_KEY"):
|
|
provider = "claude"
|
|
elif os.getenv("OPENAI_API_KEY"):
|
|
provider = "openai"
|
|
elif os.getenv("OPENROUTER_API_KEY"):
|
|
provider = "openrouter"
|
|
|
|
return {
|
|
"llm_provider": provider,
|
|
"anthropic_api_key": os.getenv("ANTHROPIC_API_KEY", ""),
|
|
"openai_api_key": os.getenv("OPENAI_API_KEY", ""),
|
|
"openrouter_api_key": os.getenv("OPENROUTER_API_KEY", ""),
|
|
"max_concurrent_scans": _env_int("MAX_CONCURRENT_SCANS", 3),
|
|
"aggressive_mode": _env_bool("AGGRESSIVE_MODE", False),
|
|
"default_scan_type": os.getenv("DEFAULT_SCAN_TYPE", "full"),
|
|
"recon_enabled_by_default": _env_bool("RECON_ENABLED_BY_DEFAULT", True),
|
|
"enable_model_routing": _env_bool("ENABLE_MODEL_ROUTING", False),
|
|
"enable_knowledge_augmentation": _env_bool("ENABLE_KNOWLEDGE_AUGMENTATION", False),
|
|
"enable_browser_validation": _env_bool("ENABLE_BROWSER_VALIDATION", False),
|
|
"max_output_tokens": _env_int("MAX_OUTPUT_TOKENS", None),
|
|
}
|
|
|
|
|
|
# Load settings from .env on module import (server start)
|
|
_settings = _load_settings_from_env()
|
|
|
|
|
|
@router.get("", response_model=SettingsResponse)
|
|
async def get_settings():
|
|
"""Get current settings"""
|
|
import os
|
|
return SettingsResponse(
|
|
llm_provider=_settings["llm_provider"],
|
|
has_anthropic_key=bool(_settings["anthropic_api_key"] or os.getenv("ANTHROPIC_API_KEY")),
|
|
has_openai_key=bool(_settings["openai_api_key"] or os.getenv("OPENAI_API_KEY")),
|
|
has_openrouter_key=bool(_settings["openrouter_api_key"] or os.getenv("OPENROUTER_API_KEY")),
|
|
max_concurrent_scans=_settings["max_concurrent_scans"],
|
|
aggressive_mode=_settings["aggressive_mode"],
|
|
default_scan_type=_settings["default_scan_type"],
|
|
recon_enabled_by_default=_settings["recon_enabled_by_default"],
|
|
enable_model_routing=_settings["enable_model_routing"],
|
|
enable_knowledge_augmentation=_settings["enable_knowledge_augmentation"],
|
|
enable_browser_validation=_settings["enable_browser_validation"],
|
|
max_output_tokens=_settings["max_output_tokens"]
|
|
)
|
|
|
|
|
|
@router.put("", response_model=SettingsResponse)
|
|
async def update_settings(settings_data: SettingsUpdate):
|
|
"""Update settings - persists to memory, env vars, AND .env file"""
|
|
env_updates: Dict[str, str] = {}
|
|
|
|
if settings_data.llm_provider is not None:
|
|
_settings["llm_provider"] = settings_data.llm_provider
|
|
|
|
if settings_data.anthropic_api_key is not None:
|
|
_settings["anthropic_api_key"] = settings_data.anthropic_api_key
|
|
if settings_data.anthropic_api_key:
|
|
os.environ["ANTHROPIC_API_KEY"] = settings_data.anthropic_api_key
|
|
env_updates["ANTHROPIC_API_KEY"] = settings_data.anthropic_api_key
|
|
|
|
if settings_data.openai_api_key is not None:
|
|
_settings["openai_api_key"] = settings_data.openai_api_key
|
|
if settings_data.openai_api_key:
|
|
os.environ["OPENAI_API_KEY"] = settings_data.openai_api_key
|
|
env_updates["OPENAI_API_KEY"] = settings_data.openai_api_key
|
|
|
|
if settings_data.openrouter_api_key is not None:
|
|
_settings["openrouter_api_key"] = settings_data.openrouter_api_key
|
|
if settings_data.openrouter_api_key:
|
|
os.environ["OPENROUTER_API_KEY"] = settings_data.openrouter_api_key
|
|
env_updates["OPENROUTER_API_KEY"] = settings_data.openrouter_api_key
|
|
|
|
if settings_data.max_concurrent_scans is not None:
|
|
_settings["max_concurrent_scans"] = settings_data.max_concurrent_scans
|
|
|
|
if settings_data.aggressive_mode is not None:
|
|
_settings["aggressive_mode"] = settings_data.aggressive_mode
|
|
|
|
if settings_data.default_scan_type is not None:
|
|
_settings["default_scan_type"] = settings_data.default_scan_type
|
|
|
|
if settings_data.recon_enabled_by_default is not None:
|
|
_settings["recon_enabled_by_default"] = settings_data.recon_enabled_by_default
|
|
|
|
if settings_data.enable_model_routing is not None:
|
|
_settings["enable_model_routing"] = settings_data.enable_model_routing
|
|
val = str(settings_data.enable_model_routing).lower()
|
|
os.environ["ENABLE_MODEL_ROUTING"] = val
|
|
env_updates["ENABLE_MODEL_ROUTING"] = val
|
|
|
|
if settings_data.enable_knowledge_augmentation is not None:
|
|
_settings["enable_knowledge_augmentation"] = settings_data.enable_knowledge_augmentation
|
|
val = str(settings_data.enable_knowledge_augmentation).lower()
|
|
os.environ["ENABLE_KNOWLEDGE_AUGMENTATION"] = val
|
|
env_updates["ENABLE_KNOWLEDGE_AUGMENTATION"] = val
|
|
|
|
if settings_data.enable_browser_validation is not None:
|
|
_settings["enable_browser_validation"] = settings_data.enable_browser_validation
|
|
val = str(settings_data.enable_browser_validation).lower()
|
|
os.environ["ENABLE_BROWSER_VALIDATION"] = val
|
|
env_updates["ENABLE_BROWSER_VALIDATION"] = val
|
|
|
|
if settings_data.max_output_tokens is not None:
|
|
_settings["max_output_tokens"] = settings_data.max_output_tokens
|
|
if settings_data.max_output_tokens:
|
|
os.environ["MAX_OUTPUT_TOKENS"] = str(settings_data.max_output_tokens)
|
|
env_updates["MAX_OUTPUT_TOKENS"] = str(settings_data.max_output_tokens)
|
|
|
|
# Persist to .env file on disk
|
|
if env_updates:
|
|
_update_env_file(env_updates)
|
|
|
|
return await get_settings()
|
|
|
|
|
|
@router.post("/clear-database")
|
|
async def clear_database(db: AsyncSession = Depends(get_db)):
|
|
"""Clear all data from the database (reset to fresh state)"""
|
|
try:
|
|
# Delete in correct order to respect foreign key constraints
|
|
await db.execute(delete(VulnerabilityTest))
|
|
await db.execute(delete(Vulnerability))
|
|
await db.execute(delete(Endpoint))
|
|
await db.execute(delete(Report))
|
|
await db.execute(delete(Target))
|
|
await db.execute(delete(Scan))
|
|
await db.commit()
|
|
|
|
return {
|
|
"message": "Database cleared successfully",
|
|
"status": "success"
|
|
}
|
|
except Exception as e:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=500, detail=f"Failed to clear database: {str(e)}")
|
|
|
|
|
|
@router.get("/stats")
|
|
async def get_database_stats(db: AsyncSession = Depends(get_db)):
|
|
"""Get database statistics"""
|
|
from sqlalchemy import func
|
|
|
|
scans_count = (await db.execute(select(func.count()).select_from(Scan))).scalar() or 0
|
|
vulns_count = (await db.execute(select(func.count()).select_from(Vulnerability))).scalar() or 0
|
|
endpoints_count = (await db.execute(select(func.count()).select_from(Endpoint))).scalar() or 0
|
|
reports_count = (await db.execute(select(func.count()).select_from(Report))).scalar() or 0
|
|
|
|
return {
|
|
"scans": scans_count,
|
|
"vulnerabilities": vulns_count,
|
|
"endpoints": endpoints_count,
|
|
"reports": reports_count
|
|
}
|
|
|
|
|
|
@router.get("/tools")
|
|
async def get_installed_tools():
|
|
"""Check which security tools are installed"""
|
|
import asyncio
|
|
import shutil
|
|
|
|
# Complete list of 40+ tools
|
|
tools = {
|
|
"recon": [
|
|
"subfinder", "amass", "assetfinder", "chaos", "uncover",
|
|
"dnsx", "massdns", "puredns", "cero", "tlsx", "cdncheck"
|
|
],
|
|
"web_discovery": [
|
|
"httpx", "httprobe", "katana", "gospider", "hakrawler",
|
|
"gau", "waybackurls", "cariddi", "getJS", "gowitness"
|
|
],
|
|
"fuzzing": [
|
|
"ffuf", "gobuster", "dirb", "dirsearch", "wfuzz", "arjun", "paramspider"
|
|
],
|
|
"vulnerability_scanning": [
|
|
"nuclei", "nikto", "sqlmap", "xsstrike", "dalfox", "crlfuzz"
|
|
],
|
|
"port_scanning": [
|
|
"nmap", "naabu", "rustscan"
|
|
],
|
|
"utilities": [
|
|
"gf", "qsreplace", "unfurl", "anew", "uro", "jq"
|
|
],
|
|
"tech_detection": [
|
|
"whatweb", "wafw00f"
|
|
],
|
|
"exploitation": [
|
|
"hydra", "medusa", "john", "hashcat"
|
|
],
|
|
"network": [
|
|
"curl", "wget", "dig", "whois"
|
|
]
|
|
}
|
|
|
|
results = {}
|
|
total_installed = 0
|
|
total_tools = 0
|
|
|
|
for category, tool_list in tools.items():
|
|
results[category] = {}
|
|
for tool in tool_list:
|
|
total_tools += 1
|
|
# Check if tool exists in PATH
|
|
is_installed = shutil.which(tool) is not None
|
|
results[category][tool] = is_installed
|
|
if is_installed:
|
|
total_installed += 1
|
|
|
|
return {
|
|
"tools": results,
|
|
"summary": {
|
|
"total": total_tools,
|
|
"installed": total_installed,
|
|
"missing": total_tools - total_installed,
|
|
"percentage": round((total_installed / total_tools) * 100, 1)
|
|
}
|
|
}
|