mirror of
https://github.com/0xMarcio/PentestPilot.git
synced 2026-06-08 07:13:53 +02:00
461c14d676
Initial commit of PentestPilot — AI‑assisted pentest recon and orchestration toolkit.

Highlights:
- Resumable pipelines (full_pipeline) with manifest state and elapsed timings
- Rich dashboard (colors, severity bars, durations, compact/json modes)
- Web helpers: httpx→nuclei auto, tech routing + quick scanners
- Agents: multi‑task orchestrator (web/full/ad/notes/post) with resume
- AD/SMB, password utils, shells, transfer, privesc, tunnels
- QoL scripts: proxy toggle, cleanup, tmux init, URL extractor
- Docs: README (Quick Start + Docs Index), HOWTO (deep guide), TOOLKIT (catalog with examples)

Structure:
- bin/automation: pipelines, dashboard, manifest, resume, tech_actions
- bin/web: routing, scanners, helpers
- bin/ai: orchestrators + robust AI utils
- bin/ad, bin/passwords, bin/shells, bin/transfer, bin/privesc, bin/misc, bin/dns, bin/scan, bin/windows, bin/hashes
- HOWTO.md and TOOLKIT.md cross‑linked with examples

Use:
- settarget <target>; agent full <domain|hosts.txt>; dashboard --compact
- See HOWTO.md for setup, semantics, and examples.
165 lines
5.8 KiB
Python
Executable File
165 lines
5.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os, json, sys, glob, datetime
|
|
|
|
def load_manifest(path):
    """Read the file at *path* and decode it as JSON.

    Returns the decoded value, or None when the file cannot be opened,
    read, or parsed. This function never raises for ordinary errors, so
    callers can fall back with ``load_manifest(p) or {}``.
    """
    parsed = None
    try:
        with open(path, 'r') as manifest_file:
            parsed = json.loads(manifest_file.read())
    except Exception:
        # Best-effort by design: any failure is reported as "no manifest".
        parsed = None
    return parsed
|
|
|
|
def status_symbol(s, color=True):
    """Map a task status to a two-character marker.

    'ok' -> 'OK', 'running' -> '..', 'fail' -> 'XX', anything else -> '--'.
    When *color* is true and stdout is a terminal, the marker is wrapped in
    an ANSI colour (green/yellow/red; no colour prefix for unknown statuses)
    followed by a reset sequence. Otherwise the plain marker is returned.
    """
    labels = {'ok': 'OK', 'running': '..', 'fail': 'XX'}
    label = labels[s] if s in labels else '--'
    if color and sys.stdout.isatty():
        ansi = {'ok': '\033[32m', 'running': '\033[33m', 'fail': '\033[31m'}
        return ansi.get(s, '') + label + '\033[0m'
    return label
|
|
|
|
def sev_bar(sev_map, total, width=20, color=True):
    """Render a fixed-width severity bar.

    Parameters:
        sev_map: mapping of severity name to finding count. Only the keys
            'critical', 'high', 'medium' and 'low' are drawn, in that order.
        total: number the counts are scaled against.
        width: visible width of the bar in characters.
        color: when true and stdout is a terminal, each segment is wrapped
            in an ANSI background colour.

    Returns:
        '-' * width when *total* is empty or not positive. Otherwise a run
        of block characters per severity, each floor-scaled to
        count/total*width, padded with spaces to *width*.

    The visible length never exceeds *width*: a segment is truncated to the
    room that is left, so counts that add up to more than *total* cannot
    push the bar past its column.
    """
    if not total or total < 0:
        return '-' * width

    codes = {
        'critical': '\033[41m',
        'high': '\033[45m',
        'medium': '\033[43m',
        'low': '\033[42m',
    }
    use_color = bool(color) and sys.stdout.isatty()

    pieces = []
    filled = 0
    for k in ('critical', 'high', 'medium', 'low'):
        count = sev_map.get(k) or 0  # tolerate a missing or null count
        n = int((count / total) * width)
        # Clamp to the remaining room so the bar cannot overflow `width`.
        n = min(n, width - filled)
        if n <= 0:
            continue
        seg = '█' * n
        pieces.append(codes[k] + seg + '\033[0m' if use_color else seg)
        filled += n

    if filled < width:
        pieces.append(' ' * (width - filled))
    return ''.join(pieces)
