Files
AutoPentestX/modules/vuln_scanner.py

345 lines
13 KiB
Python

#!/usr/bin/env python3
"""
AutoPentestX - Vulnerability Scanner Module
Integrates Nikto (web scanning) and SQLMap (SQL injection detection)
"""
import subprocess
import json
import re
import os
from datetime import datetime
class VulnerabilityScanner:
def __init__(self, target, ports_data):
"""Initialize vulnerability scanner"""
self.target = target
self.ports_data = ports_data
self.web_ports = []
self.vulnerabilities = []
self.web_vulns = []
self.sql_vulns = []
# Identify web ports
self.identify_web_services()
def identify_web_services(self):
"""Identify HTTP/HTTPS services from port scan"""
common_web_ports = [80, 443, 8080, 8443, 8000, 8888, 3000, 5000]
web_services = ['http', 'https', 'ssl/http', 'http-proxy', 'http-alt']
for port in self.ports_data:
service = port.get('service', '').lower()
port_num = port.get('port')
# Check if it's a known web service or common web port
if any(web_svc in service for web_svc in web_services) or port_num in common_web_ports:
protocol = 'https' if port_num == 443 or 'https' in service or 'ssl' in service else 'http'
self.web_ports.append({
'port': port_num,
'protocol': protocol,
'url': f"{protocol}://{self.target}:{port_num}"
})
print(f"[✓] Detected web service: {protocol}://{self.target}:{port_num}")
def scan_with_nikto(self, url):
"""Run Nikto web vulnerability scanner"""
print(f"[*] Running Nikto scan on {url}...")
try:
# Check if Nikto is installed
nikto_check = subprocess.run(['which', 'nikto'], capture_output=True)
if nikto_check.returncode != 0:
print("[!] Nikto not installed, skipping web vulnerability scan")
return []
# Run Nikto
cmd = [
'nikto',
'-h', url,
'-Format', 'json',
'-output', f'logs/nikto_{self.target}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json',
'-Tuning', '123456789', # All tests
'-timeout', '10'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
# Parse Nikto output
vulns = self.parse_nikto_output(result.stdout)
print(f"[✓] Nikto scan completed: {len(vulns)} vulnerabilities found")
return vulns
except subprocess.TimeoutExpired:
print("[!] Nikto scan timed out")
return []
except Exception as e:
print(f"[!] Nikto scan failed: {e}")
return []
def parse_nikto_output(self, output):
"""Parse Nikto JSON output"""
vulnerabilities = []
try:
# Try to parse as JSON
if output:
# Nikto output may have multiple JSON objects
json_objects = re.findall(r'\{.*?\}', output, re.DOTALL)
for json_str in json_objects:
try:
data = json.loads(json_str)
if 'vulnerabilities' in data:
for vuln in data['vulnerabilities']:
vulnerabilities.append({
'type': 'web',
'url': vuln.get('url', ''),
'severity': self.map_nikto_severity(vuln.get('OSVDB', '')),
'description': vuln.get('msg', 'Unknown vulnerability')
})
except json.JSONDecodeError:
continue
except Exception as e:
print(f"[!] Error parsing Nikto output: {e}")
# If JSON parsing fails, try text parsing
if not vulnerabilities and output:
lines = output.split('\n')
for line in lines:
if '+ ' in line and any(keyword in line.lower() for keyword in
['vulnerable', 'outdated', 'disclosure', 'injection', 'xss', 'security']):
vulnerabilities.append({
'type': 'web',
'url': self.target,
'severity': 'MEDIUM',
'description': line.strip()
})
return vulnerabilities
def map_nikto_severity(self, osvdb_id):
"""Map OSVDB ID to severity level"""
if not osvdb_id:
return 'MEDIUM'
# Simplified severity mapping
return 'MEDIUM'
def scan_sql_injection(self, url):
"""Scan for SQL injection vulnerabilities using SQLMap"""
print(f"[*] Scanning for SQL injection on {url}...")
try:
# Check if SQLMap is installed
sqlmap_check = subprocess.run(['which', 'sqlmap'], capture_output=True)
if sqlmap_check.returncode != 0:
print("[!] SQLMap not installed, skipping SQL injection scan")
return []
# Run SQLMap with batch mode and basic options
cmd = [
'sqlmap',
'-u', url,
'--batch',
'--crawl=2',
'--level=1',
'--risk=1',
'--random-agent',
'--timeout=30',
'--retries=2',
'--threads=3'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
# Parse SQLMap output
sql_vulns = self.parse_sqlmap_output(result.stdout, url)
print(f"[✓] SQL injection scan completed: {len(sql_vulns)} vulnerabilities found")
return sql_vulns
except subprocess.TimeoutExpired:
print("[!] SQLMap scan timed out")
return []
except Exception as e:
print(f"[!] SQLMap scan failed: {e}")
return []
def parse_sqlmap_output(self, output, url):
"""Parse SQLMap output for vulnerabilities"""
vulnerabilities = []
try:
# Look for SQL injection indicators
if 'parameter' in output.lower() and 'injectable' in output.lower():
# Extract injectable parameters
param_matches = re.findall(r"Parameter: (.*?) \(.*?\) is vulnerable", output, re.IGNORECASE)
for param in param_matches:
vulnerabilities.append({
'type': 'sql_injection',
'url': url,
'parameter': param,
'severity': 'HIGH',
'description': f"SQL Injection vulnerability found in parameter: {param}"
})
# Check for database information
if 'back-end DBMS' in output:
db_match = re.search(r'back-end DBMS: (.*?)[\n\r]', output)
if db_match and vulnerabilities:
vulnerabilities[0]['database'] = db_match.group(1)
except Exception as e:
print(f"[!] Error parsing SQLMap output: {e}")
return vulnerabilities
def scan_common_vulnerabilities(self):
"""Scan for common vulnerabilities based on service versions"""
print("[*] Checking for common vulnerabilities based on service versions...")
common_vulns = []
for port in self.ports_data:
service = port.get('service', '').lower()
version = port.get('version', '').lower()
port_num = port.get('port')
# Check for outdated/vulnerable services
vuln_checks = [
# SSH vulnerabilities
{
'service': 'ssh',
'versions': ['openssh 5', 'openssh 6', 'openssh 7.0', 'openssh 7.1', 'openssh 7.2'],
'vuln_name': 'Outdated SSH Version',
'description': 'SSH service running outdated version with known vulnerabilities',
'severity': 'MEDIUM'
},
# FTP vulnerabilities
{
'service': 'ftp',
'versions': ['vsftpd 2.3.4', 'proftpd 1.3.3'],
'vuln_name': 'Vulnerable FTP Service',
'description': 'FTP service with known backdoor or vulnerabilities',
'severity': 'HIGH'
},
# SMB vulnerabilities
{
'service': 'microsoft-ds',
'versions': ['smb'],
'vuln_name': 'SMB Service Exposed',
'description': 'SMB service exposed, potential for EternalBlue or similar exploits',
'severity': 'HIGH'
},
# MySQL vulnerabilities
{
'service': 'mysql',
'versions': ['5.0', '5.1', '5.5'],
'vuln_name': 'Outdated MySQL Version',
'description': 'MySQL running outdated version with known vulnerabilities',
'severity': 'MEDIUM'
},
# Apache vulnerabilities
{
'service': 'http',
'versions': ['apache 2.0', 'apache 2.2'],
'vuln_name': 'Outdated Apache Server',
'description': 'Apache HTTP server running outdated version',
'severity': 'MEDIUM'
}
]
# Check each vulnerability pattern
for check in vuln_checks:
if check['service'] in service:
for vuln_version in check['versions']:
if vuln_version in version or version == '':
common_vulns.append({
'port': port_num,
'service': service,
'name': check['vuln_name'],
'description': check['description'],
'severity': check['severity'],
'version': version,
'exploitable': True
})
print(f"[!] Found: {check['vuln_name']} on port {port_num}")
break
return common_vulns
def run_full_scan(self):
"""Execute complete vulnerability scan"""
print("\n" + "="*60)
print("AutoPentestX - Vulnerability Scanner")
print("="*60)
print(f"Target: {self.target}")
print(f"Services to scan: {len(self.ports_data)}")
print("="*60 + "\n")
# Scan common vulnerabilities
common_vulns = self.scan_common_vulnerabilities()
self.vulnerabilities.extend(common_vulns)
# Scan web services if found
if self.web_ports:
print(f"\n[*] Found {len(self.web_ports)} web service(s)")
for web_service in self.web_ports:
url = web_service['url']
# Nikto scan
nikto_vulns = self.scan_with_nikto(url)
self.web_vulns.extend(nikto_vulns)
# SQL injection scan
sql_vulns = self.scan_sql_injection(url)
self.sql_vulns.extend(sql_vulns)
else:
print("[!] No web services detected")
print("\n" + "="*60)
print("VULNERABILITY SCAN SUMMARY")
print("="*60)
print(f"Common Vulnerabilities: {len(common_vulns)}")
print(f"Web Vulnerabilities: {len(self.web_vulns)}")
print(f"SQL Injection Points: {len(self.sql_vulns)}")
print(f"Total Vulnerabilities: {len(self.vulnerabilities) + len(self.web_vulns) + len(self.sql_vulns)}")
print("="*60 + "\n")
return {
'vulnerabilities': self.vulnerabilities,
'web_vulnerabilities': self.web_vulns,
'sql_vulnerabilities': self.sql_vulns
}
def get_results(self):
"""Return vulnerability scan results"""
return {
'vulnerabilities': self.vulnerabilities,
'web_vulnerabilities': self.web_vulns,
'sql_vulnerabilities': self.sql_vulns
}
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python vuln_scanner.py <target>")
sys.exit(1)
# This would normally receive port data from scanner module
# For testing, we'll use sample data
target = sys.argv[1]
sample_ports = [
{'port': 80, 'service': 'http', 'version': 'Apache 2.2.8'},
{'port': 22, 'service': 'ssh', 'version': 'OpenSSH 7.0'}
]
vuln_scanner = VulnerabilityScanner(target, sample_ports)
results = vuln_scanner.run_full_scan()