diff --git a/agentic_security/probe_data/data.py b/agentic_security/probe_data/data.py index f4f5175..4844cb1 100644 --- a/agentic_security/probe_data/data.py +++ b/agentic_security/probe_data/data.py @@ -245,6 +245,7 @@ def load_jailbreak_v28k() -> ProbeDataset: return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", []) +@cache_to_disk() def load_local_csv() -> ProbeDataset: """Load prompts from local CSV files.""" csv_files = [f for f in os.listdir(".") if f.endswith(".csv")] @@ -264,6 +265,44 @@ def load_local_csv() -> ProbeDataset: return create_probe_dataset("Local CSV", prompts, {"src": str(csv_files)}) +@cache_to_disk() +def load_csv(file: str) -> ProbeDataset: + """Load prompts from local CSV files.""" + prompts = [] + try: + df = pd.read_csv(file) + prompts = df["prompt"].tolist() + if "prompt" in df.columns: + prompts.extend(df["prompt"].tolist()) + else: + logger.warning(f"File {file} lacks a suitable prompt column") + except Exception as e: + logger.error(f"Error reading {file}: {e}") + return create_probe_dataset(f"fs://{file}", prompts, {"src": str(file)}) + + +@cache_to_disk() +def load_local_csv_files() -> list[ProbeDataset]: + """Load prompts from local CSV files and return a list of ProbeDataset objects.""" + csv_files = [f for f in os.listdir(".") if f.endswith(".csv")] + logger.info(f"Found {len(csv_files)} CSV files: {csv_files}") + + datasets = [] + + for file in csv_files: + try: + df = pd.read_csv(file) + if "prompt" in df.columns: + prompts = df["prompt"].tolist() + datasets.append(create_probe_dataset(file, prompts, {"src": file})) + else: + logger.warning(f"File {file} lacks a suitable prompt column") + except Exception as e: + logger.error(f"Error reading {file}: {e}") + + return datasets + + # Stenography transformer class StenographyTransformer: """Apply stenography transformations to datasets.""" @@ -435,4 +474,11 @@ def prepare_prompts( except Exception as e: logger.exception(f"Error loading dynamic {name}: {e}") + # Load csv datasets and apply transformations + for name, opts in zip(dataset_names, options): + if not name.endswith(".csv"): + continue + logger.info(f"Loading csv dataset {name} {opts}") + datasets.append(load_csv(name)) + return datasets