mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-08 10:24:48 +02:00
release: prepare v0.9.7
This commit is contained in:
@@ -0,0 +1,179 @@
|
||||
"""S5C DM Ciphertext Bucket Padding — prove padding envelope correctness.
|
||||
|
||||
Tests:
|
||||
- Padded payload length rounds to PAD_BUCKET_STEP
|
||||
- encrypt_dm + decrypt_dm round-trip returns original plaintext
|
||||
- Nearby plaintexts collapse into same bucket size
|
||||
- Legacy unpadded MLS ciphertext still decrypts
|
||||
- Truncated padding envelope is rejected
|
||||
"""
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit tests for the padding helpers directly
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_pad_rounds_to_bucket_step():
|
||||
"""Padded output length must be a multiple of PAD_BUCKET_STEP."""
|
||||
from services.mesh.mesh_dm_mls import PAD_BUCKET_STEP, _pad_plaintext
|
||||
|
||||
for size in [0, 1, 100, 500, 504, 505, 512, 1000, 2048, 4096]:
|
||||
padded = _pad_plaintext(b"x" * size)
|
||||
assert len(padded) % PAD_BUCKET_STEP == 0, f"size={size} → len={len(padded)}"
|
||||
assert len(padded) >= size + 8 # header is 8 bytes
|
||||
|
||||
|
||||
def test_pad_unpad_round_trip():
|
||||
"""_pad_plaintext followed by _unpad_plaintext returns the original bytes."""
|
||||
from services.mesh.mesh_dm_mls import _pad_plaintext, _unpad_plaintext
|
||||
|
||||
for msg in [b"", b"hello", b"x" * 504, b"x" * 505, b"x" * 1024, b"\xff" * 4096]:
|
||||
assert _unpad_plaintext(_pad_plaintext(msg)) == msg
|
||||
|
||||
|
||||
def test_nearby_sizes_same_bucket():
|
||||
"""Plaintexts of different nearby sizes must collapse into the same padded length."""
|
||||
from services.mesh.mesh_dm_mls import PAD_BUCKET_STEP, PAD_HEADER_SIZE, _pad_plaintext
|
||||
|
||||
# All sizes 1..100 should fit within the first bucket (header + data ≤ 512)
|
||||
lengths = {len(_pad_plaintext(b"a" * n)) for n in range(1, 101)}
|
||||
assert len(lengths) == 1, f"Expected 1 bucket, got {lengths}"
|
||||
assert lengths.pop() == PAD_BUCKET_STEP
|
||||
|
||||
|
||||
def test_bucket_boundary_steps_up():
|
||||
"""Once plaintext + header exceeds one bucket, the next bucket is used."""
|
||||
from services.mesh.mesh_dm_mls import PAD_BUCKET_STEP, PAD_HEADER_SIZE, _pad_plaintext
|
||||
|
||||
# Exactly fills one bucket: header(8) + data(504) = 512
|
||||
fits = _pad_plaintext(b"x" * (PAD_BUCKET_STEP - PAD_HEADER_SIZE))
|
||||
assert len(fits) == PAD_BUCKET_STEP
|
||||
|
||||
# One byte over spills into second bucket
|
||||
spills = _pad_plaintext(b"x" * (PAD_BUCKET_STEP - PAD_HEADER_SIZE + 1))
|
||||
assert len(spills) == PAD_BUCKET_STEP * 2
|
||||
|
||||
|
||||
def test_legacy_unpadded_passthrough():
|
||||
"""Bytes without SBP1 magic are returned unchanged (legacy compatibility)."""
|
||||
from services.mesh.mesh_dm_mls import _unpad_plaintext
|
||||
|
||||
legacy = b"plain old text without padding"
|
||||
assert _unpad_plaintext(legacy) == legacy
|
||||
|
||||
# Also test short data
|
||||
assert _unpad_plaintext(b"") == b""
|
||||
assert _unpad_plaintext(b"SBP") == b"SBP" # too short for header
|
||||
|
||||
|
||||
def test_truncated_padded_payload_rejected():
|
||||
"""A valid magic but truncated body must raise an error."""
|
||||
from services.mesh.mesh_dm_mls import PAD_MAGIC, _unpad_plaintext
|
||||
from services.privacy_core_client import PrivacyCoreError
|
||||
|
||||
# Claim original_len = 1000, but only provide 10 bytes of body
|
||||
bad = PAD_MAGIC + struct.pack(">I", 1000) + b"x" * 10
|
||||
with pytest.raises(PrivacyCoreError, match="truncated"):
|
||||
_unpad_plaintext(bad)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration tests through the full encrypt_dm / decrypt_dm seam
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _fresh_dm_mls_state(tmp_path, monkeypatch):
|
||||
from services import wormhole_supervisor
|
||||
from services.mesh import mesh_dm_mls, mesh_dm_relay, mesh_secure_storage, mesh_wormhole_persona
|
||||
|
||||
monkeypatch.setattr(mesh_secure_storage, "DATA_DIR", tmp_path)
|
||||
monkeypatch.setattr(mesh_secure_storage, "MASTER_KEY_FILE", tmp_path / "wormhole_secure_store.key")
|
||||
monkeypatch.setattr(mesh_wormhole_persona, "DATA_DIR", tmp_path)
|
||||
monkeypatch.setattr(mesh_wormhole_persona, "PERSONA_FILE", tmp_path / "wormhole_persona.json")
|
||||
monkeypatch.setattr(
|
||||
mesh_wormhole_persona,
|
||||
"LEGACY_DM_IDENTITY_FILE",
|
||||
tmp_path / "wormhole_identity.json",
|
||||
)
|
||||
monkeypatch.setattr(mesh_dm_mls, "DATA_DIR", tmp_path)
|
||||
monkeypatch.setattr(mesh_dm_mls, "STATE_FILE", tmp_path / "wormhole_dm_mls.json")
|
||||
monkeypatch.setattr(mesh_dm_relay, "DATA_DIR", tmp_path)
|
||||
monkeypatch.setattr(mesh_dm_relay, "RELAY_FILE", tmp_path / "dm_relay.json")
|
||||
monkeypatch.setattr(
|
||||
mesh_dm_mls,
|
||||
"get_wormhole_state",
|
||||
lambda: {"configured": True, "ready": True, "arti_ready": True, "rns_ready": True},
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
wormhole_supervisor,
|
||||
"get_wormhole_state",
|
||||
lambda: {"configured": True, "ready": True, "arti_ready": True, "rns_ready": True},
|
||||
)
|
||||
relay = mesh_dm_relay.DMRelay()
|
||||
monkeypatch.setattr(mesh_dm_relay, "dm_relay", relay)
|
||||
mesh_dm_mls.reset_dm_mls_state(clear_privacy_core=True, clear_persistence=True)
|
||||
return mesh_dm_mls, relay
|
||||
|
||||
|
||||
def _establish_session(dm_mls):
|
||||
"""Helper: create alice→bob MLS session and return dm_mls module."""
|
||||
bob_bundle = dm_mls.export_dm_key_package_for_alias("bob")
|
||||
assert bob_bundle["ok"] is True
|
||||
initiated = dm_mls.initiate_dm_session("alice", "bob", bob_bundle)
|
||||
assert initiated["ok"] is True
|
||||
accepted = dm_mls.accept_dm_session("bob", "alice", initiated["welcome"])
|
||||
assert accepted["ok"] is True
|
||||
|
||||
|
||||
def test_encrypt_decrypt_round_trip_through_mls(tmp_path, monkeypatch):
|
||||
"""encrypt_dm + decrypt_dm must round-trip the original plaintext with padding active."""
|
||||
dm_mls, _ = _fresh_dm_mls_state(tmp_path, monkeypatch)
|
||||
_establish_session(dm_mls)
|
||||
|
||||
original = "hello bob, this is a secret message"
|
||||
encrypted = dm_mls.encrypt_dm("alice", "bob", original)
|
||||
assert encrypted["ok"] is True
|
||||
|
||||
decrypted = dm_mls.decrypt_dm("bob", "alice", encrypted["ciphertext"], encrypted["nonce"])
|
||||
assert decrypted["ok"] is True
|
||||
assert decrypted["plaintext"] == original
|
||||
|
||||
|
||||
def test_encrypt_produces_padded_ciphertext(tmp_path, monkeypatch):
|
||||
"""The plaintext fed to privacy-core must be bucket-padded (verify via round-trip size)."""
|
||||
dm_mls, _ = _fresh_dm_mls_state(tmp_path, monkeypatch)
|
||||
_establish_session(dm_mls)
|
||||
|
||||
# Capture the padded bytes that privacy-core receives
|
||||
captured = {}
|
||||
original_dm_encrypt = dm_mls._privacy_client().dm_encrypt
|
||||
|
||||
def spy_dm_encrypt(handle, data):
|
||||
captured["padded"] = data
|
||||
return original_dm_encrypt(handle, data)
|
||||
|
||||
monkeypatch.setattr(dm_mls._privacy_client(), "dm_encrypt", spy_dm_encrypt)
|
||||
|
||||
dm_mls.encrypt_dm("alice", "bob", "short")
|
||||
padded = captured["padded"]
|
||||
assert padded[:4] == dm_mls.PAD_MAGIC
|
||||
assert len(padded) % dm_mls.PAD_BUCKET_STEP == 0
|
||||
|
||||
|
||||
def test_legacy_unpadded_mls_ciphertext_decrypts(tmp_path, monkeypatch):
|
||||
"""Legacy ciphertext (no SBP1 header) must still decrypt successfully."""
|
||||
dm_mls, _ = _fresh_dm_mls_state(tmp_path, monkeypatch)
|
||||
_establish_session(dm_mls)
|
||||
|
||||
# Encrypt without padding by calling privacy-core directly (simulating legacy)
|
||||
binding = dm_mls._session_binding("alice", "bob")
|
||||
raw_plaintext = b"legacy unpadded message"
|
||||
raw_ciphertext = dm_mls._privacy_client().dm_encrypt(binding.session_handle, raw_plaintext)
|
||||
ciphertext_b64 = dm_mls._b64(raw_ciphertext)
|
||||
|
||||
decrypted = dm_mls.decrypt_dm("bob", "alice", ciphertext_b64, "")
|
||||
assert decrypted["ok"] is True
|
||||
assert decrypted["plaintext"] == "legacy unpadded message"
|
||||
Reference in New Issue
Block a user