|
|
|
|
# Command-line switches, read straight from sys.argv:
#   --no-color  disable ANSI colours
#   --compact   print the reduced column set
#   --no-fancy  suppress the banner; --fancy re-enables it and wins if both
#               are given
color = '--no-color' not in sys.argv
compact = '--compact' in sys.argv
fancy = '--fancy' in sys.argv or '--no-fancy' not in sys.argv
|
|
|
|
# Workspace root: $HTB_ROOT when set, otherwise the current directory.
root = os.getenv('HTB_ROOT', os.getcwd())
targets_dir = os.path.join(root, 'targets')
# One manifest.json per immediate sub-directory of <root>/targets.
manifests = glob.glob(os.path.join(targets_dir, '*', 'manifest.json'))

# Table rows, one list of column strings per target.
rows = list()
# Aggregates for badges/header
agg = dict(
    targets=0,
    completed=0,
    pending=0,
    sev=dict.fromkeys(('critical', 'high', 'medium', 'low'), 0),
)
|
|
def _as_dict(value):
    """Return *value* unchanged when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def dur(task):
    """Return the duration of *task* for the current manifest, as a string.

    Reads the module-level ``tasks`` mapping, which the loop below rebinds
    for every manifest. Uses ``meta['elapsed_sec']`` when present; otherwise
    computes whole seconds between ``started_at`` and ``finished_at``
    (format '%Y-%m-%d %H:%M:%S'). Returns '-' when neither source is
    available or the timestamps do not parse.
    """
    t = _as_dict(tasks.get(task))
    meta = _as_dict(t.get('meta'))
    if 'elapsed_sec' in meta:
        return str(meta['elapsed_sec'])
    # fallback compute
    a = t.get('started_at')
    b = t.get('finished_at')
    if a and b:
        try:
            da = datetime.datetime.strptime(a, '%Y-%m-%d %H:%M:%S')
            db = datetime.datetime.strptime(b, '%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError):
            return '-'
        return str(int((db - da).total_seconds()))
    return '-'


# Build one 12-column row per manifest and update the aggregates:
#   0 target, 1 created, 2 last pipeline, 3 url count,
#   4-8 dns/httpx/nuclei/techroute/wpscan as "<symbol>(<duration>s)",
#   9 severity summary, 10 top technologies, 11 severity bar.
for m in manifests:
    target = os.path.basename(os.path.dirname(m))
    # An unreadable manifest, or one whose top level is not a JSON object,
    # is treated as empty so the target still gets a row.
    data = _as_dict(load_manifest(m))
    # Coerce to str: a null or numeric value would break ' | '.join later.
    created = str(data.get('created_at') or '')
    last = str(data.get('last_pipeline') or '')
    url_list = data.get('urls')
    urls = len(url_list) if isinstance(url_list, list) else 0
    # "tasks" and each task entry may be null or malformed in a partially
    # written manifest; normalise them all to dicts.
    tasks = _as_dict(data.get('tasks'))
    agg['targets'] += 1

    # Summaries
    sev_str = ''
    tech_str = ''
    httpx_meta = _as_dict(_as_dict(tasks.get('httpx')).get('meta'))
    nuclei_meta = _as_dict(_as_dict(tasks.get('nuclei')).get('meta'))

    # Top three technologies by count from the httpx summary file.
    try:
        if 'httpx_summary' in httpx_meta and os.path.isfile(httpx_meta['httpx_summary']):
            with open(httpx_meta['httpx_summary'], 'r') as f:
                s = json.load(f)
            tech = s.get('tech') or {}
            top = sorted(tech.items(), key=lambda x: x[1], reverse=True)[:3]
            tech_str = ', '.join(f"{k}:{v}" for k, v in top)
    except Exception:
        # Best-effort: a broken summary file only blanks this column.
        pass

    # Severity counts from the nuclei summary file.
    sev_map = {}
    sev_total = 0
    try:
        if 'nuclei_summary' in nuclei_meta and os.path.isfile(nuclei_meta['nuclei_summary']):
            with open(nuclei_meta['nuclei_summary'], 'r') as f:
                s = json.load(f)
            sev = (s.get('by_severity') or {})
            sev_map = {k: int(sev[k]) for k in sev}
            sev_total = int(s.get('total', 0) or 0)
            sev_str = ' '.join(
                f"{k[0]}:{sev_map.get(k,0)}"
                for k in ['critical', 'high', 'medium', 'low']
                if sev_map.get(k)
            )
            for k in ['critical', 'high', 'medium', 'low']:
                agg['sev'][k] += int(sev_map.get(k, 0))
    except Exception:
        # Best-effort: a broken summary file only blanks these columns.
        pass

    # Status + duration cell for each pipeline stage, in column order.
    cells = []
    for name in ('dns', 'httpx', 'nuclei', 'techroute', 'wpscan'):
        status = _as_dict(tasks.get(name)).get('status')
        if not isinstance(status, str):
            # Non-string statuses (possibly unhashable) render as unknown.
            status = None
        cells.append(status_symbol(status, color=color) + '(' + dur(name) + 's)')

    row = [target, created, last, str(urls), *cells,
           sev_str or '-', tech_str or '-', sev_bar(sev_map, sev_total, color=color)]
    rows.append(row)

    # A target counts as completed only when full_pipeline reported 'ok'.
    fp = _as_dict(tasks.get('full_pipeline')).get('status')
    if fp == 'ok':
        agg['completed'] += 1
    else:
        agg['pending'] += 1
|
|
|
|
def print_header():
    """Print the boxed summary banner shown above the table.

    Does nothing when fancy output is disabled, in compact mode, or when
    stdout is not a terminal. Reads the module-level ``agg`` totals.

    Defined before the output section so the name is bound when the calls
    below run. Both banner lines are padded to a common width so the
    right-hand border lines up.
    """
    if not fancy or compact or not sys.stdout.isatty():
        return
    # Build ASCII header
    t = agg['targets']; c = agg['completed']; p = agg['pending']
    s = agg['sev']
    title = f" Pentest Dashboard — targets:{t} completed:{c} pending:{p} "
    sev_line = f" severities — C:{s['critical']} H:{s['high']} M:{s['medium']} L:{s['low']} "
    inner = max(len(title), len(sev_line))
    line = '+' + '-' * inner + '+'
    print(line)
    print('|' + title.ljust(inner) + '|')
    print('|' + sev_line.ljust(inner) + '|')
    print(line)


# --json: emit the per-target task columns as JSON and stop.
if '--json' in sys.argv:
    out = [
        {
            'target': r[0],
            'created_at': r[1],
            'last_pipeline': r[2],
            'urls': int(r[3]),
            'tasks': {'dns': r[4], 'httpx': r[5], 'nuclei': r[6], 'techroute': r[7], 'wpscan': r[8]},
        }
        for r in rows
    ]
    print(json.dumps(out, indent=2))
    sys.exit(0)

if compact:
    headers = ['target', 'urls', 'dns', 'httpx', 'nuclei', 'tech', 'wp', 'sev', 'bar']
    print_header()  # no-op in compact mode; kept for parity with the full table
    print(' | '.join(headers))
    print('-' * 72)
    for r in sorted(rows, key=lambda x: x[0]):
        # r indices: 0 tgt, 3 urls, 4 dns,5 httpx,6 nuclei,7 tech,8 wp,9 sev,11 bar
        out = [r[0], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[11]]
        # str() keeps the join from failing on a non-string manifest field.
        print(' | '.join(str(col) for col in out))
else:
    headers = ['target', 'created', 'last', 'urls', 'dns', 'httpx', 'nuclei', 'tech', 'wp', 'sev', 'top-techs', 'bar']
    print_header()
    print(' | '.join(headers))
    print('-' * 72)
    for r in sorted(rows, key=lambda x: x[0]):
        print(' | '.join(str(col) for col in r))
|