Files
NeuroSploit/backend/api/v1/prompts.py
2026-01-19 19:21:57 -03:00

373 lines
11 KiB
Python

"""
NeuroSploit v3 - Prompts API Endpoints
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from backend.db.database import get_db
from backend.models import Prompt
from backend.schemas.prompt import (
PromptCreate, PromptUpdate, PromptResponse, PromptParse, PromptParseResult, PromptPreset
)
from backend.core.prompt_engine.parser import PromptParser
router = APIRouter()
# Preset prompts
PRESET_PROMPTS = [
{
"id": "full_pentest",
"name": "Full Penetration Test",
"description": "Comprehensive security assessment covering all vulnerability categories",
"category": "pentest",
"content": """Perform a comprehensive penetration test on the target application.
Test for ALL vulnerability categories:
- Injection vulnerabilities (XSS, SQL Injection, Command Injection, LDAP, XPath, Template Injection)
- Authentication flaws (Broken auth, session management, JWT issues, OAuth flaws)
- Authorization issues (IDOR, BOLA, privilege escalation, access control bypass)
- File handling vulnerabilities (LFI, RFI, path traversal, file upload, XXE)
- Request forgery (SSRF, CSRF)
- API security issues (rate limiting, mass assignment, excessive data exposure)
- Client-side vulnerabilities (CORS misconfig, clickjacking, open redirect)
- Information disclosure (error messages, stack traces, sensitive data exposure)
- Infrastructure issues (security headers, SSL/TLS, HTTP methods)
- Business logic flaws (race conditions, workflow bypass)
Use thorough testing with multiple payloads and bypass techniques.
Generate detailed PoC for each vulnerability found.
Provide remediation recommendations."""
},
{
"id": "owasp_top10",
"name": "OWASP Top 10",
"description": "Test for OWASP Top 10 2021 vulnerabilities",
"category": "compliance",
"content": """Test for OWASP Top 10 2021 vulnerabilities:
A01:2021 - Broken Access Control
- IDOR, privilege escalation, access control bypass, CORS misconfig
A02:2021 - Cryptographic Failures
- Sensitive data exposure, weak encryption, cleartext transmission
A03:2021 - Injection
- SQL injection, XSS, command injection, LDAP injection
A04:2021 - Insecure Design
- Business logic flaws, missing security controls
A05:2021 - Security Misconfiguration
- Default configs, unnecessary features, missing headers
A06:2021 - Vulnerable Components
- Outdated libraries, known CVEs
A07:2021 - Identification and Authentication Failures
- Weak passwords, session fixation, credential stuffing
A08:2021 - Software and Data Integrity Failures
- Insecure deserialization, CI/CD vulnerabilities
A09:2021 - Security Logging and Monitoring Failures
- Missing audit logs, insufficient monitoring
A10:2021 - Server-Side Request Forgery (SSRF)
- Internal network access, cloud metadata exposure"""
},
{
"id": "api_security",
"name": "API Security Testing",
"description": "Focused testing for REST and GraphQL APIs",
"category": "api",
"content": """Perform API security testing:
Authentication & Authorization:
- Test JWT implementation (algorithm confusion, signature bypass, claim manipulation)
- OAuth/OIDC flow testing
- API key exposure and validation
- Rate limiting bypass
- BOLA/IDOR on all endpoints
Input Validation:
- SQL injection on API parameters
- NoSQL injection
- Command injection
- Parameter pollution
- Mass assignment vulnerabilities
Data Exposure:
- Excessive data exposure in responses
- Sensitive data in error messages
- Information disclosure in headers
- Debug endpoints exposure
GraphQL Specific (if applicable):
- Introspection enabled
- Query depth attacks
- Batching attacks
- Field suggestion exploitation
API Abuse:
- Rate limiting effectiveness
- Resource exhaustion
- Denial of service vectors"""
},
{
"id": "bug_bounty",
"name": "Bug Bounty Hunter",
"description": "Focus on high-impact, bounty-worthy vulnerabilities",
"category": "bug_bounty",
"content": """Hunt for high-impact vulnerabilities suitable for bug bounty:
Priority 1 - Critical Impact:
- Remote Code Execution (RCE)
- SQL Injection leading to data breach
- Authentication bypass
- SSRF to internal services/cloud metadata
- Privilege escalation to admin
Priority 2 - High Impact:
- Stored XSS
- IDOR on sensitive resources
- Account takeover vectors
- Payment/billing manipulation
- PII exposure
Priority 3 - Medium Impact:
- Reflected XSS
- CSRF on sensitive actions
- Information disclosure
- Rate limiting bypass
- Open redirects (if exploitable)
Look for:
- Unique attack chains
- Business logic flaws
- Edge cases and race conditions
- Bypass techniques for existing security controls
Document with clear PoC and impact assessment."""
},
{
"id": "quick_scan",
"name": "Quick Security Scan",
"description": "Fast scan for common vulnerabilities",
"category": "quick",
"content": """Perform a quick security scan for common vulnerabilities:
- Reflected XSS on input parameters
- Basic SQL injection testing
- Directory traversal/LFI
- Security headers check
- SSL/TLS configuration
- Common misconfigurations
- Information disclosure
Use minimal payloads for speed.
Focus on quick wins and obvious issues."""
},
{
"id": "auth_testing",
"name": "Authentication Testing",
"description": "Focus on authentication and session management",
"category": "auth",
"content": """Test authentication and session management:
Login Functionality:
- Username enumeration
- Password brute force protection
- Account lockout bypass
- Credential stuffing protection
- SQL injection in login
Session Management:
- Session token entropy
- Session fixation
- Session timeout
- Cookie security flags (HttpOnly, Secure, SameSite)
- Session invalidation on logout
Password Reset:
- Token predictability
- Token expiration
- Account enumeration
- Host header injection
Multi-Factor Authentication:
- MFA bypass techniques
- Backup codes weakness
- Rate limiting on OTP
OAuth/SSO:
- State parameter validation
- Redirect URI manipulation
- Token leakage"""
}
]
@router.get("/presets", response_model=List[PromptPreset])
async def get_preset_prompts():
"""Get list of preset prompts"""
return [
PromptPreset(
id=p["id"],
name=p["name"],
description=p["description"],
category=p["category"],
vulnerability_count=len(p["content"].split("\n"))
)
for p in PRESET_PROMPTS
]
@router.get("/presets/{preset_id}")
async def get_preset_prompt(preset_id: str):
"""Get a specific preset prompt by ID"""
for preset in PRESET_PROMPTS:
if preset["id"] == preset_id:
return preset
raise HTTPException(status_code=404, detail="Preset not found")
@router.post("/parse", response_model=PromptParseResult)
async def parse_prompt(prompt_data: PromptParse):
"""Parse a prompt to extract vulnerability types and testing scope"""
parser = PromptParser()
result = await parser.parse(prompt_data.content)
return result
@router.get("", response_model=List[PromptResponse])
async def list_prompts(
category: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
"""List all custom prompts"""
query = select(Prompt).where(Prompt.is_preset == False)
if category:
query = query.where(Prompt.category == category)
query = query.order_by(Prompt.created_at.desc())
result = await db.execute(query)
prompts = result.scalars().all()
return [PromptResponse(**p.to_dict()) for p in prompts]
@router.post("", response_model=PromptResponse)
async def create_prompt(prompt_data: PromptCreate, db: AsyncSession = Depends(get_db)):
"""Create a custom prompt"""
# Parse vulnerabilities from content
parser = PromptParser()
parsed = await parser.parse(prompt_data.content)
prompt = Prompt(
name=prompt_data.name,
description=prompt_data.description,
content=prompt_data.content,
category=prompt_data.category,
is_preset=False,
parsed_vulnerabilities=[v.dict() for v in parsed.vulnerabilities_to_test]
)
db.add(prompt)
await db.commit()
await db.refresh(prompt)
return PromptResponse(**prompt.to_dict())
@router.get("/{prompt_id}", response_model=PromptResponse)
async def get_prompt(prompt_id: str, db: AsyncSession = Depends(get_db)):
"""Get a prompt by ID"""
result = await db.execute(select(Prompt).where(Prompt.id == prompt_id))
prompt = result.scalar_one_or_none()
if not prompt:
raise HTTPException(status_code=404, detail="Prompt not found")
return PromptResponse(**prompt.to_dict())
@router.put("/{prompt_id}", response_model=PromptResponse)
async def update_prompt(
prompt_id: str,
prompt_data: PromptUpdate,
db: AsyncSession = Depends(get_db)
):
"""Update a prompt"""
result = await db.execute(select(Prompt).where(Prompt.id == prompt_id))
prompt = result.scalar_one_or_none()
if not prompt:
raise HTTPException(status_code=404, detail="Prompt not found")
if prompt.is_preset:
raise HTTPException(status_code=400, detail="Cannot modify preset prompts")
if prompt_data.name is not None:
prompt.name = prompt_data.name
if prompt_data.description is not None:
prompt.description = prompt_data.description
if prompt_data.content is not None:
prompt.content = prompt_data.content
# Re-parse vulnerabilities
parser = PromptParser()
parsed = await parser.parse(prompt_data.content)
prompt.parsed_vulnerabilities = [v.dict() for v in parsed.vulnerabilities_to_test]
if prompt_data.category is not None:
prompt.category = prompt_data.category
await db.commit()
await db.refresh(prompt)
return PromptResponse(**prompt.to_dict())
@router.delete("/{prompt_id}")
async def delete_prompt(prompt_id: str, db: AsyncSession = Depends(get_db)):
"""Delete a prompt"""
result = await db.execute(select(Prompt).where(Prompt.id == prompt_id))
prompt = result.scalar_one_or_none()
if not prompt:
raise HTTPException(status_code=404, detail="Prompt not found")
if prompt.is_preset:
raise HTTPException(status_code=400, detail="Cannot delete preset prompts")
await db.delete(prompt)
await db.commit()
return {"message": "Prompt deleted"}
@router.post("/upload")
async def upload_prompt(file: UploadFile = File(...)):
"""Upload a prompt file (.md or .txt)"""
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
ext = "." + file.filename.split(".")[-1].lower() if "." in file.filename else ""
if ext not in {".md", ".txt"}:
raise HTTPException(status_code=400, detail="Invalid file type. Use .md or .txt")
content = await file.read()
try:
text = content.decode("utf-8")
except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="Unable to decode file")
# Parse the prompt
parser = PromptParser()
parsed = await parser.parse(text)
return {
"filename": file.filename,
"content": text,
"parsed": parsed.dict()
}