Files
hexstrike-ai/hexstrike_mcp.py
Muhammad Osama 0be3cfdebd update
2025-07-10 23:07:24 +05:00

2602 lines
98 KiB
Python

#!/usr/bin/env python3
# HexStrike AI MCP Client - Enhanced version with new features
import sys
import os
import argparse
import logging
from typing import Dict, Any, Optional
import requests
import time
from mcp.server.fastmcp import FastMCP
class Colors:
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
BOLD = '\033[1m'
RESET = '\033[0m'
class ColoredFormatter(logging.Formatter):
"""Custom formatter with colors and emojis for MCP client"""
COLORS = {
'DEBUG': Colors.CYAN,
'INFO': Colors.GREEN,
'WARNING': Colors.YELLOW,
'ERROR': Colors.RED,
'CRITICAL': Colors.MAGENTA + Colors.BOLD
}
EMOJIS = {
'DEBUG': '🔍',
'INFO': '',
'WARNING': '⚠️',
'ERROR': '',
'CRITICAL': '🔥'
}
def format(self, record):
emoji = self.EMOJIS.get(record.levelname, '📝')
color = self.COLORS.get(record.levelname, Colors.WHITE)
# Add color and emoji to the message
record.msg = f"{color}{emoji} {record.msg}{Colors.RESET}"
return super().format(record)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="[🔥 HexStrike MCP] %(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
# Apply colored formatter
for handler in logging.getLogger().handlers:
handler.setFormatter(ColoredFormatter(
"[🔥 HexStrike MCP] %(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
logger = logging.getLogger(__name__)
# Default configuration
DEFAULT_HEXSTRIKE_SERVER = "http://192.168.1.18:5000" # Update to your HexStrike server IP
DEFAULT_REQUEST_TIMEOUT = 300 # 5 minutes default timeout for API requests
MAX_RETRIES = 3 # Maximum number of retries for connection attempts
class HexStrikeClient:
"""Enhanced client for communicating with the HexStrike AI API Server"""
def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT):
"""
Initialize the HexStrike AI Client
Args:
server_url: URL of the HexStrike AI API Server
timeout: Request timeout in seconds
"""
self.server_url = server_url.rstrip("/")
self.timeout = timeout
self.session = requests.Session()
# Try to connect to server with retries
connected = False
for i in range(MAX_RETRIES):
try:
logger.info(f"🔗 Attempting to connect to HexStrike AI API at {server_url} (attempt {i+1}/{MAX_RETRIES})")
# First try a direct connection test before using the health endpoint
try:
test_response = self.session.get(f"{self.server_url}/health", timeout=5)
test_response.raise_for_status()
health_check = test_response.json()
connected = True
logger.info(f"🎯 Successfully connected to HexStrike AI API Server at {server_url}")
logger.info(f"🏥 Server health status: {health_check.get('status', 'unknown')}")
logger.info(f"📊 Server version: {health_check.get('version', 'unknown')}")
break
except requests.exceptions.ConnectionError:
logger.warning(f"🔌 Connection refused to {server_url}. Make sure the HexStrike AI server is running.")
time.sleep(2) # Wait before retrying
except Exception as e:
logger.warning(f"⚠️ Connection test failed: {str(e)}")
time.sleep(2) # Wait before retrying
except Exception as e:
logger.warning(f"❌ Connection attempt {i+1} failed: {str(e)}")
time.sleep(2) # Wait before retrying
if not connected:
error_msg = f"Failed to establish connection to HexStrike AI API Server at {server_url} after {MAX_RETRIES} attempts"
logger.error(error_msg)
# We'll continue anyway to allow the MCP server to start, but tools will likely fail
def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Perform a GET request with optional query parameters.
Args:
endpoint: API endpoint path (without leading slash)
params: Optional query parameters
Returns:
Response data as dictionary
"""
if params is None:
params = {}
url = f"{self.server_url}/{endpoint}"
try:
logger.debug(f"📡 GET {url} with params: {params}")
response = self.session.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"🚫 Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}", "success": False}
except Exception as e:
logger.error(f"💥 Unexpected error: {str(e)}")
return {"error": f"Unexpected error: {str(e)}", "success": False}
def safe_post(self, endpoint: str, json_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Perform a POST request with JSON data.
Args:
endpoint: API endpoint path (without leading slash)
json_data: JSON data to send
Returns:
Response data as dictionary
"""
url = f"{self.server_url}/{endpoint}"
try:
logger.debug(f"📡 POST {url} with data: {json_data}")
response = self.session.post(url, json=json_data, timeout=self.timeout)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"🚫 Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}", "success": False}
except Exception as e:
logger.error(f"💥 Unexpected error: {str(e)}")
return {"error": f"Unexpected error: {str(e)}", "success": False}
def execute_command(self, command: str, use_cache: bool = True) -> Dict[str, Any]:
"""
Execute a generic command on the HexStrike server
Args:
command: Command to execute
use_cache: Whether to use caching for this command
Returns:
Command execution results
"""
return self.safe_post("api/command", {"command": command, "use_cache": use_cache})
def check_health(self) -> Dict[str, Any]:
"""
Check the health of the HexStrike AI API Server
Returns:
Health status information
"""
return self.safe_get("health")
def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP:
"""
Set up the MCP server with all enhanced tool functions
Args:
hexstrike_client: Initialized HexStrikeClient
Returns:
Configured FastMCP instance
"""
mcp = FastMCP("hexstrike-ai-mcp")
# ============================================================================
# CORE NETWORK SCANNING TOOLS
# ============================================================================
@mcp.tool()
def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute an enhanced Nmap scan against a target with real-time logging.
Args:
target: The IP address or hostname to scan
scan_type: Scan type (e.g., -sV for version detection, -sC for scripts)
ports: Comma-separated list of ports or port ranges
additional_args: Additional Nmap arguments
Returns:
Scan results with enhanced telemetry
"""
data = {
"target": target,
"scan_type": scan_type,
"ports": ports,
"additional_args": additional_args
}
logger.info(f"🔍 Initiating Nmap scan: {target}")
result = hexstrike_client.safe_post("api/tools/nmap", data)
if result.get("success"):
logger.info(f"✅ Nmap scan completed successfully for {target}")
else:
logger.error(f"❌ Nmap scan failed for {target}")
return result
@mcp.tool()
def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Gobuster to find directories, DNS subdomains, or virtual hosts with enhanced logging.
Args:
url: The target URL
mode: Scan mode (dir, dns, fuzz, vhost)
wordlist: Path to wordlist file
additional_args: Additional Gobuster arguments
Returns:
Scan results with enhanced telemetry
"""
data = {
"url": url,
"mode": mode,
"wordlist": wordlist,
"additional_args": additional_args
}
logger.info(f"📁 Starting Gobuster {mode} scan: {url}")
result = hexstrike_client.safe_post("api/tools/gobuster", data)
if result.get("success"):
logger.info(f"✅ Gobuster scan completed for {url}")
else:
logger.error(f"❌ Gobuster scan failed for {url}")
return result
@mcp.tool()
def nuclei_scan(target: str, severity: str = "", tags: str = "", template: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Nuclei vulnerability scanner with enhanced logging and real-time progress.
Args:
target: The target URL or IP
severity: Filter by severity (critical,high,medium,low,info)
tags: Filter by tags (e.g. cve,rce,lfi)
template: Custom template path
additional_args: Additional Nuclei arguments
Returns:
Scan results with discovered vulnerabilities and telemetry
"""
data = {
"target": target,
"severity": severity,
"tags": tags,
"template": template,
"additional_args": additional_args
}
logger.info(f"🔬 Starting Nuclei vulnerability scan: {target}")
result = hexstrike_client.safe_post("api/tools/nuclei", data)
if result.get("success"):
logger.info(f"✅ Nuclei scan completed for {target}")
else:
logger.error(f"❌ Nuclei scan failed for {target}")
return result
# ============================================================================
# CLOUD SECURITY TOOLS
# ============================================================================
@mcp.tool()
def prowler_scan(provider: str = "aws", profile: str = "default", region: str = "", checks: str = "", output_dir: str = "/tmp/prowler_output", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Prowler for comprehensive cloud security assessment.
Args:
provider: Cloud provider (aws, azure, gcp)
profile: AWS profile to use
region: Specific region to scan
checks: Specific checks to run
output_dir: Directory to save results
output_format: Output format (json, csv, html)
additional_args: Additional Prowler arguments
Returns:
Cloud security assessment results
"""
data = {
"provider": provider,
"profile": profile,
"region": region,
"checks": checks,
"output_dir": output_dir,
"output_format": output_format,
"additional_args": additional_args
}
logger.info(f"☁️ Starting Prowler {provider} security assessment")
result = hexstrike_client.safe_post("api/tools/prowler", data)
if result.get("success"):
logger.info(f"✅ Prowler assessment completed")
else:
logger.error(f"❌ Prowler assessment failed")
return result
@mcp.tool()
def trivy_scan(scan_type: str = "image", target: str = "", output_format: str = "json", severity: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Trivy for container and filesystem vulnerability scanning.
Args:
scan_type: Type of scan (image, fs, repo, config)
target: Target to scan (image name, directory, repository)
output_format: Output format (json, table, sarif)
severity: Severity filter (UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL)
output_file: File to save results
additional_args: Additional Trivy arguments
Returns:
Vulnerability scan results
"""
data = {
"scan_type": scan_type,
"target": target,
"output_format": output_format,
"severity": severity,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Trivy {scan_type} scan: {target}")
result = hexstrike_client.safe_post("api/tools/trivy", data)
if result.get("success"):
logger.info(f"✅ Trivy scan completed for {target}")
else:
logger.error(f"❌ Trivy scan failed for {target}")
return result
# ============================================================================
# FILE OPERATIONS & PAYLOAD GENERATION
# ============================================================================
@mcp.tool()
def create_file(filename: str, content: str, binary: bool = False) -> Dict[str, Any]:
"""
Create a file with specified content on the HexStrike server.
Args:
filename: Name of the file to create
content: Content to write to the file
binary: Whether the content is binary data
Returns:
File creation results
"""
data = {
"filename": filename,
"content": content,
"binary": binary
}
logger.info(f"📄 Creating file: {filename}")
result = hexstrike_client.safe_post("api/files/create", data)
if result.get("success"):
logger.info(f"✅ File created successfully: {filename}")
else:
logger.error(f"❌ Failed to create file: {filename}")
return result
@mcp.tool()
def modify_file(filename: str, content: str, append: bool = False) -> Dict[str, Any]:
"""
Modify an existing file on the HexStrike server.
Args:
filename: Name of the file to modify
content: Content to write or append
append: Whether to append to the file (True) or overwrite (False)
Returns:
File modification results
"""
data = {
"filename": filename,
"content": content,
"append": append
}
logger.info(f"✏️ Modifying file: {filename}")
result = hexstrike_client.safe_post("api/files/modify", data)
if result.get("success"):
logger.info(f"✅ File modified successfully: {filename}")
else:
logger.error(f"❌ Failed to modify file: {filename}")
return result
@mcp.tool()
def delete_file(filename: str) -> Dict[str, Any]:
"""
Delete a file or directory on the HexStrike server.
Args:
filename: Name of the file or directory to delete
Returns:
File deletion results
"""
data = {
"filename": filename
}
logger.info(f"🗑️ Deleting file: {filename}")
result = hexstrike_client.safe_post("api/files/delete", data)
if result.get("success"):
logger.info(f"✅ File deleted successfully: {filename}")
else:
logger.error(f"❌ Failed to delete file: {filename}")
return result
@mcp.tool()
def list_files(directory: str = ".") -> Dict[str, Any]:
"""
List files in a directory on the HexStrike server.
Args:
directory: Directory to list (relative to server's base directory)
Returns:
Directory listing results
"""
logger.info(f"📂 Listing files in directory: {directory}")
result = hexstrike_client.safe_get("api/files/list", {"directory": directory})
if result.get("success"):
file_count = len(result.get("files", []))
logger.info(f"✅ Listed {file_count} files in {directory}")
else:
logger.error(f"❌ Failed to list files in {directory}")
return result
@mcp.tool()
def generate_payload(payload_type: str = "buffer", size: int = 1024, pattern: str = "A", filename: str = "") -> Dict[str, Any]:
"""
Generate large payloads for testing and exploitation.
Args:
payload_type: Type of payload (buffer, cyclic, random)
size: Size of the payload in bytes
pattern: Pattern to use for buffer payloads
filename: Custom filename (auto-generated if empty)
Returns:
Payload generation results
"""
data = {
"type": payload_type,
"size": size,
"pattern": pattern
}
if filename:
data["filename"] = filename
logger.info(f"🎯 Generating {payload_type} payload: {size} bytes")
result = hexstrike_client.safe_post("api/payloads/generate", data)
if result.get("success"):
logger.info(f"✅ Payload generated successfully")
else:
logger.error(f"❌ Failed to generate payload")
return result
# ============================================================================
# PYTHON ENVIRONMENT MANAGEMENT
# ============================================================================
@mcp.tool()
def install_python_package(package: str, env_name: str = "default") -> Dict[str, Any]:
"""
Install a Python package in a virtual environment on the HexStrike server.
Args:
package: Name of the Python package to install
env_name: Name of the virtual environment
Returns:
Package installation results
"""
data = {
"package": package,
"env_name": env_name
}
logger.info(f"📦 Installing Python package: {package} in env {env_name}")
result = hexstrike_client.safe_post("api/python/install", data)
if result.get("success"):
logger.info(f"✅ Package {package} installed successfully")
else:
logger.error(f"❌ Failed to install package {package}")
return result
@mcp.tool()
def execute_python_script(script: str, env_name: str = "default", filename: str = "") -> Dict[str, Any]:
"""
Execute a Python script in a virtual environment on the HexStrike server.
Args:
script: Python script content to execute
env_name: Name of the virtual environment
filename: Custom script filename (auto-generated if empty)
Returns:
Script execution results
"""
data = {
"script": script,
"env_name": env_name
}
if filename:
data["filename"] = filename
logger.info(f"🐍 Executing Python script in env {env_name}")
result = hexstrike_client.safe_post("api/python/execute", data)
if result.get("success"):
logger.info(f"✅ Python script executed successfully")
else:
logger.error(f"❌ Python script execution failed")
return result
# ============================================================================
# ADDITIONAL SECURITY TOOLS FROM ORIGINAL IMPLEMENTATION
# ============================================================================
@mcp.tool()
def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Dirb for directory brute forcing with enhanced logging.
Args:
url: The target URL
wordlist: Path to wordlist file
additional_args: Additional Dirb arguments
Returns:
Scan results with enhanced telemetry
"""
data = {
"url": url,
"wordlist": wordlist,
"additional_args": additional_args
}
logger.info(f"📁 Starting Dirb scan: {url}")
result = hexstrike_client.safe_post("api/tools/dirb", data)
if result.get("success"):
logger.info(f"✅ Dirb scan completed for {url}")
else:
logger.error(f"❌ Dirb scan failed for {url}")
return result
@mcp.tool()
def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Nikto web vulnerability scanner with enhanced logging.
Args:
target: The target URL or IP
additional_args: Additional Nikto arguments
Returns:
Scan results with discovered vulnerabilities
"""
data = {
"target": target,
"additional_args": additional_args
}
logger.info(f"🔬 Starting Nikto scan: {target}")
result = hexstrike_client.safe_post("api/tools/nikto", data)
if result.get("success"):
logger.info(f"✅ Nikto scan completed for {target}")
else:
logger.error(f"❌ Nikto scan failed for {target}")
return result
@mcp.tool()
def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute SQLMap for SQL injection testing with enhanced logging.
Args:
url: The target URL
data: POST data for testing
additional_args: Additional SQLMap arguments
Returns:
SQL injection test results
"""
data_payload = {
"url": url,
"data": data,
"additional_args": additional_args
}
logger.info(f"💉 Starting SQLMap scan: {url}")
result = hexstrike_client.safe_post("api/tools/sqlmap", data_payload)
if result.get("success"):
logger.info(f"✅ SQLMap scan completed for {url}")
else:
logger.error(f"❌ SQLMap scan failed for {url}")
return result
@mcp.tool()
def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]:
"""
Execute a Metasploit module with enhanced logging.
Args:
module: The Metasploit module to use
options: Dictionary of module options
Returns:
Metasploit execution results
"""
data = {
"module": module,
"options": options
}
logger.info(f"🚀 Starting Metasploit module: {module}")
result = hexstrike_client.safe_post("api/tools/metasploit", data)
if result.get("success"):
logger.info(f"✅ Metasploit module completed: {module}")
else:
logger.error(f"❌ Metasploit module failed: {module}")
return result
@mcp.tool()
def hydra_attack(
target: str,
service: str,
username: str = "",
username_file: str = "",
password: str = "",
password_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Execute Hydra for password brute forcing with enhanced logging.
Args:
target: The target IP or hostname
service: The service to attack (ssh, ftp, http, etc.)
username: Single username to test
username_file: File containing usernames
password: Single password to test
password_file: File containing passwords
additional_args: Additional Hydra arguments
Returns:
Brute force attack results
"""
data = {
"target": target,
"service": service,
"username": username,
"username_file": username_file,
"password": password,
"password_file": password_file,
"additional_args": additional_args
}
logger.info(f"🔑 Starting Hydra attack: {target}:{service}")
result = hexstrike_client.safe_post("api/tools/hydra", data)
if result.get("success"):
logger.info(f"✅ Hydra attack completed for {target}")
else:
logger.error(f"❌ Hydra attack failed for {target}")
return result
@mcp.tool()
def john_crack(
hash_file: str,
wordlist: str = "/usr/share/wordlists/rockyou.txt",
format_type: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Execute John the Ripper for password cracking with enhanced logging.
Args:
hash_file: File containing password hashes
wordlist: Wordlist file to use
format_type: Hash format type
additional_args: Additional John arguments
Returns:
Password cracking results
"""
data = {
"hash_file": hash_file,
"wordlist": wordlist,
"format": format_type,
"additional_args": additional_args
}
logger.info(f"🔐 Starting John the Ripper: {hash_file}")
result = hexstrike_client.safe_post("api/tools/john", data)
if result.get("success"):
logger.info(f"✅ John the Ripper completed")
else:
logger.error(f"❌ John the Ripper failed")
return result
@mcp.tool()
def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute WPScan for WordPress vulnerability scanning with enhanced logging.
Args:
url: The WordPress site URL
additional_args: Additional WPScan arguments
Returns:
WordPress vulnerability scan results
"""
data = {
"url": url,
"additional_args": additional_args
}
logger.info(f"🔍 Starting WPScan: {url}")
result = hexstrike_client.safe_post("api/tools/wpscan", data)
if result.get("success"):
logger.info(f"✅ WPScan completed for {url}")
else:
logger.error(f"❌ WPScan failed for {url}")
return result
@mcp.tool()
def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]:
"""
Execute Enum4linux for SMB enumeration with enhanced logging.
Args:
target: The target IP address
additional_args: Additional Enum4linux arguments
Returns:
SMB enumeration results
"""
data = {
"target": target,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Enum4linux: {target}")
result = hexstrike_client.safe_post("api/tools/enum4linux", data)
if result.get("success"):
logger.info(f"✅ Enum4linux completed for {target}")
else:
logger.error(f"❌ Enum4linux failed for {target}")
return result
@mcp.tool()
def ffuf_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", mode: str = "directory", match_codes: str = "200,204,301,302,307,401,403", additional_args: str = "") -> Dict[str, Any]:
"""
Execute FFuf for web fuzzing with enhanced logging.
Args:
url: The target URL
wordlist: Wordlist file to use
mode: Fuzzing mode (directory, vhost, parameter)
match_codes: HTTP status codes to match
additional_args: Additional FFuf arguments
Returns:
Web fuzzing results
"""
data = {
"url": url,
"wordlist": wordlist,
"mode": mode,
"match_codes": match_codes,
"additional_args": additional_args
}
logger.info(f"🔍 Starting FFuf {mode} fuzzing: {url}")
result = hexstrike_client.safe_post("api/tools/ffuf", data)
if result.get("success"):
logger.info(f"✅ FFuf fuzzing completed for {url}")
else:
logger.error(f"❌ FFuf fuzzing failed for {url}")
return result
@mcp.tool()
def netexec_scan(target: str, protocol: str = "smb", username: str = "", password: str = "", hash_value: str = "", module: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute NetExec (formerly CrackMapExec) for network enumeration with enhanced logging.
Args:
target: The target IP or network
protocol: Protocol to use (smb, ssh, winrm, etc.)
username: Username for authentication
password: Password for authentication
hash_value: Hash for pass-the-hash attacks
module: NetExec module to execute
additional_args: Additional NetExec arguments
Returns:
Network enumeration results
"""
data = {
"target": target,
"protocol": protocol,
"username": username,
"password": password,
"hash": hash_value,
"module": module,
"additional_args": additional_args
}
logger.info(f"🔍 Starting NetExec {protocol} scan: {target}")
result = hexstrike_client.safe_post("api/tools/netexec", data)
if result.get("success"):
logger.info(f"✅ NetExec scan completed for {target}")
else:
logger.error(f"❌ NetExec scan failed for {target}")
return result
@mcp.tool()
def amass_scan(domain: str, mode: str = "enum", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Amass for subdomain enumeration with enhanced logging.
Args:
domain: The target domain
mode: Amass mode (enum, intel, viz)
additional_args: Additional Amass arguments
Returns:
Subdomain enumeration results
"""
data = {
"domain": domain,
"mode": mode,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Amass {mode}: {domain}")
result = hexstrike_client.safe_post("api/tools/amass", data)
if result.get("success"):
logger.info(f"✅ Amass completed for {domain}")
else:
logger.error(f"❌ Amass failed for {domain}")
return result
@mcp.tool()
def hashcat_crack(hash_file: str, hash_type: str, attack_mode: str = "0", wordlist: str = "/usr/share/wordlists/rockyou.txt", mask: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Hashcat for advanced password cracking with enhanced logging.
Args:
hash_file: File containing password hashes
hash_type: Hash type number for Hashcat
attack_mode: Attack mode (0=dict, 1=combo, 3=mask, etc.)
wordlist: Wordlist file for dictionary attacks
mask: Mask for mask attacks
additional_args: Additional Hashcat arguments
Returns:
Password cracking results
"""
data = {
"hash_file": hash_file,
"hash_type": hash_type,
"attack_mode": attack_mode,
"wordlist": wordlist,
"mask": mask,
"additional_args": additional_args
}
logger.info(f"🔐 Starting Hashcat attack: mode {attack_mode}")
result = hexstrike_client.safe_post("api/tools/hashcat", data)
if result.get("success"):
logger.info(f"✅ Hashcat attack completed")
else:
logger.error(f"❌ Hashcat attack failed")
return result
@mcp.tool()
def subfinder_scan(domain: str, silent: bool = True, all_sources: bool = False, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Subfinder for passive subdomain enumeration with enhanced logging.
Args:
domain: The target domain
silent: Run in silent mode
all_sources: Use all sources
additional_args: Additional Subfinder arguments
Returns:
Passive subdomain enumeration results
"""
data = {
"domain": domain,
"silent": silent,
"all_sources": all_sources,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Subfinder: {domain}")
result = hexstrike_client.safe_post("api/tools/subfinder", data)
if result.get("success"):
logger.info(f"✅ Subfinder completed for {domain}")
else:
logger.error(f"❌ Subfinder failed for {domain}")
return result
@mcp.tool()
def smbmap_scan(target: str, username: str = "", password: str = "", domain: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute SMBMap for SMB share enumeration with enhanced logging.
Args:
target: The target IP address
username: Username for authentication
password: Password for authentication
domain: Domain for authentication
additional_args: Additional SMBMap arguments
Returns:
SMB share enumeration results
"""
data = {
"target": target,
"username": username,
"password": password,
"domain": domain,
"additional_args": additional_args
}
logger.info(f"🔍 Starting SMBMap: {target}")
result = hexstrike_client.safe_post("api/tools/smbmap", data)
if result.get("success"):
logger.info(f"✅ SMBMap completed for {target}")
else:
logger.error(f"❌ SMBMap failed for {target}")
return result
@mcp.tool()
def volatility_analyze(memory_file: str, plugin: str, profile: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Volatility for memory forensics analysis with enhanced logging.
Args:
memory_file: Path to memory dump file
plugin: Volatility plugin to use
profile: Memory profile to use
additional_args: Additional Volatility arguments
Returns:
Memory forensics analysis results
"""
data = {
"memory_file": memory_file,
"plugin": plugin,
"profile": profile,
"additional_args": additional_args
}
logger.info(f"🧠 Starting Volatility analysis: {plugin}")
result = hexstrike_client.safe_post("api/tools/volatility", data)
if result.get("success"):
logger.info(f"✅ Volatility analysis completed")
else:
logger.error(f"❌ Volatility analysis failed")
return result
@mcp.tool()
def msfvenom_generate(payload: str, format_type: str = "", output_file: str = "", encoder: str = "", iterations: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute MSFVenom for payload generation with enhanced logging.
Args:
payload: The payload to generate
format_type: Output format (exe, elf, raw, etc.)
output_file: Output file path
encoder: Encoder to use
iterations: Number of encoding iterations
additional_args: Additional MSFVenom arguments
Returns:
Payload generation results
"""
data = {
"payload": payload,
"format": format_type,
"output_file": output_file,
"encoder": encoder,
"iterations": iterations,
"additional_args": additional_args
}
logger.info(f"🚀 Starting MSFVenom payload generation: {payload}")
result = hexstrike_client.safe_post("api/tools/msfvenom", data)
if result.get("success"):
logger.info(f"✅ MSFVenom payload generated")
else:
logger.error(f"❌ MSFVenom payload generation failed")
return result
# ============================================================================
# BINARY ANALYSIS & REVERSE ENGINEERING TOOLS
# ============================================================================
@mcp.tool()
def gdb_analyze(binary: str, commands: str = "", script_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute GDB for binary analysis and debugging with enhanced logging.
Args:
binary: Path to the binary file
commands: GDB commands to execute
script_file: Path to GDB script file
additional_args: Additional GDB arguments
Returns:
Binary analysis results
"""
data = {
"binary": binary,
"commands": commands,
"script_file": script_file,
"additional_args": additional_args
}
logger.info(f"🔧 Starting GDB analysis: {binary}")
result = hexstrike_client.safe_post("api/tools/gdb", data)
if result.get("success"):
logger.info(f"✅ GDB analysis completed for {binary}")
else:
logger.error(f"❌ GDB analysis failed for {binary}")
return result
@mcp.tool()
def radare2_analyze(binary: str, commands: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Radare2 for binary analysis and reverse engineering with enhanced logging.
Args:
binary: Path to the binary file
commands: Radare2 commands to execute
additional_args: Additional Radare2 arguments
Returns:
Binary analysis results
"""
data = {
"binary": binary,
"commands": commands,
"additional_args": additional_args
}
logger.info(f"🔧 Starting Radare2 analysis: {binary}")
result = hexstrike_client.safe_post("api/tools/radare2", data)
if result.get("success"):
logger.info(f"✅ Radare2 analysis completed for {binary}")
else:
logger.error(f"❌ Radare2 analysis failed for {binary}")
return result
@mcp.tool()
def binwalk_analyze(file_path: str, extract: bool = False, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Binwalk for firmware and file analysis with enhanced logging.
Args:
file_path: Path to the file to analyze
extract: Whether to extract discovered files
additional_args: Additional Binwalk arguments
Returns:
Firmware analysis results
"""
data = {
"file_path": file_path,
"extract": extract,
"additional_args": additional_args
}
logger.info(f"🔧 Starting Binwalk analysis: {file_path}")
result = hexstrike_client.safe_post("api/tools/binwalk", data)
if result.get("success"):
logger.info(f"✅ Binwalk analysis completed for {file_path}")
else:
logger.error(f"❌ Binwalk analysis failed for {file_path}")
return result
@mcp.tool()
def ropgadget_search(binary: str, gadget_type: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Search for ROP gadgets in a binary using ROPgadget with enhanced logging.
Args:
binary: Path to the binary file
gadget_type: Type of gadgets to search for
additional_args: Additional ROPgadget arguments
Returns:
ROP gadget search results
"""
data = {
"binary": binary,
"gadget_type": gadget_type,
"additional_args": additional_args
}
logger.info(f"🔧 Starting ROPgadget search: {binary}")
result = hexstrike_client.safe_post("api/tools/ropgadget", data)
if result.get("success"):
logger.info(f"✅ ROPgadget search completed for {binary}")
else:
logger.error(f"❌ ROPgadget search failed for {binary}")
return result
@mcp.tool()
def checksec_analyze(binary: str) -> Dict[str, Any]:
"""
Check security features of a binary with enhanced logging.
Args:
binary: Path to the binary file
Returns:
Security features analysis results
"""
data = {
"binary": binary
}
logger.info(f"🔧 Starting Checksec analysis: {binary}")
result = hexstrike_client.safe_post("api/tools/checksec", data)
if result.get("success"):
logger.info(f"✅ Checksec analysis completed for {binary}")
else:
logger.error(f"❌ Checksec analysis failed for {binary}")
return result
@mcp.tool()
def xxd_hexdump(file_path: str, offset: str = "0", length: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Create a hex dump of a file using xxd with enhanced logging.
Args:
file_path: Path to the file
offset: Offset to start reading from
length: Number of bytes to read
additional_args: Additional xxd arguments
Returns:
Hex dump results
"""
data = {
"file_path": file_path,
"offset": offset,
"length": length,
"additional_args": additional_args
}
logger.info(f"🔧 Starting XXD hex dump: {file_path}")
result = hexstrike_client.safe_post("api/tools/xxd", data)
if result.get("success"):
logger.info(f"✅ XXD hex dump completed for {file_path}")
else:
logger.error(f"❌ XXD hex dump failed for {file_path}")
return result
@mcp.tool()
def strings_extract(file_path: str, min_len: int = 4, additional_args: str = "") -> Dict[str, Any]:
"""
Extract strings from a binary file with enhanced logging.
Args:
file_path: Path to the file
min_len: Minimum string length
additional_args: Additional strings arguments
Returns:
String extraction results
"""
data = {
"file_path": file_path,
"min_len": min_len,
"additional_args": additional_args
}
logger.info(f"🔧 Starting Strings extraction: {file_path}")
result = hexstrike_client.safe_post("api/tools/strings", data)
if result.get("success"):
logger.info(f"✅ Strings extraction completed for {file_path}")
else:
logger.error(f"❌ Strings extraction failed for {file_path}")
return result
@mcp.tool()
def objdump_analyze(binary: str, disassemble: bool = True, additional_args: str = "") -> Dict[str, Any]:
"""
Analyze a binary using objdump with enhanced logging.
Args:
binary: Path to the binary file
disassemble: Whether to disassemble the binary
additional_args: Additional objdump arguments
Returns:
Binary analysis results
"""
data = {
"binary": binary,
"disassemble": disassemble,
"additional_args": additional_args
}
logger.info(f"🔧 Starting Objdump analysis: {binary}")
result = hexstrike_client.safe_post("api/tools/objdump", data)
if result.get("success"):
logger.info(f"✅ Objdump analysis completed for {binary}")
else:
logger.error(f"❌ Objdump analysis failed for {binary}")
return result
@mcp.tool()
def feroxbuster_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", threads: int = 10, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Feroxbuster for recursive content discovery with enhanced logging.
Args:
url: The target URL
wordlist: Wordlist file to use
threads: Number of threads
additional_args: Additional Feroxbuster arguments
Returns:
Content discovery results
"""
data = {
"url": url,
"wordlist": wordlist,
"threads": threads,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Feroxbuster scan: {url}")
result = hexstrike_client.safe_post("api/tools/feroxbuster", data)
if result.get("success"):
logger.info(f"✅ Feroxbuster scan completed for {url}")
else:
logger.error(f"❌ Feroxbuster scan failed for {url}")
return result
@mcp.tool()
def dotdotpwn_scan(target: str, module: str = "http", additional_args: str = "") -> Dict[str, Any]:
"""
Execute DotDotPwn for directory traversal testing with enhanced logging.
Args:
target: The target hostname or IP
module: Module to use (http, ftp, tftp, etc.)
additional_args: Additional DotDotPwn arguments
Returns:
Directory traversal test results
"""
data = {
"target": target,
"module": module,
"additional_args": additional_args
}
logger.info(f"🔍 Starting DotDotPwn scan: {target}")
result = hexstrike_client.safe_post("api/tools/dotdotpwn", data)
if result.get("success"):
logger.info(f"✅ DotDotPwn scan completed for {target}")
else:
logger.error(f"❌ DotDotPwn scan failed for {target}")
return result
@mcp.tool()
def xsser_scan(url: str, params: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute XSSer for XSS vulnerability testing with enhanced logging.
Args:
url: The target URL
params: Parameters to test
additional_args: Additional XSSer arguments
Returns:
XSS vulnerability test results
"""
data = {
"url": url,
"params": params,
"additional_args": additional_args
}
logger.info(f"🔍 Starting XSSer scan: {url}")
result = hexstrike_client.safe_post("api/tools/xsser", data)
if result.get("success"):
logger.info(f"✅ XSSer scan completed for {url}")
else:
logger.error(f"❌ XSSer scan failed for {url}")
return result
@mcp.tool()
def wfuzz_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Wfuzz for web application fuzzing with enhanced logging.
Args:
url: The target URL (use FUZZ where you want to inject payloads)
wordlist: Wordlist file to use
additional_args: Additional Wfuzz arguments
Returns:
Web application fuzzing results
"""
data = {
"url": url,
"wordlist": wordlist,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Wfuzz scan: {url}")
result = hexstrike_client.safe_post("api/tools/wfuzz", data)
if result.get("success"):
logger.info(f"✅ Wfuzz scan completed for {url}")
else:
logger.error(f"❌ Wfuzz scan failed for {url}")
return result
# ============================================================================
# AI-POWERED PAYLOAD GENERATION (v5.0 ENHANCEMENT)
# ============================================================================
@mcp.tool()
def ai_generate_payload(attack_type: str, complexity: str = "basic", technology: str = "", url: str = "") -> Dict[str, Any]:
"""
Generate AI-powered contextual payloads for security testing.
Args:
attack_type: Type of attack (xss, sqli, lfi, cmd_injection, ssti, xxe)
complexity: Complexity level (basic, advanced, bypass)
technology: Target technology (php, asp, jsp, python, nodejs)
url: Target URL for context
Returns:
Contextual payloads with risk assessment and test cases
"""
data = {
"attack_type": attack_type,
"complexity": complexity,
"technology": technology,
"url": url
}
logger.info(f"🤖 Generating AI payloads for {attack_type} attack")
result = hexstrike_client.safe_post("api/ai/generate_payload", data)
if result.get("success"):
payload_data = result.get("ai_payload_generation", {})
count = payload_data.get("payload_count", 0)
logger.info(f"✅ Generated {count} contextual {attack_type} payloads")
# Log some example payloads for user awareness
payloads = payload_data.get("payloads", [])
if payloads:
logger.info("🎯 Sample payloads generated:")
for i, payload_info in enumerate(payloads[:3]): # Show first 3
risk = payload_info.get("risk_level", "UNKNOWN")
context = payload_info.get("context", "basic")
logger.info(f" ├─ [{risk}] {context}: {payload_info['payload'][:50]}...")
else:
logger.error("❌ AI payload generation failed")
return result
@mcp.tool()
def ai_test_payload(payload: str, target_url: str, method: str = "GET") -> Dict[str, Any]:
"""
Test generated payload against target with AI analysis.
Args:
payload: The payload to test
target_url: Target URL to test against
method: HTTP method (GET, POST)
Returns:
Test results with AI analysis and vulnerability assessment
"""
data = {
"payload": payload,
"target_url": target_url,
"method": method
}
logger.info(f"🧪 Testing AI payload against {target_url}")
result = hexstrike_client.safe_post("api/ai/test_payload", data)
if result.get("success"):
analysis = result.get("ai_analysis", {})
potential_vuln = analysis.get("potential_vulnerability", False)
logger.info(f"🔍 Payload test completed | Vulnerability detected: {potential_vuln}")
if potential_vuln:
logger.warning("⚠️ Potential vulnerability found! Review the response carefully.")
else:
logger.info("✅ No obvious vulnerability indicators detected")
else:
logger.error("❌ Payload testing failed")
return result
@mcp.tool()
def ai_generate_attack_suite(target_url: str, attack_types: str = "xss,sqli,lfi") -> Dict[str, Any]:
"""
Generate comprehensive attack suite with multiple payload types.
Args:
target_url: Target URL for testing
attack_types: Comma-separated list of attack types
Returns:
Comprehensive attack suite with multiple payload types
"""
attack_list = [attack.strip() for attack in attack_types.split(",")]
results = {
"target_url": target_url,
"attack_types": attack_list,
"payload_suites": {},
"summary": {
"total_payloads": 0,
"high_risk_payloads": 0,
"test_cases": 0
}
}
logger.info(f"🚀 Generating comprehensive attack suite for {target_url}")
logger.info(f"🎯 Attack types: {', '.join(attack_list)}")
for attack_type in attack_list:
logger.info(f"🤖 Generating {attack_type} payloads...")
# Generate payloads for this attack type
payload_result = self.ai_generate_payload(attack_type, "advanced", "", target_url)
if payload_result.get("success"):
payload_data = payload_result.get("ai_payload_generation", {})
results["payload_suites"][attack_type] = payload_data
# Update summary
results["summary"]["total_payloads"] += payload_data.get("payload_count", 0)
results["summary"]["test_cases"] += len(payload_data.get("test_cases", []))
# Count high-risk payloads
for payload_info in payload_data.get("payloads", []):
if payload_info.get("risk_level") == "HIGH":
results["summary"]["high_risk_payloads"] += 1
logger.info(f"✅ Attack suite generated:")
logger.info(f" ├─ Total payloads: {results['summary']['total_payloads']}")
logger.info(f" ├─ High-risk payloads: {results['summary']['high_risk_payloads']}")
logger.info(f" └─ Test cases: {results['summary']['test_cases']}")
return {
"success": True,
"attack_suite": results,
"timestamp": time.time()
}
# ============================================================================
# ADVANCED API TESTING TOOLS (v5.0 ENHANCEMENT)
# ============================================================================
@mcp.tool()
def api_fuzzer(base_url: str, endpoints: str = "", methods: str = "GET,POST,PUT,DELETE", wordlist: str = "/usr/share/wordlists/api/api-endpoints.txt") -> Dict[str, Any]:
"""
Advanced API endpoint fuzzing with intelligent parameter discovery.
Args:
base_url: Base URL of the API
endpoints: Comma-separated list of specific endpoints to test
methods: HTTP methods to test (comma-separated)
wordlist: Wordlist for endpoint discovery
Returns:
API fuzzing results with endpoint discovery and vulnerability assessment
"""
data = {
"base_url": base_url,
"endpoints": [e.strip() for e in endpoints.split(",") if e.strip()] if endpoints else [],
"methods": [m.strip() for m in methods.split(",")],
"wordlist": wordlist
}
logger.info(f"🔍 Starting API fuzzing: {base_url}")
result = hexstrike_client.safe_post("api/tools/api_fuzzer", data)
if result.get("success"):
fuzzing_type = result.get("fuzzing_type", "unknown")
if fuzzing_type == "endpoint_testing":
endpoint_count = len(result.get("results", []))
logger.info(f"✅ API endpoint testing completed: {endpoint_count} endpoints tested")
else:
logger.info(f"✅ API endpoint discovery completed")
else:
logger.error("❌ API fuzzing failed")
return result
@mcp.tool()
def graphql_scanner(endpoint: str, introspection: bool = True, query_depth: int = 10, test_mutations: bool = True) -> Dict[str, Any]:
"""
Advanced GraphQL security scanning and introspection.
Args:
endpoint: GraphQL endpoint URL
introspection: Test introspection queries
query_depth: Maximum query depth to test
test_mutations: Test mutation operations
Returns:
GraphQL security scan results with vulnerability assessment
"""
data = {
"endpoint": endpoint,
"introspection": introspection,
"query_depth": query_depth,
"test_mutations": test_mutations
}
logger.info(f"🔍 Starting GraphQL security scan: {endpoint}")
result = hexstrike_client.safe_post("api/tools/graphql_scanner", data)
if result.get("success"):
scan_results = result.get("graphql_scan_results", {})
vuln_count = len(scan_results.get("vulnerabilities", []))
tests_count = len(scan_results.get("tests_performed", []))
logger.info(f"✅ GraphQL scan completed: {tests_count} tests, {vuln_count} vulnerabilities")
if vuln_count > 0:
logger.warning(f"⚠️ Found {vuln_count} GraphQL vulnerabilities!")
for vuln in scan_results.get("vulnerabilities", [])[:3]: # Show first 3
severity = vuln.get("severity", "UNKNOWN")
vuln_type = vuln.get("type", "unknown")
logger.warning(f" ├─ [{severity}] {vuln_type}")
else:
logger.error("❌ GraphQL scanning failed")
return result
@mcp.tool()
def jwt_analyzer(jwt_token: str, target_url: str = "") -> Dict[str, Any]:
"""
Advanced JWT token analysis and vulnerability testing.
Args:
jwt_token: JWT token to analyze
target_url: Optional target URL for testing token manipulation
Returns:
JWT analysis results with vulnerability assessment and attack vectors
"""
data = {
"jwt_token": jwt_token,
"target_url": target_url
}
logger.info(f"🔍 Starting JWT security analysis")
result = hexstrike_client.safe_post("api/tools/jwt_analyzer", data)
if result.get("success"):
analysis = result.get("jwt_analysis_results", {})
vuln_count = len(analysis.get("vulnerabilities", []))
algorithm = analysis.get("token_info", {}).get("algorithm", "unknown")
logger.info(f"✅ JWT analysis completed: {vuln_count} vulnerabilities found")
logger.info(f"🔐 Token algorithm: {algorithm}")
if vuln_count > 0:
logger.warning(f"⚠️ Found {vuln_count} JWT vulnerabilities!")
for vuln in analysis.get("vulnerabilities", [])[:3]: # Show first 3
severity = vuln.get("severity", "UNKNOWN")
vuln_type = vuln.get("type", "unknown")
logger.warning(f" ├─ [{severity}] {vuln_type}")
else:
logger.error("❌ JWT analysis failed")
return result
@mcp.tool()
def api_schema_analyzer(schema_url: str, schema_type: str = "openapi") -> Dict[str, Any]:
"""
Analyze API schemas and identify potential security issues.
Args:
schema_url: URL to the API schema (OpenAPI/Swagger/GraphQL)
schema_type: Type of schema (openapi, swagger, graphql)
Returns:
Schema analysis results with security issues and recommendations
"""
data = {
"schema_url": schema_url,
"schema_type": schema_type
}
logger.info(f"🔍 Starting API schema analysis: {schema_url}")
result = hexstrike_client.safe_post("api/tools/api_schema_analyzer", data)
if result.get("success"):
analysis = result.get("schema_analysis_results", {})
endpoint_count = len(analysis.get("endpoints_found", []))
issue_count = len(analysis.get("security_issues", []))
logger.info(f"✅ Schema analysis completed: {endpoint_count} endpoints, {issue_count} issues")
if issue_count > 0:
logger.warning(f"⚠️ Found {issue_count} security issues in schema!")
for issue in analysis.get("security_issues", [])[:3]: # Show first 3
severity = issue.get("severity", "UNKNOWN")
issue_type = issue.get("issue", "unknown")
logger.warning(f" ├─ [{severity}] {issue_type}")
if endpoint_count > 0:
logger.info(f"📊 Discovered endpoints:")
for endpoint in analysis.get("endpoints_found", [])[:5]: # Show first 5
method = endpoint.get("method", "GET")
path = endpoint.get("path", "/")
logger.info(f" ├─ {method} {path}")
else:
logger.error("❌ Schema analysis failed")
return result
@mcp.tool()
def comprehensive_api_audit(base_url: str, schema_url: str = "", jwt_token: str = "", graphql_endpoint: str = "") -> Dict[str, Any]:
"""
Comprehensive API security audit combining multiple testing techniques.
Args:
base_url: Base URL of the API
schema_url: Optional API schema URL
jwt_token: Optional JWT token for analysis
graphql_endpoint: Optional GraphQL endpoint
Returns:
Comprehensive audit results with all API security tests
"""
audit_results = {
"base_url": base_url,
"audit_timestamp": time.time(),
"tests_performed": [],
"total_vulnerabilities": 0,
"summary": {},
"recommendations": []
}
logger.info(f"🚀 Starting comprehensive API security audit: {base_url}")
# 1. API Endpoint Fuzzing
logger.info("🔍 Phase 1: API endpoint discovery and fuzzing")
fuzz_result = self.api_fuzzer(base_url)
if fuzz_result.get("success"):
audit_results["tests_performed"].append("api_fuzzing")
audit_results["api_fuzzing"] = fuzz_result
# 2. Schema Analysis (if provided)
if schema_url:
logger.info("🔍 Phase 2: API schema analysis")
schema_result = self.api_schema_analyzer(schema_url)
if schema_result.get("success"):
audit_results["tests_performed"].append("schema_analysis")
audit_results["schema_analysis"] = schema_result
schema_data = schema_result.get("schema_analysis_results", {})
audit_results["total_vulnerabilities"] += len(schema_data.get("security_issues", []))
# 3. JWT Analysis (if provided)
if jwt_token:
logger.info("🔍 Phase 3: JWT token analysis")
jwt_result = self.jwt_analyzer(jwt_token, base_url)
if jwt_result.get("success"):
audit_results["tests_performed"].append("jwt_analysis")
audit_results["jwt_analysis"] = jwt_result
jwt_data = jwt_result.get("jwt_analysis_results", {})
audit_results["total_vulnerabilities"] += len(jwt_data.get("vulnerabilities", []))
# 4. GraphQL Testing (if provided)
if graphql_endpoint:
logger.info("🔍 Phase 4: GraphQL security scanning")
graphql_result = self.graphql_scanner(graphql_endpoint)
if graphql_result.get("success"):
audit_results["tests_performed"].append("graphql_scanning")
audit_results["graphql_scanning"] = graphql_result
graphql_data = graphql_result.get("graphql_scan_results", {})
audit_results["total_vulnerabilities"] += len(graphql_data.get("vulnerabilities", []))
# Generate comprehensive recommendations
audit_results["recommendations"] = [
"Implement proper authentication and authorization",
"Use HTTPS for all API communications",
"Validate and sanitize all input parameters",
"Implement rate limiting and request throttling",
"Add comprehensive logging and monitoring",
"Regular security testing and code reviews",
"Keep API documentation updated and secure",
"Implement proper error handling"
]
# Summary
audit_results["summary"] = {
"tests_performed": len(audit_results["tests_performed"]),
"total_vulnerabilities": audit_results["total_vulnerabilities"],
"audit_coverage": "comprehensive" if len(audit_results["tests_performed"]) >= 3 else "partial"
}
logger.info(f"✅ Comprehensive API audit completed:")
logger.info(f" ├─ Tests performed: {audit_results['summary']['tests_performed']}")
logger.info(f" ├─ Total vulnerabilities: {audit_results['summary']['total_vulnerabilities']}")
logger.info(f" └─ Coverage: {audit_results['summary']['audit_coverage']}")
return {
"success": True,
"comprehensive_audit": audit_results
}
# ============================================================================
# ADVANCED CTF TOOLS (v5.0 ENHANCEMENT)
# ============================================================================
@mcp.tool()
def volatility3_analyze(memory_file: str, plugin: str, output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Volatility3 for advanced memory forensics with enhanced logging.
Args:
memory_file: Path to memory dump file
plugin: Volatility3 plugin to execute
output_file: Output file path
additional_args: Additional Volatility3 arguments
Returns:
Advanced memory forensics results
"""
data = {
"memory_file": memory_file,
"plugin": plugin,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🧠 Starting Volatility3 analysis: {plugin}")
result = hexstrike_client.safe_post("api/tools/volatility3", data)
if result.get("success"):
logger.info(f"✅ Volatility3 analysis completed")
else:
logger.error(f"❌ Volatility3 analysis failed")
return result
@mcp.tool()
def foremost_carving(input_file: str, output_dir: str = "/tmp/foremost_output", file_types: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Foremost for file carving with enhanced logging.
Args:
input_file: Input file or device to carve
output_dir: Output directory for carved files
file_types: File types to carve (jpg,gif,png,etc.)
additional_args: Additional Foremost arguments
Returns:
File carving results
"""
data = {
"input_file": input_file,
"output_dir": output_dir,
"file_types": file_types,
"additional_args": additional_args
}
logger.info(f"📁 Starting Foremost file carving: {input_file}")
result = hexstrike_client.safe_post("api/tools/foremost", data)
if result.get("success"):
logger.info(f"✅ Foremost carving completed")
else:
logger.error(f"❌ Foremost carving failed")
return result
@mcp.tool()
def steghide_analysis(action: str, cover_file: str, embed_file: str = "", passphrase: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Steghide for steganography analysis with enhanced logging.
Args:
action: Action to perform (extract, embed, info)
cover_file: Cover file for steganography
embed_file: File to embed (for embed action)
passphrase: Passphrase for steganography
output_file: Output file path
additional_args: Additional Steghide arguments
Returns:
Steganography analysis results
"""
data = {
"action": action,
"cover_file": cover_file,
"embed_file": embed_file,
"passphrase": passphrase,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🖼️ Starting Steghide {action}: {cover_file}")
result = hexstrike_client.safe_post("api/tools/steghide", data)
if result.get("success"):
logger.info(f"✅ Steghide {action} completed")
else:
logger.error(f"❌ Steghide {action} failed")
return result
@mcp.tool()
def exiftool_extract(file_path: str, output_format: str = "", tags: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute ExifTool for metadata extraction with enhanced logging.
Args:
file_path: Path to file for metadata extraction
output_format: Output format (json, xml, csv)
tags: Specific tags to extract
additional_args: Additional ExifTool arguments
Returns:
Metadata extraction results
"""
data = {
"file_path": file_path,
"output_format": output_format,
"tags": tags,
"additional_args": additional_args
}
logger.info(f"📷 Starting ExifTool analysis: {file_path}")
result = hexstrike_client.safe_post("api/tools/exiftool", data)
if result.get("success"):
logger.info(f"✅ ExifTool analysis completed")
else:
logger.error(f"❌ ExifTool analysis failed")
return result
@mcp.tool()
def hashpump_attack(signature: str, data: str, key_length: str, append_data: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute HashPump for hash length extension attacks with enhanced logging.
Args:
signature: Original hash signature
data: Original data
key_length: Length of secret key
append_data: Data to append
additional_args: Additional HashPump arguments
Returns:
Hash length extension attack results
"""
data = {
"signature": signature,
"data": data,
"key_length": key_length,
"append_data": append_data,
"additional_args": additional_args
}
logger.info(f"🔐 Starting HashPump attack")
result = hexstrike_client.safe_post("api/tools/hashpump", data)
if result.get("success"):
logger.info(f"✅ HashPump attack completed")
else:
logger.error(f"❌ HashPump attack failed")
return result
# ============================================================================
# BUG BOUNTY RECONNAISSANCE TOOLS (v5.0 ENHANCEMENT)
# ============================================================================
@mcp.tool()
def hakrawler_crawl(url: str, depth: int = 2, forms: bool = True, robots: bool = True, sitemap: bool = True, wayback: bool = False, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Hakrawler for web endpoint discovery with enhanced logging.
Args:
url: Target URL to crawl
depth: Crawling depth
forms: Include forms in crawling
robots: Check robots.txt
sitemap: Check sitemap.xml
wayback: Use Wayback Machine
additional_args: Additional Hakrawler arguments
Returns:
Web endpoint discovery results
"""
data = {
"url": url,
"depth": depth,
"forms": forms,
"robots": robots,
"sitemap": sitemap,
"wayback": wayback,
"additional_args": additional_args
}
logger.info(f"🕷️ Starting Hakrawler crawling: {url}")
result = hexstrike_client.safe_post("api/tools/hakrawler", data)
if result.get("success"):
logger.info(f"✅ Hakrawler crawling completed")
else:
logger.error(f"❌ Hakrawler crawling failed")
return result
@mcp.tool()
def httpx_probe(targets: str = "", target_file: str = "", ports: str = "", methods: str = "GET", status_code: str = "", content_length: bool = False, output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute HTTPx for HTTP probing with enhanced logging.
Args:
targets: Target URLs or IPs
target_file: File containing targets
ports: Ports to probe
methods: HTTP methods to use
status_code: Filter by status code
content_length: Show content length
output_file: Output file path
additional_args: Additional HTTPx arguments
Returns:
HTTP probing results
"""
data = {
"targets": targets,
"target_file": target_file,
"ports": ports,
"methods": methods,
"status_code": status_code,
"content_length": content_length,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🌐 Starting HTTPx probing")
result = hexstrike_client.safe_post("api/tools/httpx", data)
if result.get("success"):
logger.info(f"✅ HTTPx probing completed")
else:
logger.error(f"❌ HTTPx probing failed")
return result
@mcp.tool()
def paramspider_discovery(domain: str, exclude: str = "", output_file: str = "", level: int = 2, additional_args: str = "") -> Dict[str, Any]:
"""
Execute ParamSpider for parameter discovery with enhanced logging.
Args:
domain: Target domain
exclude: Extensions to exclude
output_file: Output file path
level: Crawling level
additional_args: Additional ParamSpider arguments
Returns:
Parameter discovery results
"""
data = {
"domain": domain,
"exclude": exclude,
"output_file": output_file,
"level": level,
"additional_args": additional_args
}
logger.info(f"🔍 Starting ParamSpider discovery: {domain}")
result = hexstrike_client.safe_post("api/tools/paramspider", data)
if result.get("success"):
logger.info(f"✅ ParamSpider discovery completed")
else:
logger.error(f"❌ ParamSpider discovery failed")
return result
# ============================================================================
# ADVANCED WEB SECURITY TOOLS CONTINUED
# ============================================================================
@mcp.tool()
def burpsuite_scan(project_file: str = "", config_file: str = "", target: str = "", headless: bool = False, scan_type: str = "", scan_config: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Burp Suite with enhanced logging.
Args:
project_file: Burp project file path
config_file: Burp configuration file path
target: Target URL
headless: Run in headless mode
scan_type: Type of scan to perform
scan_config: Scan configuration
output_file: Output file path
additional_args: Additional Burp Suite arguments
Returns:
Burp Suite scan results
"""
data = {
"project_file": project_file,
"config_file": config_file,
"target": target,
"headless": headless,
"scan_type": scan_type,
"scan_config": scan_config,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Burp Suite scan")
result = hexstrike_client.safe_post("api/tools/burpsuite", data)
if result.get("success"):
logger.info(f"✅ Burp Suite scan completed")
else:
logger.error(f"❌ Burp Suite scan failed")
return result
@mcp.tool()
def zap_scan(target: str = "", scan_type: str = "baseline", api_key: str = "", daemon: bool = False, port: str = "8090", host: str = "0.0.0.0", format_type: str = "xml", output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute OWASP ZAP with enhanced logging.
Args:
target: Target URL
scan_type: Type of scan (baseline, full, api)
api_key: ZAP API key
daemon: Run in daemon mode
port: Port for ZAP daemon
host: Host for ZAP daemon
format_type: Output format (xml, json, html)
output_file: Output file path
additional_args: Additional ZAP arguments
Returns:
ZAP scan results
"""
data = {
"target": target,
"scan_type": scan_type,
"api_key": api_key,
"daemon": daemon,
"port": port,
"host": host,
"format": format_type,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🔍 Starting ZAP scan: {target}")
result = hexstrike_client.safe_post("api/tools/zap", data)
if result.get("success"):
logger.info(f"✅ ZAP scan completed for {target}")
else:
logger.error(f"❌ ZAP scan failed for {target}")
return result
@mcp.tool()
def arjun_scan(url: str, method: str = "GET", data: str = "", headers: str = "", timeout: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Arjun for parameter discovery with enhanced logging.
Args:
url: Target URL
method: HTTP method (GET, POST, etc.)
data: POST data for testing
headers: Custom headers
timeout: Request timeout
output_file: Output file path
additional_args: Additional Arjun arguments
Returns:
Parameter discovery results
"""
data = {
"url": url,
"method": method,
"data": data,
"headers": headers,
"timeout": timeout,
"output_file": output_file,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Arjun parameter discovery: {url}")
result = hexstrike_client.safe_post("api/tools/arjun", data)
if result.get("success"):
logger.info(f"✅ Arjun completed for {url}")
else:
logger.error(f"❌ Arjun failed for {url}")
return result
@mcp.tool()
def wafw00f_scan(target: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute wafw00f to identify and fingerprint WAF products with enhanced logging.
Args:
target: Target URL or IP
additional_args: Additional wafw00f arguments
Returns:
WAF detection results
"""
data = {
"target": target,
"additional_args": additional_args
}
logger.info(f"🛡️ Starting Wafw00f WAF detection: {target}")
result = hexstrike_client.safe_post("api/tools/wafw00f", data)
if result.get("success"):
logger.info(f"✅ Wafw00f completed for {target}")
else:
logger.error(f"❌ Wafw00f failed for {target}")
return result
@mcp.tool()
def fierce_scan(domain: str, dns_server: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute fierce for DNS reconnaissance with enhanced logging.
Args:
domain: Target domain
dns_server: DNS server to use
additional_args: Additional fierce arguments
Returns:
DNS reconnaissance results
"""
data = {
"domain": domain,
"dns_server": dns_server,
"additional_args": additional_args
}
logger.info(f"🔍 Starting Fierce DNS recon: {domain}")
result = hexstrike_client.safe_post("api/tools/fierce", data)
if result.get("success"):
logger.info(f"✅ Fierce completed for {domain}")
else:
logger.error(f"❌ Fierce failed for {domain}")
return result
@mcp.tool()
def dnsenum_scan(domain: str, dns_server: str = "", wordlist: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute dnsenum for DNS enumeration with enhanced logging.
Args:
domain: Target domain
dns_server: DNS server to use
wordlist: Wordlist for brute forcing
additional_args: Additional dnsenum arguments
Returns:
DNS enumeration results
"""
data = {
"domain": domain,
"dns_server": dns_server,
"wordlist": wordlist,
"additional_args": additional_args
}
logger.info(f"🔍 Starting DNSenum: {domain}")
result = hexstrike_client.safe_post("api/tools/dnsenum", data)
if result.get("success"):
logger.info(f"✅ DNSenum completed for {domain}")
else:
logger.error(f"❌ DNSenum failed for {domain}")
return result
@mcp.tool()
def autorecon_scan(
target: str = "",
target_file: str = "",
ports: str = "",
output_dir: str = "",
max_scans: str = "",
max_port_scans: str = "",
heartbeat: str = "",
timeout: str = "",
target_timeout: str = "",
config_file: str = "",
global_file: str = "",
plugins_dir: str = "",
add_plugins_dir: str = "",
tags: str = "",
exclude_tags: str = "",
port_scans: str = "",
service_scans: str = "",
reports: str = "",
single_target: bool = False,
only_scans_dir: bool = False,
no_port_dirs: bool = False,
nmap: str = "",
nmap_append: str = "",
proxychains: bool = False,
disable_sanity_checks: bool = False,
disable_keyboard_control: bool = False,
force_services: str = "",
accessible: bool = False,
verbose: int = 0,
curl_path: str = "",
dirbuster_tool: str = "",
dirbuster_wordlist: str = "",
dirbuster_threads: str = "",
dirbuster_ext: str = "",
onesixtyone_community_strings: str = "",
global_username_wordlist: str = "",
global_password_wordlist: str = "",
global_domain: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Execute AutoRecon for comprehensive target enumeration with full parameter support.
Args:
target: Single target to scan
target_file: File containing multiple targets
ports: Specific ports to scan
output_dir: Output directory
max_scans: Maximum number of concurrent scans
max_port_scans: Maximum number of concurrent port scans
heartbeat: Heartbeat interval
timeout: Global timeout
target_timeout: Per-target timeout
config_file: Configuration file path
global_file: Global configuration file
plugins_dir: Plugins directory
add_plugins_dir: Additional plugins directory
tags: Plugin tags to include
exclude_tags: Plugin tags to exclude
port_scans: Port scan plugins to run
service_scans: Service scan plugins to run
reports: Report plugins to run
single_target: Use single target directory structure
only_scans_dir: Only create scans directory
no_port_dirs: Don't create port directories
nmap: Custom nmap command
nmap_append: Arguments to append to nmap
proxychains: Use proxychains
disable_sanity_checks: Disable sanity checks
disable_keyboard_control: Disable keyboard control
force_services: Force service detection
accessible: Enable accessible output
verbose: Verbosity level (0-3)
curl_path: Custom curl path
dirbuster_tool: Directory busting tool
dirbuster_wordlist: Directory busting wordlist
dirbuster_threads: Directory busting threads
dirbuster_ext: Directory busting extensions
onesixtyone_community_strings: SNMP community strings
global_username_wordlist: Global username wordlist
global_password_wordlist: Global password wordlist
global_domain: Global domain
additional_args: Additional AutoRecon arguments
Returns:
Comprehensive enumeration results with full configurability
"""
data = {
"target": target,
"target_file": target_file,
"ports": ports,
"output_dir": output_dir,
"max_scans": max_scans,
"max_port_scans": max_port_scans,
"heartbeat": heartbeat,
"timeout": timeout,
"target_timeout": target_timeout,
"config_file": config_file,
"global_file": global_file,
"plugins_dir": plugins_dir,
"add_plugins_dir": add_plugins_dir,
"tags": tags,
"exclude_tags": exclude_tags,
"port_scans": port_scans,
"service_scans": service_scans,
"reports": reports,
"single_target": single_target,
"only_scans_dir": only_scans_dir,
"no_port_dirs": no_port_dirs,
"nmap": nmap,
"nmap_append": nmap_append,
"proxychains": proxychains,
"disable_sanity_checks": disable_sanity_checks,
"disable_keyboard_control": disable_keyboard_control,
"force_services": force_services,
"accessible": accessible,
"verbose": verbose,
"curl_path": curl_path,
"dirbuster_tool": dirbuster_tool,
"dirbuster_wordlist": dirbuster_wordlist,
"dirbuster_threads": dirbuster_threads,
"dirbuster_ext": dirbuster_ext,
"onesixtyone_community_strings": onesixtyone_community_strings,
"global_username_wordlist": global_username_wordlist,
"global_password_wordlist": global_password_wordlist,
"global_domain": global_domain,
"additional_args": additional_args
}
logger.info(f"🔍 Starting AutoRecon comprehensive enumeration: {target}")
result = hexstrike_client.safe_post("api/tools/autorecon", data)
if result.get("success"):
logger.info(f"✅ AutoRecon comprehensive enumeration completed for {target}")
else:
logger.error(f"❌ AutoRecon failed for {target}")
return result
# ============================================================================
# SYSTEM MONITORING & TELEMETRY
# ============================================================================
@mcp.tool()
def server_health() -> Dict[str, Any]:
"""
Check the health status of the HexStrike AI server.
Returns:
Server health information with tool availability and telemetry
"""
logger.info(f"🏥 Checking HexStrike AI server health")
result = hexstrike_client.check_health()
if result.get("status") == "healthy":
logger.info(f"✅ Server is healthy - {result.get('total_tools_available', 0)} tools available")
else:
logger.warning(f"⚠️ Server health check returned: {result.get('status', 'unknown')}")
return result
@mcp.tool()
def get_cache_stats() -> Dict[str, Any]:
"""
Get cache statistics from the HexStrike AI server.
Returns:
Cache performance statistics
"""
logger.info(f"💾 Getting cache statistics")
result = hexstrike_client.safe_get("api/cache/stats")
if "hit_rate" in result:
logger.info(f"📊 Cache hit rate: {result.get('hit_rate', 'unknown')}")
return result
@mcp.tool()
def clear_cache() -> Dict[str, Any]:
"""
Clear the cache on the HexStrike AI server.
Returns:
Cache clear operation results
"""
logger.info(f"🧹 Clearing server cache")
result = hexstrike_client.safe_post("api/cache/clear", {})
if result.get("success"):
logger.info(f"✅ Cache cleared successfully")
else:
logger.error(f"❌ Failed to clear cache")
return result
@mcp.tool()
def get_telemetry() -> Dict[str, Any]:
"""
Get system telemetry from the HexStrike AI server.
Returns:
System performance and usage telemetry
"""
logger.info(f"📈 Getting system telemetry")
result = hexstrike_client.safe_get("api/telemetry")
if "commands_executed" in result:
logger.info(f"📊 Commands executed: {result.get('commands_executed', 0)}")
return result
# ============================================================================
# PROCESS MANAGEMENT TOOLS (v5.0 ENHANCEMENT)
# ============================================================================
@mcp.tool()
def list_active_processes() -> Dict[str, Any]:
"""
List all active processes on the HexStrike AI server.
Returns:
List of active processes with their status and progress
"""
logger.info("📊 Listing active processes")
result = hexstrike_client.safe_get("api/processes/list")
if result.get("success"):
logger.info(f"✅ Found {result.get('total_count', 0)} active processes")
else:
logger.error("❌ Failed to list processes")
return result
@mcp.tool()
def get_process_status(pid: int) -> Dict[str, Any]:
"""
Get the status of a specific process.
Args:
pid: Process ID to check
Returns:
Process status information including progress and runtime
"""
logger.info(f"🔍 Checking status of process {pid}")
result = hexstrike_client.safe_get(f"api/processes/status/{pid}")
if result.get("success"):
logger.info(f"✅ Process {pid} status retrieved")
else:
logger.error(f"❌ Process {pid} not found or error occurred")
return result
@mcp.tool()
def terminate_process(pid: int) -> Dict[str, Any]:
"""
Terminate a specific running process.
Args:
pid: Process ID to terminate
Returns:
Success status of the termination operation
"""
logger.info(f"🛑 Terminating process {pid}")
result = hexstrike_client.safe_post(f"api/processes/terminate/{pid}", {})
if result.get("success"):
logger.info(f"✅ Process {pid} terminated successfully")
else:
logger.error(f"❌ Failed to terminate process {pid}")
return result
@mcp.tool()
def pause_process(pid: int) -> Dict[str, Any]:
"""
Pause a specific running process.
Args:
pid: Process ID to pause
Returns:
Success status of the pause operation
"""
logger.info(f"⏸️ Pausing process {pid}")
result = hexstrike_client.safe_post(f"api/processes/pause/{pid}", {})
if result.get("success"):
logger.info(f"✅ Process {pid} paused successfully")
else:
logger.error(f"❌ Failed to pause process {pid}")
return result
@mcp.tool()
def resume_process(pid: int) -> Dict[str, Any]:
"""
Resume a paused process.
Args:
pid: Process ID to resume
Returns:
Success status of the resume operation
"""
logger.info(f"▶️ Resuming process {pid}")
result = hexstrike_client.safe_post(f"api/processes/resume/{pid}", {})
if result.get("success"):
logger.info(f"✅ Process {pid} resumed successfully")
else:
logger.error(f"❌ Failed to resume process {pid}")
return result
@mcp.tool()
def get_process_dashboard() -> Dict[str, Any]:
"""
Get enhanced process dashboard with visual status indicators.
Returns:
Real-time dashboard with progress bars, system metrics, and process status
"""
logger.info("📊 Getting process dashboard")
result = hexstrike_client.safe_get("api/processes/dashboard")
if result.get("success", True) and "total_processes" in result:
total = result.get("total_processes", 0)
logger.info(f"✅ Dashboard retrieved: {total} active processes")
# Log visual summary for better UX
if total > 0:
logger.info("📈 Active Processes Summary:")
for proc in result.get("processes", [])[:3]: # Show first 3
logger.info(f" ├─ PID {proc['pid']}: {proc['progress_bar']} {proc['progress_percent']}")
else:
logger.error("❌ Failed to get process dashboard")
return result
@mcp.tool()
def execute_command(command: str, use_cache: bool = True) -> Dict[str, Any]:
"""
Execute an arbitrary command on the HexStrike AI server with enhanced logging.
Args:
command: The command to execute
use_cache: Whether to use caching for this command
Returns:
Command execution results with enhanced telemetry
"""
try:
logger.info(f"⚡ Executing command: {command}")
result = hexstrike_client.execute_command(command, use_cache)
if "error" in result:
logger.error(f"❌ Command failed: {result['error']}")
return {
"success": False,
"error": result["error"],
"stdout": "",
"stderr": f"Error executing command: {result['error']}"
}
if result.get("success"):
execution_time = result.get("execution_time", 0)
logger.info(f"✅ Command completed successfully in {execution_time:.2f}s")
else:
logger.warning(f"⚠️ Command completed with errors")
return result
except Exception as e:
logger.error(f"💥 Error executing command '{command}': {str(e)}")
return {
"success": False,
"error": str(e),
"stdout": "",
"stderr": f"Error executing command: {str(e)}"
}
return mcp
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the HexStrike AI MCP Client")
parser.add_argument("--server", type=str, default=DEFAULT_HEXSTRIKE_SERVER,
help=f"HexStrike AI API server URL (default: {DEFAULT_HEXSTRIKE_SERVER})")
parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
return parser.parse_args()
def main():
"""Main entry point for the MCP server."""
args = parse_args()
# Configure logging based on debug flag
if args.debug:
logger.setLevel(logging.DEBUG)
logger.debug("🔍 Debug logging enabled")
# Print startup banner
banner = f"""
{Colors.RED}{Colors.BOLD}
🔥 HexStrike AI MCP Client v4.0
🤖 Advanced AI-driven cybersecurity automation
🔗 Connecting to: {args.server}
⚡ Enhanced features: caching, telemetry, file ops, cloud security
{Colors.RESET}
"""
print(banner)
try:
# Initialize the HexStrike AI client
hexstrike_client = HexStrikeClient(args.server, args.timeout)
# Check server health and log the result
health = hexstrike_client.check_health()
if "error" in health:
logger.warning(f"⚠️ Unable to connect to HexStrike AI API server at {args.server}: {health['error']}")
logger.warning("🚀 MCP server will start, but tool execution may fail")
else:
logger.info(f"🎯 Successfully connected to HexStrike AI API server at {args.server}")
logger.info(f"🏥 Server health status: {health['status']}")
logger.info(f"📊 Version: {health.get('version', 'unknown')}")
if not health.get("all_essential_tools_available", False):
logger.warning("⚠️ Not all essential tools are available on the HexStrike server")
missing_tools = [tool for tool, available in health.get("tools_status", {}).items() if not available]
if missing_tools:
logger.warning(f"❌ Missing tools: {', '.join(missing_tools[:5])}{'...' if len(missing_tools) > 5 else ''}")
# Set up and run the MCP server
mcp = setup_mcp_server(hexstrike_client)
logger.info("🚀 Starting HexStrike AI MCP server")
logger.info("🤖 Ready to serve AI agents with enhanced cybersecurity capabilities")
mcp.run()
except Exception as e:
logger.error(f"💥 Error starting MCP server: {str(e)}")
import traceback
logger.error(traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":
main()