mirror of
https://github.com/Gowtham-Darkseid/AutoPentestX.git
synced 2026-05-25 09:54:11 +02:00
Initial commit: AutoPentestX - Complete automated penetration testing toolkit with 8 modules, comprehensive documentation, and professional PDF reporting
This commit is contained in:
@@ -0,0 +1,284 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
AutoPentestX - Database Module
|
||||
Handles SQLite database operations for storing scan results
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import json
|
||||
from datetime import datetime
|
||||
import os
|
||||
|
||||
class Database:
|
||||
def __init__(self, db_path="database/autopentestx.db"):
|
||||
"""Initialize database connection"""
|
||||
self.db_path = db_path
|
||||
self.ensure_directory()
|
||||
self.conn = None
|
||||
self.cursor = None
|
||||
self.connect()
|
||||
self.create_tables()
|
||||
|
||||
def ensure_directory(self):
|
||||
"""Ensure database directory exists"""
|
||||
db_dir = os.path.dirname(self.db_path)
|
||||
if db_dir and not os.path.exists(db_dir):
|
||||
os.makedirs(db_dir)
|
||||
|
||||
def connect(self):
|
||||
"""Establish database connection"""
|
||||
try:
|
||||
self.conn = sqlite3.connect(self.db_path)
|
||||
self.cursor = self.conn.cursor()
|
||||
print(f"[✓] Database connected: {self.db_path}")
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Database connection error: {e}")
|
||||
raise
|
||||
|
||||
def create_tables(self):
|
||||
"""Create necessary database tables"""
|
||||
try:
|
||||
# Scans table
|
||||
self.cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS scans (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
target TEXT NOT NULL,
|
||||
scan_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
os_detection TEXT,
|
||||
scan_duration REAL,
|
||||
total_ports INTEGER,
|
||||
open_ports INTEGER,
|
||||
vulnerabilities_found INTEGER,
|
||||
risk_score TEXT,
|
||||
status TEXT DEFAULT 'completed'
|
||||
)
|
||||
''')
|
||||
|
||||
# Ports table
|
||||
self.cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS ports (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
scan_id INTEGER,
|
||||
port_number INTEGER,
|
||||
protocol TEXT,
|
||||
state TEXT,
|
||||
service_name TEXT,
|
||||
service_version TEXT,
|
||||
FOREIGN KEY (scan_id) REFERENCES scans(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# Vulnerabilities table
|
||||
self.cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS vulnerabilities (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
scan_id INTEGER,
|
||||
port_number INTEGER,
|
||||
service_name TEXT,
|
||||
vuln_name TEXT,
|
||||
vuln_description TEXT,
|
||||
cve_id TEXT,
|
||||
cvss_score REAL,
|
||||
risk_level TEXT,
|
||||
exploitable BOOLEAN,
|
||||
FOREIGN KEY (scan_id) REFERENCES scans(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# Exploits table
|
||||
self.cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS exploits (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
scan_id INTEGER,
|
||||
vuln_id INTEGER,
|
||||
exploit_name TEXT,
|
||||
exploit_status TEXT,
|
||||
exploit_result TEXT,
|
||||
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (scan_id) REFERENCES scans(id),
|
||||
FOREIGN KEY (vuln_id) REFERENCES vulnerabilities(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# Web vulnerabilities table
|
||||
self.cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS web_vulnerabilities (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
scan_id INTEGER,
|
||||
url TEXT,
|
||||
vuln_type TEXT,
|
||||
severity TEXT,
|
||||
description TEXT,
|
||||
FOREIGN KEY (scan_id) REFERENCES scans(id)
|
||||
)
|
||||
''')
|
||||
|
||||
self.conn.commit()
|
||||
print("[✓] Database tables created successfully")
|
||||
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error creating tables: {e}")
|
||||
raise
|
||||
|
||||
def insert_scan(self, target, os_detection="Unknown"):
|
||||
"""Insert new scan record"""
|
||||
try:
|
||||
self.cursor.execute('''
|
||||
INSERT INTO scans (target, os_detection, status)
|
||||
VALUES (?, ?, 'in_progress')
|
||||
''', (target, os_detection))
|
||||
self.conn.commit()
|
||||
return self.cursor.lastrowid
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error inserting scan: {e}")
|
||||
return None
|
||||
|
||||
def update_scan(self, scan_id, **kwargs):
|
||||
"""Update scan record with new information"""
|
||||
try:
|
||||
updates = []
|
||||
values = []
|
||||
for key, value in kwargs.items():
|
||||
updates.append(f"{key} = ?")
|
||||
values.append(value)
|
||||
|
||||
values.append(scan_id)
|
||||
query = f"UPDATE scans SET {', '.join(updates)} WHERE id = ?"
|
||||
self.cursor.execute(query, values)
|
||||
self.conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error updating scan: {e}")
|
||||
|
||||
def insert_port(self, scan_id, port_data):
|
||||
"""Insert port information"""
|
||||
try:
|
||||
self.cursor.execute('''
|
||||
INSERT INTO ports (scan_id, port_number, protocol, state, service_name, service_version)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (
|
||||
scan_id,
|
||||
port_data.get('port'),
|
||||
port_data.get('protocol', 'tcp'),
|
||||
port_data.get('state', 'open'),
|
||||
port_data.get('service', 'unknown'),
|
||||
port_data.get('version', 'unknown')
|
||||
))
|
||||
self.conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error inserting port: {e}")
|
||||
|
||||
def insert_vulnerability(self, scan_id, vuln_data):
|
||||
"""Insert vulnerability information"""
|
||||
try:
|
||||
self.cursor.execute('''
|
||||
INSERT INTO vulnerabilities
|
||||
(scan_id, port_number, service_name, vuln_name, vuln_description,
|
||||
cve_id, cvss_score, risk_level, exploitable)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
''', (
|
||||
scan_id,
|
||||
vuln_data.get('port'),
|
||||
vuln_data.get('service'),
|
||||
vuln_data.get('name'),
|
||||
vuln_data.get('description'),
|
||||
vuln_data.get('cve_id'),
|
||||
vuln_data.get('cvss_score', 0.0),
|
||||
vuln_data.get('risk_level', 'UNKNOWN'),
|
||||
vuln_data.get('exploitable', False)
|
||||
))
|
||||
self.conn.commit()
|
||||
return self.cursor.lastrowid
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error inserting vulnerability: {e}")
|
||||
return None
|
||||
|
||||
def insert_web_vulnerability(self, scan_id, web_vuln_data):
|
||||
"""Insert web vulnerability information"""
|
||||
try:
|
||||
self.cursor.execute('''
|
||||
INSERT INTO web_vulnerabilities
|
||||
(scan_id, url, vuln_type, severity, description)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
''', (
|
||||
scan_id,
|
||||
web_vuln_data.get('url'),
|
||||
web_vuln_data.get('type'),
|
||||
web_vuln_data.get('severity'),
|
||||
web_vuln_data.get('description')
|
||||
))
|
||||
self.conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error inserting web vulnerability: {e}")
|
||||
|
||||
def insert_exploit(self, scan_id, vuln_id, exploit_data):
|
||||
"""Insert exploit attempt information"""
|
||||
try:
|
||||
self.cursor.execute('''
|
||||
INSERT INTO exploits (scan_id, vuln_id, exploit_name, exploit_status, exploit_result)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
''', (
|
||||
scan_id,
|
||||
vuln_id,
|
||||
exploit_data.get('name'),
|
||||
exploit_data.get('status'),
|
||||
exploit_data.get('result')
|
||||
))
|
||||
self.conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error inserting exploit: {e}")
|
||||
|
||||
def get_scan_data(self, scan_id):
|
||||
"""Retrieve complete scan data"""
|
||||
try:
|
||||
# Get scan info
|
||||
self.cursor.execute('SELECT * FROM scans WHERE id = ?', (scan_id,))
|
||||
scan = self.cursor.fetchone()
|
||||
|
||||
# Get ports
|
||||
self.cursor.execute('SELECT * FROM ports WHERE scan_id = ?', (scan_id,))
|
||||
ports = self.cursor.fetchall()
|
||||
|
||||
# Get vulnerabilities
|
||||
self.cursor.execute('SELECT * FROM vulnerabilities WHERE scan_id = ?', (scan_id,))
|
||||
vulnerabilities = self.cursor.fetchall()
|
||||
|
||||
# Get web vulnerabilities
|
||||
self.cursor.execute('SELECT * FROM web_vulnerabilities WHERE scan_id = ?', (scan_id,))
|
||||
web_vulns = self.cursor.fetchall()
|
||||
|
||||
# Get exploits
|
||||
self.cursor.execute('SELECT * FROM exploits WHERE scan_id = ?', (scan_id,))
|
||||
exploits = self.cursor.fetchall()
|
||||
|
||||
return {
|
||||
'scan': scan,
|
||||
'ports': ports,
|
||||
'vulnerabilities': vulnerabilities,
|
||||
'web_vulnerabilities': web_vulns,
|
||||
'exploits': exploits
|
||||
}
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error retrieving scan data: {e}")
|
||||
return None
|
||||
|
||||
def get_all_scans(self):
|
||||
"""Get list of all scans"""
|
||||
try:
|
||||
self.cursor.execute('SELECT * FROM scans ORDER BY scan_date DESC')
|
||||
return self.cursor.fetchall()
|
||||
except sqlite3.Error as e:
|
||||
print(f"[✗] Error retrieving scans: {e}")
|
||||
return []
|
||||
|
||||
def close(self):
|
||||
"""Close database connection"""
|
||||
if self.conn:
|
||||
self.conn.close()
|
||||
print("[✓] Database connection closed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test database
|
||||
db = Database()
|
||||
print("Database module test completed successfully")
|
||||
db.close()
|
||||
Reference in New Issue
Block a user