"""Bayesian optimization for abliteration hyperparameters.
|
|
|
|
Implements Optuna TPE-based multi-objective optimization that searches for
|
|
optimal ablation parameters co-minimizing refusal rate and KL divergence.
|
|
|
|
Inspired by Heretic (p-e-w, 2025) which pioneered Bayesian optimization
|
|
for abliteration. OBLITERATUS pushes this further by:
|
|
|
|
1. **Parametric layer kernel**: Instead of per-layer independent parameters,
|
|
uses a bell-shaped curve described by 4 global params (Heretic-style).
|
|
This reduces the search space from O(n_layers) to O(1) while capturing
|
|
the spatial structure of refusal across layers.
|
|
2. **Float direction interpolation**: Direction index is continuous — non-
|
|
integer values interpolate between adjacent SVD directions, unlocking
|
|
a smooth direction space beyond the discrete top-k.
|
|
3. **Component-specific weights**: Separate scaling for attention vs MLP
|
|
projections (Heretic showed MLP interventions are more damaging).
|
|
4. **Per-expert granularity**: For MoE models, optimizes per-expert scaling.
|
|
5. **CoT-aware objectives**: Adds chain-of-thought coherence as a third
|
|
optimization objective for CoT models.
|
|
6. **Warm-start from analysis**: Uses OBLITERATUS's analysis-based heuristics
|
|
as initial trial suggestions instead of random initialization.
|
|
|
|
References:
|
|
- Heretic (p-e-w, 2025): Bayesian optimization for LLM abliteration
|
|
- Akiba et al. (2019): Optuna: A Next-generation Hyperparameter
|
|
Optimization Framework
|
|
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import torch
import torch.nn.functional as F

if TYPE_CHECKING:
    from obliteratus.abliterate import AbliterationPipeline

logger = logging.getLogger(__name__)


def _measure_refusal_rate(
    pipeline: AbliterationPipeline,
    n_prompts: int = 10,
    max_new_tokens: int = 128,
) -> float:
    """Measure refusal rate on a subset of harmful prompts.

    Applies the model's chat template (when available) so the model is in
    chat mode — matching how users actually interact with it. Uses token-ID
    slicing for robust completion extraction regardless of special tokens.
    """
    from obliteratus.evaluation.advanced_metrics import refusal_rate

    model = pipeline.handle.model
    tokenizer = pipeline.handle.tokenizer
    device = pipeline._get_model_device(model)

    test_prompts = pipeline.harmful_prompts[:n_prompts]

    # Always apply chat template for refusal measurement — refusal circuitry
    # is activated by chat formatting, so raw prompts give false 0% rates.
    saved_flag = pipeline.use_chat_template
    pipeline.use_chat_template = True
    formatted_prompts = pipeline._maybe_apply_chat_template(test_prompts)
    pipeline.use_chat_template = saved_flag
    responses = []

    for prompt in formatted_prompts:
        try:
            inputs = tokenizer(
                prompt, return_tensors="pt", truncation=True, max_length=256,
            )
            input_len = inputs["input_ids"].shape[1]
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                output = model.generate(
                    **inputs, max_new_tokens=max_new_tokens, do_sample=False,
                )
            # Token-ID slicing: robust regardless of special tokens
            completion_ids = output[0][input_len:]
            completion = tokenizer.decode(
                completion_ids, skip_special_tokens=True,
            ).strip()
            responses.append(completion)
            del inputs, output
        except Exception:
            responses.append("")  # Count failures as non-refusals

    pipeline._free_gpu_memory()
    return refusal_rate(responses, mode="combined") if responses else 1.0
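

# Why token-ID slicing (an illustrative sketch; the decode behavior shown is
# hypothetical, not from any specific tokenizer): decoding the whole output
# and slicing the string by prompt length breaks whenever special tokens
# render differently than they appeared in the prompt, e.g.
#
#     full = tokenizer.decode(output[0])   # chat-template tokens re-rendered
#     completion = full[len(prompt):]      # character offsets misaligned
#
# Slicing the token IDs first (output[0][input_len:]) and decoding only the
# newly generated tokens avoids depending on how special tokens stringify.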


def _measure_kl_divergence(
    pipeline: AbliterationPipeline,
    reference_logits: list[torch.Tensor],
    prompts: list[str],
) -> float:
    """Measure KL divergence from reference (pre-ablation) logits."""
    model = pipeline.handle.model
    tokenizer = pipeline.handle.tokenizer
    device = pipeline._get_model_device(model)

    total_kl = 0.0
    n_valid = 0

    for i, prompt in enumerate(prompts):
        if i >= len(reference_logits):
            break
        try:
            inputs = tokenizer(
                prompt, return_tensors="pt", truncation=True, max_length=64,
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = model(**inputs)
            new_logits = outputs.logits[:, -1, :].detach().cpu().float()

            ref = reference_logits[i]
            log_p = F.log_softmax(ref, dim=-1)
            log_q = F.log_softmax(new_logits.squeeze(0), dim=-1)
            p = F.softmax(ref, dim=-1)
            kl = (p * (log_p - log_q)).sum().item()
            total_kl += max(kl, 0.0)  # Clamp negative KL (numerical noise)
            n_valid += 1
            del inputs, outputs, new_logits
        except Exception:
            pass

    pipeline._free_gpu_memory()
    return total_kl / max(n_valid, 1)
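

# Illustrative sketch, not used by the pipeline: a self-contained check that
# the manual KL sum in _measure_kl_divergence agrees with torch's built-in
# F.kl_div (the 1000-dim toy vocab and the tolerance are arbitrary choices).
def _kl_divergence_sanity_check() -> None:
    """Worked example only; never called by OBLITERATUS."""
    ref = torch.randn(1000)   # stands in for pre-ablation next-token logits
    new = torch.randn(1000)   # stands in for post-ablation next-token logits
    log_p = F.log_softmax(ref, dim=-1)
    log_q = F.log_softmax(new, dim=-1)
    # KL(p || q) = sum_i p_i * (log p_i - log q_i), as computed above
    manual = (F.softmax(ref, dim=-1) * (log_p - log_q)).sum()
    # F.kl_div takes the *input* in log space; with log_target=True the
    # target is log p, and reduction="sum" adds up the pointwise terms
    builtin = F.kl_div(log_q, log_p, reduction="sum", log_target=True)
    assert torch.allclose(manual, builtin, atol=1e-5)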


def _parametric_layer_weight(
    layer_idx: int,
    n_layers: int,
    max_weight: float,
    peak_position: float,
    min_weight: float,
    spread: float,
) -> float:
    """Compute ablation weight for a layer using a piecewise-linear tent kernel.

    Faithful reproduction of Heretic's parametric kernel (p-e-w/heretic):

    - max_weight: peak ablation strength at peak_position
    - peak_position: normalized position of peak (0..1)
    - min_weight: weight at the edges of the tent
    - spread: normalized distance from peak to tent edge (min_weight_distance)

    Layers beyond ``spread`` from the peak get weight 0 (skipped entirely).
    Within the tent, weight drops linearly from max_weight to min_weight.
    This matches Heretic's actual formula::

        distance = abs(layer_index - max_weight_position)
        if distance > min_weight_distance: skip
        weight = max_weight + (distance / min_weight_distance) * (min_weight - max_weight)
    """
    if n_layers <= 1:
        return max_weight

    normalized_pos = layer_idx / (n_layers - 1)
    dist = abs(normalized_pos - peak_position)
    min_weight_distance = max(spread, 0.01)

    # Hard cutoff: layers outside the tent get 0 weight (Heretic skips them)
    if dist > min_weight_distance:
        return 0.0

    # Linear interpolation: max_weight at peak → min_weight at edges
    return max_weight + (dist / min_weight_distance) * (min_weight - max_weight)
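

# Worked example (round numbers chosen for illustration, not taken from any
# model): with n_layers=11 the normalized layer positions are 0.0, 0.1, ...,
# 1.0, so a kernel with max_weight=1.0, peak_position=0.5, min_weight=0.2,
# spread=0.2 evaluates to:
#
#     _parametric_layer_weight(5, 11, 1.0, 0.5, 0.2, 0.2)  -> 1.0  (at peak)
#     _parametric_layer_weight(6, 11, 1.0, 0.5, 0.2, 0.2)  -> 0.6  (halfway)
#     _parametric_layer_weight(7, 11, 1.0, 0.5, 0.2, 0.2)  -> 0.2  (tent edge)
#     _parametric_layer_weight(8, 11, 1.0, 0.5, 0.2, 0.2)  -> 0.0  (outside)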


def _interpolate_direction(
    pipeline: AbliterationPipeline,
    layer_idx: int,
    float_dir_idx: float,
) -> torch.Tensor:
    """Get an interpolated refusal direction from a float-valued layer index.

    Faithful reproduction of Heretic's direction interpolation: the index
    selects which *layer's* diff-of-means direction to use, with float
    values interpolating between adjacent layers' directions. This is
    fundamentally different from interpolating between SVD components
    within a single layer — it searches across the layer axis.

    From Heretic source (model.py)::

        weight, index = math.modf(direction_index + 1)
        refusal_direction = F.normalize(
            refusal_directions[int(index)].lerp(
                refusal_directions[int(index) + 1], weight), p=2, dim=0)

    Args:
        pipeline: Pipeline with extracted refusal directions per layer.
        layer_idx: The layer being projected (used as fallback).
        float_dir_idx: Continuous direction index — selects which layer's
            direction to use (e.g., 5.3 interpolates 70% layer-5 +
            30% layer-6).

    Returns:
        Normalized direction tensor.
    """
    # Build sorted list of layer indices that have refusal directions
    sorted_layers = sorted(pipeline.refusal_directions.keys())
    if not sorted_layers:
        return pipeline.refusal_directions.get(layer_idx, torch.zeros(1))

    n_layers_with_dirs = len(sorted_layers)

    # Heretic uses a direction_index + 1 offset; we map float_dir_idx into
    # the sorted layer list, clamped to the valid range.
    float_dir_idx = max(0.0, min(float_dir_idx, n_layers_with_dirs - 1))

    lo = int(float_dir_idx)
    hi = min(lo + 1, n_layers_with_dirs - 1)

    lo_layer = sorted_layers[lo]
    hi_layer = sorted_layers[hi]

    d_lo = pipeline.refusal_directions[lo_layer]
    d_hi = pipeline.refusal_directions[hi_layer]

    if lo == hi:
        d = d_lo
    else:
        # Linear interpolation between adjacent layers' directions
        alpha = float_dir_idx - lo
        d = (1.0 - alpha) * d_lo + alpha * d_hi

    norm = d.norm()
    if norm > 1e-8:
        d = d / norm
    return d
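

# Example (hypothetical layer set): if refusal directions were extracted at
# layers [4, 8, 12, 16], then float_dir_idx=1.25 falls between sorted
# positions 1 and 2 (layers 8 and 12) and returns the unit-normalized
# 0.75 * d[8] + 0.25 * d[12]. The index therefore walks the *sorted list*
# of layers that have directions, not raw layer numbers.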


def run_bayesian_optimization(
    pipeline: AbliterationPipeline,
    n_trials: int = 50,
    n_refusal_prompts: int = 30,
    n_kl_prompts: int = 5,
) -> dict[int, float]:
    """Run Bayesian optimization to find optimal ablation parameters.

    Uses Optuna TPE with Heretic-style parametric layer kernels to search
    a compact parameter space:

    - 4 attention-kernel params (max_weight, peak_position, min_weight, spread)
    - 4 MLP-kernel params (same shape, free to peak at a different layer)
    - 1 float direction index (interpolated across layers' refusal directions)

    Total: 9 parameters regardless of model size (vs O(n_layers) before).

    The kernel result can also seed independent per-layer refinement when
    the kernel alone doesn't fit well.

    Args:
        pipeline: Initialized AbliterationPipeline (post-DISTILL stage).
        n_trials: Number of optimization trials.
        n_refusal_prompts: Number of harmful prompts for refusal measurement.
        n_kl_prompts: Number of harmless prompts for KL measurement.

    Returns:
        Dict mapping layer_idx -> optimal regularization value.
    """
    try:
        import optuna
        from optuna.samplers import TPESampler
    except ImportError:
        logger.warning(
            "Optuna not installed — skipping Bayesian optimization. "
            "Install with: pip install optuna"
        )
        return {}

    if not pipeline.handle or not pipeline._strong_layers:
        return {}

    model = pipeline.handle.model
    tokenizer = pipeline.handle.tokenizer
    device = pipeline._get_model_device(model)

    pipeline.log("Bayesian optimization: collecting reference logits...")

    # Collect reference (pre-ablation) logits for KL measurement
    kl_prompts = [
        "The weather today is",
        "In mathematics, the concept of",
        "The history of ancient Rome",
        "How to cook a simple pasta",
        "The process of photosynthesis involves",
    ][:n_kl_prompts]

    reference_logits: list[torch.Tensor] = []
    for prompt in kl_prompts:
        try:
            inputs = tokenizer(
                prompt, return_tensors="pt", truncation=True, max_length=64,
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = model(**inputs)
            reference_logits.append(
                outputs.logits[:, -1, :].detach().cpu().float().squeeze(0)
            )
            del inputs, outputs
        except Exception:
            pass
    pipeline._free_gpu_memory()

    if not reference_logits:
        pipeline.log(" Failed to collect reference logits — skipping optimization")
        return {}

    from obliteratus.strategies.utils import (
        get_layer_modules,
        get_attention_module,
        get_ffn_module,
    )
    from obliteratus.abliterate import _ATTN_OUT_NAMES, _FFN_OUT_NAMES

    layer_modules = get_layer_modules(pipeline.handle)
    arch = pipeline.handle.architecture
    n_total_layers = len(layer_modules)

    # Save weight tensors for rollback — clone to CPU to free GPU memory
    original_params: list[tuple[torch.Tensor, torch.Tensor]] = []
    seen_data_ptrs: set[int] = set()

    for idx in pipeline._strong_layers:
        try:
            attn = get_attention_module(layer_modules[idx], arch)
            for attr_name in _ATTN_OUT_NAMES:
                proj = getattr(attn, attr_name, None)
                if proj is not None and hasattr(proj, "weight"):
                    ptr = proj.weight.data.data_ptr()
                    if ptr not in seen_data_ptrs:
                        original_params.append((proj.weight.data, proj.weight.data.clone().cpu()))
                        seen_data_ptrs.add(ptr)
                    if hasattr(proj, "bias") and proj.bias is not None:
                        bptr = proj.bias.data.data_ptr()
                        if bptr not in seen_data_ptrs:
                            original_params.append((proj.bias.data, proj.bias.data.clone().cpu()))
                            seen_data_ptrs.add(bptr)
        except (AttributeError, RuntimeError):
            pass
        try:
            ffn = get_ffn_module(layer_modules[idx], arch)
            for attr_name in _FFN_OUT_NAMES:
                proj = getattr(ffn, attr_name, None)
                if proj is not None and hasattr(proj, "weight"):
                    ptr = proj.weight.data.data_ptr()
                    if ptr not in seen_data_ptrs:
                        original_params.append((proj.weight.data, proj.weight.data.clone().cpu()))
                        seen_data_ptrs.add(ptr)
                    if hasattr(proj, "bias") and proj.bias is not None:
                        bptr = proj.bias.data.data_ptr()
                        if bptr not in seen_data_ptrs:
                            original_params.append((proj.bias.data, proj.bias.data.clone().cpu()))
                            seen_data_ptrs.add(bptr)
        except (AttributeError, RuntimeError):
            pass

    del seen_data_ptrs
    total_saved_mb = sum(
        clone.nelement() * clone.element_size() for _, clone in original_params
    ) / 1e6
    pipeline.log(
        f" Saved {len(original_params)} weight tensors for rollback "
        f"({total_saved_mb:.0f} MB, on CPU)"
    )

    def _restore_all():
        for live_data, saved_clone in original_params:  # noqa: F821
            live_data.copy_(saved_clone.to(live_data.device))

    # Warm-start values for the parametric kernel.
    # If the informed pipeline provided analysis-derived warm-start params,
    # use those (they're much better than the default heuristic).
    informed_warm = getattr(pipeline, "_informed_warm_start", None)
    if informed_warm:
        warm_peak = informed_warm.get("peak_position", 0.5)
        pipeline.log(f" Using analysis-informed warm-start (peak={warm_peak:.2f})")
    elif pipeline._strong_layers:
        peak_layer = pipeline._strong_layers[0]
        warm_peak = peak_layer / max(n_total_layers - 1, 1)
    else:
        warm_peak = 0.5

    best_result: dict[int, float] = {}
    best_score = float("inf")

    # Suppress Optuna's verbose logging
    optuna.logging.set_verbosity(optuna.logging.WARNING)

    # Number of strong layers with directions (bounds the float direction index)
    n_layers_with_dirs = len([
        idx for idx in pipeline._strong_layers
        if idx in pipeline.refusal_directions
    ])

    # ── Phase 1: Parametric kernel optimization (compact search space) ──
    # Heretic uses SEPARATE kernel parameters for attention and MLP,
    # allowing them to peak at different layers (8 params + 1 dir_idx = 9).

    def objective(trial: optuna.Trial) -> tuple[float, float]:
        """Multi-objective: minimize (refusal_rate, kl_divergence)."""
        _restore_all()

        # Attention kernel: 4 params
        attn_max = trial.suggest_float("attn_max_weight", 0.5, 1.0)
        attn_peak = trial.suggest_float("attn_peak_position", 0.1, 0.9)
        attn_min = trial.suggest_float("attn_min_weight", 0.0, 0.3)
        attn_spread = trial.suggest_float("attn_spread", 0.1, 0.6)

        # MLP kernel: 4 params (separate — can peak at a different layer)
        mlp_max = trial.suggest_float("mlp_max_weight", 0.3, 1.0)
        mlp_peak = trial.suggest_float("mlp_peak_position", 0.1, 0.9)
        mlp_min = trial.suggest_float("mlp_min_weight", 0.0, 0.3)
        mlp_spread = trial.suggest_float("mlp_spread", 0.1, 0.6)

        # Float direction index (cross-layer interpolation, Heretic-style)
        dir_idx = trial.suggest_float("dir_idx", 0.0, max(n_layers_with_dirs - 1, 0.0))

        # Compute per-layer, per-component regularization from kernels
        attn_regs: dict[int, float] = {}
        mlp_regs: dict[int, float] = {}
        for idx in pipeline._strong_layers:
            attn_w = _parametric_layer_weight(
                idx, n_total_layers, attn_max, attn_peak, attn_min, attn_spread,
            )
            mlp_w = _parametric_layer_weight(
                idx, n_total_layers, mlp_max, mlp_peak, mlp_min, mlp_spread,
            )
            attn_regs[idx] = 1.0 - attn_w
            mlp_regs[idx] = 1.0 - mlp_w

        # Apply projection with trial's parameters
        for idx in pipeline._strong_layers:
            if idx not in pipeline.refusal_directions:
                continue

            # Use cross-layer interpolated direction
            direction = _interpolate_direction(pipeline, idx, dir_idx)
            d_col = direction.to(device=next(layer_modules[idx].parameters()).device)
            d_col = d_col.unsqueeze(-1) if d_col.dim() == 1 else d_col

            # Attention projection (with per-component kernel)
            attn_reg = attn_regs[idx]
            try:
                attn = get_attention_module(layer_modules[idx], arch)
                pipeline._project_out_advanced(
                    attn, d_col, _ATTN_OUT_NAMES,
                    norm_preserve=pipeline.norm_preserve,
                    regularization=attn_reg,
                )
            except (AttributeError, RuntimeError):
                pass

            # MLP/FFN projection (with per-component kernel)
            mlp_reg = mlp_regs[idx]
            try:
                ffn = get_ffn_module(layer_modules[idx], arch)
                count = pipeline._project_out_advanced(
                    ffn, d_col, _FFN_OUT_NAMES,
                    norm_preserve=pipeline.norm_preserve,
                    regularization=mlp_reg,
                )
                if count == 0:
                    pipeline._project_moe_experts(
                        ffn, d_col,
                        norm_preserve=pipeline.norm_preserve,
                        regularization=mlp_reg,
                        project_biases=False,
                    )
            except (AttributeError, RuntimeError):
                pass

        # Measure objectives
        refusal = _measure_refusal_rate(pipeline, n_prompts=n_refusal_prompts)
        kl = _measure_kl_divergence(pipeline, reference_logits, kl_prompts)

        # Track best combined score (use average of attn/mlp regs for layer_regs)
        nonlocal best_score, best_result
        combined = refusal + 0.5 * kl
        if combined < best_score:
            best_score = combined
            best_result = {
                idx: (attn_regs[idx] + mlp_regs[idx]) / 2.0
                for idx in pipeline._strong_layers
            }

        pipeline.log(
            f" Trial {trial.number + 1}/{n_trials}: "
            f"refusal={refusal:.0%}, KL={kl:.4f} "
            f"(attn_peak={attn_peak:.2f}, mlp_peak={mlp_peak:.2f}, dir={dir_idx:.2f})"
        )

        return refusal, kl
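
    # Note: Optuna keeps the full two-objective Pareto front; the
    # `combined = refusal + 0.5 * kl` scalarization above only maintains a
    # fallback best_result for the case where Pareto selection below yields
    # nothing. The 0.5 trade-off weight is a heuristic constant.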

    sampler = TPESampler(seed=42, n_startup_trials=min(5, n_trials // 3))
    study = optuna.create_study(
        directions=["minimize", "minimize"],
        sampler=sampler,
        study_name="obliteratus_parametric_optimization",
    )

    # Enqueue warm-start trial with analysis-derived estimates.
    # Translate informed pipeline params to the new per-component format.
    if informed_warm:
        iw = informed_warm
        warm_params = {
            "attn_max_weight": iw.get("max_weight", 0.9),
            "attn_peak_position": iw.get("peak_position", warm_peak),
            "attn_min_weight": iw.get("min_weight", 0.05),
            "attn_spread": iw.get("spread", 0.3),
            "mlp_max_weight": iw.get("max_weight", 0.9) * iw.get("mlp_scale", 0.6),
            "mlp_peak_position": iw.get("peak_position", warm_peak),
            "mlp_min_weight": iw.get("min_weight", 0.05),
            "mlp_spread": iw.get("spread", 0.3),
            "dir_idx": iw.get("dir_idx", 0.0),
        }
    else:
        warm_params = {
            "attn_max_weight": 0.9,
            "attn_peak_position": warm_peak,
            "attn_min_weight": 0.05,
            "attn_spread": 0.3,
            "mlp_max_weight": 0.6,
            "mlp_peak_position": warm_peak,
            "mlp_min_weight": 0.05,
            "mlp_spread": 0.3,
            "dir_idx": 0.0,
        }
    study.enqueue_trial(warm_params)

    pipeline.log(f"Bayesian optimization: running {n_trials} trials (parametric kernel)...")
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    # Restore model and apply best result
    _restore_all()

    # Get best trial from Pareto front (prefer low refusal)
    pareto = study.best_trials
    if pareto:
        pareto.sort(key=lambda t: (t.values[0], t.values[1]))
        best_trial = pareto[0]

        # Reconstruct per-layer regs from best kernel params
        p = best_trial.params
        best_result = {}
        for idx in pipeline._strong_layers:
            attn_w = _parametric_layer_weight(
                idx, n_total_layers,
                p["attn_max_weight"], p["attn_peak_position"],
                p["attn_min_weight"], p["attn_spread"],
            )
            mlp_w = _parametric_layer_weight(
                idx, n_total_layers,
                p["mlp_max_weight"], p["mlp_peak_position"],
                p["mlp_min_weight"], p["mlp_spread"],
            )
            # Average the component weights for a layer-level reg, inverted
            best_result[idx] = 1.0 - (attn_w + mlp_w) / 2.0

        pipeline.log(
            f" Best trial: refusal={best_trial.values[0]:.0%}, "
            f"KL={best_trial.values[1]:.4f}"
        )
        pipeline.log(
            f" Attn kernel: peak={p['attn_peak_position']:.2f}, "
            f"spread={p['attn_spread']:.2f}, max={p['attn_max_weight']:.2f}"
        )
        pipeline.log(
            f" MLP kernel: peak={p['mlp_peak_position']:.2f}, "
            f"spread={p['mlp_spread']:.2f}, max={p['mlp_max_weight']:.2f}"
        )
        pipeline.log(f" dir_idx={p['dir_idx']:.2f}")

        # Store the best direction index for use during EXCISE
        best_dir_idx = p.get("dir_idx", 0.0)
        if best_dir_idx > 0.1:
            pipeline.log(f" Applying interpolated direction (idx={best_dir_idx:.2f})...")
            # Snapshot the interpolated directions before writing them back:
            # assigning into refusal_directions inside the loop would let
            # later interpolations read already-overwritten entries.
            new_dirs = {
                idx: _interpolate_direction(pipeline, idx, best_dir_idx)
                for idx in pipeline._strong_layers
            }
            pipeline.refusal_directions.update(new_dirs)

        # Store component scales for use in EXCISE (backward compat)
        pipeline._bayesian_attn_scale = p.get("attn_max_weight", 1.0)
        pipeline._bayesian_mlp_scale = p.get("mlp_max_weight", 1.0)

    elif best_result:
        pipeline.log(f" Using best combined score: {best_score:.4f}")

    # Clean up
    del original_params
    pipeline._free_gpu_memory()

    return best_result
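

# Usage sketch (hypothetical setup; only run_bayesian_optimization and the
# pipeline attributes referenced in this module are taken as given):
#
#     from obliteratus.abliterate import AbliterationPipeline
#
#     pipeline = ...  # an initialized AbliterationPipeline, post-DISTILL
#     layer_regs = run_bayesian_optimization(
#         pipeline, n_trials=50, n_refusal_prompts=30, n_kl_prompts=5,
#     )
#     # layer_regs maps layer_idx -> regularization (1.0 minus the averaged
#     # kernel weight), consumed downstream by the EXCISE stage.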