Add files via upload

This commit is contained in:
pliny
2026-03-07 17:53:42 -08:00
committed by GitHub
parent 984ce14059
commit ece134f870
8 changed files with 1010 additions and 77 deletions
+16
@@ -13,6 +13,10 @@ __all__ = [
"save_contribution",
"load_contributions",
"aggregate_results",
"TourneyRunner",
"TourneyResult",
"get_adaptive_recommendation",
"AdaptiveRecommendation",
]
@@ -44,4 +48,16 @@ def __getattr__(name):
if name == "aggregate_results":
from obliteratus.community import aggregate_results
return aggregate_results
if name == "TourneyRunner":
from obliteratus.tourney import TourneyRunner
return TourneyRunner
if name == "TourneyResult":
from obliteratus.tourney import TourneyResult
return TourneyResult
if name == "get_adaptive_recommendation":
from obliteratus.adaptive_defaults import get_adaptive_recommendation
return get_adaptive_recommendation
if name == "AdaptiveRecommendation":
from obliteratus.adaptive_defaults import AdaptiveRecommendation
return AdaptiveRecommendation
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+113 -51
@@ -33,11 +33,12 @@ from typing import Any, Callable
import torch
import torch.nn as nn
from obliteratus import device as dev # noqa: E402 — must import before CUDA setup
-# Reduce CUDA memory fragmentation for large models. Must be set before any
-# CUDA allocations, so we do it at import time. This is the PyTorch-recommended
-# fix for "reserved but unallocated" memory issues.
-if "PYTORCH_CUDA_ALLOC_CONF" not in os.environ and torch.cuda.is_available():
-    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
+dev.configure_cuda_alloc()
from obliteratus.models.loader import ModelHandle, load_model # noqa: E402
from obliteratus.strategies.utils import ( # noqa: E402
@@ -76,11 +77,13 @@ METHODS = {
"n_directions": 4,
"norm_preserve": True,
"regularization": 0.3,
"embed_regularization": 0.5,
"refinement_passes": 2,
"project_biases": True,
"use_chat_template": True,
"use_whitened_svd": False,
"true_iterative_refinement": False,
"layer_adaptive_strength": True,
},
"aggressive": {
"label": "Aggressive (Full Gabliteration + Enhanced)",
@@ -504,6 +507,32 @@ class StageResult:
details: dict[str, Any] = field(default_factory=dict)
def auto_hub_repo_id(model_name: str, *, api=None, org: str | None = None) -> str:
"""Generate a Hub repo ID like ``{namespace}/{short_model}-OBLITERATED``.
If *org* is given, uses that as the namespace (e.g. a shared community org).
Otherwise resolves the authenticated HF username via the API.
"""
import re
if org:
namespace = org
else:
if api is None:
from huggingface_hub import HfApi
api = HfApi()
user_info = api.whoami()
namespace = user_info.get("name") or user_info.get("user", "unknown")
# Extract short model name (part after '/')
short = model_name.split("/")[-1] if "/" in model_name else model_name
# Sanitize: keep alphanumeric, hyphens, dots
short = re.sub(r"[^a-zA-Z0-9\-.]", "-", short)
short = re.sub(r"-+", "-", short).strip("-")
return f"{namespace}/{short}-OBLITERATED"
# ── Main pipeline ───────────────────────────────────────────────────────
class AbliterationPipeline:
@@ -534,6 +563,8 @@ class AbliterationPipeline:
trust_remote_code: bool = False,
method: str = "advanced",
push_to_hub: str | None = None,
hub_token: str | None = None,
hub_community_org: str | None = None,
n_directions: int | None = None,
norm_preserve: bool | None = None,
regularization: float | None = None,
@@ -593,6 +624,8 @@ class AbliterationPipeline:
self.trust_remote_code = trust_remote_code
self.large_model_mode = large_model_mode
self.push_to_hub = push_to_hub
self.hub_token = hub_token
self.hub_community_org = hub_community_org
self.harmful_prompts = list(harmful_prompts) if harmful_prompts is not None else list(HARMFUL_PROMPTS)
self.harmless_prompts = list(harmless_prompts) if harmless_prompts is not None else list(HARMLESS_PROMPTS)
if not self.harmful_prompts:
@@ -758,16 +791,8 @@ class AbliterationPipeline:
@staticmethod
def _free_gpu_memory():
"""Release unused GPU memory between pipeline stages."""
import gc
gc.collect()
if torch.cuda.is_available():
try:
torch.cuda.empty_cache()
except Exception:
# CUDA may be in an error state after illegal memory access;
# swallow so we don't cascade into every subsequent stage.
pass
"""Release unused GPU/accelerator memory between pipeline stages."""
dev.free_gpu_memory()
@staticmethod
def _get_model_device(model: nn.Module) -> torch.device:
@@ -1374,12 +1399,8 @@ class AbliterationPipeline:
max_length = self.max_seq_length
else:
max_length = 384 if collect_multi_pos else 256
-    free_gb = 0.0
-    if torch.cuda.is_available():
-        free_gb = sum(
-            torch.cuda.mem_get_info(i)[0] / (1024 ** 3)
-            for i in range(torch.cuda.device_count())
-        )
+    free_gb = dev.get_total_free_gb()
+    if dev.is_gpu_available():
if self.max_seq_length is None and free_gb < 2.0:
max_length = 64
self.log(f" Low GPU memory ({free_gb:.1f} GB free), using max_length={max_length}")
@@ -1969,22 +1990,22 @@ class AbliterationPipeline:
# Memory-aware cap: SAE encoder+decoder use
# 2 * hidden * (expansion * hidden) * 4 bytes
sae_mem_mb = 2 * hidden_dim * (sae_expansion * hidden_dim) * 4 / 1e6
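# Worked example (hypothetical sizes): hidden_dim=4096, sae_expansion=8
# → 2 * 4096 * (8 * 4096) * 4 / 1e6 ≈ 1074 MB for the fp32 encoder+decoder.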
-    if torch.cuda.is_available():
+    if dev.is_gpu_available():
        try:
-            free_mb = torch.cuda.mem_get_info()[0] / 1e6
+            free_mb = dev.get_total_free_gb() * 1024
            # Leave 512 MB headroom for other ops
            while sae_mem_mb > (free_mb - 512) and sae_expansion > 1:
                sae_expansion //= 2
                sae_mem_mb = 2 * hidden_dim * (sae_expansion * hidden_dim) * 4 / 1e6
        except Exception:
            pass  # Fallback to hidden_dim-based heuristic
-    # Use GPU when enough headroom exists (SAE is small relative to model)
+    # Use GPU/MPS when enough headroom exists (SAE is small relative to model)
    sae_device = "cpu"
-    if torch.cuda.is_available():
+    if dev.is_gpu_available():
        try:
-            sae_free_mb = torch.cuda.mem_get_info()[0] / 1e6
+            sae_free_mb = dev.get_total_free_gb() * 1024
            if sae_free_mb > sae_mem_mb + 1024:
-                sae_device = "cuda"
+                sae_device = dev.get_device()
except Exception:
pass
sae = train_sae(
@@ -4081,9 +4102,12 @@ class AbliterationPipeline:
f"Falling back to float weight replacement.",
stacklevel=3,
)
-    proj_module.weight.data = W_modified.to(
-        device=proj_module.weight.device,
-        dtype=proj_module.weight.dtype,
+    # Cannot cast float back to quantized (Byte/uint8) dtype directly —
+    # PyTorch rejects Float→Byte casts. Replace the entire parameter
+    # with a float version so projections are preserved.
+    proj_module.weight = nn.Parameter(
+        W_modified.to(device=proj_module.weight.device),
+        requires_grad=False,
    )
@staticmethod
@@ -4122,7 +4146,8 @@ class AbliterationPipeline:
continue
original_norm = saved_norms[param_name]
if original_norm > 0:
-    data = param.data.float() if not param.data.is_floating_point() else param.data
+    needs_cast = not param.data.is_floating_point()
+    data = param.data.float() if needs_cast else param.data
new_norm = data.norm().item()
if math.isnan(new_norm) or math.isinf(new_norm) or new_norm == 0:
continue # Skip — weight is degenerate after projection
@@ -4132,7 +4157,12 @@ class AbliterationPipeline:
# layers. Uncapped amplification destroys coherence.
if ratio > _MAX_NORM_RATIO:
ratio = _MAX_NORM_RATIO
-    param.data.mul_(ratio)
+    if needs_cast:
+        # Non-float dtypes (e.g. uint8) can't mul_ by a float
+        # scalar in-place — rescale in float then cast back.
+        param.data.copy_(data.mul_(ratio).to(param.data.dtype))
+    else:
+        param.data.mul_(ratio)
@staticmethod
def _project_out_advanced(
@@ -4363,7 +4393,13 @@ class AbliterationPipeline:
param.data = quantized
param.quant_state = new_state
except (ImportError, AttributeError, RuntimeError):
-    param.data = data
+    # Cannot cast float back to quantized dtype (Byte) —
+    # replace the entire parameter with float version.
+    setattr(
+        container,
+        name,
+        nn.Parameter(data.to(param.device), requires_grad=False),
+    )
return count
return 0
@@ -4953,7 +4989,13 @@ class AbliterationPipeline:
param.data = quantized
param.quant_state = new_state
except (ImportError, AttributeError, RuntimeError):
-    param.data = data.to(device=param.device, dtype=param.dtype)
+    # Cannot cast float back to quantized dtype (Byte) —
+    # replace the entire parameter with float version.
+    setattr(
+        container,
+        pname,
+        nn.Parameter(data.to(param.device), requires_grad=False),
+    )
if count > 0:
return count
@@ -5057,7 +5099,13 @@ class AbliterationPipeline:
param.data = quantized
param.quant_state = new_state
except (ImportError, AttributeError, RuntimeError):
-    param.data = data.to(device=param.device, dtype=param.dtype)
+    # Cannot cast float back to quantized dtype (Byte) —
+    # replace the entire parameter with float version.
+    setattr(
+        container,
+        pname,
+        nn.Parameter(data.to(param.device), requires_grad=False),
+    )
if count > 0:
return count
@@ -5320,16 +5368,19 @@ class AbliterationPipeline:
unique_ratio = len(set(words)) / len(words)
if unique_ratio > 0.2:
coherent_count += 1
-except torch.cuda.OutOfMemoryError:
-    self._free_gpu_memory()
-    self.log(" Skipping generation tests (CUDA out of memory — model too large for KV cache)")
-    generation_failed = True
except (RuntimeError, Exception) as e:
-    err_msg = str(e)
-    if "CUDA" in err_msg or "illegal" in err_msg.lower():
+    if dev.is_oom_error(e):
        self._free_gpu_memory()
-        self.log(f" Skipping generation tests (CUDA error: {err_msg[:120]})")
+        self.log(" Skipping generation tests (out of memory — model too large for KV cache)")
        generation_failed = True
+    elif isinstance(e, RuntimeError):
+        err_msg = str(e)
+        if "CUDA" in err_msg or "MPS" in err_msg or "illegal" in err_msg.lower():
+            self._free_gpu_memory()
+            self.log(f" Skipping generation tests (device error: {err_msg[:120]})")
+            generation_failed = True
+        else:
+            raise
    else:
        raise
@@ -5472,18 +5523,21 @@ class AbliterationPipeline:
del inputs, outputs
self._free_gpu_memory()
-except torch.cuda.OutOfMemoryError:
-    self._free_gpu_memory()
-    self.log(f" [batch {batch_start+1}-{batch_end}] CUDA OOM — stopping")
-    self.log(" Skipping remaining refusal tests (CUDA out of memory)")
-    oom_break = True
except (RuntimeError, Exception) as e:
-    err_msg = str(e)
-    if "CUDA" in err_msg or "illegal" in err_msg.lower():
+    if dev.is_oom_error(e):
        self._free_gpu_memory()
-        self.log(f" [batch {batch_start+1}-{batch_end}] CUDA error — stopping")
-        self.log(f" Skipping remaining refusal tests (CUDA error: {err_msg[:120]})")
+        self.log(f" [batch {batch_start+1}-{batch_end}] OOM — stopping")
+        self.log(" Skipping remaining refusal tests (out of memory)")
        oom_break = True
+    elif isinstance(e, RuntimeError):
+        err_msg = str(e)
+        if "CUDA" in err_msg or "MPS" in err_msg or "illegal" in err_msg.lower():
+            self._free_gpu_memory()
+            self.log(f" [batch {batch_start+1}-{batch_end}] device error — stopping")
+            self.log(f" Skipping remaining refusal tests (device error: {err_msg[:120]})")
+            oom_break = True
+        else:
+            raise
    else:
        raise
@@ -5900,11 +5954,19 @@ class AbliterationPipeline:
# 5. Optionally push the saved directory to the Hub.
if self.push_to_hub:
-    repo_id = self.push_to_hub
-    self.log(f"Uploading to Hub: {repo_id}")
    from huggingface_hub import HfApi
-    api = HfApi()
+    api = HfApi(token=self.hub_token) if self.hub_token else HfApi()
+    # Resolve "auto" → {namespace}/{short_model}-OBLITERATED
+    if self.push_to_hub == "auto":
+        repo_id = auto_hub_repo_id(
+            self.model_name, api=api, org=self.hub_community_org,
+        )
+        self.log(f"Auto-named Hub repo: {repo_id}")
+    else:
+        repo_id = self.push_to_hub
+        self.log(f"Uploading to Hub: {repo_id}")
api.create_repo(repo_id, exist_ok=True)
api.upload_folder(
folder_path=str(self.output_dir),
+4 -3
@@ -39,6 +39,7 @@ from dataclasses import dataclass
import torch
import torch.nn as nn
from obliteratus import device as dev
@dataclass
@@ -120,11 +121,11 @@ def _auto_detect_device(device: str | None = None) -> str:
"""
if device is not None and device not in ("auto",):
return device
-    if torch.cuda.is_available():
+    if dev.is_gpu_available():
        try:
-            free_mb = torch.cuda.mem_get_info()[0] / 1e6
+            free_mb = dev.get_total_free_gb() * 1024
            if free_mb > 512:
-                return "cuda"
+                return dev.get_device()
except Exception:
pass
return "cpu"
+305
@@ -0,0 +1,305 @@
"""Unified device abstraction for CUDA, MPS (Apple Silicon), and CPU.
All device-specific queries (availability, memory, cache management) go through
this module so the rest of the codebase never calls ``torch.cuda.*`` directly.
"""
from __future__ import annotations
import gc
import logging
import os
import platform
from dataclasses import dataclass
import torch
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Device detection
# ---------------------------------------------------------------------------
def is_cuda() -> bool:
"""True when at least one NVIDIA CUDA GPU is visible."""
return torch.cuda.is_available()
def is_mps() -> bool:
"""True when Apple Metal Performance Shaders backend is usable."""
return (
hasattr(torch.backends, "mps")
and torch.backends.mps.is_available()
and torch.backends.mps.is_built()
)
def is_gpu_available() -> bool:
"""True if *any* GPU backend (CUDA or MPS) is available."""
return is_cuda() or is_mps()
def get_device(preference: str = "auto") -> str:
"""Resolve a device string.
Parameters
----------
preference : str
``"auto"`` picks the best GPU, ``"cuda"``/``"mps"``/``"cpu"`` forces.
Returns
-------
str
A PyTorch device string (``"cuda"``, ``"mps"``, or ``"cpu"``).
"""
if preference == "auto":
if is_cuda():
return "cuda"
if is_mps():
return "mps"
return "cpu"
return preference
def get_device_name() -> str:
"""Human-readable name of the current accelerator."""
if is_cuda():
return torch.cuda.get_device_name(0)
if is_mps():
# Apple doesn't expose a per-chip name via MPS; use platform info.
chip = platform.processor() or "Apple Silicon"
return f"Apple {chip} (MPS)"
return "CPU"
def device_count() -> int:
"""Number of accelerator devices (GPUs or MPS slots)."""
if is_cuda():
return torch.cuda.device_count()
if is_mps():
return 1 # MPS always exposes a single unified device
return 0
# ---------------------------------------------------------------------------
# Memory information
# ---------------------------------------------------------------------------
@dataclass
class MemoryInfo:
"""Snapshot of accelerator memory (in GB)."""
used_gb: float = 0.0
reserved_gb: float = 0.0
total_gb: float = 0.0
free_gb: float = 0.0
device_name: str = "CPU"
def _system_memory_gb() -> tuple[float, float]:
"""Return (total_gb, available_gb) of system RAM."""
try:
import psutil
vm = psutil.virtual_memory()
return vm.total / 1024 ** 3, vm.available / 1024 ** 3
except ImportError:
pass
try:
total = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE") / 1024 ** 3
# Rough estimate: assume 60 % available if we can't query
return total, total * 0.6
except (AttributeError, ValueError):
return 16.0, 8.0 # conservative fallback
def get_memory_info(device_index: int = 0) -> MemoryInfo:
"""Query memory for the given accelerator (or system RAM for MPS/CPU)."""
name = get_device_name()
if is_cuda():
try:
free, total = torch.cuda.mem_get_info(device_index)
used = torch.cuda.memory_allocated(device_index)
reserved = torch.cuda.memory_reserved(device_index)
total_gb = total / 1024 ** 3
return MemoryInfo(
used_gb=used / 1024 ** 3,
reserved_gb=reserved / 1024 ** 3,
total_gb=total_gb,
free_gb=free / 1024 ** 3,
device_name=name,
)
except Exception:
props = torch.cuda.get_device_properties(device_index)
total_gb = props.total_memory / 1024 ** 3
return MemoryInfo(total_gb=total_gb, free_gb=total_gb, device_name=name)
if is_mps():
# MPS uses unified memory — report system RAM as a proxy.
total, avail = _system_memory_gb()
# Apple's unified memory is shared with the OS, so usable fraction
# is typically ~65-75 % of total.
usable = total * 0.70
return MemoryInfo(
used_gb=max(usable - avail, 0.0),
reserved_gb=0.0,
total_gb=usable,
free_gb=min(avail, usable),
device_name=name,
)
# CPU-only
total, avail = _system_memory_gb()
return MemoryInfo(total_gb=total, free_gb=avail, device_name=name)
def get_total_free_gb() -> float:
"""Sum of free memory across all accelerator devices, in GB."""
if is_cuda():
total_free = 0.0
for i in range(torch.cuda.device_count()):
try:
free, _ = torch.cuda.mem_get_info(i)
total_free += free / 1024 ** 3
except Exception:
props = torch.cuda.get_device_properties(i)
total_free += props.total_memory / 1024 ** 3
return total_free
if is_mps():
_, avail = _system_memory_gb()
return avail * 0.70 # usable fraction
return 0.0
# ---------------------------------------------------------------------------
# Cache / memory management
# ---------------------------------------------------------------------------
def empty_cache() -> None:
"""Release cached allocations on the current accelerator."""
if is_cuda():
torch.cuda.empty_cache()
elif is_mps():
# torch.mps.empty_cache() available since PyTorch 2.1
if hasattr(torch.mps, "empty_cache"):
torch.mps.empty_cache()
def free_gpu_memory() -> None:
"""Aggressive memory cleanup: GC + accelerator cache flush."""
gc.collect()
if is_cuda():
try:
torch.cuda.empty_cache()
except Exception:
try:
torch.cuda.synchronize()
except Exception:
pass
try:
torch.cuda.reset_peak_memory_stats()
except Exception:
pass
elif is_mps():
if hasattr(torch.mps, "empty_cache"):
try:
torch.mps.empty_cache()
except Exception:
pass
if hasattr(torch.mps, "synchronize"):
try:
torch.mps.synchronize()
except Exception:
pass
def set_seed_all(seed: int) -> None:
"""Set random seed on all available accelerators."""
torch.manual_seed(seed)
if is_cuda():
torch.cuda.manual_seed_all(seed)
# MPS shares the CPU random state — no separate seed call needed.
# ---------------------------------------------------------------------------
# Dtype helpers
# ---------------------------------------------------------------------------
def default_dtype(device: str | None = None) -> torch.dtype:
"""Sensible default dtype for the given device."""
dev = device or get_device()
if dev == "cpu":
return torch.float32
return torch.float16
def supports_bfloat16(device: str | None = None) -> bool:
"""Whether *bfloat16* is natively supported on the target device."""
dev = device or get_device()
if dev.startswith("cuda"):
if is_cuda():
major, _ = torch.cuda.get_device_capability(0)
return major >= 8 # Ampere+
return False
if dev == "mps":
# MPS added bfloat16 support in PyTorch 2.3+
return hasattr(torch, "__version__") and tuple(
int(x) for x in torch.__version__.split(".")[:2]
) >= (2, 3)
return True # CPU supports bfloat16 on most modern platforms
def supports_float64(device: str | None = None) -> bool:
"""Whether *float64* is supported (MPS does NOT support it)."""
dev = device or get_device()
return dev != "mps"
def safe_svd_dtype(tensor: torch.Tensor) -> torch.dtype:
"""Return a dtype safe for SVD on the tensor's device.
MPS does not support float64, so we cap at float32.
"""
if tensor.device.type == "mps":
return torch.float32
return torch.float64 if tensor.dtype in (torch.float64, torch.float32) else torch.float32
# ---------------------------------------------------------------------------
# OOM exception matching
# ---------------------------------------------------------------------------
def is_oom_error(exc: BaseException) -> bool:
"""Return True if *exc* is an out-of-memory error on any backend."""
if isinstance(exc, torch.cuda.OutOfMemoryError):
return True
# MPS raises a generic RuntimeError containing "out of memory"
if isinstance(exc, RuntimeError) and "out of memory" in str(exc).lower():
return True
return False
# ---------------------------------------------------------------------------
# Quantization compatibility
# ---------------------------------------------------------------------------
def supports_bitsandbytes(device: str | None = None) -> bool:
"""BitsAndBytes requires NVIDIA CUDA — check that."""
dev = device or get_device()
return dev.startswith("cuda")
def supports_device_map_auto(device: str | None = None) -> bool:
"""Accelerate's device_map='auto' is only reliable on CUDA."""
dev = device or get_device()
return dev.startswith("cuda")
# ---------------------------------------------------------------------------
# CUDA env setup (called once at import time of abliterate.py)
# ---------------------------------------------------------------------------
def configure_cuda_alloc() -> None:
"""Set expandable_segments for CUDA if available."""
if is_cuda() and "PYTORCH_CUDA_ALLOC_CONF" not in os.environ:
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
+469
@@ -0,0 +1,469 @@
"""Optional MLX backend for Apple Silicon native inference and weight editing.
MLX is Apple's array framework for Apple silicon. It runs natively on the
Metal GPU with a unified-memory model and, when available, can provide
significantly faster inference and weight editing than PyTorch's MPS backend
on Apple hardware.
This module is entirely optional — if ``mlx`` / ``mlx-lm`` are not installed,
``MLX_AVAILABLE`` is ``False`` and all public functions raise ``RuntimeError``.
Install with::
pip install "mlx>=0.22" "mlx-lm>=0.20"
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Callable
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Availability check
# ---------------------------------------------------------------------------
MLX_AVAILABLE = False
_mx = None # mlx module
_mlx_lm = None # mlx-lm module
_mlx_nn = None # mlx.nn module
try:
import mlx.core as _mx_core # type: ignore[import-untyped]
import mlx.nn as _mlx_nn_mod # type: ignore[import-untyped]
import mlx_lm # type: ignore[import-untyped]
_mx = _mx_core
_mlx_nn = _mlx_nn_mod
_mlx_lm = mlx_lm
MLX_AVAILABLE = True
logger.info("MLX backend available (mlx %s)", _mx.__version__ if hasattr(_mx, "__version__") else "?")
except ImportError:
pass
def _require_mlx() -> None:
if not MLX_AVAILABLE:
raise RuntimeError(
"MLX backend is not available. Install with: pip install mlx>=0.22 mlx-lm>=0.20"
)
# ---------------------------------------------------------------------------
# Model loading
# ---------------------------------------------------------------------------
class MLXModelHandle:
"""Lightweight wrapper around an MLX-loaded model + tokenizer."""
def __init__(self, model: Any, tokenizer: Any, model_name: str):
self.model = model
self.tokenizer = tokenizer
self.model_name = model_name
@property
def config(self) -> Any:
return getattr(self.model, "config", None)
def load_model(
model_name: str,
dtype: str = "float16",
) -> MLXModelHandle:
"""Load a HuggingFace model via ``mlx-lm`` for Apple-native execution.
Parameters
----------
model_name : str
HuggingFace model identifier (e.g. ``"meta-llama/Llama-3.2-3B-Instruct"``).
dtype : str
One of ``"float16"``, ``"bfloat16"``, ``"float32"``.
Returns
-------
MLXModelHandle
Wrapper with ``.model`` and ``.tokenizer`` attributes.
"""
_require_mlx()
from mlx_lm import load # type: ignore[import-untyped]
logger.info("Loading %s via MLX (dtype=%s)", model_name, dtype)
model, tokenizer = load(model_name)
return MLXModelHandle(model=model, tokenizer=tokenizer, model_name=model_name)
# ---------------------------------------------------------------------------
# Inference
# ---------------------------------------------------------------------------
def generate(
handle: MLXModelHandle,
prompt: str,
max_tokens: int = 256,
temperature: float = 0.7,
top_p: float = 0.9,
repetition_penalty: float | None = None,
) -> str:
"""Generate text using the MLX model.
Parameters
----------
handle : MLXModelHandle
A loaded MLX model handle.
prompt : str
The input prompt string.
max_tokens : int
Maximum number of tokens to generate.
temperature : float
Sampling temperature.
top_p : float
Nucleus sampling threshold.
repetition_penalty : float or None
Repetition penalty (1.0 = no penalty).
Returns
-------
str
Generated text completion.
"""
_require_mlx()
from mlx_lm import generate as mlx_generate # type: ignore[import-untyped]
kwargs: dict[str, Any] = {
"max_tokens": max_tokens,
"temp": temperature,
"top_p": top_p,
}
if repetition_penalty is not None:
kwargs["repetition_penalty"] = repetition_penalty
return mlx_generate(
handle.model,
handle.tokenizer,
prompt=prompt,
**kwargs,
)
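# Example (model name and prompt are placeholders):
#   handle = load_model("meta-llama/Llama-3.2-3B-Instruct")
#   text = generate(handle, "Hello!", max_tokens=64, temperature=0.0)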
# ---------------------------------------------------------------------------
# Activation extraction
# ---------------------------------------------------------------------------
def get_activations(
handle: MLXModelHandle,
prompts: list[str],
layer_indices: list[int],
max_length: int = 256,
) -> dict[int, list[Any]]:
"""Extract hidden-state activations from specified layers.
Uses MLX's computation graph to capture intermediate outputs.
Parameters
----------
handle : MLXModelHandle
Loaded model.
prompts : list[str]
Input prompts to probe.
layer_indices : list[int]
Which transformer layers to capture.
max_length : int
Maximum sequence length for tokenization.
Returns
-------
dict[int, list[mlx.core.array]]
Mapping from layer index to list of activation arrays (one per prompt).
Each array has shape ``(hidden_dim,)`` — the last-token hidden state.
"""
_require_mlx()
import mlx.core as mx # type: ignore[import-untyped]
model = handle.model
tokenizer = handle.tokenizer
# Identify the transformer block list
layers = None
for attr in ("model.layers", "transformer.h", "gpt_neox.layers"):
obj = model
try:
for part in attr.split("."):
obj = getattr(obj, part)
layers = obj
break
except AttributeError:
continue
if layers is None:
raise RuntimeError(
"Cannot locate transformer layers in the MLX model. "
"Supported architectures: LLaMA, GPT-2, GPT-NeoX."
)
activations: dict[int, list[Any]] = {idx: [] for idx in layer_indices}
target_set = set(layer_indices)
for prompt in prompts:
tokens = tokenizer.encode(prompt)
if len(tokens) > max_length:
tokens = tokens[:max_length]
input_ids = mx.array([tokens])
# Forward through embedding
if hasattr(model, "model"):
# LLaMA-style: model.model.embed_tokens
embed_module = model.model
elif hasattr(model, "transformer"):
embed_module = model.transformer
else:
embed_module = model
if hasattr(embed_module, "embed_tokens"):
h = embed_module.embed_tokens(input_ids)
elif hasattr(embed_module, "wte"):
h = embed_module.wte(input_ids)
else:
raise RuntimeError("Cannot find embedding layer in MLX model")
# Walk through layers, capturing activations at target indices
for i, layer in enumerate(layers):
h = layer(h)
# Some layers return tuples (hidden, attention) — take first
if isinstance(h, tuple):
h = h[0]
if i in target_set:
# Last token hidden state
last_hidden = h[0, -1, :]
mx.eval(last_hidden) # force evaluation
activations[i].append(last_hidden)
return activations
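# Usage sketch: derive a candidate refusal direction from these activations
# (illustrative; the prompt lists and the probed layer index are placeholders):
#
#   import mlx.core as mx
#   acts = get_activations(handle, harmful + harmless, layer_indices=[16])
#   n = len(harmful)
#   mean_harmful = mx.stack(acts[16][:n]).mean(axis=0)
#   mean_harmless = mx.stack(acts[16][n:]).mean(axis=0)
#   direction = mean_harmful - mean_harmless
#   direction = direction / mx.linalg.norm(direction)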
# ---------------------------------------------------------------------------
# Weight manipulation
# ---------------------------------------------------------------------------
def get_weight(handle: MLXModelHandle, layer_idx: int, param_path: str) -> Any:
"""Retrieve a weight matrix from the model.
Parameters
----------
handle : MLXModelHandle
Loaded model.
layer_idx : int
Transformer layer index.
param_path : str
Dot-separated path within the layer, e.g. ``"self_attn.o_proj.weight"``.
Returns
-------
mlx.core.array
The weight tensor.
"""
_require_mlx()
model = handle.model
# Navigate to the layer
layers = _get_layers(model)
layer = layers[layer_idx]
# Navigate the param path
obj = layer
for part in param_path.split("."):
obj = getattr(obj, part)
return obj
def modify_weights(
handle: MLXModelHandle,
layer_idx: int,
param_path: str,
modifier_fn: Callable[[Any], Any],
) -> None:
"""Modify a weight matrix in-place using a function.
Parameters
----------
handle : MLXModelHandle
Loaded model.
layer_idx : int
Transformer layer index.
param_path : str
Dot-separated path within the layer to the weight, e.g.
``"self_attn.o_proj.weight"``.
modifier_fn : callable
Function that takes the current weight (mlx array) and returns the
modified weight (mlx array). For abliteration, this would project
out the refusal direction.
"""
_require_mlx()
import mlx.core as mx # type: ignore[import-untyped]
model = handle.model
layers = _get_layers(model)
layer = layers[layer_idx]
# Navigate to the parent module and leaf attribute
parts = param_path.split(".")
parent = layer
for part in parts[:-1]:
parent = getattr(parent, part)
leaf_name = parts[-1]
old_weight = getattr(parent, leaf_name)
new_weight = modifier_fn(old_weight)
# MLX uses a functional update pattern
if hasattr(parent, "update"):
parent.update({leaf_name: new_weight})
else:
setattr(parent, leaf_name, new_weight)
mx.eval(new_weight) # materialize
def project_out_direction(weight: Any, direction: Any) -> Any:
"""Project a direction out of a weight matrix (abliteration).
Given weight matrix W and unit direction d, computes::
W' = W - (W @ d) d^T
Parameters
----------
weight : mlx.core.array
Weight matrix, shape ``(out_features, in_features)``.
direction : mlx.core.array
Unit direction vector, shape ``(in_features,)``.
Returns
-------
mlx.core.array
Modified weight with direction projected out.
"""
_require_mlx()
import mlx.core as mx # type: ignore[import-untyped]
d = direction.astype(weight.dtype)
# W @ d gives the component along d for each row
proj = mx.matmul(weight, d[:, None]) # (out, 1)
return weight - mx.matmul(proj, d[None, :]) # (out, in)
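# Sanity check / typical use (illustrative layer index and param path):
#
#   import mlx.core as mx
#   W = mx.random.normal((8, 4))
#   d = mx.random.normal((4,))
#   d = d / mx.linalg.norm(d)
#   W2 = project_out_direction(W, d)
#   assert mx.abs(mx.matmul(W2, d[:, None])).max() < 1e-5  # rows now ⟂ d
#
#   modify_weights(handle, layer_idx=16, param_path="self_attn.o_proj.weight",
#                  modifier_fn=lambda W: project_out_direction(W, direction))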
# ---------------------------------------------------------------------------
# Save model
# ---------------------------------------------------------------------------
def save_model(
handle: MLXModelHandle,
output_dir: str | Path,
upload_repo: str | None = None,
) -> Path:
"""Save the (modified) MLX model to disk.
Saves in safetensors format compatible with both MLX and HuggingFace.
Parameters
----------
handle : MLXModelHandle
Model handle (possibly with modified weights).
output_dir : str or Path
Directory to save into.
upload_repo : str or None
If set, also uploads to HuggingFace Hub.
Returns
-------
Path
The output directory.
"""
_require_mlx()
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
# mlx-lm's save uses safetensors
if hasattr(_mlx_lm, "save_model"):
_mlx_lm.save_model(str(out), handle.model, handle.tokenizer)
else:
# Fallback: manual save via mlx.core.save_safetensors
import mlx.core as mx # type: ignore[import-untyped]
weights = dict(handle.model.parameters())
flat = {}
_flatten_dict(weights, "", flat)
mx.save_safetensors(str(out / "model.safetensors"), flat)
# Save tokenizer via transformers
handle.tokenizer.save_pretrained(str(out))
logger.info("MLX model saved to %s", out)
if upload_repo:
try:
from mlx_lm import upload_to_hub # type: ignore[import-untyped]
upload_to_hub(str(out), upload_repo)
logger.info("Uploaded to %s", upload_repo)
except (ImportError, AttributeError):
logger.warning("mlx-lm upload not available — push manually with huggingface-cli")
return out
# ---------------------------------------------------------------------------
# Conversion: PyTorch ↔ MLX
# ---------------------------------------------------------------------------
def torch_tensor_to_mlx(tensor: "torch.Tensor") -> Any: # noqa: F821
"""Convert a PyTorch tensor to an MLX array."""
_require_mlx()
import mlx.core as mx # type: ignore[import-untyped]
# Move to CPU and upcast to float32 (numpy has no bfloat16), then convert.
np_array = tensor.detach().cpu().float().numpy()
return mx.array(np_array)
def mlx_to_torch_tensor(array: Any, device: str = "cpu") -> "torch.Tensor": # noqa: F821
"""Convert an MLX array to a PyTorch tensor."""
import numpy as np
import torch
np_array = np.array(array)
return torch.from_numpy(np_array).to(device)
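# Round-trip sanity check: values survive, and dtype widens to float32
# because the torch-to-MLX path goes through .float() and numpy:
#
#   t = torch.randn(4, 4, dtype=torch.bfloat16)
#   back = mlx_to_torch_tensor(torch_tensor_to_mlx(t))
#   assert torch.allclose(t.float(), back)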
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _get_layers(model: Any) -> Any:
"""Locate the transformer block list in an MLX model."""
for attr_path in ("model.layers", "transformer.h", "gpt_neox.layers"):
obj = model
try:
for part in attr_path.split("."):
obj = getattr(obj, part)
return obj
except AttributeError:
continue
raise RuntimeError("Cannot locate transformer layers in MLX model")
def _flatten_dict(d: dict, prefix: str, out: dict) -> None:
    """Flatten a nested dict with dot-separated keys (lists by index)."""
    for k, v in d.items():
        key = f"{prefix}{k}" if prefix else k
        if isinstance(v, dict):
            _flatten_dict(v, f"{key}.", out)
        elif isinstance(v, list):
            # MLX parameter trees store repeated submodules (e.g. layers) as lists
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    _flatten_dict(item, f"{key}.{i}.", out)
                else:
                    out[f"{key}.{i}"] = item
        else:
            out[key] = v
+2 -2
@@ -38,9 +38,9 @@ def set_seed(seed: int = 42, deterministic: bool = True) -> None:
try:
import torch
from obliteratus import device as dev
torch.manual_seed(seed)
-    if torch.cuda.is_available():
-        torch.cuda.manual_seed_all(seed)
+    dev.set_seed_all(seed)
if deterministic:
torch.use_deterministic_algorithms(True, warn_only=True)
+93 -21
@@ -429,35 +429,111 @@ def fetch_hub_records(max_records: int = 10000) -> list[dict[str, Any]]:
This is used by :func:`get_leaderboard_data` to merge community-wide
results with local data.
+    Tries three strategies in order:
+    1. ``huggingface_hub`` API (preferred on HF Spaces)
+    2. Git shallow clone (works anywhere git is installed)
+    3. Returns empty list
    """
    repo = _TELEMETRY_REPO
+    # For adaptive defaults, always try the default repo even locally
+    if not repo:
+        repo = _DEFAULT_TELEMETRY_REPO
+    # Strategy 1: huggingface_hub API
    try:
+        records = _fetch_via_hf_api(repo, max_records)
+        if records:
+            return records
+    except Exception as e:
+        logger.debug("HF API fetch failed: %s", e)
+    # Strategy 2: git shallow clone fallback
+    try:
+        records = _fetch_via_git_clone(repo, max_records)
+        if records:
+            return records
+    except Exception as e:
+        logger.debug("Git clone fetch failed: %s", e)
+    return []
+def _fetch_via_hf_api(repo: str, max_records: int) -> list[dict[str, Any]]:
+    """Fetch telemetry via huggingface_hub API."""
+    from huggingface_hub import HfApi, hf_hub_download
+    api = HfApi(token=os.environ.get("HF_TOKEN"))
+    try:
+        all_files = api.list_repo_files(repo, repo_type="dataset")
+    except Exception:
+        return []
-    try:
-        from huggingface_hub import HfApi, hf_hub_download
+    jsonl_files = [f for f in all_files if f.startswith("data/") and f.endswith(".jsonl")]
+    if not jsonl_files:
+        return []
-        api = HfApi(token=os.environ.get("HF_TOKEN"))
+    records: list[dict[str, Any]] = []
+    for filepath in jsonl_files:
        try:
-            all_files = api.list_repo_files(repo, repo_type="dataset")
+            local_path = hf_hub_download(
+                repo, filepath, repo_type="dataset",
+                # etag_timeout=0 forces a freshness check against Hub
+                # so we always get the latest data, not stale cache
+                etag_timeout=0,
+            )
+            with open(local_path) as f:
+                for line in f:
+                    line = line.strip()
+                    if not line:
+                        continue
+                    try:
+                        records.append(json.loads(line))
+                    except json.JSONDecodeError:
+                        continue
            if len(records) >= max_records:
                break
        except Exception:
-            # Repo doesn't exist yet or network error
            continue
+        if len(records) >= max_records:
+            break
+    return records
+def _fetch_via_git_clone(repo: str, max_records: int) -> list[dict[str, Any]]:
+    """Fetch telemetry via git shallow clone (fallback when huggingface_hub unavailable).
+    Uses GIT_LFS_SKIP_SMUDGE=1 for speed — JSONL files are plain text,
+    not LFS objects, so this works fine.
+    """
+    import shutil
+    import subprocess
+    import tempfile
+    clone_url = f"https://huggingface.co/datasets/{repo}"
+    clone_dir = Path(tempfile.mkdtemp(prefix="obliteratus_telemetry_"))
+    try:
+        env = dict(os.environ)
+        env["GIT_LFS_SKIP_SMUDGE"] = "1"
+        result = subprocess.run(
+            ["git", "clone", "--depth", "1", clone_url, str(clone_dir)],
+            capture_output=True, text=True, timeout=60, env=env,
+        )
+        if result.returncode != 0:
+            logger.debug("Git clone failed: %s", result.stderr.strip())
+            return []
-        jsonl_files = [f for f in all_files if f.startswith("data/") and f.endswith(".jsonl")]
-        if not jsonl_files:
+        # Parse all JSONL files in data/
+        data_dir = clone_dir / "data"
+        if not data_dir.exists():
            return []
        records: list[dict[str, Any]] = []
-        for filepath in jsonl_files:
+        for jsonl_file in sorted(data_dir.glob("*.jsonl")):
            try:
-                local_path = hf_hub_download(
-                    repo, filepath, repo_type="dataset",
-                    # etag_timeout=0 forces a freshness check against Hub
-                    # so we always get the latest data, not stale cache
-                    etag_timeout=0,
-                )
-                with open(local_path) as f:
+                with open(jsonl_file) as f:
                    for line in f:
                        line = line.strip()
                        if not line:
@@ -474,12 +550,8 @@ def fetch_hub_records(max_records: int = 10000) -> list[dict[str, Any]]:
break
return records
-    except ImportError:
-        logger.debug("huggingface_hub not installed — cannot fetch Hub records")
-        return []
-    except Exception as e:
-        logger.debug(f"Failed to fetch Hub records: {e}")
-        return []
+    finally:
+        shutil.rmtree(clone_dir, ignore_errors=True)
# ── Hub restore (warm-start after rebuild) ────────────────────────────
+8
@@ -0,0 +1,8 @@
# Optional Apple Silicon dependencies for native MLX acceleration.
# Install alongside the main requirements on macOS with Apple Silicon:
#
# pip install -r requirements.txt -r requirements-apple.txt
#
# These packages are macOS-only and will fail to install on Linux/Windows.
mlx>=0.22
mlx-lm>=0.20