#!/usr/bin/env python3 """ STEGOSAURUS WRECKS - NiceGUI Web Interface Ultimate steganography suite with hacker aesthetic Run with: python webui.py """ from nicegui import ui, app from fastapi import Request from fastapi.responses import JSONResponse from PIL import Image import io import base64 import tempfile import os from pathlib import Path from typing import Optional import asyncio import json # Import our steg modules import steg_core from steg_core import ( encode, decode, create_config, calculate_capacity, analyze_image, detect_encoding, CHANNEL_PRESETS, Channel, StegConfig, EncodingStrategy, _extract_bit_units, _bits_array_to_bytes, _generate_pixel_indices ) import crypto import numpy as np import re import string # ============== THEME / STYLING ============== DARK_CSS = """ :root { --primary: #00ff41; --primary-dim: #00aa2a; --primary-glow: rgba(0, 255, 65, 0.4); --secondary: #00d4ff; --accent: #ff00ff; --warning: #ffd000; --error: #ff3333; --bg-dark: #0a0a0a; --bg-card: #0d1117; --bg-hover: #161b22; --text: #00ff41; --text-dim: #555; --border: #30363d; } body { background: var(--bg-dark) !important; font-family: 'JetBrains Mono', 'Fira Code', 'Courier New', monospace !important; } .q-card { background: var(--bg-card) !important; border: 1px solid var(--border) !important; } .nicegui-content { background: var(--bg-dark) !important; } /* Matrix scanline effect - subtler */ .matrix-bg { background: repeating-linear-gradient( 0deg, transparent, transparent 2px, rgba(0, 255, 65, 0.015) 2px, rgba(0, 255, 65, 0.015) 4px ); pointer-events: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; z-index: 9999; } /* Glow effect for headers */ .glow { text-shadow: 0 0 10px var(--primary), 0 0 20px var(--primary-glow); } /* Cyber border animation */ @keyframes border-pulse { 0%, 100% { border-color: var(--primary); box-shadow: 0 0 5px var(--primary-glow); } 50% { border-color: var(--secondary); box-shadow: 0 0 15px rgba(0, 212, 255, 0.3); } } @keyframes glow-pulse { 0%, 100% { opacity: 0.6; } 50% { opacity: 1; } } .cyber-border { border: 1px solid var(--primary) !important; animation: border-pulse 3s infinite; } /* Status indicators */ .status-success { color: var(--primary) !important; } .status-error { color: var(--error) !important; } .status-warning { color: var(--warning) !important; } .status-info { color: var(--secondary) !important; } /* Custom scrollbar */ ::-webkit-scrollbar { width: 6px; } ::-webkit-scrollbar-track { background: var(--bg-dark); } ::-webkit-scrollbar-thumb { background: var(--primary-dim); border-radius: 3px; } ::-webkit-scrollbar-thumb:hover { background: var(--primary); } /* File upload area - MODERNIZED */ .drop-zone { border: 1px solid var(--border) !important; background: linear-gradient(135deg, rgba(0, 255, 65, 0.03) 0%, rgba(0, 212, 255, 0.03) 100%) !important; transition: all 0.3s ease; padding: 16px; text-align: center; cursor: pointer; border-radius: 8px; position: relative; overflow: hidden; } .drop-zone::before { content: ''; position: absolute; top: 0; left: -100%; width: 100%; height: 100%; background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.1), transparent); transition: left 0.5s ease; } .drop-zone:hover { border-color: var(--primary) !important; background: linear-gradient(135deg, rgba(0, 255, 65, 0.08) 0%, rgba(0, 212, 255, 0.05) 100%) !important; box-shadow: 0 0 20px rgba(0, 255, 65, 0.15), inset 0 0 30px rgba(0, 255, 65, 0.05); } .drop-zone:hover::before { left: 100%; } /* Terminal style output */ .terminal { background: #000 !important; border: 1px solid var(--border) !important; font-family: 'JetBrains Mono', monospace !important; color: var(--primary) !important; padding: 16px !important; white-space: pre-wrap; overflow-x: auto; border-radius: 6px; } /* Button overrides for cooler look */ .q-btn { text-transform: uppercase !important; letter-spacing: 1px !important; font-weight: 600 !important; } /* Input fields */ .q-field--dark .q-field__control { background: var(--bg-card) !important; border: 1px solid var(--border) !important; border-radius: 6px !important; } .q-field--dark .q-field__control:hover { border-color: var(--primary-dim) !important; } .q-field--focused .q-field__control { border-color: var(--primary) !important; box-shadow: 0 0 10px var(--primary-glow) !important; } /* Select dropdowns */ .q-menu { background: var(--bg-card) !important; border: 1px solid var(--border) !important; } /* Tabs styling */ .q-tab--active { color: var(--primary) !important; text-shadow: 0 0 10px var(--primary-glow); } .q-tab:hover { color: var(--secondary) !important; } /* Switches */ .q-toggle__track { background: var(--border) !important; } .q-toggle--active .q-toggle__track { background: var(--primary-dim) !important; } /* Image preview container */ .image-preview { border: 1px solid var(--border); border-radius: 8px; overflow: hidden; background: #000; } /* Stats/info boxes */ .info-box { background: linear-gradient(135deg, var(--bg-card) 0%, var(--bg-hover) 100%); border: 1px solid var(--border); border-radius: 8px; padding: 12px; } .info-box:hover { border-color: var(--primary-dim); } /* Section headers */ .section-header { font-size: 14px; font-weight: 700; letter-spacing: 2px; text-transform: uppercase; margin-bottom: 12px; display: flex; align-items: center; gap: 8px; } .section-header::after { content: ''; flex: 1; height: 1px; background: linear-gradient(90deg, var(--border), transparent); } /* Capacity meter */ .capacity-meter { background: var(--bg-card); border: 1px solid var(--border); border-radius: 6px; padding: 12px; margin: 8px 0; } .capacity-bar-outer { background: var(--bg-dark); border-radius: 4px; height: 6px; overflow: hidden; margin: 8px 0; } .capacity-bar-inner { height: 100%; border-radius: 4px; transition: width 0.3s ease, background 0.3s ease; } /* Notification styling */ .q-notification { background: var(--bg-card) !important; border: 1px solid var(--border) !important; } /* ๐Ÿช† MATRYOSHKA MODE - Easter egg activation zone (stealth) */ .matryoshka-trigger { position: fixed; bottom: 0; left: 0; width: 12px; height: 12px; z-index: 99999; opacity: 0; background: transparent; pointer-events: auto !important; } .matryoshka-trigger:hover { opacity: 0; } /* Matryoshka mode indicator */ .matryoshka-active { animation: matryoshka-glow 2s ease-in-out infinite; } @keyframes matryoshka-glow { 0%, 100% { box-shadow: 0 0 5px #ff00ff, 0 0 10px #ff00ff; } 50% { box-shadow: 0 0 20px #ff00ff, 0 0 40px #ff00ff, 0 0 60px #ff00ff; } } /* Nested results tree styling */ .matryoshka-tree { font-family: monospace; padding: 12px; background: #000; border: 1px solid var(--accent); border-radius: 6px; } .matryoshka-layer { padding-left: 20px; border-left: 2px solid var(--accent); margin-left: 10px; } .matryoshka-layer-0 { border-color: #ff00ff; } .matryoshka-layer-1 { border-color: #ff66ff; } .matryoshka-layer-2 { border-color: #ff99ff; } .matryoshka-layer-3 { border-color: #ffccff; } """ BANNER_ASCII = """ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•—โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•—โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•”โ•โ•โ•โ•โ•โ•šโ•โ•โ–ˆโ–ˆโ•”โ•โ•โ•โ–ˆโ–ˆโ•”โ•โ•โ•โ•โ•โ–ˆโ–ˆโ•”โ•โ•โ•โ•โ• โ–ˆโ–ˆโ•”โ•โ•โ•โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ•”โ•โ•โ•โ•โ•โ–ˆโ–ˆโ•”โ•โ•โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•”โ•โ•โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•”โ•โ•โ•โ•โ• โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ–ˆโ•—โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•—โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•”โ•โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•— โ•šโ•โ•โ•โ•โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•”โ•โ•โ• โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ•šโ•โ•โ•โ•โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•”โ•โ•โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•”โ•โ•โ–ˆโ–ˆโ•—โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ•šโ•โ•โ•โ•โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•—โ•šโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•”โ•โ•šโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•”โ•โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•‘โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ•šโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•”โ•โ–ˆโ–ˆโ•‘ โ–ˆโ–ˆโ•‘โ•šโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•”โ•โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ•‘ โ•šโ•โ•โ•โ•โ•โ•โ• โ•šโ•โ• โ•šโ•โ•โ•โ•โ•โ•โ• โ•šโ•โ•โ•โ•โ•โ• โ•šโ•โ•โ•โ•โ•โ• โ•šโ•โ•โ•โ•โ•โ•โ•โ•šโ•โ• โ•šโ•โ• โ•šโ•โ•โ•โ•โ•โ• โ•šโ•โ• โ•šโ•โ• โ•šโ•โ•โ•โ•โ•โ• โ•šโ•โ•โ•โ•โ•โ•โ• W R E C K S v3.0 """ STEGO_ASCII = """ __ / '-. / .-. | STEGOSAURUS /.' \\| WRECKS // |\\ \\ || | \\ | >> LSB Suite << /|| | \\ | / ||__/ \\/ \\ --' |\\_\\ '._____.' \\/ """ # ============== STATE MANAGEMENT ============== class AppState: """Global application state""" def __init__(self): # Encode state self.carrier_image: Optional[Image.Image] = None self.carrier_path: Optional[str] = None self.encoded_image: Optional[Image.Image] = None self.encode_file_data: Optional[bytes] = None self.encode_file_name: Optional[str] = None self.encode_result: Optional[Image.Image] = None # Decode state self.decode_image: Optional[Image.Image] = None self.detected_config: Optional[dict] = None self.extracted_file: Optional[bytes] = None self.extracted_filename: Optional[str] = None # Analyze state self.analyze_image: Optional[Image.Image] = None # General self.advanced_mode: bool = False self.capacity_info: dict = {} # UI state - track active tab across reloads self.active_tab: str = 'encode' # ๐Ÿช† MATRYOSHKA MODE - Russian nesting doll steganography self.matryoshka_mode: bool = False self.matryoshka_depth: int = 3 # Default depth (1-11) # Store nested decode results self.matryoshka_results: list = [] # Matryoshka encoding - carrier images (outermost first) self.matryoshka_carriers: list = [] # List of (Image, filename) tuples self.matryoshka_encode_result: Optional[Image.Image] = None state = AppState() def generate_blank_image(width: int = 1024, height: int = 1024, color: str = 'noise') -> Image.Image: """Generate a blank or noise image for steganography""" import numpy as np if color == 'noise': # Random noise - best for hiding data pixels = np.random.randint(0, 256, (height, width, 4), dtype=np.uint8) pixels[:, :, 3] = 255 # Full alpha return Image.fromarray(pixels, 'RGBA') elif color == 'black': return Image.new('RGBA', (width, height), (0, 0, 0, 255)) elif color == 'white': return Image.new('RGBA', (width, height), (255, 255, 255, 255)) else: # Gradient - looks nicer pixels = np.zeros((height, width, 4), dtype=np.uint8) for y in range(height): for x in range(width): pixels[y, x] = [ int(255 * x / width), int(255 * y / height), 128, 255 ] return Image.fromarray(pixels, 'RGBA') # ============== HELPER FUNCTIONS ============== def image_to_base64(img: Image.Image, format: str = "PNG") -> str: """Convert PIL Image to base64 string for display""" buffer = io.BytesIO() img.save(buffer, format=format) return base64.b64encode(buffer.getvalue()).decode() def base64_to_image(b64: str) -> Image.Image: """Convert base64 string to PIL Image""" # Handle data URL format if ',' in b64: b64 = b64.split(',')[1] data = base64.b64decode(b64) return Image.open(io.BytesIO(data)) def format_size(size_bytes: int) -> str: """Format bytes to human readable""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024 return f"{size_bytes:.1f} TB" # ============== SMART DECODE / APERI'SOLVE STYLE ============== def detect_coherent_text(data: bytes, min_printable_ratio: float = 0.85) -> dict: """ Detect if data looks like coherent readable text. Returns analysis dict with confidence and detected text preview. Properly handles Unicode text, not just ASCII. """ if not data or len(data) == 0: return {"is_text": False, "confidence": 0, "preview": "", "reason": "empty"} # Try UTF-8 decode try: text = data.decode('utf-8', errors='strict') except: try: text = data.decode('latin-1', errors='replace') except: return {"is_text": False, "confidence": 0, "preview": "", "reason": "decode_failed"} if len(text) == 0: return {"is_text": False, "confidence": 0, "preview": "", "reason": "empty_text"} # Calculate printable character ratio - Unicode aware # Count chars that are NOT control characters as printable # Control chars: 0x00-0x1F (except \t, \n, \r), 0x7F, 0x80-0x9F def is_printable_unicode(c): code = ord(c) # Whitespace is fine if c in '\t\n\r ': return True # ASCII control chars if code <= 0x1F or code == 0x7F: return False # C1 control chars if 0x80 <= code <= 0x9F: return False # Everything else (printable ASCII, Latin-1 supplement, Unicode) is OK return True printable_count = sum(1 for c in text[:500] if is_printable_unicode(c)) printable_ratio = printable_count / min(len(text), 500) # Check for null bytes in first 100 chars (binary indicator) null_count = text[:100].count('\x00') has_nulls = null_count > 2 # Check for common words (English) common_words = ['the', 'and', 'is', 'in', 'to', 'of', 'a', 'for', 'on', 'that', 'this', 'with', 'are', 'be', 'as', 'at', 'by', 'or', 'an', 'it', 'from', 'was', 'have', 'has', 'not', 'but', 'what', 'all', 'were', 'flag', 'ctf', 'secret', 'hidden', 'password', 'key', 'message'] text_lower = text.lower() word_matches = sum(1 for w in common_words if w in text_lower) # Check for repeated patterns (often indicates noise) is_repetitive = False if len(text) > 20: first_20 = text[:20] repetition = text.count(first_20) is_repetitive = repetition > 2 # Calculate entropy (lower = more structured/text-like) char_freq = {} for c in text[:500]: # Sample first 500 chars char_freq[c] = char_freq.get(c, 0) + 1 total = sum(char_freq.values()) entropy = -sum((f/total) * np.log2(f/total) for f in char_freq.values() if f > 0) # Score calculation confidence = 0 # Printable ratio is crucial if printable_ratio >= 0.95: confidence += 40 elif printable_ratio >= 0.85: confidence += 25 elif printable_ratio >= 0.70: confidence += 10 # Word matches boost confidence confidence += min(30, word_matches * 5) # Low entropy (structured text) is good if entropy < 4.5: confidence += 15 elif entropy < 5.5: confidence += 5 # Penalties if has_nulls: confidence -= 30 if len(text) > 20 and is_repetitive: confidence -= 20 # Clean preview - keep Unicode, only replace control chars preview_text = text[:200] preview_clean = ''.join(c if is_printable_unicode(c) else 'ยท' for c in preview_text) is_text = confidence >= 35 and printable_ratio >= min_printable_ratio return { "is_text": is_text, "confidence": max(0, min(100, confidence)), "preview": preview_clean, "printable_ratio": printable_ratio, "entropy": entropy, "word_matches": word_matches, "reason": "looks_like_text" if is_text else "binary_data" } def extract_raw_lsb(image: Image.Image, channels: list, bits: int = 1, bit_offset: int = 0, max_bytes: int = 4096, strategy: EncodingStrategy = EncodingStrategy.INTERLEAVED, seed: int = None) -> bytes: """ Extract raw LSB data without requiring STEG header. For blind extraction like Aperi'Solve does. Args: image: PIL Image to extract from channels: List of Channel enums to extract from bits: Bits per channel (1-8) bit_offset: Bit offset within each channel max_bytes: Maximum bytes to extract strategy: Encoding strategy used (SEQUENTIAL, INTERLEAVED, SPREAD, RANDOMIZED) seed: Random seed for RANDOMIZED strategy """ img = image.convert("RGBA") pixels = np.array(img, dtype=np.uint8) height, width = pixels.shape[:2] total_pixels = height * width flat_pixels = pixels.reshape(-1, 4) # Create simple extraction config channel_indices = np.array([c.value for c in channels], dtype=np.uint8) num_channels = len(channels) # Calculate how many units we need bits_needed = max_bytes * 8 units_needed = bits_needed // bits if bits_needed % bits: units_needed += 1 bit_mask = ((1 << bits) - 1) << bit_offset result = np.zeros(units_needed, dtype=np.uint8) if strategy == EncodingStrategy.INTERLEAVED: # Cycle through channels at each pixel pixels_needed = (units_needed + num_channels - 1) // num_channels pixel_indices = _generate_pixel_indices(total_pixels, min(pixels_needed, total_pixels), strategy, seed) unit_idx = 0 for pix_idx in pixel_indices: for ch in channel_indices: if unit_idx >= units_needed: break value = flat_pixels[pix_idx, ch] result[unit_idx] = (value & bit_mask) >> bit_offset unit_idx += 1 if unit_idx >= units_needed: break elif strategy == EncodingStrategy.SEQUENTIAL: # Fill each channel completely before moving to next unit_idx = 0 for ch in channel_indices: pixels_for_channel = min(total_pixels, units_needed - unit_idx) pixel_indices = _generate_pixel_indices(total_pixels, pixels_for_channel, strategy, seed) for pix_idx in pixel_indices: if unit_idx >= units_needed: break value = flat_pixels[pix_idx, ch] result[unit_idx] = (value & bit_mask) >> bit_offset unit_idx += 1 if unit_idx >= units_needed: break elif strategy == EncodingStrategy.SPREAD: # Spread evenly across image pixels_needed = (units_needed + num_channels - 1) // num_channels pixel_indices = _generate_pixel_indices(total_pixels, min(pixels_needed, total_pixels), strategy, seed) unit_idx = 0 for pix_idx in pixel_indices: for ch in channel_indices: if unit_idx >= units_needed: break value = flat_pixels[pix_idx, ch] result[unit_idx] = (value & bit_mask) >> bit_offset unit_idx += 1 if unit_idx >= units_needed: break elif strategy == EncodingStrategy.RANDOMIZED: # Pseudo-random order (seeded for reproducibility) pixels_needed = (units_needed + num_channels - 1) // num_channels pixel_indices = _generate_pixel_indices(total_pixels, min(pixels_needed, total_pixels), strategy, seed or 42) unit_idx = 0 for pix_idx in pixel_indices: for ch in channel_indices: if unit_idx >= units_needed: break value = flat_pixels[pix_idx, ch] result[unit_idx] = (value & bit_mask) >> bit_offset unit_idx += 1 if unit_idx >= units_needed: break # Convert to bytes return _bits_array_to_bytes(result, bits, bits_needed)[:max_bytes] def smart_scan_image(image: Image.Image, password: str = None) -> list: """ Aperi'Solve-style multi-config scan. Tries multiple channel/bit/strategy combinations and reports findings. """ results = [] # Base channel/bit configurations (ordered by likelihood) base_configs = [ # Common single channels - 1 bit {"name": "R", "channels": [Channel.R], "bits": 1}, {"name": "G", "channels": [Channel.G], "bits": 1}, {"name": "B", "channels": [Channel.B], "bits": 1}, {"name": "A", "channels": [Channel.A], "bits": 1}, # Common single channels - 2 bit {"name": "R-2bit", "channels": [Channel.R], "bits": 2}, {"name": "G-2bit", "channels": [Channel.G], "bits": 2}, {"name": "B-2bit", "channels": [Channel.B], "bits": 2}, {"name": "A-2bit", "channels": [Channel.A], "bits": 2}, # RGB combos - 1 and 2 bit {"name": "RGB", "channels": [Channel.R, Channel.G, Channel.B], "bits": 1}, {"name": "RGB-2bit", "channels": [Channel.R, Channel.G, Channel.B], "bits": 2}, # RGBA - 1 and 2 bit {"name": "RGBA", "channels": [Channel.R, Channel.G, Channel.B, Channel.A], "bits": 1}, {"name": "RGBA-2bit", "channels": [Channel.R, Channel.G, Channel.B, Channel.A], "bits": 2}, # Two channel combos - 1 bit {"name": "RG", "channels": [Channel.R, Channel.G], "bits": 1}, {"name": "RB", "channels": [Channel.R, Channel.B], "bits": 1}, {"name": "GB", "channels": [Channel.G, Channel.B], "bits": 1}, {"name": "RA", "channels": [Channel.R, Channel.A], "bits": 1}, {"name": "GA", "channels": [Channel.G, Channel.A], "bits": 1}, {"name": "BA", "channels": [Channel.B, Channel.A], "bits": 1}, # Two channel combos - 2 bit {"name": "RG-2bit", "channels": [Channel.R, Channel.G], "bits": 2}, {"name": "RB-2bit", "channels": [Channel.R, Channel.B], "bits": 2}, {"name": "GB-2bit", "channels": [Channel.G, Channel.B], "bits": 2}, {"name": "RA-2bit", "channels": [Channel.R, Channel.A], "bits": 2}, {"name": "GA-2bit", "channels": [Channel.G, Channel.A], "bits": 2}, {"name": "BA-2bit", "channels": [Channel.B, Channel.A], "bits": 2}, # Higher bit depths (4-bit) {"name": "R-4bit", "channels": [Channel.R], "bits": 4}, {"name": "RGB-4bit", "channels": [Channel.R, Channel.G, Channel.B], "bits": 4}, ] # All encoding strategies to try strategies_to_try = [ {"name": "interleaved", "strategy": EncodingStrategy.INTERLEAVED}, {"name": "sequential", "strategy": EncodingStrategy.SEQUENTIAL}, {"name": "spread", "strategy": EncodingStrategy.SPREAD}, {"name": "randomized", "strategy": EncodingStrategy.RANDOMIZED}, ] # Generate all combinations of base configs and strategies configs_to_try = [] for base in base_configs: for strat in strategies_to_try: # Interleaved is default, so omit suffix for cleaner names if strat["strategy"] == EncodingStrategy.INTERLEAVED: name = base["name"] else: name = f"{base['name']}-{strat['name']}" configs_to_try.append({ "name": name, "channels": base["channels"], "bits": base["bits"], "strategy": strat["strategy"], }) # First check for STEG header (our own format) detection = detect_encoding(image) if detection: results.append({ "name": f"STEG-HEADER ({detection['config']['channels']})", "channels": detection['config']['channels'], "bits": detection['config']['bits_per_channel'], "has_header": True, "is_text": True, "confidence": 95, "preview": f"[STEG v3 detected - {format_size(detection['original_length'])}]", "payload_size": detection['original_length'], "status": "STEG_DETECTED", "can_decode": True, }) # Scan each config for cfg in configs_to_try: try: raw_data = extract_raw_lsb( image, channels=cfg["channels"], bits=cfg["bits"], max_bytes=2048, # Sample first 2KB strategy=cfg["strategy"] ) # Check for STEG magic at start has_magic = raw_data[:4] == b'STEG' # Decrypt if password provided data_to_check = raw_data if password and not has_magic: try: data_to_check = crypto.decrypt(raw_data, password) except: pass # Decryption failed, use raw # Analyze for coherent text text_analysis = detect_coherent_text(data_to_check) # Determine status if has_magic: status = "STEG_HEADER" elif text_analysis["is_text"] and text_analysis["confidence"] >= 50: status = "TEXT_FOUND" elif text_analysis["is_text"]: status = "POSSIBLE_TEXT" elif text_analysis["printable_ratio"] > 0.5: status = "MIXED_DATA" else: status = "BINARY/NOISE" results.append({ "name": cfg["name"], "channels": [c.name for c in cfg["channels"]], "bits": cfg["bits"], "strategy": cfg["strategy"].value, "has_header": has_magic, "is_text": text_analysis["is_text"], "confidence": text_analysis["confidence"], "preview": text_analysis["preview"][:100] if text_analysis["preview"] else raw_data[:50].hex(), "printable_ratio": text_analysis.get("printable_ratio", 0), "entropy": text_analysis.get("entropy", 8), "status": status, "can_decode": has_magic or text_analysis["is_text"], "raw_data": raw_data if (has_magic or text_analysis["confidence"] >= 30) else None, }) except Exception as e: results.append({ "name": cfg["name"], "channels": [c.name for c in cfg["channels"]], "bits": cfg["bits"], "strategy": cfg["strategy"].value, "has_header": False, "is_text": False, "confidence": 0, "preview": f"Error: {str(e)[:50]}", "status": "ERROR", "can_decode": False, }) # Sort by confidence (highest first), then by whether text was found results.sort(key=lambda x: ( x.get("has_header", False), # STEG headers first x.get("status") == "TEXT_FOUND", x.get("confidence", 0) ), reverse=True) return results # ============== ๐Ÿช† MATRYOSHKA MODE - RECURSIVE STEG ============== def is_image_data(data: bytes) -> bool: """Check if bytes look like an image file""" if len(data) < 8: return False # PNG magic bytes if data[:8] == b'\x89PNG\r\n\x1a\n': return True # JPEG magic bytes if data[:2] == b'\xff\xd8': return True # GIF magic bytes if data[:6] in (b'GIF87a', b'GIF89a'): return True # BMP magic bytes if data[:2] == b'BM': return True return False # Valid file extensions for file format detection VALID_FILE_EXTENSIONS = { # Images 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp', 'ico', 'svg', 'tiff', 'tif', # Documents 'txt', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'odt', 'rtf', # Code 'py', 'js', 'ts', 'html', 'css', 'json', 'xml', 'yaml', 'yml', 'md', 'csv', 'java', 'c', 'cpp', 'h', 'hpp', 'rs', 'go', 'rb', 'php', 'sh', 'bash', # Archives 'zip', 'tar', 'gz', 'bz2', '7z', 'rar', # Media 'mp3', 'mp4', 'wav', 'avi', 'mkv', 'mov', 'flac', 'ogg', # Other 'bin', 'dat', 'exe', 'dll', 'so', 'key', 'pem', 'crt', } def extract_file_from_data(data: bytes) -> tuple: """ Extract filename and file data from encoded bytes. Returns (filename, file_data) or (None, data) if not a file format. File format: Where length_byte is the length of the filename (1-100 bytes). """ if len(data) < 3: return (None, data) fname_len = data[0] # Filename length must be reasonable (3-100 chars for "a.b" to reasonable max) if fname_len < 3 or fname_len > 100: return (None, data) # Must have enough data for filename + at least 1 byte of content if len(data) < fname_len + 2: return (None, data) try: filename = data[1:1+fname_len].decode('utf-8') except UnicodeDecodeError: return (None, data) # Validate filename structure if '.' not in filename: return (None, data) # Check for invalid characters (only allow alphanumeric, ., -, _, space) if not re.match(r'^[\w\-. ]+$', filename): return (None, data) # Filename must not start with . or space if filename[0] in '. ': return (None, data) # Extract and validate extension ext = filename.rsplit('.', 1)[-1].lower() if ext not in VALID_FILE_EXTENSIONS: return (None, data) # All checks passed - this looks like a real file file_data = data[1+fname_len:] return (filename, file_data) def matryoshka_decode(image: Image.Image, max_depth: int = 3, password: str = None, current_depth: int = 0) -> list: """ ๐Ÿช† Recursively decode nested steganographic images. Args: image: The image to decode max_depth: Maximum recursion depth (1-11) password: Optional decryption password current_depth: Current recursion level (internal) Returns: List of extraction results at each layer """ results = [] layer_info = { "depth": current_depth, "type": "unknown", "filename": None, "data_size": 0, "preview": "", "has_nested": False, "nested_results": [], } if current_depth >= max_depth: layer_info["type"] = "max_depth_reached" layer_info["preview"] = f"โš ๏ธ Max depth ({max_depth}) reached" results.append(layer_info) return results try: # Try auto-decode first (looks for STEG header) try: data = decode(image, None) layer_info["type"] = "steg_header" except: # If no STEG header, try smart scan scan_results = smart_scan_image(image, password) best_result = None for r in scan_results: if r.get("status") in ["STEG_DETECTED", "STEG_HEADER", "TEXT_FOUND"]: best_result = r break if best_result and best_result.get("raw_data"): data = best_result["raw_data"] layer_info["type"] = f"smart_scan_{best_result['name']}" else: layer_info["type"] = "no_data_found" layer_info["preview"] = "No hidden data detected" results.append(layer_info) return results # Decrypt if password provided if password: try: data = crypto.decrypt(data, password) except: pass # Decryption failed, use raw data layer_info["data_size"] = len(data) # Check if it's a file filename, file_data = extract_file_from_data(data) if filename: layer_info["filename"] = filename layer_info["data_size"] = len(file_data) # Check if the extracted file is an image if is_image_data(file_data): layer_info["type"] = "nested_image" layer_info["has_nested"] = True # Recursively decode the nested image try: nested_img = Image.open(io.BytesIO(file_data)) nested_results = matryoshka_decode( nested_img, max_depth=max_depth, password=password, current_depth=current_depth + 1 ) layer_info["nested_results"] = nested_results layer_info["preview"] = f"๐Ÿช† Found nested image: {filename}" except Exception as e: layer_info["preview"] = f"๐Ÿ“ Image file: {filename} (failed to recurse: {e})" else: layer_info["type"] = "file" # Try to show preview of text files ext = filename.split('.')[-1].lower() if '.' in filename else '' if ext in ['txt', 'md', 'json', 'xml', 'html', 'css', 'js', 'py', 'csv']: try: layer_info["preview"] = file_data[:200].decode('utf-8') except: layer_info["preview"] = f"๐Ÿ“ Binary file: {filename}" else: layer_info["preview"] = f"๐Ÿ“ File: {filename} ({format_size(len(file_data))})" else: # Raw data - check if it's an image if is_image_data(data): layer_info["type"] = "nested_image_raw" layer_info["has_nested"] = True try: nested_img = Image.open(io.BytesIO(data)) nested_results = matryoshka_decode( nested_img, max_depth=max_depth, password=password, current_depth=current_depth + 1 ) layer_info["nested_results"] = nested_results layer_info["preview"] = "๐Ÿช† Found raw nested image data" except Exception as e: layer_info["preview"] = f"Image data (failed to recurse: {e})" else: # Try as text try: text = data.decode('utf-8') layer_info["type"] = "text" layer_info["preview"] = text[:300] except: layer_info["type"] = "binary" layer_info["preview"] = f"Binary data: {data[:50].hex()}..." # Store raw data for download layer_info["raw_data"] = file_data if filename else data except Exception as e: layer_info["type"] = "error" layer_info["preview"] = f"Error: {str(e)}" results.append(layer_info) return results def matryoshka_encode(payload: bytes, carriers: list, config = None, password: str = None) -> tuple: """ ๐Ÿช† Recursively encode nested steganographic images. Creates a "Russian nesting doll" of hidden data, encoding the payload into the innermost carrier, then that result into the next carrier, etc. Args: payload: The data to hide (innermost secret) carriers: List of (Image, filename) tuples - innermost carrier FIRST config: StegConfig to use for all layers (or None for auto) password: Optional encryption password Returns: Tuple of (final_image, layer_info_list) """ if not carriers: raise ValueError("At least one carrier image is required") if config is None: config = create_config(channels='RGBA', bits=2) # Default: good capacity layer_info = [] current_data = payload # Encode from innermost to outermost for i, (carrier_img, carrier_name) in enumerate(carriers): layer_num = i + 1 # Calculate capacity capacity = _calculate_capacity_bytes(carrier_img, config) data_size = len(current_data) layer_info.append({ 'layer': layer_num, 'carrier': carrier_name, 'capacity': capacity, 'payload_size': data_size, 'fits': data_size <= capacity }) if data_size > capacity: raise ValueError(f"Layer {layer_num} ({carrier_name}): payload {data_size} bytes exceeds capacity {capacity} bytes") # Encrypt if password provided (only innermost layer or all?) data_to_encode = current_data if password and i == 0: # Only encrypt the innermost payload data_to_encode = crypto.encrypt(current_data, password) # Encode current data into this carrier encoded_img = encode(carrier_img, data_to_encode, config) # If there are more carriers, convert this to PNG bytes for next layer if i < len(carriers) - 1: buffer = io.BytesIO() encoded_img.save(buffer, format='PNG') current_data = buffer.getvalue() layer_info[-1]['output_size'] = len(current_data) else: # Final layer - return the image layer_info[-1]['output_size'] = 'final' return encoded_img, layer_info def _calculate_capacity_bytes(image: Image.Image, config) -> int: """Calculate byte capacity for an image with given config (simplified, returns int)""" width, height = image.size channels = len(config.channels) bits = config.bits_per_channel # Account for STEG header overhead (~48 bytes) raw_capacity = (width * height * channels * bits) // 8 return max(0, raw_capacity - 64) # Reserve space for header # ============== UI COMPONENTS ============== def create_header(): """Create the main header with ASCII art""" with ui.column().classes('w-full items-center'): ui.html(f'
{BANNER_ASCII}
', sanitize=False) ui.label('๐Ÿฆ• Ultimate LSB Steganography Suite ๐Ÿฆ•').classes('text-cyan-400 text-lg') def create_channel_selector(advanced: bool = False): """Create channel selection UI""" if advanced: # All 15 presets options = list(CHANNEL_PRESETS.keys()) else: # Simple options options = ['RGB', 'RGBA', 'R', 'G', 'B'] return options def create_capacity_display(capacity_label: ui.label, image: Image.Image, config_dict: dict): """Update capacity display""" if image is None: capacity_label.set_text('No image loaded') return config = create_config(**config_dict) cap = calculate_capacity(image, config) text = f"๐Ÿ“Š Capacity: {cap['human']} ({cap['usable_bytes']:,} bytes)" capacity_label.set_text(text) state.capacity_info = cap def create_file_picker(element_id: str, accept: str, endpoint: str, tab: str = 'encode'): """Create a custom file picker that works with Python 3.9 Returns tuple of (html, javascript) - call ui.html() for html and ui.add_body_html() for js Args: element_id: Unique ID for the element accept: File types to accept endpoint: API endpoint to POST to tab: Tab name to restore after reload ('encode', 'decode', 'analyze') """ # Cleaner accept display accept_display = accept.replace(',', ' ยท ').replace('.', '').upper() if accept != '*' else 'ANY FILE' html = f"""
โ†‘ DROP FILE OR CLICK TO BROWSE
{accept_display}
""" js = f""" """ return html, js # ============== API ENDPOINTS FOR FILE UPLOADS ============== @app.post('/api/upload/carrier') async def upload_carrier(request: Request): """Handle carrier image upload""" try: body = await request.json() b64_data = body.get('data', '') filename = body.get('filename', 'image.png') tab = body.get('tab', 'encode') state.carrier_image = base64_to_image(b64_data) state.carrier_path = filename state.active_tab = tab return JSONResponse({'success': True, 'filename': filename}) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/upload/payload') async def upload_payload(request: Request): """Handle payload file upload""" try: body = await request.json() b64_data = body.get('data', '') filename = body.get('filename', 'file.bin') tab = body.get('tab', 'encode') # Decode base64 data if ',' in b64_data: b64_data = b64_data.split(',')[1] state.encode_file_data = base64.b64decode(b64_data) state.encode_file_name = filename state.active_tab = tab return JSONResponse({'success': True, 'filename': filename}) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/upload/decode') async def upload_decode(request: Request): """Handle decode image upload""" try: body = await request.json() b64_data = body.get('data', '') filename = body.get('filename', 'image.png') tab = body.get('tab', 'decode') state.decode_image = base64_to_image(b64_data) state.active_tab = tab # Try auto-detection detection = detect_encoding(state.decode_image) state.detected_config = detection return JSONResponse({ 'success': True, 'filename': filename, 'detected': detection is not None, 'config': detection['config'] if detection else None, 'payload_size': detection['original_length'] if detection else 0 }) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/upload/analyze') async def upload_analyze(request: Request): """Handle analyze image upload""" try: body = await request.json() b64_data = body.get('data', '') filename = body.get('filename', 'image.png') tab = body.get('tab', 'analyze') state.analyze_image = base64_to_image(b64_data) state.active_tab = tab return JSONResponse({'success': True, 'filename': filename}) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/matryoshka/toggle') async def toggle_matryoshka(request: Request): """๐Ÿช† Toggle Matryoshka mode""" try: body = await request.json() active = body.get('active', False) state.matryoshka_mode = active return JSONResponse({ 'success': True, 'matryoshka_active': state.matryoshka_mode, 'depth': state.matryoshka_depth }) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/matryoshka/decode') async def matryoshka_decode_api(request: Request): """๐Ÿช† Perform recursive Matryoshka decode""" try: body = await request.json() depth = body.get('depth', state.matryoshka_depth) password = body.get('password', None) if state.decode_image is None: return JSONResponse({'success': False, 'error': 'No image loaded'}) results = matryoshka_decode(state.decode_image, max_depth=depth, password=password) state.matryoshka_results = results # Flatten results for JSON response def flatten_results(res_list, level=0): flat = [] for r in res_list: flat.append({ 'depth': r.get('depth', level), 'type': r.get('type', 'unknown'), 'filename': r.get('filename'), 'data_size': r.get('data_size', 0), 'preview': r.get('preview', '')[:500], 'has_nested': r.get('has_nested', False), }) if r.get('nested_results'): flat.extend(flatten_results(r['nested_results'], level + 1)) return flat flat_results = flatten_results(results) return JSONResponse({ 'success': True, 'layers_found': len(flat_results), 'results': flat_results }) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/matryoshka/add_carrier') async def add_matryoshka_carrier(request: Request): """๐Ÿช† Add a carrier image to the Matryoshka stack""" try: body = await request.json() b64_data = body.get('data', '') filename = body.get('filename', 'carrier.png') img = base64_to_image(b64_data) state.matryoshka_carriers.append((img, filename)) # Calculate capacity for this carrier config = create_config(channels='RGBA', bits=2) capacity = _calculate_capacity_bytes(img, config) return JSONResponse({ 'success': True, 'carrier_count': len(state.matryoshka_carriers), 'filename': filename, 'dimensions': f"{img.size[0]}x{img.size[1]}", 'capacity': capacity }) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.post('/api/matryoshka/clear_carriers') async def clear_matryoshka_carriers(): """๐Ÿช† Clear all Matryoshka carrier images""" state.matryoshka_carriers = [] state.matryoshka_encode_result = None return JSONResponse({'success': True, 'carrier_count': 0}) @app.post('/api/matryoshka/encode') async def matryoshka_encode_api(request: Request): """๐Ÿช† Perform recursive Matryoshka encode""" try: body = await request.json() password = body.get('password') channels = body.get('channels', 'RGBA') bits = body.get('bits', 2) if not state.matryoshka_carriers: return JSONResponse({'success': False, 'error': 'No carrier images added'}) # Get payload - either from encode_file_data or encode_text if state.encode_file_data: payload = state.encode_file_data # Prepend filename if available if state.encode_file_name: fn_bytes = state.encode_file_name.encode('utf-8') payload = bytes([len(fn_bytes)]) + fn_bytes + payload elif hasattr(state, 'encode_text') and state.encode_text: payload = state.encode_text.encode('utf-8') else: return JSONResponse({'success': False, 'error': 'No payload to encode. Upload a file or enter text first.'}) config = create_config(channels=channels, bits=bits) # Carriers are stored outermost-first in UI, but we need innermost-first for encoding # So reverse the list carriers_reversed = list(reversed(state.matryoshka_carriers)) result_img, layer_info = matryoshka_encode( payload=payload, carriers=carriers_reversed, config=config, password=password ) state.matryoshka_encode_result = result_img # Convert result to base64 for preview buffer = io.BytesIO() result_img.save(buffer, format='PNG') result_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return JSONResponse({ 'success': True, 'layers': len(state.matryoshka_carriers), 'layer_info': layer_info, 'result_b64': f"data:image/png;base64,{result_b64}" }) except Exception as e: return JSONResponse({'success': False, 'error': str(e)}) @app.get('/api/matryoshka/download') async def download_matryoshka_result(): """๐Ÿช† Download the Matryoshka encoded result""" if state.matryoshka_encode_result is None: return JSONResponse({'success': False, 'error': 'No result to download'}) buffer = io.BytesIO() state.matryoshka_encode_result.save(buffer, format='PNG') buffer.seek(0) return Response( content=buffer.getvalue(), media_type='image/png', headers={'Content-Disposition': 'attachment; filename="matryoshka_encoded.png"'} ) # ============== MAIN PAGE ============== @ui.page('/') async def main_page(): # Add custom CSS ui.add_head_html(f'') ui.add_head_html('') # Matrix scanline overlay ui.html('
', sanitize=False) with ui.column().classes('w-full max-w-6xl mx-auto p-4 gap-4'): create_header() # Tab navigation with ui.tabs().classes('w-full') as tabs: encode_tab = ui.tab('ENCODE', icon='lock').classes('text-green-400') decode_tab = ui.tab('DECODE', icon='lock_open').classes('text-cyan-400') analyze_tab = ui.tab('ANALYZE', icon='search').classes('text-yellow-400') # ๐Ÿช† Hidden Matryoshka tab - revealed by easter egg matryoshka_tab = ui.tab('๐Ÿช† MATRYOSHKA', icon='auto_awesome').classes('text-pink-400') matryoshka_tab.props('id="matryoshka-tab"') matryoshka_tab.style('display: none;') # Hidden until activated # Determine initial tab from state (set by API) or URL hash tab_map = { 'encode': encode_tab, 'decode': decode_tab, 'analyze': analyze_tab, 'matryoshka': matryoshka_tab, } initial_tab = tab_map.get(state.active_tab, encode_tab) with ui.tab_panels(tabs, value=initial_tab).classes('w-full') as panels: # ==================== ENCODE PANEL ==================== with ui.tab_panel(encode_tab): await create_encode_panel() # ==================== DECODE PANEL ==================== with ui.tab_panel(decode_tab): await create_decode_panel() # ==================== ANALYZE PANEL ==================== with ui.tab_panel(analyze_tab): await create_analyze_panel() # ==================== MATRYOSHKA PANEL ==================== with ui.tab_panel(matryoshka_tab): await create_matryoshka_panel() # JavaScript to read URL hash on load and sync tabs ui.add_body_html(""" """) # Footer ui.html('
.-.-.-.-<={LOVE PLINY}=>-.-.-.-.
', sanitize=False) # ๐Ÿช† MATRYOSHKA MODE - Easter egg trigger zone (bottom-left pixel) ui.html('
', sanitize=False) # Simple mode indicator (shown when active) ui.html(''' ''', sanitize=False) # JavaScript to handle easter egg activation ui.add_body_html(""" """) async def create_encode_panel(): """Create the encoding interface""" # Use global state for persistence global state with ui.row().classes('w-full gap-4'): # Left column - Image upload and preview with ui.column().classes('w-1/2 gap-2'): ui.label('๐Ÿ“ท CARRIER IMAGE').classes('text-green-400 text-lg font-bold') # Show current image if loaded if state.carrier_image: b64 = image_to_base64(state.carrier_image) ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-64 object-contain') w, h = state.carrier_image.size ui.label(f'{state.carrier_path or "Generated"} | {w}x{h}').classes('text-green-400 text-sm') # Show capacity try: config = create_config(channels='RGB', bits=1) cap = calculate_capacity(state.carrier_image, config) ui.label(f"๐Ÿ“Š Capacity: {cap['human']} ({cap['usable_bytes']:,} bytes)").classes('text-cyan-400') except: pass else: ui.image().classes('w-full max-h-64 object-contain') ui.label('No image loaded').classes('text-gray-500 text-sm') ui.label('๐Ÿ“Š Capacity: --').classes('text-cyan-400') # Custom file picker - uses API endpoint picker_html, picker_js = create_file_picker('encode_carrier', '.png,.PNG,image/png', '/api/upload/carrier', tab='encode') ui.html(picker_html, sanitize=False) ui.add_body_html(picker_js) # Generate image buttons ui.label('Or generate a carrier:').classes('text-gray-500 text-sm mt-2') # Size selector for generated images with ui.row().classes('gap-2 items-center'): gen_size_select = ui.select( options=['512x512', '1024x1024', '2048x2048', '4096x4096'], value='1024x1024', label='Size' ).props('dense').classes('w-24') with ui.row().classes('gap-2'): async def gen_image(color: str): size_str = gen_size_select.value or '1024x1024' w, h = map(int, size_str.split('x')) state.carrier_image = generate_blank_image(w, h, color) state.carrier_path = f'{color}_{size_str}.png' state.active_tab = 'encode' ui.notify(f'Generated {color} image ({size_str})', type='positive') await asyncio.sleep(0.3) await ui.run_javascript('window.location.hash = "encode"; window.location.reload();') ui.button('๐ŸŽฒ Noise', on_click=lambda: gen_image('noise')).props('dense').classes('text-xs') ui.button('โฌ› Black', on_click=lambda: gen_image('black')).props('dense').classes('text-xs') ui.button('๐ŸŒˆ Gradient', on_click=lambda: gen_image('gradient')).props('dense').classes('text-xs') ui.button('โฌœ White', on_click=lambda: gen_image('white')).props('dense').classes('text-xs') # Right column - Data and settings with ui.column().classes('w-1/2 gap-2'): ui.label('๐Ÿ“ PAYLOAD DATA').classes('text-green-400 text-lg font-bold') # Data source toggle with ui.row().classes('gap-2'): data_source = ui.toggle( { 'text': '๐Ÿ’ฌ Text', 'file': '๐Ÿ“ File', }, value='text' ).classes('w-full') # Text input text_container = ui.column().classes('w-full') with text_container: text_input = ui.textarea( label='Secret message', placeholder='Enter your hidden message here...' ).classes('w-full').props('rows=6 dark') # File upload for payload file_container = ui.column().classes('w-full hidden') with file_container: if state.encode_file_data and state.encode_file_name: ui.label(f'๐Ÿ“„ {state.encode_file_name} ({format_size(len(state.encode_file_data))})').classes('text-green-400') else: ui.label('No file selected').classes('text-gray-500') payload_html, payload_js = create_file_picker('encode_payload', '*', '/api/upload/payload', tab='encode') ui.html(payload_html, sanitize=False) ui.add_body_html(payload_js) def toggle_data_source(): if data_source.value == 'text': text_container.classes(remove='hidden') file_container.classes(add='hidden') else: text_container.classes(add='hidden') file_container.classes(remove='hidden') data_source.on('update:model-value', toggle_data_source) ui.separator().classes('my-2') # ===== ENCODING SETTINGS ===== ui.label('โš™๏ธ ENCODING SETTINGS').classes('text-green-400 text-lg font-bold') # Advanced mode toggle advanced_toggle = ui.switch('๐Ÿ”ง Advanced Mode').classes('text-cyan-400') # Basic settings (always visible) with ui.row().classes('w-full gap-4'): channel_select = ui.select( label='Channels', options=['RGB', 'RGBA', 'R', 'G', 'B'], value='RGB' ).classes('w-1/2') bits_select = ui.select( label='Bits/Channel', options=[1, 2], value=1 ).classes('w-1/2') # Live capacity display capacity_display = ui.html('', sanitize=False).classes('w-full') def update_capacity_display(): """Update capacity display based on current settings""" if state.carrier_image is None: capacity_display.set_content( '
' '๐Ÿ“Š Capacity: Load an image to see capacity' '
' ) return try: if advanced_toggle.value: ch = adv_channel_select.value or 'RGB' b = adv_bits_select.value or 1 else: ch = channel_select.value or 'RGB' b = bits_select.value or 1 config = create_config(channels=ch, bits=b) cap = calculate_capacity(state.carrier_image, config) # Calculate fill percentage if we have data data_size = 0 if data_source.value == 'text' and text_input.value: data_size = len(text_input.value.encode('utf-8')) elif data_source.value == 'file' and state.encode_file_data: data_size = len(state.encode_file_data) + len(state.encode_file_name or '') + 1 # Don't cap at 100% - we need to detect overflow! fill_pct = (data_size / max(1, cap['usable_bytes'])) * 100 bar_width = min(100, fill_pct) if fill_pct > 100: color = '#ff3333' bar_color = '#ff3333' status = 'OVER CAPACITY' icon = 'โ›”' elif fill_pct > 80: color = '#ffd000' bar_color = '#ffd000' status = 'NEARLY FULL' icon = 'โš ๏ธ' elif fill_pct > 0: color = '#00ff41' bar_color = '#00ff41' status = 'READY' icon = 'โœ“' else: color = '#555' bar_color = '#333' status = 'NO DATA' icon = 'โ—‹' capacity_display.set_content( f'
' f'
' f'CAPACITY' f'{cap["usable_bytes"]:,} bytes' f'
' f'
' f'
' f'
' f'
' f'{icon} {status}' f'{format_size(data_size)} / {cap["human"]} ({fill_pct:.1f}%)' f'
' f'
' ) # Store capacity for encode check state.capacity_info = cap except Exception as e: capacity_display.set_content(f'
Error: {e}
') # Update capacity on various changes if state.carrier_image: update_capacity_display() # Advanced settings (hidden by default) advanced_container = ui.column().classes('w-full hidden gap-2') with advanced_container: ui.label('๐ŸŽฏ Channel & Bit Configuration').classes('text-yellow-400 text-sm') with ui.row().classes('w-full gap-4'): # All channel presets adv_channel_select = ui.select( label='All Channel Presets', options=list(CHANNEL_PRESETS.keys()), value='RGB' ).classes('w-1/2') # Full bits range adv_bits_select = ui.select( label='Bits (1-8)', options=list(range(1, 9)), value=1 ).classes('w-1/2') with ui.row().classes('w-full gap-4'): strategy_select = ui.select( label='Strategy', options=['interleaved', 'sequential', 'spread', 'randomized'], value='interleaved' ).classes('w-1/2') bit_offset_select = ui.select( label='Bit Offset (0=LSB, 7=MSB)', options=list(range(8)), value=0 ).classes('w-1/2') ui.separator().classes('my-2') ui.label('๐ŸŽฒ Randomization').classes('text-yellow-400 text-sm') with ui.row().classes('w-full gap-4'): seed_input = ui.number( label='Random Seed (manual)', value=None, placeholder='Optional' ).classes('w-1/2') seed_password = ui.input( label='Or derive seed from password', placeholder='Enter passphrase...' ).classes('w-1/2') def derive_seed_from_password(): """Derive a numeric seed from password hash""" if seed_password.value: import hashlib hash_bytes = hashlib.sha256(seed_password.value.encode()).digest() derived = int.from_bytes(hash_bytes[:4], 'big') seed_input.value = derived ui.notify(f'Derived seed: {derived}', type='info') ui.button('๐Ÿ”‘ Derive Seed', on_click=derive_seed_from_password).props('dense').classes('text-xs') ui.separator().classes('my-2') ui.label('๐Ÿ›ก๏ธ Stealth & Obfuscation').classes('text-yellow-400 text-sm') with ui.row().classes('w-full gap-4'): compress_toggle = ui.switch('Compress data', value=True).classes('w-1/2') invert_toggle = ui.switch('XOR invert bits', value=False).classes('w-1/2') ui.html( '
' 'โ€ข Compression reduces payload size but adds header bytes
' 'โ€ข XOR invert flips all bits for additional obfuscation' '
', sanitize=False ) ui.separator().classes('my-2') ui.label('๐Ÿ“Š Encoding Statistics').classes('text-yellow-400 text-sm') stats_display = ui.html('', sanitize=False) def update_stats(): """Show encoding statistics preview""" if state.carrier_image is None: stats_display.set_content('
Load carrier to see stats
') return w, h = state.carrier_image.size total_pixels = w * h ch = adv_channel_select.value or 'RGB' b = adv_bits_select.value or 1 num_channels = len(CHANNEL_PRESETS.get(ch, [])) bits_per_pixel = num_channels * b total_bits = total_pixels * bits_per_pixel total_bytes = total_bits // 8 # Estimate pixels modified for a 1KB payload sample_payload = 1024 * 8 # 1KB in bits pixels_for_1kb = sample_payload // bits_per_pixel if bits_per_pixel else 0 stats_display.set_content( f'
' f'Image: {w}x{h} = {total_pixels:,} pixels
' f'Channels: {ch} ({num_channels} channels)
' f'Bits/pixel: {bits_per_pixel} ({b} bits ร— {num_channels} ch)
' f'Raw capacity: {total_bytes:,} bytes
' f'Pixels for 1KB: ~{pixels_for_1kb:,} ({pixels_for_1kb/total_pixels*100:.2f}% of image)' f'
' ) # Initial stats update update_stats() # Update stats when settings change adv_channel_select.on('update:model-value', lambda: (update_stats(), update_capacity_display())) adv_bits_select.on('update:model-value', lambda: (update_stats(), update_capacity_display())) def toggle_advanced(): if advanced_toggle.value: advanced_container.classes(remove='hidden') # Sync values adv_channel_select.value = channel_select.value adv_bits_select.value = bits_select.value update_stats() update_capacity_display() else: advanced_container.classes(add='hidden') update_capacity_display() advanced_toggle.on('update:model-value', toggle_advanced) # Also update capacity when basic settings change channel_select.on('update:model-value', update_capacity_display) bits_select.on('update:model-value', update_capacity_display) # Encryption settings ui.separator().classes('my-2') ui.label('๐Ÿ” ENCRYPTION').classes('text-green-400 text-lg font-bold') encrypt_toggle = ui.switch('Enable Encryption').classes('text-cyan-400') encrypt_container = ui.column().classes('w-full hidden gap-2') with encrypt_container: password_input = ui.input( label='Password', password=True, password_toggle_button=True ).classes('w-full') crypto_methods = crypto.get_available_methods() encrypt_method = ui.select( label='Method', options=crypto_methods, value=crypto_methods[0] if crypto_methods else 'xor' ).classes('w-full') def toggle_encrypt(): if encrypt_toggle.value: encrypt_container.classes(remove='hidden') else: encrypt_container.classes(add='hidden') encrypt_toggle.on('update:model-value', toggle_encrypt) ui.separator().classes('my-4') # ENCODE BUTTON & OUTPUT with ui.row().classes('w-full justify-center gap-4'): async def do_encode(): if state.carrier_image is None: ui.notify('Please upload or generate a carrier image first', type='negative') return # Get data to encode if data_source.value == 'text': if not text_input.value: ui.notify('Please enter some text to hide', type='negative') return data = text_input.value.encode('utf-8') else: if not state.encode_file_data: ui.notify('Please upload a file to hide', type='negative') return # Prepend filename for extraction filename_bytes = state.encode_file_name.encode('utf-8') data = bytes([len(filename_bytes)]) + filename_bytes + state.encode_file_data # Build config if advanced_toggle.value: config = create_config( channels=adv_channel_select.value or 'RGB', bits=adv_bits_select.value or 1, strategy=strategy_select.value or 'interleaved', bit_offset=bit_offset_select.value or 0, seed=int(seed_input.value) if seed_input.value else None, compress=compress_toggle.value, ) else: config = create_config( channels=channel_select.value or 'RGB', bits=bits_select.value or 1, ) # Check capacity before proceeding cap = calculate_capacity(state.carrier_image, config) # Account for header overhead (~32 bytes + compression metadata) estimated_size = len(data) + 64 # Conservative estimate with header if estimated_size > cap['usable_bytes']: ui.notify( f'โ›” Data too large! Need {format_size(estimated_size)}, but capacity is {cap["human"]}. ' f'Try: more bits, more channels, or larger image.', type='negative', timeout=8000 ) return # Encrypt if enabled if encrypt_toggle.value: if not password_input.value: ui.notify('Please enter an encryption password', type='negative') return data = crypto.encrypt(data, password_input.value, encrypt_method.value) # XOR invert if enabled (advanced mode only) if advanced_toggle.value and invert_toggle.value: data = bytes(b ^ 0xFF for b in data) try: result = encode(state.carrier_image, data, config) # Calculate encoding stats data_bits = len(data) * 8 if advanced_toggle.value: ch = adv_channel_select.value or 'RGB' b = adv_bits_select.value or 1 else: ch = channel_select.value or 'RGB' b = bits_select.value or 1 num_ch = len(CHANNEL_PRESETS.get(ch, [])) bits_per_pixel = num_ch * b pixels_used = (data_bits // bits_per_pixel) + 1 if bits_per_pixel else 0 total_pixels = state.carrier_image.size[0] * state.carrier_image.size[1] pct_used = (pixels_used / total_pixels) * 100 if total_pixels else 0 # Show result b64 = image_to_base64(result) result_preview.set_source(f'data:image/png;base64,{b64}') result_container.classes(remove='hidden') # Update result stats result_stats.set_content( f'
' f'โœ“ Encoded {format_size(len(data))} into {pixels_used:,} pixels ({pct_used:.2f}% of image)
' f'Config: {ch} @ {b} bits | Strategy: {strategy_select.value if advanced_toggle.value else "interleaved"}' f'{"
๐Ÿ” Encrypted" if encrypt_toggle.value else ""}' f'{"
๐Ÿ”€ XOR Inverted" if advanced_toggle.value and invert_toggle.value else ""}' f'
' ) # Store for download state.encode_result = result ui.notify('โœ“ Encoding successful!', type='positive') except Exception as e: ui.notify(f'Encoding failed: {str(e)}', type='negative') ui.button('๐Ÿ”’ ENCODE', on_click=do_encode).classes( 'bg-green-600 hover:bg-green-500 text-black font-bold px-8 py-2 text-lg' ) # Result display result_container = ui.column().classes('w-full hidden gap-2 mt-4') with result_container: ui.label('โœ“ ENCODED IMAGE').classes('text-green-400 text-lg font-bold') result_stats = ui.html('', sanitize=False).classes('w-full') result_preview = ui.image().classes('w-full max-h-96 object-contain') async def download_result(): if state.encode_result is None: return # Create download buffer = io.BytesIO() state.encode_result.save(buffer, format='PNG') buffer.seek(0) ui.download(buffer.getvalue(), 'stego_output.png') ui.button('๐Ÿ’พ Download Encoded Image', on_click=download_result).classes( 'bg-cyan-600 hover:bg-cyan-500 text-black font-bold' ) async def create_decode_panel(): """Create the decoding interface - Aperi'Solve inspired""" global state # Store scan results for UI updates scan_results_data = {"results": [], "selected": None} with ui.row().classes('w-full gap-4'): # Left column - Image upload with ui.column().classes('w-1/2 gap-2'): ui.label('๐Ÿ–ผ๏ธ STEGO IMAGE').classes('text-cyan-400 text-lg font-bold') # Show current image if loaded if state.decode_image: b64 = image_to_base64(state.decode_image) ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-48 object-contain') w, h = state.decode_image.size ui.label(f'Loaded | {w}x{h} | {w*h:,} pixels').classes('text-green-400 text-sm') # Quick detection status if state.detected_config: ui.html( f'
' f'โœ“ STEG v3 header detected
' f'Channels: {state.detected_config["config"]["channels"]}
' f'Payload: {format_size(state.detected_config["original_length"])}' f'
', sanitize=False ) else: ui.html( '
' 'โš  No STEG header - use Smart Scan to detect LSB data' '
', sanitize=False ) else: ui.image().classes('w-full max-h-48 object-contain') ui.label('No image loaded').classes('text-gray-500 text-sm') # Custom file picker decode_html, decode_js = create_file_picker('decode_image', '.png,.PNG,image/png', '/api/upload/decode', tab='decode') ui.html(decode_html, sanitize=False) ui.add_body_html(decode_js) # Right column - Decode options with ui.column().classes('w-1/2 gap-2'): ui.label('โš™๏ธ DECODE OPTIONS').classes('text-cyan-400 text-lg font-bold') # Decryption password (optional) ui.label('๐Ÿ”“ Decryption (optional)').classes('text-sm text-gray-400') dec_password_input = ui.input( label='Password (leave empty if unencrypted)', password=True, password_toggle_button=True ).classes('w-full') ui.separator().classes('my-2') # Manual decode settings ui.label('๐Ÿ“‹ Manual Config (for targeted decode)').classes('text-sm text-gray-400') with ui.row().classes('w-full gap-2'): dec_channel_select = ui.select( label='Channels', options=list(CHANNEL_PRESETS.keys()), value='RGB' ).classes('w-1/3') dec_bits_select = ui.select( label='Bits', options=list(range(1, 9)), value=1 ).classes('w-1/3') dec_strategy_select = ui.select( label='Strategy', options=['interleaved', 'sequential', 'spread', 'randomized'], value='interleaved' ).classes('w-1/3') ui.separator().classes('my-4') # ACTION BUTTONS with ui.row().classes('w-full justify-center gap-4'): async def do_smart_scan(): """Run Aperi'Solve-style multi-config scan""" if state.decode_image is None: ui.notify('Please upload an image first', type='negative') return ui.notify('๐Ÿ” Scanning all channel combinations...', type='info') try: password = dec_password_input.value if dec_password_input.value else None results = smart_scan_image(state.decode_image, password) scan_results_data["results"] = results # Build results HTML table html = '
' html += '' html += '' html += '' html += '' html += '' html += '' html += '' html += '' html += '' for i, r in enumerate(results): # Status color coding if r["status"] == "STEG_DETECTED" or r["status"] == "STEG_HEADER": status_color = "#00ff00" status_icon = "โœ“" row_bg = "rgba(0, 255, 0, 0.15)" elif r["status"] == "TEXT_FOUND": status_color = "#00ffff" status_icon = "๐Ÿ“" row_bg = "rgba(0, 255, 255, 0.15)" elif r["status"] == "POSSIBLE_TEXT": status_color = "#ffff00" status_icon = "?" row_bg = "rgba(255, 255, 0, 0.1)" elif r["status"] == "MIXED_DATA": status_color = "#ff9900" status_icon = "~" row_bg = "transparent" else: status_color = "#666666" status_icon = "ยท" row_bg = "transparent" # Confidence bar conf = r.get("confidence", 0) conf_bar = f'
' # Preview (escape HTML) preview = r.get("preview", "")[:60] preview_escaped = preview.replace("<", "<").replace(">", ">").replace("&", "&") if len(r.get("preview", "")) > 60: preview_escaped += "..." channels_str = ",".join(r["channels"]) if isinstance(r["channels"], list) else str(r["channels"]) html += f'' html += f'' html += f'' html += f'' html += f'' html += f'' html += f'' html += '' html += '
ConfigChannelsBitsStatusConfPreview
{r["name"]}{channels_str}{r["bits"]}{status_icon} {r["status"]}{conf_bar}{preview_escaped}
' # Summary text_found = sum(1 for r in results if r["status"] in ["TEXT_FOUND", "STEG_DETECTED", "STEG_HEADER"]) possible = sum(1 for r in results if r["status"] == "POSSIBLE_TEXT") summary = f'
' summary += f'๐Ÿ“Š Scanned {len(results)} configurations | ' summary += f'Found: {text_found} likely | ' summary += f'Possible: {possible}' summary += '
' scan_results_container.set_content(html + summary) scan_results_container.classes(remove='hidden') if text_found > 0: ui.notify(f'โœ“ Found {text_found} potential hidden data!', type='positive') else: ui.notify('Scan complete - no obvious text found', type='warning') except Exception as e: ui.notify(f'Scan failed: {str(e)}', type='negative') async def do_decode(): """Standard decode with current settings""" if state.decode_image is None: ui.notify('Please upload an image first', type='negative') return config = create_config( channels=dec_channel_select.value or 'RGB', bits=dec_bits_select.value or 1, strategy=dec_strategy_select.value or 'interleaved', ) try: data = decode(state.decode_image, config) # Decrypt if needed if dec_password_input.value: try: data = crypto.decrypt(data, dec_password_input.value) except Exception as e: ui.notify(f'Decryption failed: {e}', type='warning') # Process extracted data process_extracted_data(data, output_text, file_download_btn) ui.notify('โœ“ Decode successful!', type='positive') except Exception as e: ui.notify(f'Decode failed: {str(e)}', type='negative') output_text.set_value(f"Error: {str(e)}\n\nTry using Smart Scan to detect the correct configuration.") async def do_auto_decode(): """Auto-decode using detected STEG header""" if state.decode_image is None: ui.notify('Please upload an image first', type='negative') return try: # Use auto-detection (config=None) data = decode(state.decode_image, None) # Decrypt if needed if dec_password_input.value: try: data = crypto.decrypt(data, dec_password_input.value) except Exception as e: ui.notify(f'Decryption failed: {e}', type='warning') # Process extracted data process_extracted_data(data, output_text, file_download_btn) ui.notify('โœ“ Auto-decode successful!', type='positive') except Exception as e: ui.notify(f'Auto-decode failed: {str(e)}. Try Smart Scan.', type='negative') output_text.set_value(f"Auto-decode failed: {str(e)}\n\nThe image may not contain STEG v3 header.\nTry using Smart Scan to detect raw LSB data.") ui.button('๐Ÿ” SMART SCAN', on_click=do_smart_scan).classes( 'bg-purple-600 hover:bg-purple-500 text-white font-bold px-6 py-2' ).props('unelevated') ui.button('๐Ÿค– AUTO DECODE', on_click=do_auto_decode).classes( 'bg-cyan-600 hover:bg-cyan-500 text-black font-bold px-6 py-2' ) ui.button('๐Ÿ“‹ MANUAL DECODE', on_click=do_decode).classes( 'bg-gray-600 hover:bg-gray-500 text-white font-bold px-6 py-2' ) # ๐Ÿช† MATRYOSHKA MODE BUTTON (only shows when matryoshka mode is active) matryoshka_btn_container = ui.row().classes('w-full justify-center gap-4 mt-2') with matryoshka_btn_container: async def do_matryoshka_decode(): """๐Ÿช† Perform recursive Matryoshka decode""" if state.decode_image is None: ui.notify('Please upload an image first', type='negative') return if not state.matryoshka_mode: ui.notify('๐Ÿช† Matryoshka mode not active! Click the bottom-left corner to activate.', type='warning') return ui.notify(f'๐Ÿช† Starting Matryoshka decode (depth: {state.matryoshka_depth})...', type='info') try: password = dec_password_input.value if dec_password_input.value else None results = matryoshka_decode(state.decode_image, max_depth=state.matryoshka_depth, password=password) state.matryoshka_results = results # Build nested results display def build_results_html(res_list, indent=0): html = '' for r in res_list: depth = r.get('depth', indent) indent_px = depth * 24 doll_icon = '๐Ÿช†' * (depth + 1) # More dolls for deeper layers # Color based on type if r.get('type') == 'nested_image' or r.get('type') == 'nested_image_raw': color = '#ff00ff' icon = '๐Ÿ–ผ๏ธ' elif r.get('type') == 'text': color = '#00ff00' icon = '๐Ÿ“' elif r.get('type') == 'file': color = '#00ffff' icon = '๐Ÿ“' elif r.get('type') == 'no_data_found': color = '#666666' icon = 'โˆ…' elif r.get('type') == 'max_depth_reached': color = '#ffff00' icon = 'โš ๏ธ' else: color = '#888888' icon = '?' html += f'''
{doll_icon} Layer {depth} {icon} {r.get('filename') or r.get('type', 'unknown')}
Size: {format_size(r.get('data_size', 0))}
{r.get('preview', '')[:200]}
''' # Recurse into nested results if r.get('nested_results'): html += build_results_html(r['nested_results'], indent + 1) return html results_html = build_results_html(results) # Count layers found def count_layers(res_list): count = len(res_list) for r in res_list: if r.get('nested_results'): count += count_layers(r['nested_results']) return count total_layers = count_layers(results) matryoshka_results_container.set_content( f'''
๐Ÿช† MATRYOSHKA DECODE RESULTS ({total_layers} layers found)
{results_html}
''' ) matryoshka_results_container.classes(remove='hidden') # Also extract the deepest/last data for download def get_deepest_data(res_list): for r in reversed(res_list): if r.get('nested_results'): deep = get_deepest_data(r['nested_results']) if deep: return deep if r.get('raw_data'): return r return None deepest = get_deepest_data(results) if deepest and deepest.get('raw_data'): state.extracted_file = deepest['raw_data'] state.extracted_filename = deepest.get('filename', 'extracted_data.bin') file_download_btn.classes(remove='hidden') # Also show in output text try: text = deepest['raw_data'].decode('utf-8') output_text.set_value(f"๐Ÿช† Deepest layer content:\n\n{text}") except: output_text.set_value(f"๐Ÿช† Deepest layer: {deepest.get('filename', 'binary data')} ({format_size(len(deepest['raw_data']))})") ui.notify(f'๐Ÿช† Matryoshka decode complete! Found {total_layers} layers.', type='positive') except Exception as e: ui.notify(f'๐Ÿช† Matryoshka decode failed: {str(e)}', type='negative') matryoshka_results_container.set_content( f'
Error: {str(e)}
' ) matryoshka_decode_btn = ui.button('๐Ÿช† MATRYOSHKA DECODE', on_click=do_matryoshka_decode).classes( 'bg-pink-600 hover:bg-pink-500 text-white font-bold px-6 py-2' ).props('unelevated').style('display: none;') # Show/hide based on matryoshka mode state ui.add_body_html(''' ''') # ๐Ÿช† MATRYOSHKA RESULTS CONTAINER matryoshka_results_container = ui.html('', sanitize=False).classes('w-full hidden mt-4') # SCAN RESULTS (hidden until scan runs) ui.label('๐Ÿ“Š SCAN RESULTS').classes('text-purple-400 text-lg font-bold mt-4') scan_results_container = ui.html( '
' 'Click "Smart Scan" to analyze all channel combinations' '
', sanitize=False ).classes('w-full') ui.separator().classes('my-4') # OUTPUT SECTION ui.label('๐Ÿ“ค EXTRACTED DATA').classes('text-cyan-400 text-lg font-bold') output_text = ui.textarea( label='Decoded output', placeholder='Extracted data will appear here...' ).classes('w-full terminal').props('rows=8 readonly dark') with ui.row().classes('gap-2'): async def download_file(): if state.extracted_file is None: return ui.download(state.extracted_file, state.extracted_filename) file_download_btn = ui.button('๐Ÿ’พ Download Extracted File', on_click=download_file).classes( 'bg-green-600 hover:bg-green-500 text-black font-bold hidden' ) async def download_raw(): if not output_text.value: return ui.download(output_text.value.encode('utf-8'), 'extracted_text.txt') ui.button('๐Ÿ“„ Download as Text', on_click=download_raw).classes( 'bg-gray-600 hover:bg-gray-500 text-white font-bold' ) def process_extracted_data(data: bytes, output_text, file_download_btn): """Process and display extracted data""" try: # Check if it's a file (starts with filename length byte) if len(data) > 2 and data[0] < 255: fname_len = data[0] if fname_len > 0 and fname_len < 200: try: filename = data[1:1+fname_len].decode('utf-8') file_data = data[1+fname_len:] # Check if it looks like a valid filename if '.' in filename and len(filename) < 200 and '/' not in filename: state.extracted_file = file_data state.extracted_filename = filename output_text.set_value( f"๐Ÿ“ Extracted file: {filename}\n" f" Size: {format_size(len(file_data))}\n\n" f"(Click 'Download Extracted File' to save)" ) file_download_btn.classes(remove='hidden') return except: pass # Try as plain text text = data.decode('utf-8') output_text.set_value(text) file_download_btn.classes(add='hidden') except UnicodeDecodeError: # Binary data - show hex preview hex_preview = data[:512].hex() formatted = ' '.join(hex_preview[i:i+2] for i in range(0, min(len(hex_preview), 512), 2)) output_text.set_value( f"Binary data ({format_size(len(data))}):\n\n{formatted}" + ("..." if len(data) > 256 else "") ) state.extracted_file = data state.extracted_filename = 'extracted_data.bin' file_download_btn.classes(remove='hidden') async def create_matryoshka_panel(): """Create the Matryoshka (nested steganography) interface""" global state ui.html('''
๐Ÿช† MATRYOSHKA MODE ๐Ÿช†
Russian Nesting Doll Steganography
''', sanitize=False) with ui.row().classes('w-full gap-6'): # ==================== ENCODE SECTION ==================== with ui.column().classes('w-1/2 gap-3'): ui.label('๐Ÿ” NESTED ENCODE').classes('text-pink-400 text-lg font-bold') ui.html('
Add carrier images (outermost first). Each layer wraps the previous.
', sanitize=False) # Carrier list with capacity info carrier_container = ui.column().classes('w-full gap-1 p-3').style( 'background: rgba(255,0,255,0.05); border: 1px solid rgba(255,0,255,0.3); border-radius: 8px; min-height: 120px;' ) def refresh_carriers(): carrier_container.clear() with carrier_container: if not state.matryoshka_carriers: ui.html('
No carriers added yet
', sanitize=False) else: config = create_config(channels='RGBA', bits=2) for i, (img, name) in enumerate(state.matryoshka_carriers): cap = _calculate_capacity_bytes(img, config) layer_num = len(state.matryoshka_carriers) - i with ui.row().classes('w-full items-center gap-2 p-2').style( 'background: rgba(255,0,255,0.1); border-radius: 4px; margin-bottom: 4px;' ): ui.html(f'L{layer_num}', sanitize=False) ui.label(f'{name[:25]}').classes('text-pink-300 text-sm flex-grow') ui.label(f'{img.size[0]}x{img.size[1]}').classes('text-gray-500 text-xs') ui.label(f'{cap//1024}KB').classes('text-pink-400 text-xs') refresh_carriers() # Carrier upload carrier_upload_html, carrier_upload_js = create_drop_zone( 'matryoshka_carrier', 'image/png,image/jpeg,image/webp', '/api/matryoshka/add_carrier', 'matryoshka' ) ui.html(carrier_upload_html, sanitize=False) ui.add_body_html(carrier_upload_js) with ui.row().classes('w-full gap-2'): def clear_carriers(): state.matryoshka_carriers = [] refresh_carriers() ui.notify('๐Ÿช† Carriers cleared', type='info') ui.button('๐Ÿ—‘๏ธ Clear Carriers', on_click=clear_carriers).props('flat dense').classes('text-pink-400') ui.separator().classes('my-2') # Payload info ui.html('
PAYLOAD
', sanitize=False) with ui.row().classes('w-full items-center gap-2 p-2').style('background: rgba(0,0,0,0.3); border-radius: 4px;'): if state.encode_file_data: ui.icon('description').classes('text-pink-400') ui.label(f'{state.encode_file_name or "file"} ({len(state.encode_file_data)} bytes)').classes('text-pink-300 text-sm') else: ui.icon('text_fields').classes('text-gray-500') ui.label('Set payload in ENCODE tab first').classes('text-gray-500 text-sm') # Encode button async def do_encode(): if not state.matryoshka_carriers: ui.notify('Add carrier images first!', type='warning') return if not state.encode_file_data and not (hasattr(state, 'encode_text') and state.encode_text): ui.notify('Set payload in ENCODE tab first', type='warning') return ui.notify('๐Ÿช† Encoding nested layers...', type='info') ui.run_javascript(''' fetch('/api/matryoshka/encode', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({channels: 'RGBA', bits: 2}) }) .then(r => r.json()) .then(data => { if (data.success) { window.location.href = '/api/matryoshka/download'; } else { alert('Error: ' + data.error); } }); ''') ui.button('๐Ÿช† ENCODE MATRYOSHKA', on_click=do_encode).props('unelevated').classes( 'w-full mt-4 bg-pink-900 text-pink-200' ).style('font-size: 14px; font-weight: bold;') # ==================== DECODE SECTION ==================== with ui.column().classes('w-1/2 gap-3'): ui.label('๐Ÿ”“ NESTED DECODE').classes('text-pink-400 text-lg font-bold') ui.html('
Upload an image and recursively extract all nested layers.
', sanitize=False) # Depth control with ui.row().classes('w-full items-center gap-3'): ui.label('Max Depth:').classes('text-pink-400') depth_slider = ui.slider(min=1, max=11, value=state.matryoshka_depth, step=1).classes('flex-grow').props('color="pink"') depth_label = ui.label(f'{state.matryoshka_depth}').classes('text-pink-300 font-bold text-lg') def update_depth(e): state.matryoshka_depth = int(depth_slider.value) depth_label.set_text(str(state.matryoshka_depth)) depth_slider.on('update:model-value', update_depth) # Image upload for decode decode_upload_html, decode_upload_js = create_drop_zone( 'matryoshka_decode', 'image/png', '/api/upload/decode', 'matryoshka' ) ui.html(decode_upload_html, sanitize=False) ui.add_body_html(decode_upload_js) # Current decode image preview if state.decode_image: b64 = image_to_base64(state.decode_image) ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-48 object-contain mt-2') # Decode button async def do_decode(): if not state.decode_image: ui.notify('Upload an image first', type='warning') return ui.notify('๐Ÿช† Decoding nested layers...', type='info') ui.run_javascript(f''' fetch('/api/matryoshka/decode', {{ method: 'POST', headers: {{'Content-Type': 'application/json'}}, body: JSON.stringify({{depth: {state.matryoshka_depth}}}) }}) .then(r => r.json()) .then(data => {{ if (data.success) {{ alert('๐Ÿช† Found ' + data.layers_found + ' nested layers!'); window.location.reload(); }} else {{ alert('Error: ' + data.error); }} }}); ''') ui.button('๐Ÿช† DECODE MATRYOSHKA', on_click=do_decode).props('unelevated').classes( 'w-full mt-4 bg-pink-900 text-pink-200' ).style('font-size: 14px; font-weight: bold;') # Results display if state.matryoshka_results: ui.separator().classes('my-3') ui.label('๐Ÿ“ฆ EXTRACTED LAYERS').classes('text-pink-400 font-bold') for result in state.matryoshka_results: with ui.card().classes('w-full p-2 mt-2').style('background: rgba(255,0,255,0.1); border: 1px solid rgba(255,0,255,0.3);'): depth = result.get('depth', 0) rtype = result.get('type', 'unknown') preview = result.get('preview', '')[:200] ui.html(f'''
Layer {depth}: {rtype}
{preview}
''', sanitize=False) async def create_analyze_panel(): """Create the analysis interface""" global state with ui.row().classes('w-full gap-4'): # Left column - Image upload with ui.column().classes('w-1/2 gap-2'): ui.label('๐Ÿ” IMAGE TO ANALYZE').classes('text-yellow-400 text-lg font-bold') # Show current image if loaded if state.analyze_image: b64 = image_to_base64(state.analyze_image) ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-64 object-contain') w, h = state.analyze_image.size ui.label(f'Loaded | {w}x{h}').classes('text-green-400 text-sm') else: ui.image().classes('w-full max-h-64 object-contain') ui.label('No image loaded').classes('text-gray-500 text-sm') # Custom file picker - uses API endpoint analyze_html, analyze_js = create_file_picker('analyze_image', '.png,.PNG,image/png', '/api/upload/analyze', tab='analyze') ui.html(analyze_html, sanitize=False) ui.add_body_html(analyze_js) # Right column - Results with ui.column().classes('w-1/2 gap-2'): ui.label('๐Ÿ“Š ANALYSIS RESULTS').classes('text-yellow-400 text-lg font-bold') analysis_output = ui.html('', sanitize=False).classes('w-full terminal p-4') ui.separator().classes('my-4') with ui.row().classes('w-full justify-center'): async def do_analyze(): if state.analyze_image is None: ui.notify('Please upload an image first', type='negative') return try: analysis = analyze_image(state.analyze_image) detection = detect_encoding(state.analyze_image) # Format output html = '
'
                html += 'โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—\n'
                html += 'โ•‘       STEGANALYSIS REPORT            โ•‘\n'
                html += 'โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ\n'

                html += f'โ•‘ Dimensions: {analysis["dimensions"]["width"]}x{analysis["dimensions"]["height"]}\n'
                html += f'โ•‘ Total Pixels: {analysis["total_pixels"]:,}\n'
                html += f'โ•‘ Format: {analysis["format"] or "PNG"}\n'
                html += 'โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ\n'

                # Detection result
                if detection:
                    html += 'โ•‘ โœ“ STEG HEADER DETECTED\n'
                    html += f'โ•‘   Channels: {detection["config"]["channels"]}\n'
                    html += f'โ•‘   Bits: {detection["config"]["bits_per_channel"]}\n'
                    html += f'โ•‘   Strategy: {detection["config"]["strategy"]}\n'
                    html += f'โ•‘   Payload: {format_size(detection["original_length"])}\n'
                else:
                    html += 'โ•‘ No STEG header detected\n'

                html += 'โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ\n'

                # Statistical analysis
                det = analysis['detection']
                if det['level'] == 'HIGH':
                    color = '#ff4444'
                elif det['level'] == 'MEDIUM':
                    color = '#ffff00'
                else:
                    color = '#00ff00'

                html += f'โ•‘ Detection Level: {det["level"]}\n'
                html += f'โ•‘ Confidence: {det["confidence"]*100:.1f}%\n'
                html += f'โ•‘ {det["recommendation"]}\n'

                html += 'โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ\n'
                html += 'โ•‘ CAPACITY BY CONFIG:\n'
                for cfg, cap in analysis['capacity_by_config'].items():
                    html += f'โ•‘   {cfg}: {cap}\n'

                html += 'โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ\n'
                html += 'โ•‘ CHANNEL ANALYSIS:\n'
                for ch_name, ch_data in analysis['channels'].items():
                    chi = ch_data['chi_square_indicator']
                    chi_color = '#ff4444' if chi > 0.5 else '#ffff00' if chi > 0.2 else '#00ff00'
                    html += f'โ•‘ {ch_name}: ฮผ={ch_data["mean"]:.1f} ฯƒ={ch_data["std"]:.1f} '
                    html += f'ฯ‡ยฒ={chi:.3f}\n'

                html += 'โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•'
                html += '
' analysis_output.set_content(html) ui.notify('Analysis complete!', type='positive') except Exception as e: ui.notify(f'Analysis failed: {str(e)}', type='negative') ui.button('๐Ÿ”ฌ ANALYZE', on_click=do_analyze).classes( 'bg-yellow-600 hover:bg-yellow-500 text-black font-bold px-8 py-2 text-lg' ) # ============== MAIN ============== def main(): """Run the web UI""" print(STEGO_ASCII) print("\n๐Ÿฆ• Starting STEGOSAURUS WRECKS Web UI...") print(" Open http://localhost:8080 in your browser\n") ui.run( title='STEGOSAURUS WRECKS', favicon='๐Ÿฆ•', dark=True, port=8080, reload=False, ) if __name__ in {"__main__", "__mp_main__"}: main()