From 69fa63ac43ff2caa0a8ff0df2a1faff9b447a8cc Mon Sep 17 00:00:00 2001
From: pliny <133052465+elder-plinius@users.noreply.github.com>
Date: Sun, 8 Mar 2026 12:07:56 -0700
Subject: [PATCH] Add files via upload
---
app.py | 262 +++++++++--
docs/index.html | 62 ++-
obliteratus/abliterate.py | 211 +++++++--
.../analysis/conditional_abliteration.py | 2 +-
obliteratus/analysis/leace.py | 239 ++++++++++
obliteratus/analysis/riemannian_manifold.py | 19 +-
obliteratus/analysis/sae_abliteration.py | 2 +-
.../analysis/spectral_certification.py | 14 +-
obliteratus/analysis/wasserstein_optimal.py | 2 +-
obliteratus/bayesian_optimizer.py | 237 ++++++----
obliteratus/cli.py | 8 +-
obliteratus/evaluation/heretic_eval.py | 23 +-
obliteratus/informed_pipeline.py | 430 +++++++++++++++---
obliteratus/tourney.py | 83 +++-
14 files changed, 1320 insertions(+), 274 deletions(-)
create mode 100644 obliteratus/analysis/leace.py
diff --git a/app.py b/app.py
index f7df636..28f2d07 100644
--- a/app.py
+++ b/app.py
@@ -98,6 +98,51 @@ def _is_quota_error(exc: BaseException) -> bool:
return True
return False
+
+def _load_model_to_device(
+ pretrained_path: str,
+ *,
+ torch_dtype=None,
+ trust_remote_code: bool = False,
+ quantization_config=None,
+ offload_folder: str | None = None,
+ low_cpu_mem_usage: bool = False,
+ token: str | None = None,
+) -> AutoModelForCausalLM:
+ """Load a causal LM onto the best available device, MPS-safe.
+
+ Accelerate's ``device_map="auto"`` is not supported on MPS — models
+ silently land on CPU. This helper skips ``device_map`` on non-CUDA
+ backends and explicitly moves the model to the best device after loading.
+ On CUDA the behaviour is identical to ``device_map="auto"``.
+ """
+ kwargs: dict = {}
+ if torch_dtype is not None:
+ kwargs["torch_dtype"] = torch_dtype
+ if trust_remote_code:
+ kwargs["trust_remote_code"] = True
+ if quantization_config is not None:
+ kwargs["quantization_config"] = quantization_config
+ if offload_folder is not None:
+ kwargs["offload_folder"] = offload_folder
+ if low_cpu_mem_usage:
+ kwargs["low_cpu_mem_usage"] = True
+ if token is not None:
+ kwargs["token"] = token
+
+ if dev.supports_device_map_auto():
+ kwargs["device_map"] = "auto"
+
+ model = AutoModelForCausalLM.from_pretrained(pretrained_path, **kwargs)
+
+ # On MPS / CPU: model loaded without device_map, move to best device
+ if not dev.supports_device_map_auto():
+ target = dev.get_device()
+ model = model.to(target)
+
+ return model
+
+
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
@@ -164,7 +209,7 @@ def _recover_sessions_from_disk() -> None:
"""
global _last_obliterated_label, _obliterate_counter
found_any = False
- for pattern in ("obliterated_*", "obliterated", "bench_*"):
+ for pattern in ("obliterated_*", "obliterated", "bench_*", "obliteratus_tourney/r*"):
for p in Path("/tmp").glob(pattern):
if not p.is_dir():
continue
@@ -291,6 +336,11 @@ METHODS = {
"optimized (bayesian auto-tuned)": "optimized",
"inverted (semantic refusal inversion)": "inverted",
"nuclear (maximum force combo)": "nuclear",
+ # Baseline reproductions for benchmarking
+ "failspy (FailSpy/abliterator baseline)": "failspy",
+ "gabliteration (Gülmez 2026 baseline)": "gabliteration",
+ "heretic (p-e-w 2025-2026 baseline)": "heretic",
+ "rdo (Wollschlager ICML 2025 baseline)": "rdo",
}
# ── Community Hub push ────────────────────────────────────────────────
@@ -316,6 +366,7 @@ def _get_preset_defaults(method_display: str):
cfg = _PRESET_CONFIGS.get(method_key, _PRESET_CONFIGS["advanced"])
return {
"n_directions": cfg.get("n_directions", 4),
+ "direction_method": cfg.get("direction_method", "svd"),
"regularization": cfg.get("regularization", 0.3),
"refinement_passes": cfg.get("refinement_passes", 2),
"norm_preserve": cfg.get("norm_preserve", True),
@@ -341,6 +392,17 @@ def _get_preset_defaults(method_display: str):
"spectral_cascade": cfg.get("spectral_cascade", False),
"spectral_bands": cfg.get("spectral_bands", 3),
"spectral_threshold": cfg.get("spectral_threshold", 0.05),
+ # Baseline-specific parameters
+ "layer_selection": cfg.get("layer_selection", "all"),
+ "winsorize_activations": cfg.get("winsorize_activations", False),
+ "winsorize_percentile": cfg.get("winsorize_percentile", 1.0),
+ "use_kl_optimization": cfg.get("use_kl_optimization", False),
+ "kl_budget": cfg.get("kl_budget", 0.5),
+ "float_layer_interpolation": cfg.get("float_layer_interpolation", False),
+ "rdo_refinement": cfg.get("rdo_refinement", False),
+ "cot_aware": cfg.get("cot_aware", False),
+ "bayesian_trials": cfg.get("bayesian_trials", 50),
+ "n_sae_features": cfg.get("n_sae_features", 64),
}
def _on_method_change(method_display: str):
@@ -348,6 +410,7 @@ def _on_method_change(method_display: str):
d = _get_preset_defaults(method_display)
return (
d["n_directions"],
+ d["direction_method"],
d["regularization"],
d["refinement_passes"],
d["reflection_strength"],
@@ -374,6 +437,16 @@ def _on_method_change(method_display: str):
d["expert_transplant"],
d["use_wasserstein_optimal"],
d["spectral_cascade"],
+ d["layer_selection"],
+ d["winsorize_activations"],
+ d["winsorize_percentile"],
+ d["use_kl_optimization"],
+ d["kl_budget"],
+ d["float_layer_interpolation"],
+ d["rdo_refinement"],
+ d["cot_aware"],
+ d["bayesian_trials"],
+ d["n_sae_features"],
)
def _on_dataset_change(dataset_label: str):
@@ -1731,8 +1804,9 @@ def _format_multi_model_results(results: list[dict], context: dict | None = None
def obliterate(model_choice: str, method_choice: str,
prompt_volume_choice: str, dataset_source_choice: str,
custom_harmful: str, custom_harmless: str,
- # Advanced params (sliders)
- adv_n_directions: int, adv_regularization: float,
+ # Advanced params (sliders + radio)
+ adv_n_directions: int, adv_direction_method: str,
+ adv_regularization: float,
adv_refinement_passes: int, adv_reflection_strength: float,
adv_embed_regularization: float, adv_steering_strength: float,
adv_transplant_blend: float,
@@ -1748,6 +1822,12 @@ def obliterate(model_choice: str, method_choice: str,
adv_project_embeddings: bool, adv_activation_steering: bool,
adv_expert_transplant: bool, adv_wasserstein_optimal: bool,
adv_spectral_cascade: bool,
+ adv_layer_selection: str, adv_winsorize: bool,
+ adv_winsorize_percentile: float,
+ adv_kl_optimization: bool, adv_kl_budget: float,
+ adv_float_layer_interp: bool, adv_rdo_refinement: bool,
+ adv_cot_aware: bool,
+ adv_bayesian_trials: int, adv_n_sae_features: int,
progress=gr.Progress()):
"""Run the full obliteration pipeline, streaming log updates to the UI.
@@ -1906,6 +1986,7 @@ def obliterate(model_choice: str, method_choice: str,
on_log=on_log,
# Advanced overrides from UI
n_directions=int(adv_n_directions),
+ direction_method=adv_direction_method,
regularization=float(adv_regularization),
refinement_passes=int(adv_refinement_passes),
norm_preserve=adv_norm_preserve,
@@ -1932,6 +2013,15 @@ def obliterate(model_choice: str, method_choice: str,
spectral_bands=int(adv_spectral_bands),
spectral_threshold=float(adv_spectral_threshold),
verify_sample_size=int(adv_verify_sample_size),
+ layer_selection=adv_layer_selection,
+ winsorize_activations=adv_winsorize,
+ winsorize_percentile=float(adv_winsorize_percentile),
+ use_kl_optimization=adv_kl_optimization,
+ kl_budget=float(adv_kl_budget),
+ float_layer_interpolation=adv_float_layer_interp,
+ rdo_refinement=adv_rdo_refinement,
+ cot_aware=adv_cot_aware,
+ n_sae_features=int(adv_n_sae_features),
)
pipeline_ref[0] = pipeline
pipeline.run()
@@ -2103,10 +2193,9 @@ def obliterate(model_choice: str, method_choice: str,
bnb_4bit_quant_type="nf4",
llm_int8_enable_fp32_cpu_offload=True,
)
- model_reloaded = AutoModelForCausalLM.from_pretrained(
+ model_reloaded = _load_model_to_device(
save_dir,
quantization_config=bnb_cfg,
- device_map="auto",
trust_remote_code=True,
)
tokenizer_reloaded = AutoTokenizer.from_pretrained(
@@ -2144,9 +2233,8 @@ def obliterate(model_choice: str, method_choice: str,
yield status_msg, "\n".join(log_lines), gr.update(), gr.update(), gr.update(), gr.update()
try:
offload_dir = tempfile.mkdtemp(prefix="obliteratus_offload_")
- model_reloaded = AutoModelForCausalLM.from_pretrained(
+ model_reloaded = _load_model_to_device(
save_dir,
- device_map="auto",
offload_folder=offload_dir,
torch_dtype=torch.float16,
trust_remote_code=True,
@@ -2307,8 +2395,8 @@ def chat_respond(message: str, history: list[dict], system_prompt: str,
if checkpoint and Path(checkpoint).exists():
try:
is_preset = (_state.get("model_name") or "") in MODELS
- model = AutoModelForCausalLM.from_pretrained(
- checkpoint, device_map="auto", torch_dtype=torch.float16,
+ model = _load_model_to_device(
+ checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer = AutoTokenizer.from_pretrained(
@@ -2498,8 +2586,8 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
if checkpoint and Path(checkpoint).exists():
is_preset = (_state.get("model_name") or "") in MODELS
try:
- model_loaded = AutoModelForCausalLM.from_pretrained(
- checkpoint, device_map="auto", torch_dtype=torch.float16,
+ model_loaded = _load_model_to_device(
+ checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer_loaded = AutoTokenizer.from_pretrained(
@@ -2559,9 +2647,8 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
is_preset = cfg["model_choice"] in MODELS
try:
- model_loaded = AutoModelForCausalLM.from_pretrained(
+ model_loaded = _load_model_to_device(
checkpoint_dir,
- device_map="auto",
torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
@@ -2595,10 +2682,9 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
)
yield f"**Loading {choice}** in 4-bit (model too large for fp16)...", ""
progress(0.5, desc="Loading 4-bit...")
- model_loaded = AutoModelForCausalLM.from_pretrained(
+ model_loaded = _load_model_to_device(
checkpoint_dir,
quantization_config=bnb_cfg,
- device_map="auto",
trust_remote_code=is_preset,
)
tokenizer_loaded = AutoTokenizer.from_pretrained(
@@ -2740,8 +2826,8 @@ def ab_chat_respond(message: str, history_left: list[dict], history_right: list[
if checkpoint and Path(checkpoint).exists():
try:
is_preset = (model_name or "") in MODELS
- abliterated_model = AutoModelForCausalLM.from_pretrained(
- checkpoint, device_map="auto", torch_dtype=torch.float16,
+ abliterated_model = _load_model_to_device(
+ checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer = AutoTokenizer.from_pretrained(
@@ -2866,10 +2952,9 @@ def ab_chat_respond(message: str, history_left: list[dict], history_right: list[
is_preset = model_name in MODELS
original_response = ""
try:
- from transformers import AutoModelForCausalLM as AMCLM
- original_model = AMCLM.from_pretrained(
+ original_model = _load_model_to_device(
model_id, torch_dtype=torch.float16,
- device_map="auto", trust_remote_code=is_preset,
+ trust_remote_code=is_preset,
low_cpu_mem_usage=True,
token=os.environ.get("HF_TOKEN") or None,
)
@@ -3026,6 +3111,9 @@ def strength_sweep(model_choice: str, method_choice: str,
entry["perplexity"] = metrics.get("perplexity")
entry["refusal_rate"] = metrics.get("refusal_rate")
entry["coherence"] = metrics.get("coherence")
+ entry["kl_divergence"] = metrics.get("kl_divergence")
+ entry["spectral_cert"] = metrics.get("spectral_certification") or ""
+ entry["direction_method"] = getattr(pipe, "direction_method", "")
entry["strong_layers"] = len(pipe._strong_layers)
if hasattr(pipe, "handle") and pipe.handle is not None:
pipe.handle.model = None
@@ -3115,17 +3203,21 @@ def _format_sweep_results(results: list[dict]) -> str:
return "*No results yet.*"
lines = ["### Strength Sweep Results", "",
- "| Reg | Time | Perplexity | Refusal Rate | Coherence | Error |",
- "|-----|------|-----------|-------------|-----------|-------|"]
+ "| Reg | Dir | Time | PPL | Refusal | Coherence | KL Div | Cert | Error |",
+ "|-----|-----|------|-----|---------|-----------|--------|------|-------|"]
for r in results:
reg = f"{r['regularization']:.3f}"
ppl = f"{r['perplexity']:.2f}" if r.get("perplexity") is not None else "—"
ref = f"{r['refusal_rate']:.0%}" if r.get("refusal_rate") is not None else "—"
coh = f"{r['coherence']:.0%}" if r.get("coherence") is not None else "—"
+ kl_val = r.get("kl_divergence")
+ kl_str = f"{kl_val:.4f}" if kl_val is not None else "—"
+ cert = r.get("spectral_cert", "") or "—"
+ dir_m = r.get("direction_method", "") or "—"
err = r.get("error", "")
err_short = (err[:25] + "...") if err and len(err) > 25 else (err or "")
- lines.append(f"| {reg} | {r['time_s']}s | {ppl} | {ref} | {coh} | {err_short} |")
+ lines.append(f"| {reg} | {dir_m} | {r['time_s']}s | {ppl} | {ref} | {coh} | {kl_str} | {cert} | {err_short} |")
return "\n".join(lines)
@@ -3173,8 +3265,8 @@ def _tourney_gpu_wrapper(fn, *args, **kwargs):
return _tourney_gpu_run(fn, *args, **kwargs)
-def run_tourney(model_choice, dataset, quantization):
- """Run an elimination tournament across all abliteration methods.
+def run_tourney(model_choice, selected_methods, dataset, quantization):
+ """Run an elimination tournament across selected abliteration methods.
Each individual method is run inside its own ``@spaces.GPU`` allocation
(up to 5 minutes per method) so the full tournament is not constrained
@@ -3187,6 +3279,10 @@ def run_tourney(model_choice, dataset, quantization):
yield "**Error:** Select a model first.", "", ""
return
+ if not selected_methods or len(selected_methods) < 3:
+ yield "**Error:** Select at least 3 methods for a tournament.", "", ""
+ return
+
from obliteratus.tourney import (
TourneyRunner, render_bracket_html,
_load_checkpoint, _checkpoint_matches,
@@ -3218,6 +3314,7 @@ def run_tourney(model_choice, dataset, quantization):
hub_repo=None,
dataset_key=dataset_key,
quantization=quant,
+ methods=list(selected_methods),
on_log=logger,
resume=resume,
)
@@ -3322,18 +3419,27 @@ def run_tourney(model_choice, dataset, quantization):
_ts = datetime.now().strftime("%H:%M")
_short = model_id.split("/")[-1] if "/" in model_id else model_id
_label = f"tourney winner ({winner.method}) on {_short} ({_ts})"
+ _winner_meta = {
+ "model_id": model_id,
+ "model_choice": model_choice,
+ "method": winner.method,
+ "dataset_key": dataset_key,
+ "prompt_volume": 0,
+ "output_dir": winner.output_dir,
+ "source": "tourney",
+ "tourney_score": winner.score,
+ "tourney_metrics": winner.metrics,
+ }
with _lock:
- _session_models[_label] = {
- "model_id": model_id,
- "model_choice": model_choice,
- "method": winner.method,
- "dataset_key": dataset_key,
- "prompt_volume": 0,
- "output_dir": winner.output_dir,
- "source": "tourney",
- "tourney_score": winner.score,
- "tourney_metrics": winner.metrics,
- }
+ _session_models[_label] = _winner_meta
+ # Persist so the winner survives ZeroGPU process restarts
+ _persist_session_meta(winner.output_dir, _label, {
+ "model_id": model_id,
+ "model_choice": model_choice,
+ "method": winner.method,
+ "dataset_key": dataset_key,
+ "source": "tourney",
+ })
yield (
f"**Champion: `{winner.method}`** "
f"(score: {winner.score:.4f})\n"
@@ -3930,7 +4036,13 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
with gr.Row():
adv_n_directions = gr.Slider(
1, 8, value=_defaults["n_directions"], step=1,
- label="Directions", info="Number of refusal directions to extract via SVD",
+ label="Directions", info="Number of refusal directions to extract",
+ )
+ adv_direction_method = gr.Radio(
+ choices=["diff_means", "svd", "leace"],
+ value=_defaults["direction_method"],
+ label="Direction Method",
+ info="diff_means: simple & robust, svd: multi-direction, leace: optimal erasure",
)
adv_regularization = gr.Slider(
0.0, 1.0, value=_defaults["regularization"], step=0.05,
@@ -3996,10 +4108,52 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
with gr.Row():
adv_spectral_cascade = gr.Checkbox(value=_defaults["spectral_cascade"], label="Spectral Cascade",
info="DCT frequency decomposition for precision refusal targeting")
+ gr.Markdown("**Layer Selection & Baseline Options**")
+ with gr.Row():
+ adv_layer_selection = gr.Dropdown(
+ choices=["knee_cosmic", "all", "all_except_first", "middle60", "top_k", "knee"],
+ value=_defaults["layer_selection"],
+ label="Layer Selection",
+ info="Which layers to project refusal directions from",
+ )
+ adv_winsorize_percentile = gr.Slider(
+ 0.0, 1.0, value=_defaults["winsorize_percentile"], step=0.01,
+ label="Winsorize Percentile",
+ info="Activation clamping quantile (1.0 = disabled, 0.01 = 99th pctile)",
+ )
+ adv_kl_budget = gr.Slider(
+ 0.0, 2.0, value=_defaults["kl_budget"], step=0.1,
+ label="KL Budget",
+ info="Max KL divergence from base model (Heretic/optimized)",
+ )
+ with gr.Row():
+ adv_winsorize = gr.Checkbox(value=_defaults["winsorize_activations"], label="Winsorize Activations",
+ info="Clamp outlier activations before direction extraction")
+ adv_kl_optimization = gr.Checkbox(value=_defaults["use_kl_optimization"], label="KL Optimization",
+ info="Optimize projection strength to stay within KL budget")
+ adv_float_layer_interp = gr.Checkbox(value=_defaults["float_layer_interpolation"], label="Float Layer Interpolation",
+ info="Interpolate between adjacent layers' directions (Heretic)")
+ adv_rdo_refinement = gr.Checkbox(value=_defaults["rdo_refinement"], label="RDO Refinement",
+ info="Gradient-based direction refinement (Wollschlager et al.)")
+ with gr.Row():
+ adv_cot_aware = gr.Checkbox(value=_defaults["cot_aware"], label="CoT-Aware",
+ info="Preserve chain-of-thought reasoning during abliteration")
+ with gr.Row():
+ adv_bayesian_trials = gr.Slider(
+ 10, 200, value=_defaults["bayesian_trials"], step=10,
+ label="Bayesian Trials",
+ info="Optuna TPE optimization trials (Heretic/optimized methods)",
+ )
+ adv_n_sae_features = gr.Slider(
+ 16, 256, value=_defaults["n_sae_features"], step=16,
+ label="SAE Features",
+ info="Number of SAE features to target (inverted/nuclear methods)",
+ )
# List of all advanced controls (order must match _on_method_change return)
_adv_controls = [
- adv_n_directions, adv_regularization, adv_refinement_passes,
+ adv_n_directions, adv_direction_method,
+ adv_regularization, adv_refinement_passes,
adv_reflection_strength, adv_embed_regularization,
adv_steering_strength, adv_transplant_blend,
adv_spectral_bands, adv_spectral_threshold,
@@ -4011,6 +4165,12 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
adv_project_embeddings, adv_activation_steering,
adv_expert_transplant, adv_wasserstein_optimal,
adv_spectral_cascade,
+ adv_layer_selection, adv_winsorize,
+ adv_winsorize_percentile,
+ adv_kl_optimization, adv_kl_budget,
+ adv_float_layer_interp, adv_rdo_refinement,
+ adv_cot_aware,
+ adv_bayesian_trials, adv_n_sae_features,
]
obliterate_btn = gr.Button(
@@ -4181,7 +4341,8 @@ result = client.predict(
mm_method = gr.Dropdown(
choices=["basic", "advanced", "aggressive",
"spectral_cascade", "informed", "surgical",
- "optimized", "inverted", "nuclear"],
+ "optimized", "inverted", "nuclear",
+ "failspy", "gabliteration", "heretic", "rdo"],
value="surgical",
label="Abliteration Method",
)
@@ -4550,11 +4711,11 @@ tradeoff point where refusal is minimized with minimal capability damage.
# ── Tab 6: Tourney ────────────────────────────────────────────────
with gr.Tab("Tourney", id="tourney"):
- gr.Markdown("""### March Madness Tournament
-Pit **all abliteration methods** against each other in elimination rounds.
+ gr.Markdown("""### Tourney Mode
+Pit abliteration methods against each other in elimination rounds.
The winner is saved locally — push it to HuggingFace Hub from the **Push to Hub** tab.
-**Round 1 — Qualifiers:** All methods, reduced prompts. Bottom half eliminated.
+**Round 1 — Qualifiers:** Selected methods, reduced prompts. Bottom half eliminated.
**Round 2 — Semifinals:** Survivors, full prompts. Bottom half eliminated.
**Round 3 — Finals:** Top contenders, maximum prompts. Champion crowned.
""")
@@ -4566,6 +4727,14 @@ The winner is saved locally — push it to HuggingFace Hub from the **Push to Hu
allow_custom_value=True,
)
+ from obliteratus.tourney import TOURNEY_METHODS as _ALL_TOURNEY_METHODS
+ tourney_methods_cb = gr.CheckboxGroup(
+ choices=_ALL_TOURNEY_METHODS,
+ value=_ALL_TOURNEY_METHODS,
+ label="Methods to Compete",
+ info="Pick at least 3 methods. All selected by default.",
+ )
+
with gr.Accordion("Advanced Settings", open=False):
with gr.Row():
tourney_dataset_dd = gr.Dropdown(
@@ -4595,9 +4764,16 @@ The winner is saved locally — push it to HuggingFace Hub from the **Push to Hu
tourney_btn.click(
fn=run_tourney,
- inputs=[tourney_model_dd,
+ inputs=[tourney_model_dd, tourney_methods_cb,
tourney_dataset_dd, tourney_quant_dd],
outputs=[tourney_status, tourney_bracket, tourney_log],
+ ).then(
+ fn=lambda: (
+ gr.update(choices=_get_session_model_choices()),
+ gr.update(choices=_get_session_model_choices()),
+ _get_vram_html(),
+ ),
+ outputs=[session_model_dd, ab_session_model_dd, vram_display],
)
# ── Tab 7: Export ─────────────────────────────────────────────────
diff --git a/docs/index.html b/docs/index.html
index 180c798..8f2f001 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -1317,11 +1317,56 @@
AGGRESSIVE
Full Gabliteration + 3-pass refine
+
+
+
+
+
+
+
+
+
4 SVD directions • norm-preserving • 30% regularization • 2 refinement passes • 32 prompt pairs
@@ -1941,10 +1986,19 @@ function startAbliterateFromLibrary(hfId) {
let ablMethod = 'advanced';
const METHOD_INFO = {
- basic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'1 direction • standard projection • 1 pass • 32 prompt pairs'},
- advanced: {dirs:4, norm:true, reg:0.3, passes:2, desc:'4 SVD directions • norm-preserving • 30% regularization • 2 refinement passes • 32 prompt pairs'},
- aggressive: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions • norm-preserving • full orthogonalization • 3 refinement passes • 32 prompt pairs'},
- informed: {dirs:'auto', norm:true, reg:'auto', passes:'auto', desc:'
Analysis-guided • auto directions • auto regularization • Ouroboros-compensated • cone/alignment/cluster/defense analysis'},
+ basic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'1 direction • standard projection • 1 pass'},
+ advanced: {dirs:4, norm:true, reg:0.3, passes:2, desc:'4 SVD directions • norm-preserving • 30% regularization • 2 refinement passes'},
+ aggressive: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions • norm-preserving • full orthogonalization • 3 refinement passes'},
+ spectral_cascade: {dirs:6, norm:true, reg:0.15, passes:1, desc:'6 whitened-SVD directions • DCT frequency decomposition • coherence-weighted • adaptive bands'},
+ informed: {dirs:'auto', norm:true, reg:'auto', passes:'auto', desc:'
Analysis-guided • auto directions • auto regularization • Ouroboros-compensated • cone/alignment/cluster analysis'},
+ surgical: {dirs:4, norm:true, reg:0.2, passes:2, desc:'4 SVD directions • attention head surgery • SAE features • safety neuron masking • per-expert MoE'},
+ optimized: {dirs:4, norm:true, reg:0.2, passes:2, desc:'4 SVD directions • Bayesian auto-tuned • CoT-aware • KL co-optimized • winsorized activations'},
+ inverted: {dirs:4, norm:true, reg:0.1, passes:2, desc:'4 SVD directions • semantic inversion (2x reflection) • SAE feature targeting'},
+ nuclear: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions • all techniques combined • maximum force • head surgery + SAE + steering + transplant'},
+ failspy: {dirs:1, norm:false, reg:0.0, passes:1, desc:'
Baseline • 1 diff-means direction • all layers except first • FailSpy/abliterator reproduction'},
+ gabliteration: {dirs:4, norm:false, reg:0.231, passes:1, desc:'
Baseline • 4 SVD directions • ridge reg (alpha=0.3) • top-k layer selection • Gülmez 2026'},
+ heretic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'
Baseline • 1 diff-means • Bayesian (Optuna TPE) • KL-optimized • float layer interpolation • p-e-w'},
+ rdo: {dirs:4, norm:true, reg:0.0, passes:1, desc:'
Baseline • 4 SVD directions • gradient-refined (RDO) • linear probe classifier • Wollschlager ICML 2025'},
};
function getAblCmd() {
diff --git a/obliteratus/abliterate.py b/obliteratus/abliterate.py
index 6c6f3b9..800ae69 100644
--- a/obliteratus/abliterate.py
+++ b/obliteratus/abliterate.py
@@ -63,6 +63,7 @@ METHODS = {
"label": "Basic (Arditi et al.)",
"description": "Single refusal direction via difference-in-means",
"n_directions": 1,
+ "direction_method": "diff_means",
"norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
@@ -75,6 +76,7 @@ METHODS = {
"label": "Advanced (Multi-direction + Norm-preserving)",
"description": "SVD-based multi-direction extraction with norm preservation",
"n_directions": 4,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.3,
"embed_regularization": 0.5,
@@ -97,6 +99,7 @@ METHODS = {
"Zero regularization for maximum refusal removal."
),
"n_directions": 8,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 3,
@@ -124,6 +127,7 @@ METHODS = {
"separating trained-in refusal patterns from per-layer artifacts."
),
"n_directions": 6,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -146,25 +150,31 @@ METHODS = {
"Uses InformedAbliterationPipeline for the full feedback loop. "
"Auto-detects alignment method (DPO/RLHF/CAI/SFT), maps concept "
"cone geometry, performs cluster-aware layer selection, and gates "
- "projection by safety-capability entanglement. Includes spectral "
- "certification of abliteration completeness and Wasserstein-optimal "
- "primary direction extraction."
+ "projection by safety-capability entanglement. Defaults to single "
+ "diff-of-means direction + Bayesian optimization (Heretic-style). "
+ "LEACE available via direction_method='leace'."
),
- "n_directions": 4,
+ "n_directions": 1,
+ "direction_method": "diff_means",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
"project_biases": True,
"use_chat_template": True,
- "use_whitened_svd": True,
+ "use_whitened_svd": False,
"true_iterative_refinement": True,
"use_jailbreak_contrast": False,
- "layer_adaptive_strength": False,
+ "layer_adaptive_strength": True,
"safety_neuron_masking": False,
"per_expert_directions": False,
"attention_head_surgery": False,
"use_sae_features": False,
- "use_wasserstein_optimal": True,
+ "use_wasserstein_optimal": False,
+ "use_kl_optimization": True,
+ "kl_budget": 0.5,
+ "float_layer_interpolation": True,
+ "winsorize_activations": True,
+ "winsorize_percentile": 0.01,
},
"surgical": {
"label": "Surgical (Full SOTA MoE-Aware)",
@@ -176,6 +186,7 @@ METHODS = {
"minimizing capability damage via precision targeting."
),
"n_directions": 8,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -204,6 +215,7 @@ METHODS = {
"techniques plus the inversion layer."
),
"n_directions": 8,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -234,6 +246,7 @@ METHODS = {
"Best for maximizing quality when compute budget allows ~50 trials."
),
"n_directions": 4,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 1,
@@ -275,6 +288,7 @@ METHODS = {
"runtime overhead except lightweight steering hooks."
),
"n_directions": 4,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -314,12 +328,14 @@ METHODS = {
"description": (
"Faithful reproduction of the FailSpy/abliterator library — the "
"most widely used community tool. Single direction via difference-"
- "in-means (Arditi et al.), middle 60%% layer heuristic (layers "
- "20%%-80%%), no regularization, no norm preservation. Uses chat "
- "template for instruct models. This is what most HuggingFace "
- "abliterated models were created with."
+ "in-means (Arditi et al.), applied to all layers except layer 0 "
+ "(matching FailSpy source: range(1, n_layers)). Projects both "
+ "W_O (attention output) and MLP W_out. No regularization, no "
+ "norm preservation. Uses chat template for instruct models. "
+ "This is what most HuggingFace abliterated models were created with."
),
"n_directions": 1,
+ "direction_method": "diff_means",
"norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
@@ -334,7 +350,7 @@ METHODS = {
"attention_head_surgery": False,
"use_sae_features": False,
"invert_refusal": False,
- "layer_selection": "middle60",
+ "layer_selection": "all_except_first",
},
"gabliteration": {
"label": "Gabliteration (Gülmez 2026 Baseline)",
@@ -347,6 +363,7 @@ METHODS = {
"whitened SVD, no iterative refinement."
),
"n_directions": 4,
+ "direction_method": "svd",
"norm_preserve": False,
# Ridge alpha=0.3 → effective reg = alpha/(1+alpha) = 0.3/1.3 ≈ 0.231
# For orthonormal V: P_V^alpha = 1/(1+alpha) * VV^T = 0.769 * VV^T
@@ -367,19 +384,26 @@ METHODS = {
"layer_selection": "top_k",
},
"heretic": {
- "label": "Heretic / p-e-w (2025 Baseline)",
+ "label": "Heretic / p-e-w (2025-2026 Baseline)",
"description": (
- "Faithful reproduction of Heretic's core algorithm (p-e-w, 2025). "
- "Bayesian optimization via Optuna TPE with parametric bell curve "
- "kernel. Uses 1-2 directions (float interpolation between top SVD "
- "components), component-specific scaling (attention vs MLP), "
- "activation winsorization (1%% tails). No whitened SVD, no SAE "
- "features, no jailbreak contrast. The key innovation is replacing "
+ "Faithful reproduction of Heretic's core algorithm (p-e-w, 2025-2026). "
+ "Bayesian optimization via Optuna TPE with linear bell curve layer "
+ "weighting (NOT Gaussian — linear interpolation between max_weight and "
+ "min_weight over min_weight_distance). One diff-of-means direction per "
+ "layer; direction_scope is sampled ('global' selects a float layer index "
+ "with lerp between adjacent layers' directions, 'per layer' uses each "
+ "layer's own direction). LoRA-based ablation (delta W = -lambda * v * "
+ "(v^T W)), never modifies base weights directly. Row normalization "
+ "defaults to NONE (PRE and FULL are options). Activation winsorization "
+ "via symmetric quantile clamping. The key innovation is replacing "
"manual hyperparameter selection with automated Pareto optimization "
- "over the (refusal_rate, KL_divergence) frontier."
+ "over the (refusal_count, KL_divergence) frontier."
),
- "n_directions": 2,
- "norm_preserve": True,
+ "n_directions": 1,
+ "direction_method": "diff_means",
+ # Heretic default row_normalization is NONE; PRE/FULL are optional.
+ # OBLITERATUS norm_preserve=False matches Heretic's default behavior.
+ "norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
"project_biases": False,
@@ -387,14 +411,21 @@ METHODS = {
"use_whitened_svd": False,
"true_iterative_refinement": False,
"use_jailbreak_contrast": False,
- "layer_adaptive_strength": True,
+ # Heretic uses its own bell curve weighting (linear, not Gaussian),
+ # not OBLITERATUS's norm-based layer_adaptive_strength.
+ "layer_adaptive_strength": False,
"safety_neuron_masking": False,
"per_expert_directions": False,
"attention_head_surgery": False,
"use_sae_features": False,
"invert_refusal": False,
- "winsorize_activations": True,
- "winsorize_percentile": 0.01,
+ # Heretic default winsorization_quantile is 1.0 (disabled by default).
+ # For faithful baseline reproduction we match the source default.
+ "winsorize_activations": False,
+ "winsorize_percentile": 1.0,
+ # Heretic's float direction index interpolates between adjacent LAYERS'
+ # directions (not SVD components). OBLITERATUS float_layer_interpolation
+ # provides the bell-curve layer weighting aspect.
"float_layer_interpolation": True,
"cot_aware": False,
"use_kl_optimization": True,
@@ -414,6 +445,7 @@ METHODS = {
"boundary rather than the statistical activation difference."
),
"n_directions": 4,
+ "direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 1,
@@ -566,6 +598,7 @@ class AbliterationPipeline:
hub_token: str | None = None,
hub_community_org: str | None = None,
n_directions: int | None = None,
+ direction_method: str | None = None,
norm_preserve: bool | None = None,
regularization: float | None = None,
refinement_passes: int | None = None,
@@ -659,6 +692,7 @@ class AbliterationPipeline:
method_cfg = METHODS[method]
self.method = method
self.n_directions = n_directions if n_directions is not None else method_cfg["n_directions"]
+ self.direction_method = direction_method if direction_method is not None else method_cfg.get("direction_method", "svd")
self.norm_preserve = norm_preserve if norm_preserve is not None else method_cfg["norm_preserve"]
self.regularization = regularization if regularization is not None else method_cfg["regularization"]
self.refinement_passes = refinement_passes if refinement_passes is not None else method_cfg["refinement_passes"]
@@ -936,7 +970,7 @@ class AbliterationPipeline:
self.log(f"Loading model: {self.model_name}")
self.log(f"Device: {self.device} | Dtype: {self.dtype}")
self.log(f"Method: {method_label}")
- self.log(f" Directions: {self.n_directions} | Norm-preserve: {self.norm_preserve}")
+ self.log(f" Directions: {self.n_directions} ({self.direction_method}) | Norm-preserve: {self.norm_preserve}")
self.log(f" Regularization: {self.regularization} | Refinement passes: {self.refinement_passes}")
self.handle = load_model(
@@ -1400,18 +1434,26 @@ class AbliterationPipeline:
else:
max_length = 384 if collect_multi_pos else 256
free_gb = dev.get_total_free_gb()
+ # Scale memory thresholds by model size — a 1.2B model needs far
+ # less KV-cache memory per token than a 7B model. Baseline
+ # thresholds (4 / 2 GB) were tuned for 7B (hidden=4096, layers=32).
+ _h = self.handle.hidden_size if self.handle else 4096
+ _l = n_layers if n_layers else 32
+ _mem_scale = (_h / 4096) * (_l / 32)
+ _tight_gb = max(4.0 * _mem_scale, 0.5)
+ _low_gb = max(2.0 * _mem_scale, 0.25)
if dev.is_gpu_available():
- if self.max_seq_length is None and free_gb < 2.0:
+ if self.max_seq_length is None and free_gb < _low_gb:
max_length = 64
- self.log(f" Low GPU memory ({free_gb:.1f} GB free), using max_length={max_length}")
- elif self.max_seq_length is None and free_gb < 4.0:
+ self.log(f" Low GPU memory ({free_gb:.1f} GB free, threshold {_low_gb:.1f} GB), using max_length={max_length}")
+ elif self.max_seq_length is None and free_gb < _tight_gb:
max_length = 128
- self.log(f" Tight GPU memory ({free_gb:.1f} GB free), using max_length={max_length}")
+ self.log(f" Tight GPU memory ({free_gb:.1f} GB free, threshold {_tight_gb:.1f} GB), using max_length={max_length}")
device = self._get_model_device(model)
# Batch prompts for throughput — hooks unbatch per-prompt activations
- batch_size = 16 if free_gb > 4.0 else 8 if free_gb > 2.0 else 1
+ batch_size = 16 if free_gb > _tight_gb else 8 if free_gb > _low_gb else 1
# Left-pad so position -1 is always the last real token in every batch element
orig_padding_side = getattr(tokenizer, "padding_side", "right")
if batch_size > 1:
@@ -1498,9 +1540,16 @@ class AbliterationPipeline:
wasserstein_extractor = WassersteinOptimalExtractor()
self.log("Using Wasserstein-optimal direction extraction (cost-minimizing GEP)")
+ # Optionally use LEACE for theoretically optimal concept erasure
+ leace_extractor = None
+ if self.direction_method == "leace":
+ from obliteratus.analysis.leace import LEACEExtractor
+ leace_extractor = LEACEExtractor()
+ self.log("Using LEACE (closed-form optimal concept erasure) for direction extraction")
+
# Optionally use whitened SVD for cleaner direction extraction
whitened_extractor = None
- if self.use_whitened_svd and n_dirs > 1 and not self.use_wasserstein_optimal:
+ if self.use_whitened_svd and n_dirs > 1 and not self.use_wasserstein_optimal and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
self.log("Using whitened SVD (covariance-normalized) for direction extraction")
@@ -1547,6 +1596,30 @@ class AbliterationPipeline:
if idx < 5:
self.log(f" layer {idx}: Wasserstein extraction failed ({e}), falling back to SVD")
+ if leace_extractor is not None:
+ # LEACE: closed-form optimal concept erasure direction
+ if idx in self._harmful_acts and idx in self._harmless_acts:
+ try:
+ l_result = leace_extractor.extract(
+ self._harmful_acts[idx],
+ self._harmless_acts[idx],
+ layer_idx=idx,
+ )
+ self.refusal_directions[idx] = l_result.direction
+ self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
+ norms[idx] = l_result.generalized_eigenvalue
+
+ if idx < 5 or idx == n_layers - 1:
+ self.log(
+ f" layer {idx}: LEACE eigenvalue={l_result.generalized_eigenvalue:.4f}, "
+ f"erasure_loss={l_result.erasure_loss:.4f}, "
+ f"cond={l_result.within_class_condition:.0f}"
+ )
+ continue
+ except Exception as e:
+ if idx < 5:
+ self.log(f" layer {idx}: LEACE failed ({e}), falling back to diff-of-means")
+
if n_dirs == 1:
# Classic single-direction: difference-in-means
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
@@ -1630,7 +1703,8 @@ class AbliterationPipeline:
# Supports multiple algorithms for baseline comparison:
# knee_cosmic: OBLITERATUS default (knee detection + COSMIC fusion)
# knee: knee detection only (simplified OBLITERATUS)
- # middle60: FailSpy/abliterator heuristic (layers 20%-80%)
+ # middle60: legacy heuristic (layers 20%-80%)
+ # all_except_first: FailSpy/abliterator (all layers except layer 0)
# all: all layers (for Bayesian optimization / Heretic)
# top_k: top-k by refusal strength (Gabliteration-style)
sorted_layers = sorted(norms.items(), key=lambda x: x[1], reverse=True)
@@ -1643,8 +1717,14 @@ class AbliterationPipeline:
selection_method = self.layer_selection
- if selection_method == "middle60":
- # FailSpy/abliterator heuristic: middle 60% of layers
+ if selection_method == "all_except_first":
+ # FailSpy/abliterator: all layers except layer 0
+ # Source: range(1, self.model.cfg.n_layers) in FailSpy/abliterator
+ self._strong_layers = list(range(1, n_layers))
+ self.log(f"Layer selection: all-except-first ({len(self._strong_layers)} layers)")
+
+ elif selection_method == "middle60":
+ # Legacy heuristic: middle 60% of layers (layers 20%-80%)
self._strong_layers = self._select_layers_middle60(n_layers)
self.log(f"Layer selection: middle-60% ({len(self._strong_layers)} layers)")
@@ -2300,14 +2380,14 @@ class AbliterationPipeline:
@staticmethod
def _select_layers_middle60(n_layers: int) -> list[int]:
- """Select the middle 60% of layers (FailSpy/abliterator heuristic).
+ """Select the middle 60% of layers (legacy heuristic).
- The original abliterator library by FailSpy selects layers from index
- n_layers*0.2 to n_layers*0.8, based on the empirical observation that
- refusal concentrates in middle layers (not early embedding layers or
- late unembedding layers).
+ Selects layers from index n_layers*0.2 to n_layers*0.8.
- Reference: FailSpy/abliterator (2024), GitHub.
+ NOTE: This does NOT match FailSpy/abliterator's actual layer selection.
+ FailSpy uses all layers except layer 0 (range(1, n_layers)). Use
+ layer_selection="all_except_first" for faithful FailSpy reproduction.
+ This method is retained for backward compatibility only.
"""
start = int(n_layers * 0.2)
end = int(n_layers * 0.8)
@@ -3589,9 +3669,18 @@ class AbliterationPipeline:
except Exception:
pass
+ # Use LEACE when enabled (matching main _distill)
+ leace_extractor = None
+ if self.direction_method == "leace":
+ try:
+ from obliteratus.analysis.leace import LEACEExtractor
+ leace_extractor = LEACEExtractor()
+ except Exception:
+ pass
+
# Use whitened SVD when enabled (matching main _distill)
whitened_extractor = None
- if self.use_whitened_svd and n_dirs > 1 and wasserstein_extractor is None:
+ if self.use_whitened_svd and n_dirs > 1 and wasserstein_extractor is None and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
@@ -3624,6 +3713,22 @@ class AbliterationPipeline:
except Exception:
pass # Fall through to SVD
+ # LEACE path (matching main _distill)
+ if leace_extractor is not None:
+ if idx in self._harmful_acts and idx in self._harmless_acts:
+ try:
+ l_result = leace_extractor.extract(
+ self._harmful_acts[idx],
+ self._harmless_acts[idx],
+ layer_idx=idx,
+ )
+ self.refusal_directions[idx] = l_result.direction
+ self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
+ norms[idx] = l_result.generalized_eigenvalue
+ continue
+ except Exception:
+ pass # Fall through to diff-of-means
+
if n_dirs == 1:
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
norm = diff.norm()
@@ -3667,7 +3772,9 @@ class AbliterationPipeline:
# Respect configured layer_selection (matching _distill)
selection_method = self.layer_selection
- if selection_method == "middle60":
+ if selection_method == "all_except_first":
+ self._strong_layers = list(range(1, n_layers))
+ elif selection_method == "middle60":
self._strong_layers = self._select_layers_middle60(n_layers)
elif selection_method == "all":
self._strong_layers = self._select_layers_all(n_layers)
@@ -5663,8 +5770,9 @@ class AbliterationPipeline:
cert_n = min(20, len(self.harmful_prompts), len(self.harmless_prompts))
cert_harmful = self._maybe_apply_chat_template(self.harmful_prompts[:cert_n])
cert_harmless = self._maybe_apply_chat_template(self.harmless_prompts[:cert_n])
- cert_h_acts = self._collect_activations(layers, cert_harmful, "cert_harmful")
- cert_b_acts = self._collect_activations(layers, cert_harmless, "cert_harmless")
+ cert_layer_modules = get_layer_modules(self.handle)
+ cert_h_acts = self._collect_activations(cert_layer_modules, cert_harmful, "cert_harmful")
+ cert_b_acts = self._collect_activations(cert_layer_modules, cert_harmless, "cert_harmless")
cert_results = []
for layer_idx in cert_layers:
@@ -5741,6 +5849,7 @@ class AbliterationPipeline:
"method": self.method,
"method_config": {
"n_directions": self.n_directions,
+ "direction_method": self.direction_method,
"norm_preserve": self.norm_preserve,
"regularization": self.regularization,
"refinement_passes": self.refinement_passes,
@@ -5868,10 +5977,11 @@ class AbliterationPipeline:
param_bytes = sum(v.numel() * v.element_size() for v in state_dict.values())
self.log(f"State dict: {len(state_dict)} tensors, {param_bytes / 1e9:.1f} GB")
- # 3. NOW it's safe to clean up the offload dir — all weights are in memory.
- self._cleanup_offload_dir()
-
- # 4. Save model + tokenizer + metadata
+ # 3. Save model + tokenizer + metadata
+ # NOTE: offload dir cleanup is deferred until AFTER save_pretrained
+ # completes, because accelerate's dispatch hooks may still access
+ # the offload dir during serialization (even when state_dict is
+ # explicitly provided).
self.output_dir.mkdir(parents=True, exist_ok=True)
self.log(f"Saving model to {self.output_dir}/")
@@ -5940,6 +6050,9 @@ class AbliterationPipeline:
del state_dict
self._free_gpu_memory()
+ # NOW it's safe to clean up the offload dir — save_pretrained is done.
+ self._cleanup_offload_dir()
+
self.handle.tokenizer.save_pretrained(self.output_dir)
(self.output_dir / "abliteration_metadata.json").write_text(
diff --git a/obliteratus/analysis/conditional_abliteration.py b/obliteratus/analysis/conditional_abliteration.py
index 7f52960..b55bfb7 100644
--- a/obliteratus/analysis/conditional_abliteration.py
+++ b/obliteratus/analysis/conditional_abliteration.py
@@ -269,7 +269,7 @@ class ConditionalAbliterator:
) -> torch.Tensor | None:
"""Extract category-specific refusal direction.
- Uses Fisher's Linear Discriminant (whitened difference-of-means)
+ Uses difference-of-means (category_mean - harmless_mean)
and then orthogonalizes against previously extracted directions
to ensure category independence.
"""
diff --git a/obliteratus/analysis/leace.py b/obliteratus/analysis/leace.py
new file mode 100644
index 0000000..b11ff39
--- /dev/null
+++ b/obliteratus/analysis/leace.py
@@ -0,0 +1,239 @@
+"""LEACE-inspired direction extraction for refusal concept erasure.
+
+This module implements Fisher's Linear Discriminant (FLD) direction for
+concept erasure, inspired by LEACE (Belrose et al. 2023).
+
+IMPORTANT: This is NOT a faithful implementation of LEACE as described in
+the paper. Key difference:
+
+ - **True LEACE** uses the *total* covariance Sigma_X for whitening:
+ P* = I - W^{-1} P_{W Sigma_XZ} W where W = Sigma_X^{-1/2}
+ For binary concepts, this yields: v = Sigma_X^{-1} delta
+
+ - **This implementation** uses *within-class* covariance S_w:
+ v = S_w^{-1} delta
+ This is Fisher's Linear Discriminant direction, which maximizes
+ class separability relative to within-class spread.
+
+For binary concepts, Sigma_X = S_w + p(1-p) * delta @ delta^T,
+so the two directions differ when the between-class scatter is
+non-negligible relative to within-class scatter. In high-dimensional
+settings (d >> 1) with moderate class separation, the difference
+is typically small but non-zero.
+
+The FLD direction is still a strong choice for refusal erasure — it
+handles rogue dimensions (high-variance but non-discriminative) better
+than plain diff-of-means, and is a closed-form solution with no
+iterative optimization.
+
+Advantages over SVD:
+ - Within-class normalization prevents high-variance but
+ non-discriminative dimensions from dominating
+  - Few hyperparameters (regularization epsilon, optional shrinkage)
+ - Closed-form solution (no iterative optimization)
+
+References:
+ - Belrose et al. (2023): LEACE: Perfect linear concept erasure in
+ closed form. NeurIPS 2023.
+ - Ravfogel et al. (2022): RLACE: Adversarial concept erasure
+ (iterative precursor to LEACE).
+ - Fisher (1936): The use of multiple measurements in taxonomic
+ problems. Annals of Eugenics.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+import torch
+
+
+@dataclass
+class LEACEResult:
+ """Result of LEACE direction extraction for a single layer."""
+
+ layer_idx: int
+ direction: torch.Tensor # (hidden_dim,) unit vector
+ generalized_eigenvalue: float # lambda from GEP (discriminability)
+ within_class_condition: float # condition number of S_w
+ mean_diff_norm: float # ||mu_1 - mu_0||
+ erasure_loss: float # expected squared distortion from erasure
+
+
+class LEACEExtractor:
+ """Extract refusal directions via Fisher's Linear Discriminant.
+
+ Finds the direction that maximally separates harmful from harmless
+ activations relative to within-class variance (v = S_w^{-1} delta).
+ See module docstring for how this relates to true LEACE.
+ """
+
+ def __init__(
+ self,
+ regularization_eps: float = 1e-4,
+ shrinkage: float = 0.0,
+ ):
+ """
+ Args:
+ regularization_eps: Tikhonov regularization for S_w inversion.
+ Larger values produce more conservative (but stable) results.
+            shrinkage: Fixed-intensity shrinkage toward a scaled identity
+                (0..1), in the spirit of Ledoit-Wolf but with user-set
+                intensity (not estimated). Useful when n_samples < hidden_dim.
+ """
+ self.regularization_eps = regularization_eps
+ self.shrinkage = shrinkage
+
+ def extract(
+ self,
+ harmful_activations: list[torch.Tensor],
+ harmless_activations: list[torch.Tensor],
+ layer_idx: int = 0,
+ ) -> LEACEResult:
+ """Extract the LEACE direction for a single layer.
+
+ Args:
+ harmful_activations: List of (hidden_dim,) tensors from harmful prompts.
+ harmless_activations: List of (hidden_dim,) tensors from harmless prompts.
+ layer_idx: Layer index (for metadata).
+
+ Returns:
+ LEACEResult with the optimal erasure direction.
+ """
+ H = torch.stack(harmful_activations).float() # (n_h, d)
+ B = torch.stack(harmless_activations).float() # (n_b, d)
+
+ if H.dim() == 3:
+ H = H.squeeze(1)
+ if B.dim() == 3:
+ B = B.squeeze(1)
+
+ n_h, d = H.shape
+ n_b = B.shape[0]
+
+ # Class-conditional means
+ mu_h = H.mean(dim=0) # (d,)
+ mu_b = B.mean(dim=0) # (d,)
+
+ # Mean difference (between-class direction)
+ delta = mu_h - mu_b # (d,)
+ delta_norm = delta.norm().item()
+
+ # Within-class covariance: S_w = (S_h + S_b) / 2
+ # where S_h = (H - mu_h)^T (H - mu_h) / (n_h - 1) etc.
+ H_centered = H - mu_h.unsqueeze(0)
+ B_centered = B - mu_b.unsqueeze(0)
+
+ S_h = (H_centered.T @ H_centered) / max(n_h - 1, 1)
+ S_b = (B_centered.T @ B_centered) / max(n_b - 1, 1)
+ S_w = (S_h + S_b) / 2.0 # (d, d)
+
+        # Apply fixed shrinkage toward scaled identity if requested
+ if self.shrinkage > 0:
+ trace_S_w = S_w.trace().item()
+ S_w = (1 - self.shrinkage) * S_w + self.shrinkage * (trace_S_w / d) * torch.eye(d, device=S_w.device)
+
+ # Regularize S_w for numerical stability
+ S_w_reg = S_w + self.regularization_eps * torch.eye(d, device=S_w.device)
+
+ # Condition number of S_w (for diagnostics)
+ try:
+ eigs_w = torch.linalg.eigvalsh(S_w_reg)
+ eigs_w = eigs_w.clamp(min=0)
+ pos_eigs = eigs_w[eigs_w > eigs_w.max() * 1e-10]
+ condition = (pos_eigs.max() / pos_eigs.min()).item() if pos_eigs.numel() > 0 else float('inf')
+ except Exception:
+ condition = float('inf')
+
+ # LEACE direction via S_w^{-1} @ delta
+ # The generalized eigenvector for rank-1 S_between = delta @ delta^T
+ # reduces to: v = S_w^{-1} @ delta (up to normalization)
+ try:
+ # Use solve for numerical stability (avoids explicit inverse)
+ v = torch.linalg.solve(S_w_reg, delta) # (d,)
+ except torch.linalg.LinAlgError:
+ # Fallback: pseudoinverse
+ v = torch.linalg.lstsq(S_w_reg, delta.unsqueeze(1)).solution.squeeze(1)
+
+ # Normalize to unit length
+ v_norm = v.norm()
+ if v_norm > 1e-8:
+ direction = v / v_norm
+ else:
+ # Degenerate case: fall back to normalized mean difference
+ direction = delta / max(delta_norm, 1e-8)
+
+ # Generalized eigenvalue: lambda = delta^T @ S_w^{-1} @ delta
+ # This measures how discriminable the classes are after whitening
+ gen_eigenvalue = (delta @ v).item()
+
+ # Erasure loss: expected squared distortion E[||x - x'||^2]
+ # For rank-1 projection: loss = v^T @ S_total @ v where S_total
+ # is the total (pooled) covariance
+ all_acts = torch.cat([H, B], dim=0)
+ mu_total = all_acts.mean(dim=0)
+ centered_total = all_acts - mu_total.unsqueeze(0)
+ S_total = (centered_total.T @ centered_total) / max(all_acts.shape[0] - 1, 1)
+ erasure_loss = (direction @ S_total @ direction).item()
+
+ return LEACEResult(
+ layer_idx=layer_idx,
+ direction=direction,
+ generalized_eigenvalue=gen_eigenvalue,
+ within_class_condition=condition,
+ mean_diff_norm=delta_norm,
+ erasure_loss=erasure_loss,
+ )
+
+ def extract_all_layers(
+ self,
+ harmful_acts: dict[int, list[torch.Tensor]],
+ harmless_acts: dict[int, list[torch.Tensor]],
+ ) -> dict[int, LEACEResult]:
+ """Extract LEACE directions for all layers.
+
+ Args:
+ harmful_acts: {layer_idx: [activations]} from activation collection.
+ harmless_acts: {layer_idx: [activations]} from activation collection.
+
+ Returns:
+ {layer_idx: LEACEResult} for each layer.
+ """
+ results = {}
+ for idx in sorted(harmful_acts.keys()):
+ if idx not in harmless_acts:
+ continue
+ results[idx] = self.extract(
+ harmful_acts[idx],
+ harmless_acts[idx],
+ layer_idx=idx,
+ )
+ return results
+
+ @staticmethod
+ def compare_with_diff_of_means(
+ leace_result: LEACEResult,
+ harmful_mean: torch.Tensor,
+ harmless_mean: torch.Tensor,
+ ) -> dict[str, float]:
+ """Compare LEACE direction with simple diff-of-means.
+
+ Returns cosine similarity and diagnostic metrics showing how much
+ the within-class normalization rotates the direction.
+ """
+ diff = harmful_mean.squeeze() - harmless_mean.squeeze()
+ diff_norm = diff.norm()
+ if diff_norm > 1e-8:
+ diff_normalized = diff / diff_norm
+ else:
+ diff_normalized = diff
+
+ cosine_sim = (leace_result.direction @ diff_normalized).abs().item()
+
+ return {
+ "cosine_similarity": cosine_sim,
+ "leace_eigenvalue": leace_result.generalized_eigenvalue,
+ "leace_erasure_loss": leace_result.erasure_loss,
+ "within_class_condition": leace_result.within_class_condition,
+ "mean_diff_norm": leace_result.mean_diff_norm,
+ }
diff --git a/obliteratus/analysis/riemannian_manifold.py b/obliteratus/analysis/riemannian_manifold.py
index 857b0bf..e5980de 100644
--- a/obliteratus/analysis/riemannian_manifold.py
+++ b/obliteratus/analysis/riemannian_manifold.py
@@ -428,8 +428,15 @@ class RiemannianManifoldAnalyzer:
geodesic triangle with area A satisfies:
sum(angles) = pi + K * A (Gauss-Bonnet for small triangles)
- We approximate geodesics with straight lines (valid for small K)
- and use angle excess to estimate K.
+ IMPORTANT LIMITATION: This method uses Euclidean chords and angles
+ in ambient space, NOT geodesics on the manifold. In flat Euclidean
+ space, the angle sum of any triangle is exactly pi, so this method
+ will yield K ≈ 0 (up to numerical noise) regardless of the actual
+ manifold curvature. The results are only meaningful when the data
+ lies on an approximately low-dimensional curved submanifold and
+ triangles are sufficiently small relative to the curvature radius.
+ For rigorous curvature estimates, use methods based on local PCA
+ eigenvalue decay or Jacobian-based Riemannian metric computation.
"""
# Compute sides
ab = (b - a).float()
@@ -613,8 +620,12 @@ class RiemannianManifoldAnalyzer:
return torch.zeros_like(activation)
v = v / norm
- # Correction magnitude: K * proj_magnitude^2 / 2
- correction_magnitude = curvature * proj_magnitude ** 2 / 2.0
+ # Second-order geodesic correction: K * proj_magnitude^2 / 6
+ # From Jacobi field estimate: deviation of geodesic from straight
+ # line over distance L with curvature K is ≈ K * L^2 / 6.
+ # Note: the residual bound in analyze() uses K * ||x||^2 / 8
+ # which is a looser upper bound including higher-order terms.
+ correction_magnitude = curvature * proj_magnitude ** 2 / 6.0
# Clamp to prevent instability
correction_magnitude = max(-0.1, min(0.1, correction_magnitude))
diff --git a/obliteratus/analysis/sae_abliteration.py b/obliteratus/analysis/sae_abliteration.py
index 479a474..0d6f70e 100644
--- a/obliteratus/analysis/sae_abliteration.py
+++ b/obliteratus/analysis/sae_abliteration.py
@@ -94,7 +94,7 @@ class SparseAutoencoder(nn.Module):
@property
def decoder_weight(self) -> torch.Tensor:
- """Return the decoder weight matrix (n_features x hidden_dim for untied, or encoder.weight.T)."""
+ """Return the decoder weight matrix (hidden_dim x n_features for untied, or encoder.weight.T)."""
if self.tied_weights:
return self.encoder.weight.T
return self.decoder.weight
diff --git a/obliteratus/analysis/spectral_certification.py b/obliteratus/analysis/spectral_certification.py
index 19061fc..32f04f8 100644
--- a/obliteratus/analysis/spectral_certification.py
+++ b/obliteratus/analysis/spectral_certification.py
@@ -175,10 +175,11 @@ class SpectralCertifier:
harmful_centered = harmful_activations - harmful_mean
harmless_centered = harmless_activations - harmless_mean
- # Pooled within-class covariance
+ # Pooled within-class covariance (standard formula: sum of scatter
+ # matrices divided by total degrees of freedom)
cov_h = harmful_centered.T @ harmful_centered / max(n_h - 1, 1)
cov_b = harmless_centered.T @ harmless_centered / max(n_b - 1, 1)
- pooled_cov = (cov_h * n_h + cov_b * n_b) / max(n - 2, 1)
+ pooled_cov = (cov_h * (n_h - 1) + cov_b * (n_b - 1)) / max(n - 2, 1)
# Step 2: Estimate noise variance (median eigenvalue method)
noise_var = self._estimate_noise_variance(pooled_cov, n, d)
@@ -374,8 +375,13 @@ class SpectralCertifier:
# Correct for MP bias: median of MP distribution
gamma = d / max(n, 1)
if gamma < 1:
- # MP median approximation (from Bai & Silverstein)
- mp_median_ratio = (1 + math.sqrt(gamma)) ** 2 * 0.5
+            # MP median approximation. The exact MP median requires
+            # numerical inversion of the MP CDF; here we use the
+            # empirical approximation median ≈ (1 - sqrt(gamma))^2
+            # + gamma^(1/3) instead of the naive 0.5 * upper_edge.
+            # NOTE(review): there is no small-gamma fallback in the code,
+            # and this formula is empirical — validate against the MP CDF.
+ mp_median_ratio = (1 - math.sqrt(gamma)) ** 2 + gamma ** (1.0 / 3.0)
noise_var = median_eig / max(mp_median_ratio, 1e-10)
else:
noise_var = median_eig
diff --git a/obliteratus/analysis/wasserstein_optimal.py b/obliteratus/analysis/wasserstein_optimal.py
index 24b9fb0..51469e2 100644
--- a/obliteratus/analysis/wasserstein_optimal.py
+++ b/obliteratus/analysis/wasserstein_optimal.py
@@ -58,7 +58,7 @@ class WassersteinDirectionResult:
direction: torch.Tensor # (hidden_dim,) optimal direction
wasserstein_cost: float # W_2^2 cost for this direction
mean_shift_component: float # (r^T m)^2 portion
- bures_component: float # r^T Sigma r portion (upper bound)
+ bures_component: float # r^T Sigma r portion (exact when r is eigenvector of Sigma, lower bound otherwise)
refusal_projection: float # (r^T d)^2
cost_effectiveness_ratio: float # W_2^2 / (r^T d)^2
diff --git a/obliteratus/bayesian_optimizer.py b/obliteratus/bayesian_optimizer.py
index 3e6df70..2944003 100644
--- a/obliteratus/bayesian_optimizer.py
+++ b/obliteratus/bayesian_optimizer.py
@@ -142,28 +142,35 @@ def _parametric_layer_weight(
min_weight: float,
spread: float,
) -> float:
- """Compute ablation weight for a layer using a parametric bell curve.
+ """Compute ablation weight for a layer using a piecewise-linear tent kernel.
- This is the Heretic-style parametric kernel:
- - max_weight: peak ablation strength (0..1)
- - peak_position: normalized position of peak (0..1 maps to layer 0..n_layers-1)
- - min_weight: minimum ablation weight at the tails
- - spread: controls width of the bell curve (higher = wider)
+ Faithful reproduction of Heretic's parametric kernel (p-e-w/heretic):
+ - max_weight: peak ablation strength at peak_position
+ - peak_position: normalized position of peak (0..1)
+ - min_weight: weight at the edges of the tent
+ - spread: normalized distance from peak to tent edge (min_weight_distance)
- Returns a value in [min_weight, max_weight] representing how strongly
- to ablate this layer (1.0 = full projection, 0.0 = no projection).
+ Layers beyond ``spread`` from the peak get weight 0 (skipped entirely).
+ Within the tent, weight drops linearly from max_weight to min_weight.
+ This matches Heretic's actual formula::
+
+ distance = abs(layer_index - max_weight_position)
+ if distance > min_weight_distance: skip
+ weight = max_weight + (distance / min_weight_distance) * (min_weight - max_weight)
"""
if n_layers <= 1:
return max_weight
normalized_pos = layer_idx / (n_layers - 1)
- peak = peak_position
- # Gaussian-shaped kernel
- dist = abs(normalized_pos - peak)
- sigma = max(spread, 0.01)
- gauss = math.exp(-0.5 * (dist / sigma) ** 2)
+ dist = abs(normalized_pos - peak_position)
+ min_weight_distance = max(spread, 0.01)
- return min_weight + (max_weight - min_weight) * gauss
+ # Hard cutoff: layers outside the tent get 0 weight (Heretic skips them)
+ if dist > min_weight_distance:
+ return 0.0
+
+ # Linear interpolation: max_weight at peak → min_weight at edges
+ return max_weight + (dist / min_weight_distance) * (min_weight - max_weight)
def _interpolate_direction(
@@ -171,37 +178,56 @@ def _interpolate_direction(
layer_idx: int,
float_dir_idx: float,
) -> torch.Tensor:
- """Get an interpolated refusal direction from a float-valued index.
+ """Get an interpolated refusal direction from a float-valued layer index.
- Non-integer values interpolate between adjacent SVD directions in the
- refusal subspace, unlocking a continuous space of directions beyond
- the discrete top-k.
+ Faithful reproduction of Heretic's direction interpolation: the index
+ selects which *layer's* diff-of-means direction to use, with float
+ values interpolating between adjacent layers' directions. This is
+ fundamentally different from interpolating between SVD components
+ within a single layer — it searches across the layer axis.
+
+ From Heretic source (model.py)::
+
+ weight, index = math.modf(direction_index + 1)
+ refusal_direction = F.normalize(
+ refusal_directions[int(index)].lerp(
+ refusal_directions[int(index) + 1], weight), p=2, dim=0)
Args:
- pipeline: Pipeline with extracted refusal subspaces.
- layer_idx: Which layer's subspace to use.
- float_dir_idx: Continuous direction index (e.g., 0.7 interpolates
- between direction 0 and direction 1).
+ pipeline: Pipeline with extracted refusal directions per layer.
+ layer_idx: The layer being projected (used as fallback).
+ float_dir_idx: Continuous direction index — selects which layer's
+ direction to use (e.g., 5.3 interpolates 70% layer-5 + 30% layer-6).
Returns:
Normalized direction tensor.
"""
- subspace = pipeline.refusal_subspaces.get(layer_idx)
- if subspace is None or subspace.shape[0] == 0:
+ # Build sorted list of layer indices that have refusal directions
+ sorted_layers = sorted(pipeline.refusal_directions.keys())
+ if not sorted_layers:
return pipeline.refusal_directions.get(layer_idx, torch.zeros(1))
- n_dirs = subspace.shape[0]
- # Clamp to valid range
- float_dir_idx = max(0.0, min(float_dir_idx, n_dirs - 1))
+ n_layers_with_dirs = len(sorted_layers)
+
+ # Heretic uses direction_index + 1 offset; we map float_dir_idx into
+ # the sorted layer list, clamped to valid range.
+ float_dir_idx = max(0.0, min(float_dir_idx, n_layers_with_dirs - 1))
lo = int(float_dir_idx)
- hi = min(lo + 1, n_dirs - 1)
+ hi = min(lo + 1, n_layers_with_dirs - 1)
+
+ lo_layer = sorted_layers[lo]
+ hi_layer = sorted_layers[hi]
+
+ d_lo = pipeline.refusal_directions[lo_layer]
+ d_hi = pipeline.refusal_directions[hi_layer]
if lo == hi:
- d = subspace[lo]
+ d = d_lo
else:
+ # Linear interpolation between adjacent layers' directions
alpha = float_dir_idx - lo
- d = (1.0 - alpha) * subspace[lo] + alpha * subspace[hi]
+ d = (1.0 - alpha) * d_lo + alpha * d_hi
norm = d.norm()
if norm > 1e-8:
@@ -342,9 +368,14 @@ def run_bayesian_optimization(
for live_data, saved_clone in original_params: # noqa: F821
live_data.copy_(saved_clone.to(live_data.device))
- # Warm-start values for the parametric kernel
- # Estimate peak position from strongest layer
- if pipeline._strong_layers:
+ # Warm-start values for the parametric kernel.
+ # If the informed pipeline provided analysis-derived warm-start params,
+ # use those (they're much better than the default heuristic).
+ informed_warm = getattr(pipeline, "_informed_warm_start", None)
+ if informed_warm:
+ warm_peak = informed_warm.get("peak_position", 0.5)
+ pipeline.log(f" Using analysis-informed warm-start (peak={warm_peak:.2f})")
+ elif pipeline._strong_layers:
peak_layer = pipeline._strong_layers[0]
warm_peak = peak_layer / max(n_total_layers - 1, 1)
else:
@@ -356,56 +387,56 @@ def run_bayesian_optimization(
# Suppress Optuna's verbose logging
optuna.logging.set_verbosity(optuna.logging.WARNING)
- # Max SVD directions available (for float direction interpolation)
- max_n_dirs = max(
- (pipeline.refusal_subspaces[idx].shape[0]
- for idx in pipeline._strong_layers
- if idx in pipeline.refusal_subspaces),
- default=1,
- )
+ # Max layers with directions (for float direction interpolation)
+ n_layers_with_dirs = len([
+ idx for idx in pipeline._strong_layers
+ if idx in pipeline.refusal_directions
+ ])
# ── Phase 1: Parametric kernel optimization (compact search space) ──
+ # Heretic uses SEPARATE kernel parameters for attention and MLP,
+ # allowing them to peak at different layers (8 params + 1 dir_idx = 9).
def objective(trial: optuna.Trial) -> tuple[float, float]:
"""Multi-objective: minimize (refusal_rate, kl_divergence)."""
_restore_all()
- # Parametric kernel: 4 params describe the entire layer weighting
- max_weight = trial.suggest_float("max_weight", 0.5, 1.0)
- peak_position = trial.suggest_float("peak_position", 0.1, 0.9)
- min_weight = trial.suggest_float("min_weight", 0.0, 0.3)
- spread = trial.suggest_float("spread", 0.1, 0.6)
+ # Attention kernel: 4 params
+ attn_max = trial.suggest_float("attn_max_weight", 0.5, 1.0)
+ attn_peak = trial.suggest_float("attn_peak_position", 0.1, 0.9)
+ attn_min = trial.suggest_float("attn_min_weight", 0.0, 0.3)
+ attn_spread = trial.suggest_float("attn_spread", 0.1, 0.6)
- # Component-specific scaling (Heretic insight: MLP more damaging)
- attn_scale = trial.suggest_float("attn_scale", 0.5, 1.0)
- mlp_scale = trial.suggest_float("mlp_scale", 0.3, 1.0)
+ # MLP kernel: 4 params (separate — can peak at a different layer)
+ mlp_max = trial.suggest_float("mlp_max_weight", 0.3, 1.0)
+ mlp_peak = trial.suggest_float("mlp_peak_position", 0.1, 0.9)
+ mlp_min = trial.suggest_float("mlp_min_weight", 0.0, 0.3)
+ mlp_spread = trial.suggest_float("mlp_spread", 0.1, 0.6)
- # Float direction index (continuous interpolation between SVD dirs)
- dir_idx = trial.suggest_float("dir_idx", 0.0, max(max_n_dirs - 1, 0.0))
+ # Float direction index (cross-layer interpolation, Heretic-style)
+ dir_idx = trial.suggest_float("dir_idx", 0.0, max(n_layers_with_dirs - 1, 0.0))
- # Compute per-layer regularization from parametric kernel
- layer_regs: dict[int, float] = {}
+ # Compute per-layer, per-component regularization from kernels
+ attn_regs: dict[int, float] = {}
+ mlp_regs: dict[int, float] = {}
for idx in pipeline._strong_layers:
- weight = _parametric_layer_weight(
- idx, n_total_layers, max_weight, peak_position, min_weight, spread,
- )
- # Convert weight to regularization (weight=1 → reg=0, weight=0 → reg=1)
- layer_regs[idx] = 1.0 - weight
+ attn_w = _parametric_layer_weight(idx, n_total_layers, attn_max, attn_peak, attn_min, attn_spread)
+ mlp_w = _parametric_layer_weight(idx, n_total_layers, mlp_max, mlp_peak, mlp_min, mlp_spread)
+ attn_regs[idx] = 1.0 - attn_w
+ mlp_regs[idx] = 1.0 - mlp_w
# Apply projection with trial's parameters
for idx in pipeline._strong_layers:
- if idx not in pipeline.refusal_subspaces:
+ if idx not in pipeline.refusal_directions:
continue
- # Use interpolated direction
+ # Use cross-layer interpolated direction
direction = _interpolate_direction(pipeline, idx, dir_idx)
d_col = direction.to(device=next(layer_modules[idx].parameters()).device)
d_col = d_col.unsqueeze(-1) if d_col.dim() == 1 else d_col
- reg = layer_regs[idx]
-
- # Attention projection (with attn_scale)
- attn_reg = 1.0 - (1.0 - reg) * attn_scale
+ # Attention projection (with per-component kernel)
+ attn_reg = attn_regs[idx]
try:
attn = get_attention_module(layer_modules[idx], arch)
pipeline._project_out_advanced(
@@ -416,8 +447,8 @@ def run_bayesian_optimization(
except (AttributeError, RuntimeError):
pass
- # MLP/FFN projection (with mlp_scale)
- mlp_reg = 1.0 - (1.0 - reg) * mlp_scale
+ # MLP/FFN projection (with per-component kernel)
+ mlp_reg = mlp_regs[idx]
try:
ffn = get_ffn_module(layer_modules[idx], arch)
count = pipeline._project_out_advanced(
@@ -439,18 +470,20 @@ def run_bayesian_optimization(
refusal = _measure_refusal_rate(pipeline, n_prompts=n_refusal_prompts)
kl = _measure_kl_divergence(pipeline, reference_logits, kl_prompts)
- # Track best combined score
+ # Track best combined score (use average of attn/mlp regs for layer_regs)
nonlocal best_score, best_result
combined = refusal + 0.5 * kl
if combined < best_score:
best_score = combined
- best_result = dict(layer_regs)
+ best_result = {
+ idx: (attn_regs[idx] + mlp_regs[idx]) / 2.0
+ for idx in pipeline._strong_layers
+ }
pipeline.log(
f" Trial {trial.number + 1}/{n_trials}: "
f"refusal={refusal:.0%}, KL={kl:.4f} "
- f"(peak={peak_position:.2f}, spread={spread:.2f}, "
- f"attn={attn_scale:.2f}, mlp={mlp_scale:.2f}, dir={dir_idx:.2f})"
+ f"(attn_peak={attn_peak:.2f}, mlp_peak={mlp_peak:.2f}, dir={dir_idx:.2f})"
)
return refusal, kl
@@ -462,16 +495,33 @@ def run_bayesian_optimization(
study_name="obliteratus_parametric_optimization",
)
- # Enqueue warm-start trial with analysis-derived estimates
- warm_params = {
- "max_weight": 0.9,
- "peak_position": warm_peak,
- "min_weight": 0.05,
- "spread": 0.3,
- "attn_scale": 0.8,
- "mlp_scale": 0.6,
- "dir_idx": 0.0,
- }
+ # Enqueue warm-start trial with analysis-derived estimates.
+ # Translate informed pipeline params to the new per-component format.
+ if informed_warm:
+ iw = informed_warm
+ warm_params = {
+ "attn_max_weight": iw.get("max_weight", 0.9),
+ "attn_peak_position": iw.get("peak_position", warm_peak),
+ "attn_min_weight": iw.get("min_weight", 0.05),
+ "attn_spread": iw.get("spread", 0.3),
+ "mlp_max_weight": iw.get("max_weight", 0.9) * iw.get("mlp_scale", 0.6),
+ "mlp_peak_position": iw.get("peak_position", warm_peak),
+ "mlp_min_weight": iw.get("min_weight", 0.05),
+ "mlp_spread": iw.get("spread", 0.3),
+ "dir_idx": iw.get("dir_idx", 0.0),
+ }
+ else:
+ warm_params = {
+ "attn_max_weight": 0.9,
+ "attn_peak_position": warm_peak,
+ "attn_min_weight": 0.05,
+ "attn_spread": 0.3,
+ "mlp_max_weight": 0.6,
+ "mlp_peak_position": warm_peak,
+ "mlp_min_weight": 0.05,
+ "mlp_spread": 0.3,
+ "dir_idx": 0.0,
+ }
study.enqueue_trial(warm_params)
pipeline.log(f"Bayesian optimization: running {n_trials} trials (parametric kernel)...")
@@ -490,25 +540,32 @@ def run_bayesian_optimization(
p = best_trial.params
best_result = {}
for idx in pipeline._strong_layers:
- weight = _parametric_layer_weight(
+ attn_w = _parametric_layer_weight(
idx, n_total_layers,
- p["max_weight"], p["peak_position"],
- p["min_weight"], p["spread"],
+ p["attn_max_weight"], p["attn_peak_position"],
+ p["attn_min_weight"], p["attn_spread"],
)
- best_result[idx] = 1.0 - weight
+ mlp_w = _parametric_layer_weight(
+ idx, n_total_layers,
+ p["mlp_max_weight"], p["mlp_peak_position"],
+ p["mlp_min_weight"], p["mlp_spread"],
+ )
+ best_result[idx] = (attn_w + mlp_w) / 2.0 # average for layer-level reg
+ best_result[idx] = 1.0 - best_result[idx]
pipeline.log(
f" Best trial: refusal={best_trial.values[0]:.0%}, "
f"KL={best_trial.values[1]:.4f}"
)
pipeline.log(
- f" Kernel: peak={p['peak_position']:.2f}, spread={p['spread']:.2f}, "
- f"max={p['max_weight']:.2f}, min={p['min_weight']:.2f}"
+ f" Attn kernel: peak={p['attn_peak_position']:.2f}, "
+ f"spread={p['attn_spread']:.2f}, max={p['attn_max_weight']:.2f}"
)
pipeline.log(
- f" Components: attn={p['attn_scale']:.2f}, mlp={p['mlp_scale']:.2f}, "
- f"dir_idx={p['dir_idx']:.2f}"
+ f" MLP kernel: peak={p['mlp_peak_position']:.2f}, "
+ f"spread={p['mlp_spread']:.2f}, max={p['mlp_max_weight']:.2f}"
)
+ pipeline.log(f" dir_idx={p['dir_idx']:.2f}")
# Store the best direction index for use during EXCISE
best_dir_idx = p.get("dir_idx", 0.0)
@@ -518,9 +575,9 @@ def run_bayesian_optimization(
new_dir = _interpolate_direction(pipeline, idx, best_dir_idx)
pipeline.refusal_directions[idx] = new_dir
- # Store component scales for use in EXCISE
- pipeline._bayesian_attn_scale = p.get("attn_scale", 1.0)
- pipeline._bayesian_mlp_scale = p.get("mlp_scale", 1.0)
+ # Store component scales for use in EXCISE (backward compat)
+ pipeline._bayesian_attn_scale = p.get("attn_max_weight", 1.0)
+ pipeline._bayesian_mlp_scale = p.get("mlp_max_weight", 1.0)
elif best_result:
pipeline.log(f" Using best combined score: {best_score:.4f}")
diff --git a/obliteratus/cli.py b/obliteratus/cli.py
index 9fc61a0..e32fb25 100644
--- a/obliteratus/cli.py
+++ b/obliteratus/cli.py
@@ -109,7 +109,12 @@ def main(argv: list[str] | None = None):
],
help="Liberation method (default: advanced)",
)
- p.add_argument("--n-directions", type=int, default=None, help="Override: number of SVD directions to extract")
+ p.add_argument("--n-directions", type=int, default=None, help="Override: number of refusal directions to extract")
+ p.add_argument(
+ "--direction-method", type=str, default=None,
+ choices=["diff_means", "svd", "leace"],
+ help="Direction extraction method: diff_means (simple, robust), svd (multi-direction), leace (optimal erasure)",
+ )
p.add_argument("--regularization", type=float, default=None, help="Override: fraction to preserve (0.0-1.0)")
p.add_argument("--refinement-passes", type=int, default=None, help="Override: number of iterative passes")
p.add_argument(
@@ -591,6 +596,7 @@ def _cmd_abliterate(args):
dtype=args.dtype,
method=method,
n_directions=args.n_directions,
+ direction_method=getattr(args, "direction_method", None),
regularization=args.regularization,
refinement_passes=args.refinement_passes,
quantization=args.quantization,
diff --git a/obliteratus/evaluation/heretic_eval.py b/obliteratus/evaluation/heretic_eval.py
index c2d2fb6..01409d1 100644
--- a/obliteratus/evaluation/heretic_eval.py
+++ b/obliteratus/evaluation/heretic_eval.py
@@ -334,19 +334,20 @@ def _load_harmbench_classifier():
bnb_4bit_quant_type="nf4",
llm_int8_enable_fp32_cpu_offload=True,
)
- model = AutoModelForCausalLM.from_pretrained(
- model_id,
- quantization_config=bnb_cfg,
- device_map="auto",
- torch_dtype=torch.float16,
- )
+ load_kwargs = dict(quantization_config=bnb_cfg, torch_dtype=torch.float16)
+ if dev.supports_device_map_auto():
+ load_kwargs["device_map"] = "auto"
+ model = AutoModelForCausalLM.from_pretrained(model_id, **load_kwargs)
except Exception:
logger.info("4-bit quantization unavailable for classifier, loading in float16")
- model = AutoModelForCausalLM.from_pretrained(
- model_id,
- device_map="auto",
- torch_dtype=torch.float16,
- )
+ load_kwargs = dict(torch_dtype=torch.float16)
+ if dev.supports_device_map_auto():
+ load_kwargs["device_map"] = "auto"
+ model = AutoModelForCausalLM.from_pretrained(model_id, **load_kwargs)
+
+ # On MPS/CPU: move model to best available device
+ if not dev.supports_device_map_auto():
+ model = model.to(dev.get_device())
model.eval()
_HARMBENCH_CLASSIFIER = (model, tokenizer)
diff --git a/obliteratus/informed_pipeline.py b/obliteratus/informed_pipeline.py
index 6eafe49..1fb1ded 100644
--- a/obliteratus/informed_pipeline.py
+++ b/obliteratus/informed_pipeline.py
@@ -73,15 +73,17 @@ INFORMED_METHOD = {
"description": (
"Runs analysis modules between PROBE and DISTILL to auto-configure "
"direction extraction, layer selection, and projection strategy based "
- "on the model's actual refusal geometry."
+ "on the model's actual refusal geometry. Defaults to single diff-of-means "
+ "direction + Bayesian optimization (Heretic-style)."
),
- "n_directions": 4, # overridden by analysis
+ "n_directions": 1, # overridden by analysis
+ "direction_method": "diff_means", # overridden by analysis; "leace" also available
"norm_preserve": True,
"regularization": 0.0, # overridden by analysis
"refinement_passes": 2, # overridden by analysis
"project_biases": True,
"use_chat_template": True,
- "use_whitened_svd": True, # overridden by analysis
+ "use_whitened_svd": False, # overridden by analysis
"true_iterative_refinement": True,
}
@@ -126,7 +128,8 @@ class AnalysisInsights:
clean_layers: list[int] = field(default_factory=list)
# Derived configuration
- recommended_n_directions: int = 4
+ recommended_n_directions: int = 1
+ recommended_direction_method: str = "diff_means"
recommended_regularization: float = 0.0
recommended_refinement_passes: int = 2
recommended_layers: list[int] = field(default_factory=list)
@@ -217,12 +220,19 @@ class InformedAbliterationPipeline(AbliterationPipeline):
hub_token=hub_token,
hub_community_org=hub_community_org,
quantization=quantization,
- # Set informed defaults
+ # Set informed defaults: single direction + Bayesian opt
+ n_directions=1,
+ direction_method="diff_means",
norm_preserve=True,
project_biases=True,
use_chat_template=True,
- use_whitened_svd=True,
+ use_whitened_svd=False,
true_iterative_refinement=True,
+ use_kl_optimization=True,
+ float_layer_interpolation=True,
+ layer_adaptive_strength=True,
+ winsorize_activations=True,
+ winsorize_percentile=0.01,
)
self.method = "informed"
@@ -311,7 +321,11 @@ class InformedAbliterationPipeline(AbliterationPipeline):
if self._run_defense:
self._analyze_defense_robustness()
- # 5. Derive configuration from insights
+ # 5. Sparse Surgery Analysis (RSI computation)
+ if self._run_sparse:
+ self._analyze_sparsity()
+
+ # 6. Derive configuration from insights
self._derive_configuration()
elapsed = time.time() - t0
@@ -392,6 +406,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
sample_layers = candidate_layers[::step]
polyhedral_count = 0
+ all_results = []
best_cone_result = None
best_strength = 0.0
@@ -405,34 +420,43 @@ class InformedAbliterationPipeline(AbliterationPipeline):
layer_idx=layer_idx,
)
+ all_results.append(result)
if result.is_polyhedral:
polyhedral_count += 1
- # Track the strongest layer's cone analysis
+ # Track the strongest layer's cone analysis for per-category directions
general_strength = result.general_direction.norm().item() if result.general_direction.numel() > 1 else 0
if general_strength > best_strength:
best_strength = general_strength
best_cone_result = result
- if best_cone_result is not None:
- self._insights.cone_is_polyhedral = best_cone_result.is_polyhedral
- self._insights.cone_dimensionality = best_cone_result.cone_dimensionality
- self._insights.mean_pairwise_cosine = best_cone_result.mean_pairwise_cosine
+ if all_results:
+ # Aggregate cone geometry across sampled layers (majority vote +
+ # mean dimensionality) instead of relying on a single layer.
+ n_sampled = len(all_results)
+ is_polyhedral = polyhedral_count > n_sampled / 2
+ avg_dimensionality = sum(r.cone_dimensionality for r in all_results) / n_sampled
+ avg_pairwise_cos = sum(r.mean_pairwise_cosine for r in all_results) / n_sampled
- # Store per-category directions for category-aware excision
- for cd in best_cone_result.category_directions:
- self._insights.per_category_directions[cd.category] = cd.direction
- self._insights.direction_specificity[cd.category] = cd.specificity
+ self._insights.cone_is_polyhedral = is_polyhedral
+ self._insights.cone_dimensionality = avg_dimensionality
+ self._insights.mean_pairwise_cosine = avg_pairwise_cos
- cone_type = "POLYHEDRAL" if best_cone_result.is_polyhedral else "LINEAR"
- self.log(f" Cone type: {cone_type}")
- self.log(f" Dimensionality: {best_cone_result.cone_dimensionality:.2f}")
- self.log(f" Mean pairwise cosine: {best_cone_result.mean_pairwise_cosine:.3f}")
- self.log(f" Categories detected: {best_cone_result.category_count}")
- self.log(f" Polyhedral at {polyhedral_count}/{len(sample_layers)} sampled layers")
+ # Store per-category directions from the strongest layer
+ if best_cone_result is not None:
+ for cd in best_cone_result.category_directions:
+ self._insights.per_category_directions[cd.category] = cd.direction
+ self._insights.direction_specificity[cd.category] = cd.specificity
- for cd in sorted(best_cone_result.category_directions, key=lambda x: -x.strength)[:5]:
- self.log(f" {cd.category:15s} DSI={cd.specificity:.3f} str={cd.strength:.3f}")
+ cone_type = "POLYHEDRAL" if is_polyhedral else "LINEAR"
+ self.log(f" Cone type: {cone_type} (majority vote: {polyhedral_count}/{n_sampled} layers)")
+ self.log(f" Avg dimensionality: {avg_dimensionality:.2f}")
+ self.log(f" Avg pairwise cosine: {avg_pairwise_cos:.3f}")
+ if best_cone_result is not None:
+ self.log(f" Categories detected: {best_cone_result.category_count}")
+
+ for cd in sorted(best_cone_result.category_directions, key=lambda x: -x.strength)[:5]:
+ self.log(f" {cd.category:15s} DSI={cd.specificity:.3f} str={cd.strength:.3f}")
else:
self.log(" No cone results — using default linear assumption")
@@ -517,6 +541,71 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log(f" Most entangled layers: {emap.most_entangled_layers}")
self.log(f" Cleanest layers: {emap.least_entangled_layers}")
+ def _analyze_sparsity(self):
+ """Compute Refusal Sparsity Index to decide sparse vs dense excision."""
+ self.log("\n[5/5] Refusal Sparsity Analysis")
+ self.log("-" * 40)
+
+ from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon
+ from obliteratus.strategies.utils import (
+ get_ffn_module,
+ get_layer_modules,
+ )
+
+ # Need refusal directions — use quick diff-in-means
+ quick_directions = {}
+ for idx in sorted(self._harmful_means.keys()):
+ diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze()
+ norm = diff.norm().item()
+ if norm > 1e-10:
+ quick_directions[idx] = diff / diff.norm()
+
+ if not quick_directions:
+ self.log(" No refusal directions — skipping sparsity analysis")
+ return
+
+ # Gather FFN output weights for representative layers (sample for speed)
+ layers = get_layer_modules(self.handle)
+ arch = self.handle.architecture
+ n_layers = len(layers)
+ sample_idxs = sorted(quick_directions.keys())
+ step = max(1, len(sample_idxs) // 8)
+ sample_idxs = sample_idxs[::step]
+
+ weights = {}
+ sampled_dirs = {}
+ for idx in sample_idxs:
+ if idx >= n_layers:
+ continue
+ try:
+ ffn = get_ffn_module(layers[idx], arch)
+ for name in ["down_proj", "c_proj", "dense_4h_to_h", "fc_out", "fc2", "w2"]:
+ proj = getattr(ffn, name, None)
+ if proj is not None and hasattr(proj, "weight"):
+ W = proj.weight.data
+ d = quick_directions[idx]
+ if W.shape[-1] == d.shape[0]:
+ weights[idx] = W
+ sampled_dirs[idx] = d
+ break
+ except (AttributeError, RuntimeError):
+ continue
+
+ if not weights:
+ self.log(" Could not access FFN weights — skipping sparsity analysis")
+ return
+
+ surgeon = SparseDirectionSurgeon(auto_sparsity=True)
+ plan = surgeon.plan_surgery(weights, sampled_dirs)
+
+ self._insights.mean_refusal_sparsity_index = plan.mean_refusal_sparsity_index
+ self._insights.recommended_sparsity = plan.recommended_sparsity
+
+ self.log(f" Mean RSI: {plan.mean_refusal_sparsity_index:.3f}")
+ self.log(f" Recommended sparsity: {plan.recommended_sparsity:.1%}")
+ self.log(f" Most sparse layer: {plan.most_sparse_layer}")
+ self.log(f" Most dense layer: {plan.most_dense_layer}")
+
# ── Configuration Derivation ─────────────────────────────────────
def _derive_configuration(self):
@@ -528,18 +617,32 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log("-" * 50)
insights = self._insights
- # 1. n_directions: based on cone geometry
- if insights.cone_is_polyhedral:
- # Polyhedral cone → need more directions to capture all facets
+ # 1. n_directions + direction_method: based on cone geometry
+ # Default: single direction via diff-of-means (proven most robust).
+ # Only escalate to multi-direction when analysis confirms polyhedral geometry.
+ if insights.cone_is_polyhedral and insights.cone_dimensionality > 2.0:
+ # Clearly polyhedral cone → use multiple directions via SVD
n_dirs = max(4, min(8, int(insights.cone_dimensionality * 2)))
+ self.direction_method = "svd"
+ self.use_whitened_svd = True
self.log(f" Polyhedral cone (dim={insights.cone_dimensionality:.1f}) "
- f"→ n_directions={n_dirs}")
+ f"→ n_directions={n_dirs}, method=svd (whitened)")
+ elif insights.cone_is_polyhedral:
+ # Mildly polyhedral → LEACE gives better single-direction erasure
+ n_dirs = 1
+ self.direction_method = "leace"
+ self.use_whitened_svd = False
+ self.log(f" Mildly polyhedral (dim={insights.cone_dimensionality:.1f}) "
+ f"→ n_directions=1, method=leace")
else:
- # Linear cone → fewer directions suffice
- n_dirs = max(1, min(4, int(insights.cone_dimensionality + 1)))
+ # Linear cone → single direction via diff-of-means (simplest, most robust)
+ n_dirs = 1
+ self.direction_method = "diff_means"
+ self.use_whitened_svd = False
self.log(f" Linear cone (dim={insights.cone_dimensionality:.1f}) "
- f"→ n_directions={n_dirs}")
+ f"→ n_directions=1, method=diff_means")
insights.recommended_n_directions = n_dirs
+ insights.recommended_direction_method = self.direction_method
self.n_directions = n_dirs
# 2. regularization: based on alignment method + entanglement
@@ -586,15 +689,22 @@ class InformedAbliterationPipeline(AbliterationPipeline):
# 4. Layer selection: cluster-aware + entanglement-gated
if insights.cluster_representative_layers:
- # Start from cluster representatives
+ # Start from cluster representatives (strongest per cluster)
base_layers = list(insights.cluster_representative_layers)
- # Expand: add all layers from clusters that have strong signals
- all_cluster_layers = []
+ # Conservative expansion: for each cluster, add at most the top-2
+ # strongest layers (by refusal norm) beyond the representative,
+ # to avoid over-modifying weak layers in large clusters.
+ norms = {}
+ for idx in self._harmful_means:
+ if idx in self._harmless_means:
+ norms[idx] = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze().norm().item()
for cluster in insights.direction_clusters:
- all_cluster_layers.extend(cluster)
- if all_cluster_layers:
- base_layers = sorted(set(all_cluster_layers))
+ ranked = sorted(cluster, key=lambda ly: norms.get(ly, 0), reverse=True)
+ # Add up to 2 additional strong layers per cluster
+ for ly in ranked[:3]: # representative + up to 2 more
+ base_layers.append(ly)
+ base_layers = sorted(set(base_layers))
# Gate: remove highly entangled layers
skip = set()
@@ -621,13 +731,9 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log(f" RSI={insights.mean_refusal_sparsity_index:.2f} "
f"→ standard dense projection")
- # 6. Whitened SVD: always use for multi-direction, skip for single
- if n_dirs > 1:
- self.use_whitened_svd = True
- self.log(f" Multi-direction ({n_dirs}) → whitened SVD enabled")
- else:
- self.use_whitened_svd = False
- self.log(" Single direction → standard diff-in-means")
+ # 6. Direction method summary (already set in step 1)
+ self.log(f" Direction method: {self.direction_method} "
+ f"(whitened_svd={'on' if self.use_whitened_svd else 'off'})")
# ── Informed DISTILL ─────────────────────────────────────────────
@@ -650,7 +756,38 @@ class InformedAbliterationPipeline(AbliterationPipeline):
n_layers = len(self._harmful_means)
norms: dict[int, float] = {}
- if self.use_whitened_svd and self.n_directions > 1:
+ # ── Small-model direction cap (matching base _distill) ────────
+ # On small models, each SVD direction removes a proportionally
+ # larger fraction of weight energy. Cap to prevent over-ablation.
+ hidden_size = self.handle.hidden_size if self.handle else 0
+ total_params = getattr(self.handle, 'total_params', 0) if self.handle else 0
+ if total_params == 0 and self.handle:
+ try:
+ total_params = sum(p.numel() for p in self.handle.model.parameters())
+ except Exception:
+ pass
+ if self.n_directions > 1 and (
+ (0 < hidden_size < 2048)
+ or (0 < total_params < 2_000_000_000)
+ or n_layers <= 16
+ ):
+ max_dirs = max(1, min(self.n_directions, 2))
+ if max_dirs < self.n_directions:
+ self.log(
+ f"Capped n_directions from {self.n_directions} to {max_dirs} "
+ f"for small model (hidden={hidden_size}, "
+ f"params={total_params / 1e9:.1f}B, layers={n_layers})"
+ )
+ self.n_directions = max_dirs
+
+ # LEACE extractor for optimal concept erasure
+ leace_extractor = None
+ if self.direction_method == "leace":
+ from obliteratus.analysis.leace import LEACEExtractor
+ leace_extractor = LEACEExtractor()
+ self.log(f"Using LEACE (closed-form optimal concept erasure)")
+
+ if self.use_whitened_svd and self.n_directions > 1 and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
self.log(f"Using whitened SVD with {self.n_directions} directions")
@@ -658,6 +795,29 @@ class InformedAbliterationPipeline(AbliterationPipeline):
whitened_extractor = None
for idx in range(n_layers):
+ # LEACE path: theoretically optimal single-direction erasure
+ if leace_extractor is not None:
+ if idx in self._harmful_acts and idx in self._harmless_acts:
+ try:
+ l_result = leace_extractor.extract(
+ self._harmful_acts[idx],
+ self._harmless_acts[idx],
+ layer_idx=idx,
+ )
+ self.refusal_directions[idx] = l_result.direction
+ self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
+ norms[idx] = l_result.generalized_eigenvalue
+
+ if idx < 5 or idx == n_layers - 1:
+ self.log(
+ f" layer {idx}: LEACE eigenvalue={l_result.generalized_eigenvalue:.4f}, "
+ f"erasure_loss={l_result.erasure_loss:.4f}"
+ )
+ continue
+ except Exception as e:
+ if idx < 5:
+ self.log(f" layer {idx}: LEACE failed ({e}), falling back")
+
if self.n_directions == 1:
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
norm = diff.norm().item()
@@ -691,6 +851,41 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.refusal_directions[idx] = primary / primary.norm()
norms[idx] = S[:k].sum().item()
+ # Enrich subspaces with per-category cone directions when available.
+ # This uses the actual refusal cone generators instead of purely
+ # data-agnostic SVD components.
+ cat_dirs = self._insights.per_category_directions
+ if cat_dirs and self._insights.cone_is_polyhedral and self.n_directions > 1:
+ cat_tensors = list(cat_dirs.values())
+ # Stack and orthogonalize category directions
+ cat_stack = torch.stack(cat_tensors) # (n_cats, hidden)
+ cat_norms = cat_stack.norm(dim=1, keepdim=True).clamp(min=1e-8)
+ cat_stack = cat_stack / cat_norms
+ # Blend into strong-signal layers: replace later SVD components
+ # with category directions (which are geometrically meaningful)
+ n_cat = cat_stack.shape[0]
+ for idx in norms:
+ sub = self.refusal_subspaces.get(idx)
+ if sub is None or sub.shape[0] <= 1:
+ continue
+ # Keep the first SVD direction (strongest), replace remaining
+ # with category directions projected to be orthogonal to it
+ primary = sub[0:1] # (1, hidden)
+ # Project category directions orthogonal to primary
+ cos = (cat_stack @ primary.squeeze(0)) # (n_cat,)
+ ortho_cats = cat_stack - cos.unsqueeze(1) * primary
+ ortho_norms = ortho_cats.norm(dim=1)
+ # Keep only directions that survived orthogonalization
+ valid = ortho_norms > 0.1
+ if valid.sum() > 0:
+ ortho_cats = ortho_cats[valid]
+ ortho_cats = ortho_cats / ortho_cats.norm(dim=1, keepdim=True)
+ # Take up to (n_directions - 1) category directions
+ n_take = min(self.n_directions - 1, ortho_cats.shape[0])
+ new_sub = torch.cat([primary, ortho_cats[:n_take]], dim=0)
+ self.refusal_subspaces[idx] = new_sub
+ self.log(f"Enriched subspaces with {n_cat} per-category cone directions")
+
# Layer selection: use analysis-recommended layers if available,
# otherwise fall back to knee detection
if self._insights.recommended_layers:
@@ -728,15 +923,117 @@ class InformedAbliterationPipeline(AbliterationPipeline):
def _excise_informed(self):
"""Excise refusal directions with analysis-informed strategy.
- Uses sparse surgery if analysis recommends it, otherwise falls
- back to the standard projection with analysis-tuned parameters.
+ Uses Bayesian optimization (when available) with analysis-derived
+ warm-start parameters, falling back to sparse surgery or standard
+ projection. This is the key integration: analysis maps the geometry,
+ Bayesian optimization finds the optimal projection strength.
"""
if self._insights.use_sparse_surgery:
self._excise_sparse()
+ return
+
+ # Enable Bayesian optimization using analysis insights for warm-start.
+ # The analysis provides much better initial parameters than the default
+ # heuristic (strongest-layer-based peak), dramatically narrowing the
+ # search space and improving convergence.
+ self._configure_bayesian_warm_start()
+ self._excise()
+
+ def _configure_bayesian_warm_start(self):
+ """Configure Bayesian optimization with analysis-derived warm-start.
+
+ Translates analysis insights into a much tighter search space:
+ - peak_position from cluster representative layers
+ - spread from cluster structure (narrow clusters → narrow spread)
+ - component scaling from entanglement analysis
+ - KL budget from alignment method detection
+ """
+ insights = self._insights
+
+ # Enable Bayesian optimization (50 trials default, same as heretic)
+ self._bayesian_trials = 50
+
+ # Also set heretic-compatible flags on the pipeline so the base
+ # _excise_inner() picks them up during Bayesian optimization.
+ self.layer_adaptive_strength = True
+ self.float_layer_interpolation = True
+ self.use_kl_optimization = True
+
+ # KL budget: tighter for methods that are fragile (CAI, RLHF),
+ # looser for concentrated methods (DPO, SFT).
+ method = insights.detected_alignment_method
+ if method == "dpo":
+ self.kl_budget = 0.5
+ elif method == "rlhf":
+ self.kl_budget = 0.3
+ elif method == "cai":
+ self.kl_budget = 0.2
+ elif method == "sft":
+ self.kl_budget = 0.4
else:
- # Standard excision with analysis-tuned parameters
- # (regularization, norm_preserve, etc. already configured)
- self._excise()
+ self.kl_budget = 0.35
+
+ self.log(f"Bayesian optimization enabled (50 trials, KL budget={self.kl_budget})")
+ self.log("Analysis insights will warm-start the optimizer")
+
+ # Compute analysis-derived warm-start for the parametric kernel.
+ # The Bayesian optimizer reads these from the pipeline if present.
+ n_layers = len(self._harmful_means) if self._harmful_means else 32
+ if insights.cluster_representative_layers and n_layers > 1:
+ # Peak position: normalized position of the strongest cluster rep
+ norms = {}
+ for idx in self._harmful_means:
+ if idx in self._harmless_means:
+ norms[idx] = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze().norm().item()
+ reps = insights.cluster_representative_layers
+ if norms:
+ best_rep = max(reps, key=lambda ly: norms.get(ly, 0))
+ else:
+ best_rep = reps[len(reps) // 2]
+ warm_peak = best_rep / max(n_layers - 1, 1)
+
+ # Spread: narrow if clusters are tight, wide if clusters span many layers
+ if insights.direction_clusters:
+ cluster_widths = [
+ (max(c) - min(c)) / max(n_layers - 1, 1)
+ for c in insights.direction_clusters if len(c) > 1
+ ]
+ warm_spread = max(0.1, min(0.6, sum(cluster_widths) / len(cluster_widths) if cluster_widths else 0.3))
+ else:
+ warm_spread = 0.3
+
+ # Min weight: higher if high persistence (refusal spread across all layers)
+ warm_min = min(0.3, max(0.0, insights.direction_persistence * 0.2))
+
+ # Attn/MLP scaling: reduce MLP scaling if entanglement is high
+ # (MLP projections cause more capability damage)
+ if insights.entanglement_score > 0.5:
+ warm_mlp = 0.4
+ warm_attn = 0.7
+ else:
+ warm_mlp = 0.6
+ warm_attn = 0.8
+ else:
+ warm_peak = 0.5
+ warm_spread = 0.3
+ warm_min = 0.05
+ warm_mlp = 0.6
+ warm_attn = 0.8
+
+ # Store warm-start params for the Bayesian optimizer to pick up
+ self._informed_warm_start = {
+ "max_weight": 0.9,
+ "peak_position": warm_peak,
+ "min_weight": warm_min,
+ "spread": warm_spread,
+ "attn_scale": warm_attn,
+ "mlp_scale": warm_mlp,
+ "dir_idx": 0.0,
+ }
+ self.log(
+ f" Warm-start: peak={warm_peak:.2f}, spread={warm_spread:.2f}, "
+ f"min={warm_min:.2f}, attn={warm_attn:.2f}, mlp={warm_mlp:.2f}"
+ )
def _excise_sparse(self):
"""Sparse direction surgery — only modifies high-projection rows."""
@@ -825,14 +1122,22 @@ class InformedAbliterationPipeline(AbliterationPipeline):
1. Residual refusal signal (via activation probing)
2. Self-repair / Ouroboros effect (via defense robustness)
3. Triggers additional targeted passes at compensating layers
+
+ KL-gated: stops early if model damage (KL divergence) is getting
+ worse even though refusal persists. This prevents the death spiral
+ where each pass damages the model without removing refusal.
"""
# Run standard verification first
self._verify()
# Check if Ouroboros compensation is needed
refusal_rate = self._quality_metrics.get("refusal_rate", 0.0)
+ prev_kl = self._quality_metrics.get("kl_divergence", 0.0)
ouroboros_pass = 0
+ # KL budget: stop if KL exceeds this threshold (model too damaged)
+ kl_ceiling = getattr(self, "kl_budget", 0.5) * 2.0 # 2x budget as hard ceiling
+
while (refusal_rate > self._ouroboros_threshold
and ouroboros_pass < self._max_ouroboros_passes):
ouroboros_pass += 1
@@ -849,9 +1154,9 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self._distill_inner()
self.log(f"Found {len(self._strong_layers)} layers with residual refusal")
- # Re-excise at the new strong layers
+ # Re-excise at the new strong layers using informed strategy
if self._strong_layers:
- self._excise()
+ self._excise_informed()
else:
self.log("No strong layers found — stopping Ouroboros compensation")
break
@@ -859,7 +1164,24 @@ class InformedAbliterationPipeline(AbliterationPipeline):
# Re-verify
self._verify()
refusal_rate = self._quality_metrics.get("refusal_rate", 0.0)
- self.log(f"After Ouroboros pass {ouroboros_pass}: refusal rate = {refusal_rate:.0%}")
+ current_kl = self._quality_metrics.get("kl_divergence", 0.0)
+ self.log(f"After Ouroboros pass {ouroboros_pass}: refusal={refusal_rate:.0%}, KL={current_kl:.4f}")
+
+ # KL-gated early stopping: if KL is rising and exceeds ceiling,
+ # the model is being damaged faster than refusal is being removed.
+ if current_kl > kl_ceiling:
+ self.log(
+ f"KL divergence {current_kl:.4f} exceeds ceiling {kl_ceiling:.4f} — "
+ f"stopping to prevent further model damage"
+ )
+ break
+ if ouroboros_pass > 1 and current_kl > prev_kl * 1.5 and refusal_rate > 0.3:
+ self.log(
+ f"KL rising sharply ({prev_kl:.4f} → {current_kl:.4f}) with "
+ f"refusal still at {refusal_rate:.0%} — stopping (diminishing returns)"
+ )
+ break
+ prev_kl = current_kl
self._report.ouroboros_passes = ouroboros_pass
self._report.final_refusal_rate = refusal_rate
@@ -903,6 +1225,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
},
"derived_config": {
"n_directions": insights.recommended_n_directions,
+ "direction_method": insights.recommended_direction_method,
"regularization": insights.recommended_regularization,
"refinement_passes": insights.recommended_refinement_passes,
"layers_used": insights.recommended_layers,
@@ -981,6 +1304,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
lines.append("Derived Configuration:")
lines.append(f" n_directions: {insights.recommended_n_directions}")
+ lines.append(f" direction_method: {insights.recommended_direction_method}")
lines.append(f" regularization: {insights.recommended_regularization}")
lines.append(f" refinement_passes: {insights.recommended_refinement_passes}")
lines.append(f" sparse surgery: {insights.use_sparse_surgery}")
diff --git a/obliteratus/tourney.py b/obliteratus/tourney.py
index c3a0ab8..f6ff88f 100644
--- a/obliteratus/tourney.py
+++ b/obliteratus/tourney.py
@@ -55,26 +55,45 @@ def composite_score(metrics: dict[str, Any]) -> float:
"""Score an abliteration run on [0, 1]. Higher is better.
Weights:
- 40% refusal removal — the whole point
- 30% coherence — model must still be useful
+ 35% refusal removal — the whole point
+ 25% coherence — model must still be useful
20% KL divergence — minimal capability damage
10% perplexity — fluency preservation
+ 5% spectral cert — formal completeness guarantee
+ 5% degenerate penalty — penalize broken output
"""
rr = metrics.get("refusal_rate")
co = metrics.get("coherence")
kl = metrics.get("kl_divergence")
pp = metrics.get("perplexity")
+ spec = metrics.get("spectral_certification")
+ degen = metrics.get("degenerate_count", 0) or 0
refusal_score = (1.0 - rr) if rr is not None else 0.0
coherence_score = co if co is not None else 0.0
kl_score = 1.0 / (1.0 + kl) if kl is not None else 0.5
ppl_score = 1.0 / (1.0 + pp / 100.0) if pp is not None else 0.5
+ # Spectral certification: GREEN=1.0, YELLOW=0.5, RED=0.0, None=0.5 (neutral)
+ if spec == "GREEN":
+ spec_score = 1.0
+ elif spec == "YELLOW":
+ spec_score = 0.5
+ elif spec == "RED":
+ spec_score = 0.0
+ else:
+ spec_score = 0.5 # not measured → neutral
+
+ # Degenerate penalty: any broken outputs reduce score
+ degen_score = 1.0 / (1.0 + degen) if degen > 0 else 1.0
+
return (
- refusal_score * 0.4
- + coherence_score * 0.3
- + kl_score * 0.2
- + ppl_score * 0.1
+ refusal_score * 0.35
+ + coherence_score * 0.25
+ + kl_score * 0.20
+ + ppl_score * 0.10
+ + spec_score * 0.05
+ + degen_score * 0.05
)
@@ -94,6 +113,8 @@ class Contender:
time_s: float = 0.0
error: str | None = None
round_eliminated: int = 0 # 0 = still alive / winner
+ direction_method: str = "" # which direction extraction was used
+ spectral_cert: str = "" # GREEN/YELLOW/RED/""
@dataclass
@@ -140,6 +161,8 @@ class TourneyResult:
"metrics": c.metrics,
"time_s": c.time_s,
"error": c.error,
+ "direction_method": c.direction_method,
+ "spectral_cert": c.spectral_cert,
}
for c in sorted(r.contenders, key=lambda x: x.score, reverse=True)
],
@@ -197,6 +220,8 @@ def _save_checkpoint(
"time_s": c.time_s,
"error": c.error,
"round_eliminated": c.round_eliminated,
+ "direction_method": c.direction_method,
+ "spectral_cert": c.spectral_cert,
}
for c in r.contenders
],
@@ -218,6 +243,8 @@ def _save_checkpoint(
"time_s": c.time_s,
"error": c.error,
"round_eliminated": c.round_eliminated,
+ "direction_method": c.direction_method,
+ "spectral_cert": c.spectral_cert,
}
for c in completed_methods
],
@@ -286,6 +313,8 @@ def _restore_rounds(checkpoint: dict) -> tuple[TourneyResult, list[Contender], l
time_s=c_data.get("time_s", 0.0),
error=c_data.get("error"),
round_eliminated=c_data.get("round_eliminated", 0),
+ direction_method=c_data.get("direction_method", ""),
+ spectral_cert=c_data.get("spectral_cert", ""),
))
result.rounds.append(rnd)
@@ -328,14 +357,14 @@ def render_bracket(result: TourneyResult) -> str:
lines.append(f"## Round {rnd.round_num}: {rnd.name}")
lines.append(f"*{len(rnd.contenders)} contenders, {rnd.prompt_volume} prompt pairs*")
lines.append("")
- lines.append("| Rank | Method | Score | Refusal | Coherence | KL Div | Perplexity | Time |")
- lines.append("|------|--------|-------|---------|-----------|--------|------------|------|")
+ lines.append("| Rank | Method | Dir | Score | Refusal | Coherence | KL Div | PPL | Cert | Time |")
+ lines.append("|------|--------|-----|-------|---------|-----------|--------|-----|------|------|")
sorted_contenders = sorted(rnd.contenders, key=lambda x: x.score, reverse=True)
for i, c in enumerate(sorted_contenders, 1):
if c.error:
lines.append(
- f"| {i} | {c.method} | ERROR | — | — | — | — | {c.time_s:.0f}s |"
+ f"| {i} | {c.method} | — | ERROR | — | — | — | — | — | {c.time_s:.0f}s |"
)
continue
m = c.metrics
@@ -351,9 +380,11 @@ def render_bracket(result: TourneyResult) -> str:
kl_val = m.get('kl_divergence')
kl_str = f"{kl_val:.4f}" if kl_val is not None else "—"
pp = f"{m.get('perplexity', 0):.1f}" if m.get('perplexity') is not None else "—"
+ dir_m = c.direction_method or m.get("direction_method", "—")
+ cert = c.spectral_cert or "—"
lines.append(
- f"| {i} | **{c.method}**{marker} | {c.score:.4f} "
- f"| {rr} | {co} | {kl_str} | {pp} | {c.time_s:.0f}s |"
+ f"| {i} | **{c.method}**{marker} | {dir_m} | {c.score:.4f} "
+ f"| {rr} | {co} | {kl_str} | {pp} | {cert} | {c.time_s:.0f}s |"
)
lines.append("")
@@ -572,9 +603,12 @@ def render_bracket_html(result: TourneyResult) -> str:
f'
🏆 '
f'
{html_mod.escape(w.method)}'
)
+ dir_m = w.direction_method or "—"
+ cert = w.spectral_cert or "—"
header_parts.append(
f'
'
- f'Score: {w.score:.4f} | Refusal: {rr} | Coherence: {co}'
+ f'Score: {w.score:.4f} | Refusal: {rr} | '
+ f'Coherence: {co} | Dir: {html_mod.escape(dir_m)} | Cert: {html_mod.escape(cert)}'
f'
'
)
header_parts.append("
")
@@ -632,6 +666,11 @@ def render_bracket_html(result: TourneyResult) -> str:
m = c.metrics or {}
metric_spans = []
if not c.error:
+ dm = c.direction_method or m.get("direction_method", "")
+ if dm:
+ metric_spans.append(
+ f'dir {html_mod.escape(dm)}'
+ )
rr = m.get("refusal_rate")
if rr is not None:
metric_spans.append(
@@ -642,6 +681,12 @@ def render_bracket_html(result: TourneyResult) -> str:
metric_spans.append(
f'coh {co:.3f}'
)
+ sc = c.spectral_cert or m.get("spectral_certification", "")
+ if sc:
+ cert_color = {"GREEN": "#4ecca3", "YELLOW": "#f0c040", "RED": "#cc4444"}.get(sc, "#777")
+ metric_spans.append(
+ f'cert {html_mod.escape(sc)}'
+ )
kl = m.get("kl_divergence")
if kl is not None:
metric_spans.append(
@@ -705,10 +750,12 @@ in elimination rounds.
| Metric | Value |
|--------|-------|
| Composite Score | **{w.score:.4f}** |
+| Direction Method | {w.direction_method or 'N/A'} |
| Refusal Rate | {f'{w.metrics["refusal_rate"]:.1%}' if w.metrics.get('refusal_rate') is not None else 'N/A'} |
| Coherence | {f'{w.metrics["coherence"]:.3f}' if w.metrics.get('coherence') is not None else 'N/A'} |
| KL Divergence | {f'{w.metrics["kl_divergence"]:.4f}' if w.metrics.get('kl_divergence') is not None else 'N/A'} |
| Perplexity | {f'{w.metrics["perplexity"]:.1f}' if w.metrics.get('perplexity') is not None else 'N/A'} |
+| Spectral Cert | {w.spectral_cert or 'N/A'} |
## How to Use
@@ -866,6 +913,8 @@ class TourneyRunner:
contender.metrics = dict(pipeline._quality_metrics)
contender.score = composite_score(contender.metrics)
contender.output_dir = save_dir
+ contender.direction_method = getattr(pipeline, "direction_method", "")
+ contender.spectral_cert = contender.metrics.get("spectral_certification", "") or ""
# Free pipeline to reclaim GPU
del pipeline
@@ -1048,6 +1097,11 @@ class TourneyRunner:
result.winner = winner
result.total_time_s = time.time() - t_start
+ # Clean up non-winner finalist dirs to free disk
+ for c in ranked[1:]:
+ if c.output_dir and Path(c.output_dir).exists():
+ shutil.rmtree(c.output_dir, ignore_errors=True)
+
self.log("")
self.log("=" * 60)
if winner:
@@ -1352,6 +1406,11 @@ class TourneyRunner:
result.winner = winner
result.total_time_s = time.time() - t_start
+ # Clean up non-winner finalist dirs to free disk
+ for c in ranked[1:]:
+ if c.output_dir and Path(c.output_dir).exists():
+ shutil.rmtree(c.output_dir, ignore_errors=True)
+
self.log("")
self.log("=" * 60)
if winner: