fixed intelligent_smart_scan 16/151 tools enabled in it

This commit is contained in:
Muhammad Osama
2025-08-19 08:06:01 +05:00
parent 764ce98a3e
commit 55b9191229
2 changed files with 355 additions and 21 deletions
+26 -4
View File
@@ -4719,7 +4719,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP:
Returns:
Results from AI-optimized scanning with tool execution summary
"""
logger.info(f"🚀 Starting intelligent smart scan for {target}")
logger.info(f"{HexStrikeColors.FIRE_RED}🚀 Starting intelligent smart scan for {target}{HexStrikeColors.RESET}")
data = {
"target": target,
@@ -4730,10 +4730,32 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP:
if result.get("success"):
scan_results = result.get("scan_results", {})
tools_executed = len(scan_results.get("tools_executed", []))
logger.info(f"✅ Intelligent scan completed - {tools_executed} tools executed")
tools_executed = scan_results.get("tools_executed", [])
execution_summary = scan_results.get("execution_summary", {})
# Enhanced logging with detailed results
logger.info(f"{HexStrikeColors.SUCCESS}✅ Intelligent scan completed for {target}{HexStrikeColors.RESET}")
logger.info(f"{HexStrikeColors.CYBER_ORANGE}📊 Execution Summary:{HexStrikeColors.RESET}")
logger.info(f" • Tools executed: {execution_summary.get('successful_tools', 0)}/{execution_summary.get('total_tools', 0)}")
logger.info(f" • Success rate: {execution_summary.get('success_rate', 0):.1f}%")
logger.info(f" • Total vulnerabilities: {scan_results.get('total_vulnerabilities', 0)}")
logger.info(f" • Execution time: {execution_summary.get('total_execution_time', 0):.2f}s")
# Log successful tools
successful_tools = [t['tool'] for t in tools_executed if t.get('success')]
if successful_tools:
logger.info(f"{HexStrikeColors.HIGHLIGHT_GREEN} Successful tools: {', '.join(successful_tools)} {HexStrikeColors.RESET}")
# Log failed tools
failed_tools = [t['tool'] for t in tools_executed if not t.get('success')]
if failed_tools:
logger.warning(f"{HexStrikeColors.HIGHLIGHT_RED} Failed tools: {', '.join(failed_tools)} {HexStrikeColors.RESET}")
# Log vulnerabilities found
if scan_results.get('total_vulnerabilities', 0) > 0:
logger.warning(f"{HexStrikeColors.VULN_HIGH}🚨 {scan_results['total_vulnerabilities']} vulnerabilities detected!{HexStrikeColors.RESET}")
else:
logger.error(f"❌ Intelligent scan failed for {target}")
logger.error(f"{HexStrikeColors.ERROR}❌ Intelligent scan failed for {target}: {result.get('error', 'Unknown error')}{HexStrikeColors.RESET}")
return result
+329 -17
View File
@@ -7802,7 +7802,7 @@ def create_attack_chain():
@app.route("/api/intelligence/smart-scan", methods=["POST"])
def intelligent_smart_scan():
"""Execute an intelligent scan using AI-driven tool selection and parameter optimization"""
"""Execute an intelligent scan using AI-driven tool selection and parameter optimization with parallel execution"""
try:
data = request.get_json()
if not data or 'target' not in data:
@@ -7820,32 +7820,127 @@ def intelligent_smart_scan():
# Select optimal tools
selected_tools = decision_engine.select_optimal_tools(profile, objective)[:max_tools]
# Execute tools with optimized parameters
# Execute tools in parallel with real tool execution
scan_results = {
"target": target,
"target_profile": profile.to_dict(),
"tools_executed": [],
"total_vulnerabilities": 0,
"execution_summary": {}
"execution_summary": {},
"combined_output": ""
}
for tool in selected_tools:
logger.info(f"🔧 Executing {tool} with optimized parameters")
# Get optimized parameters
optimized_params = decision_engine.optimize_parameters(tool, profile)
# Execute the tool (this would call the actual tool endpoint)
tool_result = {
"tool": tool,
"parameters": optimized_params,
"status": "executed",
"timestamp": datetime.now().isoformat()
def execute_single_tool(tool_name, target, profile):
"""Execute a single tool and return results"""
try:
logger.info(f"🔧 Executing {tool_name} with optimized parameters")
# Get optimized parameters for this tool
optimized_params = decision_engine.optimize_parameters(tool_name, profile)
# Map tool names to their actual execution functions
tool_execution_map = {
'nmap': lambda: execute_nmap_scan(target, optimized_params),
'gobuster': lambda: execute_gobuster_scan(target, optimized_params),
'nuclei': lambda: execute_nuclei_scan(target, optimized_params),
'nikto': lambda: execute_nikto_scan(target, optimized_params),
'sqlmap': lambda: execute_sqlmap_scan(target, optimized_params),
'ffuf': lambda: execute_ffuf_scan(target, optimized_params),
'feroxbuster': lambda: execute_feroxbuster_scan(target, optimized_params),
'katana': lambda: execute_katana_scan(target, optimized_params),
'httpx': lambda: execute_httpx_scan(target, optimized_params),
'wpscan': lambda: execute_wpscan_scan(target, optimized_params),
'dirsearch': lambda: execute_dirsearch_scan(target, optimized_params),
'arjun': lambda: execute_arjun_scan(target, optimized_params),
'paramspider': lambda: execute_paramspider_scan(target, optimized_params),
'dalfox': lambda: execute_dalfox_scan(target, optimized_params),
'amass': lambda: execute_amass_scan(target, optimized_params),
'subfinder': lambda: execute_subfinder_scan(target, optimized_params)
}
# Execute the tool if we have a mapping for it
if tool_name in tool_execution_map:
result = tool_execution_map[tool_name]()
# Extract vulnerability count from result
vuln_count = 0
if result.get('success') and result.get('stdout'):
# Simple vulnerability detection based on common patterns
output = result.get('stdout', '')
vuln_indicators = ['CRITICAL', 'HIGH', 'MEDIUM', 'VULNERABILITY', 'EXPLOIT', 'SQL injection', 'XSS', 'CSRF']
vuln_count = sum(1 for indicator in vuln_indicators if indicator.lower() in output.lower())
return {
"tool": tool_name,
"parameters": optimized_params,
"status": "success" if result.get('success') else "failed",
"timestamp": datetime.now().isoformat(),
"execution_time": result.get('execution_time', 0),
"stdout": result.get('stdout', ''),
"stderr": result.get('stderr', ''),
"vulnerabilities_found": vuln_count,
"command": result.get('command', ''),
"success": result.get('success', False)
}
else:
logger.warning(f"⚠️ No execution mapping found for tool: {tool_name}")
return {
"tool": tool_name,
"parameters": optimized_params,
"status": "skipped",
"timestamp": datetime.now().isoformat(),
"error": f"Tool {tool_name} not implemented in execution map",
"success": False
}
except Exception as e:
logger.error(f"❌ Error executing {tool_name}: {str(e)}")
return {
"tool": tool_name,
"status": "failed",
"timestamp": datetime.now().isoformat(),
"error": str(e),
"success": False
}
# Execute tools in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=min(len(selected_tools), 5)) as executor:
# Submit all tool executions
future_to_tool = {
executor.submit(execute_single_tool, tool, target, profile): tool
for tool in selected_tools
}
scan_results["tools_executed"].append(tool_result)
# Collect results as they complete
for future in future_to_tool:
tool_result = future.result()
scan_results["tools_executed"].append(tool_result)
# Accumulate vulnerability count
if tool_result.get("vulnerabilities_found"):
scan_results["total_vulnerabilities"] += tool_result["vulnerabilities_found"]
# Combine outputs
if tool_result.get("stdout"):
scan_results["combined_output"] += f"\n=== {tool_result['tool'].upper()} OUTPUT ===\n"
scan_results["combined_output"] += tool_result["stdout"]
scan_results["combined_output"] += "\n" + "="*50 + "\n"
# Create execution summary
successful_tools = [t for t in scan_results["tools_executed"] if t.get("success")]
failed_tools = [t for t in scan_results["tools_executed"] if not t.get("success")]
scan_results["execution_summary"] = {
"total_tools": len(selected_tools),
"successful_tools": len(successful_tools),
"failed_tools": len(failed_tools),
"success_rate": len(successful_tools) / len(selected_tools) * 100 if selected_tools else 0,
"total_execution_time": sum(t.get("execution_time", 0) for t in scan_results["tools_executed"]),
"tools_used": [t["tool"] for t in successful_tools]
}
logger.info(f"✅ Intelligent smart scan completed for {target}")
logger.info(f"📊 Results: {len(successful_tools)}/{len(selected_tools)} tools successful, {scan_results['total_vulnerabilities']} vulnerabilities found")
return jsonify({
"success": True,
@@ -7855,7 +7950,224 @@ def intelligent_smart_scan():
except Exception as e:
logger.error(f"💥 Error in intelligent smart scan: {str(e)}")
return jsonify({"error": f"Server error: {str(e)}"}), 500
return jsonify({"error": f"Server error: {str(e)}", "success": False}), 500
# Helper functions for intelligent smart scan tool execution
def execute_nmap_scan(target, params):
"""Execute nmap scan with optimized parameters"""
try:
scan_type = params.get('scan_type', '-sV')
ports = params.get('ports', '')
additional_args = params.get('additional_args', '')
# Build nmap command
cmd_parts = ['nmap', scan_type]
if ports:
cmd_parts.extend(['-p', ports])
if additional_args:
cmd_parts.extend(additional_args.split())
cmd_parts.append(target)
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_gobuster_scan(target, params):
"""Execute gobuster scan with optimized parameters"""
try:
mode = params.get('mode', 'dir')
wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt')
additional_args = params.get('additional_args', '')
cmd_parts = ['gobuster', mode, '-u', target, '-w', wordlist]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_nuclei_scan(target, params):
"""Execute nuclei scan with optimized parameters"""
try:
severity = params.get('severity', '')
tags = params.get('tags', '')
additional_args = params.get('additional_args', '')
cmd_parts = ['nuclei', '-u', target]
if severity:
cmd_parts.extend(['-severity', severity])
if tags:
cmd_parts.extend(['-tags', tags])
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_nikto_scan(target, params):
"""Execute nikto scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['nikto', '-h', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_sqlmap_scan(target, params):
"""Execute sqlmap scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '--batch --random-agent')
cmd_parts = ['sqlmap', '-u', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_ffuf_scan(target, params):
"""Execute ffuf scan with optimized parameters"""
try:
wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt')
additional_args = params.get('additional_args', '')
# Ensure target has FUZZ placeholder
if 'FUZZ' not in target:
target = target.rstrip('/') + '/FUZZ'
cmd_parts = ['ffuf', '-u', target, '-w', wordlist]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_feroxbuster_scan(target, params):
"""Execute feroxbuster scan with optimized parameters"""
try:
wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt')
additional_args = params.get('additional_args', '')
cmd_parts = ['feroxbuster', '-u', target, '-w', wordlist]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_katana_scan(target, params):
"""Execute katana scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['katana', '-u', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_httpx_scan(target, params):
"""Execute httpx scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '-tech-detect -status-code')
# Use shell command with pipe for httpx
cmd = f"echo {target} | httpx {additional_args}"
return execute_command(cmd)
except Exception as e:
return {"success": False, "error": str(e)}
def execute_wpscan_scan(target, params):
"""Execute wpscan scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '--enumerate p,t,u')
cmd_parts = ['wpscan', '--url', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_dirsearch_scan(target, params):
"""Execute dirsearch scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['dirsearch', '-u', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_arjun_scan(target, params):
"""Execute arjun scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['arjun', '-u', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_paramspider_scan(target, params):
"""Execute paramspider scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['paramspider', '-d', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_dalfox_scan(target, params):
"""Execute dalfox scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['dalfox', 'url', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_amass_scan(target, params):
"""Execute amass scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['amass', 'enum', '-d', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
def execute_subfinder_scan(target, params):
"""Execute subfinder scan with optimized parameters"""
try:
additional_args = params.get('additional_args', '')
cmd_parts = ['subfinder', '-d', target]
if additional_args:
cmd_parts.extend(additional_args.split())
return execute_command(' '.join(cmd_parts))
except Exception as e:
return {"success": False, "error": str(e)}
@app.route("/api/intelligence/technology-detection", methods=["POST"])
def detect_technologies():