mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-13 06:22:44 +00:00
141 lines
4.4 KiB
Python
141 lines
4.4 KiB
Python
"""
|
|
NeuroSploit v3 - Scheduler API Router
|
|
|
|
CRUD endpoints for managing scheduled scan jobs.
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from fastapi import APIRouter, HTTPException, Request
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict
|
|
|
|
router = APIRouter()
|
|
|
|
CONFIG_PATH = Path(__file__).parent.parent.parent.parent / "config" / "config.json"
|
|
|
|
|
|
class ScheduleJobRequest(BaseModel):
|
|
"""Request model for creating a scheduled job."""
|
|
job_id: str
|
|
target: str
|
|
scan_type: str = "quick"
|
|
cron_expression: Optional[str] = None
|
|
interval_minutes: Optional[int] = None
|
|
agent_role: Optional[str] = None
|
|
llm_profile: Optional[str] = None
|
|
|
|
|
|
class ScheduleJobResponse(BaseModel):
|
|
"""Response model for a scheduled job."""
|
|
id: str
|
|
target: str
|
|
scan_type: str
|
|
schedule: str
|
|
status: str
|
|
next_run: Optional[str] = None
|
|
last_run: Optional[str] = None
|
|
run_count: int = 0
|
|
|
|
|
|
@router.get("/", response_model=List[Dict])
|
|
async def list_scheduled_jobs(request: Request):
|
|
"""List all scheduled scan jobs."""
|
|
scheduler = getattr(request.app.state, 'scheduler', None)
|
|
if not scheduler:
|
|
return []
|
|
return scheduler.list_jobs()
|
|
|
|
|
|
@router.post("/", response_model=Dict)
|
|
async def create_scheduled_job(job: ScheduleJobRequest, request: Request):
|
|
"""Create a new scheduled scan job."""
|
|
scheduler = getattr(request.app.state, 'scheduler', None)
|
|
if not scheduler:
|
|
raise HTTPException(status_code=503, detail="Scheduler not available")
|
|
|
|
if not job.cron_expression and not job.interval_minutes:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Either cron_expression or interval_minutes must be provided"
|
|
)
|
|
|
|
result = scheduler.add_job(
|
|
job_id=job.job_id,
|
|
target=job.target,
|
|
scan_type=job.scan_type,
|
|
cron_expression=job.cron_expression,
|
|
interval_minutes=job.interval_minutes,
|
|
agent_role=job.agent_role,
|
|
llm_profile=job.llm_profile
|
|
)
|
|
|
|
if "error" in result:
|
|
raise HTTPException(status_code=400, detail=result["error"])
|
|
|
|
return result
|
|
|
|
|
|
@router.delete("/{job_id}")
|
|
async def delete_scheduled_job(job_id: str, request: Request):
|
|
"""Delete a scheduled scan job."""
|
|
scheduler = getattr(request.app.state, 'scheduler', None)
|
|
if not scheduler:
|
|
raise HTTPException(status_code=503, detail="Scheduler not available")
|
|
|
|
success = scheduler.remove_job(job_id)
|
|
if not success:
|
|
raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")
|
|
|
|
return {"message": f"Job '{job_id}' deleted", "id": job_id}
|
|
|
|
|
|
@router.post("/{job_id}/pause")
|
|
async def pause_scheduled_job(job_id: str, request: Request):
|
|
"""Pause a scheduled scan job."""
|
|
scheduler = getattr(request.app.state, 'scheduler', None)
|
|
if not scheduler:
|
|
raise HTTPException(status_code=503, detail="Scheduler not available")
|
|
|
|
success = scheduler.pause_job(job_id)
|
|
if not success:
|
|
raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")
|
|
|
|
return {"message": f"Job '{job_id}' paused", "id": job_id, "status": "paused"}
|
|
|
|
|
|
@router.post("/{job_id}/resume")
|
|
async def resume_scheduled_job(job_id: str, request: Request):
|
|
"""Resume a paused scheduled scan job."""
|
|
scheduler = getattr(request.app.state, 'scheduler', None)
|
|
if not scheduler:
|
|
raise HTTPException(status_code=503, detail="Scheduler not available")
|
|
|
|
success = scheduler.resume_job(job_id)
|
|
if not success:
|
|
raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")
|
|
|
|
return {"message": f"Job '{job_id}' resumed", "id": job_id, "status": "active"}
|
|
|
|
|
|
@router.get("/agent-roles", response_model=List[Dict])
|
|
async def get_agent_roles():
|
|
"""Return available agent roles from config.json for scheduler dropdown."""
|
|
try:
|
|
if not CONFIG_PATH.exists():
|
|
return []
|
|
config = json.loads(CONFIG_PATH.read_text())
|
|
roles = config.get("agent_roles", {})
|
|
result = []
|
|
for role_id, role_data in roles.items():
|
|
if role_data.get("enabled", True):
|
|
result.append({
|
|
"id": role_id,
|
|
"name": role_id.replace("_", " ").title(),
|
|
"description": role_data.get("description", ""),
|
|
"tools": role_data.get("tools_allowed", []),
|
|
})
|
|
return result
|
|
except Exception:
|
|
return []
|