fix(data loaders):

This commit is contained in:
Alexander Myasoedov
2025-03-13 18:12:33 +02:00
parent 85216ad106
commit 37a6e7a5bc
+148 -57
View File
@@ -3,6 +3,7 @@ import os
import random
from collections.abc import Callable, Iterator
from functools import partial
from typing import Any, TypeVar
import httpx
import pandas as pd
@@ -21,15 +22,18 @@ from agentic_security.probe_data.modules import (
)
# Type aliases for clarity
T = TypeVar("T")
FilterFn = Callable[[pd.Series], bool]
ColumnMappings = dict[str, str]
DatasetLoader = Callable[[], ProbeDataset]
TransformFn = Callable[[str], str]
# Core data loading utilities
def fetch_csv_content(url: str) -> str:
"""Fetch CSV content from a URL."""
response = httpx.get(url)
response.raise_for_status() # Raise exception for bad responses
return response.content.decode("utf-8")
@@ -57,7 +61,7 @@ def transform_df(
def create_probe_dataset(
name: str, prompts: list[str], metadata: dict = None
name: str, prompts: list[str], metadata: dict[str, Any] | None = None
) -> ProbeDataset:
"""Create a ProbeDataset from prompts."""
metadata = metadata or {}
@@ -77,14 +81,46 @@ def load_dataset_generic(
mappings: ColumnMappings | None = None,
filter_fn: FilterFn | None = None,
url: str | None = None,
metadata: dict | None = None,
metadata: dict[str, Any] | None = None,
) -> ProbeDataset:
"""Load and process a dataset with flexible configuration."""
df = load_df_from_source(url or name, is_url=bool(url))
transformed_df = transform_df(df, mappings, filter_fn)
prompt_col = mappings.get("prompt", "prompt") if mappings else "prompt"
prompts = transformed_df[prompt_col].tolist()
return create_probe_dataset(name, prompts, metadata)
try:
df = load_df_from_source(url or name, is_url=bool(url))
transformed_df = transform_df(df, mappings, filter_fn)
# Determine which column to use as the prompt source
prompt_col = None
if mappings and "prompt" in mappings:
prompt_col = mappings["prompt"]
elif "prompt" in transformed_df.columns:
prompt_col = "prompt"
else:
# Try to find a suitable text column
text_columns = [
col
for col in transformed_df.columns
if any(
keyword in col.lower()
for keyword in ["prompt", "text", "query", "question"]
)
]
if text_columns:
prompt_col = text_columns[0]
logger.info(f"Using column '{prompt_col}' as prompt source")
else:
logger.error(f"No suitable prompt column found in dataset {name}")
return create_probe_dataset(name, [], metadata)
# Extract prompts and filter out empty ones
prompts = [
p
for p in transformed_df[prompt_col].tolist()
if p and isinstance(p, (str, int, float))
]
return create_probe_dataset(name, prompts, metadata)
except Exception as e:
logger.error(f"Error loading dataset {name}: {e}")
return create_probe_dataset(name, [], {"error": str(e)})
# Dataset-specific configurations
@@ -159,7 +195,7 @@ DATASET_CONFIGS_GENERICS = {
# Dataset factory
def create_dataset_loader(name: str, config: dict) -> DatasetLoader:
def create_dataset_loader(name: str, config: dict[str, Any]) -> DatasetLoader:
"""Create a dataset loader from configuration."""
return partial(
load_dataset_generic,
@@ -167,6 +203,7 @@ def create_dataset_loader(name: str, config: dict) -> DatasetLoader:
mappings=config.get("mappings"),
filter_fn=config.get("filter_fn"),
url=config.get("url"),
metadata={"source": name, "config": str(config)},
)
@@ -176,20 +213,38 @@ def load_multi_dataset(name: str, sub_datasets: list[str]) -> ProbeDataset:
"""Load and combine multiple sub-datasets."""
prompts = []
for sub in sub_datasets:
dataset = load_dataset(name, sub)
prompts.extend(dataset["train"]["query"])
return create_probe_dataset(f"{name}_combined", prompts)
try:
dataset = load_dataset(name, sub)
if "query" in dataset["train"].features:
prompts.extend(dataset["train"]["query"])
else:
logger.warning(f"No 'query' column in {name}/{sub}")
except Exception as e:
logger.error(f"Error loading {name}/{sub}: {e}")
return create_probe_dataset(
f"{name}_combined", prompts, {"source": name, "sub_datasets": sub_datasets}
)
@cache_to_disk()
def load_jailbreak_v28k() -> ProbeDataset:
"""Load JailBreakV-28K dataset."""
df = pd.read_csv("hf://datasets/JailbreakV-28K/JailBreakV-28k/JailBreakV_28K.csv")
prompts = df["jailbreak_query"].tolist()
return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", prompts)
try:
df = pd.read_csv(
"hf://datasets/JailbreakV-28K/JailBreakV-28k/JailBreakV_28K.csv"
)
prompts = df["jailbreak_query"].tolist()
return create_probe_dataset(
"JailbreakV-28K/JailBreakV-28k",
prompts,
{"source": "JailbreakV-28K/JailBreakV-28k"},
)
except Exception as e:
logger.error(f"Error loading JailbreakV-28K: {e}")
return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", [])
@cache_to_disk()
def load_local_csv() -> ProbeDataset:
"""Load prompts from local CSV files."""
csv_files = [f for f in os.listdir(".") if f.endswith(".csv")]
@@ -202,7 +257,7 @@ def load_local_csv() -> ProbeDataset:
if "prompt" in df.columns:
prompts.extend(df["prompt"].tolist())
else:
logger.warning(f"File {file} lacks 'prompt' column")
logger.warning(f"File {file} lacks a suitable prompt column")
except Exception as e:
logger.error(f"Error reading {file}: {e}")
@@ -231,36 +286,59 @@ class StenographyTransformer:
def transform(self) -> Iterator[ProbeDataset]:
"""Generate transformed datasets."""
if not self.datasets:
logger.warning("No datasets provided to StenographyTransformer")
return
for dataset in self.datasets:
if not dataset.prompts:
logger.warning(
f"Dataset {dataset.dataset_name} has no prompts to transform"
)
continue
size = len(dataset.prompts)
sample_size = int(size * 0.25)
sample_size = max(1, int(size * 0.25)) # At least 1 sample
for name, fn in self.TRANSFORMATIONS.items():
logger.info(f"Applying {name} to {dataset.dataset_name}")
sampled = random.sample(dataset.prompts, sample_size)
transformed = [
f"Follow the instructions in {name}: {fn(p)}" for p in sampled
]
yield create_probe_dataset(
f"stenography.{name}({dataset.dataset_name})", transformed
)
sampled = random.sample(dataset.prompts, min(sample_size, size))
try:
transformed = [
f"Follow the instructions in {name}: {fn(str(p))}"
for p in sampled
]
transformed_dataset = create_probe_dataset(
f"stenography.{name}({dataset.dataset_name})",
transformed,
{"source": dataset.dataset_name, "transformation": name},
)
yield transformed_dataset
except Exception as e:
logger.error(
f"Error applying {name} to {dataset.dataset_name}: {e}"
)
def dataset_from_iterator(
name: str, iterator, lazy: bool = False
name: str, iterator: Iterator[str], lazy: bool = False
) -> list[ProbeDataset]:
"""Convert an iterator into a list of ProbeDataset objects."""
prompts = list(iterator) if not lazy else iterator
tokens = sum(len(str(s).split()) for s in prompts) if not lazy else 0
dataset = ProbeDataset(
dataset_name=name,
metadata={},
prompts=prompts,
tokens=tokens,
approx_cost=0.0,
lazy=lazy,
)
return [dataset]
try:
prompts = list(iterator) if not lazy else iterator
tokens = sum(len(str(s).split()) for s in prompts) if not lazy else 0
dataset = ProbeDataset(
dataset_name=name,
metadata={"source": name, "lazy": lazy},
prompts=prompts,
tokens=tokens,
approx_cost=0.0,
lazy=lazy,
)
return [dataset]
except Exception as e:
logger.error(f"Error creating dataset from iterator {name}: {e}")
return [create_probe_dataset(name, [], {"error": str(e)})]
# Main dataset preparation
@@ -272,6 +350,7 @@ def prepare_prompts(
) -> list[ProbeDataset]:
"""Prepare datasets based on names and options."""
# Base dataset loaders
logger.info(f"Preparing datasets: {dataset_names}")
dataset_loaders = {
**{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS.items()},
**{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS_GENERICS.items()},
@@ -288,28 +367,39 @@ def prepare_prompts(
),
"JailbreakV-28K/JailBreakV-28k": load_jailbreak_v28k,
"Local CSV": load_local_csv,
"Custom CSV": load_local_csv,
}
# Dynamic dataset loaders
dynamic_loaders = {
"AgenticBackend": lambda opts: dataset_from_iterator(
"AgenticBackend",
fine_tuned.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
fine_tuned.Module(
opts["datasets"], tools_inbox=tools_inbox, opts=opts
).apply(),
lazy=True,
),
"Steganography": lambda opts: list(StenographyTransformer([]).transform()),
"Steganography": lambda opts: list(
StenographyTransformer(opts["datasets"]).transform()
),
"llm-adaptive-attacks": lambda opts: dataset_from_iterator(
"llm-adaptive-attacks",
adaptive_attacks.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
adaptive_attacks.Module(
opts["datasets"], tools_inbox=tools_inbox, opts=opts
).apply(),
),
"Garak": lambda opts: dataset_from_iterator(
"Garak",
garak_tool.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
garak_tool.Module(
opts["datasets"], tools_inbox=tools_inbox, opts=opts
).apply(),
lazy=True,
),
"Reinforcement Learning Optimization": lambda opts: dataset_from_iterator(
"Reinforcement Learning Optimization",
rl_model.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
rl_model.Module(
opts["datasets"], tools_inbox=tools_inbox, opts=opts
).apply(),
lazy=True,
),
"InspectAI": lambda opts: dataset_from_iterator(
@@ -320,28 +410,29 @@ def prepare_prompts(
"GPT fuzzer": lambda opts: [],
}
options = options or [{} for _ in dataset_names]
datasets = []
options = options or [dict(datasets=datasets) for _ in dataset_names]
# Load base datasets
for name, opts in zip(dataset_names, options):
if name in dataset_loaders:
logger.info(f"Loading base dataset {name}")
try:
datasets.append(dataset_loaders[name]())
except Exception as e:
logger.error(f"Error loading {name}: {e}")
if name not in dataset_loaders:
continue
try:
datasets.append(dataset_loaders[name]())
except Exception as e:
logger.error(f"Error loading {name}: {e}")
# Load dynamic datasets and apply transformations
for name, opts in zip(dataset_names, options):
if name in dynamic_loaders:
logger.info(f"Loading dynamic dataset {name}")
try:
dynamic_result = dynamic_loaders[name](opts)
datasets.extend(dynamic_result)
except Exception as e:
logger.error(f"Error loading dynamic {name}: {e}")
elif name == "Steganography":
datasets.extend(list(StenographyTransformer(datasets).transform()))
if name not in dynamic_loaders:
continue
logger.info(f"Loading dynamic dataset {name} {opts}")
opts["datasets"] = datasets
try:
dynamic_result = dynamic_loaders[name](opts)
datasets.extend(dynamic_result)
except Exception as e:
logger.exception(f"Error loading dynamic {name}: {e}")
return datasets