# attacks/visual_adv/attack.py
from __future__ import annotations

import csv
import json
import os
import random
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Tuple, Union

import torch
from PIL import Image
from torchvision.utils import save_image
from tqdm import tqdm

from core.base_classes import BaseAttack
from core.data_formats import TestCase
@dataclass
class VisualAdvConfig:
epsilon: float = 8.0
alpha: float = 2.0
n_iters: int = 500
constrained: bool = True
batch_size: int = 8
image_path: str = "attacks/data/clean.jpeg"
target_path: str = "attacks/data/derogatory_corpus.csv"
target_model: str = "minigpt4.minigpt4_model.MiniGPT4"
device: str = "cuda" if torch.cuda.is_available() else "cpu"
class VisualAdvAttack(BaseAttack):
CONFIG_CLASS = VisualAdvConfig
def __init__(self, config: Dict[str, Any] = None, output_image_dir: str = None):
super().__init__(config, output_image_dir)
# If epsilon/alpha looks like pixel values (>1), convert to [0,1] ratio
self.epsilon = getattr(self.cfg, "epsilon", 8.0)
self.alpha = getattr(self.cfg, "alpha", 2.0)
# if epsilon looks like pixel ( > 1 ) convert to fraction over 255
if self.epsilon > 1.0:
self.epsilon = float(self.epsilon) / 255.0
if self.alpha > 1.0:
self.alpha = float(self.alpha) / 255.0
self.n_iters = int(getattr(self.cfg, "n_iters", 500))
self.constrained = bool(getattr(self.cfg, "constrained", True))
self.batch_size = int(getattr(self.cfg, "batch_size", 8))
self.image_path = str(getattr(self.cfg, "image_path", ""))
self.target_path = str(getattr(self.cfg, "target_path", ""))
self.target_model_name = str(getattr(self.cfg, "target_model", ""))
self.device = torch.device(
getattr(self.cfg, "device", "cuda" if torch.cuda.is_available() else "cpu")
)
# Load target list (first column of CSV is the target)
self.targets = []
if self.target_path and os.path.exists(self.target_path):
try:
with open(self.target_path, "r", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
if len(row) >= 1 and row[0].strip(): # Ensure not empty string
self.targets.append(row[0].strip())
except Exception as e:
print(
f"[VisualAdv] Warning: Failed to load targets from {self.target_path}: {e}"
)
# If targets is empty, provide default value or raise error
if not self.targets:
raise ValueError(
f"No targets loaded from {self.target_path}. "
"Please ensure the target_path points to a valid CSV file with at least one target."
)
# Load model (assume multimodalmodels module is already in the project)
# Use lazy import to avoid import-time issues
# self.model = getattr(multimodalmodels, self.target_model_name)()
import importlib
module_path, cls_name = self.target_model_name.rsplit(".", 1)
mod = importlib.import_module(f"multimodalmodels.{module_path}")
self.model = getattr(mod, cls_name)()
self.save_path = None
# -----------------------
# PGD related functions (keep original logic, but safer)
# -----------------------
def _pgd_unconstrained(self, image_path: str):
# open image -> model.vis_processor -> tensor (B,C,H,W) or (C,H,W)
img = Image.open(image_path).convert("RGB")
# assume model has vis_processor that returns a tensor or processed object
# try a few possibilities
proc = self.model.vis_processor(img)
if isinstance(proc, dict):
# sometimes processors return dict with "pixel_values"
img_t = proc.get("pixel_values")
else:
img_t = proc
# ensure tensor shape [1,C,H,W] and on device
if img_t.dim() == 3:
img_t = img_t.unsqueeze(0)
img_t = img_t.to(self.device)
# adv_noise initialized in [0,1]
adv_noise = torch.rand_like(img_t, device=self.device)
adv_noise.requires_grad_(True)
adv_img_prompt = None
for t in tqdm(range(self.n_iters + 1), desc="PGD_unconstrained"):
# sample targets
if not self.targets:
raise ValueError("Cannot sample targets: targets list is empty")
if len(self.targets) >= self.batch_size:
batch_targets = random.sample(self.targets, self.batch_size)
else:
batch_targets = random.choices(self.targets, k=max(1, self.batch_size))
x_adv = self._normalize(
adv_noise
) # normalizing model input as original implementation
# model.compute_loss_batch expects x_adv to be in a format acceptable to the model
target_loss = self.model.compute_loss_batch(x_adv, batch_targets)
# Backward pass to compute gradients
if target_loss is None:
raise RuntimeError("model.compute_loss_batch returned None")
target_loss.backward()
with torch.no_grad():
adv_noise.data = (
adv_noise.data - self.alpha * adv_noise.grad.detach().sign()
).clamp(0.0, 1.0)
adv_noise.grad.zero_()
# Clear model gradients
try:
self.model.model.zero_grad()
except Exception:
pass
if t % 100 == 0 or t == self.n_iters:
if (
verbose := False
): # Only keep hook, remove this line if verbose is needed
print(f"target_loss: {target_loss.item():.6f} at iter {t}")
x_adv = self._normalize(adv_noise)
adv_img = self._denormalize(x_adv).detach().cpu()
# adv_img: [1,C,H,W] -> squeeze
adv_img_prompt = adv_img.squeeze(0)
# Don't save temporary images here each time to avoid generating many files
if adv_img_prompt is None:
raise RuntimeError(
"Failed to produce adversarial image in PGD_unconstrained"
)
return adv_img_prompt
def _pgd_constrained(self, image_path: str):
img = Image.open(image_path).convert("RGB")
proc = self.model.vis_processor(img)
# try common key names
img_t = None
if isinstance(proc, dict):
for k in ("pixel_values", "pixel_value", "input_pixels"):
if k in proc:
img_t = proc[k]
break
if img_t is None:
# fallback: processor returned tensor directly
if hasattr(proc, "unsqueeze"):
img_t = proc
else:
raise RuntimeError(
"Unable to obtain tensor from model.vis_processor for constrained PGD"
)
if img_t.dim() == 3:
img_t = img_t.unsqueeze(0)
img_t = img_t.to(self.device)
# initialize adv_noise in [-epsilon, +epsilon]
adv_noise = (
torch.rand_like(img_t, device=self.device) * 2.0 * self.epsilon
- self.epsilon
).detach()
x = self._denormalize(img_t).clone().to(self.device)
adv_noise.data = (adv_noise.data + x.data).clamp(0.0, 1.0) - x.data
adv_noise.requires_grad_(True)
# freeze model params if possible
try:
self.model.model.requires_grad_(False)
except Exception:
pass
adv_img_prompt = None
for t in tqdm(range(self.n_iters + 1), desc="PGD_constrained"):
if not self.targets:
raise ValueError("Cannot sample targets: targets list is empty")
if len(self.targets) >= self.batch_size:
batch_targets = random.sample(self.targets, self.batch_size)
else:
batch_targets = random.choices(self.targets, k=max(1, self.batch_size))
x_adv = x + adv_noise
x_adv = self._normalize(x_adv)
target_loss = self.model.compute_loss_batch(x_adv, batch_targets)
if target_loss is None:
raise RuntimeError("model.compute_loss_batch returned None")
target_loss.backward()
with torch.no_grad():
adv_noise.data = (
adv_noise.data - self.alpha * adv_noise.grad.detach().sign()
).clamp(-self.epsilon, self.epsilon)
adv_noise.data = (adv_noise.data + x.data).clamp(0.0, 1.0) - x.data
adv_noise.grad.zero_()
if t % 100 == 0 or t == self.n_iters:
if verbose := False:
print(f"target_loss: {target_loss.item():.6f} at iter {t}")
x_adv = x + adv_noise
x_adv = self._normalize(x_adv)
adv_img = self._denormalize(x_adv).detach().cpu()
adv_img_prompt = adv_img.squeeze(0)
if adv_img_prompt is None:
raise RuntimeError("Failed to produce adversarial image in PGD_constrained")
return adv_img_prompt
# normalize/denormalize: Note to keep consistent with mean/variance used by the model
def _normalize(self, images: torch.Tensor) -> torch.Tensor:
mean = torch.tensor(
[0.48145466, 0.4578275, 0.40821073], device=images.device
).view(1, 3, 1, 1)
std = torch.tensor(
[0.26862954, 0.26130258, 0.27577711], device=images.device
).view(1, 3, 1, 1)
return (images - mean) / std
def _denormalize(self, images: torch.Tensor) -> torch.Tensor:
mean = torch.tensor(
[0.48145466, 0.4578275, 0.40821073], device=images.device
).view(1, 3, 1, 1)
std = torch.tensor(
[0.26862954, 0.26130258, 0.27577711], device=images.device
).view(1, 3, 1, 1)
return images * std + mean
# Override generate: expects prompts to be dict (consistent with what's passed in main.py)
def generate_test_case(
self,
original_prompt: str,
image_path: str,
case_id: str,
**kwargs,
) -> TestCase:
if self.save_path is None:
## Only need to generate image once:
if self.constrained:
adv_img_tensor = self._pgd_constrained(self.image_path)
else:
adv_img_tensor = self._pgd_unconstrained(self.image_path)
if adv_img_tensor.dim() == 3:
adv_t_save = adv_img_tensor.unsqueeze(0)
else:
adv_t_save = adv_img_tensor
self.save_path = self.output_image_dir / "advimg.jpg"
save_image(adv_t_save, str(self.save_path))
return self.create_test_case(
case_id=case_id,
jailbreak_prompt=original_prompt,
jailbreak_image_path=str(self.save_path),
original_prompt=original_prompt,
original_image_path=str(image_path),
)