From 37a6e7a5bc953ebd428780727d13808bb4c26ad5 Mon Sep 17 00:00:00 2001 From: Alexander Myasoedov Date: Thu, 13 Mar 2025 18:12:33 +0200 Subject: [PATCH] fix(data loaders): --- agentic_security/probe_data/data.py | 205 ++++++++++++++++++++-------- 1 file changed, 148 insertions(+), 57 deletions(-) diff --git a/agentic_security/probe_data/data.py b/agentic_security/probe_data/data.py index b4ffa3e..f4f5175 100644 --- a/agentic_security/probe_data/data.py +++ b/agentic_security/probe_data/data.py @@ -3,6 +3,7 @@ import os import random from collections.abc import Callable, Iterator from functools import partial +from typing import Any, TypeVar import httpx import pandas as pd @@ -21,15 +22,18 @@ from agentic_security.probe_data.modules import ( ) # Type aliases for clarity +T = TypeVar("T") FilterFn = Callable[[pd.Series], bool] ColumnMappings = dict[str, str] DatasetLoader = Callable[[], ProbeDataset] +TransformFn = Callable[[str], str] # Core data loading utilities def fetch_csv_content(url: str) -> str: """Fetch CSV content from a URL.""" response = httpx.get(url) + response.raise_for_status() # Raise exception for bad responses return response.content.decode("utf-8") @@ -57,7 +61,7 @@ def transform_df( def create_probe_dataset( - name: str, prompts: list[str], metadata: dict = None + name: str, prompts: list[str], metadata: dict[str, Any] | None = None ) -> ProbeDataset: """Create a ProbeDataset from prompts.""" metadata = metadata or {} @@ -77,14 +81,46 @@ def load_dataset_generic( mappings: ColumnMappings | None = None, filter_fn: FilterFn | None = None, url: str | None = None, - metadata: dict | None = None, + metadata: dict[str, Any] | None = None, ) -> ProbeDataset: """Load and process a dataset with flexible configuration.""" - df = load_df_from_source(url or name, is_url=bool(url)) - transformed_df = transform_df(df, mappings, filter_fn) - prompt_col = mappings.get("prompt", "prompt") if mappings else "prompt" - prompts = transformed_df[prompt_col].tolist() - return create_probe_dataset(name, prompts, metadata) + try: + df = load_df_from_source(url or name, is_url=bool(url)) + transformed_df = transform_df(df, mappings, filter_fn) + + # Determine which column to use as the prompt source + prompt_col = None + if mappings and "prompt" in mappings: + prompt_col = mappings["prompt"] + elif "prompt" in transformed_df.columns: + prompt_col = "prompt" + else: + # Try to find a suitable text column + text_columns = [ + col + for col in transformed_df.columns + if any( + keyword in col.lower() + for keyword in ["prompt", "text", "query", "question"] + ) + ] + if text_columns: + prompt_col = text_columns[0] + logger.info(f"Using column '{prompt_col}' as prompt source") + else: + logger.error(f"No suitable prompt column found in dataset {name}") + return create_probe_dataset(name, [], metadata) + + # Extract prompts and filter out empty ones + prompts = [ + p + for p in transformed_df[prompt_col].tolist() + if p and isinstance(p, (str, int, float)) + ] + return create_probe_dataset(name, prompts, metadata) + except Exception as e: + logger.error(f"Error loading dataset {name}: {e}") + return create_probe_dataset(name, [], {"error": str(e)}) # Dataset-specific configurations @@ -159,7 +195,7 @@ DATASET_CONFIGS_GENERICS = { # Dataset factory -def create_dataset_loader(name: str, config: dict) -> DatasetLoader: +def create_dataset_loader(name: str, config: dict[str, Any]) -> DatasetLoader: """Create a dataset loader from configuration.""" return partial( load_dataset_generic, @@ -167,6 +203,7 @@ def create_dataset_loader(name: str, config: dict) -> DatasetLoader: mappings=config.get("mappings"), filter_fn=config.get("filter_fn"), url=config.get("url"), + metadata={"source": name, "config": str(config)}, ) @@ -176,20 +213,38 @@ def load_multi_dataset(name: str, sub_datasets: list[str]) -> ProbeDataset: """Load and combine multiple sub-datasets.""" prompts = [] for sub in sub_datasets: - dataset = load_dataset(name, sub) - prompts.extend(dataset["train"]["query"]) - return create_probe_dataset(f"{name}_combined", prompts) + try: + dataset = load_dataset(name, sub) + if "query" in dataset["train"].features: + prompts.extend(dataset["train"]["query"]) + else: + logger.warning(f"No 'query' column in {name}/{sub}") + except Exception as e: + logger.error(f"Error loading {name}/{sub}: {e}") + + return create_probe_dataset( + f"{name}_combined", prompts, {"source": name, "sub_datasets": sub_datasets} + ) @cache_to_disk() def load_jailbreak_v28k() -> ProbeDataset: """Load JailBreakV-28K dataset.""" - df = pd.read_csv("hf://datasets/JailbreakV-28K/JailBreakV-28k/JailBreakV_28K.csv") - prompts = df["jailbreak_query"].tolist() - return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", prompts) + try: + df = pd.read_csv( + "hf://datasets/JailbreakV-28K/JailBreakV-28k/JailBreakV_28K.csv" + ) + prompts = df["jailbreak_query"].tolist() + return create_probe_dataset( + "JailbreakV-28K/JailBreakV-28k", + prompts, + {"source": "JailbreakV-28K/JailBreakV-28k"}, + ) + except Exception as e: + logger.error(f"Error loading JailbreakV-28K: {e}") + return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", []) -@cache_to_disk() def load_local_csv() -> ProbeDataset: """Load prompts from local CSV files.""" csv_files = [f for f in os.listdir(".") if f.endswith(".csv")] @@ -202,7 +257,7 @@ def load_local_csv() -> ProbeDataset: if "prompt" in df.columns: prompts.extend(df["prompt"].tolist()) else: - logger.warning(f"File {file} lacks 'prompt' column") + logger.warning(f"File {file} lacks a suitable prompt column") except Exception as e: logger.error(f"Error reading {file}: {e}") @@ -231,36 +286,59 @@ class StenographyTransformer: def transform(self) -> Iterator[ProbeDataset]: """Generate transformed datasets.""" + if not self.datasets: + logger.warning("No datasets provided to StenographyTransformer") + return + for dataset in self.datasets: + if not dataset.prompts: + logger.warning( + f"Dataset {dataset.dataset_name} has no prompts to transform" + ) + continue + size = len(dataset.prompts) - sample_size = int(size * 0.25) + sample_size = max(1, int(size * 0.25)) # At least 1 sample for name, fn in self.TRANSFORMATIONS.items(): logger.info(f"Applying {name} to {dataset.dataset_name}") - sampled = random.sample(dataset.prompts, sample_size) - transformed = [ - f"Follow the instructions in {name}: {fn(p)}" for p in sampled - ] - yield create_probe_dataset( - f"stenography.{name}({dataset.dataset_name})", transformed - ) + sampled = random.sample(dataset.prompts, min(sample_size, size)) + try: + transformed = [ + f"Follow the instructions in {name}: {fn(str(p))}" + for p in sampled + ] + transformed_dataset = create_probe_dataset( + f"stenography.{name}({dataset.dataset_name})", + transformed, + {"source": dataset.dataset_name, "transformation": name}, + ) + yield transformed_dataset + except Exception as e: + logger.error( + f"Error applying {name} to {dataset.dataset_name}: {e}" + ) def dataset_from_iterator( - name: str, iterator, lazy: bool = False + name: str, iterator: Iterator[str], lazy: bool = False ) -> list[ProbeDataset]: """Convert an iterator into a list of ProbeDataset objects.""" - prompts = list(iterator) if not lazy else iterator - tokens = sum(len(str(s).split()) for s in prompts) if not lazy else 0 - dataset = ProbeDataset( - dataset_name=name, - metadata={}, - prompts=prompts, - tokens=tokens, - approx_cost=0.0, - lazy=lazy, - ) - return [dataset] + try: + prompts = list(iterator) if not lazy else iterator + tokens = sum(len(str(s).split()) for s in prompts) if not lazy else 0 + dataset = ProbeDataset( + dataset_name=name, + metadata={"source": name, "lazy": lazy}, + prompts=prompts, + tokens=tokens, + approx_cost=0.0, + lazy=lazy, + ) + return [dataset] + except Exception as e: + logger.error(f"Error creating dataset from iterator {name}: {e}") + return [create_probe_dataset(name, [], {"error": str(e)})] # Main dataset preparation @@ -272,6 +350,7 @@ def prepare_prompts( ) -> list[ProbeDataset]: """Prepare datasets based on names and options.""" # Base dataset loaders + logger.info(f"Preparing datasets: {dataset_names}") dataset_loaders = { **{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS.items()}, **{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS_GENERICS.items()}, @@ -288,28 +367,39 @@ def prepare_prompts( ), "JailbreakV-28K/JailBreakV-28k": load_jailbreak_v28k, "Local CSV": load_local_csv, + "Custom CSV": load_local_csv, } # Dynamic dataset loaders dynamic_loaders = { "AgenticBackend": lambda opts: dataset_from_iterator( "AgenticBackend", - fine_tuned.Module([], tools_inbox=tools_inbox, opts=opts).apply(), + fine_tuned.Module( + opts["datasets"], tools_inbox=tools_inbox, opts=opts + ).apply(), lazy=True, ), - "Steganography": lambda opts: list(StenographyTransformer([]).transform()), + "Steganography": lambda opts: list( + StenographyTransformer(opts["datasets"]).transform() + ), "llm-adaptive-attacks": lambda opts: dataset_from_iterator( "llm-adaptive-attacks", - adaptive_attacks.Module([], tools_inbox=tools_inbox, opts=opts).apply(), + adaptive_attacks.Module( + opts["datasets"], tools_inbox=tools_inbox, opts=opts + ).apply(), ), "Garak": lambda opts: dataset_from_iterator( "Garak", - garak_tool.Module([], tools_inbox=tools_inbox, opts=opts).apply(), + garak_tool.Module( + opts["datasets"], tools_inbox=tools_inbox, opts=opts + ).apply(), lazy=True, ), "Reinforcement Learning Optimization": lambda opts: dataset_from_iterator( "Reinforcement Learning Optimization", - rl_model.Module([], tools_inbox=tools_inbox, opts=opts).apply(), + rl_model.Module( + opts["datasets"], tools_inbox=tools_inbox, opts=opts + ).apply(), lazy=True, ), "InspectAI": lambda opts: dataset_from_iterator( @@ -320,28 +410,29 @@ def prepare_prompts( "GPT fuzzer": lambda opts: [], } - options = options or [{} for _ in dataset_names] datasets = [] + options = options or [dict(datasets=datasets) for _ in dataset_names] # Load base datasets for name, opts in zip(dataset_names, options): - if name in dataset_loaders: - logger.info(f"Loading base dataset {name}") - try: - datasets.append(dataset_loaders[name]()) - except Exception as e: - logger.error(f"Error loading {name}: {e}") + if name not in dataset_loaders: + continue + try: + datasets.append(dataset_loaders[name]()) + except Exception as e: + + logger.error(f"Error loading {name}: {e}") # Load dynamic datasets and apply transformations for name, opts in zip(dataset_names, options): - if name in dynamic_loaders: - logger.info(f"Loading dynamic dataset {name}") - try: - dynamic_result = dynamic_loaders[name](opts) - datasets.extend(dynamic_result) - except Exception as e: - logger.error(f"Error loading dynamic {name}: {e}") - elif name == "Steganography": - datasets.extend(list(StenographyTransformer(datasets).transform())) + if name not in dynamic_loaders: + continue + logger.info(f"Loading dynamic dataset {name} {opts}") + opts["datasets"] = datasets + try: + dynamic_result = dynamic_loaders[name](opts) + datasets.extend(dynamic_result) + except Exception as e: + logger.exception(f"Error loading dynamic {name}: {e}") return datasets