mirror of
https://github.com/elder-plinius/OBLITERATUS.git
synced 2026-04-23 19:56:15 +02:00
511 lines
21 KiB
Python
511 lines
21 KiB
Python
"""Edge-case and robustness tests.
|
|
|
|
Tests for NaN/Inf handling, empty inputs, extreme dimensions,
|
|
and other boundary conditions that the main test suite doesn't cover.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
|
|
import pytest
|
|
import torch
|
|
import torch.nn as nn
|
|
|
|
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
|
|
from obliteratus.analysis.cross_layer import CrossLayerAlignmentAnalyzer
|
|
from obliteratus.analysis.concept_geometry import ConceptConeAnalyzer
|
|
from obliteratus.analysis.alignment_imprint import AlignmentImprintDetector
|
|
from obliteratus.analysis.multi_token_position import MultiTokenPositionAnalyzer
|
|
from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon
|
|
from obliteratus.analysis.causal_tracing import CausalRefusalTracer
|
|
from obliteratus.analysis.residual_stream import ResidualStreamDecomposer
|
|
from obliteratus.analysis.probing_classifiers import LinearRefusalProbe
|
|
from obliteratus.analysis.cross_model_transfer import TransferAnalyzer
|
|
from obliteratus.evaluation.advanced_metrics import (
|
|
refusal_rate,
|
|
effective_rank,
|
|
activation_cosine_similarity,
|
|
)
|
|
from obliteratus.analysis.steering_vectors import (
|
|
SteeringVectorFactory,
|
|
SteeringHookManager,
|
|
SteeringConfig,
|
|
SteeringResult,
|
|
compute_steering_effectiveness,
|
|
format_steering_report,
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# NaN / Inf handling
|
|
# ===========================================================================
|
|
|
|
class TestNaNInfHandling:
|
|
"""Test that modules handle degenerate inputs gracefully."""
|
|
|
|
def test_whitened_svd_nan_activations(self):
|
|
"""WhitenedSVD with NaN — currently raises; documenting behavior."""
|
|
harmful = [torch.tensor([float("nan"), 1.0, 2.0]) for _ in range(5)]
|
|
harmless = [torch.randn(3) for _ in range(5)]
|
|
extractor = WhitenedSVDExtractor()
|
|
# NaN propagation through SVD is expected to produce NaN results
|
|
# This documents the current behavior — ideally would guard against it
|
|
raised = False
|
|
result = None
|
|
try:
|
|
result = extractor.extract(harmful, harmless)
|
|
except (RuntimeError, ValueError):
|
|
raised = True
|
|
# Either it raised an exception (acceptable) or returned a result with NaNs
|
|
assert raised or result is not None, (
|
|
"Should either raise on NaN input or return a result"
|
|
)
|
|
|
|
def test_whitened_svd_zero_activations(self):
|
|
"""WhitenedSVD with all-zero activations."""
|
|
harmful = [torch.zeros(8) for _ in range(5)]
|
|
harmless = [torch.zeros(8) for _ in range(5)]
|
|
extractor = WhitenedSVDExtractor()
|
|
result = extractor.extract(harmful, harmless)
|
|
# Should return a valid result without crashing
|
|
assert result is not None
|
|
assert result.directions is not None
|
|
assert result.singular_values is not None
|
|
|
|
def test_concept_cone_nan_direction(self):
|
|
"""ConceptConeAnalyzer with NaN in activations — documenting behavior."""
|
|
harmful = [torch.randn(16) for _ in range(10)]
|
|
harmless = [torch.randn(16) for _ in range(10)]
|
|
# Poison one activation
|
|
harmful[3] = torch.full((16,), float("nan"))
|
|
cat_map = {i: f"cat_{i % 3}" for i in range(10)}
|
|
analyzer = ConceptConeAnalyzer(category_map=cat_map)
|
|
raised = False
|
|
result = None
|
|
try:
|
|
result = analyzer.analyze_layer(harmful, harmless)
|
|
except (RuntimeError, ValueError):
|
|
raised = True
|
|
# Either it raised an exception (acceptable) or returned a result
|
|
assert raised or result is not None, (
|
|
"Should either raise on NaN input or return a result"
|
|
)
|
|
|
|
def test_sparse_surgery_zero_direction(self):
|
|
"""Sparse surgery with zero refusal direction."""
|
|
W = torch.randn(32, 16)
|
|
zero_dir = torch.zeros(16)
|
|
surgeon = SparseDirectionSurgeon()
|
|
result = surgeon.analyze_weight_matrix(W, zero_dir)
|
|
assert result.mean_projection == 0.0
|
|
|
|
def test_sparse_surgery_zero_weight(self):
|
|
"""Sparse surgery with zero weight matrix."""
|
|
W = torch.zeros(32, 16)
|
|
ref_dir = torch.randn(16)
|
|
surgeon = SparseDirectionSurgeon()
|
|
result = surgeon.analyze_weight_matrix(W, ref_dir)
|
|
assert result.max_projection < 1e-6
|
|
|
|
def test_effective_rank_nan_matrix(self):
|
|
"""effective_rank should handle matrix with NaN."""
|
|
W = torch.randn(10, 10)
|
|
W[0, 0] = float("nan")
|
|
# Should either return a value or raise cleanly
|
|
try:
|
|
result = effective_rank(torch.nan_to_num(W))
|
|
assert math.isfinite(result)
|
|
except Exception:
|
|
pass # Raising is acceptable for NaN input
|
|
|
|
def test_cosine_similarity_zero_vectors(self):
|
|
"""Cosine similarity between zero vectors."""
|
|
a = torch.zeros(32)
|
|
b = torch.zeros(32)
|
|
result = activation_cosine_similarity(a, b)
|
|
# Should be 0 or NaN, not crash
|
|
assert math.isfinite(result) or math.isnan(result)
|
|
|
|
def test_transfer_analyzer_nan_directions(self):
|
|
"""Transfer analyzer with NaN directions."""
|
|
dirs_a = {0: torch.randn(16), 1: torch.tensor([float("nan")] * 16)}
|
|
dirs_b = {0: torch.randn(16), 1: torch.randn(16)}
|
|
analyzer = TransferAnalyzer()
|
|
# Should not crash
|
|
result = analyzer.analyze_cross_model(dirs_a, dirs_b)
|
|
assert result is not None
|
|
assert isinstance(result.mean_transfer_score, float)
|
|
assert result.per_layer_transfer is not None
|
|
|
|
|
|
# ===========================================================================
|
|
# Empty inputs
|
|
# ===========================================================================
|
|
|
|
class TestEmptyInputs:
|
|
"""Test graceful handling of empty or minimal inputs."""
|
|
|
|
def test_cross_layer_empty_directions(self):
|
|
analyzer = CrossLayerAlignmentAnalyzer()
|
|
result = analyzer.analyze({})
|
|
assert result.direction_persistence_score == 0.0
|
|
|
|
def test_alignment_imprint_single_layer(self):
|
|
"""Single layer should still return a result."""
|
|
detector = AlignmentImprintDetector()
|
|
dirs = {0: torch.randn(32)}
|
|
result = detector.detect_imprint(dirs)
|
|
assert result.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown")
|
|
|
|
def test_multi_token_single_position(self):
|
|
"""Single-position sequence."""
|
|
ref_dir = torch.randn(16)
|
|
acts = torch.randn(1, 16)
|
|
analyzer = MultiTokenPositionAnalyzer()
|
|
result = analyzer.analyze_prompt(acts, ref_dir)
|
|
assert result.n_tokens == 1
|
|
assert result.peak_position == 0
|
|
|
|
def test_probing_minimal_data(self):
|
|
"""Probing with very few samples."""
|
|
harmful = [torch.randn(8) for _ in range(3)]
|
|
harmless = [torch.randn(8) for _ in range(3)]
|
|
probe = LinearRefusalProbe(n_epochs=10)
|
|
result = probe.probe_layer(harmful, harmless)
|
|
assert 0 <= result.accuracy <= 1.0
|
|
|
|
def test_residual_stream_single_layer(self):
|
|
acts = {0: torch.randn(32)}
|
|
ref_dir = torch.randn(32)
|
|
decomposer = ResidualStreamDecomposer()
|
|
result = decomposer.decompose(acts, ref_dir)
|
|
assert result.n_layers == 1
|
|
|
|
def test_causal_tracing_single_layer(self):
|
|
acts = {0: torch.randn(32)}
|
|
ref_dirs = {0: torch.randn(32)}
|
|
tracer = CausalRefusalTracer()
|
|
result = tracer.trace_from_activations(acts, ref_dirs)
|
|
assert result.n_layers == 1
|
|
|
|
def test_transfer_no_common_layers(self):
|
|
"""Cross-model with no overlapping layer indices."""
|
|
dirs_a = {0: torch.randn(16), 1: torch.randn(16)}
|
|
dirs_b = {2: torch.randn(16), 3: torch.randn(16)}
|
|
analyzer = TransferAnalyzer()
|
|
result = analyzer.analyze_cross_model(dirs_a, dirs_b)
|
|
assert result.mean_transfer_score == 0.0
|
|
|
|
def test_refusal_rate_empty_list(self):
|
|
result = refusal_rate([])
|
|
assert result == 0.0
|
|
|
|
def test_refusal_rate_single_response(self):
|
|
result = refusal_rate(["I cannot help with that."])
|
|
assert result == 1.0
|
|
|
|
|
|
# ===========================================================================
|
|
# Extreme dimensions
|
|
# ===========================================================================
|
|
|
|
class TestExtremeDimensions:
|
|
"""Test with unusually large or small dimensions."""
|
|
|
|
def test_high_dimensional_directions(self):
|
|
"""Test with realistic hidden dimension (4096)."""
|
|
hidden_dim = 4096
|
|
torch.manual_seed(42)
|
|
dirs = {i: torch.randn(hidden_dim) for i in range(8)}
|
|
analyzer = TransferAnalyzer()
|
|
result = analyzer.analyze_cross_layer(dirs)
|
|
assert result.mean_adjacent_transfer >= 0
|
|
|
|
def test_high_dim_sparse_surgery(self):
|
|
"""Sparse surgery with large weight matrix."""
|
|
W = torch.randn(2048, 1024)
|
|
ref_dir = torch.randn(1024)
|
|
surgeon = SparseDirectionSurgeon(sparsity=0.05)
|
|
result = surgeon.analyze_weight_matrix(W, ref_dir)
|
|
assert result.n_rows_modified == int(0.05 * 2048)
|
|
|
|
def test_single_dimension(self):
|
|
"""1D hidden dimension edge case."""
|
|
dirs = {i: torch.randn(1) for i in range(4)}
|
|
analyzer = TransferAnalyzer()
|
|
result = analyzer.analyze_cross_layer(dirs)
|
|
# All 1D directions are parallel or anti-parallel, so cosine is always 1.0
|
|
assert result.mean_adjacent_transfer >= 0.99
|
|
|
|
def test_many_layers_imprint(self):
|
|
"""Alignment imprint with many layers (128)."""
|
|
dirs = {i: torch.randn(32) for i in range(128)}
|
|
detector = AlignmentImprintDetector()
|
|
result = detector.detect_imprint(dirs)
|
|
total = (result.dpo_probability + result.rlhf_probability +
|
|
result.cai_probability + result.sft_probability)
|
|
assert abs(total - 1.0) < 0.01
|
|
|
|
@pytest.mark.parametrize("n_prompts", [1, 2, 5, 50, 100])
|
|
def test_concept_cone_varying_prompt_counts(self, n_prompts):
|
|
"""Concept cone with varying numbers of prompts."""
|
|
harmful = [torch.randn(16) for _ in range(n_prompts)]
|
|
harmless = [torch.randn(16) for _ in range(n_prompts)]
|
|
cat_map = {i: f"cat_{i % 3}" for i in range(n_prompts)}
|
|
analyzer = ConceptConeAnalyzer(category_map=cat_map, min_category_size=1)
|
|
result = analyzer.analyze_layer(harmful, harmless)
|
|
# Should produce a valid result regardless of prompt count
|
|
assert result is not None
|
|
assert result.general_direction is not None
|
|
assert result.category_count >= 1
|
|
|
|
@pytest.mark.parametrize("seq_len", [1, 2, 10, 100, 500])
|
|
def test_multi_token_varying_lengths(self, seq_len):
|
|
"""Multi-token analysis with varying sequence lengths."""
|
|
ref_dir = torch.randn(16)
|
|
acts = torch.randn(seq_len, 16)
|
|
analyzer = MultiTokenPositionAnalyzer()
|
|
result = analyzer.analyze_prompt(acts, ref_dir)
|
|
assert result.n_tokens == seq_len
|
|
|
|
@pytest.mark.parametrize("hidden_dim", [2, 4, 16, 64, 256, 1024])
|
|
def test_probing_varying_dimensions(self, hidden_dim):
|
|
"""Probing classifier across different hidden dims (min 2 for SGD)."""
|
|
harmful = [torch.randn(hidden_dim) * 2 + 1 for _ in range(10)]
|
|
harmless = [torch.randn(hidden_dim) * 2 - 1 for _ in range(10)]
|
|
probe = LinearRefusalProbe(n_epochs=20)
|
|
result = probe.probe_layer(harmful, harmless)
|
|
assert 0 <= result.accuracy <= 1.0
|
|
|
|
|
|
# ===========================================================================
|
|
# Refusal rate edge cases
|
|
# ===========================================================================
|
|
|
|
class TestRefusalRateEdgeCases:
|
|
"""Edge cases for refusal rate detection."""
|
|
|
|
def test_novel_refusal_phrasing(self):
|
|
"""Test that unusual refusal phrasings might be missed (known limitation)."""
|
|
novel_refusals = [
|
|
"That request falls outside my operational parameters.",
|
|
"I'm designed to be helpful but this conflicts with my guidelines.",
|
|
]
|
|
# These should ideally be detected but string matching may miss them
|
|
rate = refusal_rate(novel_refusals)
|
|
# This tests the known limitation — rate may be 0 (string matching misses novel phrasings)
|
|
assert 0 <= rate <= 1.0
|
|
|
|
def test_false_positive_sorry(self):
|
|
"""'I'm sorry to hear that' followed by helpful content."""
|
|
responses = [
|
|
"I'm sorry to hear that. Here's how you can solve the problem: first...",
|
|
]
|
|
rate = refusal_rate(responses, mode="prefix")
|
|
# Known limitation: prefix matching produces false positives here
|
|
# Just verify it returns a valid float
|
|
assert 0 <= rate <= 1.0
|
|
|
|
def test_case_insensitivity(self):
|
|
responses = ["I CANNOT ASSIST WITH THAT REQUEST."]
|
|
rate = refusal_rate(responses, mode="substring")
|
|
assert rate == 1.0
|
|
|
|
def test_whitespace_handling(self):
|
|
responses = [" I'm sorry, but I can't help with that. "]
|
|
rate = refusal_rate(responses, mode="prefix")
|
|
# Should handle leading whitespace
|
|
assert 0 <= rate <= 1.0
|
|
|
|
|
|
# ===========================================================================
|
|
# Steering Vectors
|
|
# ===========================================================================
|
|
|
|
class TestSteeringVectors:
|
|
"""Tests for the steering vector inference-time intervention system."""
|
|
|
|
def test_from_refusal_direction(self):
|
|
d = torch.randn(32)
|
|
vec = SteeringVectorFactory.from_refusal_direction(d, source_layer=5)
|
|
assert vec.label == "refusal"
|
|
assert vec.source_layer == 5
|
|
assert vec.default_alpha == -1.0
|
|
assert abs(vec.direction.norm().item() - 1.0) < 0.01
|
|
|
|
def test_from_contrastive_pairs(self):
|
|
pos = [torch.randn(16) + 2 for _ in range(10)]
|
|
neg = [torch.randn(16) - 2 for _ in range(10)]
|
|
vec = SteeringVectorFactory.from_contrastive_pairs(pos, neg, label="test")
|
|
assert vec.label == "test"
|
|
assert abs(vec.direction.norm().item() - 1.0) < 0.01
|
|
assert "n_positive" in vec.metadata
|
|
|
|
def test_combine_vectors(self):
|
|
v1 = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
|
|
v2 = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
|
|
combined = SteeringVectorFactory.combine([v1, v2], label="merged")
|
|
assert combined.label == "merged"
|
|
assert abs(combined.direction.norm().item() - 1.0) < 0.01
|
|
|
|
def test_combine_single(self):
|
|
v = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
|
|
combined = SteeringVectorFactory.combine([v])
|
|
assert abs(combined.direction.norm().item() - 1.0) < 0.01
|
|
|
|
def test_combine_empty_raises(self):
|
|
with pytest.raises(ValueError):
|
|
SteeringVectorFactory.combine([])
|
|
|
|
def test_hook_manager_lifecycle(self):
|
|
"""Test install/remove lifecycle without a real model."""
|
|
manager = SteeringHookManager()
|
|
assert not manager.is_active
|
|
manager.remove() # Should not crash even with no hooks
|
|
assert not manager.is_active
|
|
|
|
def test_hook_with_simple_model(self):
|
|
"""Test steering on a simple nn.Sequential model."""
|
|
model = nn.Sequential(
|
|
nn.Linear(16, 16),
|
|
nn.ReLU(),
|
|
nn.Linear(16, 16),
|
|
nn.ReLU(),
|
|
nn.Linear(16, 8),
|
|
)
|
|
|
|
vec = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
|
|
config = SteeringConfig(
|
|
vectors=[vec],
|
|
target_layers=[0, 2], # steer at first and third linear layers
|
|
alpha=1.0,
|
|
)
|
|
|
|
manager = SteeringHookManager()
|
|
# Install on specific modules
|
|
layers = list(model.children())
|
|
result = manager.install(model, config, layer_modules=layers)
|
|
assert result.hooks_installed == 2
|
|
assert manager.is_active
|
|
|
|
# Run a forward pass (should not crash)
|
|
x = torch.randn(1, 16)
|
|
output = model(x)
|
|
assert output.shape == (1, 8)
|
|
|
|
# Remove hooks
|
|
manager.remove()
|
|
assert not manager.is_active
|
|
|
|
def test_steering_effectiveness_remove(self):
|
|
eff = compute_steering_effectiveness(2.0, 0.5, direction="remove")
|
|
assert 0 < eff < 1.0 # Reduced but not eliminated
|
|
|
|
def test_steering_effectiveness_perfect_remove(self):
|
|
eff = compute_steering_effectiveness(2.0, 0.0, direction="remove")
|
|
assert eff == 1.0
|
|
|
|
def test_steering_effectiveness_no_change(self):
|
|
eff = compute_steering_effectiveness(2.0, 2.0, direction="remove")
|
|
assert eff == 0.0
|
|
|
|
def test_steering_effectiveness_add(self):
|
|
eff = compute_steering_effectiveness(1.0, 3.0, direction="add")
|
|
assert eff == 1.0 # Capped at 1.0
|
|
|
|
def test_format_report(self):
|
|
vec = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
|
|
config = SteeringConfig(vectors=[vec], target_layers=[3, 5], alpha=0.5)
|
|
result = SteeringResult(config=config, hooks_installed=2, total_steered_layers=2)
|
|
report = format_steering_report(result)
|
|
assert "Steering" in report
|
|
assert "refusal" in report
|
|
|
|
def test_steering_config_position_modes(self):
|
|
"""Test different position modes in config."""
|
|
for pos in ["all", "last", "first"]:
|
|
config = SteeringConfig(
|
|
vectors=[SteeringVectorFactory.from_refusal_direction(torch.randn(8))],
|
|
target_layers=[0],
|
|
position=pos,
|
|
)
|
|
assert config.position == pos
|
|
|
|
def test_imports(self):
|
|
from obliteratus.analysis import SteeringVectorFactory, SteeringHookManager
|
|
assert SteeringVectorFactory is not None
|
|
assert SteeringHookManager is not None
|
|
|
|
|
|
class TestParametrizedDimensions:
|
|
"""Parametrized tests across different hidden dimensions."""
|
|
|
|
@pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256, 768])
|
|
def test_whitened_svd_various_dims(self, hidden_dim):
|
|
n_samples = max(4, hidden_dim // 4)
|
|
harmful = [torch.randn(hidden_dim) for _ in range(n_samples)]
|
|
harmless = [torch.randn(hidden_dim) for _ in range(n_samples)]
|
|
extractor = WhitenedSVDExtractor()
|
|
result = extractor.extract(harmful, harmless, n_directions=1)
|
|
assert result.directions.shape[1] == hidden_dim
|
|
|
|
@pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256])
|
|
def test_cross_layer_various_dims(self, hidden_dim):
|
|
directions = {i: torch.randn(hidden_dim) for i in range(4)}
|
|
analyzer = CrossLayerAlignmentAnalyzer()
|
|
result = analyzer.analyze(directions)
|
|
assert 0.0 <= result.direction_persistence_score <= 1.0
|
|
|
|
@pytest.mark.parametrize("hidden_dim", [4, 32, 128])
|
|
def test_sparse_surgery_various_dims(self, hidden_dim):
|
|
weight = torch.randn(hidden_dim, hidden_dim)
|
|
direction = torch.randn(hidden_dim)
|
|
direction = direction / direction.norm()
|
|
surgeon = SparseDirectionSurgeon()
|
|
result = surgeon.analyze_weight_matrix(weight, direction, layer_idx=0)
|
|
assert 0.0 <= result.energy_removed <= 1.0
|
|
|
|
@pytest.mark.parametrize("n_layers", [1, 4, 12, 32])
|
|
def test_imprint_various_layer_counts(self, n_layers):
|
|
directions = {i: torch.randn(64) for i in range(n_layers)}
|
|
detector = AlignmentImprintDetector()
|
|
result = detector.detect_imprint(directions)
|
|
assert result.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown")
|
|
|
|
|
|
class TestExceptionPaths:
|
|
"""Tests for error handling and boundary conditions."""
|
|
|
|
def test_whitened_svd_mismatched_dims(self):
|
|
"""Harmful and harmless with different hidden dims should fail or handle gracefully."""
|
|
harmful = [torch.randn(64) for _ in range(10)]
|
|
harmless = [torch.randn(32) for _ in range(10)]
|
|
extractor = WhitenedSVDExtractor()
|
|
with pytest.raises(Exception):
|
|
extractor.extract(harmful, harmless, n_directions=1)
|
|
|
|
def test_whitened_svd_single_sample(self):
|
|
"""Single sample should not crash (may return 0 directions due to insufficient data)."""
|
|
harmful = [torch.randn(32)]
|
|
harmless = [torch.randn(32)]
|
|
extractor = WhitenedSVDExtractor()
|
|
result = extractor.extract(harmful, harmless, n_directions=1)
|
|
assert result.directions.shape[1] == 32 # hidden dim preserved
|
|
|
|
def test_sparse_surgery_zero_direction(self):
|
|
"""Zero direction vector should not crash."""
|
|
weight = torch.randn(16, 16)
|
|
direction = torch.zeros(16)
|
|
surgeon = SparseDirectionSurgeon()
|
|
# Should handle gracefully (possibly returning 0 energy)
|
|
result = surgeon.analyze_weight_matrix(weight, direction, layer_idx=0)
|
|
assert result is not None
|
|
|
|
def test_cross_layer_single_layer(self):
|
|
"""Single layer directions should still produce a result."""
|
|
directions = {0: torch.randn(32)}
|
|
analyzer = CrossLayerAlignmentAnalyzer()
|
|
result = analyzer.analyze(directions)
|
|
assert result is not None
|