Add files via upload

This commit is contained in:
pliny
2026-03-08 12:07:56 -07:00
committed by GitHub
parent 1065809658
commit 69fa63ac43
14 changed files with 1320 additions and 274 deletions
+219 -43
View File
@@ -98,6 +98,51 @@ def _is_quota_error(exc: BaseException) -> bool:
return True
return False
def _load_model_to_device(
    pretrained_path: str,
    *,
    torch_dtype=None,
    trust_remote_code: bool = False,
    quantization_config=None,
    offload_folder: str | None = None,
    low_cpu_mem_usage: bool = False,
    token: str | None = None,
) -> AutoModelForCausalLM:
    """Load a causal LM onto the best available device, MPS-safe.

    Accelerate's ``device_map="auto"`` is not supported on MPS — models
    silently land on CPU. This helper skips ``device_map`` on non-CUDA
    backends and explicitly moves the model to the best device after loading.
    On CUDA the behaviour is identical to ``device_map="auto"``.

    Args:
        pretrained_path: HF repo id or local checkpoint directory.
        torch_dtype: Optional dtype forwarded to ``from_pretrained``.
        trust_remote_code: Allow custom modeling code from the checkpoint.
        quantization_config: Optional quantization config (e.g. bitsandbytes).
        offload_folder: Optional disk-offload directory for accelerate.
        low_cpu_mem_usage: Forwarded to ``from_pretrained`` when True.
        token: Optional HF auth token for gated/private repos.

    Returns:
        The loaded ``AutoModelForCausalLM``, placed on the best available
        device (via ``device_map="auto"`` on CUDA, or an explicit ``.to()``
        on MPS / CPU).
    """
    # Only include kwargs that were explicitly requested, so from_pretrained
    # keeps its own defaults for everything else.
    kwargs: dict = {}
    if torch_dtype is not None:
        kwargs["torch_dtype"] = torch_dtype
    if trust_remote_code:
        kwargs["trust_remote_code"] = True
    if quantization_config is not None:
        kwargs["quantization_config"] = quantization_config
    if offload_folder is not None:
        kwargs["offload_folder"] = offload_folder
    if low_cpu_mem_usage:
        kwargs["low_cpu_mem_usage"] = True
    if token is not None:
        kwargs["token"] = token
    # Query the backend capability once: the same answer decides both the
    # load strategy and the post-load placement (the original called it twice).
    use_device_map = dev.supports_device_map_auto()
    if use_device_map:
        kwargs["device_map"] = "auto"
    model = AutoModelForCausalLM.from_pretrained(pretrained_path, **kwargs)
    if not use_device_map:
        # On MPS / CPU the model loaded without device_map; move it explicitly.
        model = model.to(dev.get_device())
    return model
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
@@ -164,7 +209,7 @@ def _recover_sessions_from_disk() -> None:
"""
global _last_obliterated_label, _obliterate_counter
found_any = False
for pattern in ("obliterated_*", "obliterated", "bench_*"):
for pattern in ("obliterated_*", "obliterated", "bench_*", "obliteratus_tourney/r*"):
for p in Path("/tmp").glob(pattern):
if not p.is_dir():
continue
@@ -291,6 +336,11 @@ METHODS = {
"optimized (bayesian auto-tuned)": "optimized",
"inverted (semantic refusal inversion)": "inverted",
"nuclear (maximum force combo)": "nuclear",
# Baseline reproductions for benchmarking
"failspy (FailSpy/abliterator baseline)": "failspy",
"gabliteration (Gülmez 2026 baseline)": "gabliteration",
"heretic (p-e-w 2025-2026 baseline)": "heretic",
"rdo (Wollschlager ICML 2025 baseline)": "rdo",
}
# ── Community Hub push ────────────────────────────────────────────────
@@ -316,6 +366,7 @@ def _get_preset_defaults(method_display: str):
cfg = _PRESET_CONFIGS.get(method_key, _PRESET_CONFIGS["advanced"])
return {
"n_directions": cfg.get("n_directions", 4),
"direction_method": cfg.get("direction_method", "svd"),
"regularization": cfg.get("regularization", 0.3),
"refinement_passes": cfg.get("refinement_passes", 2),
"norm_preserve": cfg.get("norm_preserve", True),
@@ -341,6 +392,17 @@ def _get_preset_defaults(method_display: str):
"spectral_cascade": cfg.get("spectral_cascade", False),
"spectral_bands": cfg.get("spectral_bands", 3),
"spectral_threshold": cfg.get("spectral_threshold", 0.05),
# Baseline-specific parameters
"layer_selection": cfg.get("layer_selection", "all"),
"winsorize_activations": cfg.get("winsorize_activations", False),
"winsorize_percentile": cfg.get("winsorize_percentile", 1.0),
"use_kl_optimization": cfg.get("use_kl_optimization", False),
"kl_budget": cfg.get("kl_budget", 0.5),
"float_layer_interpolation": cfg.get("float_layer_interpolation", False),
"rdo_refinement": cfg.get("rdo_refinement", False),
"cot_aware": cfg.get("cot_aware", False),
"bayesian_trials": cfg.get("bayesian_trials", 50),
"n_sae_features": cfg.get("n_sae_features", 64),
}
def _on_method_change(method_display: str):
@@ -348,6 +410,7 @@ def _on_method_change(method_display: str):
d = _get_preset_defaults(method_display)
return (
d["n_directions"],
d["direction_method"],
d["regularization"],
d["refinement_passes"],
d["reflection_strength"],
@@ -374,6 +437,16 @@ def _on_method_change(method_display: str):
d["expert_transplant"],
d["use_wasserstein_optimal"],
d["spectral_cascade"],
d["layer_selection"],
d["winsorize_activations"],
d["winsorize_percentile"],
d["use_kl_optimization"],
d["kl_budget"],
d["float_layer_interpolation"],
d["rdo_refinement"],
d["cot_aware"],
d["bayesian_trials"],
d["n_sae_features"],
)
def _on_dataset_change(dataset_label: str):
@@ -1731,8 +1804,9 @@ def _format_multi_model_results(results: list[dict], context: dict | None = None
def obliterate(model_choice: str, method_choice: str,
prompt_volume_choice: str, dataset_source_choice: str,
custom_harmful: str, custom_harmless: str,
# Advanced params (sliders)
adv_n_directions: int, adv_regularization: float,
# Advanced params (sliders + radio)
adv_n_directions: int, adv_direction_method: str,
adv_regularization: float,
adv_refinement_passes: int, adv_reflection_strength: float,
adv_embed_regularization: float, adv_steering_strength: float,
adv_transplant_blend: float,
@@ -1748,6 +1822,12 @@ def obliterate(model_choice: str, method_choice: str,
adv_project_embeddings: bool, adv_activation_steering: bool,
adv_expert_transplant: bool, adv_wasserstein_optimal: bool,
adv_spectral_cascade: bool,
adv_layer_selection: str, adv_winsorize: bool,
adv_winsorize_percentile: float,
adv_kl_optimization: bool, adv_kl_budget: float,
adv_float_layer_interp: bool, adv_rdo_refinement: bool,
adv_cot_aware: bool,
adv_bayesian_trials: int, adv_n_sae_features: int,
progress=gr.Progress()):
"""Run the full obliteration pipeline, streaming log updates to the UI.
@@ -1906,6 +1986,7 @@ def obliterate(model_choice: str, method_choice: str,
on_log=on_log,
# Advanced overrides from UI
n_directions=int(adv_n_directions),
direction_method=adv_direction_method,
regularization=float(adv_regularization),
refinement_passes=int(adv_refinement_passes),
norm_preserve=adv_norm_preserve,
@@ -1932,6 +2013,15 @@ def obliterate(model_choice: str, method_choice: str,
spectral_bands=int(adv_spectral_bands),
spectral_threshold=float(adv_spectral_threshold),
verify_sample_size=int(adv_verify_sample_size),
layer_selection=adv_layer_selection,
winsorize_activations=adv_winsorize,
winsorize_percentile=float(adv_winsorize_percentile),
use_kl_optimization=adv_kl_optimization,
kl_budget=float(adv_kl_budget),
float_layer_interpolation=adv_float_layer_interp,
rdo_refinement=adv_rdo_refinement,
cot_aware=adv_cot_aware,
n_sae_features=int(adv_n_sae_features),
)
pipeline_ref[0] = pipeline
pipeline.run()
@@ -2103,10 +2193,9 @@ def obliterate(model_choice: str, method_choice: str,
bnb_4bit_quant_type="nf4",
llm_int8_enable_fp32_cpu_offload=True,
)
model_reloaded = AutoModelForCausalLM.from_pretrained(
model_reloaded = _load_model_to_device(
save_dir,
quantization_config=bnb_cfg,
device_map="auto",
trust_remote_code=True,
)
tokenizer_reloaded = AutoTokenizer.from_pretrained(
@@ -2144,9 +2233,8 @@ def obliterate(model_choice: str, method_choice: str,
yield status_msg, "\n".join(log_lines), gr.update(), gr.update(), gr.update(), gr.update()
try:
offload_dir = tempfile.mkdtemp(prefix="obliteratus_offload_")
model_reloaded = AutoModelForCausalLM.from_pretrained(
model_reloaded = _load_model_to_device(
save_dir,
device_map="auto",
offload_folder=offload_dir,
torch_dtype=torch.float16,
trust_remote_code=True,
@@ -2307,8 +2395,8 @@ def chat_respond(message: str, history: list[dict], system_prompt: str,
if checkpoint and Path(checkpoint).exists():
try:
is_preset = (_state.get("model_name") or "") in MODELS
model = AutoModelForCausalLM.from_pretrained(
checkpoint, device_map="auto", torch_dtype=torch.float16,
model = _load_model_to_device(
checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer = AutoTokenizer.from_pretrained(
@@ -2498,8 +2586,8 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
if checkpoint and Path(checkpoint).exists():
is_preset = (_state.get("model_name") or "") in MODELS
try:
model_loaded = AutoModelForCausalLM.from_pretrained(
checkpoint, device_map="auto", torch_dtype=torch.float16,
model_loaded = _load_model_to_device(
checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer_loaded = AutoTokenizer.from_pretrained(
@@ -2559,9 +2647,8 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
is_preset = cfg["model_choice"] in MODELS
try:
model_loaded = AutoModelForCausalLM.from_pretrained(
model_loaded = _load_model_to_device(
checkpoint_dir,
device_map="auto",
torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
@@ -2595,10 +2682,9 @@ def load_bench_into_chat(choice: str, progress=gr.Progress()):
)
yield f"**Loading {choice}** in 4-bit (model too large for fp16)...", ""
progress(0.5, desc="Loading 4-bit...")
model_loaded = AutoModelForCausalLM.from_pretrained(
model_loaded = _load_model_to_device(
checkpoint_dir,
quantization_config=bnb_cfg,
device_map="auto",
trust_remote_code=is_preset,
)
tokenizer_loaded = AutoTokenizer.from_pretrained(
@@ -2740,8 +2826,8 @@ def ab_chat_respond(message: str, history_left: list[dict], history_right: list[
if checkpoint and Path(checkpoint).exists():
try:
is_preset = (model_name or "") in MODELS
abliterated_model = AutoModelForCausalLM.from_pretrained(
checkpoint, device_map="auto", torch_dtype=torch.float16,
abliterated_model = _load_model_to_device(
checkpoint, torch_dtype=torch.float16,
trust_remote_code=is_preset,
)
tokenizer = AutoTokenizer.from_pretrained(
@@ -2866,10 +2952,9 @@ def ab_chat_respond(message: str, history_left: list[dict], history_right: list[
is_preset = model_name in MODELS
original_response = ""
try:
from transformers import AutoModelForCausalLM as AMCLM
original_model = AMCLM.from_pretrained(
original_model = _load_model_to_device(
model_id, torch_dtype=torch.float16,
device_map="auto", trust_remote_code=is_preset,
trust_remote_code=is_preset,
low_cpu_mem_usage=True,
token=os.environ.get("HF_TOKEN") or None,
)
@@ -3026,6 +3111,9 @@ def strength_sweep(model_choice: str, method_choice: str,
entry["perplexity"] = metrics.get("perplexity")
entry["refusal_rate"] = metrics.get("refusal_rate")
entry["coherence"] = metrics.get("coherence")
entry["kl_divergence"] = metrics.get("kl_divergence")
entry["spectral_cert"] = metrics.get("spectral_certification") or ""
entry["direction_method"] = getattr(pipe, "direction_method", "")
entry["strong_layers"] = len(pipe._strong_layers)
if hasattr(pipe, "handle") and pipe.handle is not None:
pipe.handle.model = None
@@ -3115,17 +3203,21 @@ def _format_sweep_results(results: list[dict]) -> str:
return "*No results yet.*"
lines = ["### Strength Sweep Results", "",
"| Reg | Time | Perplexity | Refusal Rate | Coherence | Error |",
"|-----|------|-----------|-------------|-----------|-------|"]
"| Reg | Dir | Time | PPL | Refusal | Coherence | KL Div | Cert | Error |",
"|-----|-----|------|-----|---------|-----------|--------|------|-------|"]
for r in results:
reg = f"{r['regularization']:.3f}"
ppl = f"{r['perplexity']:.2f}" if r.get("perplexity") is not None else ""
ref = f"{r['refusal_rate']:.0%}" if r.get("refusal_rate") is not None else ""
coh = f"{r['coherence']:.0%}" if r.get("coherence") is not None else ""
kl_val = r.get("kl_divergence")
kl_str = f"{kl_val:.4f}" if kl_val is not None else ""
cert = r.get("spectral_cert", "") or ""
dir_m = r.get("direction_method", "") or ""
err = r.get("error", "")
err_short = (err[:25] + "...") if err and len(err) > 25 else (err or "")
lines.append(f"| {reg} | {r['time_s']}s | {ppl} | {ref} | {coh} | {err_short} |")
lines.append(f"| {reg} | {dir_m} | {r['time_s']}s | {ppl} | {ref} | {coh} | {kl_str} | {cert} | {err_short} |")
return "\n".join(lines)
@@ -3173,8 +3265,8 @@ def _tourney_gpu_wrapper(fn, *args, **kwargs):
return _tourney_gpu_run(fn, *args, **kwargs)
def run_tourney(model_choice, dataset, quantization):
"""Run an elimination tournament across all abliteration methods.
def run_tourney(model_choice, selected_methods, dataset, quantization):
"""Run an elimination tournament across selected abliteration methods.
Each individual method is run inside its own ``@spaces.GPU`` allocation
(up to 5 minutes per method) so the full tournament is not constrained
@@ -3187,6 +3279,10 @@ def run_tourney(model_choice, dataset, quantization):
yield "**Error:** Select a model first.", "", ""
return
if not selected_methods or len(selected_methods) < 3:
yield "**Error:** Select at least 3 methods for a tournament.", "", ""
return
from obliteratus.tourney import (
TourneyRunner, render_bracket_html,
_load_checkpoint, _checkpoint_matches,
@@ -3218,6 +3314,7 @@ def run_tourney(model_choice, dataset, quantization):
hub_repo=None,
dataset_key=dataset_key,
quantization=quant,
methods=list(selected_methods),
on_log=logger,
resume=resume,
)
@@ -3322,18 +3419,27 @@ def run_tourney(model_choice, dataset, quantization):
_ts = datetime.now().strftime("%H:%M")
_short = model_id.split("/")[-1] if "/" in model_id else model_id
_label = f"tourney winner ({winner.method}) on {_short} ({_ts})"
_winner_meta = {
"model_id": model_id,
"model_choice": model_choice,
"method": winner.method,
"dataset_key": dataset_key,
"prompt_volume": 0,
"output_dir": winner.output_dir,
"source": "tourney",
"tourney_score": winner.score,
"tourney_metrics": winner.metrics,
}
with _lock:
_session_models[_label] = {
"model_id": model_id,
"model_choice": model_choice,
"method": winner.method,
"dataset_key": dataset_key,
"prompt_volume": 0,
"output_dir": winner.output_dir,
"source": "tourney",
"tourney_score": winner.score,
"tourney_metrics": winner.metrics,
}
_session_models[_label] = _winner_meta
# Persist so the winner survives ZeroGPU process restarts
_persist_session_meta(winner.output_dir, _label, {
"model_id": model_id,
"model_choice": model_choice,
"method": winner.method,
"dataset_key": dataset_key,
"source": "tourney",
})
yield (
f"**Champion: `{winner.method}`** "
f"(score: {winner.score:.4f})\n"
@@ -3930,7 +4036,13 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
with gr.Row():
adv_n_directions = gr.Slider(
1, 8, value=_defaults["n_directions"], step=1,
label="Directions", info="Number of refusal directions to extract via SVD",
label="Directions", info="Number of refusal directions to extract",
)
adv_direction_method = gr.Radio(
choices=["diff_means", "svd", "leace"],
value=_defaults["direction_method"],
label="Direction Method",
info="diff_means: simple & robust, svd: multi-direction, leace: optimal erasure",
)
adv_regularization = gr.Slider(
0.0, 1.0, value=_defaults["regularization"], step=0.05,
@@ -3996,10 +4108,52 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
with gr.Row():
adv_spectral_cascade = gr.Checkbox(value=_defaults["spectral_cascade"], label="Spectral Cascade",
info="DCT frequency decomposition for precision refusal targeting")
gr.Markdown("**Layer Selection & Baseline Options**")
with gr.Row():
adv_layer_selection = gr.Dropdown(
choices=["knee_cosmic", "all", "all_except_first", "middle60", "top_k", "knee"],
value=_defaults["layer_selection"],
label="Layer Selection",
info="Which layers to project refusal directions from",
)
adv_winsorize_percentile = gr.Slider(
0.0, 1.0, value=_defaults["winsorize_percentile"], step=0.01,
label="Winsorize Percentile",
info="Activation clamping quantile (1.0 = disabled, 0.01 = 99th pctile)",
)
adv_kl_budget = gr.Slider(
0.0, 2.0, value=_defaults["kl_budget"], step=0.1,
label="KL Budget",
info="Max KL divergence from base model (Heretic/optimized)",
)
with gr.Row():
adv_winsorize = gr.Checkbox(value=_defaults["winsorize_activations"], label="Winsorize Activations",
info="Clamp outlier activations before direction extraction")
adv_kl_optimization = gr.Checkbox(value=_defaults["use_kl_optimization"], label="KL Optimization",
info="Optimize projection strength to stay within KL budget")
adv_float_layer_interp = gr.Checkbox(value=_defaults["float_layer_interpolation"], label="Float Layer Interpolation",
info="Interpolate between adjacent layers' directions (Heretic)")
adv_rdo_refinement = gr.Checkbox(value=_defaults["rdo_refinement"], label="RDO Refinement",
info="Gradient-based direction refinement (Wollschlager et al.)")
with gr.Row():
adv_cot_aware = gr.Checkbox(value=_defaults["cot_aware"], label="CoT-Aware",
info="Preserve chain-of-thought reasoning during abliteration")
with gr.Row():
adv_bayesian_trials = gr.Slider(
10, 200, value=_defaults["bayesian_trials"], step=10,
label="Bayesian Trials",
info="Optuna TPE optimization trials (Heretic/optimized methods)",
)
adv_n_sae_features = gr.Slider(
16, 256, value=_defaults["n_sae_features"], step=16,
label="SAE Features",
info="Number of SAE features to target (inverted/nuclear methods)",
)
# List of all advanced controls (order must match _on_method_change return)
_adv_controls = [
adv_n_directions, adv_regularization, adv_refinement_passes,
adv_n_directions, adv_direction_method,
adv_regularization, adv_refinement_passes,
adv_reflection_strength, adv_embed_regularization,
adv_steering_strength, adv_transplant_blend,
adv_spectral_bands, adv_spectral_threshold,
@@ -4011,6 +4165,12 @@ with gr.Blocks(theme=THEME, css=CSS, js=_JS, title="OBLITERATUS", fill_height=Tr
adv_project_embeddings, adv_activation_steering,
adv_expert_transplant, adv_wasserstein_optimal,
adv_spectral_cascade,
adv_layer_selection, adv_winsorize,
adv_winsorize_percentile,
adv_kl_optimization, adv_kl_budget,
adv_float_layer_interp, adv_rdo_refinement,
adv_cot_aware,
adv_bayesian_trials, adv_n_sae_features,
]
obliterate_btn = gr.Button(
@@ -4181,7 +4341,8 @@ result = client.predict(
mm_method = gr.Dropdown(
choices=["basic", "advanced", "aggressive",
"spectral_cascade", "informed", "surgical",
"optimized", "inverted", "nuclear"],
"optimized", "inverted", "nuclear",
"failspy", "gabliteration", "heretic", "rdo"],
value="surgical",
label="Abliteration Method",
)
@@ -4550,11 +4711,11 @@ tradeoff point where refusal is minimized with minimal capability damage.
# ── Tab 6: Tourney ────────────────────────────────────────────────
with gr.Tab("Tourney", id="tourney"):
gr.Markdown("""### March Madness Tournament
Pit **all abliteration methods** against each other in elimination rounds.
gr.Markdown("""### Tourney Mode
Pit abliteration methods against each other in elimination rounds.
The winner is saved locally — push it to HuggingFace Hub from the **Push to Hub** tab.
**Round 1 — Qualifiers:** All methods, reduced prompts. Bottom half eliminated.
**Round 1 — Qualifiers:** Selected methods, reduced prompts. Bottom half eliminated.
**Round 2 — Semifinals:** Survivors, full prompts. Bottom half eliminated.
**Round 3 — Finals:** Top contenders, maximum prompts. Champion crowned.
""")
@@ -4566,6 +4727,14 @@ The winner is saved locally — push it to HuggingFace Hub from the **Push to Hu
allow_custom_value=True,
)
from obliteratus.tourney import TOURNEY_METHODS as _ALL_TOURNEY_METHODS
tourney_methods_cb = gr.CheckboxGroup(
choices=_ALL_TOURNEY_METHODS,
value=_ALL_TOURNEY_METHODS,
label="Methods to Compete",
info="Pick at least 3 methods. All selected by default.",
)
with gr.Accordion("Advanced Settings", open=False):
with gr.Row():
tourney_dataset_dd = gr.Dropdown(
@@ -4595,9 +4764,16 @@ The winner is saved locally — push it to HuggingFace Hub from the **Push to Hu
tourney_btn.click(
fn=run_tourney,
inputs=[tourney_model_dd,
inputs=[tourney_model_dd, tourney_methods_cb,
tourney_dataset_dd, tourney_quant_dd],
outputs=[tourney_status, tourney_bracket, tourney_log],
).then(
fn=lambda: (
gr.update(choices=_get_session_model_choices()),
gr.update(choices=_get_session_model_choices()),
_get_vram_html(),
),
outputs=[session_model_dd, ab_session_model_dd, vram_display],
)
# ── Tab 7: Export ─────────────────────────────────────────────────
+58 -4
View File
@@ -1317,11 +1317,56 @@
<span class="method-label">AGGRESSIVE</span>
<span class="method-desc">Full Gabliteration + 3-pass refine</span>
</label>
<label class="method-radio" id="method-spectral_cascade" onclick="setAblMethod('spectral_cascade')">
<input type="radio" name="abl-method" value="spectral_cascade">
<span class="method-label">SPECTRAL</span>
<span class="method-desc">DCT frequency-selective decomposition</span>
</label>
<label class="method-radio" id="method-informed" onclick="setAblMethod('informed')" style="border-color:var(--cyan)">
<input type="radio" name="abl-method" value="informed">
<span class="method-label" style="color:var(--cyan)">INFORMED</span>
<span class="method-desc">Analysis-guided auto-config + Ouroboros</span>
</label>
<label class="method-radio" id="method-surgical" onclick="setAblMethod('surgical')">
<input type="radio" name="abl-method" value="surgical">
<span class="method-label">SURGICAL</span>
<span class="method-desc">Precision MoE-aware head surgery</span>
</label>
<label class="method-radio" id="method-optimized" onclick="setAblMethod('optimized')">
<input type="radio" name="abl-method" value="optimized">
<span class="method-label">OPTIMIZED</span>
<span class="method-desc">Bayesian auto-tuned + KL-optimized</span>
</label>
<label class="method-radio" id="method-inverted" onclick="setAblMethod('inverted')">
<input type="radio" name="abl-method" value="inverted">
<span class="method-label">INVERTED</span>
<span class="method-desc">Semantic refusal inversion</span>
</label>
<label class="method-radio" id="method-nuclear" onclick="setAblMethod('nuclear')">
<input type="radio" name="abl-method" value="nuclear">
<span class="method-label">NUCLEAR</span>
<span class="method-desc">Maximum force combo</span>
</label>
<label class="method-radio" id="method-failspy" onclick="setAblMethod('failspy')">
<input type="radio" name="abl-method" value="failspy">
<span class="method-label">FAILSPY</span>
<span class="method-desc">FailSpy/abliterator baseline</span>
</label>
<label class="method-radio" id="method-gabliteration" onclick="setAblMethod('gabliteration')">
<input type="radio" name="abl-method" value="gabliteration">
<span class="method-label">GABLIT</span>
<span class="method-desc">Gabliteration (G&uuml;lmez 2026) baseline</span>
</label>
<label class="method-radio" id="method-heretic" onclick="setAblMethod('heretic')">
<input type="radio" name="abl-method" value="heretic">
<span class="method-label">HERETIC</span>
<span class="method-desc">Heretic/p-e-w Bayesian baseline</span>
</label>
<label class="method-radio" id="method-rdo" onclick="setAblMethod('rdo')">
<input type="radio" name="abl-method" value="rdo">
<span class="method-label">RDO</span>
<span class="method-desc">Refusal Direction Optimization baseline</span>
</label>
</div>
<div id="method-details" style="margin-top:10px; font-size:0.7rem; color:var(--text-dim); padding:8px; border:1px solid rgba(188,19,254,0.2); border-radius:4px">
4 SVD directions &bull; norm-preserving &bull; 30% regularization &bull; 2 refinement passes &bull; 32 prompt pairs
@@ -1941,10 +1986,19 @@ function startAbliterateFromLibrary(hfId) {
let ablMethod = 'advanced';
const METHOD_INFO = {
basic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'1 direction &bull; standard projection &bull; 1 pass &bull; 32 prompt pairs'},
advanced: {dirs:4, norm:true, reg:0.3, passes:2, desc:'4 SVD directions &bull; norm-preserving &bull; 30% regularization &bull; 2 refinement passes &bull; 32 prompt pairs'},
aggressive: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions &bull; norm-preserving &bull; full orthogonalization &bull; 3 refinement passes &bull; 32 prompt pairs'},
informed: {dirs:'auto', norm:true, reg:'auto', passes:'auto', desc:'<span style="color:var(--cyan)">Analysis-guided</span> &bull; auto directions &bull; auto regularization &bull; Ouroboros-compensated &bull; cone/alignment/cluster/defense analysis'},
basic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'1 direction &bull; standard projection &bull; 1 pass'},
advanced: {dirs:4, norm:true, reg:0.3, passes:2, desc:'4 SVD directions &bull; norm-preserving &bull; 30% regularization &bull; 2 refinement passes'},
aggressive: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions &bull; norm-preserving &bull; full orthogonalization &bull; 3 refinement passes'},
spectral_cascade: {dirs:6, norm:true, reg:0.15, passes:1, desc:'6 whitened-SVD directions &bull; DCT frequency decomposition &bull; coherence-weighted &bull; adaptive bands'},
informed: {dirs:'auto', norm:true, reg:'auto', passes:'auto', desc:'<span style="color:var(--cyan)">Analysis-guided</span> &bull; auto directions &bull; auto regularization &bull; Ouroboros-compensated &bull; cone/alignment/cluster analysis'},
surgical: {dirs:4, norm:true, reg:0.2, passes:2, desc:'4 SVD directions &bull; attention head surgery &bull; SAE features &bull; safety neuron masking &bull; per-expert MoE'},
optimized: {dirs:4, norm:true, reg:0.2, passes:2, desc:'4 SVD directions &bull; Bayesian auto-tuned &bull; CoT-aware &bull; KL co-optimized &bull; winsorized activations'},
inverted: {dirs:4, norm:true, reg:0.1, passes:2, desc:'4 SVD directions &bull; semantic inversion (2x reflection) &bull; SAE feature targeting'},
nuclear: {dirs:8, norm:true, reg:0.0, passes:3, desc:'8 SVD directions &bull; all techniques combined &bull; maximum force &bull; head surgery + SAE + steering + transplant'},
failspy: {dirs:1, norm:false, reg:0.0, passes:1, desc:'<span style="color:var(--text-dim)">Baseline</span> &bull; 1 diff-means direction &bull; all layers except first &bull; FailSpy/abliterator reproduction'},
gabliteration: {dirs:4, norm:false, reg:0.231, passes:1, desc:'<span style="color:var(--text-dim)">Baseline</span> &bull; 4 SVD directions &bull; ridge reg (alpha=0.3) &bull; top-k layer selection &bull; G&uuml;lmez 2026'},
heretic: {dirs:1, norm:false, reg:0.0, passes:1, desc:'<span style="color:var(--text-dim)">Baseline</span> &bull; 1 diff-means &bull; Bayesian (Optuna TPE) &bull; KL-optimized &bull; float layer interpolation &bull; p-e-w'},
rdo: {dirs:4, norm:true, reg:0.0, passes:1, desc:'<span style="color:var(--text-dim)">Baseline</span> &bull; 4 SVD directions &bull; gradient-refined (RDO) &bull; linear probe classifier &bull; Wollschlager ICML 2025'},
};
function getAblCmd() {
+162 -49
View File
@@ -63,6 +63,7 @@ METHODS = {
"label": "Basic (Arditi et al.)",
"description": "Single refusal direction via difference-in-means",
"n_directions": 1,
"direction_method": "diff_means",
"norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
@@ -75,6 +76,7 @@ METHODS = {
"label": "Advanced (Multi-direction + Norm-preserving)",
"description": "SVD-based multi-direction extraction with norm preservation",
"n_directions": 4,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.3,
"embed_regularization": 0.5,
@@ -97,6 +99,7 @@ METHODS = {
"Zero regularization for maximum refusal removal."
),
"n_directions": 8,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 3,
@@ -124,6 +127,7 @@ METHODS = {
"separating trained-in refusal patterns from per-layer artifacts."
),
"n_directions": 6,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -146,25 +150,31 @@ METHODS = {
"Uses InformedAbliterationPipeline for the full feedback loop. "
"Auto-detects alignment method (DPO/RLHF/CAI/SFT), maps concept "
"cone geometry, performs cluster-aware layer selection, and gates "
"projection by safety-capability entanglement. Includes spectral "
"certification of abliteration completeness and Wasserstein-optimal "
"primary direction extraction."
"projection by safety-capability entanglement. Defaults to single "
"diff-of-means direction + Bayesian optimization (Heretic-style). "
"LEACE available via direction_method='leace'."
),
"n_directions": 4,
"n_directions": 1,
"direction_method": "diff_means",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
"project_biases": True,
"use_chat_template": True,
"use_whitened_svd": True,
"use_whitened_svd": False,
"true_iterative_refinement": True,
"use_jailbreak_contrast": False,
"layer_adaptive_strength": False,
"layer_adaptive_strength": True,
"safety_neuron_masking": False,
"per_expert_directions": False,
"attention_head_surgery": False,
"use_sae_features": False,
"use_wasserstein_optimal": True,
"use_wasserstein_optimal": False,
"use_kl_optimization": True,
"kl_budget": 0.5,
"float_layer_interpolation": True,
"winsorize_activations": True,
"winsorize_percentile": 0.01,
},
"surgical": {
"label": "Surgical (Full SOTA MoE-Aware)",
@@ -176,6 +186,7 @@ METHODS = {
"minimizing capability damage via precision targeting."
),
"n_directions": 8,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -204,6 +215,7 @@ METHODS = {
"techniques plus the inversion layer."
),
"n_directions": 8,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -234,6 +246,7 @@ METHODS = {
"Best for maximizing quality when compute budget allows ~50 trials."
),
"n_directions": 4,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 1,
@@ -275,6 +288,7 @@ METHODS = {
"runtime overhead except lightweight steering hooks."
),
"n_directions": 4,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 2,
@@ -314,12 +328,14 @@ METHODS = {
"description": (
"Faithful reproduction of the FailSpy/abliterator library — the "
"most widely used community tool. Single direction via difference-"
"in-means (Arditi et al.), middle 60%% layer heuristic (layers "
"20%%-80%%), no regularization, no norm preservation. Uses chat "
"template for instruct models. This is what most HuggingFace "
"abliterated models were created with."
"in-means (Arditi et al.), applied to all layers except layer 0 "
"(matching FailSpy source: range(1, n_layers)). Projects both "
"W_O (attention output) and MLP W_out. No regularization, no "
"norm preservation. Uses chat template for instruct models. "
"This is what most HuggingFace abliterated models were created with."
),
"n_directions": 1,
"direction_method": "diff_means",
"norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
@@ -334,7 +350,7 @@ METHODS = {
"attention_head_surgery": False,
"use_sae_features": False,
"invert_refusal": False,
"layer_selection": "middle60",
"layer_selection": "all_except_first",
},
"gabliteration": {
"label": "Gabliteration (Gülmez 2026 Baseline)",
@@ -347,6 +363,7 @@ METHODS = {
"whitened SVD, no iterative refinement."
),
"n_directions": 4,
"direction_method": "svd",
"norm_preserve": False,
# Ridge alpha=0.3 → effective reg = alpha/(1+alpha) = 0.3/1.3 ≈ 0.231
# For orthonormal V: P_V^alpha = 1/(1+alpha) * VV^T = 0.769 * VV^T
@@ -367,19 +384,26 @@ METHODS = {
"layer_selection": "top_k",
},
"heretic": {
"label": "Heretic / p-e-w (2025 Baseline)",
"label": "Heretic / p-e-w (2025-2026 Baseline)",
"description": (
"Faithful reproduction of Heretic's core algorithm (p-e-w, 2025). "
"Bayesian optimization via Optuna TPE with parametric bell curve "
"kernel. Uses 1-2 directions (float interpolation between top SVD "
"components), component-specific scaling (attention vs MLP), "
"activation winsorization (1%% tails). No whitened SVD, no SAE "
"features, no jailbreak contrast. The key innovation is replacing "
"Faithful reproduction of Heretic's core algorithm (p-e-w, 2025-2026). "
"Bayesian optimization via Optuna TPE with linear bell curve layer "
"weighting (NOT Gaussian — linear interpolation between max_weight and "
"min_weight over min_weight_distance). One diff-of-means direction per "
"layer; direction_scope is sampled ('global' selects a float layer index "
"with lerp between adjacent layers' directions, 'per layer' uses each "
"layer's own direction). LoRA-based ablation (delta W = -lambda * v * "
"(v^T W)), never modifies base weights directly. Row normalization "
"defaults to NONE (PRE and FULL are options). Activation winsorization "
"via symmetric quantile clamping. The key innovation is replacing "
"manual hyperparameter selection with automated Pareto optimization "
"over the (refusal_rate, KL_divergence) frontier."
"over the (refusal_count, KL_divergence) frontier."
),
"n_directions": 2,
"norm_preserve": True,
"n_directions": 1,
"direction_method": "diff_means",
# Heretic default row_normalization is NONE; PRE/FULL are optional.
# OBLITERATUS norm_preserve=False matches Heretic's default behavior.
"norm_preserve": False,
"regularization": 0.0,
"refinement_passes": 1,
"project_biases": False,
@@ -387,14 +411,21 @@ METHODS = {
"use_whitened_svd": False,
"true_iterative_refinement": False,
"use_jailbreak_contrast": False,
"layer_adaptive_strength": True,
# Heretic uses its own bell curve weighting (linear, not Gaussian),
# not OBLITERATUS's norm-based layer_adaptive_strength.
"layer_adaptive_strength": False,
"safety_neuron_masking": False,
"per_expert_directions": False,
"attention_head_surgery": False,
"use_sae_features": False,
"invert_refusal": False,
"winsorize_activations": True,
"winsorize_percentile": 0.01,
# Heretic default winsorization_quantile is 1.0 (disabled by default).
# For faithful baseline reproduction we match the source default.
"winsorize_activations": False,
"winsorize_percentile": 1.0,
# Heretic's float direction index interpolates between adjacent LAYERS'
# directions (not SVD components). OBLITERATUS float_layer_interpolation
# provides the bell-curve layer weighting aspect.
"float_layer_interpolation": True,
"cot_aware": False,
"use_kl_optimization": True,
@@ -414,6 +445,7 @@ METHODS = {
"boundary rather than the statistical activation difference."
),
"n_directions": 4,
"direction_method": "svd",
"norm_preserve": True,
"regularization": 0.0,
"refinement_passes": 1,
@@ -566,6 +598,7 @@ class AbliterationPipeline:
hub_token: str | None = None,
hub_community_org: str | None = None,
n_directions: int | None = None,
direction_method: str | None = None,
norm_preserve: bool | None = None,
regularization: float | None = None,
refinement_passes: int | None = None,
@@ -659,6 +692,7 @@ class AbliterationPipeline:
method_cfg = METHODS[method]
self.method = method
self.n_directions = n_directions if n_directions is not None else method_cfg["n_directions"]
self.direction_method = direction_method if direction_method is not None else method_cfg.get("direction_method", "svd")
self.norm_preserve = norm_preserve if norm_preserve is not None else method_cfg["norm_preserve"]
self.regularization = regularization if regularization is not None else method_cfg["regularization"]
self.refinement_passes = refinement_passes if refinement_passes is not None else method_cfg["refinement_passes"]
@@ -936,7 +970,7 @@ class AbliterationPipeline:
self.log(f"Loading model: {self.model_name}")
self.log(f"Device: {self.device} | Dtype: {self.dtype}")
self.log(f"Method: {method_label}")
self.log(f" Directions: {self.n_directions} | Norm-preserve: {self.norm_preserve}")
self.log(f" Directions: {self.n_directions} ({self.direction_method}) | Norm-preserve: {self.norm_preserve}")
self.log(f" Regularization: {self.regularization} | Refinement passes: {self.refinement_passes}")
self.handle = load_model(
@@ -1400,18 +1434,26 @@ class AbliterationPipeline:
else:
max_length = 384 if collect_multi_pos else 256
free_gb = dev.get_total_free_gb()
# Scale memory thresholds by model size — a 1.2B model needs far
# less KV-cache memory per token than a 7B model. Baseline
# thresholds (4 / 2 GB) were tuned for 7B (hidden=4096, layers=32).
_h = self.handle.hidden_size if self.handle else 4096
_l = n_layers if n_layers else 32
_mem_scale = (_h / 4096) * (_l / 32)
_tight_gb = max(4.0 * _mem_scale, 0.5)
_low_gb = max(2.0 * _mem_scale, 0.25)
if dev.is_gpu_available():
if self.max_seq_length is None and free_gb < 2.0:
if self.max_seq_length is None and free_gb < _low_gb:
max_length = 64
self.log(f" Low GPU memory ({free_gb:.1f} GB free), using max_length={max_length}")
elif self.max_seq_length is None and free_gb < 4.0:
self.log(f" Low GPU memory ({free_gb:.1f} GB free, threshold {_low_gb:.1f} GB), using max_length={max_length}")
elif self.max_seq_length is None and free_gb < _tight_gb:
max_length = 128
self.log(f" Tight GPU memory ({free_gb:.1f} GB free), using max_length={max_length}")
self.log(f" Tight GPU memory ({free_gb:.1f} GB free, threshold {_tight_gb:.1f} GB), using max_length={max_length}")
device = self._get_model_device(model)
# Batch prompts for throughput — hooks unbatch per-prompt activations
batch_size = 16 if free_gb > 4.0 else 8 if free_gb > 2.0 else 1
batch_size = 16 if free_gb > _tight_gb else 8 if free_gb > _low_gb else 1
# Left-pad so position -1 is always the last real token in every batch element
orig_padding_side = getattr(tokenizer, "padding_side", "right")
if batch_size > 1:
@@ -1498,9 +1540,16 @@ class AbliterationPipeline:
wasserstein_extractor = WassersteinOptimalExtractor()
self.log("Using Wasserstein-optimal direction extraction (cost-minimizing GEP)")
# Optionally use LEACE for theoretically optimal concept erasure
leace_extractor = None
if self.direction_method == "leace":
from obliteratus.analysis.leace import LEACEExtractor
leace_extractor = LEACEExtractor()
self.log("Using LEACE (closed-form optimal concept erasure) for direction extraction")
# Optionally use whitened SVD for cleaner direction extraction
whitened_extractor = None
if self.use_whitened_svd and n_dirs > 1 and not self.use_wasserstein_optimal:
if self.use_whitened_svd and n_dirs > 1 and not self.use_wasserstein_optimal and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
self.log("Using whitened SVD (covariance-normalized) for direction extraction")
@@ -1547,6 +1596,30 @@ class AbliterationPipeline:
if idx < 5:
self.log(f" layer {idx}: Wasserstein extraction failed ({e}), falling back to SVD")
if leace_extractor is not None:
# LEACE: closed-form optimal concept erasure direction
if idx in self._harmful_acts and idx in self._harmless_acts:
try:
l_result = leace_extractor.extract(
self._harmful_acts[idx],
self._harmless_acts[idx],
layer_idx=idx,
)
self.refusal_directions[idx] = l_result.direction
self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
norms[idx] = l_result.generalized_eigenvalue
if idx < 5 or idx == n_layers - 1:
self.log(
f" layer {idx}: LEACE eigenvalue={l_result.generalized_eigenvalue:.4f}, "
f"erasure_loss={l_result.erasure_loss:.4f}, "
f"cond={l_result.within_class_condition:.0f}"
)
continue
except Exception as e:
if idx < 5:
self.log(f" layer {idx}: LEACE failed ({e}), falling back to diff-of-means")
if n_dirs == 1:
# Classic single-direction: difference-in-means
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
@@ -1630,7 +1703,8 @@ class AbliterationPipeline:
# Supports multiple algorithms for baseline comparison:
# knee_cosmic: OBLITERATUS default (knee detection + COSMIC fusion)
# knee: knee detection only (simplified OBLITERATUS)
# middle60: FailSpy/abliterator heuristic (layers 20%-80%)
# middle60: legacy heuristic (layers 20%-80%)
# all_except_first: FailSpy/abliterator (all layers except layer 0)
# all: all layers (for Bayesian optimization / Heretic)
# top_k: top-k by refusal strength (Gabliteration-style)
sorted_layers = sorted(norms.items(), key=lambda x: x[1], reverse=True)
@@ -1643,8 +1717,14 @@ class AbliterationPipeline:
selection_method = self.layer_selection
if selection_method == "middle60":
# FailSpy/abliterator heuristic: middle 60% of layers
if selection_method == "all_except_first":
# FailSpy/abliterator: all layers except layer 0
# Source: range(1, self.model.cfg.n_layers) in FailSpy/abliterator
self._strong_layers = list(range(1, n_layers))
self.log(f"Layer selection: all-except-first ({len(self._strong_layers)} layers)")
elif selection_method == "middle60":
# Legacy heuristic: middle 60% of layers (layers 20%-80%)
self._strong_layers = self._select_layers_middle60(n_layers)
self.log(f"Layer selection: middle-60% ({len(self._strong_layers)} layers)")
@@ -2300,14 +2380,14 @@ class AbliterationPipeline:
@staticmethod
def _select_layers_middle60(n_layers: int) -> list[int]:
"""Select the middle 60% of layers (FailSpy/abliterator heuristic).
"""Select the middle 60% of layers (legacy heuristic).
The original abliterator library by FailSpy selects layers from index
n_layers*0.2 to n_layers*0.8, based on the empirical observation that
refusal concentrates in middle layers (not early embedding layers or
late unembedding layers).
Selects layers from index n_layers*0.2 to n_layers*0.8.
Reference: FailSpy/abliterator (2024), GitHub.
NOTE: This does NOT match FailSpy/abliterator's actual layer selection.
FailSpy uses all layers except layer 0 (range(1, n_layers)). Use
layer_selection="all_except_first" for faithful FailSpy reproduction.
This method is retained for backward compatibility only.
"""
start = int(n_layers * 0.2)
end = int(n_layers * 0.8)
@@ -3589,9 +3669,18 @@ class AbliterationPipeline:
except Exception:
pass
# Use LEACE when enabled (matching main _distill)
leace_extractor = None
if self.direction_method == "leace":
try:
from obliteratus.analysis.leace import LEACEExtractor
leace_extractor = LEACEExtractor()
except Exception:
pass
# Use whitened SVD when enabled (matching main _distill)
whitened_extractor = None
if self.use_whitened_svd and n_dirs > 1 and wasserstein_extractor is None:
if self.use_whitened_svd and n_dirs > 1 and wasserstein_extractor is None and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
@@ -3624,6 +3713,22 @@ class AbliterationPipeline:
except Exception:
pass # Fall through to SVD
# LEACE path (matching main _distill)
if leace_extractor is not None:
if idx in self._harmful_acts and idx in self._harmless_acts:
try:
l_result = leace_extractor.extract(
self._harmful_acts[idx],
self._harmless_acts[idx],
layer_idx=idx,
)
self.refusal_directions[idx] = l_result.direction
self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
norms[idx] = l_result.generalized_eigenvalue
continue
except Exception:
pass # Fall through to diff-of-means
if n_dirs == 1:
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
norm = diff.norm()
@@ -3667,7 +3772,9 @@ class AbliterationPipeline:
# Respect configured layer_selection (matching _distill)
selection_method = self.layer_selection
if selection_method == "middle60":
if selection_method == "all_except_first":
self._strong_layers = list(range(1, n_layers))
elif selection_method == "middle60":
self._strong_layers = self._select_layers_middle60(n_layers)
elif selection_method == "all":
self._strong_layers = self._select_layers_all(n_layers)
@@ -5663,8 +5770,9 @@ class AbliterationPipeline:
cert_n = min(20, len(self.harmful_prompts), len(self.harmless_prompts))
cert_harmful = self._maybe_apply_chat_template(self.harmful_prompts[:cert_n])
cert_harmless = self._maybe_apply_chat_template(self.harmless_prompts[:cert_n])
cert_h_acts = self._collect_activations(layers, cert_harmful, "cert_harmful")
cert_b_acts = self._collect_activations(layers, cert_harmless, "cert_harmless")
cert_layer_modules = get_layer_modules(self.handle)
cert_h_acts = self._collect_activations(cert_layer_modules, cert_harmful, "cert_harmful")
cert_b_acts = self._collect_activations(cert_layer_modules, cert_harmless, "cert_harmless")
cert_results = []
for layer_idx in cert_layers:
@@ -5741,6 +5849,7 @@ class AbliterationPipeline:
"method": self.method,
"method_config": {
"n_directions": self.n_directions,
"direction_method": self.direction_method,
"norm_preserve": self.norm_preserve,
"regularization": self.regularization,
"refinement_passes": self.refinement_passes,
@@ -5868,10 +5977,11 @@ class AbliterationPipeline:
param_bytes = sum(v.numel() * v.element_size() for v in state_dict.values())
self.log(f"State dict: {len(state_dict)} tensors, {param_bytes / 1e9:.1f} GB")
# 3. NOW it's safe to clean up the offload dir — all weights are in memory.
self._cleanup_offload_dir()
# 4. Save model + tokenizer + metadata
# 3. Save model + tokenizer + metadata
# NOTE: offload dir cleanup is deferred until AFTER save_pretrained
# completes, because accelerate's dispatch hooks may still access
# the offload dir during serialization (even when state_dict is
# explicitly provided).
self.output_dir.mkdir(parents=True, exist_ok=True)
self.log(f"Saving model to {self.output_dir}/")
@@ -5940,6 +6050,9 @@ class AbliterationPipeline:
del state_dict
self._free_gpu_memory()
# NOW it's safe to clean up the offload dir — save_pretrained is done.
self._cleanup_offload_dir()
self.handle.tokenizer.save_pretrained(self.output_dir)
(self.output_dir / "abliteration_metadata.json").write_text(
@@ -269,7 +269,7 @@ class ConditionalAbliterator:
) -> torch.Tensor | None:
"""Extract category-specific refusal direction.
Uses Fisher's Linear Discriminant (whitened difference-of-means)
Uses difference-of-means (category_mean - harmless_mean)
and then orthogonalizes against previously extracted directions
to ensure category independence.
"""
+239
View File
@@ -0,0 +1,239 @@
"""LEACE-inspired direction extraction for refusal concept erasure.
This module implements Fisher's Linear Discriminant (FLD) direction for
concept erasure, inspired by LEACE (Belrose et al. 2023).
IMPORTANT: This is NOT a faithful implementation of LEACE as described in
the paper. Key difference:
- **True LEACE** uses the *total* covariance Sigma_X for whitening:
P* = I - W^{-1} P_{W Sigma_XZ} W where W = Sigma_X^{-1/2}
For binary concepts, this yields: v = Sigma_X^{-1} delta
- **This implementation** uses *within-class* covariance S_w:
v = S_w^{-1} delta
This is Fisher's Linear Discriminant direction, which maximizes
class separability relative to within-class spread.
For binary concepts, Sigma_X = S_w + p(1-p) * delta @ delta^T,
so the two directions differ when the between-class scatter is
non-negligible relative to within-class scatter. In high-dimensional
settings (d >> 1) with moderate class separation, the difference
is typically small but non-zero.
The FLD direction is still a strong choice for refusal erasure — it
handles rogue dimensions (high-variance but non-discriminative) better
than plain diff-of-means, and is a closed-form solution with no
iterative optimization.
Advantages over SVD:
- Within-class normalization prevents high-variance but
non-discriminative dimensions from dominating
- No hyperparameters beyond regularization epsilon
- Closed-form solution (no iterative optimization)
References:
- Belrose et al. (2023): LEACE: Perfect linear concept erasure in
closed form. NeurIPS 2023.
- Ravfogel et al. (2022): RLACE: Adversarial concept erasure
(iterative precursor to LEACE).
- Fisher (1936): The use of multiple measurements in taxonomic
problems. Annals of Eugenics.
"""
from __future__ import annotations
from dataclasses import dataclass
import torch
@dataclass
class LEACEResult:
    """Result of LEACE direction extraction for a single layer.

    Produced by ``LEACEExtractor.extract``; bundles the unit erasure
    direction with scalar diagnostics describing how discriminable the two
    activation classes were and how costly erasing the direction is.
    """

    layer_idx: int  # transformer layer this direction was extracted from
    direction: torch.Tensor  # (hidden_dim,) unit vector, float32
    generalized_eigenvalue: float  # lambda from GEP (discriminability)
    within_class_condition: float  # condition number of regularized S_w
    mean_diff_norm: float  # ||mu_1 - mu_0||
    erasure_loss: float  # expected squared distortion from erasure
class LEACEExtractor:
    """Extract refusal directions via Fisher's Linear Discriminant (FLD).

    Solves v = S_w^{-1} delta, where delta is the harmful-minus-harmless
    mean difference and S_w is the balanced within-class covariance. This
    is the direction maximizing class separation relative to within-class
    spread; see the module docstring for how it relates to true LEACE.
    """

    def __init__(
        self,
        regularization_eps: float = 1e-4,
        shrinkage: float = 0.0,
    ):
        """
        Args:
            regularization_eps: Tikhonov ridge added to S_w before solving.
                Larger values yield more conservative but stabler results.
            shrinkage: Ledoit-Wolf shrinkage of S_w toward a scaled
                identity, in [0, 1]. 0 disables shrinkage entirely; useful
                when the sample count is below the hidden dimension.
        """
        self.regularization_eps = regularization_eps
        self.shrinkage = shrinkage

    def extract(
        self,
        harmful_activations: list[torch.Tensor],
        harmless_activations: list[torch.Tensor],
        layer_idx: int = 0,
    ) -> LEACEResult:
        """Compute the FLD erasure direction for a single layer.

        Args:
            harmful_activations: List of (hidden_dim,) tensors from harmful prompts.
            harmless_activations: List of (hidden_dim,) tensors from harmless prompts.
            layer_idx: Layer index, recorded in the result metadata.

        Returns:
            LEACEResult holding the unit direction plus diagnostics.
        """
        pos = torch.stack(harmful_activations).float()  # (n_pos, d)
        neg = torch.stack(harmless_activations).float()  # (n_neg, d)
        # Tolerate (1, d) per-sample tensors: stacking those yields 3-D.
        if pos.dim() == 3:
            pos = pos.squeeze(1)
        if neg.dim() == 3:
            neg = neg.squeeze(1)
        n_pos, dim = pos.shape
        n_neg = neg.shape[0]

        # Class-conditional means and the between-class direction.
        mean_pos = pos.mean(dim=0)  # (d,)
        mean_neg = neg.mean(dim=0)  # (d,)
        gap = mean_pos - mean_neg  # (d,)
        gap_norm = gap.norm().item()

        # Balanced within-class covariance: average of per-class covariances,
        # each normalized by its own (n - 1) degrees of freedom.
        pos_centered = pos - mean_pos.unsqueeze(0)
        neg_centered = neg - mean_neg.unsqueeze(0)
        cov_pos = (pos_centered.T @ pos_centered) / max(n_pos - 1, 1)
        cov_neg = (neg_centered.T @ neg_centered) / max(n_neg - 1, 1)
        within = (cov_pos + cov_neg) / 2.0  # (d, d)

        # Optional shrinkage toward a trace-matched scaled identity.
        if self.shrinkage > 0:
            tr = within.trace().item()
            within = (1 - self.shrinkage) * within + self.shrinkage * (tr / dim) * torch.eye(dim, device=within.device)

        # Tikhonov ridge so the linear solve below is well-posed.
        within_reg = within + self.regularization_eps * torch.eye(dim, device=within.device)

        # Condition number of the regularized S_w (diagnostics only).
        try:
            spectrum = torch.linalg.eigvalsh(within_reg)
            spectrum = spectrum.clamp(min=0)
            usable = spectrum[spectrum > spectrum.max() * 1e-10]
            condition = (usable.max() / usable.min()).item() if usable.numel() > 0 else float('inf')
        except Exception:
            condition = float('inf')

        # FLD direction. The generalized eigenvector for the rank-1
        # between-class scatter gap @ gap^T reduces to v = S_w^{-1} @ gap
        # (up to scale); solve() avoids forming an explicit inverse.
        try:
            v = torch.linalg.solve(within_reg, gap)  # (d,)
        except torch.linalg.LinAlgError:
            # Least-squares fallback when the direct solve fails.
            v = torch.linalg.lstsq(within_reg, gap.unsqueeze(1)).solution.squeeze(1)

        # Unit-normalize; a degenerate solve falls back to diff-of-means.
        v_len = v.norm()
        direction = v / v_len if v_len > 1e-8 else gap / max(gap_norm, 1e-8)

        # lambda = gap^T @ S_w^{-1} @ gap: discriminability after whitening.
        gen_eigenvalue = (gap @ v).item()

        # Erasure loss: variance of the pooled data along the direction,
        # i.e. expected squared distortion E[||x - x'||^2] of the rank-1
        # projection that removes it.
        pooled = torch.cat([pos, neg], dim=0)
        pooled_mean = pooled.mean(dim=0)
        pooled_centered = pooled - pooled_mean.unsqueeze(0)
        total_cov = (pooled_centered.T @ pooled_centered) / max(pooled.shape[0] - 1, 1)
        erasure_loss = (direction @ total_cov @ direction).item()

        return LEACEResult(
            layer_idx=layer_idx,
            direction=direction,
            generalized_eigenvalue=gen_eigenvalue,
            within_class_condition=condition,
            mean_diff_norm=gap_norm,
            erasure_loss=erasure_loss,
        )

    def extract_all_layers(
        self,
        harmful_acts: dict[int, list[torch.Tensor]],
        harmless_acts: dict[int, list[torch.Tensor]],
    ) -> dict[int, LEACEResult]:
        """Run :meth:`extract` on every layer present in both inputs.

        Args:
            harmful_acts: {layer_idx: [activations]} from activation collection.
            harmless_acts: {layer_idx: [activations]} from activation collection.

        Returns:
            {layer_idx: LEACEResult} for each layer present in both dicts.
        """
        return {
            idx: self.extract(harmful_acts[idx], harmless_acts[idx], layer_idx=idx)
            for idx in sorted(harmful_acts)
            if idx in harmless_acts
        }

    @staticmethod
    def compare_with_diff_of_means(
        leace_result: LEACEResult,
        harmful_mean: torch.Tensor,
        harmless_mean: torch.Tensor,
    ) -> dict[str, float]:
        """Diagnostics comparing the FLD direction to plain diff-of-means.

        Returns cosine similarity (how far the within-class normalization
        rotated the direction) alongside the stored result diagnostics.
        """
        raw = harmful_mean.squeeze() - harmless_mean.squeeze()
        raw_len = raw.norm()
        unit_raw = raw / raw_len if raw_len > 1e-8 else raw
        return {
            "cosine_similarity": (leace_result.direction @ unit_raw).abs().item(),
            "leace_eigenvalue": leace_result.generalized_eigenvalue,
            "leace_erasure_loss": leace_result.erasure_loss,
            "within_class_condition": leace_result.within_class_condition,
            "mean_diff_norm": leace_result.mean_diff_norm,
        }
+15 -4
View File
@@ -428,8 +428,15 @@ class RiemannianManifoldAnalyzer:
geodesic triangle with area A satisfies:
sum(angles) = pi + K * A (Gauss-Bonnet for small triangles)
We approximate geodesics with straight lines (valid for small K)
and use angle excess to estimate K.
IMPORTANT LIMITATION: This method uses Euclidean chords and angles
in ambient space, NOT geodesics on the manifold. In flat Euclidean
space, the angle sum of any triangle is exactly pi, so this method
will yield K ≈ 0 (up to numerical noise) regardless of the actual
manifold curvature. The results are only meaningful when the data
lies on an approximately low-dimensional curved submanifold and
triangles are sufficiently small relative to the curvature radius.
For rigorous curvature estimates, use methods based on local PCA
eigenvalue decay or Jacobian-based Riemannian metric computation.
"""
# Compute sides
ab = (b - a).float()
@@ -613,8 +620,12 @@ class RiemannianManifoldAnalyzer:
return torch.zeros_like(activation)
v = v / norm
# Correction magnitude: K * proj_magnitude^2 / 2
correction_magnitude = curvature * proj_magnitude ** 2 / 2.0
# Second-order geodesic correction: K * proj_magnitude^2 / 6
# From Jacobi field estimate: deviation of geodesic from straight
# line over distance L with curvature K is ≈ K * L^2 / 6.
# Note: the residual bound in analyze() uses K * ||x||^2 / 8
# which is a looser upper bound including higher-order terms.
correction_magnitude = curvature * proj_magnitude ** 2 / 6.0
# Clamp to prevent instability
correction_magnitude = max(-0.1, min(0.1, correction_magnitude))
+1 -1
View File
@@ -94,7 +94,7 @@ class SparseAutoencoder(nn.Module):
@property
def decoder_weight(self) -> torch.Tensor:
"""Return the decoder weight matrix (n_features x hidden_dim for untied, or encoder.weight.T)."""
"""Return the decoder weight matrix (hidden_dim x n_features for untied, or encoder.weight.T)."""
if self.tied_weights:
return self.encoder.weight.T
return self.decoder.weight
+10 -4
View File
@@ -175,10 +175,11 @@ class SpectralCertifier:
harmful_centered = harmful_activations - harmful_mean
harmless_centered = harmless_activations - harmless_mean
# Pooled within-class covariance
# Pooled within-class covariance (standard formula: sum of scatter
# matrices divided by total degrees of freedom)
cov_h = harmful_centered.T @ harmful_centered / max(n_h - 1, 1)
cov_b = harmless_centered.T @ harmless_centered / max(n_b - 1, 1)
pooled_cov = (cov_h * n_h + cov_b * n_b) / max(n - 2, 1)
pooled_cov = (cov_h * (n_h - 1) + cov_b * (n_b - 1)) / max(n - 2, 1)
# Step 2: Estimate noise variance (median eigenvalue method)
noise_var = self._estimate_noise_variance(pooled_cov, n, d)
@@ -374,8 +375,13 @@ class SpectralCertifier:
# Correct for MP bias: median of MP distribution
gamma = d / max(n, 1)
if gamma < 1:
# MP median approximation (from Bai & Silverstein)
mp_median_ratio = (1 + math.sqrt(gamma)) ** 2 * 0.5
# MP median approximation. The exact MP median requires
# numerical inversion of the MP CDF; we use the empirical
# approximation median ≈ (1 - sqrt(gamma))^2 + gamma^(1/3)
# which is more accurate than the naive 0.5 * upper_edge
# for small gamma. Falls back to the simpler formula when
# gamma is very small.
mp_median_ratio = (1 - math.sqrt(gamma)) ** 2 + gamma ** (1.0 / 3.0)
noise_var = median_eig / max(mp_median_ratio, 1e-10)
else:
noise_var = median_eig
+1 -1
View File
@@ -58,7 +58,7 @@ class WassersteinDirectionResult:
direction: torch.Tensor # (hidden_dim,) optimal direction
wasserstein_cost: float # W_2^2 cost for this direction
mean_shift_component: float # (r^T m)^2 portion
bures_component: float # r^T Sigma r portion (upper bound)
bures_component: float # r^T Sigma r portion (exact when r is eigenvector of Sigma, lower bound otherwise)
refusal_projection: float # (r^T d)^2
cost_effectiveness_ratio: float # W_2^2 / (r^T d)^2
+147 -90
View File
@@ -142,28 +142,35 @@ def _parametric_layer_weight(
min_weight: float,
spread: float,
) -> float:
"""Compute ablation weight for a layer using a parametric bell curve.
"""Compute ablation weight for a layer using a piecewise-linear tent kernel.
This is the Heretic-style parametric kernel:
- max_weight: peak ablation strength (0..1)
- peak_position: normalized position of peak (0..1 maps to layer 0..n_layers-1)
- min_weight: minimum ablation weight at the tails
- spread: controls width of the bell curve (higher = wider)
Faithful reproduction of Heretic's parametric kernel (p-e-w/heretic):
- max_weight: peak ablation strength at peak_position
- peak_position: normalized position of peak (0..1)
- min_weight: weight at the edges of the tent
- spread: normalized distance from peak to tent edge (min_weight_distance)
Returns a value in [min_weight, max_weight] representing how strongly
to ablate this layer (1.0 = full projection, 0.0 = no projection).
Layers beyond ``spread`` from the peak get weight 0 (skipped entirely).
Within the tent, weight drops linearly from max_weight to min_weight.
This matches Heretic's actual formula::
distance = abs(layer_index - max_weight_position)
if distance > min_weight_distance: skip
weight = max_weight + (distance / min_weight_distance) * (min_weight - max_weight)
"""
if n_layers <= 1:
return max_weight
normalized_pos = layer_idx / (n_layers - 1)
peak = peak_position
# Gaussian-shaped kernel
dist = abs(normalized_pos - peak)
sigma = max(spread, 0.01)
gauss = math.exp(-0.5 * (dist / sigma) ** 2)
dist = abs(normalized_pos - peak_position)
min_weight_distance = max(spread, 0.01)
return min_weight + (max_weight - min_weight) * gauss
# Hard cutoff: layers outside the tent get 0 weight (Heretic skips them)
if dist > min_weight_distance:
return 0.0
# Linear interpolation: max_weight at peak → min_weight at edges
return max_weight + (dist / min_weight_distance) * (min_weight - max_weight)
def _interpolate_direction(
@@ -171,37 +178,56 @@ def _interpolate_direction(
layer_idx: int,
float_dir_idx: float,
) -> torch.Tensor:
"""Get an interpolated refusal direction from a float-valued index.
"""Get an interpolated refusal direction from a float-valued layer index.
Non-integer values interpolate between adjacent SVD directions in the
refusal subspace, unlocking a continuous space of directions beyond
the discrete top-k.
Faithful reproduction of Heretic's direction interpolation: the index
selects which *layer's* diff-of-means direction to use, with float
values interpolating between adjacent layers' directions. This is
fundamentally different from interpolating between SVD components
within a single layer — it searches across the layer axis.
From Heretic source (model.py)::
weight, index = math.modf(direction_index + 1)
refusal_direction = F.normalize(
refusal_directions[int(index)].lerp(
refusal_directions[int(index) + 1], weight), p=2, dim=0)
Args:
pipeline: Pipeline with extracted refusal subspaces.
layer_idx: Which layer's subspace to use.
float_dir_idx: Continuous direction index (e.g., 0.7 interpolates
between direction 0 and direction 1).
pipeline: Pipeline with extracted refusal directions per layer.
layer_idx: The layer being projected (used as fallback).
float_dir_idx: Continuous direction index selects which layer's
direction to use (e.g., 5.3 interpolates 70% layer-5 + 30% layer-6).
Returns:
Normalized direction tensor.
"""
subspace = pipeline.refusal_subspaces.get(layer_idx)
if subspace is None or subspace.shape[0] == 0:
# Build sorted list of layer indices that have refusal directions
sorted_layers = sorted(pipeline.refusal_directions.keys())
if not sorted_layers:
return pipeline.refusal_directions.get(layer_idx, torch.zeros(1))
n_dirs = subspace.shape[0]
# Clamp to valid range
float_dir_idx = max(0.0, min(float_dir_idx, n_dirs - 1))
n_layers_with_dirs = len(sorted_layers)
# Heretic uses direction_index + 1 offset; we map float_dir_idx into
# the sorted layer list, clamped to valid range.
float_dir_idx = max(0.0, min(float_dir_idx, n_layers_with_dirs - 1))
lo = int(float_dir_idx)
hi = min(lo + 1, n_dirs - 1)
hi = min(lo + 1, n_layers_with_dirs - 1)
lo_layer = sorted_layers[lo]
hi_layer = sorted_layers[hi]
d_lo = pipeline.refusal_directions[lo_layer]
d_hi = pipeline.refusal_directions[hi_layer]
if lo == hi:
d = subspace[lo]
d = d_lo
else:
# Linear interpolation between adjacent layers' directions
alpha = float_dir_idx - lo
d = (1.0 - alpha) * subspace[lo] + alpha * subspace[hi]
d = (1.0 - alpha) * d_lo + alpha * d_hi
norm = d.norm()
if norm > 1e-8:
@@ -342,9 +368,14 @@ def run_bayesian_optimization(
for live_data, saved_clone in original_params: # noqa: F821
live_data.copy_(saved_clone.to(live_data.device))
# Warm-start values for the parametric kernel
# Estimate peak position from strongest layer
if pipeline._strong_layers:
# Warm-start values for the parametric kernel.
# If the informed pipeline provided analysis-derived warm-start params,
# use those (they're much better than the default heuristic).
informed_warm = getattr(pipeline, "_informed_warm_start", None)
if informed_warm:
warm_peak = informed_warm.get("peak_position", 0.5)
pipeline.log(f" Using analysis-informed warm-start (peak={warm_peak:.2f})")
elif pipeline._strong_layers:
peak_layer = pipeline._strong_layers[0]
warm_peak = peak_layer / max(n_total_layers - 1, 1)
else:
@@ -356,56 +387,56 @@ def run_bayesian_optimization(
# Suppress Optuna's verbose logging
optuna.logging.set_verbosity(optuna.logging.WARNING)
# Max SVD directions available (for float direction interpolation)
max_n_dirs = max(
(pipeline.refusal_subspaces[idx].shape[0]
for idx in pipeline._strong_layers
if idx in pipeline.refusal_subspaces),
default=1,
)
# Max layers with directions (for float direction interpolation)
n_layers_with_dirs = len([
idx for idx in pipeline._strong_layers
if idx in pipeline.refusal_directions
])
# ── Phase 1: Parametric kernel optimization (compact search space) ──
# Heretic uses SEPARATE kernel parameters for attention and MLP,
# allowing them to peak at different layers (8 params + 1 dir_idx = 9).
def objective(trial: optuna.Trial) -> tuple[float, float]:
"""Multi-objective: minimize (refusal_rate, kl_divergence)."""
_restore_all()
# Parametric kernel: 4 params describe the entire layer weighting
max_weight = trial.suggest_float("max_weight", 0.5, 1.0)
peak_position = trial.suggest_float("peak_position", 0.1, 0.9)
min_weight = trial.suggest_float("min_weight", 0.0, 0.3)
spread = trial.suggest_float("spread", 0.1, 0.6)
# Attention kernel: 4 params
attn_max = trial.suggest_float("attn_max_weight", 0.5, 1.0)
attn_peak = trial.suggest_float("attn_peak_position", 0.1, 0.9)
attn_min = trial.suggest_float("attn_min_weight", 0.0, 0.3)
attn_spread = trial.suggest_float("attn_spread", 0.1, 0.6)
# Component-specific scaling (Heretic insight: MLP more damaging)
attn_scale = trial.suggest_float("attn_scale", 0.5, 1.0)
mlp_scale = trial.suggest_float("mlp_scale", 0.3, 1.0)
# MLP kernel: 4 params (separate — can peak at a different layer)
mlp_max = trial.suggest_float("mlp_max_weight", 0.3, 1.0)
mlp_peak = trial.suggest_float("mlp_peak_position", 0.1, 0.9)
mlp_min = trial.suggest_float("mlp_min_weight", 0.0, 0.3)
mlp_spread = trial.suggest_float("mlp_spread", 0.1, 0.6)
# Float direction index (continuous interpolation between SVD dirs)
dir_idx = trial.suggest_float("dir_idx", 0.0, max(max_n_dirs - 1, 0.0))
# Float direction index (cross-layer interpolation, Heretic-style)
dir_idx = trial.suggest_float("dir_idx", 0.0, max(n_layers_with_dirs - 1, 0.0))
# Compute per-layer regularization from parametric kernel
layer_regs: dict[int, float] = {}
# Compute per-layer, per-component regularization from kernels
attn_regs: dict[int, float] = {}
mlp_regs: dict[int, float] = {}
for idx in pipeline._strong_layers:
weight = _parametric_layer_weight(
idx, n_total_layers, max_weight, peak_position, min_weight, spread,
)
# Convert weight to regularization (weight=1 → reg=0, weight=0 → reg=1)
layer_regs[idx] = 1.0 - weight
attn_w = _parametric_layer_weight(idx, n_total_layers, attn_max, attn_peak, attn_min, attn_spread)
mlp_w = _parametric_layer_weight(idx, n_total_layers, mlp_max, mlp_peak, mlp_min, mlp_spread)
attn_regs[idx] = 1.0 - attn_w
mlp_regs[idx] = 1.0 - mlp_w
# Apply projection with trial's parameters
for idx in pipeline._strong_layers:
if idx not in pipeline.refusal_subspaces:
if idx not in pipeline.refusal_directions:
continue
# Use interpolated direction
# Use cross-layer interpolated direction
direction = _interpolate_direction(pipeline, idx, dir_idx)
d_col = direction.to(device=next(layer_modules[idx].parameters()).device)
d_col = d_col.unsqueeze(-1) if d_col.dim() == 1 else d_col
reg = layer_regs[idx]
# Attention projection (with attn_scale)
attn_reg = 1.0 - (1.0 - reg) * attn_scale
# Attention projection (with per-component kernel)
attn_reg = attn_regs[idx]
try:
attn = get_attention_module(layer_modules[idx], arch)
pipeline._project_out_advanced(
@@ -416,8 +447,8 @@ def run_bayesian_optimization(
except (AttributeError, RuntimeError):
pass
# MLP/FFN projection (with mlp_scale)
mlp_reg = 1.0 - (1.0 - reg) * mlp_scale
# MLP/FFN projection (with per-component kernel)
mlp_reg = mlp_regs[idx]
try:
ffn = get_ffn_module(layer_modules[idx], arch)
count = pipeline._project_out_advanced(
@@ -439,18 +470,20 @@ def run_bayesian_optimization(
refusal = _measure_refusal_rate(pipeline, n_prompts=n_refusal_prompts)
kl = _measure_kl_divergence(pipeline, reference_logits, kl_prompts)
# Track best combined score
# Track best combined score (use average of attn/mlp regs for layer_regs)
nonlocal best_score, best_result
combined = refusal + 0.5 * kl
if combined < best_score:
best_score = combined
best_result = dict(layer_regs)
best_result = {
idx: (attn_regs[idx] + mlp_regs[idx]) / 2.0
for idx in pipeline._strong_layers
}
pipeline.log(
f" Trial {trial.number + 1}/{n_trials}: "
f"refusal={refusal:.0%}, KL={kl:.4f} "
f"(peak={peak_position:.2f}, spread={spread:.2f}, "
f"attn={attn_scale:.2f}, mlp={mlp_scale:.2f}, dir={dir_idx:.2f})"
f"(attn_peak={attn_peak:.2f}, mlp_peak={mlp_peak:.2f}, dir={dir_idx:.2f})"
)
return refusal, kl
@@ -462,16 +495,33 @@ def run_bayesian_optimization(
study_name="obliteratus_parametric_optimization",
)
# Enqueue warm-start trial with analysis-derived estimates
warm_params = {
"max_weight": 0.9,
"peak_position": warm_peak,
"min_weight": 0.05,
"spread": 0.3,
"attn_scale": 0.8,
"mlp_scale": 0.6,
"dir_idx": 0.0,
}
# Enqueue warm-start trial with analysis-derived estimates.
# Translate informed pipeline params to the new per-component format.
if informed_warm:
iw = informed_warm
warm_params = {
"attn_max_weight": iw.get("max_weight", 0.9),
"attn_peak_position": iw.get("peak_position", warm_peak),
"attn_min_weight": iw.get("min_weight", 0.05),
"attn_spread": iw.get("spread", 0.3),
"mlp_max_weight": iw.get("max_weight", 0.9) * iw.get("mlp_scale", 0.6),
"mlp_peak_position": iw.get("peak_position", warm_peak),
"mlp_min_weight": iw.get("min_weight", 0.05),
"mlp_spread": iw.get("spread", 0.3),
"dir_idx": iw.get("dir_idx", 0.0),
}
else:
warm_params = {
"attn_max_weight": 0.9,
"attn_peak_position": warm_peak,
"attn_min_weight": 0.05,
"attn_spread": 0.3,
"mlp_max_weight": 0.6,
"mlp_peak_position": warm_peak,
"mlp_min_weight": 0.05,
"mlp_spread": 0.3,
"dir_idx": 0.0,
}
study.enqueue_trial(warm_params)
pipeline.log(f"Bayesian optimization: running {n_trials} trials (parametric kernel)...")
@@ -490,25 +540,32 @@ def run_bayesian_optimization(
p = best_trial.params
best_result = {}
for idx in pipeline._strong_layers:
weight = _parametric_layer_weight(
attn_w = _parametric_layer_weight(
idx, n_total_layers,
p["max_weight"], p["peak_position"],
p["min_weight"], p["spread"],
p["attn_max_weight"], p["attn_peak_position"],
p["attn_min_weight"], p["attn_spread"],
)
best_result[idx] = 1.0 - weight
mlp_w = _parametric_layer_weight(
idx, n_total_layers,
p["mlp_max_weight"], p["mlp_peak_position"],
p["mlp_min_weight"], p["mlp_spread"],
)
best_result[idx] = (attn_w + mlp_w) / 2.0 # average for layer-level reg
best_result[idx] = 1.0 - best_result[idx]
pipeline.log(
f" Best trial: refusal={best_trial.values[0]:.0%}, "
f"KL={best_trial.values[1]:.4f}"
)
pipeline.log(
f" Kernel: peak={p['peak_position']:.2f}, spread={p['spread']:.2f}, "
f"max={p['max_weight']:.2f}, min={p['min_weight']:.2f}"
f" Attn kernel: peak={p['attn_peak_position']:.2f}, "
f"spread={p['attn_spread']:.2f}, max={p['attn_max_weight']:.2f}"
)
pipeline.log(
f" Components: attn={p['attn_scale']:.2f}, mlp={p['mlp_scale']:.2f}, "
f"dir_idx={p['dir_idx']:.2f}"
f" MLP kernel: peak={p['mlp_peak_position']:.2f}, "
f"spread={p['mlp_spread']:.2f}, max={p['mlp_max_weight']:.2f}"
)
pipeline.log(f" dir_idx={p['dir_idx']:.2f}")
# Store the best direction index for use during EXCISE
best_dir_idx = p.get("dir_idx", 0.0)
@@ -518,9 +575,9 @@ def run_bayesian_optimization(
new_dir = _interpolate_direction(pipeline, idx, best_dir_idx)
pipeline.refusal_directions[idx] = new_dir
# Store component scales for use in EXCISE
pipeline._bayesian_attn_scale = p.get("attn_scale", 1.0)
pipeline._bayesian_mlp_scale = p.get("mlp_scale", 1.0)
# Store component scales for use in EXCISE (backward compat)
pipeline._bayesian_attn_scale = p.get("attn_max_weight", 1.0)
pipeline._bayesian_mlp_scale = p.get("mlp_max_weight", 1.0)
elif best_result:
pipeline.log(f" Using best combined score: {best_score:.4f}")
+7 -1
View File
@@ -109,7 +109,12 @@ def main(argv: list[str] | None = None):
],
help="Liberation method (default: advanced)",
)
p.add_argument("--n-directions", type=int, default=None, help="Override: number of SVD directions to extract")
p.add_argument("--n-directions", type=int, default=None, help="Override: number of refusal directions to extract")
p.add_argument(
"--direction-method", type=str, default=None,
choices=["diff_means", "svd", "leace"],
help="Direction extraction method: diff_means (simple, robust), svd (multi-direction), leace (optimal erasure)",
)
p.add_argument("--regularization", type=float, default=None, help="Override: fraction to preserve (0.0-1.0)")
p.add_argument("--refinement-passes", type=int, default=None, help="Override: number of iterative passes")
p.add_argument(
@@ -591,6 +596,7 @@ def _cmd_abliterate(args):
dtype=args.dtype,
method=method,
n_directions=args.n_directions,
direction_method=getattr(args, "direction_method", None),
regularization=args.regularization,
refinement_passes=args.refinement_passes,
quantization=args.quantization,
+12 -11
View File
@@ -334,19 +334,20 @@ def _load_harmbench_classifier():
bnb_4bit_quant_type="nf4",
llm_int8_enable_fp32_cpu_offload=True,
)
model = AutoModelForCausalLM.from_pretrained(
model_id,
quantization_config=bnb_cfg,
device_map="auto",
torch_dtype=torch.float16,
)
load_kwargs = dict(quantization_config=bnb_cfg, torch_dtype=torch.float16)
if dev.supports_device_map_auto():
load_kwargs["device_map"] = "auto"
model = AutoModelForCausalLM.from_pretrained(model_id, **load_kwargs)
except Exception:
logger.info("4-bit quantization unavailable for classifier, loading in float16")
model = AutoModelForCausalLM.from_pretrained(
model_id,
device_map="auto",
torch_dtype=torch.float16,
)
load_kwargs = dict(torch_dtype=torch.float16)
if dev.supports_device_map_auto():
load_kwargs["device_map"] = "auto"
model = AutoModelForCausalLM.from_pretrained(model_id, **load_kwargs)
# On MPS/CPU: move model to best available device
if not dev.supports_device_map_auto():
model = model.to(dev.get_device())
model.eval()
_HARMBENCH_CLASSIFIER = (model, tokenizer)
+377 -53
View File
@@ -73,15 +73,17 @@ INFORMED_METHOD = {
"description": (
"Runs analysis modules between PROBE and DISTILL to auto-configure "
"direction extraction, layer selection, and projection strategy based "
"on the model's actual refusal geometry."
"on the model's actual refusal geometry. Defaults to single diff-of-means "
"direction + Bayesian optimization (Heretic-style)."
),
"n_directions": 4, # overridden by analysis
"n_directions": 1, # overridden by analysis
"direction_method": "diff_means", # overridden by analysis; "leace" also available
"norm_preserve": True,
"regularization": 0.0, # overridden by analysis
"refinement_passes": 2, # overridden by analysis
"project_biases": True,
"use_chat_template": True,
"use_whitened_svd": True, # overridden by analysis
"use_whitened_svd": False, # overridden by analysis
"true_iterative_refinement": True,
}
@@ -126,7 +128,8 @@ class AnalysisInsights:
clean_layers: list[int] = field(default_factory=list)
# Derived configuration
recommended_n_directions: int = 4
recommended_n_directions: int = 1
recommended_direction_method: str = "diff_means"
recommended_regularization: float = 0.0
recommended_refinement_passes: int = 2
recommended_layers: list[int] = field(default_factory=list)
@@ -217,12 +220,19 @@ class InformedAbliterationPipeline(AbliterationPipeline):
hub_token=hub_token,
hub_community_org=hub_community_org,
quantization=quantization,
# Set informed defaults
# Set informed defaults: single direction + Bayesian opt
n_directions=1,
direction_method="diff_means",
norm_preserve=True,
project_biases=True,
use_chat_template=True,
use_whitened_svd=True,
use_whitened_svd=False,
true_iterative_refinement=True,
use_kl_optimization=True,
float_layer_interpolation=True,
layer_adaptive_strength=True,
winsorize_activations=True,
winsorize_percentile=0.01,
)
self.method = "informed"
@@ -311,7 +321,11 @@ class InformedAbliterationPipeline(AbliterationPipeline):
if self._run_defense:
self._analyze_defense_robustness()
# 5. Derive configuration from insights
# 5. Sparse Surgery Analysis (RSI computation)
if self._run_sparse:
self._analyze_sparsity()
# 6. Derive configuration from insights
self._derive_configuration()
elapsed = time.time() - t0
@@ -392,6 +406,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
sample_layers = candidate_layers[::step]
polyhedral_count = 0
all_results = []
best_cone_result = None
best_strength = 0.0
@@ -405,34 +420,43 @@ class InformedAbliterationPipeline(AbliterationPipeline):
layer_idx=layer_idx,
)
all_results.append(result)
if result.is_polyhedral:
polyhedral_count += 1
# Track the strongest layer's cone analysis
# Track the strongest layer's cone analysis for per-category directions
general_strength = result.general_direction.norm().item() if result.general_direction.numel() > 1 else 0
if general_strength > best_strength:
best_strength = general_strength
best_cone_result = result
if best_cone_result is not None:
self._insights.cone_is_polyhedral = best_cone_result.is_polyhedral
self._insights.cone_dimensionality = best_cone_result.cone_dimensionality
self._insights.mean_pairwise_cosine = best_cone_result.mean_pairwise_cosine
if all_results:
# Aggregate cone geometry across sampled layers (majority vote +
# mean dimensionality) instead of relying on a single layer.
n_sampled = len(all_results)
is_polyhedral = polyhedral_count > n_sampled / 2
avg_dimensionality = sum(r.cone_dimensionality for r in all_results) / n_sampled
avg_pairwise_cos = sum(r.mean_pairwise_cosine for r in all_results) / n_sampled
# Store per-category directions for category-aware excision
for cd in best_cone_result.category_directions:
self._insights.per_category_directions[cd.category] = cd.direction
self._insights.direction_specificity[cd.category] = cd.specificity
self._insights.cone_is_polyhedral = is_polyhedral
self._insights.cone_dimensionality = avg_dimensionality
self._insights.mean_pairwise_cosine = avg_pairwise_cos
cone_type = "POLYHEDRAL" if best_cone_result.is_polyhedral else "LINEAR"
self.log(f" Cone type: {cone_type}")
self.log(f" Dimensionality: {best_cone_result.cone_dimensionality:.2f}")
self.log(f" Mean pairwise cosine: {best_cone_result.mean_pairwise_cosine:.3f}")
self.log(f" Categories detected: {best_cone_result.category_count}")
self.log(f" Polyhedral at {polyhedral_count}/{len(sample_layers)} sampled layers")
# Store per-category directions from the strongest layer
if best_cone_result is not None:
for cd in best_cone_result.category_directions:
self._insights.per_category_directions[cd.category] = cd.direction
self._insights.direction_specificity[cd.category] = cd.specificity
for cd in sorted(best_cone_result.category_directions, key=lambda x: -x.strength)[:5]:
self.log(f" {cd.category:15s} DSI={cd.specificity:.3f} str={cd.strength:.3f}")
cone_type = "POLYHEDRAL" if is_polyhedral else "LINEAR"
self.log(f" Cone type: {cone_type} (majority vote: {polyhedral_count}/{n_sampled} layers)")
self.log(f" Avg dimensionality: {avg_dimensionality:.2f}")
self.log(f" Avg pairwise cosine: {avg_pairwise_cos:.3f}")
if best_cone_result is not None:
self.log(f" Categories detected: {best_cone_result.category_count}")
for cd in sorted(best_cone_result.category_directions, key=lambda x: -x.strength)[:5]:
self.log(f" {cd.category:15s} DSI={cd.specificity:.3f} str={cd.strength:.3f}")
else:
self.log(" No cone results — using default linear assumption")
@@ -517,6 +541,71 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log(f" Most entangled layers: {emap.most_entangled_layers}")
self.log(f" Cleanest layers: {emap.least_entangled_layers}")
def _analyze_sparsity(self):
"""Compute Refusal Sparsity Index to decide sparse vs dense excision."""
self.log("\n[5/5] Refusal Sparsity Analysis")
self.log("-" * 40)
from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon
from obliteratus.strategies.utils import (
get_ffn_module,
get_layer_modules,
)
# Need refusal directions — use quick diff-in-means
quick_directions = {}
for idx in sorted(self._harmful_means.keys()):
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze()
norm = diff.norm().item()
if norm > 1e-10:
quick_directions[idx] = diff / diff.norm()
if not quick_directions:
self.log(" No refusal directions — skipping sparsity analysis")
return
# Gather FFN output weights for representative layers (sample for speed)
layers = get_layer_modules(self.handle)
arch = self.handle.architecture
n_layers = len(layers)
sample_idxs = sorted(quick_directions.keys())
step = max(1, len(sample_idxs) // 8)
sample_idxs = sample_idxs[::step]
weights = {}
sampled_dirs = {}
for idx in sample_idxs:
if idx >= n_layers:
continue
try:
ffn = get_ffn_module(layers[idx], arch)
for name in ["down_proj", "c_proj", "dense_4h_to_h", "fc_out", "fc2", "w2"]:
proj = getattr(ffn, name, None)
if proj is not None and hasattr(proj, "weight"):
W = proj.weight.data
d = quick_directions[idx]
if W.shape[-1] == d.shape[0]:
weights[idx] = W
sampled_dirs[idx] = d
break
except (AttributeError, RuntimeError):
continue
if not weights:
self.log(" Could not access FFN weights — skipping sparsity analysis")
return
surgeon = SparseDirectionSurgeon(auto_sparsity=True)
plan = surgeon.plan_surgery(weights, sampled_dirs)
self._insights.mean_refusal_sparsity_index = plan.mean_refusal_sparsity_index
self._insights.recommended_sparsity = plan.recommended_sparsity
self.log(f" Mean RSI: {plan.mean_refusal_sparsity_index:.3f}")
self.log(f" Recommended sparsity: {plan.recommended_sparsity:.1%}")
self.log(f" Most sparse layer: {plan.most_sparse_layer}")
self.log(f" Most dense layer: {plan.most_dense_layer}")
# ── Configuration Derivation ─────────────────────────────────────
def _derive_configuration(self):
@@ -528,18 +617,32 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log("-" * 50)
insights = self._insights
# 1. n_directions: based on cone geometry
if insights.cone_is_polyhedral:
# Polyhedral cone → need more directions to capture all facets
# 1. n_directions + direction_method: based on cone geometry
# Default: single direction via diff-of-means (proven most robust).
# Only escalate to multi-direction when analysis confirms polyhedral geometry.
if insights.cone_is_polyhedral and insights.cone_dimensionality > 2.0:
# Clearly polyhedral cone → use multiple directions via SVD
n_dirs = max(4, min(8, int(insights.cone_dimensionality * 2)))
self.direction_method = "svd"
self.use_whitened_svd = True
self.log(f" Polyhedral cone (dim={insights.cone_dimensionality:.1f}) "
f"→ n_directions={n_dirs}")
f"→ n_directions={n_dirs}, method=svd (whitened)")
elif insights.cone_is_polyhedral:
# Mildly polyhedral → LEACE gives better single-direction erasure
n_dirs = 1
self.direction_method = "leace"
self.use_whitened_svd = False
self.log(f" Mildly polyhedral (dim={insights.cone_dimensionality:.1f}) "
f"→ n_directions=1, method=leace")
else:
# Linear cone → fewer directions suffice
n_dirs = max(1, min(4, int(insights.cone_dimensionality + 1)))
# Linear cone → single direction via diff-of-means (simplest, most robust)
n_dirs = 1
self.direction_method = "diff_means"
self.use_whitened_svd = False
self.log(f" Linear cone (dim={insights.cone_dimensionality:.1f}) "
f"→ n_directions={n_dirs}")
f"→ n_directions=1, method=diff_means")
insights.recommended_n_directions = n_dirs
insights.recommended_direction_method = self.direction_method
self.n_directions = n_dirs
# 2. regularization: based on alignment method + entanglement
@@ -586,15 +689,22 @@ class InformedAbliterationPipeline(AbliterationPipeline):
# 4. Layer selection: cluster-aware + entanglement-gated
if insights.cluster_representative_layers:
# Start from cluster representatives
# Start from cluster representatives (strongest per cluster)
base_layers = list(insights.cluster_representative_layers)
# Expand: add all layers from clusters that have strong signals
all_cluster_layers = []
# Conservative expansion: for each cluster, add at most the top-2
# strongest layers (by refusal norm) beyond the representative,
# to avoid over-modifying weak layers in large clusters.
norms = {}
for idx in self._harmful_means:
if idx in self._harmless_means:
norms[idx] = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze().norm().item()
for cluster in insights.direction_clusters:
all_cluster_layers.extend(cluster)
if all_cluster_layers:
base_layers = sorted(set(all_cluster_layers))
ranked = sorted(cluster, key=lambda ly: norms.get(ly, 0), reverse=True)
# Add up to 2 additional strong layers per cluster
for ly in ranked[:3]: # representative + up to 2 more
base_layers.append(ly)
base_layers = sorted(set(base_layers))
# Gate: remove highly entangled layers
skip = set()
@@ -621,13 +731,9 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.log(f" RSI={insights.mean_refusal_sparsity_index:.2f} "
f"→ standard dense projection")
# 6. Whitened SVD: always use for multi-direction, skip for single
if n_dirs > 1:
self.use_whitened_svd = True
self.log(f" Multi-direction ({n_dirs}) → whitened SVD enabled")
else:
self.use_whitened_svd = False
self.log(" Single direction → standard diff-in-means")
# 6. Direction method summary (already set in step 1)
self.log(f" Direction method: {self.direction_method} "
f"(whitened_svd={'on' if self.use_whitened_svd else 'off'})")
# ── Informed DISTILL ─────────────────────────────────────────────
@@ -650,7 +756,38 @@ class InformedAbliterationPipeline(AbliterationPipeline):
n_layers = len(self._harmful_means)
norms: dict[int, float] = {}
if self.use_whitened_svd and self.n_directions > 1:
# ── Small-model direction cap (matching base _distill) ────────
# On small models, each SVD direction removes a proportionally
# larger fraction of weight energy. Cap to prevent over-ablation.
hidden_size = self.handle.hidden_size if self.handle else 0
total_params = getattr(self.handle, 'total_params', 0) if self.handle else 0
if total_params == 0 and self.handle:
try:
total_params = sum(p.numel() for p in self.handle.model.parameters())
except Exception:
pass
if self.n_directions > 1 and (
(0 < hidden_size < 2048)
or (0 < total_params < 2_000_000_000)
or n_layers <= 16
):
max_dirs = max(1, min(self.n_directions, 2))
if max_dirs < self.n_directions:
self.log(
f"Capped n_directions from {self.n_directions} to {max_dirs} "
f"for small model (hidden={hidden_size}, "
f"params={total_params / 1e9:.1f}B, layers={n_layers})"
)
self.n_directions = max_dirs
# LEACE extractor for optimal concept erasure
leace_extractor = None
if self.direction_method == "leace":
from obliteratus.analysis.leace import LEACEExtractor
leace_extractor = LEACEExtractor()
self.log(f"Using LEACE (closed-form optimal concept erasure)")
if self.use_whitened_svd and self.n_directions > 1 and leace_extractor is None:
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
whitened_extractor = WhitenedSVDExtractor()
self.log(f"Using whitened SVD with {self.n_directions} directions")
@@ -658,6 +795,29 @@ class InformedAbliterationPipeline(AbliterationPipeline):
whitened_extractor = None
for idx in range(n_layers):
# LEACE path: theoretically optimal single-direction erasure
if leace_extractor is not None:
if idx in self._harmful_acts and idx in self._harmless_acts:
try:
l_result = leace_extractor.extract(
self._harmful_acts[idx],
self._harmless_acts[idx],
layer_idx=idx,
)
self.refusal_directions[idx] = l_result.direction
self.refusal_subspaces[idx] = l_result.direction.unsqueeze(0)
norms[idx] = l_result.generalized_eigenvalue
if idx < 5 or idx == n_layers - 1:
self.log(
f" layer {idx}: LEACE eigenvalue={l_result.generalized_eigenvalue:.4f}, "
f"erasure_loss={l_result.erasure_loss:.4f}"
)
continue
except Exception as e:
if idx < 5:
self.log(f" layer {idx}: LEACE failed ({e}), falling back")
if self.n_directions == 1:
diff = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze(0)
norm = diff.norm().item()
@@ -691,6 +851,41 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self.refusal_directions[idx] = primary / primary.norm()
norms[idx] = S[:k].sum().item()
# Enrich subspaces with per-category cone directions when available.
# This uses the actual refusal cone generators instead of purely
# data-agnostic SVD components.
cat_dirs = self._insights.per_category_directions
if cat_dirs and self._insights.cone_is_polyhedral and self.n_directions > 1:
cat_tensors = list(cat_dirs.values())
# Stack and orthogonalize category directions
cat_stack = torch.stack(cat_tensors) # (n_cats, hidden)
cat_norms = cat_stack.norm(dim=1, keepdim=True).clamp(min=1e-8)
cat_stack = cat_stack / cat_norms
# Blend into strong-signal layers: replace later SVD components
# with category directions (which are geometrically meaningful)
n_cat = cat_stack.shape[0]
for idx in norms:
sub = self.refusal_subspaces.get(idx)
if sub is None or sub.shape[0] <= 1:
continue
# Keep the first SVD direction (strongest), replace remaining
# with category directions projected to be orthogonal to it
primary = sub[0:1] # (1, hidden)
# Project category directions orthogonal to primary
cos = (cat_stack @ primary.squeeze(0)) # (n_cat,)
ortho_cats = cat_stack - cos.unsqueeze(1) * primary
ortho_norms = ortho_cats.norm(dim=1)
# Keep only directions that survived orthogonalization
valid = ortho_norms > 0.1
if valid.sum() > 0:
ortho_cats = ortho_cats[valid]
ortho_cats = ortho_cats / ortho_cats.norm(dim=1, keepdim=True)
# Take up to (n_directions - 1) category directions
n_take = min(self.n_directions - 1, ortho_cats.shape[0])
new_sub = torch.cat([primary, ortho_cats[:n_take]], dim=0)
self.refusal_subspaces[idx] = new_sub
self.log(f"Enriched subspaces with {n_cat} per-category cone directions")
# Layer selection: use analysis-recommended layers if available,
# otherwise fall back to knee detection
if self._insights.recommended_layers:
@@ -728,15 +923,117 @@ class InformedAbliterationPipeline(AbliterationPipeline):
def _excise_informed(self):
"""Excise refusal directions with analysis-informed strategy.
Uses sparse surgery if analysis recommends it, otherwise falls
back to the standard projection with analysis-tuned parameters.
Uses Bayesian optimization (when available) with analysis-derived
warm-start parameters, falling back to sparse surgery or standard
projection. This is the key integration: analysis maps the geometry,
Bayesian optimization finds the optimal projection strength.
"""
if self._insights.use_sparse_surgery:
self._excise_sparse()
return
# Enable Bayesian optimization using analysis insights for warm-start.
# The analysis provides much better initial parameters than the default
# heuristic (strongest-layer-based peak), dramatically narrowing the
# search space and improving convergence.
self._configure_bayesian_warm_start()
self._excise()
def _configure_bayesian_warm_start(self):
"""Configure Bayesian optimization with analysis-derived warm-start.
Translates analysis insights into a much tighter search space:
- peak_position from cluster representative layers
- spread from cluster structure (narrow clusters narrow spread)
- component scaling from entanglement analysis
- KL budget from alignment method detection
"""
insights = self._insights
# Enable Bayesian optimization (50 trials default, same as heretic)
self._bayesian_trials = 50
# Also set heretic-compatible flags on the pipeline so the base
# _excise_inner() picks them up during Bayesian optimization.
self.layer_adaptive_strength = True
self.float_layer_interpolation = True
self.use_kl_optimization = True
# KL budget: tighter for methods that are fragile (CAI, RLHF),
# looser for concentrated methods (DPO, SFT).
method = insights.detected_alignment_method
if method == "dpo":
self.kl_budget = 0.5
elif method == "rlhf":
self.kl_budget = 0.3
elif method == "cai":
self.kl_budget = 0.2
elif method == "sft":
self.kl_budget = 0.4
else:
# Standard excision with analysis-tuned parameters
# (regularization, norm_preserve, etc. already configured)
self._excise()
self.kl_budget = 0.35
self.log(f"Bayesian optimization enabled (50 trials, KL budget={self.kl_budget})")
self.log("Analysis insights will warm-start the optimizer")
# Compute analysis-derived warm-start for the parametric kernel.
# The Bayesian optimizer reads these from the pipeline if present.
n_layers = len(self._harmful_means) if self._harmful_means else 32
if insights.cluster_representative_layers and n_layers > 1:
# Peak position: normalized position of the strongest cluster rep
norms = {}
for idx in self._harmful_means:
if idx in self._harmless_means:
norms[idx] = (self._harmful_means[idx] - self._harmless_means[idx]).squeeze().norm().item()
reps = insights.cluster_representative_layers
if norms:
best_rep = max(reps, key=lambda ly: norms.get(ly, 0))
else:
best_rep = reps[len(reps) // 2]
warm_peak = best_rep / max(n_layers - 1, 1)
# Spread: narrow if clusters are tight, wide if clusters span many layers
if insights.direction_clusters:
cluster_widths = [
(max(c) - min(c)) / max(n_layers - 1, 1)
for c in insights.direction_clusters if len(c) > 1
]
warm_spread = max(0.1, min(0.6, sum(cluster_widths) / len(cluster_widths) if cluster_widths else 0.3))
else:
warm_spread = 0.3
# Min weight: higher if high persistence (refusal spread across all layers)
warm_min = min(0.3, max(0.0, insights.direction_persistence * 0.2))
# Attn/MLP scaling: reduce MLP scaling if entanglement is high
# (MLP projections cause more capability damage)
if insights.entanglement_score > 0.5:
warm_mlp = 0.4
warm_attn = 0.7
else:
warm_mlp = 0.6
warm_attn = 0.8
else:
warm_peak = 0.5
warm_spread = 0.3
warm_min = 0.05
warm_mlp = 0.6
warm_attn = 0.8
# Store warm-start params for the Bayesian optimizer to pick up
self._informed_warm_start = {
"max_weight": 0.9,
"peak_position": warm_peak,
"min_weight": warm_min,
"spread": warm_spread,
"attn_scale": warm_attn,
"mlp_scale": warm_mlp,
"dir_idx": 0.0,
}
self.log(
f" Warm-start: peak={warm_peak:.2f}, spread={warm_spread:.2f}, "
f"min={warm_min:.2f}, attn={warm_attn:.2f}, mlp={warm_mlp:.2f}"
)
def _excise_sparse(self):
"""Sparse direction surgery — only modifies high-projection rows."""
@@ -825,14 +1122,22 @@ class InformedAbliterationPipeline(AbliterationPipeline):
1. Residual refusal signal (via activation probing)
2. Self-repair / Ouroboros effect (via defense robustness)
3. Triggers additional targeted passes at compensating layers
KL-gated: stops early if model damage (KL divergence) is getting
worse even though refusal persists. This prevents the death spiral
where each pass damages the model without removing refusal.
"""
# Run standard verification first
self._verify()
# Check if Ouroboros compensation is needed
refusal_rate = self._quality_metrics.get("refusal_rate", 0.0)
prev_kl = self._quality_metrics.get("kl_divergence", 0.0)
ouroboros_pass = 0
# KL budget: stop if KL exceeds this threshold (model too damaged)
kl_ceiling = getattr(self, "kl_budget", 0.5) * 2.0 # 2x budget as hard ceiling
while (refusal_rate > self._ouroboros_threshold
and ouroboros_pass < self._max_ouroboros_passes):
ouroboros_pass += 1
@@ -849,9 +1154,9 @@ class InformedAbliterationPipeline(AbliterationPipeline):
self._distill_inner()
self.log(f"Found {len(self._strong_layers)} layers with residual refusal")
# Re-excise at the new strong layers
# Re-excise at the new strong layers using informed strategy
if self._strong_layers:
self._excise()
self._excise_informed()
else:
self.log("No strong layers found — stopping Ouroboros compensation")
break
@@ -859,7 +1164,24 @@ class InformedAbliterationPipeline(AbliterationPipeline):
# Re-verify
self._verify()
refusal_rate = self._quality_metrics.get("refusal_rate", 0.0)
self.log(f"After Ouroboros pass {ouroboros_pass}: refusal rate = {refusal_rate:.0%}")
current_kl = self._quality_metrics.get("kl_divergence", 0.0)
self.log(f"After Ouroboros pass {ouroboros_pass}: refusal={refusal_rate:.0%}, KL={current_kl:.4f}")
# KL-gated early stopping: if KL is rising and exceeds ceiling,
# the model is being damaged faster than refusal is being removed.
if current_kl > kl_ceiling:
self.log(
f"KL divergence {current_kl:.4f} exceeds ceiling {kl_ceiling:.4f}"
f"stopping to prevent further model damage"
)
break
if ouroboros_pass > 1 and current_kl > prev_kl * 1.5 and refusal_rate > 0.3:
self.log(
f"KL rising sharply ({prev_kl:.4f}{current_kl:.4f}) with "
f"refusal still at {refusal_rate:.0%} — stopping (diminishing returns)"
)
break
prev_kl = current_kl
self._report.ouroboros_passes = ouroboros_pass
self._report.final_refusal_rate = refusal_rate
@@ -903,6 +1225,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
},
"derived_config": {
"n_directions": insights.recommended_n_directions,
"direction_method": insights.recommended_direction_method,
"regularization": insights.recommended_regularization,
"refinement_passes": insights.recommended_refinement_passes,
"layers_used": insights.recommended_layers,
@@ -981,6 +1304,7 @@ class InformedAbliterationPipeline(AbliterationPipeline):
lines.append("Derived Configuration:")
lines.append(f" n_directions: {insights.recommended_n_directions}")
lines.append(f" direction_method: {insights.recommended_direction_method}")
lines.append(f" regularization: {insights.recommended_regularization}")
lines.append(f" refinement_passes: {insights.recommended_refinement_passes}")
lines.append(f" sparse surgery: {insights.use_sparse_surgery}")
+71 -12
View File
@@ -55,26 +55,45 @@ def composite_score(metrics: dict[str, Any]) -> float:
"""Score an abliteration run on [0, 1]. Higher is better.
Weights:
40% refusal removal the whole point
30% coherence model must still be useful
35% refusal removal the whole point
25% coherence model must still be useful
20% KL divergence minimal capability damage
10% perplexity fluency preservation
5% spectral cert formal completeness guarantee
5% degenerate penalty penalize broken output
"""
rr = metrics.get("refusal_rate")
co = metrics.get("coherence")
kl = metrics.get("kl_divergence")
pp = metrics.get("perplexity")
spec = metrics.get("spectral_certification")
degen = metrics.get("degenerate_count", 0) or 0
refusal_score = (1.0 - rr) if rr is not None else 0.0
coherence_score = co if co is not None else 0.0
kl_score = 1.0 / (1.0 + kl) if kl is not None else 0.5
ppl_score = 1.0 / (1.0 + pp / 100.0) if pp is not None else 0.5
# Spectral certification: GREEN=1.0, YELLOW=0.5, RED=0.0, None=0.5 (neutral)
if spec == "GREEN":
spec_score = 1.0
elif spec == "YELLOW":
spec_score = 0.5
elif spec == "RED":
spec_score = 0.0
else:
spec_score = 0.5 # not measured → neutral
# Degenerate penalty: any broken outputs reduce score
degen_score = 1.0 / (1.0 + degen) if degen > 0 else 1.0
return (
refusal_score * 0.4
+ coherence_score * 0.3
+ kl_score * 0.2
+ ppl_score * 0.1
refusal_score * 0.35
+ coherence_score * 0.25
+ kl_score * 0.20
+ ppl_score * 0.10
+ spec_score * 0.05
+ degen_score * 0.05
)
@@ -94,6 +113,8 @@ class Contender:
time_s: float = 0.0
error: str | None = None
round_eliminated: int = 0 # 0 = still alive / winner
direction_method: str = "" # which direction extraction was used
spectral_cert: str = "" # GREEN/YELLOW/RED/""
@dataclass
@@ -140,6 +161,8 @@ class TourneyResult:
"metrics": c.metrics,
"time_s": c.time_s,
"error": c.error,
"direction_method": c.direction_method,
"spectral_cert": c.spectral_cert,
}
for c in sorted(r.contenders, key=lambda x: x.score, reverse=True)
],
@@ -197,6 +220,8 @@ def _save_checkpoint(
"time_s": c.time_s,
"error": c.error,
"round_eliminated": c.round_eliminated,
"direction_method": c.direction_method,
"spectral_cert": c.spectral_cert,
}
for c in r.contenders
],
@@ -218,6 +243,8 @@ def _save_checkpoint(
"time_s": c.time_s,
"error": c.error,
"round_eliminated": c.round_eliminated,
"direction_method": c.direction_method,
"spectral_cert": c.spectral_cert,
}
for c in completed_methods
],
@@ -286,6 +313,8 @@ def _restore_rounds(checkpoint: dict) -> tuple[TourneyResult, list[Contender], l
time_s=c_data.get("time_s", 0.0),
error=c_data.get("error"),
round_eliminated=c_data.get("round_eliminated", 0),
direction_method=c_data.get("direction_method", ""),
spectral_cert=c_data.get("spectral_cert", ""),
))
result.rounds.append(rnd)
@@ -328,14 +357,14 @@ def render_bracket(result: TourneyResult) -> str:
lines.append(f"## Round {rnd.round_num}: {rnd.name}")
lines.append(f"*{len(rnd.contenders)} contenders, {rnd.prompt_volume} prompt pairs*")
lines.append("")
lines.append("| Rank | Method | Score | Refusal | Coherence | KL Div | Perplexity | Time |")
lines.append("|------|--------|-------|---------|-----------|--------|------------|------|")
lines.append("| Rank | Method | Dir | Score | Refusal | Coherence | KL Div | PPL | Cert | Time |")
lines.append("|------|--------|-----|-------|---------|-----------|--------|-----|------|------|")
sorted_contenders = sorted(rnd.contenders, key=lambda x: x.score, reverse=True)
for i, c in enumerate(sorted_contenders, 1):
if c.error:
lines.append(
f"| {i} | {c.method} | ERROR | — | — | — | — | {c.time_s:.0f}s |"
f"| {i} | {c.method} | — | ERROR | — | — | — | — | — | {c.time_s:.0f}s |"
)
continue
m = c.metrics
@@ -351,9 +380,11 @@ def render_bracket(result: TourneyResult) -> str:
kl_val = m.get('kl_divergence')
kl_str = f"{kl_val:.4f}" if kl_val is not None else ""
pp = f"{m.get('perplexity', 0):.1f}" if m.get('perplexity') is not None else ""
dir_m = c.direction_method or m.get("direction_method", "")
cert = c.spectral_cert or ""
lines.append(
f"| {i} | **{c.method}**{marker} | {c.score:.4f} "
f"| {rr} | {co} | {kl_str} | {pp} | {c.time_s:.0f}s |"
f"| {i} | **{c.method}**{marker} | {dir_m} | {c.score:.4f} "
f"| {rr} | {co} | {kl_str} | {pp} | {cert} | {c.time_s:.0f}s |"
)
lines.append("")
@@ -572,9 +603,12 @@ def render_bracket_html(result: TourneyResult) -> str:
f'<span class="trophy">&#x1F3C6;</span> '
f'<span class="champ-name">{html_mod.escape(w.method)}</span>'
)
dir_m = w.direction_method or ""
cert = w.spectral_cert or ""
header_parts.append(
f'<div class="champ-score">'
f'Score: {w.score:.4f} &nbsp;|&nbsp; Refusal: {rr} &nbsp;|&nbsp; Coherence: {co}'
f'Score: {w.score:.4f} &nbsp;|&nbsp; Refusal: {rr} &nbsp;|&nbsp; '
f'Coherence: {co} &nbsp;|&nbsp; Dir: {html_mod.escape(dir_m)} &nbsp;|&nbsp; Cert: {html_mod.escape(cert)}'
f'</div>'
)
header_parts.append("</div>")
@@ -632,6 +666,11 @@ def render_bracket_html(result: TourneyResult) -> str:
m = c.metrics or {}
metric_spans = []
if not c.error:
dm = c.direction_method or m.get("direction_method", "")
if dm:
metric_spans.append(
f'<span class="metric">dir <span class="val">{html_mod.escape(dm)}</span></span>'
)
rr = m.get("refusal_rate")
if rr is not None:
metric_spans.append(
@@ -642,6 +681,12 @@ def render_bracket_html(result: TourneyResult) -> str:
metric_spans.append(
f'<span class="metric">coh <span class="val">{co:.3f}</span></span>'
)
sc = c.spectral_cert or m.get("spectral_certification", "")
if sc:
cert_color = {"GREEN": "#4ecca3", "YELLOW": "#f0c040", "RED": "#cc4444"}.get(sc, "#777")
metric_spans.append(
f'<span class="metric">cert <span class="val" style="color:{cert_color}">{html_mod.escape(sc)}</span></span>'
)
kl = m.get("kl_divergence")
if kl is not None:
metric_spans.append(
@@ -705,10 +750,12 @@ in elimination rounds.
| Metric | Value |
|--------|-------|
| Composite Score | **{w.score:.4f}** |
| Direction Method | {w.direction_method or 'N/A'} |
| Refusal Rate | {f'{w.metrics["refusal_rate"]:.1%}' if w.metrics.get('refusal_rate') is not None else 'N/A'} |
| Coherence | {f'{w.metrics["coherence"]:.3f}' if w.metrics.get('coherence') is not None else 'N/A'} |
| KL Divergence | {f'{w.metrics["kl_divergence"]:.4f}' if w.metrics.get('kl_divergence') is not None else 'N/A'} |
| Perplexity | {f'{w.metrics["perplexity"]:.1f}' if w.metrics.get('perplexity') is not None else 'N/A'} |
| Spectral Cert | {w.spectral_cert or 'N/A'} |
## How to Use
@@ -866,6 +913,8 @@ class TourneyRunner:
contender.metrics = dict(pipeline._quality_metrics)
contender.score = composite_score(contender.metrics)
contender.output_dir = save_dir
contender.direction_method = getattr(pipeline, "direction_method", "")
contender.spectral_cert = contender.metrics.get("spectral_certification", "") or ""
# Free pipeline to reclaim GPU
del pipeline
@@ -1048,6 +1097,11 @@ class TourneyRunner:
result.winner = winner
result.total_time_s = time.time() - t_start
# Clean up non-winner finalist dirs to free disk
for c in ranked[1:]:
if c.output_dir and Path(c.output_dir).exists():
shutil.rmtree(c.output_dir, ignore_errors=True)
self.log("")
self.log("=" * 60)
if winner:
@@ -1352,6 +1406,11 @@ class TourneyRunner:
result.winner = winner
result.total_time_s = time.time() - t_start
# Clean up non-winner finalist dirs to free disk
for c in ranked[1:]:
if c.output_dir and Path(c.output_dir).exists():
shutil.rmtree(c.output_dir, ignore_errors=True)
self.log("")
self.log("=" * 60)
if winner: