mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-08 18:34:58 +02:00
366 lines
16 KiB
Python
366 lines
16 KiB
Python
"""P4B: Secure storage secret rotation / rewrap path.
|
|
|
|
Tests prove:
|
|
- Master key rotates from old secret -> new secret and remains readable
|
|
- Domain key rotates from old secret -> new secret and remains readable
|
|
- Secure JSON created before rotation is still readable after rotation
|
|
- Domain JSON created before rotation is still readable after rotation
|
|
- Old secret fails after successful rotation
|
|
- Wrong old secret fails closed and does not partially rewrite state
|
|
- Missing new secret fails closed
|
|
- Same old/new secret fails closed
|
|
- No passphrase envelopes to rotate fails closed
|
|
- Windows DPAPI path unchanged (skipped by rotation)
|
|
- Raw -> passphrase migration path still works after rotation code is present
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
|
|
def _reset(mod):
|
|
mod._MASTER_KEY_CACHE = None
|
|
mod._DOMAIN_KEY_CACHE.clear()
|
|
|
|
|
|
def _setup_passphrase_env(monkeypatch, tmp_path, secret):
|
|
"""Configure monkeypatch for non-Windows passphrase mode."""
|
|
from services.mesh import mesh_secure_storage
|
|
from services import config as config_mod
|
|
|
|
monkeypatch.setattr(mesh_secure_storage, "DATA_DIR", tmp_path)
|
|
monkeypatch.setattr(mesh_secure_storage, "MASTER_KEY_FILE", tmp_path / "wormhole_secure_store.key")
|
|
monkeypatch.setattr(mesh_secure_storage, "_is_windows", lambda: False)
|
|
monkeypatch.delenv("PYTEST_CURRENT_TEST", raising=False)
|
|
monkeypatch.setenv("MESH_SECURE_STORAGE_SECRET", secret)
|
|
monkeypatch.setattr(
|
|
config_mod,
|
|
"get_settings",
|
|
lambda: SimpleNamespace(
|
|
MESH_ALLOW_RAW_SECURE_STORAGE_FALLBACK=False,
|
|
MESH_SECURE_STORAGE_SECRET=secret,
|
|
),
|
|
)
|
|
_reset(mesh_secure_storage)
|
|
return mesh_secure_storage
|
|
|
|
|
|
class TestMasterKeyRotation:
|
|
def test_master_key_rotates_and_remains_readable(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
original_key = mod._load_master_key()
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
assert result["ok"] is True
|
|
assert "wormhole_secure_store.key" in result["rotated"]
|
|
|
|
# Reload with new secret
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "new-secret")
|
|
loaded_key = mod._load_master_key()
|
|
assert loaded_key == original_key
|
|
|
|
def test_old_secret_fails_after_rotation(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
# Try loading with old secret — must fail
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
with pytest.raises(mod.SecureStorageError, match="Failed to unwrap"):
|
|
mod._load_master_key()
|
|
|
|
|
|
class TestDomainKeyRotation:
|
|
def test_domain_key_rotates_and_remains_readable(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
original_key = mod._load_domain_key("testdomain", base_dir=tmp_path)
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
assert result["ok"] is True
|
|
assert "testdomain.key" in result["rotated"]
|
|
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "new-secret")
|
|
loaded_key = mod._load_domain_key("testdomain", base_dir=tmp_path)
|
|
assert loaded_key == original_key
|
|
|
|
def test_multiple_domain_keys_rotate(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
# Create master key first so it's included in rotation
|
|
mod._load_master_key()
|
|
key_a = mod._load_domain_key("domain_a", base_dir=tmp_path)
|
|
key_b = mod._load_domain_key("domain_b", base_dir=tmp_path)
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
assert len(result["rotated"]) == 3 # master + 2 domains
|
|
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "new-secret")
|
|
assert mod._load_domain_key("domain_a", base_dir=tmp_path) == key_a
|
|
assert mod._load_domain_key("domain_b", base_dir=tmp_path) == key_b
|
|
|
|
|
|
class TestSecureJsonSurvivesRotation:
|
|
def test_secure_json_readable_after_rotation(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
json_path = tmp_path / "secret_data.json"
|
|
mod.write_secure_json(json_path, {"classified": "intel"})
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "new-secret")
|
|
data = mod.read_secure_json(json_path, lambda: {})
|
|
assert data == {"classified": "intel"}
|
|
|
|
def test_domain_json_readable_after_rotation(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod.write_domain_json("gate_persona", "state.json", {"persona": "anon"}, base_dir=tmp_path)
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "new-secret")
|
|
data = mod.read_domain_json("gate_persona", "state.json", lambda: {}, base_dir=tmp_path)
|
|
assert data == {"persona": "anon"}
|
|
|
|
|
|
class TestRotationFailsClosed:
|
|
def test_wrong_old_secret_fails_without_partial_rewrite(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "correct-secret")
|
|
mod._load_master_key()
|
|
mod._load_domain_key("testdomain", base_dir=tmp_path)
|
|
|
|
# Capture file contents before failed rotation
|
|
master_before = (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8")
|
|
domain_before = (tmp_path / "_domain_keys" / "testdomain.key").read_text(encoding="utf-8")
|
|
|
|
with pytest.raises(mod.SecureStorageError, match="Old secret cannot unwrap"):
|
|
mod.rotate_storage_secret("wrong-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
# Files must be unchanged
|
|
assert (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8") == master_before
|
|
assert (tmp_path / "_domain_keys" / "testdomain.key").read_text(encoding="utf-8") == domain_before
|
|
|
|
def test_missing_new_secret_fails(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
with pytest.raises(mod.SecureStorageError, match="New secret is required"):
|
|
mod.rotate_storage_secret("old-secret", "", base_dir=tmp_path)
|
|
|
|
def test_missing_old_secret_fails(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
with pytest.raises(mod.SecureStorageError, match="Old secret is required"):
|
|
mod.rotate_storage_secret("", "new-secret", base_dir=tmp_path)
|
|
|
|
def test_same_old_new_secret_fails(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "same-secret")
|
|
mod._load_master_key()
|
|
|
|
with pytest.raises(mod.SecureStorageError, match="must differ"):
|
|
mod.rotate_storage_secret("same-secret", "same-secret", base_dir=tmp_path)
|
|
|
|
def test_no_passphrase_envelopes_fails(self, tmp_path, monkeypatch):
|
|
from services.mesh import mesh_secure_storage
|
|
|
|
# Empty directory — no envelopes at all
|
|
with pytest.raises(mesh_secure_storage.SecureStorageError, match="No passphrase-protected envelopes"):
|
|
mesh_secure_storage.rotate_storage_secret("old", "new", base_dir=tmp_path)
|
|
|
|
|
|
class TestDPAPISkippedDuringRotation:
|
|
@pytest.mark.skipif(os.name != "nt", reason="DPAPI only available on Windows")
|
|
def test_dpapi_envelopes_skipped_not_broken(self, tmp_path, monkeypatch):
|
|
from services.mesh import mesh_secure_storage
|
|
|
|
monkeypatch.setattr(mesh_secure_storage, "DATA_DIR", tmp_path)
|
|
monkeypatch.setattr(mesh_secure_storage, "MASTER_KEY_FILE", tmp_path / "wormhole_secure_store.key")
|
|
_reset(mesh_secure_storage)
|
|
|
|
# Create a DPAPI envelope (Windows default)
|
|
key = mesh_secure_storage._load_master_key()
|
|
envelope_before = (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8")
|
|
assert json.loads(envelope_before)["provider"] == "dpapi-machine"
|
|
|
|
# Rotation should fail with "no passphrase envelopes" — DPAPI is skipped
|
|
with pytest.raises(mesh_secure_storage.SecureStorageError, match="No passphrase-protected envelopes"):
|
|
mesh_secure_storage.rotate_storage_secret("old", "new", base_dir=tmp_path)
|
|
|
|
# DPAPI envelope untouched
|
|
assert (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8") == envelope_before
|
|
|
|
|
|
class TestRawMigrationNotRegressed:
|
|
"""P4A raw -> passphrase migration still works with rotation code present."""
|
|
|
|
def test_raw_to_passphrase_migration_still_works(self, tmp_path, monkeypatch):
|
|
from services.mesh import mesh_secure_storage
|
|
from services import config as config_mod
|
|
|
|
monkeypatch.setattr(mesh_secure_storage, "DATA_DIR", tmp_path)
|
|
monkeypatch.setattr(mesh_secure_storage, "MASTER_KEY_FILE", tmp_path / "wormhole_secure_store.key")
|
|
monkeypatch.setattr(mesh_secure_storage, "_is_windows", lambda: False)
|
|
|
|
# Create raw envelope
|
|
raw_key = os.urandom(32)
|
|
envelope = mesh_secure_storage._master_envelope_for_fallback(raw_key)
|
|
(tmp_path / "wormhole_secure_store.key").write_text(json.dumps(envelope), encoding="utf-8")
|
|
|
|
# Set up with secret and no raw fallback
|
|
monkeypatch.delenv("PYTEST_CURRENT_TEST", raising=False)
|
|
monkeypatch.setenv("MESH_SECURE_STORAGE_SECRET", "migration-secret")
|
|
monkeypatch.setattr(
|
|
config_mod,
|
|
"get_settings",
|
|
lambda: SimpleNamespace(
|
|
MESH_ALLOW_RAW_SECURE_STORAGE_FALLBACK=False,
|
|
MESH_SECURE_STORAGE_SECRET="migration-secret",
|
|
),
|
|
)
|
|
_reset(mesh_secure_storage)
|
|
|
|
loaded = mesh_secure_storage._load_master_key()
|
|
assert loaded == raw_key
|
|
|
|
migrated = json.loads((tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8"))
|
|
assert migrated["provider"] == "passphrase"
|
|
|
|
|
|
class TestRotationSkipsNonPassphrase:
|
|
def test_raw_envelopes_skipped_in_rotation(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
# Create a passphrase master key
|
|
mod._load_master_key()
|
|
|
|
# Manually write a raw domain key alongside
|
|
raw_domain_key = os.urandom(32)
|
|
dk_dir = tmp_path / "_domain_keys"
|
|
dk_dir.mkdir(parents=True, exist_ok=True)
|
|
raw_envelope = mod._domain_key_envelope_for_fallback("rawdomain", raw_domain_key)
|
|
(dk_dir / "rawdomain.key").write_text(json.dumps(raw_envelope), encoding="utf-8")
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
assert "rawdomain.key" in result["skipped"]
|
|
assert "wormhole_secure_store.key" in result["rotated"]
|
|
|
|
# Raw domain key file unchanged
|
|
raw_after = json.loads((dk_dir / "rawdomain.key").read_text(encoding="utf-8"))
|
|
assert raw_after["provider"] == "raw"
|
|
|
|
|
|
class TestDryRunMode:
|
|
"""Dry-run validates without writing anything."""
|
|
|
|
def test_dry_run_returns_would_rotate_without_writing(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
mod._load_domain_key("testdomain", base_dir=tmp_path)
|
|
|
|
master_before = (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8")
|
|
domain_before = (tmp_path / "_domain_keys" / "testdomain.key").read_text(encoding="utf-8")
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path, dry_run=True)
|
|
|
|
assert result["ok"] is True
|
|
assert result["dry_run"] is True
|
|
assert "wormhole_secure_store.key" in result["would_rotate"]
|
|
assert "testdomain.key" in result["would_rotate"]
|
|
assert "rotated" not in result
|
|
assert "backups" not in result
|
|
|
|
# Files must be unchanged
|
|
assert (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8") == master_before
|
|
assert (tmp_path / "_domain_keys" / "testdomain.key").read_text(encoding="utf-8") == domain_before
|
|
|
|
def test_dry_run_fails_on_wrong_old_secret(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "correct-secret")
|
|
mod._load_master_key()
|
|
|
|
with pytest.raises(mod.SecureStorageError, match="Old secret cannot unwrap"):
|
|
mod.rotate_storage_secret("wrong-secret", "new-secret", base_dir=tmp_path, dry_run=True)
|
|
|
|
def test_dry_run_no_bak_files_created(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path, dry_run=True)
|
|
|
|
bak_files = list(tmp_path.rglob("*.bak"))
|
|
assert bak_files == []
|
|
|
|
def test_dry_run_reports_skipped_non_passphrase(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
# Add a raw domain key
|
|
dk_dir = tmp_path / "_domain_keys"
|
|
dk_dir.mkdir(parents=True, exist_ok=True)
|
|
raw_envelope = mod._domain_key_envelope_for_fallback("rawdomain", os.urandom(32))
|
|
(dk_dir / "rawdomain.key").write_text(json.dumps(raw_envelope), encoding="utf-8")
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path, dry_run=True)
|
|
assert "rawdomain.key" in result["skipped"]
|
|
assert "wormhole_secure_store.key" in result["would_rotate"]
|
|
|
|
|
|
class TestPreRotationBackups:
|
|
"""Phase 2a creates .bak copies before rewriting envelopes."""
|
|
|
|
def test_rotation_creates_bak_files(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
mod._load_domain_key("testdomain", base_dir=tmp_path)
|
|
|
|
master_before = (tmp_path / "wormhole_secure_store.key").read_text(encoding="utf-8")
|
|
domain_before = (tmp_path / "_domain_keys" / "testdomain.key").read_text(encoding="utf-8")
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
assert "wormhole_secure_store.key.bak" in result["backups"]
|
|
assert "testdomain.key.bak" in result["backups"]
|
|
|
|
# .bak files contain the old envelopes
|
|
assert (tmp_path / "wormhole_secure_store.key.bak").read_text(encoding="utf-8") == master_before
|
|
assert (tmp_path / "_domain_keys" / "testdomain.key.bak").read_text(encoding="utf-8") == domain_before
|
|
|
|
def test_backup_contains_old_secret_envelope(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
original_key = mod._load_master_key()
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
# The .bak envelope should be unwrappable with the old secret
|
|
bak_envelope = json.loads((tmp_path / "wormhole_secure_store.key.bak").read_text(encoding="utf-8"))
|
|
assert bak_envelope["provider"] == "passphrase"
|
|
recovered_key = mod._passphrase_unwrap(bak_envelope, "old-secret")
|
|
assert recovered_key == original_key
|
|
|
|
def test_rotation_result_includes_backups_list(self, tmp_path, monkeypatch):
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
mod._load_master_key()
|
|
|
|
result = mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
assert "backups" in result
|
|
assert len(result["backups"]) == len(result["rotated"])
|
|
|
|
def test_old_secret_still_works_via_bak_after_rotation(self, tmp_path, monkeypatch):
|
|
"""Operator can recover by restoring .bak files if they lose the new secret."""
|
|
mod = _setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
original_key = mod._load_master_key()
|
|
|
|
mod.rotate_storage_secret("old-secret", "new-secret", base_dir=tmp_path)
|
|
|
|
# Simulate restore: copy .bak back over the rotated file
|
|
import shutil
|
|
shutil.copy2(
|
|
str(tmp_path / "wormhole_secure_store.key.bak"),
|
|
str(tmp_path / "wormhole_secure_store.key"),
|
|
)
|
|
|
|
_setup_passphrase_env(monkeypatch, tmp_path, "old-secret")
|
|
recovered = mod._load_master_key()
|
|
assert recovered == original_key
|