mirror of
https://github.com/elder-plinius/STEGOSAURUS-WRECKS.git
synced 2026-04-21 19:55:57 +02:00
379 lines
10 KiB
Python
379 lines
10 KiB
Python
"""
|
|
STEGOSAURUS WRECKS - Cryptography Module
|
|
AES encryption for steganographic payloads
|
|
"""
|
|
|
|
import os
|
|
import hashlib
|
|
import secrets
|
|
from typing import Tuple, Optional
|
|
from dataclasses import dataclass
|
|
|
|
# Try to import cryptography library, fall back to basic XOR if not available
|
|
HAS_CRYPTO = False
|
|
try:
|
|
# Pre-check: verify cryptography's native bindings work.
|
|
# Some systems have a broken cryptography install where the Rust
|
|
# bindings crash with a pyo3 panic that Python can't catch.
|
|
import subprocess as _sp
|
|
_probe = _sp.run(
|
|
['python3', '-c', 'from cryptography.exceptions import InvalidSignature'],
|
|
capture_output=True, timeout=5
|
|
)
|
|
if _probe.returncode == 0:
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.primitives import padding
|
|
from cryptography.hazmat.backends import default_backend
|
|
HAS_CRYPTO = True
|
|
except Exception:
|
|
HAS_CRYPTO = False
|
|
|
|
|
|
@dataclass
|
|
class EncryptedPayload:
|
|
"""Container for encrypted data with metadata"""
|
|
ciphertext: bytes
|
|
iv: bytes
|
|
salt: bytes
|
|
method: str # 'aes-256-cbc', 'aes-256-gcm', 'xor'
|
|
|
|
|
|
def derive_key(password: str, salt: bytes, key_length: int = 32) -> bytes:
|
|
"""
|
|
Derive encryption key from password using PBKDF2
|
|
|
|
Args:
|
|
password: User password
|
|
salt: Random salt (should be stored with ciphertext)
|
|
key_length: Desired key length in bytes (32 for AES-256)
|
|
|
|
Returns:
|
|
Derived key bytes
|
|
"""
|
|
return hashlib.pbkdf2_hmac(
|
|
'sha256',
|
|
password.encode('utf-8'),
|
|
salt,
|
|
iterations=600000,
|
|
dklen=key_length
|
|
)
|
|
|
|
|
|
def generate_salt(length: int = 16) -> bytes:
|
|
"""Generate cryptographically secure random salt"""
|
|
return secrets.token_bytes(length)
|
|
|
|
|
|
def generate_iv(length: int = 16) -> bytes:
|
|
"""Generate cryptographically secure random IV"""
|
|
return secrets.token_bytes(length)
|
|
|
|
|
|
# ============== AES Encryption (requires cryptography library) ==============
|
|
|
|
def encrypt_aes_cbc(data: bytes, password: str) -> EncryptedPayload:
|
|
"""
|
|
Encrypt data using AES-256-CBC
|
|
|
|
Args:
|
|
data: Plaintext bytes
|
|
password: Encryption password
|
|
|
|
Returns:
|
|
EncryptedPayload with ciphertext, IV, and salt
|
|
"""
|
|
if not HAS_CRYPTO:
|
|
raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")
|
|
|
|
salt = generate_salt()
|
|
iv = generate_iv()
|
|
key = derive_key(password, salt)
|
|
|
|
# Pad data to block size
|
|
padder = padding.PKCS7(128).padder()
|
|
padded_data = padder.update(data) + padder.finalize()
|
|
|
|
# Encrypt
|
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
|
|
encryptor = cipher.encryptor()
|
|
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
|
|
|
|
return EncryptedPayload(
|
|
ciphertext=ciphertext,
|
|
iv=iv,
|
|
salt=salt,
|
|
method='aes-256-cbc'
|
|
)
|
|
|
|
|
|
def decrypt_aes_cbc(payload: EncryptedPayload, password: str) -> bytes:
|
|
"""
|
|
Decrypt AES-256-CBC encrypted data
|
|
|
|
Args:
|
|
payload: EncryptedPayload from encrypt_aes_cbc
|
|
password: Decryption password
|
|
|
|
Returns:
|
|
Decrypted plaintext bytes
|
|
"""
|
|
if not HAS_CRYPTO:
|
|
raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")
|
|
|
|
key = derive_key(password, payload.salt)
|
|
|
|
# Decrypt
|
|
cipher = Cipher(algorithms.AES(key), modes.CBC(payload.iv), backend=default_backend())
|
|
decryptor = cipher.decryptor()
|
|
padded_data = decryptor.update(payload.ciphertext) + decryptor.finalize()
|
|
|
|
# Unpad
|
|
unpadder = padding.PKCS7(128).unpadder()
|
|
data = unpadder.update(padded_data) + unpadder.finalize()
|
|
|
|
return data
|
|
|
|
|
|
def encrypt_aes_gcm(data: bytes, password: str) -> EncryptedPayload:
|
|
"""
|
|
Encrypt data using AES-256-GCM (authenticated encryption)
|
|
|
|
Args:
|
|
data: Plaintext bytes
|
|
password: Encryption password
|
|
|
|
Returns:
|
|
EncryptedPayload with ciphertext (includes auth tag), IV, and salt
|
|
"""
|
|
if not HAS_CRYPTO:
|
|
raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")
|
|
|
|
salt = generate_salt()
|
|
iv = generate_iv(12) # GCM uses 12-byte IV
|
|
key = derive_key(password, salt)
|
|
|
|
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
|
|
encryptor = cipher.encryptor()
|
|
ciphertext = encryptor.update(data) + encryptor.finalize()
|
|
|
|
# Append auth tag to ciphertext
|
|
ciphertext_with_tag = ciphertext + encryptor.tag
|
|
|
|
return EncryptedPayload(
|
|
ciphertext=ciphertext_with_tag,
|
|
iv=iv,
|
|
salt=salt,
|
|
method='aes-256-gcm'
|
|
)
|
|
|
|
|
|
def decrypt_aes_gcm(payload: EncryptedPayload, password: str) -> bytes:
|
|
"""
|
|
Decrypt AES-256-GCM encrypted data
|
|
|
|
Args:
|
|
payload: EncryptedPayload from encrypt_aes_gcm
|
|
password: Decryption password
|
|
|
|
Returns:
|
|
Decrypted plaintext bytes
|
|
"""
|
|
if not HAS_CRYPTO:
|
|
raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")
|
|
|
|
key = derive_key(password, payload.salt)
|
|
|
|
# Extract auth tag (last 16 bytes)
|
|
ciphertext = payload.ciphertext[:-16]
|
|
tag = payload.ciphertext[-16:]
|
|
|
|
cipher = Cipher(algorithms.AES(key), modes.GCM(payload.iv, tag), backend=default_backend())
|
|
decryptor = cipher.decryptor()
|
|
data = decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
|
return data
|
|
|
|
|
|
# ============== XOR Encryption (fallback, no dependencies) ==============
|
|
|
|
def encrypt_xor(data: bytes, password: str) -> EncryptedPayload:
|
|
"""
|
|
Simple XOR encryption (fallback when cryptography not available)
|
|
NOT CRYPTOGRAPHICALLY SECURE - use only as fallback
|
|
|
|
Args:
|
|
data: Plaintext bytes
|
|
password: Encryption password
|
|
|
|
Returns:
|
|
EncryptedPayload with XOR'd ciphertext
|
|
"""
|
|
salt = generate_salt()
|
|
key = derive_key(password, salt, key_length=len(data))
|
|
|
|
# Extend key to match data length using key derivation
|
|
extended_key = b''
|
|
counter = 0
|
|
while len(extended_key) < len(data):
|
|
extended_key += hashlib.sha256(key + counter.to_bytes(4, 'big')).digest()
|
|
counter += 1
|
|
extended_key = extended_key[:len(data)]
|
|
|
|
ciphertext = bytes(a ^ b for a, b in zip(data, extended_key))
|
|
|
|
return EncryptedPayload(
|
|
ciphertext=ciphertext,
|
|
iv=b'', # XOR doesn't use IV
|
|
salt=salt,
|
|
method='xor'
|
|
)
|
|
|
|
|
|
def decrypt_xor(payload: EncryptedPayload, password: str) -> bytes:
|
|
"""
|
|
Decrypt XOR encrypted data
|
|
|
|
Args:
|
|
payload: EncryptedPayload from encrypt_xor
|
|
password: Decryption password
|
|
|
|
Returns:
|
|
Decrypted plaintext bytes
|
|
"""
|
|
key = derive_key(password, payload.salt, key_length=len(payload.ciphertext))
|
|
|
|
# Extend key to match data length
|
|
extended_key = b''
|
|
counter = 0
|
|
while len(extended_key) < len(payload.ciphertext):
|
|
extended_key += hashlib.sha256(key + counter.to_bytes(4, 'big')).digest()
|
|
counter += 1
|
|
extended_key = extended_key[:len(payload.ciphertext)]
|
|
|
|
plaintext = bytes(a ^ b for a, b in zip(payload.ciphertext, extended_key))
|
|
return plaintext
|
|
|
|
|
|
# ============== Unified Interface ==============
|
|
|
|
def encrypt(data: bytes, password: str, method: str = 'auto') -> bytes:
|
|
"""
|
|
Encrypt data with specified method
|
|
|
|
Args:
|
|
data: Plaintext bytes
|
|
password: Encryption password
|
|
method: 'aes-cbc', 'aes-gcm', 'xor', or 'auto'
|
|
|
|
Returns:
|
|
Packed encrypted payload (can be embedded directly)
|
|
"""
|
|
if method == 'auto':
|
|
method = 'aes-gcm' if HAS_CRYPTO else 'xor'
|
|
|
|
if method == 'aes-cbc':
|
|
payload = encrypt_aes_cbc(data, password)
|
|
elif method == 'aes-gcm':
|
|
payload = encrypt_aes_gcm(data, password)
|
|
elif method == 'xor':
|
|
payload = encrypt_xor(data, password)
|
|
else:
|
|
raise ValueError(f"Unknown encryption method: {method}")
|
|
|
|
return pack_payload(payload)
|
|
|
|
|
|
def decrypt(packed_data: bytes, password: str) -> bytes:
|
|
"""
|
|
Decrypt packed encrypted payload
|
|
|
|
Args:
|
|
packed_data: Packed payload from encrypt()
|
|
password: Decryption password
|
|
|
|
Returns:
|
|
Decrypted plaintext bytes
|
|
"""
|
|
payload = unpack_payload(packed_data)
|
|
|
|
if payload.method == 'aes-256-cbc':
|
|
return decrypt_aes_cbc(payload, password)
|
|
elif payload.method == 'aes-256-gcm':
|
|
return decrypt_aes_gcm(payload, password)
|
|
elif payload.method == 'xor':
|
|
return decrypt_xor(payload, password)
|
|
else:
|
|
raise ValueError(f"Unknown encryption method: {payload.method}")
|
|
|
|
|
|
def pack_payload(payload: EncryptedPayload) -> bytes:
|
|
"""
|
|
Pack EncryptedPayload into bytes for embedding
|
|
|
|
Format:
|
|
[1 byte: method ID][1 byte: salt len][salt][1 byte: iv len][iv][ciphertext]
|
|
"""
|
|
method_ids = {'aes-256-cbc': 1, 'aes-256-gcm': 2, 'xor': 3}
|
|
method_id = method_ids.get(payload.method, 0)
|
|
|
|
packed = bytes([method_id])
|
|
packed += bytes([len(payload.salt)]) + payload.salt
|
|
packed += bytes([len(payload.iv)]) + payload.iv
|
|
packed += payload.ciphertext
|
|
|
|
return packed
|
|
|
|
|
|
def unpack_payload(data: bytes) -> EncryptedPayload:
|
|
"""
|
|
Unpack bytes into EncryptedPayload
|
|
|
|
Args:
|
|
data: Packed payload bytes
|
|
|
|
Returns:
|
|
EncryptedPayload object
|
|
"""
|
|
method_names = {1: 'aes-256-cbc', 2: 'aes-256-gcm', 3: 'xor'}
|
|
|
|
idx = 0
|
|
method_id = data[idx]
|
|
method = method_names.get(method_id, 'unknown')
|
|
idx += 1
|
|
|
|
salt_len = data[idx]
|
|
idx += 1
|
|
salt = data[idx:idx + salt_len]
|
|
idx += salt_len
|
|
|
|
iv_len = data[idx]
|
|
idx += 1
|
|
iv = data[idx:idx + iv_len]
|
|
idx += iv_len
|
|
|
|
ciphertext = data[idx:]
|
|
|
|
return EncryptedPayload(
|
|
ciphertext=ciphertext,
|
|
iv=iv,
|
|
salt=salt,
|
|
method=method
|
|
)
|
|
|
|
|
|
def get_available_methods() -> list:
|
|
"""Get list of available encryption methods"""
|
|
methods = ['xor'] # Always available
|
|
if HAS_CRYPTO:
|
|
methods = ['aes-gcm', 'aes-cbc'] + methods
|
|
return methods
|
|
|
|
|
|
def crypto_status() -> dict:
|
|
"""Get cryptography library status"""
|
|
return {
|
|
"cryptography_available": HAS_CRYPTO,
|
|
"available_methods": get_available_methods(),
|
|
"recommended": 'aes-gcm' if HAS_CRYPTO else 'xor'
|
|
}
|