From 55b91912290f622b008a653bcaa142fe418f54a9 Mon Sep 17 00:00:00 2001 From: Muhammad Osama Date: Tue, 19 Aug 2025 08:06:01 +0500 Subject: [PATCH] fixed intelligent_smart_scan 16/151 tools enabled in it --- hexstrike_mcp.py | 30 +++- hexstrike_server.py | 346 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 355 insertions(+), 21 deletions(-) diff --git a/hexstrike_mcp.py b/hexstrike_mcp.py index 805dd85..16ae2f7 100644 --- a/hexstrike_mcp.py +++ b/hexstrike_mcp.py @@ -4719,7 +4719,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: Returns: Results from AI-optimized scanning with tool execution summary """ - logger.info(f"🚀 Starting intelligent smart scan for {target}") + logger.info(f"{HexStrikeColors.FIRE_RED}🚀 Starting intelligent smart scan for {target}{HexStrikeColors.RESET}") data = { "target": target, @@ -4730,10 +4730,32 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: if result.get("success"): scan_results = result.get("scan_results", {}) - tools_executed = len(scan_results.get("tools_executed", [])) - logger.info(f"✅ Intelligent scan completed - {tools_executed} tools executed") + tools_executed = scan_results.get("tools_executed", []) + execution_summary = scan_results.get("execution_summary", {}) + + # Enhanced logging with detailed results + logger.info(f"{HexStrikeColors.SUCCESS}✅ Intelligent scan completed for {target}{HexStrikeColors.RESET}") + logger.info(f"{HexStrikeColors.CYBER_ORANGE}📊 Execution Summary:{HexStrikeColors.RESET}") + logger.info(f" • Tools executed: {execution_summary.get('successful_tools', 0)}/{execution_summary.get('total_tools', 0)}") + logger.info(f" • Success rate: {execution_summary.get('success_rate', 0):.1f}%") + logger.info(f" • Total vulnerabilities: {scan_results.get('total_vulnerabilities', 0)}") + logger.info(f" • Execution time: {execution_summary.get('total_execution_time', 0):.2f}s") + + # Log successful tools + successful_tools = [t['tool'] for t in tools_executed if t.get('success')] + if successful_tools: + logger.info(f"{HexStrikeColors.HIGHLIGHT_GREEN} Successful tools: {', '.join(successful_tools)} {HexStrikeColors.RESET}") + + # Log failed tools + failed_tools = [t['tool'] for t in tools_executed if not t.get('success')] + if failed_tools: + logger.warning(f"{HexStrikeColors.HIGHLIGHT_RED} Failed tools: {', '.join(failed_tools)} {HexStrikeColors.RESET}") + + # Log vulnerabilities found + if scan_results.get('total_vulnerabilities', 0) > 0: + logger.warning(f"{HexStrikeColors.VULN_HIGH}🚨 {scan_results['total_vulnerabilities']} vulnerabilities detected!{HexStrikeColors.RESET}") else: - logger.error(f"❌ Intelligent scan failed for {target}") + logger.error(f"{HexStrikeColors.ERROR}❌ Intelligent scan failed for {target}: {result.get('error', 'Unknown error')}{HexStrikeColors.RESET}") return result diff --git a/hexstrike_server.py b/hexstrike_server.py index cb12164..ba01649 100644 --- a/hexstrike_server.py +++ b/hexstrike_server.py @@ -7802,7 +7802,7 @@ def create_attack_chain(): @app.route("/api/intelligence/smart-scan", methods=["POST"]) def intelligent_smart_scan(): - """Execute an intelligent scan using AI-driven tool selection and parameter optimization""" + """Execute an intelligent scan using AI-driven tool selection and parameter optimization with parallel execution""" try: data = request.get_json() if not data or 'target' not in data: @@ -7820,32 +7820,127 @@ def intelligent_smart_scan(): # Select optimal tools selected_tools = decision_engine.select_optimal_tools(profile, objective)[:max_tools] - # Execute tools with optimized parameters + # Execute tools in parallel with real tool execution scan_results = { "target": target, "target_profile": profile.to_dict(), "tools_executed": [], "total_vulnerabilities": 0, - "execution_summary": {} + "execution_summary": {}, + "combined_output": "" } - for tool in selected_tools: - logger.info(f"🔧 Executing {tool} with optimized parameters") - - # Get optimized parameters - optimized_params = decision_engine.optimize_parameters(tool, profile) - - # Execute the tool (this would call the actual tool endpoint) - tool_result = { - "tool": tool, - "parameters": optimized_params, - "status": "executed", - "timestamp": datetime.now().isoformat() + def execute_single_tool(tool_name, target, profile): + """Execute a single tool and return results""" + try: + logger.info(f"🔧 Executing {tool_name} with optimized parameters") + + # Get optimized parameters for this tool + optimized_params = decision_engine.optimize_parameters(tool_name, profile) + + # Map tool names to their actual execution functions + tool_execution_map = { + 'nmap': lambda: execute_nmap_scan(target, optimized_params), + 'gobuster': lambda: execute_gobuster_scan(target, optimized_params), + 'nuclei': lambda: execute_nuclei_scan(target, optimized_params), + 'nikto': lambda: execute_nikto_scan(target, optimized_params), + 'sqlmap': lambda: execute_sqlmap_scan(target, optimized_params), + 'ffuf': lambda: execute_ffuf_scan(target, optimized_params), + 'feroxbuster': lambda: execute_feroxbuster_scan(target, optimized_params), + 'katana': lambda: execute_katana_scan(target, optimized_params), + 'httpx': lambda: execute_httpx_scan(target, optimized_params), + 'wpscan': lambda: execute_wpscan_scan(target, optimized_params), + 'dirsearch': lambda: execute_dirsearch_scan(target, optimized_params), + 'arjun': lambda: execute_arjun_scan(target, optimized_params), + 'paramspider': lambda: execute_paramspider_scan(target, optimized_params), + 'dalfox': lambda: execute_dalfox_scan(target, optimized_params), + 'amass': lambda: execute_amass_scan(target, optimized_params), + 'subfinder': lambda: execute_subfinder_scan(target, optimized_params) + } + + # Execute the tool if we have a mapping for it + if tool_name in tool_execution_map: + result = tool_execution_map[tool_name]() + + # Extract vulnerability count from result + vuln_count = 0 + if result.get('success') and result.get('stdout'): + # Simple vulnerability detection based on common patterns + output = result.get('stdout', '') + vuln_indicators = ['CRITICAL', 'HIGH', 'MEDIUM', 'VULNERABILITY', 'EXPLOIT', 'SQL injection', 'XSS', 'CSRF'] + vuln_count = sum(1 for indicator in vuln_indicators if indicator.lower() in output.lower()) + + return { + "tool": tool_name, + "parameters": optimized_params, + "status": "success" if result.get('success') else "failed", + "timestamp": datetime.now().isoformat(), + "execution_time": result.get('execution_time', 0), + "stdout": result.get('stdout', ''), + "stderr": result.get('stderr', ''), + "vulnerabilities_found": vuln_count, + "command": result.get('command', ''), + "success": result.get('success', False) + } + else: + logger.warning(f"⚠️ No execution mapping found for tool: {tool_name}") + return { + "tool": tool_name, + "parameters": optimized_params, + "status": "skipped", + "timestamp": datetime.now().isoformat(), + "error": f"Tool {tool_name} not implemented in execution map", + "success": False + } + + except Exception as e: + logger.error(f"❌ Error executing {tool_name}: {str(e)}") + return { + "tool": tool_name, + "status": "failed", + "timestamp": datetime.now().isoformat(), + "error": str(e), + "success": False + } + + # Execute tools in parallel using ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=min(len(selected_tools), 5)) as executor: + # Submit all tool executions + future_to_tool = { + executor.submit(execute_single_tool, tool, target, profile): tool + for tool in selected_tools } - scan_results["tools_executed"].append(tool_result) + # Collect results as they complete + for future in future_to_tool: + tool_result = future.result() + scan_results["tools_executed"].append(tool_result) + + # Accumulate vulnerability count + if tool_result.get("vulnerabilities_found"): + scan_results["total_vulnerabilities"] += tool_result["vulnerabilities_found"] + + # Combine outputs + if tool_result.get("stdout"): + scan_results["combined_output"] += f"\n=== {tool_result['tool'].upper()} OUTPUT ===\n" + scan_results["combined_output"] += tool_result["stdout"] + scan_results["combined_output"] += "\n" + "="*50 + "\n" + + # Create execution summary + successful_tools = [t for t in scan_results["tools_executed"] if t.get("success")] + failed_tools = [t for t in scan_results["tools_executed"] if not t.get("success")] + + scan_results["execution_summary"] = { + "total_tools": len(selected_tools), + "successful_tools": len(successful_tools), + "failed_tools": len(failed_tools), + "success_rate": len(successful_tools) / len(selected_tools) * 100 if selected_tools else 0, + "total_execution_time": sum(t.get("execution_time", 0) for t in scan_results["tools_executed"]), + "tools_used": [t["tool"] for t in successful_tools] + } logger.info(f"✅ Intelligent smart scan completed for {target}") + logger.info(f"📊 Results: {len(successful_tools)}/{len(selected_tools)} tools successful, {scan_results['total_vulnerabilities']} vulnerabilities found") return jsonify({ "success": True, @@ -7855,7 +7950,224 @@ def intelligent_smart_scan(): except Exception as e: logger.error(f"💥 Error in intelligent smart scan: {str(e)}") - return jsonify({"error": f"Server error: {str(e)}"}), 500 + return jsonify({"error": f"Server error: {str(e)}", "success": False}), 500 + +# Helper functions for intelligent smart scan tool execution +def execute_nmap_scan(target, params): + """Execute nmap scan with optimized parameters""" + try: + scan_type = params.get('scan_type', '-sV') + ports = params.get('ports', '') + additional_args = params.get('additional_args', '') + + # Build nmap command + cmd_parts = ['nmap', scan_type] + if ports: + cmd_parts.extend(['-p', ports]) + if additional_args: + cmd_parts.extend(additional_args.split()) + cmd_parts.append(target) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_gobuster_scan(target, params): + """Execute gobuster scan with optimized parameters""" + try: + mode = params.get('mode', 'dir') + wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt') + additional_args = params.get('additional_args', '') + + cmd_parts = ['gobuster', mode, '-u', target, '-w', wordlist] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_nuclei_scan(target, params): + """Execute nuclei scan with optimized parameters""" + try: + severity = params.get('severity', '') + tags = params.get('tags', '') + additional_args = params.get('additional_args', '') + + cmd_parts = ['nuclei', '-u', target] + if severity: + cmd_parts.extend(['-severity', severity]) + if tags: + cmd_parts.extend(['-tags', tags]) + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_nikto_scan(target, params): + """Execute nikto scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['nikto', '-h', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_sqlmap_scan(target, params): + """Execute sqlmap scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '--batch --random-agent') + cmd_parts = ['sqlmap', '-u', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_ffuf_scan(target, params): + """Execute ffuf scan with optimized parameters""" + try: + wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt') + additional_args = params.get('additional_args', '') + + # Ensure target has FUZZ placeholder + if 'FUZZ' not in target: + target = target.rstrip('/') + '/FUZZ' + + cmd_parts = ['ffuf', '-u', target, '-w', wordlist] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_feroxbuster_scan(target, params): + """Execute feroxbuster scan with optimized parameters""" + try: + wordlist = params.get('wordlist', '/usr/share/wordlists/dirb/common.txt') + additional_args = params.get('additional_args', '') + + cmd_parts = ['feroxbuster', '-u', target, '-w', wordlist] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_katana_scan(target, params): + """Execute katana scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['katana', '-u', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_httpx_scan(target, params): + """Execute httpx scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '-tech-detect -status-code') + # Use shell command with pipe for httpx + cmd = f"echo {target} | httpx {additional_args}" + + return execute_command(cmd) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_wpscan_scan(target, params): + """Execute wpscan scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '--enumerate p,t,u') + cmd_parts = ['wpscan', '--url', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_dirsearch_scan(target, params): + """Execute dirsearch scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['dirsearch', '-u', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_arjun_scan(target, params): + """Execute arjun scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['arjun', '-u', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_paramspider_scan(target, params): + """Execute paramspider scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['paramspider', '-d', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_dalfox_scan(target, params): + """Execute dalfox scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['dalfox', 'url', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_amass_scan(target, params): + """Execute amass scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['amass', 'enum', '-d', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} + +def execute_subfinder_scan(target, params): + """Execute subfinder scan with optimized parameters""" + try: + additional_args = params.get('additional_args', '') + cmd_parts = ['subfinder', '-d', target] + if additional_args: + cmd_parts.extend(additional_args.split()) + + return execute_command(' '.join(cmd_parts)) + except Exception as e: + return {"success": False, "error": str(e)} @app.route("/api/intelligence/technology-detection", methods=["POST"]) def detect_technologies():