From ece134f87011e2dea9e40c216bb67650abcdad3f Mon Sep 17 00:00:00 2001 From: pliny <133052465+elder-plinius@users.noreply.github.com> Date: Sat, 7 Mar 2026 17:53:42 -0800 Subject: [PATCH] Add files via upload --- obliteratus/__init__.py | 16 + obliteratus/abliterate.py | 164 +++++--- obliteratus/analysis/sae_abliteration.py | 7 +- obliteratus/device.py | 305 +++++++++++++++ obliteratus/mlx_backend.py | 469 +++++++++++++++++++++++ obliteratus/reproducibility.py | 4 +- obliteratus/telemetry.py | 114 +++++- requirements-apple.txt | 8 + 8 files changed, 1010 insertions(+), 77 deletions(-) create mode 100644 obliteratus/device.py create mode 100644 obliteratus/mlx_backend.py create mode 100644 requirements-apple.txt diff --git a/obliteratus/__init__.py b/obliteratus/__init__.py index 94fc815..7f70058 100644 --- a/obliteratus/__init__.py +++ b/obliteratus/__init__.py @@ -13,6 +13,10 @@ __all__ = [ "save_contribution", "load_contributions", "aggregate_results", + "TourneyRunner", + "TourneyResult", + "get_adaptive_recommendation", + "AdaptiveRecommendation", ] @@ -44,4 +48,16 @@ def __getattr__(name): if name == "aggregate_results": from obliteratus.community import aggregate_results return aggregate_results + if name == "TourneyRunner": + from obliteratus.tourney import TourneyRunner + return TourneyRunner + if name == "TourneyResult": + from obliteratus.tourney import TourneyResult + return TourneyResult + if name == "get_adaptive_recommendation": + from obliteratus.adaptive_defaults import get_adaptive_recommendation + return get_adaptive_recommendation + if name == "AdaptiveRecommendation": + from obliteratus.adaptive_defaults import AdaptiveRecommendation + return AdaptiveRecommendation raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/obliteratus/abliterate.py b/obliteratus/abliterate.py index a90ff3c..6c6f3b9 100644 --- a/obliteratus/abliterate.py +++ b/obliteratus/abliterate.py @@ -33,11 +33,12 @@ from typing import Any, Callable import torch import torch.nn as nn +from obliteratus import device as dev # noqa: E402 — must import before CUDA setup + # Reduce CUDA memory fragmentation for large models. Must be set before any # CUDA allocations, so we do it at import time. This is the PyTorch-recommended # fix for "reserved but unallocated" memory issues. -if "PYTORCH_CUDA_ALLOC_CONF" not in os.environ and torch.cuda.is_available(): - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" +dev.configure_cuda_alloc() from obliteratus.models.loader import ModelHandle, load_model # noqa: E402 from obliteratus.strategies.utils import ( # noqa: E402 @@ -76,11 +77,13 @@ METHODS = { "n_directions": 4, "norm_preserve": True, "regularization": 0.3, + "embed_regularization": 0.5, "refinement_passes": 2, "project_biases": True, "use_chat_template": True, "use_whitened_svd": False, "true_iterative_refinement": False, + "layer_adaptive_strength": True, }, "aggressive": { "label": "Aggressive (Full Gabliteration + Enhanced)", @@ -504,6 +507,32 @@ class StageResult: details: dict[str, Any] = field(default_factory=dict) +def auto_hub_repo_id(model_name: str, *, api=None, org: str | None = None) -> str: + """Generate a Hub repo ID like ``{namespace}/{short_model}-OBLITERATED``. + + If *org* is given, uses that as the namespace (e.g. a shared community org). + Otherwise resolves the authenticated HF username via the API. 
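+
+    Doctest sketch (explicit *org* avoids the ``whoami`` network call)::
+
+        >>> auto_hub_repo_id("meta-llama/Llama-3.2-3B-Instruct", org="my-org")
+        'my-org/Llama-3.2-3B-Instruct-OBLITERATED'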
+ """ + import re + + if org: + namespace = org + else: + if api is None: + from huggingface_hub import HfApi + api = HfApi() + user_info = api.whoami() + namespace = user_info.get("name") or user_info.get("user", "unknown") + + # Extract short model name (part after '/') + short = model_name.split("/")[-1] if "/" in model_name else model_name + # Sanitize: keep alphanumeric, hyphens, dots + short = re.sub(r"[^a-zA-Z0-9\-.]", "-", short) + short = re.sub(r"-+", "-", short).strip("-") + + return f"{namespace}/{short}-OBLITERATED" + + # ── Main pipeline ─────────────────────────────────────────────────────── class AbliterationPipeline: @@ -534,6 +563,8 @@ class AbliterationPipeline: trust_remote_code: bool = False, method: str = "advanced", push_to_hub: str | None = None, + hub_token: str | None = None, + hub_community_org: str | None = None, n_directions: int | None = None, norm_preserve: bool | None = None, regularization: float | None = None, @@ -593,6 +624,8 @@ class AbliterationPipeline: self.trust_remote_code = trust_remote_code self.large_model_mode = large_model_mode self.push_to_hub = push_to_hub + self.hub_token = hub_token + self.hub_community_org = hub_community_org self.harmful_prompts = list(harmful_prompts) if harmful_prompts is not None else list(HARMFUL_PROMPTS) self.harmless_prompts = list(harmless_prompts) if harmless_prompts is not None else list(HARMLESS_PROMPTS) if not self.harmful_prompts: @@ -758,16 +791,8 @@ class AbliterationPipeline: @staticmethod def _free_gpu_memory(): - """Release unused GPU memory between pipeline stages.""" - import gc - gc.collect() - if torch.cuda.is_available(): - try: - torch.cuda.empty_cache() - except Exception: - # CUDA may be in an error state after illegal memory access; - # swallow so we don't cascade into every subsequent stage. 
- pass + """Release unused GPU/accelerator memory between pipeline stages.""" + dev.free_gpu_memory() @staticmethod def _get_model_device(model: nn.Module) -> torch.device: @@ -1374,12 +1399,8 @@ class AbliterationPipeline: max_length = self.max_seq_length else: max_length = 384 if collect_multi_pos else 256 - free_gb = 0.0 - if torch.cuda.is_available(): - free_gb = sum( - torch.cuda.mem_get_info(i)[0] / (1024 ** 3) - for i in range(torch.cuda.device_count()) - ) + free_gb = dev.get_total_free_gb() + if dev.is_gpu_available(): if self.max_seq_length is None and free_gb < 2.0: max_length = 64 self.log(f" Low GPU memory ({free_gb:.1f} GB free), using max_length={max_length}") @@ -1969,22 +1990,22 @@ class AbliterationPipeline: # Memory-aware cap: SAE encoder+decoder use # 2 * hidden * (expansion * hidden) * 4 bytes sae_mem_mb = 2 * hidden_dim * (sae_expansion * hidden_dim) * 4 / 1e6 - if torch.cuda.is_available(): + if dev.is_gpu_available(): try: - free_mb = torch.cuda.mem_get_info()[0] / 1e6 + free_mb = dev.get_total_free_gb() * 1024 # Leave 512 MB headroom for other ops while sae_mem_mb > (free_mb - 512) and sae_expansion > 1: sae_expansion //= 2 sae_mem_mb = 2 * hidden_dim * (sae_expansion * hidden_dim) * 4 / 1e6 except Exception: pass # Fallback to hidden_dim-based heuristic - # Use GPU when enough headroom exists (SAE is small relative to model) + # Use GPU/MPS when enough headroom exists (SAE is small relative to model) sae_device = "cpu" - if torch.cuda.is_available(): + if dev.is_gpu_available(): try: - sae_free_mb = torch.cuda.mem_get_info()[0] / 1e6 + sae_free_mb = dev.get_total_free_gb() * 1024 if sae_free_mb > sae_mem_mb + 1024: - sae_device = "cuda" + sae_device = dev.get_device() except Exception: pass sae = train_sae( @@ -4081,9 +4102,12 @@ class AbliterationPipeline: f"Falling back to float weight replacement.", stacklevel=3, ) - proj_module.weight.data = W_modified.to( - device=proj_module.weight.device, - dtype=proj_module.weight.dtype, + # Cannot cast float back to quantized (Byte/uint8) dtype directly — + # PyTorch rejects Float→Byte casts. Replace the entire parameter + # with a float version so projections are preserved. + proj_module.weight = nn.Parameter( + W_modified.to(device=proj_module.weight.device), + requires_grad=False, ) @staticmethod @@ -4122,7 +4146,8 @@ class AbliterationPipeline: continue original_norm = saved_norms[param_name] if original_norm > 0: - data = param.data.float() if not param.data.is_floating_point() else param.data + needs_cast = not param.data.is_floating_point() + data = param.data.float() if needs_cast else param.data new_norm = data.norm().item() if math.isnan(new_norm) or math.isinf(new_norm) or new_norm == 0: continue # Skip — weight is degenerate after projection @@ -4132,7 +4157,12 @@ class AbliterationPipeline: # layers. Uncapped amplification destroys coherence. if ratio > _MAX_NORM_RATIO: ratio = _MAX_NORM_RATIO - param.data.mul_(ratio) + if needs_cast: + # Non-float dtypes (e.g. uint8) can't mul_ by a float + # scalar in-place — rescale in float then cast back. + param.data.copy_(data.mul_(ratio).to(param.data.dtype)) + else: + param.data.mul_(ratio) @staticmethod def _project_out_advanced( @@ -4363,7 +4393,13 @@ class AbliterationPipeline: param.data = quantized param.quant_state = new_state except (ImportError, AttributeError, RuntimeError): - param.data = data + # Cannot cast float back to quantized dtype (Byte) — + # replace the entire parameter with float version. 
+ setattr( + container, + name, + nn.Parameter(data.to(param.device), requires_grad=False), + ) return count return 0 @@ -4953,7 +4989,13 @@ class AbliterationPipeline: param.data = quantized param.quant_state = new_state except (ImportError, AttributeError, RuntimeError): - param.data = data.to(device=param.device, dtype=param.dtype) + # Cannot cast float back to quantized dtype (Byte) — + # replace the entire parameter with float version. + setattr( + container, + pname, + nn.Parameter(data.to(param.device), requires_grad=False), + ) if count > 0: return count @@ -5057,7 +5099,13 @@ class AbliterationPipeline: param.data = quantized param.quant_state = new_state except (ImportError, AttributeError, RuntimeError): - param.data = data.to(device=param.device, dtype=param.dtype) + # Cannot cast float back to quantized dtype (Byte) — + # replace the entire parameter with float version. + setattr( + container, + pname, + nn.Parameter(data.to(param.device), requires_grad=False), + ) if count > 0: return count @@ -5320,16 +5368,19 @@ class AbliterationPipeline: unique_ratio = len(set(words)) / len(words) if unique_ratio > 0.2: coherent_count += 1 - except torch.cuda.OutOfMemoryError: - self._free_gpu_memory() - self.log(" Skipping generation tests (CUDA out of memory — model too large for KV cache)") - generation_failed = True except (RuntimeError, Exception) as e: - err_msg = str(e) - if "CUDA" in err_msg or "illegal" in err_msg.lower(): + if dev.is_oom_error(e): self._free_gpu_memory() - self.log(f" Skipping generation tests (CUDA error: {err_msg[:120]})") + self.log(" Skipping generation tests (out of memory — model too large for KV cache)") generation_failed = True + elif isinstance(e, RuntimeError): + err_msg = str(e) + if "CUDA" in err_msg or "MPS" in err_msg or "illegal" in err_msg.lower(): + self._free_gpu_memory() + self.log(f" Skipping generation tests (device error: {err_msg[:120]})") + generation_failed = True + else: + raise else: raise @@ -5472,18 +5523,21 @@ class AbliterationPipeline: del inputs, outputs self._free_gpu_memory() - except torch.cuda.OutOfMemoryError: - self._free_gpu_memory() - self.log(f" [batch {batch_start+1}-{batch_end}] CUDA OOM — stopping") - self.log(" Skipping remaining refusal tests (CUDA out of memory)") - oom_break = True except (RuntimeError, Exception) as e: - err_msg = str(e) - if "CUDA" in err_msg or "illegal" in err_msg.lower(): + if dev.is_oom_error(e): self._free_gpu_memory() - self.log(f" [batch {batch_start+1}-{batch_end}] CUDA error — stopping") - self.log(f" Skipping remaining refusal tests (CUDA error: {err_msg[:120]})") + self.log(f" [batch {batch_start+1}-{batch_end}] OOM — stopping") + self.log(" Skipping remaining refusal tests (out of memory)") oom_break = True + elif isinstance(e, RuntimeError): + err_msg = str(e) + if "CUDA" in err_msg or "MPS" in err_msg or "illegal" in err_msg.lower(): + self._free_gpu_memory() + self.log(f" [batch {batch_start+1}-{batch_end}] device error — stopping") + self.log(f" Skipping remaining refusal tests (device error: {err_msg[:120]})") + oom_break = True + else: + raise else: raise @@ -5900,11 +5954,19 @@ class AbliterationPipeline: # 5. Optionally push the saved directory to the Hub. 
if self.push_to_hub: - repo_id = self.push_to_hub - self.log(f"Uploading to Hub: {repo_id}") from huggingface_hub import HfApi - api = HfApi() + api = HfApi(token=self.hub_token) if self.hub_token else HfApi() + + # Resolve "auto" → {namespace}/{short_model}-OBLITERATED + if self.push_to_hub == "auto": + repo_id = auto_hub_repo_id( + self.model_name, api=api, org=self.hub_community_org, + ) + self.log(f"Auto-named Hub repo: {repo_id}") + else: + repo_id = self.push_to_hub + self.log(f"Uploading to Hub: {repo_id}") api.create_repo(repo_id, exist_ok=True) api.upload_folder( folder_path=str(self.output_dir), diff --git a/obliteratus/analysis/sae_abliteration.py b/obliteratus/analysis/sae_abliteration.py index 1655e37..479a474 100644 --- a/obliteratus/analysis/sae_abliteration.py +++ b/obliteratus/analysis/sae_abliteration.py @@ -39,6 +39,7 @@ from dataclasses import dataclass import torch import torch.nn as nn +from obliteratus import device as dev @dataclass @@ -120,11 +121,11 @@ def _auto_detect_device(device: str | None = None) -> str: """ if device is not None and device not in ("auto",): return device - if torch.cuda.is_available(): + if dev.is_gpu_available(): try: - free_mb = torch.cuda.mem_get_info()[0] / 1e6 + free_mb = dev.get_total_free_gb() * 1024 if free_mb > 512: - return "cuda" + return dev.get_device() except Exception: pass return "cpu" diff --git a/obliteratus/device.py b/obliteratus/device.py new file mode 100644 index 0000000..cef780b --- /dev/null +++ b/obliteratus/device.py @@ -0,0 +1,305 @@ +"""Unified device abstraction for CUDA, MPS (Apple Silicon), and CPU. + +All device-specific queries (availability, memory, cache management) go through +this module so the rest of the codebase never calls ``torch.cuda.*`` directly. +""" + +from __future__ import annotations + +import gc +import logging +import os +import platform +from dataclasses import dataclass + +import torch + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Device detection +# --------------------------------------------------------------------------- + +def is_cuda() -> bool: + """True when at least one NVIDIA CUDA GPU is visible.""" + return torch.cuda.is_available() + + +def is_mps() -> bool: + """True when Apple Metal Performance Shaders backend is usable.""" + return ( + hasattr(torch.backends, "mps") + and torch.backends.mps.is_available() + and torch.backends.mps.is_built() + ) + + +def is_gpu_available() -> bool: + """True if *any* GPU backend (CUDA or MPS) is available.""" + return is_cuda() or is_mps() + + +def get_device(preference: str = "auto") -> str: + """Resolve a device string. + + Parameters + ---------- + preference : str + ``"auto"`` picks the best GPU, ``"cuda"``/``"mps"``/``"cpu"`` forces. + + Returns + ------- + str + A PyTorch device string (``"cuda"``, ``"mps"``, or ``"cpu"``). + """ + if preference == "auto": + if is_cuda(): + return "cuda" + if is_mps(): + return "mps" + return "cpu" + return preference + + +def get_device_name() -> str: + """Human-readable name of the current accelerator.""" + if is_cuda(): + return torch.cuda.get_device_name(0) + if is_mps(): + # Apple doesn't expose a per-chip name via MPS; use platform info. 
+ chip = platform.processor() or "Apple Silicon" + return f"Apple {chip} (MPS)" + return "CPU" + + +def device_count() -> int: + """Number of accelerator devices (GPUs or MPS slots).""" + if is_cuda(): + return torch.cuda.device_count() + if is_mps(): + return 1 # MPS always exposes a single unified device + return 0 + + +# --------------------------------------------------------------------------- +# Memory information +# --------------------------------------------------------------------------- + +@dataclass +class MemoryInfo: + """Snapshot of accelerator memory (in GB).""" + + used_gb: float = 0.0 + reserved_gb: float = 0.0 + total_gb: float = 0.0 + free_gb: float = 0.0 + device_name: str = "CPU" + + +def _system_memory_gb() -> tuple[float, float]: + """Return (total_gb, available_gb) of system RAM.""" + try: + import psutil + vm = psutil.virtual_memory() + return vm.total / 1024 ** 3, vm.available / 1024 ** 3 + except ImportError: + pass + try: + total = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE") / 1024 ** 3 + # Rough estimate: assume 60 % available if we can't query + return total, total * 0.6 + except (AttributeError, ValueError): + return 16.0, 8.0 # conservative fallback + + +def get_memory_info(device_index: int = 0) -> MemoryInfo: + """Query memory for the given accelerator (or system RAM for MPS/CPU).""" + name = get_device_name() + + if is_cuda(): + try: + free, total = torch.cuda.mem_get_info(device_index) + used = torch.cuda.memory_allocated(device_index) + reserved = torch.cuda.memory_reserved(device_index) + total_gb = total / 1024 ** 3 + return MemoryInfo( + used_gb=used / 1024 ** 3, + reserved_gb=reserved / 1024 ** 3, + total_gb=total_gb, + free_gb=free / 1024 ** 3, + device_name=name, + ) + except Exception: + props = torch.cuda.get_device_properties(device_index) + total_gb = props.total_memory / 1024 ** 3 + return MemoryInfo(total_gb=total_gb, free_gb=total_gb, device_name=name) + + if is_mps(): + # MPS uses unified memory — report system RAM as a proxy. + total, avail = _system_memory_gb() + # Apple's unified memory is shared with the OS, so usable fraction + # is typically ~65-75 % of total. 
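+        # e.g. on a 32 GB machine: usable ≈ 32 * 0.70 = 22.4 GB reported as total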
+ usable = total * 0.70 + return MemoryInfo( + used_gb=max(usable - avail, 0.0), + reserved_gb=0.0, + total_gb=usable, + free_gb=min(avail, usable), + device_name=name, + ) + + # CPU-only + total, avail = _system_memory_gb() + return MemoryInfo(total_gb=total, free_gb=avail, device_name=name) + + +def get_total_free_gb() -> float: + """Sum of free memory across all accelerator devices, in GB.""" + if is_cuda(): + total_free = 0.0 + for i in range(torch.cuda.device_count()): + try: + free, _ = torch.cuda.mem_get_info(i) + total_free += free / 1024 ** 3 + except Exception: + props = torch.cuda.get_device_properties(i) + total_free += props.total_memory / 1024 ** 3 + return total_free + if is_mps(): + _, avail = _system_memory_gb() + return avail * 0.70 # usable fraction + return 0.0 + + +# --------------------------------------------------------------------------- +# Cache / memory management +# --------------------------------------------------------------------------- + +def empty_cache() -> None: + """Release cached allocations on the current accelerator.""" + if is_cuda(): + torch.cuda.empty_cache() + elif is_mps(): + # torch.mps.empty_cache() available since PyTorch 2.1 + if hasattr(torch.mps, "empty_cache"): + torch.mps.empty_cache() + + +def free_gpu_memory() -> None: + """Aggressive memory cleanup: GC + accelerator cache flush.""" + gc.collect() + if is_cuda(): + try: + torch.cuda.empty_cache() + except Exception: + try: + torch.cuda.synchronize() + except Exception: + pass + try: + torch.cuda.reset_peak_memory_stats() + except Exception: + pass + elif is_mps(): + if hasattr(torch.mps, "empty_cache"): + try: + torch.mps.empty_cache() + except Exception: + pass + if hasattr(torch.mps, "synchronize"): + try: + torch.mps.synchronize() + except Exception: + pass + + +def set_seed_all(seed: int) -> None: + """Set random seed on all available accelerators.""" + torch.manual_seed(seed) + if is_cuda(): + torch.cuda.manual_seed_all(seed) + # MPS shares the CPU random state — no separate seed call needed. + + +# --------------------------------------------------------------------------- +# Dtype helpers +# --------------------------------------------------------------------------- + +def default_dtype(device: str | None = None) -> torch.dtype: + """Sensible default dtype for the given device.""" + dev = device or get_device() + if dev == "cpu": + return torch.float32 + return torch.float16 + + +def supports_bfloat16(device: str | None = None) -> bool: + """Whether *bfloat16* is natively supported on the target device.""" + dev = device or get_device() + if dev.startswith("cuda"): + if is_cuda(): + major, _ = torch.cuda.get_device_capability(0) + return major >= 8 # Ampere+ + return False + if dev == "mps": + # MPS added bfloat16 support in PyTorch 2.3+ + return hasattr(torch, "__version__") and tuple( + int(x) for x in torch.__version__.split(".")[:2] + ) >= (2, 3) + return True # CPU supports bfloat16 on most modern platforms + + +def supports_float64(device: str | None = None) -> bool: + """Whether *float64* is supported (MPS does NOT support it).""" + dev = device or get_device() + return dev != "mps" + + +def safe_svd_dtype(tensor: torch.Tensor) -> torch.dtype: + """Return a dtype safe for SVD on the tensor's device. + + MPS does not support float64, so we cap at float32. 
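+
+    Doctest sketch (CPU tensor, where float64 is supported)::
+
+        >>> t = torch.randn(8, 8)
+        >>> safe_svd_dtype(t)
+        torch.float64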
+ """ + if tensor.device.type == "mps": + return torch.float32 + return torch.float64 if tensor.dtype in (torch.float64, torch.float32) else torch.float32 + + +# --------------------------------------------------------------------------- +# OOM exception matching +# --------------------------------------------------------------------------- + +def is_oom_error(exc: BaseException) -> bool: + """Return True if *exc* is an out-of-memory error on any backend.""" + if isinstance(exc, torch.cuda.OutOfMemoryError): + return True + # MPS raises a generic RuntimeError containing "out of memory" + if isinstance(exc, RuntimeError) and "out of memory" in str(exc).lower(): + return True + return False + + +# --------------------------------------------------------------------------- +# Quantization compatibility +# --------------------------------------------------------------------------- + +def supports_bitsandbytes(device: str | None = None) -> bool: + """BitsAndBytes requires NVIDIA CUDA — check that.""" + dev = device or get_device() + return dev.startswith("cuda") + + +def supports_device_map_auto(device: str | None = None) -> bool: + """Accelerate's device_map='auto' is only reliable on CUDA.""" + dev = device or get_device() + return dev.startswith("cuda") + + +# --------------------------------------------------------------------------- +# CUDA env setup (called once at import time of abliterate.py) +# --------------------------------------------------------------------------- + +def configure_cuda_alloc() -> None: + """Set expandable_segments for CUDA if available.""" + if is_cuda() and "PYTORCH_CUDA_ALLOC_CONF" not in os.environ: + os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" diff --git a/obliteratus/mlx_backend.py b/obliteratus/mlx_backend.py new file mode 100644 index 0000000..4e5a2ca --- /dev/null +++ b/obliteratus/mlx_backend.py @@ -0,0 +1,469 @@ +"""Optional MLX backend for Apple Silicon native inference and weight editing. + +MLX is Apple's array framework that runs natively on the Apple Neural Engine +and Metal GPU. When available, it provides significantly faster inference and +weight manipulation than PyTorch's MPS backend on Apple hardware. + +This module is entirely optional — if ``mlx`` / ``mlx-lm`` are not installed, +``MLX_AVAILABLE`` is ``False`` and all public functions raise ``RuntimeError``. + +Install with:: + + pip install mlx>=0.22 mlx-lm>=0.20 +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Availability check +# --------------------------------------------------------------------------- + +MLX_AVAILABLE = False +_mx = None # mlx module +_mlx_lm = None # mlx-lm module +_mlx_nn = None # mlx.nn module + +try: + import mlx.core as _mx_core # type: ignore[import-untyped] + import mlx.nn as _mlx_nn_mod # type: ignore[import-untyped] + import mlx_lm # type: ignore[import-untyped] + + _mx = _mx_core + _mlx_nn = _mlx_nn_mod + _mlx_lm = mlx_lm + MLX_AVAILABLE = True + logger.info("MLX backend available (mlx %s)", _mx.__version__ if hasattr(_mx, "__version__") else "?") +except ImportError: + pass + + +def _require_mlx() -> None: + if not MLX_AVAILABLE: + raise RuntimeError( + "MLX backend is not available. 
Install with: pip install mlx>=0.22 mlx-lm>=0.20" + ) + + +# --------------------------------------------------------------------------- +# Model loading +# --------------------------------------------------------------------------- + +class MLXModelHandle: + """Lightweight wrapper around an MLX-loaded model + tokenizer.""" + + def __init__(self, model: Any, tokenizer: Any, model_name: str): + self.model = model + self.tokenizer = tokenizer + self.model_name = model_name + + @property + def config(self) -> Any: + return getattr(self.model, "config", None) + + +def load_model( + model_name: str, + dtype: str = "float16", +) -> MLXModelHandle: + """Load a HuggingFace model via ``mlx-lm`` for Apple-native execution. + + Parameters + ---------- + model_name : str + HuggingFace model identifier (e.g. ``"meta-llama/Llama-3.2-3B-Instruct"``). + dtype : str + One of ``"float16"``, ``"bfloat16"``, ``"float32"``. + + Returns + ------- + MLXModelHandle + Wrapper with ``.model`` and ``.tokenizer`` attributes. + """ + _require_mlx() + + from mlx_lm import load # type: ignore[import-untyped] + + logger.info("Loading %s via MLX (dtype=%s)", model_name, dtype) + model, tokenizer = load(model_name) + + return MLXModelHandle(model=model, tokenizer=tokenizer, model_name=model_name) + + +# --------------------------------------------------------------------------- +# Inference +# --------------------------------------------------------------------------- + +def generate( + handle: MLXModelHandle, + prompt: str, + max_tokens: int = 256, + temperature: float = 0.7, + top_p: float = 0.9, + repetition_penalty: float | None = None, +) -> str: + """Generate text using the MLX model. + + Parameters + ---------- + handle : MLXModelHandle + A loaded MLX model handle. + prompt : str + The input prompt string. + max_tokens : int + Maximum number of tokens to generate. + temperature : float + Sampling temperature. + top_p : float + Nucleus sampling threshold. + repetition_penalty : float or None + Repetition penalty (1.0 = no penalty). + + Returns + ------- + str + Generated text completion. + """ + _require_mlx() + + from mlx_lm import generate as mlx_generate # type: ignore[import-untyped] + + kwargs: dict[str, Any] = { + "max_tokens": max_tokens, + "temp": temperature, + "top_p": top_p, + } + if repetition_penalty is not None: + kwargs["repetition_penalty"] = repetition_penalty + + return mlx_generate( + handle.model, + handle.tokenizer, + prompt=prompt, + **kwargs, + ) + + +# --------------------------------------------------------------------------- +# Activation extraction +# --------------------------------------------------------------------------- + +def get_activations( + handle: MLXModelHandle, + prompts: list[str], + layer_indices: list[int], + max_length: int = 256, +) -> dict[int, list[Any]]: + """Extract hidden-state activations from specified layers. + + Uses MLX's computation graph to capture intermediate outputs. + + Parameters + ---------- + handle : MLXModelHandle + Loaded model. + prompts : list[str] + Input prompts to probe. + layer_indices : list[int] + Which transformer layers to capture. + max_length : int + Maximum sequence length for tokenization. + + Returns + ------- + dict[int, list[mlx.core.array]] + Mapping from layer index to list of activation arrays (one per prompt). + Each array has shape ``(hidden_dim,)`` — the last-token hidden state. 
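+
+    Usage sketch (hypothetical model id; valid layer indices depend on
+    model depth)::
+
+        handle = load_model("meta-llama/Llama-3.2-3B-Instruct")
+        acts = get_activations(handle, ["Hello there"], layer_indices=[12])
+        last_hidden = acts[12][0]  # mlx array, shape (hidden_dim,)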
+ """ + _require_mlx() + import mlx.core as mx # type: ignore[import-untyped] + + model = handle.model + tokenizer = handle.tokenizer + + # Identify the transformer block list + layers = None + for attr in ("model.layers", "transformer.h", "gpt_neox.layers"): + obj = model + try: + for part in attr.split("."): + obj = getattr(obj, part) + layers = obj + break + except AttributeError: + continue + + if layers is None: + raise RuntimeError( + "Cannot locate transformer layers in the MLX model. " + "Supported architectures: LLaMA, GPT-2, GPT-NeoX." + ) + + activations: dict[int, list[Any]] = {idx: [] for idx in layer_indices} + target_set = set(layer_indices) + + for prompt in prompts: + tokens = tokenizer.encode(prompt) + if len(tokens) > max_length: + tokens = tokens[:max_length] + + input_ids = mx.array([tokens]) + + # Forward through embedding + if hasattr(model, "model"): + # LLaMA-style: model.model.embed_tokens + embed_module = model.model + elif hasattr(model, "transformer"): + embed_module = model.transformer + else: + embed_module = model + + if hasattr(embed_module, "embed_tokens"): + h = embed_module.embed_tokens(input_ids) + elif hasattr(embed_module, "wte"): + h = embed_module.wte(input_ids) + else: + raise RuntimeError("Cannot find embedding layer in MLX model") + + # Walk through layers, capturing activations at target indices + for i, layer in enumerate(layers): + h = layer(h) + # Some layers return tuples (hidden, attention) — take first + if isinstance(h, tuple): + h = h[0] + + if i in target_set: + # Last token hidden state + last_hidden = h[0, -1, :] + mx.eval(last_hidden) # force evaluation + activations[i].append(last_hidden) + + return activations + + +# --------------------------------------------------------------------------- +# Weight manipulation +# --------------------------------------------------------------------------- + +def get_weight(handle: MLXModelHandle, layer_idx: int, param_path: str) -> Any: + """Retrieve a weight matrix from the model. + + Parameters + ---------- + handle : MLXModelHandle + Loaded model. + layer_idx : int + Transformer layer index. + param_path : str + Dot-separated path within the layer, e.g. ``"self_attn.o_proj.weight"``. + + Returns + ------- + mlx.core.array + The weight tensor. + """ + _require_mlx() + model = handle.model + + # Navigate to the layer + layers = _get_layers(model) + layer = layers[layer_idx] + + # Navigate the param path + obj = layer + for part in param_path.split("."): + obj = getattr(obj, part) + + return obj + + +def modify_weights( + handle: MLXModelHandle, + layer_idx: int, + param_path: str, + modifier_fn: Callable[[Any], Any], +) -> None: + """Modify a weight matrix in-place using a function. + + Parameters + ---------- + handle : MLXModelHandle + Loaded model. + layer_idx : int + Transformer layer index. + param_path : str + Dot-separated path within the layer to the weight, e.g. + ``"self_attn.o_proj.weight"``. + modifier_fn : callable + Function that takes the current weight (mlx array) and returns the + modified weight (mlx array). For abliteration, this would project + out the refusal direction. 
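+
+    Sketch: abliterate one attention output projection, assuming a
+    precomputed unit-norm ``refusal_dir`` mlx array::
+
+        modify_weights(
+            handle, layer_idx=16, param_path="self_attn.o_proj.weight",
+            modifier_fn=lambda W: project_out_direction(W, refusal_dir),
+        )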
+ """ + _require_mlx() + import mlx.core as mx # type: ignore[import-untyped] + + model = handle.model + layers = _get_layers(model) + layer = layers[layer_idx] + + # Navigate to the parent module and leaf attribute + parts = param_path.split(".") + parent = layer + for part in parts[:-1]: + parent = getattr(parent, part) + leaf_name = parts[-1] + + old_weight = getattr(parent, leaf_name) + new_weight = modifier_fn(old_weight) + + # MLX uses a functional update pattern + if hasattr(parent, "update"): + parent.update({leaf_name: new_weight}) + else: + setattr(parent, leaf_name, new_weight) + + mx.eval(new_weight) # materialize + + +def project_out_direction(weight: Any, direction: Any) -> Any: + """Project a direction out of a weight matrix (abliteration). + + Given weight matrix W and unit direction d, computes:: + + W' = W - (W @ d) outer d + + Parameters + ---------- + weight : mlx.core.array + Weight matrix, shape ``(out_features, in_features)``. + direction : mlx.core.array + Unit direction vector, shape ``(in_features,)``. + + Returns + ------- + mlx.core.array + Modified weight with direction projected out. + """ + _require_mlx() + import mlx.core as mx # type: ignore[import-untyped] + + d = direction.astype(weight.dtype) + # W @ d gives the component along d for each row + proj = mx.matmul(weight, d[:, None]) # (out, 1) + return weight - mx.matmul(proj, d[None, :]) # (out, in) + + +# --------------------------------------------------------------------------- +# Save model +# --------------------------------------------------------------------------- + +def save_model( + handle: MLXModelHandle, + output_dir: str | Path, + upload_repo: str | None = None, +) -> Path: + """Save the (modified) MLX model to disk. + + Saves in safetensors format compatible with both MLX and HuggingFace. + + Parameters + ---------- + handle : MLXModelHandle + Model handle (possibly with modified weights). + output_dir : str or Path + Directory to save into. + upload_repo : str or None + If set, also uploads to HuggingFace Hub. + + Returns + ------- + Path + The output directory. 
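+
+    Usage sketch (hypothetical output path and repo)::
+
+        save_model(handle, "out/llama-obliterated-mlx",
+                   upload_repo="my-user/llama-obliterated-mlx")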
+ """ + _require_mlx() + + from mlx_lm import convert # type: ignore[import-untyped] + + out = Path(output_dir) + out.mkdir(parents=True, exist_ok=True) + + # mlx-lm's save uses safetensors + if hasattr(_mlx_lm, "save_model"): + _mlx_lm.save_model(str(out), handle.model, handle.tokenizer) + else: + # Fallback: manual save via mlx.core.save_safetensors + import mlx.core as mx # type: ignore[import-untyped] + weights = dict(handle.model.parameters()) + flat = {} + _flatten_dict(weights, "", flat) + mx.save_safetensors(str(out / "model.safetensors"), flat) + # Save tokenizer via transformers + handle.tokenizer.save_pretrained(str(out)) + + logger.info("MLX model saved to %s", out) + + if upload_repo: + try: + from mlx_lm import upload_to_hub # type: ignore[import-untyped] + upload_to_hub(str(out), upload_repo) + logger.info("Uploaded to %s", upload_repo) + except (ImportError, AttributeError): + logger.warning("mlx-lm upload not available — push manually with huggingface-cli") + + return out + + +# --------------------------------------------------------------------------- +# Conversion: PyTorch ↔ MLX +# --------------------------------------------------------------------------- + +def torch_tensor_to_mlx(tensor: "torch.Tensor") -> Any: # noqa: F821 + """Convert a PyTorch tensor to an MLX array.""" + _require_mlx() + import mlx.core as mx # type: ignore[import-untyped] + import numpy as np + + # Move to CPU and convert via numpy + np_array = tensor.detach().cpu().float().numpy() + return mx.array(np_array) + + +def mlx_to_torch_tensor(array: Any, device: str = "cpu") -> "torch.Tensor": # noqa: F821 + """Convert an MLX array to a PyTorch tensor.""" + import numpy as np + import torch + + np_array = np.array(array) + return torch.from_numpy(np_array).to(device) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_layers(model: Any) -> Any: + """Locate the transformer block list in an MLX model.""" + for attr_path in ("model.layers", "transformer.h", "gpt_neox.layers"): + obj = model + try: + for part in attr_path.split("."): + obj = getattr(obj, part) + return obj + except AttributeError: + continue + raise RuntimeError("Cannot locate transformer layers in MLX model") + + +def _flatten_dict(d: dict, prefix: str, out: dict) -> None: + """Flatten a nested dict with dot-separated keys.""" + for k, v in d.items(): + key = f"{prefix}{k}" if prefix else k + if isinstance(v, dict): + _flatten_dict(v, f"{key}.", out) + else: + out[key] = v diff --git a/obliteratus/reproducibility.py b/obliteratus/reproducibility.py index 70fec81..463acb1 100644 --- a/obliteratus/reproducibility.py +++ b/obliteratus/reproducibility.py @@ -38,9 +38,9 @@ def set_seed(seed: int = 42, deterministic: bool = True) -> None: try: import torch + from obliteratus import device as dev torch.manual_seed(seed) - if torch.cuda.is_available(): - torch.cuda.manual_seed_all(seed) + dev.set_seed_all(seed) if deterministic: torch.use_deterministic_algorithms(True, warn_only=True) diff --git a/obliteratus/telemetry.py b/obliteratus/telemetry.py index 2d88218..9041041 100644 --- a/obliteratus/telemetry.py +++ b/obliteratus/telemetry.py @@ -429,35 +429,111 @@ def fetch_hub_records(max_records: int = 10000) -> list[dict[str, Any]]: This is used by :func:`get_leaderboard_data` to merge community-wide results with local data. + + Tries three strategies in order: + 1. 
``huggingface_hub`` API (preferred on HF Spaces) + 2. Git shallow clone (works anywhere git is installed) + 3. Returns empty list """ repo = _TELEMETRY_REPO + # For adaptive defaults, always try the default repo even locally if not repo: + repo = _DEFAULT_TELEMETRY_REPO + + # Strategy 1: huggingface_hub API + try: + records = _fetch_via_hf_api(repo, max_records) + if records: + return records + except Exception as e: + logger.debug("HF API fetch failed: %s", e) + + # Strategy 2: git shallow clone fallback + try: + records = _fetch_via_git_clone(repo, max_records) + if records: + return records + except Exception as e: + logger.debug("Git clone fetch failed: %s", e) + + return [] + + +def _fetch_via_hf_api(repo: str, max_records: int) -> list[dict[str, Any]]: + """Fetch telemetry via huggingface_hub API.""" + from huggingface_hub import HfApi, hf_hub_download + + api = HfApi(token=os.environ.get("HF_TOKEN")) + try: + all_files = api.list_repo_files(repo, repo_type="dataset") + except Exception: return [] - try: - from huggingface_hub import HfApi, hf_hub_download + jsonl_files = [f for f in all_files if f.startswith("data/") and f.endswith(".jsonl")] + if not jsonl_files: + return [] - api = HfApi(token=os.environ.get("HF_TOKEN")) + records: list[dict[str, Any]] = [] + for filepath in jsonl_files: try: - all_files = api.list_repo_files(repo, repo_type="dataset") + local_path = hf_hub_download( + repo, filepath, repo_type="dataset", + # etag_timeout=0 forces a freshness check against Hub + # so we always get the latest data, not stale cache + etag_timeout=0, + ) + with open(local_path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + continue + if len(records) >= max_records: + break except Exception: - # Repo doesn't exist yet or network error + continue + if len(records) >= max_records: + break + + return records + + +def _fetch_via_git_clone(repo: str, max_records: int) -> list[dict[str, Any]]: + """Fetch telemetry via git shallow clone (fallback when huggingface_hub unavailable). + + Uses GIT_LFS_SKIP_SMUDGE=1 for speed — JSONL files are plain text, + not LFS objects, so this works fine. 
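+
+    Equivalent manual fetch (substitute the dataset repo id for ``{repo}``)::
+
+        GIT_LFS_SKIP_SMUDGE=1 git clone --depth 1 \
+            https://huggingface.co/datasets/{repo} /tmp/telemetry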
+ """ + import shutil + import subprocess + import tempfile + + clone_url = f"https://huggingface.co/datasets/{repo}" + clone_dir = Path(tempfile.mkdtemp(prefix="obliteratus_telemetry_")) + + try: + env = dict(os.environ) + env["GIT_LFS_SKIP_SMUDGE"] = "1" + result = subprocess.run( + ["git", "clone", "--depth", "1", clone_url, str(clone_dir)], + capture_output=True, text=True, timeout=60, env=env, + ) + if result.returncode != 0: + logger.debug("Git clone failed: %s", result.stderr.strip()) return [] - jsonl_files = [f for f in all_files if f.startswith("data/") and f.endswith(".jsonl")] - if not jsonl_files: + # Parse all JSONL files in data/ + data_dir = clone_dir / "data" + if not data_dir.exists(): return [] records: list[dict[str, Any]] = [] - for filepath in jsonl_files: + for jsonl_file in sorted(data_dir.glob("*.jsonl")): try: - local_path = hf_hub_download( - repo, filepath, repo_type="dataset", - # etag_timeout=0 forces a freshness check against Hub - # so we always get the latest data, not stale cache - etag_timeout=0, - ) - with open(local_path) as f: + with open(jsonl_file) as f: for line in f: line = line.strip() if not line: @@ -474,12 +550,8 @@ def fetch_hub_records(max_records: int = 10000) -> list[dict[str, Any]]: break return records - except ImportError: - logger.debug("huggingface_hub not installed — cannot fetch Hub records") - return [] - except Exception as e: - logger.debug(f"Failed to fetch Hub records: {e}") - return [] + finally: + shutil.rmtree(clone_dir, ignore_errors=True) # ── Hub restore (warm-start after rebuild) ──────────────────────────── diff --git a/requirements-apple.txt b/requirements-apple.txt new file mode 100644 index 0000000..cb82c11 --- /dev/null +++ b/requirements-apple.txt @@ -0,0 +1,8 @@ +# Optional Apple Silicon dependencies for native MLX acceleration. +# Install alongside the main requirements on macOS with Apple Silicon: +# +# pip install -r requirements.txt -r requirements-apple.txt +# +# These packages are macOS-only and will fail to install on Linux/Windows. +mlx>=0.22 +mlx-lm>=0.20