Merge pull request #36 from StellaAthena/main

Add multi-GPU and remote support
pliny
2026-03-26 15:50:29 -07:00
committed by GitHub
8 changed files with 1182 additions and 8 deletions
+188
@@ -415,6 +415,194 @@ Includes pre-liberated variants (Dolphin, Hermes, WhiteRabbitNeo) for A/B compar
obliteratus models
```
## Multi-GPU and remote execution
OBLITERATUS automatically shards models across multiple GPUs when they don't fit on a single card. It also supports remote execution over SSH, so you can run the pipeline on a GPU server from your laptop.
### How model sharding works
When you have multiple GPUs, OBLITERATUS uses accelerate's `device_map="auto"` to split the model's layers across all available GPUs. This is **naive pipeline parallelism** — layers are distributed evenly, but only one GPU computes at a time as activations flow sequentially through the layer stack. The other GPUs hold their assigned layers in memory but are idle until their turn.
This means multi-GPU sharding is a **memory solution, not a speed solution**. It lets you run models that don't fit on one GPU, but it won't make small models run faster. In fact, more GPUs can be *slower* due to inter-GPU data transfer overhead at layer boundaries.
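To make the "memory, not speed" point concrete, here is a minimal sketch of an even, contiguous layer split. The names `shard_layers` and `boundary_transfers` are hypothetical, and the even-split rule is an assumption for illustration only; Accelerate's actual `device_map="auto"` placement is driven by per-device memory and may assign layers differently.

```python
def shard_layers(n_layers: int, n_gpus: int) -> dict[int, range]:
    """Assign contiguous blocks of layers to GPUs as evenly as possible.

    Illustrative only: real placement depends on layer sizes and free VRAM.
    """
    if n_layers < 1 or n_gpus < 1:
        raise ValueError("n_layers and n_gpus must be positive")
    base, extra = divmod(n_layers, n_gpus)
    mapping: dict[int, range] = {}
    start = 0
    for gpu in range(n_gpus):
        # The first `extra` GPUs take one additional layer each.
        size = base + (1 if gpu < extra else 0)
        mapping[gpu] = range(start, start + size)
        start += size
    return mapping


def boundary_transfers(n_gpus: int) -> int:
    """Activations cross a device boundary once per adjacent GPU pair."""
    return max(n_gpus - 1, 0)


if __name__ == "__main__":
    for gpus in (3, 4, 8):
        sizes = [len(r) for r in shard_layers(80, gpus).values()]
        print(f"{gpus} GPUs: layers per GPU {sizes}, "
              f"{boundary_transfers(gpus)} transfers per forward pass")
```

Each forward pass still visits every layer in order, so adding GPUs only shrinks the per-GPU share of layers and increases the number of device boundaries the activations must cross.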
### Selecting GPUs
Use `--gpus` to control which GPUs are used:
```bash
# Use all available GPUs (default)
obliteratus obliterate bigmodel/200B --gpus all
# Use only GPUs 0-3
obliteratus obliterate bigmodel/200B --gpus 0,1,2,3
# Use a specific pair
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct --gpus 2,5
```
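With explicit IDs, `--gpus` sets `CUDA_VISIBLE_DEVICES` before CUDA initializes, and the model is then sharded across the selected GPUs only. `--gpus all` (or omitting the flag) leaves the environment unchanged, so every visible GPU is used.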
### Precision and quantization
The `--dtype` flag controls the precision of model weights, which directly determines how much VRAM you need. Lower precision means smaller memory footprint at the cost of some numerical fidelity:
| Dtype | Bytes/param | 7B model | 70B model | 405B model |
|-------|-----------|---------|----------|-----------|
| `float32` | 4 | 28 GB | 280 GB | 1620 GB |
| `float16` / `bfloat16` | 2 | 14 GB | 140 GB | 810 GB |
| `int8` (via `--quantization bitsandbytes-8bit`) | 1 | 7 GB | 70 GB | 405 GB |
| `int4` (via `--quantization bitsandbytes-4bit`) | 0.5 | 3.5 GB | 35 GB | 203 GB |
```bash
# Default: bfloat16
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct
# 8-bit quantization — fits on fewer GPUs
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct \
--quantization bitsandbytes-8bit
# 4-bit quantization — Llama-405B on 4x A100-80GB
obliteratus obliterate meta-llama/Llama-3.1-405B-Instruct \
--quantization bitsandbytes-4bit --dtype float16
```
Quantization roughly halves the GPU count at each step down. A 70B model that needs 3x A100-80GB in bf16 fits on 2 in int8 or 1 in int4.
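The table above is plain arithmetic: billions of parameters times bytes per parameter gives weight memory in decimal GB. A minimal sketch follows; the helper name `weight_gb` is hypothetical, and it covers weights only, not activations or CUDA context.

```python
# Bytes per parameter for each precision listed in the table.
BYTES_PER_PARAM = {
    "float32": 4,
    "float16": 2,
    "bfloat16": 2,
    "int8": 1,
    "int4": 0.5,
}


def weight_gb(params_billions: float, dtype: str) -> float:
    """Weight memory in decimal GB (1e9 params x bytes/param = GB)."""
    return params_billions * BYTES_PER_PARAM[dtype]


if __name__ == "__main__":
    for dtype in ("float32", "bfloat16", "int8", "int4"):
        row = ", ".join(f"{weight_gb(p, dtype):g} GB" for p in (7, 70, 405))
        print(f"{dtype:>8}: {row}")
```

The 405B `int4` entry comes out to 202.5 GB, which the table rounds to 203 GB.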
### GPU calculator
Not sure how many GPUs you need? The `gpu-calc` command estimates the minimum GPU count for any model, accounting for weight memory, activation overhead, and CUDA context:
```bash
# Auto-detect from HuggingFace model name
obliteratus gpu-calc meta-llama/Llama-3.1-70B-Instruct --gpu-mem 24
# Manual: specify params and precision
obliteratus gpu-calc --params 70 --dtype bfloat16 --gpu-mem 80
# MoE models: specify active params separately
obliteratus gpu-calc --params 117 --active-params 13 --dtype bfloat16 --gpu-mem 80
```
The calculator fetches the model config from HuggingFace to estimate parameter counts (including MoE expert structure), then shows a table of GPU configurations with headroom estimates. For MoE models, activation overhead is computed from the active parameter count rather than total parameters.
### Pipeline parallel benchmarks
We benchmarked the full abliteration pipeline across varying numbers of A100-80GB GPUs on two large models.
**GPT-OSS-120B** (117B MoE, ~234 GB in bf16):
| GPUs | Total time | VRAM/GPU | Notes |
|------|-----------|----------|-------|
| 3 | **FAILED** | ~78 GB | Not enough headroom for activations; some layers offloaded to CPU as meta tensors, crashes during EXCISE |
| 4 | **615s** (10m15s) | ~58 GB | Fastest. Fewest inter-GPU transfers. Snapshot auto-skipped (insufficient free VRAM) |
| 5 | 763s (12m43s) | ~47 GB | +24% slower than 4 GPUs |
| 6 | 766s (12m46s) | ~39 GB | +25% slower than 4 GPUs |
| 8 | 633s (10m33s) | ~29 GB | +3% slower than 4 GPUs. Ran CPU-side state dict snapshot (adds ~20s) |
**DeepSeek-R1-Distill-Llama-70B** (70B dense, ~149 GB in bf16, 80 layers):
| GPUs | Total time | VRAM/GPU | Notes |
|------|-----------|----------|-------|
| 2 | **FAILED** | ~75 GB | Meta tensor crash — 149 GB model on 160 GB total VRAM leaves no activation headroom |
| 3 | **536s** (8m56s) | ~50 GB | Fastest. Minimum viable GPU count for this model |
| 4 | 626s (10m26s) | ~37 GB | +17% slower than 3 GPUs |
| 8 | 627s (10m27s) | ~19 GB | +17% slower than 3 GPUs. No benefit over 4 |
Stage breakdown (approximately constant across GPU counts):
| Stage | GPT-OSS-120B | DeepSeek-70B | Bottleneck |
|-------|-------------|-------------|-----------|
| SUMMON (load) | ~11s | ~24s | Disk I/O (model cached locally) |
| PROBE (activations) | ~20s | ~20s | Forward passes through sharded model |
| DISTILL + EXCISE | ~30s | ~30s | SVD + weight projection (CPU-bound) |
| VERIFY | ~210s | ~270s | Forward passes on validation prompts |
| REBIRTH (save) | ~350s | ~194s | Writing model to disk (234 GB vs 141 GB) |
Key findings:
- **Use the minimum number of GPUs that fits your model.** Extra GPUs only add cross-device transfer overhead. 4 GPUs was faster than 8 for GPT-OSS-120B; 3 GPUs was fastest for DeepSeek-70B.
- **VERIFY and REBIRTH dominate wall time for large models.** Together they account for roughly 86-90% of the run: VERIFY is forward passes on validation prompts, and REBIRTH is disk I/O. The remaining stages (SUMMON, PROBE, DISTILL, EXCISE) are fast regardless of GPU count.
- **Leave headroom.** The model needs VRAM beyond just its parameter storage — activation tensors, KV cache, and intermediate computations during PROBE and VERIFY all consume memory. 3x A100-80GB (240 GB) was not enough for a 234 GB model; 2x A100-80GB (160 GB) was not enough for a 149 GB model.
- **Pipeline parallelism doesn't help compute-bound stages.** Since only one GPU computes at a time, doubling GPUs doesn't halve PROBE or VERIFY time. It only enables fitting larger models.
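
The percentages quoted in the tables and findings can be re-derived from the reported timings. This is a small arithmetic check, not part of the tool; the helper names `share` and `slowdown_pct` are hypothetical, and the inputs are the approximate stage times from the breakdown table.

```python
# Approximate per-stage times in seconds, copied from the stage breakdown table.
STAGES = {
    "GPT-OSS-120B": {"SUMMON": 11, "PROBE": 20, "DISTILL+EXCISE": 30,
                     "VERIFY": 210, "REBIRTH": 350},
    "DeepSeek-70B": {"SUMMON": 24, "PROBE": 20, "DISTILL+EXCISE": 30,
                     "VERIFY": 270, "REBIRTH": 194},
}


def share(stages: dict[str, float], names: tuple[str, ...]) -> float:
    """Fraction of total stage time spent in the named stages."""
    return sum(stages[n] for n in names) / sum(stages.values())


def slowdown_pct(seconds: float, fastest: float) -> float:
    """Percentage slowdown relative to the fastest configuration."""
    return (seconds / fastest - 1) * 100


if __name__ == "__main__":
    for model, stages in STAGES.items():
        print(f"{model}: VERIFY+REBIRTH = {share(stages, ('VERIFY', 'REBIRTH')):.0%}")
    # GPT-OSS-120B: 5 GPUs (763s) and 8 GPUs (633s) versus 4 GPUs (615s)
    print(f"{slowdown_pct(763, 615):.0f}%  {slowdown_pct(633, 615):.0f}%")
```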
### When you actually need data parallelism
For models that fit on a single GPU with room to spare, the PROBE stage (which runs 1024 forward passes to collect activations) is the main computational bottleneck. Pipeline parallelism doesn't help here — it still processes one prompt at a time through the full layer stack.
True data parallelism (replicating the model and splitting prompts across GPUs) can speed up PROBE, but it requires enough VRAM to hold a full copy of the model on each GPU. An experimental pre-replicated data parallel implementation is available on the `data-parallel-prereplication` branch:
```bash
git checkout data-parallel-prereplication
obliteratus obliterate EleutherAI/pythia-12b --data-parallel
```
This deep-copies the model to each GPU once, then distributes prompt batches across replicas using a thread pool. Benchmarks on Pythia 12B (24 GB model, 8x A100-80GB):
| Mode | PROBE time | Notes |
|------|-----------|-------|
| Single GPU | 7.1s | Baseline |
| Pre-replicated DP (8 GPUs) | 7.7s | Near parity — PROBE is too fast at this scale for parallelism to help |
Data parallelism pays off only when PROBE runs long enough to amortize the one-time cost of replicating the model, for example with many more prompts or a more expensive forward pass. For most models, the overhead of replication exceeds the time saved.
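
The replicate-then-split idea can be sketched without any GPU code. In the example below each "replica" is just a callable standing in for a model copy on its own device; `split_batches` and `run_data_parallel` are hypothetical names, and this is not the implementation on the `data-parallel-prereplication` branch.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence


def split_batches(prompts: Sequence[str], n_replicas: int) -> list[list[str]]:
    """Round-robin the prompts into one shard per replica."""
    return [list(prompts[i::n_replicas]) for i in range(n_replicas)]


def run_data_parallel(
    prompts: Sequence[str],
    replicas: Sequence[Callable[[str], object]],
) -> list[object]:
    """Run each shard on its own replica in a thread pool, preserving order."""
    if not replicas:
        raise ValueError("need at least one replica")
    n = len(replicas)
    shards = split_batches(prompts, n)

    def run_shard(pair):
        replica, shard = pair
        return [replica(p) for p in shard]

    with ThreadPoolExecutor(max_workers=n) as pool:
        shard_outputs = list(pool.map(run_shard, zip(replicas, shards)))

    # Undo the round-robin split so outputs line up with the input prompts.
    ordered: list[object] = [None] * len(prompts)
    for i, out in enumerate(shard_outputs):
        ordered[i::n] = out
    return ordered


if __name__ == "__main__":
    fake_replicas = [str.upper] * 3  # stand-ins for three model copies
    print(run_data_parallel(["a", "b", "c", "d", "e"], fake_replicas))
```

The fixed cost in the real setting is the deep copy of the model onto every GPU, which is why the Pythia 12B benchmark shows no gain when PROBE itself takes only a few seconds.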
### Remote execution over SSH
Run the full pipeline on a remote GPU node from your local machine. OBLITERATUS handles SSH connection, auto-installs itself on the remote if needed, streams logs in real time, and copies results back when done.
```bash
# Basic remote run
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct \
--remote user@gpu-node
# With SSH key and custom options
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct \
--remote root@10.0.0.5 \
--ssh-key ~/.ssh/id_rsa \
--ssh-port 2222 \
--remote-dir /data/obliteratus \
--remote-python python3.11
# Don't copy results back (keep on remote only)
obliteratus obliterate meta-llama/Llama-3.1-70B-Instruct \
--remote user@gpu-node --no-sync
```
Remote execution also works with `obliteratus run` (YAML configs) and `obliteratus tourney` (method comparison). You can specify remote settings in YAML:
```yaml
model:
  name: meta-llama/Llama-3.1-70B-Instruct
  dtype: float16
remote:
  host: gpu-node
  user: root
  ssh_key: ~/.ssh/id_rsa
  remote_dir: /tmp/obliteratus_run
  gpus: "0,1,2,3"        # select GPUs on the remote
  sync_results: true     # copy results back when done
```
The remote runner:
1. Tests SSH connectivity
2. Detects GPUs on the remote (`nvidia-smi`)
3. Installs obliteratus if not already present
4. Uploads config files if using `obliteratus run`
5. Runs the pipeline with real-time log streaming
6. Copies results back via SCP
### Choosing the right setup
| Scenario | Recommendation |
|----------|---------------|
| Model fits on 1 GPU | Use 1 GPU. Adding more won't help and may slow things down. |
| Model almost fits on 1 GPU | Try `--quantization bitsandbytes-8bit` or `bitsandbytes-4bit` to reduce memory. Halving precision roughly halves VRAM. |
| Model fits on 1 GPU, PROBE is slow (many prompts) | Try `data-parallel-prereplication` branch. Only helps if model fits on each GPU with room for activations. |
| Model doesn't fit on 1 GPU | Use `--gpus` with the **minimum** number of GPUs that fits. Run `obliteratus gpu-calc` to find that number. |
| Model needs 4+ GPUs | Pipeline parallel via `device_map="auto"` is the only option. Expect I/O-dominated runtimes for very large models. Consider quantization first — int4 can cut the GPU count by 4x. |
| Not sure how many GPUs you need | Run `obliteratus gpu-calc <model> --gpu-mem <your_vram>` for an estimate. |
| No local GPUs | Use `--remote user@gpu-node` to run on a remote machine, or use HuggingFace Spaces / Colab. |
## 10 study presets
Pre-configured ablation studies you can run out of the box:
+47
@@ -0,0 +1,47 @@
# Example: Run an ablation study on a remote GPU node via SSH.
#
# Usage:
# obliteratus run examples/remote_gpu_node.yaml
#
# The 'remote' section tells Obliteratus to SSH into the specified host,
# install obliteratus if needed, run the pipeline there, and copy results
# back to the local machine.
#
# You can also use --remote on any command instead of a YAML section:
# obliteratus obliterate meta-llama/Llama-3.1-8B-Instruct --remote root@gpu-node --ssh-key ~/.ssh/id_rsa
#
# Multi-GPU: Models are automatically split across all available GPUs via
# accelerate's device_map="auto". Use --gpus or the gpus: field to select
# specific GPUs:
# obliteratus obliterate model --remote root@gpu-node --gpus 0,1,2,3
model:
  name: meta-llama/Llama-3.1-8B-Instruct
  task: causal_lm
  dtype: float16
  device: auto

dataset:
  name: wikitext
  split: test
  max_samples: 500

strategies:
  - name: layer_removal
    params:
      layer_indices: [10, 11, 12]

metrics: [perplexity]
batch_size: 8
max_length: 512
output_dir: results/remote_example

remote:
  host: gpu-node.example.com
  user: root
  port: 22
  ssh_key: ~/.ssh/id_rsa
  remote_dir: /tmp/obliteratus_run
  python: python3
  sync_results: true
  # gpus: "0,1,2,3"  # uncomment to select specific GPUs (default: all)
+8
@@ -17,6 +17,8 @@ __all__ = [
"TourneyResult",
"get_adaptive_recommendation",
"AdaptiveRecommendation",
"RemoteRunner",
"RemoteConfig",
]
@@ -60,4 +62,10 @@ def __getattr__(name):
    if name == "AdaptiveRecommendation":
        from obliteratus.adaptive_defaults import AdaptiveRecommendation
        return AdaptiveRecommendation
    if name == "RemoteRunner":
        from obliteratus.remote import RemoteRunner
        return RemoteRunner
    if name == "RemoteConfig":
        from obliteratus.remote import RemoteConfig
        return RemoteConfig
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+5
@@ -0,0 +1,5 @@
"""Allow running obliteratus as ``python -m obliteratus``."""
from obliteratus.cli import main
main()
+462 -4
@@ -22,6 +22,70 @@ _BANNER = r"""
"""
def _add_gpu_args(parser):
    """Add --gpus flag for multi-GPU control."""
    gpu_group = parser.add_argument_group("GPU selection")
    gpu_group.add_argument(
        "--gpus", type=str, default=None, metavar="IDS",
        help=(
            "Comma-separated GPU IDs to use (e.g. '0,1,2,3' or 'all'). "
            "Sets CUDA_VISIBLE_DEVICES. By default uses all available GPUs. "
            "Models are automatically split across selected GPUs via accelerate."
        ),
    )


def _add_remote_args(parser):
    """Add --remote execution flags to a subcommand parser."""
    remote_group = parser.add_argument_group("remote execution")
    remote_group.add_argument(
        "--remote", type=str, default=None, metavar="[USER@]HOST",
        help="Run on a remote GPU node via SSH (e.g. root@gpu-node or just gpu-node)",
    )
    remote_group.add_argument(
        "--ssh-key", type=str, default=None,
        help="Path to SSH private key (default: use SSH agent or ~/.ssh/id_rsa)",
    )
    remote_group.add_argument(
        "--ssh-port", type=int, default=22,
        help="SSH port on remote host (default: 22)",
    )
    remote_group.add_argument(
        "--remote-dir", type=str, default="/tmp/obliteratus_run",
        help="Working directory on the remote machine (default: /tmp/obliteratus_run)",
    )
    remote_group.add_argument(
        "--remote-python", type=str, default="python3",
        help="Python binary on the remote machine (default: python3)",
    )
    remote_group.add_argument(
        "--no-sync", action="store_true", default=False,
        help="Don't copy results back to local machine after remote run",
    )
def _apply_gpu_selection(args):
    """Set CUDA_VISIBLE_DEVICES based on --gpus flag (for local runs only)."""
    import os

    gpus = getattr(args, "gpus", None)
    if gpus is None or getattr(args, "remote", None):
        return  # skip for remote runs (handled by remote runner)
    if gpus.lower() == "all":
        return  # use all GPUs (default behavior)

    # Validate: should be comma-separated integers
    try:
        gpu_ids = [int(g.strip()) for g in gpus.split(",")]
    except ValueError:
        console.print(f"[red]Invalid --gpus value: {gpus!r}. Expected comma-separated integers or 'all'.[/]")
        raise SystemExit(1)

    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(g) for g in gpu_ids)
    console.print(f"[dim]Using GPUs: {gpu_ids} (CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']})[/dim]")


def main(argv: list[str] | None = None):
console.print(_BANNER)
parser = argparse.ArgumentParser(
@@ -40,6 +104,8 @@ def main(argv: list[str] | None = None):
default=None,
help="Apply a preset (e.g. quick, full, attention, jailbreak, guardrail)",
)
_add_gpu_args(run_parser)
_add_remote_args(run_parser)
# --- info ---
info_parser = subparsers.add_parser("info", help="Print model architecture info")
@@ -144,9 +210,13 @@ def main(argv: list[str] | None = None):
help="One-click: remove refusal directions from a model (SOTA multi-technique)",
)
_add_obliterate_args(abl_parser)
_add_gpu_args(abl_parser)
_add_remote_args(abl_parser)
# Backward-compat alias (hidden from help)
abl_alias = subparsers.add_parser("abliterate", help=argparse.SUPPRESS)
_add_obliterate_args(abl_alias)
_add_gpu_args(abl_alias)
_add_remote_args(abl_alias)
# --- report ---
report_parser = subparsers.add_parser("report", help="Regenerate report from saved results")
@@ -180,6 +250,8 @@ def main(argv: list[str] | None = None):
"--methods", type=str, nargs="+", default=None,
help="Override: only run these methods (space-separated)",
)
_add_gpu_args(tourney_parser)
_add_remote_args(tourney_parser)
# --- recommend ---
recommend_parser = subparsers.add_parser(
@@ -194,10 +266,46 @@ def main(argv: list[str] | None = None):
help="Also show global cross-architecture insights",
)
# --- gpu-calc ---
calc_parser = subparsers.add_parser(
"gpu-calc",
help="Estimate minimum GPUs needed for a model",
)
calc_parser.add_argument(
"model", type=str, nargs="?", default=None,
help="HuggingFace model name/path (auto-fetches param counts)",
)
calc_parser.add_argument(
"--params", type=float, default=None, metavar="B",
help="Total parameters in billions (overrides auto-detection)",
)
calc_parser.add_argument(
"--active-params", type=float, default=None, metavar="B",
help="Active parameters in billions (for MoE models; defaults to --params)",
)
calc_parser.add_argument(
"--dtype", type=str, default="bfloat16",
choices=["float32", "float16", "bfloat16", "int8", "int4"],
help="Data type for model weights (default: bfloat16)",
)
calc_parser.add_argument(
"--gpu-mem", type=float, default=80.0, metavar="GB",
help="VRAM per GPU in GB (default: 80 for A100-80GB)",
)
args = parser.parse_args(argv)
-    if args.command == "run":
-        _cmd_run(args)
+    # Apply GPU selection early (before any CUDA init)
+    _apply_gpu_selection(args)
+
+    if args.command == "gpu-calc":
+        _cmd_gpu_calc(args)
+        return
+    elif args.command == "run":
+        if getattr(args, "remote", None):
+            _cmd_remote_run(args)
+        else:
+            _cmd_run(args)
elif args.command == "interactive":
_cmd_interactive()
elif args.command == "models":
@@ -217,9 +325,15 @@ def main(argv: list[str] | None = None):
elif args.command == "recommend":
_cmd_recommend(args)
elif args.command == "tourney":
-        _cmd_tourney(args)
+        if getattr(args, "remote", None):
+            _cmd_remote_tourney(args)
+        else:
+            _cmd_tourney(args)
     elif args.command in ("obliterate", "abliterate"):
-        _cmd_abliterate(args)
+        if getattr(args, "remote", None):
+            _cmd_remote_abliterate(args)
+        else:
+            _cmd_abliterate(args)
def _cmd_ui(args):
@@ -314,6 +428,34 @@ def _cmd_run(args):
    config = StudyConfig.from_dict(raw)
    if args.output_dir:
        config.output_dir = args.output_dir

    # If YAML has a remote: section, dispatch to remote runner
    if config.remote is not None:
        from obliteratus.remote import RemoteConfig as _RC, RemoteRunner

        rc = _RC(
            host=config.remote.host,
            user=config.remote.user,
            port=config.remote.port,
            ssh_key=config.remote.ssh_key,
            remote_dir=config.remote.remote_dir,
            python=config.remote.python,
            sync_results=config.remote.sync_results,
            gpus=config.remote.gpus,
        )
        runner = RemoteRunner(rc)
        result_path = runner.run_config(
            local_config_path=args.config,
            local_output_dir=config.output_dir,
            preset=args.preset,
        )
        if result_path:
            console.print(f"\n[bold green]Remote run complete.[/] Results at: [cyan]{result_path}[/]")
        else:
            console.print("[red]Remote run failed. Check logs above.[/]")
            raise SystemExit(1)
        return

    run_study(config)
@@ -653,5 +795,321 @@ def _cmd_abliterate(args):
)
def _cmd_gpu_calc(args):
    import math

    from rich.panel import Panel
    from rich.table import Table

    BYTES_PER_PARAM = {
        "float32": 4,
        "float16": 2,
        "bfloat16": 2,
        "int8": 1,
        "int4": 0.5,
    }

    # Resolve param counts
    total_params_b = args.params
    active_params_b = args.active_params

    if total_params_b is None:
        if args.model is None:
            console.print("[red]Provide either a model name or --params.[/]")
            raise SystemExit(1)
        console.print(f"Fetching config for [cyan]{args.model}[/]...")
        try:
            from transformers import AutoConfig
            config = AutoConfig.from_pretrained(args.model, trust_remote_code=True)
        except Exception as e:
            console.print(f"[red]Could not load config: {e}[/]")
            raise SystemExit(1)

        # Total params: prefer explicit num_parameters, else estimate from config
        total_params_b = _estimate_total_params_b(config)

        # Active params for MoE
        if active_params_b is None:
            active_params_b = _estimate_active_params_b(config, total_params_b)

    if active_params_b is None:
        active_params_b = total_params_b

    bpp = BYTES_PER_PARAM[args.dtype]
    gpu_mem_gb = args.gpu_mem

    # Model weight memory (use base-10 GB to match HF/nvidia conventions)
    weight_gb = total_params_b * bpp

    # Activation overhead during forward passes (PROBE/VERIFY).
    # Scales with active params, not total. Empirical from benchmarks:
    # - DeepSeek-70B (149GB): failed at 160GB (2 GPUs), OK at 240GB (3 GPUs)
    # - GPT-OSS-120B (234GB): failed at 240GB (3 GPUs), OK at 320GB (4 GPUs)
    # This implies ~15-35% overhead. We use 20% as a reasonable middle ground.
    active_weight_gb = active_params_b * bpp
    activation_overhead_gb = active_weight_gb * 0.20

    # CUDA context + fragmentation overhead: ~1.5 GB per GPU (fixed cost)
    cuda_overhead_per_gpu = 1.5

    # Total memory needed (before splitting across GPUs)
    total_needed_gb = weight_gb + activation_overhead_gb

    # Find minimum GPUs: we need total_needed / (gpu_mem - cuda_overhead) GPUs
    usable_per_gpu = gpu_mem_gb - cuda_overhead_per_gpu
    if usable_per_gpu <= 0:
        console.print("[red]GPU memory too small after CUDA overhead.[/]")
        raise SystemExit(1)
    min_gpus = math.ceil(total_needed_gb / usable_per_gpu)
    min_gpus = max(min_gpus, 1)

    # Show results for a range of GPU counts
    is_moe = active_params_b < total_params_b * 0.99
    table = Table(title="GPU Configurations", show_edge=True)
    table.add_column("GPUs", justify="right", style="cyan")
    table.add_column("VRAM/GPU", justify="right")
    table.add_column("Total VRAM", justify="right")
    table.add_column("Headroom", justify="right")
    table.add_column("Verdict", min_width=20)

    # Show from min_gpus-1 (to show why it fails) up to 8
    low = max(1, min_gpus - 1)
    high = max(min_gpus + 3, 8)
    for n in range(low, high + 1):
        total_vram = n * gpu_mem_gb
        usable_vram = n * usable_per_gpu
        headroom = usable_vram - total_needed_gb
        headroom_pct = headroom / total_needed_gb * 100
        vram_per = total_needed_gb / n
        if headroom < 0:
            verdict = "[red]INSUFFICIENT[/]"
        elif headroom_pct < 15:
            verdict = "[yellow]TIGHT — may fail[/]"
        elif n == min_gpus:
            verdict = "[bold green]MINIMUM (recommended)[/]"
        else:
            verdict = "[green]OK[/] [dim](more GPUs = slower)[/]"
        table.add_row(
            str(n),
            f"{vram_per:.1f} GB",
            f"{total_vram:.0f} GB",
            f"{headroom:+.1f} GB ({headroom_pct:+.0f}%)",
            verdict,
        )

    model_label = args.model or f"{total_params_b:.1f}B params"
    moe_line = ""
    if is_moe:
        moe_line = f"\n  Active params:  [cyan]{active_params_b:.1f}B[/] ({active_params_b/total_params_b*100:.0f}% of total — MoE)"

    console.print(Panel(
        f"  Model:          [cyan]{model_label}[/]\n"
        f"  Total params:   [cyan]{total_params_b:.1f}B[/]"
        f"{moe_line}\n"
        f"  Dtype:          [cyan]{args.dtype}[/] ({bpp} bytes/param)\n"
        f"  Weight memory:  [cyan]{weight_gb:.1f} GB[/]\n"
        f"  Activation est: [cyan]{activation_overhead_gb:.1f} GB[/]\n"
        f"  Total needed:   [bold]{total_needed_gb:.1f} GB[/]\n"
        f"  GPU VRAM:       [cyan]{gpu_mem_gb:.0f} GB[/] per device",
        title="[bold]GPU Calculator[/]",
        border_style="cyan",
    ))
    console.print(table)
    console.print(
        f"\n  [bold green]Minimum GPUs: {min_gpus}[/]"
        f" ({min_gpus} x {gpu_mem_gb:.0f} GB = {min_gpus * gpu_mem_gb:.0f} GB)\n"
    )
    console.print(
        "[dim]Note: fewer GPUs = faster (pipeline parallel has cross-device overhead).\n"
        "Estimates are conservative. Actual memory may vary with sequence length\n"
        "and model architecture. See 'obliteratus obliterate --help' for runtime options.[/]\n"
    )


def _estimate_total_params_b(config) -> float:
    """Estimate total parameter count in billions from a HuggingFace config."""
    # Some configs have explicit param counts
    for attr in ("num_parameters", "n_params"):
        val = getattr(config, attr, None)
        if val and val > 1000:
            return val / 1e9

    # Estimate from architecture dimensions
    h = getattr(config, "hidden_size", 0)
    L = getattr(config, "num_hidden_layers", 0)
    V = getattr(config, "vocab_size", 0)
    i = getattr(config, "intermediate_size", h * 4)
    if h == 0 or L == 0:
        console.print("[red]Cannot determine model size from config. Use --params.[/]")
        raise SystemExit(1)

    n_heads = getattr(config, "num_attention_heads", None) or (h // 128)
    head_dim = getattr(config, "head_dim", None) or (h // n_heads if n_heads else 128)
    kv_heads = getattr(config, "num_key_value_heads", None) or n_heads

    # Attention: Q + K + V projections + output projection
    attn_params = h * (n_heads * head_dim) + h * (kv_heads * head_dim) * 2 + (n_heads * head_dim) * h

    # FFN (MoE or dense)
    n_experts = getattr(config, "num_local_experts", getattr(config, "num_experts", 1)) or 1
    # MoE models often have a separate intermediate size for expert FFNs
    moe_i = getattr(config, "moe_intermediate_size", i)
    # gate + up + down projections per expert
    ffn_per_expert = h * moe_i * 3
    ffn_params = ffn_per_expert * n_experts
    # Some architectures (Qwen, DeepSeek) also have a shared/dense FFN per layer
    if n_experts > 1 and hasattr(config, "moe_intermediate_size"):
        # The dense FFN uses the main intermediate_size
        ffn_params += h * i * 3
    # Router
    if n_experts > 1:
        ffn_params += h * n_experts

    # Per-layer: attention + FFN + layernorms
    layer_params = attn_params + ffn_params + h * 4  # 2 layernorms, 2 params each

    # Embedding + LM head
    embed_params = V * h * 2  # input + output embeddings (may be tied but counts for memory)

    total = L * layer_params + embed_params
    return total / 1e9


def _estimate_active_params_b(config, total_params_b: float) -> float:
    """For MoE models, estimate active parameters per forward pass."""
    n_experts = getattr(config, "num_local_experts", getattr(config, "num_experts", 1)) or 1
    if n_experts <= 1:
        return total_params_b

    top_k = getattr(config, "num_experts_per_tok", getattr(config, "top_k", 2)) or 2
    h = getattr(config, "hidden_size", 0)
    i = getattr(config, "intermediate_size", h * 4)
    moe_i = getattr(config, "moe_intermediate_size", i)
    L = getattr(config, "num_hidden_layers", 0)

    # FFN per expert (uses moe_intermediate_size if available)
    ffn_per_expert = h * moe_i * 3
    # Active FFN = top_k experts instead of all n_experts
    ffn_all = ffn_per_expert * n_experts * L
    ffn_active = ffn_per_expert * top_k * L
    # Non-FFN params (includes any shared/dense FFN)
    non_ffn = total_params_b * 1e9 - ffn_all
    active = non_ffn + ffn_active
    return max(active / 1e9, 0.1)


def _make_remote_runner(args):
    """Create a RemoteRunner from CLI --remote flags."""
    from obliteratus.remote import RemoteConfig, RemoteRunner

    rc = RemoteConfig.from_cli_args(
        args.remote,
        port=args.ssh_port,
        ssh_key=args.ssh_key,
        remote_dir=args.remote_dir,
        python=args.remote_python,
        sync_results=not args.no_sync,
        gpus=getattr(args, "gpus", None),
    )
    return RemoteRunner(rc)


def _cmd_remote_abliterate(args):
    from rich.panel import Panel

    runner = _make_remote_runner(args)

    # Forward only the options the user actually set.
    kwargs = {}
    if args.method:
        kwargs["method"] = args.method
    if args.device:
        kwargs["device"] = args.device
    if args.dtype:
        kwargs["dtype"] = args.dtype
    if args.quantization:
        kwargs["quantization"] = args.quantization
    if args.n_directions is not None:
        kwargs["n_directions"] = args.n_directions
    if getattr(args, "direction_method", None):
        kwargs["direction_method"] = args.direction_method
    if args.regularization is not None:
        kwargs["regularization"] = args.regularization
    if args.refinement_passes is not None:
        kwargs["refinement_passes"] = args.refinement_passes
    if getattr(args, "large_model", False):
        kwargs["large_model"] = True
    if getattr(args, "verify_sample_size", None) is not None:
        kwargs["verify_sample_size"] = args.verify_sample_size

    result_path = runner.run_obliterate(
        model=args.model,
        local_output_dir=args.output_dir,
        **kwargs,
    )
    if result_path:
        console.print(
            Panel(
                f"[bold green]Remote abliteration complete![/]\n\n"
                f"  Results at: [cyan]{result_path}[/]\n\n"
                f"  [dim]Load with:[/] AutoModelForCausalLM.from_pretrained('{result_path}')",
                border_style="green",
                title="[bold green]REBIRTH COMPLETE (remote)[/]",
            )
        )
    else:
        console.print("[red]Remote abliteration failed. Check logs above.[/]")
        raise SystemExit(1)


def _cmd_remote_run(args):
    runner = _make_remote_runner(args)
    result_path = runner.run_config(
        local_config_path=args.config,
        local_output_dir=args.output_dir,
        preset=args.preset,
    )
    if result_path:
        console.print(f"\n[bold green]Remote run complete.[/] Results at: [cyan]{result_path}[/]")
    else:
        console.print("[red]Remote run failed. Check logs above.[/]")
        raise SystemExit(1)


def _cmd_remote_tourney(args):
    from rich.panel import Panel

    runner = _make_remote_runner(args)
    result_path = runner.run_tourney(
        model=args.model,
        local_output_dir=args.output_dir,
        device=args.device,
        dtype=args.dtype,
        quantization=args.quantization,
        methods=args.methods,
        hub_org=args.hub_org,
        hub_repo=args.hub_repo,
        dataset=args.dataset,
    )
    if result_path:
        console.print(
            Panel(
                f"[bold green]Remote tournament complete![/]\n\n"
                f"  Results at: [cyan]{result_path}[/]",
                border_style="green",
                title="[bold green]TOURNAMENT COMPLETE (remote)[/]",
            )
        )
    else:
        console.print("[red]Remote tournament failed. Check logs above.[/]")
        raise SystemExit(1)


if __name__ == "__main__":
    main()
+20
@@ -35,6 +35,20 @@ class StrategyConfig:
params: dict[str, Any] = field(default_factory=dict)
@dataclass
class RemoteConfig:
    """Optional remote execution settings for running on a GPU node via SSH."""

    host: str
    user: str = "root"
    port: int = 22
    ssh_key: str | None = None
    remote_dir: str = "/tmp/obliteratus_run"
    python: str = "python3"
    sync_results: bool = True
    gpus: str | None = None  # comma-separated GPU IDs or "all"


@dataclass
class StudyConfig:
"""Top-level configuration for an ablation run."""
@@ -46,6 +60,7 @@ class StudyConfig:
batch_size: int = 8
max_length: int = 512
output_dir: str = "results"
remote: RemoteConfig | None = None
@classmethod
def from_yaml(cls, path: str | Path) -> StudyConfig:
@@ -82,6 +97,10 @@ class StudyConfig:
model = ModelConfig(**d["model"])
dataset = DatasetConfig(**d["dataset"])
strategies = [StrategyConfig(**s) for s in d["strategies"]]
remote = None
if "remote" in d and d["remote"]:
remote = RemoteConfig(**d["remote"])
return cls(
model=model,
dataset=dataset,
@@ -90,6 +109,7 @@ class StudyConfig:
batch_size=d.get("batch_size", 8),
max_length=d.get("max_length", 512),
output_dir=d.get("output_dir", "results"),
remote=remote,
)
def to_dict(self) -> dict:
+17 -4
@@ -312,14 +312,27 @@ class ModelHandle:
)
     def snapshot(self):
-        """Save a deep copy of the model state dict so we can restore after ablation."""
-        self._original_state = copy.deepcopy(self.model.state_dict())
+        """Save a copy of the model state dict so we can restore after ablation.
+
+        Tensors are moved to CPU to avoid doubling GPU memory usage on
+        multi-GPU (device_map) setups.
+        """
+        self._original_state = {k: v.cpu().clone() for k, v in self.model.state_dict().items()}

     def restore(self):
-        """Restore the model to the snapshot state."""
+        """Restore the model to the snapshot state.
+
+        Moves CPU-saved tensors back to each parameter's current device.
+        """
         if self._original_state is None:
             raise RuntimeError("No snapshot to restore — call .snapshot() first.")
-        self.model.load_state_dict(self._original_state)
+        # Map each key to the device where the model currently holds it
+        current_state = self.model.state_dict()
+        restored = {}
+        for k, v in self._original_state.items():
+            target = current_state[k].device if k in current_state else None
+            restored[k] = v.to(target) if target is not None else v
+        self.model.load_state_dict(restored)
def cleanup(self):
"""Remove temporary offload directory if one was auto-created."""
+435
@@ -0,0 +1,435 @@
"""Remote execution support for Obliteratus.
Run abliteration pipelines on remote GPU nodes via SSH. The remote machine
must have CUDA-capable GPUs and a Python environment. Obliteratus will be
auto-installed if not present.
Usage (CLI):
    obliteratus obliterate meta-llama/Llama-3.1-8B-Instruct \
        --remote user@gpu-node \
        --ssh-key ~/.ssh/id_rsa

Usage (YAML config):
    remote:
      host: gpu-node
      user: root
      ssh_key: ~/.ssh/id_rsa
      remote_dir: /tmp/obliteratus_run
"""
from __future__ import annotations
import os
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable
from rich.console import Console
console = Console()
@dataclass
class RemoteConfig:
    """SSH connection and remote execution settings."""

    host: str
    user: str = "root"
    port: int = 22
    ssh_key: str | None = None
    remote_dir: str = "/tmp/obliteratus_run"
    install_timeout: int = 600  # seconds
    python: str = "python3"  # remote python binary
    sync_results: bool = True
    gpus: str | None = None  # comma-separated GPU IDs or "all"

    @property
    def ssh_target(self) -> str:
        return f"{self.user}@{self.host}"

    @classmethod
    def from_cli_args(cls, remote_str: str, **kwargs) -> RemoteConfig:
        """Parse 'user@host' or just 'host' from CLI --remote flag."""
        if "@" in remote_str:
            user, host = remote_str.rsplit("@", 1)
        else:
            user = "root"
            host = remote_str
        return cls(host=host, user=user, **kwargs)

    @classmethod
    def from_dict(cls, d: dict) -> RemoteConfig:
        return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})




class RemoteRunner:
    """Execute Obliteratus commands on a remote machine via SSH."""

    def __init__(
        self,
        config: RemoteConfig,
        on_log: Callable[[str], None] | None = None,
    ):
        self.config = config
        # Escape the literal "[remote]" prefix so Rich prints it instead of
        # parsing it as a style tag.
        self.on_log = on_log or (lambda msg: console.print(f"[dim]\\[remote][/] {msg}"))

    def _ssh_base_cmd(self) -> list[str]:
        """Build base SSH command with common options."""
        cmd = [
            "ssh",
            "-o", "StrictHostKeyChecking=no",
            "-o", "BatchMode=yes",
            "-o", "ConnectTimeout=30",
            "-p", str(self.config.port),
        ]
        if self.config.ssh_key:
            key_path = os.path.expanduser(self.config.ssh_key)
            cmd.extend(["-i", key_path])
        cmd.append(self.config.ssh_target)
        return cmd

    def _scp_base_cmd(self) -> list[str]:
        """Build base SCP command (note: scp takes the port as -P, not -p)."""
        cmd = [
            "scp",
            "-o", "StrictHostKeyChecking=no",
            "-o", "BatchMode=yes",
            "-P", str(self.config.port),
            "-r",
        ]
        if self.config.ssh_key:
            key_path = os.path.expanduser(self.config.ssh_key)
            cmd.extend(["-i", key_path])
        return cmd

    def run_ssh(self, remote_cmd: str, stream: bool = False, timeout: int | None = None) -> subprocess.CompletedProcess | int:
        """Run a command on the remote host.

        If stream=True, streams stdout/stderr in real-time and returns the
        exit code. Otherwise returns CompletedProcess.

        Note: in streaming mode the timeout is only applied to the final
        wait, after the remote side has closed its output stream.
        """
        cmd = self._ssh_base_cmd() + [remote_cmd]
        if stream:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into the streamed output
                text=True,
                bufsize=1,  # line-buffered
            )
            try:
                for line in proc.stdout:
                    line = line.rstrip("\n")
                    self.on_log(line)
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the killed process
                self.on_log("[red]Remote command timed out[/]")
                return 1
            return proc.returncode
        else:
            return subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )

    def check_connection(self) -> bool:
        """Verify SSH connectivity."""
        self.on_log(f"Testing SSH connection to {self.config.ssh_target}...")
        result = self.run_ssh("echo ok", timeout=30)
        if isinstance(result, subprocess.CompletedProcess) and result.returncode == 0:
            self.on_log("SSH connection OK")
            return True
        self.on_log("[red]SSH connection failed[/]")
        return False

    def check_gpu(self) -> str | None:
        """Check for CUDA GPUs on remote. Returns nvidia-smi output or None."""
        result = self.run_ssh(
            "nvidia-smi --query-gpu=index,name,memory.total,memory.free --format=csv,noheader",
            timeout=30,
        )
        if isinstance(result, subprocess.CompletedProcess) and result.returncode == 0:
            gpu_info = result.stdout.strip()
            # Skip blank lines so that empty output is not counted as one GPU
            lines = [ln for ln in gpu_info.splitlines() if ln.strip()]
            self.on_log(f"Remote GPUs ({len(lines)} detected):")
            for line in lines:
                self.on_log(f"  {line.strip()}")
            if self.config.gpus and self.config.gpus.lower() != "all":
                self.on_log(f"  Selected GPUs: {self.config.gpus}")
            else:
                self.on_log(f"  Using: all {len(lines)} GPUs")
            return gpu_info
        self.on_log("[yellow]No GPUs detected on remote (nvidia-smi failed)[/]")
        return None

    def _env_prefix(self) -> str:
        """Build environment variable prefix for remote commands (e.g. CUDA_VISIBLE_DEVICES)."""
        parts = []
        if self.config.gpus and self.config.gpus.lower() != "all":
            # Quote the value because the prefix is interpolated into a shell command
            parts.append(f"CUDA_VISIBLE_DEVICES={shlex.quote(self.config.gpus)}")
        return " ".join(parts) + " " if parts else ""

    def ensure_obliteratus(self) -> bool:
        """Install obliteratus on the remote if it is not already importable."""
        # Check if already installed
        check = self.run_ssh(
            f"{self.config.python} -c \"import obliteratus; print(obliteratus.__version__)\"",
            timeout=30,
        )
        if isinstance(check, subprocess.CompletedProcess) and check.returncode == 0:
            version = check.stdout.strip()
            self.on_log(f"Obliteratus {version} already installed on remote")
            return True
        # Install from the Git repository
        self.on_log("Installing obliteratus on remote...")
        install_cmd = (
            f"{self.config.python} -m pip install --quiet "
            f"git+https://github.com/StellaAthena/OBLITERATUS.git"
        )
        rc = self.run_ssh(install_cmd, stream=True, timeout=self.config.install_timeout)
        if rc != 0:
            self.on_log("[red]Failed to install obliteratus on remote[/]")
            return False
        self.on_log("Obliteratus installed successfully")
        return True

    def sync_results_back(self, remote_output_dir: str, local_output_dir: str) -> bool:
        """Copy results from remote back to local machine via scp."""
        local_path = Path(local_output_dir)
        local_path.mkdir(parents=True, exist_ok=True)
        self.on_log(f"Syncing results: {self.config.ssh_target}:{remote_output_dir} -> {local_output_dir}")
        cmd = self._scp_base_cmd() + [
            f"{self.config.ssh_target}:{remote_output_dir}/",
            str(local_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
        if result.returncode == 0:
            self.on_log(f"Results synced to {local_output_dir}")
            return True
        else:
            self.on_log(f"[red]SCP failed: {result.stderr}[/]")
            return False

    def build_obliterate_command(
        self,
        model: str,
        output_dir: str | None = None,
        method: str = "advanced",
        device: str = "auto",
        dtype: str = "float16",
        quantization: str | None = None,
        n_directions: int | None = None,
        direction_method: str | None = None,
        regularization: float | None = None,
        refinement_passes: int | None = None,
        large_model: bool = False,
        verify_sample_size: int | None = None,
    ) -> str:
        """Build the remote obliteratus CLI command."""
        remote_output = output_dir or f"{self.config.remote_dir}/output/{model.replace('/', '_')}"
        # String arguments are shell-quoted because the result is run through the remote shell
        parts = [
            self._env_prefix() + self.config.python, "-m", "obliteratus",
            "obliterate", shlex.quote(model),
            "--output-dir", shlex.quote(remote_output),
            "--method", shlex.quote(method),
            "--device", shlex.quote(device),
            "--dtype", shlex.quote(dtype),
        ]
        if quantization:
            parts.extend(["--quantization", shlex.quote(quantization)])
        if n_directions is not None:
            parts.extend(["--n-directions", str(n_directions)])
        if direction_method:
            parts.extend(["--direction-method", shlex.quote(direction_method)])
        if regularization is not None:
            parts.extend(["--regularization", str(regularization)])
        if refinement_passes is not None:
            parts.extend(["--refinement-passes", str(refinement_passes)])
        if large_model:
            parts.append("--large-model")
        if verify_sample_size is not None:
            parts.extend(["--verify-sample-size", str(verify_sample_size)])
        return " ".join(parts)

    def build_run_command(self, remote_config_path: str, output_dir: str | None = None, preset: str | None = None) -> str:
        """Build remote 'obliteratus run' command."""
        parts = [
            self._env_prefix() + self.config.python, "-m", "obliteratus",
            "run", shlex.quote(remote_config_path),
        ]
        if output_dir:
            parts.extend(["--output-dir", shlex.quote(output_dir)])
        if preset:
            parts.extend(["--preset", shlex.quote(preset)])
        return " ".join(parts)

    def build_tourney_command(
        self,
        model: str,
        output_dir: str | None = None,
        device: str = "auto",
        dtype: str = "float16",
        quantization: str | None = None,
        methods: list[str] | None = None,
        hub_org: str | None = None,
        hub_repo: str | None = None,
        dataset: str = "builtin",
    ) -> str:
        """Build remote 'obliteratus tourney' command."""
        remote_output = output_dir or f"{self.config.remote_dir}/tourney/{model.replace('/', '_')}"
        parts = [
            self._env_prefix() + self.config.python, "-m", "obliteratus",
            "tourney", shlex.quote(model),
            "--output-dir", shlex.quote(remote_output),
            "--device", shlex.quote(device),
            "--dtype", shlex.quote(dtype),
            "--dataset", shlex.quote(dataset),
        ]
        if quantization:
            parts.extend(["--quantization", shlex.quote(quantization)])
        if hub_org:
            parts.extend(["--hub-org", shlex.quote(hub_org)])
        if hub_repo:
            parts.extend(["--hub-repo", shlex.quote(hub_repo)])
        if methods:
            parts.extend(["--methods"] + [shlex.quote(m) for m in methods])
        return " ".join(parts)

    def upload_config(self, local_config_path: str) -> str:
        """Upload a YAML config file to the remote."""
        remote_path = f"{self.config.remote_dir}/config.yaml"
        self.run_ssh(f"mkdir -p {shlex.quote(self.config.remote_dir)}")
        cmd = self._scp_base_cmd()
        # scp uses -P not -p, already handled in _scp_base_cmd
        cmd += [local_config_path, f"{self.config.ssh_target}:{remote_path}"]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to upload config: {result.stderr}")
        self.on_log(f"Config uploaded to {remote_path}")
        return remote_path

    def run_obliterate(
        self,
        model: str,
        local_output_dir: str | None = None,
        **kwargs,
    ) -> str | None:
        """Full remote obliteration: setup, run, sync results.

        Returns the local path to the results (or the remote path when
        sync_results is disabled), or None on failure.
        """
        # 1. Verify connection
        if not self.check_connection():
            return None
        # 2. Check GPUs
        self.check_gpu()
        # 3. Ensure obliteratus is installed
        if not self.ensure_obliteratus():
            return None
        # 4. Create remote working directory
        self.run_ssh(f"mkdir -p {shlex.quote(self.config.remote_dir)}")
        # 5. Build and run the command
        remote_output = f"{self.config.remote_dir}/output/{model.replace('/', '_')}"
        cmd = self.build_obliterate_command(model, output_dir=remote_output, **kwargs)
        self.on_log(f"Running: {cmd}")
        rc = self.run_ssh(cmd, stream=True)
        if rc != 0:
            self.on_log(f"[red]Remote obliteration failed (exit code {rc})[/]")
            return None
        # 6. Sync results back
        if self.config.sync_results:
            local_output = local_output_dir or f"abliterated/{model.replace('/', '_')}"
            if self.sync_results_back(remote_output, local_output):
                return local_output
            return None
        self.on_log(f"Results on remote: {remote_output}")
        return remote_output

    def run_config(
        self,
        local_config_path: str,
        local_output_dir: str | None = None,
        preset: str | None = None,
    ) -> str | None:
        """Upload config, run study remotely, sync results."""
        if not self.check_connection():
            return None
        self.check_gpu()
        if not self.ensure_obliteratus():
            return None
        # Upload config
        remote_config = self.upload_config(local_config_path)
        # Determine remote output dir
        remote_output = f"{self.config.remote_dir}/results"
        cmd = self.build_run_command(remote_config, output_dir=remote_output, preset=preset)
        self.on_log(f"Running: {cmd}")
        rc = self.run_ssh(cmd, stream=True)
        if rc != 0:
            self.on_log(f"[red]Remote run failed (exit code {rc})[/]")
            return None
        if self.config.sync_results:
            local_output = local_output_dir or "results"
            if self.sync_results_back(remote_output, local_output):
                return local_output
            return None
        return remote_output

    def run_tourney(
        self,
        model: str,
        local_output_dir: str | None = None,
        **kwargs,
    ) -> str | None:
        """Run tournament remotely, sync results."""
        if not self.check_connection():
            return None
        self.check_gpu()
        if not self.ensure_obliteratus():
            return None
        remote_output = f"{self.config.remote_dir}/tourney/{model.replace('/', '_')}"
        cmd = self.build_tourney_command(model, output_dir=remote_output, **kwargs)
        self.on_log(f"Running: {cmd}")
        rc = self.run_ssh(cmd, stream=True)
        if rc != 0:
            self.on_log(f"[red]Remote tourney failed (exit code {rc})[/]")
            return None
        if self.config.sync_results:
            local_output = local_output_dir or f"/tmp/obliteratus_tourney/{model.replace('/', '_')}"
            if self.sync_results_back(remote_output, local_output):
                return local_output
            return None
        return remote_output
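
The command builders in this file join their arguments into one string that the remote shell re-parses. As a rough, standalone illustration of why quoting matters there, the sketch below mirrors the `user@host` parsing and the `CUDA_VISIBLE_DEVICES` prefix logic using only the standard library. The helper names `parse_remote` and `build_remote_command` are hypothetical and are not part of the module; the sketch covers only a subset of the real flags.

```python
from __future__ import annotations

import shlex


def parse_remote(remote_str: str, default_user: str = "root") -> tuple[str, str]:
    """Split 'user@host' (or a bare 'host') into (user, host)."""
    if "@" in remote_str:
        # rsplit keeps any '@' inside the user part intact
        user, host = remote_str.rsplit("@", 1)
    else:
        user, host = default_user, remote_str
    return user, host


def build_remote_command(
    model: str,
    output_dir: str,
    gpus: str | None = None,
    python: str = "python3",
) -> str:
    """Assemble one shell string suitable for passing to ssh (hypothetical helper)."""
    env = ""
    if gpus and gpus.lower() != "all":
        # The variable assignment must precede the program name
        env = f"CUDA_VISIBLE_DEVICES={shlex.quote(gpus)} "
    args = ["-m", "obliteratus", "obliterate", model, "--output-dir", output_dir]
    # Quote every argument so spaces or shell metacharacters survive re-parsing
    return env + python + " " + " ".join(shlex.quote(a) for a in args)


if __name__ == "__main__":
    print(parse_remote("ubuntu@gpu-node"))
    print(build_remote_command("org/model", "/tmp/run dir/out", gpus="2,5"))
```

Splitting the resulting string with `shlex.split` should give back the original argument list, which is a quick way to check that a path containing spaces reaches the remote CLI as a single argument.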