Add files via upload

This commit is contained in:
Joas A Santos
2026-01-23 15:46:05 -03:00
committed by GitHub
parent 2a5e9b139a
commit f9e4ec16ec
19 changed files with 1398 additions and 158 deletions

View File

@@ -0,0 +1,105 @@
"""
NeuroSploit v3 - Report Service
Handles automatic report generation on scan completion/stop.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from backend.models import Scan, Report, Vulnerability
from backend.core.report_engine.generator import ReportGenerator
from backend.api.websocket import manager as ws_manager
class ReportService:
"""Service for automatic report generation"""
def __init__(self, db: AsyncSession):
self.db = db
self.generator = ReportGenerator()
async def auto_generate_report(
self,
scan_id: str,
is_partial: bool = False,
format: str = "html"
) -> Report:
"""
Automatically generate a report for a scan.
Args:
scan_id: The scan ID to generate report for
is_partial: True if scan was stopped/incomplete
format: Report format (html, pdf, json)
Returns:
The generated Report model instance
"""
# Get scan
scan_result = await self.db.execute(select(Scan).where(Scan.id == scan_id))
scan = scan_result.scalar_one_or_none()
if not scan:
raise ValueError(f"Scan {scan_id} not found")
# Get vulnerabilities
vulns_result = await self.db.execute(
select(Vulnerability).where(Vulnerability.scan_id == scan_id)
)
vulnerabilities = vulns_result.scalars().all()
# Generate title
if is_partial:
title = f"Partial Report - {scan.name or 'Unnamed Scan'}"
else:
title = f"Security Assessment Report - {scan.name or 'Unnamed Scan'}"
# Generate report
try:
report_path, executive_summary = await self.generator.generate(
scan=scan,
vulnerabilities=vulnerabilities,
format=format,
title=title,
include_executive_summary=True,
include_poc=True,
include_remediation=True
)
# Create report record
report = Report(
scan_id=scan_id,
title=title,
format=format,
file_path=str(report_path) if report_path else None,
executive_summary=executive_summary,
auto_generated=True,
is_partial=is_partial
)
self.db.add(report)
await self.db.commit()
await self.db.refresh(report)
# Broadcast report generated event
await ws_manager.broadcast_report_generated(scan_id, report.to_dict())
await ws_manager.broadcast_log(
scan_id,
"info",
f"Report auto-generated: {title}"
)
return report
except Exception as e:
await ws_manager.broadcast_log(
scan_id,
"error",
f"Failed to auto-generate report: {str(e)}"
)
raise
async def auto_generate_report(db: AsyncSession, scan_id: str, is_partial: bool = False) -> Report:
"""Helper function to auto-generate a report"""
service = ReportService(db)
return await service.auto_generate_report(scan_id, is_partial)

View File

@@ -19,7 +19,7 @@ from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from backend.models import Scan, Target, Endpoint, Vulnerability, VulnerabilityTest
from backend.models import Scan, Target, Endpoint, Vulnerability, VulnerabilityTest, AgentTask
from backend.api.websocket import manager as ws_manager
from backend.api.v1.prompts import PRESET_PROMPTS
from backend.db.database import async_session_factory
@@ -72,6 +72,55 @@ class ScanService:
self.payload_generator = PayloadGenerator()
self._stop_requested = False
async def _create_agent_task(
self,
scan_id: str,
task_type: str,
task_name: str,
description: str = None,
tool_name: str = None,
tool_category: str = None
) -> AgentTask:
"""Create and start a new agent task"""
task = AgentTask(
scan_id=scan_id,
task_type=task_type,
task_name=task_name,
description=description,
tool_name=tool_name,
tool_category=tool_category
)
task.start()
self.db.add(task)
await self.db.flush()
# Broadcast task started
await ws_manager.broadcast_agent_task_started(scan_id, task.to_dict())
return task
async def _complete_agent_task(
self,
task: AgentTask,
items_processed: int = 0,
items_found: int = 0,
summary: str = None
):
"""Mark an agent task as completed"""
task.complete(items_processed, items_found, summary)
await self.db.commit()
# Broadcast task completed
await ws_manager.broadcast_agent_task_completed(task.scan_id, task.to_dict())
async def _fail_agent_task(self, task: AgentTask, error: str):
"""Mark an agent task as failed"""
task.fail(error)
await self.db.commit()
# Broadcast task update
await ws_manager.broadcast_agent_task(task.scan_id, task.to_dict())
async def execute_scan(self, scan_id: str):
"""Execute a complete scan with real recon, autonomous discovery, and AI analysis"""
try:
@@ -112,13 +161,29 @@ class ScanService:
await ws_manager.broadcast_log(scan_id, "info", f"Targets: {', '.join([t.url for t in targets])}")
# Check available tools
# Check available tools - Create task for initialization
init_task = await self._create_agent_task(
scan_id=scan_id,
task_type="recon",
task_name="Initialize Security Tools",
description="Checking available security tools and dependencies",
tool_name="system",
tool_category="setup"
)
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", "Checking installed security tools...")
tools_status = await check_tools_installed()
installed_tools = [t for t, installed in tools_status.items() if installed]
await ws_manager.broadcast_log(scan_id, "info", f"Available: {', '.join(installed_tools[:15])}...")
await self._complete_agent_task(
init_task,
items_processed=len(tools_status),
items_found=len(installed_tools),
summary=f"Found {len(installed_tools)} available security tools"
)
# Get prompt content
prompt_content = await self._get_prompt_content(scan)
await ws_manager.broadcast_log(scan_id, "info", "")
@@ -141,24 +206,46 @@ class ScanService:
depth = "medium" if scan.scan_type == "full" else "quick"
for target in targets:
await ws_manager.broadcast_log(scan_id, "info", f"Target: {target.url}")
target_recon = await recon_integration.run_full_recon(target.url, depth=depth)
recon_data = self._merge_recon_data(recon_data, target_recon)
# Create recon task for each target
recon_task = await self._create_agent_task(
scan_id=scan_id,
task_type="recon",
task_name=f"Reconnaissance: {target.hostname or target.url[:30]}",
description=f"Running {depth} reconnaissance on {target.url}",
tool_name="recon_integration",
tool_category="scanner"
)
# Save discovered endpoints to database
for endpoint_data in target_recon.get("endpoints", []):
if isinstance(endpoint_data, dict):
endpoint = Endpoint(
scan_id=scan_id,
target_id=target.id,
url=endpoint_data.get("url", ""),
method="GET",
path=endpoint_data.get("path", "/"),
response_status=endpoint_data.get("status"),
content_type=endpoint_data.get("content_type", "")
)
self.db.add(endpoint)
scan.total_endpoints += 1
try:
await ws_manager.broadcast_log(scan_id, "info", f"Target: {target.url}")
target_recon = await recon_integration.run_full_recon(target.url, depth=depth)
recon_data = self._merge_recon_data(recon_data, target_recon)
endpoints_found = 0
# Save discovered endpoints to database
for endpoint_data in target_recon.get("endpoints", []):
if isinstance(endpoint_data, dict):
endpoint = Endpoint(
scan_id=scan_id,
target_id=target.id,
url=endpoint_data.get("url", ""),
method="GET",
path=endpoint_data.get("path", "/"),
response_status=endpoint_data.get("status"),
content_type=endpoint_data.get("content_type", "")
)
self.db.add(endpoint)
scan.total_endpoints += 1
endpoints_found += 1
await self._complete_agent_task(
recon_task,
items_processed=1,
items_found=endpoints_found,
summary=f"Found {endpoints_found} endpoints, {len(target_recon.get('urls', []))} URLs"
)
except Exception as e:
await self._fail_agent_task(recon_task, str(e))
await self.db.commit()
recon_endpoints = scan.total_endpoints
@@ -181,60 +268,90 @@ class ScanService:
await ws_manager.broadcast_log(scan_id, level, message)
for target in targets:
async with AutonomousScanner(
# Create autonomous discovery task
discovery_task = await self._create_agent_task(
scan_id=scan_id,
log_callback=scanner_log,
timeout=15,
max_depth=3
) as scanner:
autonomous_results = await scanner.run_autonomous_scan(
target_url=target.url,
recon_data=recon_data
)
task_type="recon",
task_name=f"Autonomous Discovery: {target.hostname or target.url[:30]}",
description="AI-powered endpoint discovery and vulnerability scanning",
tool_name="autonomous_scanner",
tool_category="ai"
)
# Merge autonomous results
for ep in autonomous_results.get("endpoints", []):
if isinstance(ep, dict):
endpoint = Endpoint(
scan_id=scan_id,
target_id=target.id,
url=ep.get("url", ""),
method=ep.get("method", "GET"),
path=ep.get("url", "").split("?")[0].split("/")[-1] or "/"
)
self.db.add(endpoint)
scan.total_endpoints += 1
try:
endpoints_discovered = 0
vulns_discovered = 0
# Add URLs to recon data
recon_data["urls"] = recon_data.get("urls", []) + [
ep.get("url") for ep in autonomous_results.get("endpoints", [])
if isinstance(ep, dict)
]
recon_data["directories"] = autonomous_results.get("directories_found", [])
recon_data["parameters"] = autonomous_results.get("parameters_found", [])
# Save autonomous vulnerabilities directly
for vuln in autonomous_results.get("vulnerabilities", []):
db_vuln = Vulnerability(
scan_id=scan_id,
title=f"{vuln['type'].replace('_', ' ').title()} on {vuln['endpoint'][:50]}",
vulnerability_type=vuln["type"],
severity=self._confidence_to_severity(vuln["confidence"]),
description=vuln["evidence"],
affected_endpoint=vuln["endpoint"],
poc_payload=vuln["payload"],
poc_request=str(vuln.get("request", {}))[:5000],
poc_response=str(vuln.get("response", {}))[:5000]
async with AutonomousScanner(
scan_id=scan_id,
log_callback=scanner_log,
timeout=15,
max_depth=3
) as scanner:
autonomous_results = await scanner.run_autonomous_scan(
target_url=target.url,
recon_data=recon_data
)
self.db.add(db_vuln)
await ws_manager.broadcast_vulnerability_found(scan_id, {
"id": db_vuln.id,
"title": db_vuln.title,
"severity": db_vuln.severity,
"type": vuln["type"],
"endpoint": vuln["endpoint"]
})
# Merge autonomous results
for ep in autonomous_results.get("endpoints", []):
if isinstance(ep, dict):
endpoint = Endpoint(
scan_id=scan_id,
target_id=target.id,
url=ep.get("url", ""),
method=ep.get("method", "GET"),
path=ep.get("url", "").split("?")[0].split("/")[-1] or "/"
)
self.db.add(endpoint)
scan.total_endpoints += 1
endpoints_discovered += 1
# Add URLs to recon data
recon_data["urls"] = recon_data.get("urls", []) + [
ep.get("url") for ep in autonomous_results.get("endpoints", [])
if isinstance(ep, dict)
]
recon_data["directories"] = autonomous_results.get("directories_found", [])
recon_data["parameters"] = autonomous_results.get("parameters_found", [])
# Save autonomous vulnerabilities directly
for vuln in autonomous_results.get("vulnerabilities", []):
vuln_severity = self._confidence_to_severity(vuln["confidence"])
db_vuln = Vulnerability(
scan_id=scan_id,
title=f"{vuln['type'].replace('_', ' ').title()} on {vuln['endpoint'][:50]}",
vulnerability_type=vuln["type"],
severity=vuln_severity,
description=vuln["evidence"],
affected_endpoint=vuln["endpoint"],
poc_payload=vuln["payload"],
poc_request=str(vuln.get("request", {}))[:5000],
poc_response=str(vuln.get("response", {}))[:5000]
)
self.db.add(db_vuln)
await self.db.flush() # Ensure ID is assigned
vulns_discovered += 1
# Increment vulnerability count
await self._increment_vulnerability_count(scan, vuln_severity)
await ws_manager.broadcast_vulnerability_found(scan_id, {
"id": db_vuln.id,
"title": db_vuln.title,
"severity": db_vuln.severity,
"type": vuln["type"],
"endpoint": vuln["endpoint"]
})
await self._complete_agent_task(
discovery_task,
items_processed=endpoints_discovered,
items_found=vulns_discovered,
summary=f"Discovered {endpoints_discovered} endpoints, {vulns_discovered} vulnerabilities"
)
except Exception as e:
await self._fail_agent_task(discovery_task, str(e))
await self.db.commit()
await ws_manager.broadcast_log(scan_id, "info", f"Autonomous discovery complete. Total endpoints: {scan.total_endpoints}")
@@ -249,27 +366,48 @@ class ScanService:
await ws_manager.broadcast_log(scan_id, "info", "PHASE 2: AI ANALYSIS")
await ws_manager.broadcast_log(scan_id, "info", "=" * 40)
# Enhance prompt with authorization
enhanced_prompt = f"{GLOBAL_AUTHORIZATION}\n\nUSER REQUEST:\n{prompt_content}"
# Get AI-generated testing plan
await ws_manager.broadcast_log(scan_id, "info", "AI processing prompt and determining attack strategy...")
testing_plan = await self.ai_processor.process_prompt(
prompt=enhanced_prompt,
recon_data=recon_data,
target_info={"targets": [t.url for t in targets]}
# Create AI analysis task
analysis_task = await self._create_agent_task(
scan_id=scan_id,
task_type="analysis",
task_name="AI Strategy Analysis",
description="Analyzing prompt and recon data to determine testing strategy",
tool_name="ai_prompt_processor",
tool_category="ai"
)
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", "AI TESTING PLAN:")
await ws_manager.broadcast_log(scan_id, "info", f" Vulnerability Types: {', '.join(testing_plan.vulnerability_types[:10])}")
if len(testing_plan.vulnerability_types) > 10:
await ws_manager.broadcast_log(scan_id, "info", f" ... and {len(testing_plan.vulnerability_types) - 10} more types")
await ws_manager.broadcast_log(scan_id, "info", f" Testing Focus: {', '.join(testing_plan.testing_focus[:5])}")
await ws_manager.broadcast_log(scan_id, "info", f" Depth: {testing_plan.testing_depth}")
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", f"AI Reasoning: {testing_plan.ai_reasoning[:300]}...")
try:
# Enhance prompt with authorization
enhanced_prompt = f"{GLOBAL_AUTHORIZATION}\n\nUSER REQUEST:\n{prompt_content}"
# Get AI-generated testing plan
await ws_manager.broadcast_log(scan_id, "info", "AI processing prompt and determining attack strategy...")
testing_plan = await self.ai_processor.process_prompt(
prompt=enhanced_prompt,
recon_data=recon_data,
target_info={"targets": [t.url for t in targets]}
)
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", "AI TESTING PLAN:")
await ws_manager.broadcast_log(scan_id, "info", f" Vulnerability Types: {', '.join(testing_plan.vulnerability_types[:10])}")
if len(testing_plan.vulnerability_types) > 10:
await ws_manager.broadcast_log(scan_id, "info", f" ... and {len(testing_plan.vulnerability_types) - 10} more types")
await ws_manager.broadcast_log(scan_id, "info", f" Testing Focus: {', '.join(testing_plan.testing_focus[:5])}")
await ws_manager.broadcast_log(scan_id, "info", f" Depth: {testing_plan.testing_depth}")
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", f"AI Reasoning: {testing_plan.ai_reasoning[:300]}...")
await self._complete_agent_task(
analysis_task,
items_processed=1,
items_found=len(testing_plan.vulnerability_types),
summary=f"Generated testing plan with {len(testing_plan.vulnerability_types)} vulnerability types"
)
except Exception as e:
await self._fail_agent_task(analysis_task, str(e))
raise
await ws_manager.broadcast_progress(scan_id, 45, f"Testing {len(testing_plan.vulnerability_types)} vuln types")
@@ -286,48 +424,78 @@ class ScanService:
for target in targets:
await ws_manager.broadcast_log(scan_id, "info", f"Deploying AI Agent on: {target.url}")
# Create log callback for the agent
async def agent_log(level: str, message: str):
await ws_manager.broadcast_log(scan_id, level, message)
# Create AI pentest agent task
agent_task = await self._create_agent_task(
scan_id=scan_id,
task_type="testing",
task_name=f"AI Pentest Agent: {target.hostname or target.url[:30]}",
description=f"AI-powered penetration testing on {target.url}",
tool_name="ai_pentest_agent",
tool_category="ai"
)
# Build auth headers
auth_headers = self._build_auth_headers(scan)
try:
# Create log callback for the agent
async def agent_log(level: str, message: str):
await ws_manager.broadcast_log(scan_id, level, message)
async with AIPentestAgent(
target=target.url,
log_callback=agent_log,
auth_headers=auth_headers,
max_depth=5
) as agent:
agent_report = await agent.run()
# Build auth headers
auth_headers = self._build_auth_headers(scan)
# Save agent findings as vulnerabilities
for finding in agent_report.get("findings", []):
vuln = Vulnerability(
scan_id=scan_id,
title=f"{finding['type'].upper()} - {finding['endpoint'][:50]}",
vulnerability_type=finding["type"],
severity=finding["severity"],
description=finding["evidence"],
affected_endpoint=finding["endpoint"],
poc_payload=finding["payload"],
poc_request=finding.get("raw_request", "")[:5000],
poc_response=finding.get("raw_response", "")[:5000],
remediation=finding.get("impact", ""),
ai_analysis="\n".join(finding.get("exploitation_steps", []))
)
self.db.add(vuln)
findings_count = 0
endpoints_tested = 0
await ws_manager.broadcast_vulnerability_found(scan_id, {
"id": vuln.id,
"title": vuln.title,
"severity": vuln.severity,
"type": finding["type"],
"endpoint": finding["endpoint"]
})
async with AIPentestAgent(
target=target.url,
log_callback=agent_log,
auth_headers=auth_headers,
max_depth=5
) as agent:
agent_report = await agent.run()
# Update endpoint count
scan.total_endpoints += agent_report.get("summary", {}).get("total_endpoints", 0)
# Save agent findings as vulnerabilities
for finding in agent_report.get("findings", []):
finding_severity = finding["severity"]
vuln = Vulnerability(
scan_id=scan_id,
title=f"{finding['type'].upper()} - {finding['endpoint'][:50]}",
vulnerability_type=finding["type"],
severity=finding_severity,
description=finding["evidence"],
affected_endpoint=finding["endpoint"],
poc_payload=finding["payload"],
poc_request=finding.get("raw_request", "")[:5000],
poc_response=finding.get("raw_response", "")[:5000],
remediation=finding.get("impact", ""),
ai_analysis="\n".join(finding.get("exploitation_steps", []))
)
self.db.add(vuln)
await self.db.flush() # Ensure ID is assigned
findings_count += 1
# Increment vulnerability count
await self._increment_vulnerability_count(scan, finding_severity)
await ws_manager.broadcast_vulnerability_found(scan_id, {
"id": vuln.id,
"title": vuln.title,
"severity": vuln.severity,
"type": finding["type"],
"endpoint": finding["endpoint"]
})
# Update endpoint count
endpoints_tested = agent_report.get("summary", {}).get("total_endpoints", 0)
scan.total_endpoints += endpoints_tested
await self._complete_agent_task(
agent_task,
items_processed=endpoints_tested,
items_found=findings_count,
summary=f"Tested {endpoints_tested} endpoints, found {findings_count} vulnerabilities"
)
except Exception as e:
await self._fail_agent_task(agent_task, str(e))
await self.db.commit()
@@ -377,38 +545,70 @@ class ScanService:
await ws_manager.broadcast_log(scan_id, "info", f"Testing {len(endpoints)} endpoints for {len(testing_plan.vulnerability_types)} vuln types")
await ws_manager.broadcast_log(scan_id, "info", "")
# Test endpoints with AI-determined vulnerabilities
total_endpoints = len(endpoints)
async with DynamicVulnerabilityEngine() as engine:
for i, endpoint in enumerate(endpoints):
if self._stop_requested:
break
# Create vulnerability testing task
vuln_testing_task = await self._create_agent_task(
scan_id=scan_id,
task_type="testing",
task_name="Vulnerability Testing",
description=f"Testing {len(endpoints)} endpoints for {len(testing_plan.vulnerability_types)} vulnerability types",
tool_name="dynamic_vuln_engine",
tool_category="scanner"
)
progress = 45 + int((i / total_endpoints) * 45)
await ws_manager.broadcast_progress(
scan_id, progress,
f"Testing {i+1}/{total_endpoints}: {endpoint.path or endpoint.url[:50]}"
)
try:
# Test endpoints with AI-determined vulnerabilities
total_endpoints = len(endpoints)
endpoints_tested = 0
vulns_before = scan.total_vulnerabilities
# Log what we're testing
await ws_manager.broadcast_log(scan_id, "debug", f"[{i+1}/{total_endpoints}] Testing: {endpoint.url[:80]}")
async with DynamicVulnerabilityEngine() as engine:
for i, endpoint in enumerate(endpoints):
if self._stop_requested:
break
await self._test_endpoint_with_ai(
scan=scan,
endpoint=endpoint,
testing_plan=testing_plan,
engine=engine,
recon_data=recon_data
)
progress = 45 + int((i / total_endpoints) * 45)
await ws_manager.broadcast_progress(
scan_id, progress,
f"Testing {i+1}/{total_endpoints}: {endpoint.path or endpoint.url[:50]}"
)
# Update counts
await self._update_vulnerability_counts(scan)
# Log what we're testing
await ws_manager.broadcast_log(scan_id, "debug", f"[{i+1}/{total_endpoints}] Testing: {endpoint.url[:80]}")
await self._test_endpoint_with_ai(
scan=scan,
endpoint=endpoint,
testing_plan=testing_plan,
engine=engine,
recon_data=recon_data
)
endpoints_tested += 1
# Update final counts
await self._update_vulnerability_counts(scan)
vulns_found = scan.total_vulnerabilities - vulns_before
await self._complete_agent_task(
vuln_testing_task,
items_processed=endpoints_tested,
items_found=vulns_found,
summary=f"Tested {endpoints_tested} endpoints, found {vulns_found} vulnerabilities"
)
except Exception as e:
await self._fail_agent_task(vuln_testing_task, str(e))
raise
# Phase 4: Complete
scan.status = "completed"
scan.completed_at = datetime.utcnow()
scan.progress = 100
scan.current_phase = "completed"
# Calculate duration
if scan.started_at:
duration = (scan.completed_at - scan.started_at).total_seconds()
scan.duration = int(duration)
await self.db.commit()
await ws_manager.broadcast_log(scan_id, "info", "")
@@ -432,6 +632,16 @@ class ScanService:
"low": scan.low_count
})
# Auto-generate report on completion
try:
from backend.services.report_service import auto_generate_report
await ws_manager.broadcast_log(scan_id, "info", "")
await ws_manager.broadcast_log(scan_id, "info", "Generating security assessment report...")
report = await auto_generate_report(self.db, scan_id, is_partial=False)
await ws_manager.broadcast_log(scan_id, "info", f"Report generated: {report.title}")
except Exception as report_error:
await ws_manager.broadcast_log(scan_id, "warning", f"Failed to auto-generate report: {str(report_error)}")
except Exception as e:
import traceback
error_msg = f"Scan error: {str(e)}"
@@ -559,11 +769,12 @@ Be thorough and test all discovered endpoints aggressively.
if confidence >= 0.5: # Lower threshold to catch more
# Create vulnerability record
vuln_severity = ai_analysis.get("severity", self._confidence_to_severity(confidence))
vuln = Vulnerability(
scan_id=scan.id,
title=f"{vuln_type.replace('_', ' ').title()} on {endpoint.path or endpoint.url}",
vulnerability_type=vuln_type,
severity=ai_analysis.get("severity", self._confidence_to_severity(confidence)),
severity=vuln_severity,
description=ai_analysis.get("evidence", result.get("evidence", "")),
affected_endpoint=endpoint.url,
poc_payload=payload,
@@ -573,6 +784,10 @@ Be thorough and test all discovered endpoints aggressively.
ai_analysis=ai_analysis.get("exploitation_path", "")
)
self.db.add(vuln)
await self.db.flush() # Ensure ID is assigned
# Increment vulnerability count
await self._increment_vulnerability_count(scan, vuln_severity)
await ws_manager.broadcast_vulnerability_found(scan.id, {
"id": vuln.id,
@@ -761,3 +976,24 @@ Be thorough and test all discovered endpoints aggressively.
scan.total_endpoints = result.scalar() or 0
await self.db.commit()
async def _increment_vulnerability_count(self, scan: Scan, severity: str):
"""Increment vulnerability count for a severity level and broadcast update"""
# Increment the appropriate counter
severity_lower = severity.lower()
if severity_lower in ["critical", "high", "medium", "low", "info"]:
current = getattr(scan, f"{severity_lower}_count", 0)
setattr(scan, f"{severity_lower}_count", current + 1)
scan.total_vulnerabilities = (scan.total_vulnerabilities or 0) + 1
await self.db.commit()
# Broadcast stats update
await ws_manager.broadcast_stats_update(scan.id, {
"total_vulnerabilities": scan.total_vulnerabilities,
"critical": scan.critical_count,
"high": scan.high_count,
"medium": scan.medium_count,
"low": scan.low_count,
"info": scan.info_count,
"total_endpoints": scan.total_endpoints
})