mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
feat(add refusal_classifier):
This commit is contained in:
@@ -1,3 +1,8 @@
|
||||
from agentic_security.refusal_classifier import RefusalClassifier
|
||||
|
||||
classifier = RefusalClassifier()
|
||||
classifier.load_model()
|
||||
|
||||
REFUSAL_MARKS = [
|
||||
"I'm sorry",
|
||||
"Sorry",
|
||||
@@ -45,4 +50,4 @@ def check_refusal(response: str, refusal_phrases: list = REFUSAL_MARKS) -> bool:
|
||||
|
||||
def refusal_heuristic(request_json):
|
||||
# TODO: improve this heuristic
|
||||
return check_refusal(str(request_json))
|
||||
return check_refusal(str(request_json)) or classifier.is_refusal(str(request_json))
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .model import RefusalClassifier # noqa
|
||||
@@ -0,0 +1,40 @@
|
||||
import pandas as pd
|
||||
from os import path
|
||||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||||
from sklearn.svm import OneClassSVM
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
import joblib
|
||||
|
||||
# **Training and Saving**
|
||||
|
||||
# Load your data
|
||||
df = pd.read_csv(path.expanduser("~/Downloads/data_en.csv"))
|
||||
texts = pd.concat(
|
||||
[df["GPT4_response"], df["ChatGPT_response"], df["Claude_response"]],
|
||||
ignore_index=True,
|
||||
)
|
||||
|
||||
# Preprocess and vectorize
|
||||
vectorizer = TfidfVectorizer(max_features=1000)
|
||||
X = vectorizer.fit_transform(texts)
|
||||
|
||||
scaler = StandardScaler(with_mean=False)
|
||||
X_scaled = scaler.fit_transform(X)
|
||||
model = OneClassSVM(kernel="rbf", gamma="auto", nu=0.05).fit(X_scaled)
|
||||
|
||||
# Save the model and vectorizer to disk
|
||||
joblib.dump(model, "oneclass_svm_model.joblib")
|
||||
joblib.dump(vectorizer, "tfidf_vectorizer.joblib")
|
||||
|
||||
# **Loading and Predicting**
|
||||
|
||||
# Load the model and vectorizer from disk
|
||||
model = joblib.load("oneclass_svm_model.joblib")
|
||||
vectorizer = joblib.load("tfidf_vectorizer.joblib")
|
||||
|
||||
|
||||
def is_refusal(text):
|
||||
x = vectorizer.transform([text])
|
||||
x_scaled = scaler.transform(x)
|
||||
prediction = model.predict(x_scaled)
|
||||
return prediction[0] == 1 # Returns True if it's a refusal response
|
||||
@@ -0,0 +1,93 @@
|
||||
import pandas as pd
|
||||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||||
from sklearn.svm import OneClassSVM
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
import joblib
|
||||
import os
|
||||
|
||||
|
||||
class RefusalClassifier:
|
||||
def __init__(self, model_path=None, vectorizer_path=None, scaler_path=None):
|
||||
self.model = None
|
||||
self.vectorizer = None
|
||||
self.scaler = None
|
||||
self.model_path = (
|
||||
model_path
|
||||
or "agentic_security/refusal_classifier/oneclass_svm_model.joblib"
|
||||
)
|
||||
self.vectorizer_path = (
|
||||
vectorizer_path
|
||||
or "agentic_security/refusal_classifier/tfidf_vectorizer.joblib"
|
||||
)
|
||||
self.scaler_path = (
|
||||
scaler_path or "agentic_security/refusal_classifier/scaler.joblib"
|
||||
)
|
||||
|
||||
def train(self, data_paths):
|
||||
"""
|
||||
Train the refusal classifier.
|
||||
|
||||
Parameters:
|
||||
- data_paths (list): List of file paths to CSV files containing the training data.
|
||||
"""
|
||||
# Load and concatenate data from multiple CSV files
|
||||
texts = []
|
||||
for data_path in data_paths:
|
||||
df = pd.read_csv(os.path.expanduser(data_path))
|
||||
# Assuming the CSV has columns named 'GPT4_response', 'ChatGPT_response', 'Claude_response'
|
||||
responses = pd.concat(
|
||||
[df["GPT4_response"], df["ChatGPT_response"], df["Claude_response"]],
|
||||
ignore_index=True,
|
||||
)
|
||||
texts.extend(responses.tolist())
|
||||
|
||||
# Remove any NaN values
|
||||
texts = [text for text in texts if isinstance(text, str)]
|
||||
|
||||
# Vectorize the text data
|
||||
self.vectorizer = TfidfVectorizer(max_features=1000)
|
||||
X = self.vectorizer.fit_transform(texts)
|
||||
|
||||
# Scale the features
|
||||
self.scaler = StandardScaler(with_mean=False)
|
||||
X_scaled = self.scaler.fit_transform(X)
|
||||
|
||||
# Train the One-Class SVM model
|
||||
self.model = OneClassSVM(kernel="rbf", gamma="auto", nu=0.05)
|
||||
self.model.fit(X_scaled)
|
||||
|
||||
def save_model(self):
|
||||
"""
|
||||
Save the trained model, vectorizer, and scaler to disk.
|
||||
"""
|
||||
joblib.dump(self.model, self.model_path)
|
||||
joblib.dump(self.vectorizer, self.vectorizer_path)
|
||||
joblib.dump(self.scaler, self.scaler_path)
|
||||
|
||||
def load_model(self):
|
||||
"""
|
||||
Load the trained model, vectorizer, and scaler from disk.
|
||||
"""
|
||||
self.model = joblib.load(self.model_path)
|
||||
self.vectorizer = joblib.load(self.vectorizer_path)
|
||||
self.scaler = joblib.load(self.scaler_path)
|
||||
|
||||
def is_refusal(self, text):
|
||||
"""
|
||||
Predict whether a given text is a refusal response.
|
||||
|
||||
Parameters:
|
||||
- text (str): The input text to classify.
|
||||
|
||||
Returns:
|
||||
- bool: True if the text is a refusal response, False otherwise.
|
||||
"""
|
||||
if not self.model or not self.vectorizer or not self.scaler:
|
||||
raise ValueError(
|
||||
"Model, vectorizer, or scaler not loaded. Call load_model() first."
|
||||
)
|
||||
|
||||
x = self.vectorizer.transform([text])
|
||||
x_scaled = self.scaler.transform(x)
|
||||
prediction = self.model.predict(x_scaled)
|
||||
return prediction[0] == 1 # Returns True if it's a refusal response
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Generated
+1
-1
@@ -2828,4 +2828,4 @@ multidict = ">=4.0"
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.10"
|
||||
content-hash = "847b180ab428cca26c6c43bc9697e6500375e202ee9e6ee7041f2b8b27400eac"
|
||||
content-hash = "51cc6ffecdd23e210c3da45364310655930397ca8855caa8ec4c5b91d46f2e8d"
|
||||
|
||||
+2
-1
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "agentic_security"
|
||||
version = "0.2.2"
|
||||
version = "0.2.3"
|
||||
description = "Agentic LLM vulnerability scanner"
|
||||
authors = ["Alexander Miasoiedov <msoedov@gmail.com>"]
|
||||
maintainers = ["Alexander Miasoiedov <msoedov@gmail.com>"]
|
||||
@@ -39,6 +39,7 @@ colorama = "^0.4.4"
|
||||
matplotlib = "^3.9.2"
|
||||
pydantic = "2.9.2"
|
||||
scikit-optimize = "^0.10.2"
|
||||
scikit-learn = "1.5.1"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
black = "^24.10.0"
|
||||
|
||||
Reference in New Issue
Block a user