mirror of
https://github.com/elder-plinius/STEGOSAURUS-WRECKS.git
synced 2026-04-21 19:55:57 +02:00
2994 lines
119 KiB
Python
2994 lines
119 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
STEGOSAURUS WRECKS - NiceGUI Web Interface
|
||
Ultimate steganography suite with hacker aesthetic
|
||
|
||
Run with: python webui.py
|
||
"""
|
||
|
||
from nicegui import ui, app
|
||
from fastapi import Request
|
||
from fastapi.responses import JSONResponse
|
||
from PIL import Image
|
||
import io
|
||
import base64
|
||
import tempfile
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
import asyncio
|
||
import json
|
||
|
||
# Import our steg modules
|
||
import steg_core
|
||
from steg_core import (
|
||
encode, decode, create_config, calculate_capacity,
|
||
analyze_image, detect_encoding, CHANNEL_PRESETS,
|
||
Channel, StegConfig, EncodingStrategy, _extract_bit_units,
|
||
_bits_array_to_bytes, _generate_pixel_indices
|
||
)
|
||
import crypto
|
||
import numpy as np
|
||
import re
|
||
import string
|
||
|
||
# ============== THEME / STYLING ==============
|
||
|
||
DARK_CSS = """
|
||
:root {
|
||
--primary: #00ff41;
|
||
--primary-dim: #00aa2a;
|
||
--primary-glow: rgba(0, 255, 65, 0.4);
|
||
--secondary: #00d4ff;
|
||
--accent: #ff00ff;
|
||
--warning: #ffd000;
|
||
--error: #ff3333;
|
||
--bg-dark: #0a0a0a;
|
||
--bg-card: #0d1117;
|
||
--bg-hover: #161b22;
|
||
--text: #00ff41;
|
||
--text-dim: #555;
|
||
--border: #30363d;
|
||
}
|
||
|
||
body {
|
||
background: var(--bg-dark) !important;
|
||
font-family: 'JetBrains Mono', 'Fira Code', 'Courier New', monospace !important;
|
||
}
|
||
|
||
.q-card {
|
||
background: var(--bg-card) !important;
|
||
border: 1px solid var(--border) !important;
|
||
}
|
||
|
||
.nicegui-content {
|
||
background: var(--bg-dark) !important;
|
||
}
|
||
|
||
/* Matrix scanline effect - subtler */
|
||
.matrix-bg {
|
||
background: repeating-linear-gradient(
|
||
0deg,
|
||
transparent,
|
||
transparent 2px,
|
||
rgba(0, 255, 65, 0.015) 2px,
|
||
rgba(0, 255, 65, 0.015) 4px
|
||
);
|
||
pointer-events: none;
|
||
position: fixed;
|
||
top: 0;
|
||
left: 0;
|
||
width: 100%;
|
||
height: 100%;
|
||
z-index: 9999;
|
||
}
|
||
|
||
/* Glow effect for headers */
|
||
.glow {
|
||
text-shadow: 0 0 10px var(--primary), 0 0 20px var(--primary-glow);
|
||
}
|
||
|
||
/* Cyber border animation */
|
||
@keyframes border-pulse {
|
||
0%, 100% { border-color: var(--primary); box-shadow: 0 0 5px var(--primary-glow); }
|
||
50% { border-color: var(--secondary); box-shadow: 0 0 15px rgba(0, 212, 255, 0.3); }
|
||
}
|
||
|
||
@keyframes glow-pulse {
|
||
0%, 100% { opacity: 0.6; }
|
||
50% { opacity: 1; }
|
||
}
|
||
|
||
.cyber-border {
|
||
border: 1px solid var(--primary) !important;
|
||
animation: border-pulse 3s infinite;
|
||
}
|
||
|
||
/* Status indicators */
|
||
.status-success { color: var(--primary) !important; }
|
||
.status-error { color: var(--error) !important; }
|
||
.status-warning { color: var(--warning) !important; }
|
||
.status-info { color: var(--secondary) !important; }
|
||
|
||
/* Custom scrollbar */
|
||
::-webkit-scrollbar { width: 6px; }
|
||
::-webkit-scrollbar-track { background: var(--bg-dark); }
|
||
::-webkit-scrollbar-thumb { background: var(--primary-dim); border-radius: 3px; }
|
||
::-webkit-scrollbar-thumb:hover { background: var(--primary); }
|
||
|
||
/* File upload area - MODERNIZED */
|
||
.drop-zone {
|
||
border: 1px solid var(--border) !important;
|
||
background: linear-gradient(135deg, rgba(0, 255, 65, 0.03) 0%, rgba(0, 212, 255, 0.03) 100%) !important;
|
||
transition: all 0.3s ease;
|
||
padding: 16px;
|
||
text-align: center;
|
||
cursor: pointer;
|
||
border-radius: 8px;
|
||
position: relative;
|
||
overflow: hidden;
|
||
}
|
||
|
||
.drop-zone::before {
|
||
content: '';
|
||
position: absolute;
|
||
top: 0;
|
||
left: -100%;
|
||
width: 100%;
|
||
height: 100%;
|
||
background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.1), transparent);
|
||
transition: left 0.5s ease;
|
||
}
|
||
|
||
.drop-zone:hover {
|
||
border-color: var(--primary) !important;
|
||
background: linear-gradient(135deg, rgba(0, 255, 65, 0.08) 0%, rgba(0, 212, 255, 0.05) 100%) !important;
|
||
box-shadow: 0 0 20px rgba(0, 255, 65, 0.15), inset 0 0 30px rgba(0, 255, 65, 0.05);
|
||
}
|
||
|
||
.drop-zone:hover::before {
|
||
left: 100%;
|
||
}
|
||
|
||
/* Terminal style output */
|
||
.terminal {
|
||
background: #000 !important;
|
||
border: 1px solid var(--border) !important;
|
||
font-family: 'JetBrains Mono', monospace !important;
|
||
color: var(--primary) !important;
|
||
padding: 16px !important;
|
||
white-space: pre-wrap;
|
||
overflow-x: auto;
|
||
border-radius: 6px;
|
||
}
|
||
|
||
/* Button overrides for cooler look */
|
||
.q-btn {
|
||
text-transform: uppercase !important;
|
||
letter-spacing: 1px !important;
|
||
font-weight: 600 !important;
|
||
}
|
||
|
||
/* Input fields */
|
||
.q-field--dark .q-field__control {
|
||
background: var(--bg-card) !important;
|
||
border: 1px solid var(--border) !important;
|
||
border-radius: 6px !important;
|
||
}
|
||
|
||
.q-field--dark .q-field__control:hover {
|
||
border-color: var(--primary-dim) !important;
|
||
}
|
||
|
||
.q-field--focused .q-field__control {
|
||
border-color: var(--primary) !important;
|
||
box-shadow: 0 0 10px var(--primary-glow) !important;
|
||
}
|
||
|
||
/* Select dropdowns */
|
||
.q-menu {
|
||
background: var(--bg-card) !important;
|
||
border: 1px solid var(--border) !important;
|
||
}
|
||
|
||
/* Tabs styling */
|
||
.q-tab--active {
|
||
color: var(--primary) !important;
|
||
text-shadow: 0 0 10px var(--primary-glow);
|
||
}
|
||
|
||
.q-tab:hover {
|
||
color: var(--secondary) !important;
|
||
}
|
||
|
||
/* Switches */
|
||
.q-toggle__track {
|
||
background: var(--border) !important;
|
||
}
|
||
|
||
.q-toggle--active .q-toggle__track {
|
||
background: var(--primary-dim) !important;
|
||
}
|
||
|
||
/* Image preview container */
|
||
.image-preview {
|
||
border: 1px solid var(--border);
|
||
border-radius: 8px;
|
||
overflow: hidden;
|
||
background: #000;
|
||
}
|
||
|
||
/* Stats/info boxes */
|
||
.info-box {
|
||
background: linear-gradient(135deg, var(--bg-card) 0%, var(--bg-hover) 100%);
|
||
border: 1px solid var(--border);
|
||
border-radius: 8px;
|
||
padding: 12px;
|
||
}
|
||
|
||
.info-box:hover {
|
||
border-color: var(--primary-dim);
|
||
}
|
||
|
||
/* Section headers */
|
||
.section-header {
|
||
font-size: 14px;
|
||
font-weight: 700;
|
||
letter-spacing: 2px;
|
||
text-transform: uppercase;
|
||
margin-bottom: 12px;
|
||
display: flex;
|
||
align-items: center;
|
||
gap: 8px;
|
||
}
|
||
|
||
.section-header::after {
|
||
content: '';
|
||
flex: 1;
|
||
height: 1px;
|
||
background: linear-gradient(90deg, var(--border), transparent);
|
||
}
|
||
|
||
/* Capacity meter */
|
||
.capacity-meter {
|
||
background: var(--bg-card);
|
||
border: 1px solid var(--border);
|
||
border-radius: 6px;
|
||
padding: 12px;
|
||
margin: 8px 0;
|
||
}
|
||
|
||
.capacity-bar-outer {
|
||
background: var(--bg-dark);
|
||
border-radius: 4px;
|
||
height: 6px;
|
||
overflow: hidden;
|
||
margin: 8px 0;
|
||
}
|
||
|
||
.capacity-bar-inner {
|
||
height: 100%;
|
||
border-radius: 4px;
|
||
transition: width 0.3s ease, background 0.3s ease;
|
||
}
|
||
|
||
/* Notification styling */
|
||
.q-notification {
|
||
background: var(--bg-card) !important;
|
||
border: 1px solid var(--border) !important;
|
||
}
|
||
|
||
/* 🪆 MATRYOSHKA MODE - Easter egg activation zone (stealth) */
|
||
.matryoshka-trigger {
|
||
position: fixed;
|
||
bottom: 0;
|
||
left: 0;
|
||
width: 12px;
|
||
height: 12px;
|
||
z-index: 99999;
|
||
opacity: 0;
|
||
background: transparent;
|
||
pointer-events: auto !important;
|
||
}
|
||
|
||
.matryoshka-trigger:hover {
|
||
opacity: 0;
|
||
}
|
||
|
||
/* Matryoshka mode indicator */
|
||
.matryoshka-active {
|
||
animation: matryoshka-glow 2s ease-in-out infinite;
|
||
}
|
||
|
||
@keyframes matryoshka-glow {
|
||
0%, 100% { box-shadow: 0 0 5px #ff00ff, 0 0 10px #ff00ff; }
|
||
50% { box-shadow: 0 0 20px #ff00ff, 0 0 40px #ff00ff, 0 0 60px #ff00ff; }
|
||
}
|
||
|
||
/* Nested results tree styling */
|
||
.matryoshka-tree {
|
||
font-family: monospace;
|
||
padding: 12px;
|
||
background: #000;
|
||
border: 1px solid var(--accent);
|
||
border-radius: 6px;
|
||
}
|
||
|
||
.matryoshka-layer {
|
||
padding-left: 20px;
|
||
border-left: 2px solid var(--accent);
|
||
margin-left: 10px;
|
||
}
|
||
|
||
.matryoshka-layer-0 { border-color: #ff00ff; }
|
||
.matryoshka-layer-1 { border-color: #ff66ff; }
|
||
.matryoshka-layer-2 { border-color: #ff99ff; }
|
||
.matryoshka-layer-3 { border-color: #ffccff; }
|
||
"""
|
||
|
||
BANNER_ASCII = """
|
||
███████╗████████╗███████╗ ██████╗ ██████╗ ███████╗ █████╗ ██╗ ██╗██████╗ ██╗ ██╗███████╗
|
||
██╔════╝╚══██╔══╝██╔════╝██╔════╝ ██╔═══██╗██╔════╝██╔══██╗██║ ██║██╔══██╗██║ ██║██╔════╝
|
||
███████╗ ██║ █████╗ ██║ ███╗██║ ██║███████╗███████║██║ ██║██████╔╝██║ ██║███████╗
|
||
╚════██║ ██║ ██╔══╝ ██║ ██║██║ ██║╚════██║██╔══██║██║ ██║██╔══██╗██║ ██║╚════██║
|
||
███████║ ██║ ███████╗╚██████╔╝╚██████╔╝███████║██║ ██║╚██████╔╝██║ ██║╚██████╔╝███████║
|
||
╚══════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
|
||
W R E C K S v3.0
|
||
"""
|
||
|
||
STEGO_ASCII = """
|
||
__
|
||
/ '-.
|
||
/ .-. | STEGOSAURUS
|
||
/.' \\| WRECKS
|
||
// |\\ \\
|
||
|| | \\ | >> LSB Suite <<
|
||
/|| | \\ |
|
||
/ ||__/ \\/
|
||
\\ --' |\\_\\
|
||
'._____.' \\/
|
||
"""
|
||
|
||
|
||
# ============== STATE MANAGEMENT ==============
|
||
|
||
class AppState:
    """Mutable state shared by the encode, decode and analyze views.

    A single instance is created at import time and bound to the
    module-level name ``state``; every attribute starts out empty/disabled.
    """

    def __init__(self):
        # --- Encode view -------------------------------------------------
        self.carrier_image: Optional[Image.Image] = None
        self.carrier_path: Optional[str] = None
        self.encoded_image: Optional[Image.Image] = None
        self.encode_file_data: Optional[bytes] = None
        self.encode_file_name: Optional[str] = None
        self.encode_result: Optional[Image.Image] = None

        # --- Decode view -------------------------------------------------
        self.decode_image: Optional[Image.Image] = None
        self.detected_config: Optional[dict] = None
        self.extracted_file: Optional[bytes] = None
        self.extracted_filename: Optional[str] = None

        # --- Analyze view ------------------------------------------------
        self.analyze_image: Optional[Image.Image] = None

        # --- General settings --------------------------------------------
        self.advanced_mode: bool = False
        self.capacity_info: dict = {}

        # Remembered so the active tab can be restored across page reloads.
        self.active_tab: str = 'encode'

        # --- Matryoshka (nested "Russian doll") mode ----------------------
        self.matryoshka_mode: bool = False
        self.matryoshka_depth: int = 3  # default nesting depth (1-11)
        # Results of the most recent nested decode.
        self.matryoshka_results: list = []
        # Carrier images for nested encoding, outermost first; each entry
        # is an (Image, filename) tuple.
        self.matryoshka_carriers: list = []
        self.matryoshka_encode_result: Optional[Image.Image] = None


# Process-wide singleton used by the UI handlers.
state = AppState()
|
||
|
||
|
||
def generate_blank_image(width: int = 1024, height: int = 1024, color: str = 'noise') -> Image.Image:
    """Generate a synthetic RGBA carrier image for steganography.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        color: ``'noise'`` for uniformly random RGB values, ``'black'`` or
            ``'white'`` for a solid fill; any other value produces a
            red/green gradient with a constant blue of 128.

    Returns:
        A fully opaque RGBA ``PIL.Image.Image`` of size ``width`` x ``height``.
    """
    if color == 'noise':
        # Random noise - best for hiding data
        pixels = np.random.randint(0, 256, (height, width, 4), dtype=np.uint8)
        pixels[:, :, 3] = 255  # Full alpha
        return Image.fromarray(pixels, 'RGBA')

    if color == 'black':
        return Image.new('RGBA', (width, height), (0, 0, 0, 255))

    if color == 'white':
        return Image.new('RGBA', (width, height), (255, 255, 255, 255))

    # Gradient - looks nicer. Built from two 1-D ramps instead of a
    # per-pixel Python loop; integer floor division yields the same values
    # as int(255 * x / width) for non-negative x.
    red_ramp = (255 * np.arange(width)) // max(width, 1)
    green_ramp = (255 * np.arange(height)) // max(height, 1)

    pixels = np.empty((height, width, 4), dtype=np.uint8)
    pixels[:, :, 0] = red_ramp[np.newaxis, :]    # red grows left -> right
    pixels[:, :, 1] = green_ramp[:, np.newaxis]  # green grows top -> bottom
    pixels[:, :, 2] = 128
    pixels[:, :, 3] = 255
    return Image.fromarray(pixels, 'RGBA')
|
||
|
||
|
||
# ============== HELPER FUNCTIONS ==============
|
||
|
||
def image_to_base64(img: Image.Image, format: str = "PNG") -> str:
    """Serialise *img* in the given image *format* and return it base64-encoded.

    The result is the bare base64 text (no ``data:`` URL prefix).
    """
    with io.BytesIO() as sink:
        img.save(sink, format=format)
        encoded_bytes = base64.b64encode(sink.getvalue())
    return encoded_bytes.decode()
|
||
|
||
|
||
def base64_to_image(b64: str) -> Image.Image:
    """Open a PIL image from base64 text, with or without a data-URL prefix.

    When the text contains a comma (``data:image/png;base64,<payload>``),
    the segment following the first comma is used as the payload.
    """
    payload = b64.split(',')[1] if ',' in b64 else b64
    raw_bytes = base64.b64decode(payload)
    return Image.open(io.BytesIO(raw_bytes))
|
||
|
||
|
||
def format_size(size_bytes: int) -> str:
    """Render a byte count with one decimal and a B/KB/MB/GB/TB suffix.

    The value is divided by 1024 per step; anything of 1024 GB or more is
    reported in TB.
    """
    value = size_bytes
    for suffix in ('B', 'KB', 'MB', 'GB'):
        if value < 1024:
            return f"{value:.1f} {suffix}"
        value = value / 1024
    return f"{value:.1f} TB"
|
||
|
||
|
||
# ============== SMART DECODE / APERI'SOLVE STYLE ==============
|
||
|
||
# Substrings whose presence raises the "this is readable text" score:
# frequent English words plus a few CTF-flavoured keywords.
_COMMON_WORDS = (
    'the', 'and', 'is', 'in', 'to', 'of', 'a', 'for', 'on', 'that',
    'this', 'with', 'are', 'be', 'as', 'at', 'by', 'or', 'an', 'it',
    'from', 'was', 'have', 'has', 'not', 'but', 'what', 'all', 'were',
    'flag', 'ctf', 'secret', 'hidden', 'password', 'key', 'message',
)


def _is_printable_char(ch: str) -> bool:
    """Return False for C0/C1 control characters and DEL, True otherwise.

    Tab, LF, CR and space count as printable; so does every code point
    above 0x9F (Latin-1 supplement and the rest of Unicode).
    """
    if ch in '\t\n\r ':
        return True
    code = ord(ch)
    return not (code <= 0x1F or code == 0x7F or 0x80 <= code <= 0x9F)


def detect_coherent_text(data: bytes, min_printable_ratio: float = 0.85) -> dict:
    """Score how much *data* looks like coherent, human-readable text.

    The bytes are decoded as strict UTF-8, falling back to Latin-1, and a
    heuristic confidence is built from the printable-character ratio and
    character entropy of the first 500 characters, the number of common
    words found anywhere in the text, a NUL-character penalty (first 100
    characters) and a repetition penalty.

    Args:
        data: Raw bytes to analyse.
        min_printable_ratio: Minimum printable ratio required, in addition
            to a confidence of at least 35, for ``is_text`` to be True.

    Returns:
        A dict that always has the keys ``is_text``, ``confidence``
        (clamped to 0-100), ``preview`` (first 200 characters with control
        characters shown as '·'), ``printable_ratio``, ``entropy``,
        ``word_matches`` and ``reason``. ``reason`` is one of ``"empty"``,
        ``"decode_failed"``, ``"empty_text"``, ``"looks_like_text"`` or
        ``"binary_data"``.
    """
    def rejected(reason: str) -> dict:
        # Same schema as the success path so callers can index every key.
        return {
            "is_text": False,
            "confidence": 0,
            "preview": "",
            "printable_ratio": 0.0,
            "entropy": 0.0,
            "word_matches": 0,
            "reason": reason,
        }

    if not data:
        return rejected("empty")

    try:
        text = data.decode('utf-8', errors='strict')
    except UnicodeDecodeError:
        # Latin-1 maps every byte value, so this fallback cannot fail.
        text = data.decode('latin-1', errors='replace')
    except (AttributeError, TypeError):
        # Input that is not bytes-like (no usable .decode()).
        return rejected("decode_failed")

    if not text:
        return rejected("empty_text")

    # Printable ratio and entropy are measured on the first 500 characters.
    sample = text[:500]
    printable_ratio = sum(1 for ch in sample if _is_printable_char(ch)) / len(sample)

    # More than two NULs near the start is treated as a binary indicator.
    has_nulls = text[:100].count('\x00') > 2

    # NOTE: substring matching, so short entries such as 'a' or 'in' also
    # match inside longer words.
    text_lower = text.lower()
    word_matches = sum(1 for word in _COMMON_WORDS if word in text_lower)

    # The opening 20 characters recurring more than twice suggests noise.
    is_repetitive = len(text) > 20 and text.count(text[:20]) > 2

    # Shannon entropy of the sample (lower = more structured/text-like).
    char_freq = {}
    for ch in sample:
        char_freq[ch] = char_freq.get(ch, 0) + 1
    total = len(sample)
    entropy = -sum((f / total) * np.log2(f / total) for f in char_freq.values())

    confidence = 0

    # Printable ratio is the dominant signal.
    if printable_ratio >= 0.95:
        confidence += 40
    elif printable_ratio >= 0.85:
        confidence += 25
    elif printable_ratio >= 0.70:
        confidence += 10

    # Word matches add 5 points each, capped at 30.
    confidence += min(30, word_matches * 5)

    if entropy < 4.5:
        confidence += 15
    elif entropy < 5.5:
        confidence += 5

    # Penalties
    if has_nulls:
        confidence -= 30
    if is_repetitive:
        confidence -= 20

    # Keep Unicode in the preview; only control characters are replaced.
    preview_clean = ''.join(
        ch if _is_printable_char(ch) else '·' for ch in text[:200]
    )

    # Decided on the unclamped confidence, as before.
    is_text = confidence >= 35 and printable_ratio >= min_printable_ratio

    return {
        "is_text": is_text,
        "confidence": max(0, min(100, confidence)),
        "preview": preview_clean,
        "printable_ratio": printable_ratio,
        "entropy": entropy,
        "word_matches": word_matches,
        "reason": "looks_like_text" if is_text else "binary_data",
    }
|
||
|
||
|
||
def extract_raw_lsb(image: Image.Image, channels: list, bits: int = 1,
                    bit_offset: int = 0, max_bytes: int = 4096,
                    strategy: EncodingStrategy = EncodingStrategy.INTERLEAVED,
                    seed: int = None) -> bytes:
    """
    Extract raw LSB data without requiring STEG header.
    For blind extraction like Aperi'Solve does.

    The image is converted to RGBA, and ``bits`` bits (shifted right by
    ``bit_offset``) are read from each selected channel sample. The
    resulting bit units are packed by ``_bits_array_to_bytes``. If the
    image holds fewer samples than requested, the remaining units are zero.

    Args:
        image: PIL Image to extract from
        channels: List of Channel enums to extract from (their ``.value``
            is used as the RGBA channel index)
        bits: Bits per channel (1-8)
        bit_offset: Bit offset within each channel
        max_bytes: Maximum bytes to extract
        strategy: Encoding strategy used (SEQUENTIAL, INTERLEAVED, SPREAD,
            RANDOMIZED); any other value yields all-zero units
        seed: Random seed for RANDOMIZED strategy; a falsy seed (None or 0)
            falls back to 42

    Returns:
        At most ``max_bytes`` bytes of extracted data.
    """
    rgba = np.array(image.convert("RGBA"), dtype=np.uint8)
    height, width = rgba.shape[:2]
    total_pixels = height * width
    flat_pixels = rgba.reshape(-1, 4)

    channel_indices = np.array([c.value for c in channels], dtype=np.uint8)
    num_channels = len(channels)

    # Number of `bits`-wide units needed to cover max_bytes (rounded up).
    bits_needed = max_bytes * 8
    units_needed, leftover_bits = divmod(bits_needed, bits)
    if leftover_bits:
        units_needed += 1

    # Samples are 8-bit, so mask bits above bit 7 can never match; dropping
    # them keeps the mask representable as uint8.
    bit_mask = (((1 << bits) - 1) << bit_offset) & 0xFF
    result = np.zeros(units_needed, dtype=np.uint8)

    per_pixel_strategies = (
        EncodingStrategy.INTERLEAVED,
        EncodingStrategy.SPREAD,
        EncodingStrategy.RANDOMIZED,
    )

    if strategy in per_pixel_strategies:
        # Visit pixels in the order chosen by the strategy and, at each
        # pixel, cycle through the selected channels.
        pixels_needed = (units_needed + num_channels - 1) // num_channels
        effective_seed = (seed or 42) if strategy == EncodingStrategy.RANDOMIZED else seed
        order = np.fromiter(
            _generate_pixel_indices(total_pixels, min(pixels_needed, total_pixels),
                                    strategy, effective_seed),
            dtype=np.intp,
        )
        # Shape (pixels, channels) flattened row-major gives
        # pixel0/ch0, pixel0/ch1, ..., pixel1/ch0, ...
        samples = flat_pixels[order][:, channel_indices].reshape(-1)[:units_needed]
        result[:samples.size] = (samples & bit_mask) >> bit_offset

    elif strategy == EncodingStrategy.SEQUENTIAL:
        # Fill each channel completely before moving to the next.
        filled = 0
        for ch in channel_indices:
            remaining = units_needed - filled
            if remaining <= 0:
                break
            order = np.fromiter(
                _generate_pixel_indices(total_pixels, min(total_pixels, remaining),
                                        strategy, seed),
                dtype=np.intp,
            )
            samples = flat_pixels[order, ch][:remaining]
            result[filled:filled + samples.size] = (samples & bit_mask) >> bit_offset
            filled += samples.size

    # Convert to bytes
    return _bits_array_to_bytes(result, bits, bits_needed)[:max_bytes]
|
||
|
||
|
||
def smart_scan_image(image: Image.Image, password: str = None) -> list:
    """
    Aperi'Solve-style multi-config scan.
    Tries multiple channel/bit/strategy combinations and reports findings.

    A STEG header found by ``detect_encoding`` is reported first. Then, for
    every channel/bit layout combined with every encoding strategy, the
    first 2 KB of raw LSB data is extracted and scored with
    ``detect_coherent_text``. No seed is passed to ``extract_raw_lsb``, so
    its default seed handling applies to the randomized strategy.

    Args:
        image: Image to scan.
        password: Optional password; when given, each headerless sample is
            passed through ``crypto.decrypt`` before scoring (best effort -
            the raw sample is scored if decryption fails).

    Returns:
        List of result dicts sorted with header hits first, then
        ``TEXT_FOUND`` entries, then by descending confidence. Entries that
        raised during extraction or analysis have status ``"ERROR"``.
    """
    results = []

    # (channel letters, bits per channel), ordered by likelihood.
    layouts = [
        # Common single channels - 1 bit
        ("R", 1), ("G", 1), ("B", 1), ("A", 1),
        # Common single channels - 2 bit
        ("R", 2), ("G", 2), ("B", 2), ("A", 2),
        # RGB combos - 1 and 2 bit
        ("RGB", 1), ("RGB", 2),
        # RGBA - 1 and 2 bit
        ("RGBA", 1), ("RGBA", 2),
        # Two channel combos - 1 bit
        ("RG", 1), ("RB", 1), ("GB", 1), ("RA", 1), ("GA", 1), ("BA", 1),
        # Two channel combos - 2 bit
        ("RG", 2), ("RB", 2), ("GB", 2), ("RA", 2), ("GA", 2), ("BA", 2),
        # Higher bit depths (4-bit)
        ("R", 4), ("RGB", 4),
    ]

    # All encoding strategies to try
    strategies_to_try = [
        ("interleaved", EncodingStrategy.INTERLEAVED),
        ("sequential", EncodingStrategy.SEQUENTIAL),
        ("spread", EncodingStrategy.SPREAD),
        ("randomized", EncodingStrategy.RANDOMIZED),
    ]

    # Cross product of layouts and strategies.
    configs_to_try = []
    for letters, bits in layouts:
        base_name = letters if bits == 1 else f"{letters}-{bits}bit"
        layout_channels = [getattr(Channel, letter) for letter in letters]
        for strategy_name, strategy in strategies_to_try:
            # Interleaved is default, so omit suffix for cleaner names
            if strategy == EncodingStrategy.INTERLEAVED:
                name = base_name
            else:
                name = f"{base_name}-{strategy_name}"
            configs_to_try.append({
                "name": name,
                "channels": layout_channels,
                "bits": bits,
                "strategy": strategy,
            })

    # First check for STEG header (our own format)
    detection = detect_encoding(image)
    if detection:
        header_config = detection['config']
        results.append({
            "name": f"STEG-HEADER ({header_config['channels']})",
            "channels": header_config['channels'],
            "bits": header_config['bits_per_channel'],
            "has_header": True,
            "is_text": True,
            "confidence": 95,
            "preview": f"[STEG v3 detected - {format_size(detection['original_length'])}]",
            "payload_size": detection['original_length'],
            "status": "STEG_DETECTED",
            "can_decode": True,
        })

    # Scan each config
    for cfg in configs_to_try:
        try:
            raw_data = extract_raw_lsb(
                image,
                channels=cfg["channels"],
                bits=cfg["bits"],
                max_bytes=2048,  # Sample first 2KB
                strategy=cfg["strategy"],
            )

            # Check for STEG magic at start
            has_magic = raw_data[:4] == b'STEG'

            # Decrypt if password provided
            data_to_check = raw_data
            if password and not has_magic:
                try:
                    data_to_check = crypto.decrypt(raw_data, password)
                except Exception:
                    # Deliberate best effort: most configs will not hold
                    # valid ciphertext, so fall back to the raw sample.
                    pass

            # Analyze for coherent text
            text_analysis = detect_coherent_text(data_to_check)

            # Determine status
            if has_magic:
                status = "STEG_HEADER"
            elif text_analysis["is_text"] and text_analysis["confidence"] >= 50:
                status = "TEXT_FOUND"
            elif text_analysis["is_text"]:
                status = "POSSIBLE_TEXT"
            elif text_analysis.get("printable_ratio", 0) > 0.5:
                # .get(): the analyser's early-return dicts may lack this key.
                status = "MIXED_DATA"
            else:
                status = "BINARY/NOISE"

            results.append({
                "name": cfg["name"],
                "channels": [c.name for c in cfg["channels"]],
                "bits": cfg["bits"],
                "strategy": cfg["strategy"].value,
                "has_header": has_magic,
                "is_text": text_analysis["is_text"],
                "confidence": text_analysis["confidence"],
                "preview": text_analysis["preview"][:100] if text_analysis["preview"] else raw_data[:50].hex(),
                "printable_ratio": text_analysis.get("printable_ratio", 0),
                "entropy": text_analysis.get("entropy", 8),
                "status": status,
                "can_decode": has_magic or text_analysis["is_text"],
                # Keep the sample only for promising candidates.
                "raw_data": raw_data if (has_magic or text_analysis["confidence"] >= 30) else None,
            })

        except Exception as e:
            results.append({
                "name": cfg["name"],
                "channels": [c.name for c in cfg["channels"]],
                "bits": cfg["bits"],
                "strategy": cfg["strategy"].value,
                "has_header": False,
                "is_text": False,
                "confidence": 0,
                "preview": f"Error: {str(e)[:50]}",
                "status": "ERROR",
                "can_decode": False,
            })

    # Sort: STEG headers first, then TEXT_FOUND, then highest confidence.
    results.sort(key=lambda x: (
        x.get("has_header", False),
        x.get("status") == "TEXT_FOUND",
        x.get("confidence", 0)
    ), reverse=True)

    return results
|
||
|
||
|
||
# ============== 🪆 MATRYOSHKA MODE - RECURSIVE STEG ==============
|
||
|
||
def is_image_data(data: bytes) -> bool:
    """Return True when *data* starts with a PNG, JPEG, GIF or BMP signature.

    Anything shorter than 8 bytes is rejected outright.
    """
    if len(data) < 8:
        return False

    magic_prefixes = (
        b'\x89PNG\r\n\x1a\n',  # PNG
        b'\xff\xd8',           # JPEG
        b'GIF87a',             # GIF
        b'GIF89a',
        b'BM',                 # BMP
    )
    return data.startswith(magic_prefixes)
|
||
|
||
|
||
# Valid file extensions for file format detection
|
||
VALID_FILE_EXTENSIONS = {
    # Images
    'png', 'jpg', 'jpeg', 'gif', 'bmp',
    'webp', 'ico', 'svg', 'tiff', 'tif',
    # Documents
    'txt', 'pdf', 'doc', 'docx', 'xls',
    'xlsx', 'ppt', 'pptx', 'odt', 'rtf',
    # Code
    'py', 'js', 'ts', 'html', 'css', 'json',
    'xml', 'yaml', 'yml', 'md', 'csv',
    'java', 'c', 'cpp', 'h', 'hpp', 'rs',
    'go', 'rb', 'php', 'sh', 'bash',
    # Archives
    'zip', 'tar', 'gz', 'bz2', '7z', 'rar',
    # Media
    'mp3', 'mp4', 'wav', 'avi',
    'mkv', 'mov', 'flac', 'ogg',
    # Other
    'bin', 'dat', 'exe', 'dll',
    'so', 'key', 'pem', 'crt',
}


def extract_file_from_data(data: bytes) -> tuple:
    """
    Split an embedded-file payload into its filename and content.

    Expected layout: <length_byte><filename><file_data>, where
    length_byte is the byte length of the UTF-8 filename. The payload is
    accepted only if the length is 3-100, at least one content byte follows
    the name, and the name decodes as UTF-8, contains a dot, consists of
    word characters, '-', '.' and spaces, does not start with '.' or a
    space, and ends in an extension listed in VALID_FILE_EXTENSIONS.

    Returns (filename, file_data) on success, otherwise (None, data).
    """
    not_a_file = (None, data)

    if len(data) < 3:
        return not_a_file

    # "a.b" is the shortest plausible name; 100 is the accepted maximum.
    name_len = data[0]
    if not 3 <= name_len <= 100:
        return not_a_file

    # Need the whole filename plus at least one byte of content.
    if len(data) < name_len + 2:
        return not_a_file

    name_end = 1 + name_len
    try:
        filename = data[1:name_end].decode('utf-8')
    except UnicodeDecodeError:
        return not_a_file

    plausible_name = (
        '.' in filename
        and re.match(r'^[\w\-. ]+$', filename) is not None
        and filename[0] not in '. '
        and filename.rsplit('.', 1)[-1].lower() in VALID_FILE_EXTENSIONS
    )
    if not plausible_name:
        return not_a_file

    # Every check passed - treat the remainder as the file content.
    return (filename, data[name_end:])
|
||
|
||
|
||
def _utf8_preview(blob: bytes, limit: int) -> str:
    """Decode the first *limit* bytes of *blob* as UTF-8 for a preview.

    When *blob* is longer than *limit*, a multi-byte character cut by the
    slice is dropped instead of being reported as a decode error. Invalid
    UTF-8 still raises UnicodeDecodeError.
    """
    import codecs

    chunk = blob[:limit]
    was_truncated = len(blob) > limit
    # final=False lets the decoder hold back an incomplete trailing
    # sequence rather than raising on it.
    return codecs.getincrementaldecoder('utf-8')().decode(chunk, final=not was_truncated)


def _decode_nested_image(image_bytes: bytes, max_depth: int, password, depth: int) -> list:
    """Open *image_bytes* as an image and run matryoshka_decode on it at *depth*."""
    nested_img = Image.open(io.BytesIO(image_bytes))
    return matryoshka_decode(
        nested_img,
        max_depth=max_depth,
        password=password,
        current_depth=depth,
    )


def matryoshka_decode(image: Image.Image, max_depth: int = 3, password: str = None,
                      current_depth: int = 0) -> list:
    """
    🪆 Recursively decode nested steganographic images.

    Each layer is decoded with ``decode`` first (STEG header); if that
    fails, the best header/text hit from ``smart_scan_image`` is used - in
    that case only the raw sample kept by the scan is available. When the
    extracted payload is itself an image (wrapped as a file or as raw
    bytes), decoding recurses into it.

    Args:
        image: The image to decode
        max_depth: Maximum recursion depth (1-11)
        password: Optional decryption password
        current_depth: Current recursion level (internal)

    Returns:
        A one-element list holding this layer's info dict (``depth``,
        ``type``, ``filename``, ``data_size``, ``preview``, ``has_nested``,
        ``nested_results``, plus ``raw_data`` when extraction succeeded).
        Deeper layers are nested under ``nested_results``. Failures are
        reported as a layer of type ``"error"`` instead of being raised.
    """
    layer_info = {
        "depth": current_depth,
        "type": "unknown",
        "filename": None,
        "data_size": 0,
        "preview": "",
        "has_nested": False,
        "nested_results": [],
    }

    if current_depth >= max_depth:
        layer_info["type"] = "max_depth_reached"
        layer_info["preview"] = f"⚠️ Max depth ({max_depth}) reached"
        return [layer_info]

    try:
        # Try auto-decode first (looks for STEG header)
        try:
            data = decode(image, None)
            layer_info["type"] = "steg_header"
        except Exception:
            # If no STEG header, try smart scan
            scan_results = smart_scan_image(image, password)
            wanted_statuses = ("STEG_DETECTED", "STEG_HEADER", "TEXT_FOUND")
            best_result = next(
                (r for r in scan_results if r.get("status") in wanted_statuses),
                None,
            )

            if best_result and best_result.get("raw_data"):
                data = best_result["raw_data"]
                layer_info["type"] = f"smart_scan_{best_result['name']}"
            else:
                layer_info["type"] = "no_data_found"
                layer_info["preview"] = "No hidden data detected"
                return [layer_info]

        # Decrypt if password provided
        if password:
            try:
                data = crypto.decrypt(data, password)
            except Exception:
                # Deliberate best effort: the layer may not be encrypted,
                # so keep working with the raw data.
                pass

        layer_info["data_size"] = len(data)

        # Check if it's a file
        filename, file_data = extract_file_from_data(data)

        if filename:
            layer_info["filename"] = filename
            layer_info["data_size"] = len(file_data)

            if is_image_data(file_data):
                layer_info["type"] = "nested_image"
                layer_info["has_nested"] = True

                # Recursively decode the nested image
                try:
                    layer_info["nested_results"] = _decode_nested_image(
                        file_data, max_depth, password, current_depth + 1
                    )
                    layer_info["preview"] = f"🪆 Found nested image: {filename}"
                except Exception as e:
                    layer_info["preview"] = f"📁 Image file: {filename} (failed to recurse: {e})"
            else:
                layer_info["type"] = "file"
                # Try to show preview of text files
                ext = filename.split('.')[-1].lower() if '.' in filename else ''
                if ext in ['txt', 'md', 'json', 'xml', 'html', 'css', 'js', 'py', 'csv']:
                    try:
                        layer_info["preview"] = _utf8_preview(file_data, 200)
                    except UnicodeDecodeError:
                        layer_info["preview"] = f"📁 Binary file: {filename}"
                else:
                    layer_info["preview"] = f"📁 File: {filename} ({format_size(len(file_data))})"
        else:
            # Raw data - check if it's an image
            if is_image_data(data):
                layer_info["type"] = "nested_image_raw"
                layer_info["has_nested"] = True

                try:
                    layer_info["nested_results"] = _decode_nested_image(
                        data, max_depth, password, current_depth + 1
                    )
                    layer_info["preview"] = "🪆 Found raw nested image data"
                except Exception as e:
                    layer_info["preview"] = f"Image data (failed to recurse: {e})"
            else:
                # Try as text
                try:
                    text = data.decode('utf-8')
                    layer_info["type"] = "text"
                    layer_info["preview"] = text[:300]
                except UnicodeDecodeError:
                    layer_info["type"] = "binary"
                    layer_info["preview"] = f"Binary data: {data[:50].hex()}..."

        # Store raw data for download
        layer_info["raw_data"] = file_data if filename else data

    except Exception as e:
        layer_info["type"] = "error"
        layer_info["preview"] = f"Error: {str(e)}"

    return [layer_info]
|
||
|
||
|
||
def matryoshka_encode(payload: bytes, carriers: list, config = None,
                      password: str = None) -> tuple:
    """
    🪆 Recursively encode nested steganographic images.

    Creates a "Russian nesting doll" of hidden data: the payload is encoded
    into the first carrier, that result (saved as PNG bytes) is encoded into
    the next carrier, and so on until the last carrier has been used.

    Args:
        payload: The data to hide (innermost secret)
        carriers: List of (Image, filename) tuples - innermost carrier FIRST
        config: StegConfig to use for all layers (or None for the default
            RGBA / 2-bits-per-channel config)
        password: Optional encryption password; only the innermost payload
            is encrypted

    Returns:
        Tuple of (final_image, layer_info_list). Each layer_info dict holds
        'layer', 'carrier', 'capacity', 'payload_size', 'fits' and
        'output_size' (PNG byte count, or the string 'final' for the last
        layer).

    Raises:
        ValueError: If no carriers are given, or if the data for a layer
            exceeds the estimated capacity of its carrier.
    """
    if not carriers:
        raise ValueError("At least one carrier image is required")

    if config is None:
        config = create_config(channels='RGBA', bits=2)  # Default: good capacity

    layer_info = []
    current_data = payload
    last_index = len(carriers) - 1

    # Encode from innermost to outermost
    for i, (carrier_img, carrier_name) in enumerate(carriers):
        layer_num = i + 1

        # Encrypt BEFORE measuring (innermost layer only) so the capacity
        # check covers the bytes that are actually embedded, including any
        # overhead the encryption adds.
        data_to_encode = current_data
        if password and i == 0:
            data_to_encode = crypto.encrypt(current_data, password)

        capacity = _calculate_capacity_bytes(carrier_img, config)
        data_size = len(data_to_encode)

        layer_info.append({
            'layer': layer_num,
            'carrier': carrier_name,
            'capacity': capacity,
            'payload_size': data_size,
            'fits': data_size <= capacity
        })

        if data_size > capacity:
            raise ValueError(f"Layer {layer_num} ({carrier_name}): payload {data_size} bytes exceeds capacity {capacity} bytes")

        # Encode current data into this carrier
        encoded_img = encode(carrier_img, data_to_encode, config)

        if i < last_index:
            # More carriers follow: this layer's PNG bytes become the next payload
            buffer = io.BytesIO()
            encoded_img.save(buffer, format='PNG')
            current_data = buffer.getvalue()
            layer_info[-1]['output_size'] = len(current_data)
        else:
            # Final layer - the image itself is returned
            layer_info[-1]['output_size'] = 'final'

    return encoded_img, layer_info
|
||
|
||
|
||
def _calculate_capacity_bytes(image: Image.Image, config) -> int:
    """Estimate how many payload bytes fit in `image` under `config` (plain int).

    The raw figure is width * height * channel count * bits per channel,
    converted to whole bytes; 64 bytes are then held back for the STEG
    header. The result never goes below zero.
    """
    width, height = image.size
    embeddable_bits = width * height * len(config.channels) * config.bits_per_channel
    usable = embeddable_bits // 8 - 64  # keep 64 bytes in reserve for the header
    return usable if usable > 0 else 0
|
||
|
||
|
||
# ============== UI COMPONENTS ==============
|
||
|
||
def create_header():
    """Render the page header: the ASCII-art banner plus the suite tagline."""
    banner_markup = (
        f'<pre style="color: #00ff00; font-size: 8px; line-height: 1.1; text-align: center;">{BANNER_ASCII}</pre>'
    )
    with ui.column().classes('w-full items-center'):
        # sanitize=False so the <pre> styling is passed through untouched
        ui.html(banner_markup, sanitize=False)
        tagline = ui.label('🦕 Ultimate LSB Steganography Suite 🦕')
        tagline.classes('text-cyan-400 text-lg')
|
||
|
||
|
||
def create_channel_selector(advanced: bool = False):
    """Return the channel option names to offer in a channel selector.

    Despite the name, no UI element is built here; the caller receives a
    plain list of option strings.

    Args:
        advanced: When true, every key of CHANNEL_PRESETS is offered;
            otherwise only a short list of common choices.
    """
    if not advanced:
        # Simple mode: the handful of common choices
        return ['RGB', 'RGBA', 'R', 'G', 'B']
    # Advanced mode: every preset known to steg_core
    return list(CHANNEL_PRESETS.keys())
|
||
|
||
|
||
def create_capacity_display(capacity_label: ui.label, image: Image.Image, config_dict: dict):
    """Refresh `capacity_label` with the capacity of `image`.

    Args:
        capacity_label: Label whose text is replaced.
        image: Carrier image, or None when nothing is loaded.
        config_dict: Keyword arguments forwarded to create_config().

    Side effect: the computed capacity dict is stored on state.capacity_info.
    """
    if image is None:
        capacity_label.set_text('No image loaded')
        return

    cap = calculate_capacity(image, create_config(**config_dict))
    capacity_label.set_text(f"📊 Capacity: {cap['human']} ({cap['usable_bytes']:,} bytes)")
    state.capacity_info = cap
|
||
|
||
|
||
def create_file_picker(element_id: str, accept: str, endpoint: str, tab: str = 'encode'):
    """Build the markup and script for a custom drop-zone file picker (works with Python 3.9).

    Nothing is rendered here. The caller passes the first element of the
    returned tuple to ui.html() and the second to ui.add_body_html().

    Args:
        element_id: Unique ID for the element (also used in the JS handler name)
        accept: File types to accept, or '*' for any file
        endpoint: API endpoint the selected file is POSTed to as JSON
        tab: Tab name to restore after reload ('encode', 'decode', 'analyze')

    Returns:
        Tuple of (html, javascript) strings.
    """
    # Human-friendly version of the accept list shown under the prompt
    if accept == '*':
        accept_label = 'ANY FILE'
    else:
        accept_label = accept.replace(',', ' · ').replace('.', '').upper()

    markup = f"""
    <div class="drop-zone" id="{element_id}_zone" onclick="document.getElementById('{element_id}').click()">
        <div style="color: #00ff41; font-size: 13px; font-weight: 500;">
            <span style="opacity: 0.7;">↑</span> DROP FILE OR CLICK TO BROWSE
        </div>
        <div style="color: #555; font-size: 11px; margin-top: 6px; letter-spacing: 1px;">{accept_label}</div>
        <input type="file" id="{element_id}" accept="{accept}" style="display: none;"
               onchange="handleFileSelect_{element_id}(this.files[0])">
    </div>
    """

    script = f"""
    <script>
    function handleFileSelect_{element_id}(file) {{
        if (!file) return;
        const reader = new FileReader();
        reader.onload = async function(e) {{
            const base64 = e.target.result;
            try {{
                const response = await fetch('{endpoint}', {{
                    method: 'POST',
                    headers: {{'Content-Type': 'application/json'}},
                    body: JSON.stringify({{data: base64, filename: file.name, tab: '{tab}'}})
                }});
                const result = await response.json();
                if (result.success) {{
                    // Reload with hash to preserve tab state
                    window.location.hash = '{tab}';
                    window.location.reload();
                }} else {{
                    alert('Upload failed: ' + result.error);
                }}
            }} catch (err) {{
                alert('Upload error: ' + err.message);
            }}
        }};
        reader.readAsDataURL(file);
    }}

    // Drag and drop support - wait for DOM
    setTimeout(function() {{
        const zone = document.getElementById('{element_id}_zone');
        if (!zone) return;
        zone.addEventListener('dragover', (e) => {{
            e.preventDefault();
            zone.style.borderColor = '#00ffff';
            zone.style.background = 'rgba(0, 255, 255, 0.1)';
        }});
        zone.addEventListener('dragleave', (e) => {{
            zone.style.borderColor = '#00ff00';
            zone.style.background = 'rgba(0, 255, 0, 0.05)';
        }});
        zone.addEventListener('drop', (e) => {{
            e.preventDefault();
            zone.style.borderColor = '#00ff00';
            zone.style.background = 'rgba(0, 255, 0, 0.05)';
            if (e.dataTransfer.files.length > 0) {{
                handleFileSelect_{element_id}(e.dataTransfer.files[0]);
            }}
        }});
    }}, 100);
    </script>
    """
    return markup, script
|
||
|
||
|
||
# ============== API ENDPOINTS FOR FILE UPLOADS ==============
|
||
|
||
@app.post('/api/upload/carrier')
async def upload_carrier(request: Request):
    """Accept a carrier image upload and store it in the shared state.

    Reads 'data', 'filename' and 'tab' from the JSON body, decodes the image
    with base64_to_image() and records image, name and active tab on `state`.
    Any failure is reported as {'success': False, 'error': ...}.
    """
    try:
        payload = await request.json()
        encoded = payload.get('data', '')
        name = payload.get('filename', 'image.png')
        target_tab = payload.get('tab', 'encode')

        state.carrier_image = base64_to_image(encoded)
        state.carrier_path = name
        state.active_tab = target_tab

        reply = {'success': True, 'filename': name}
        return JSONResponse(reply)
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/upload/payload')
async def upload_payload(request: Request):
    """Accept a payload file upload and keep its raw bytes in the shared state.

    Reads 'data', 'filename' and 'tab' from the JSON body. When 'data'
    contains a comma (as a data URL does), only the segment after the first
    comma is base64-decoded. Any failure is reported as
    {'success': False, 'error': ...}.
    """
    try:
        payload = await request.json()
        encoded = payload.get('data', '')
        name = payload.get('filename', 'file.bin')
        target_tab = payload.get('tab', 'encode')

        # Drop a "data:...;base64," style prefix if one is present
        raw_b64 = encoded.split(',')[1] if ',' in encoded else encoded

        state.encode_file_data = base64.b64decode(raw_b64)
        state.encode_file_name = name
        state.active_tab = target_tab

        reply = {'success': True, 'filename': name}
        return JSONResponse(reply)
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/upload/decode')
async def upload_decode(request: Request):
    """Accept an image for decoding and run encoding auto-detection on it.

    The image and active tab are stored on `state`, then detect_encoding()
    is run and its result stored as state.detected_config. The response
    reports whether a detection result came back, plus its 'config' and
    'original_length' entries when it is truthy. Any failure is reported as
    {'success': False, 'error': ...}.
    """
    try:
        payload = await request.json()
        encoded = payload.get('data', '')
        name = payload.get('filename', 'image.png')
        target_tab = payload.get('tab', 'decode')

        state.decode_image = base64_to_image(encoded)
        state.active_tab = target_tab

        # Try auto-detection
        state.detected_config = detection = detect_encoding(state.decode_image)

        if detection:
            found_config = detection['config']
            found_size = detection['original_length']
        else:
            found_config = None
            found_size = 0

        return JSONResponse({
            'success': True,
            'filename': name,
            'detected': detection is not None,
            'config': found_config,
            'payload_size': found_size,
        })
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/upload/analyze')
async def upload_analyze(request: Request):
    """Accept an image for the analyze tab and store it in the shared state.

    Reads 'data', 'filename' and 'tab' from the JSON body. Any failure is
    reported as {'success': False, 'error': ...}.
    """
    try:
        payload = await request.json()
        encoded = payload.get('data', '')
        name = payload.get('filename', 'image.png')
        target_tab = payload.get('tab', 'analyze')

        state.analyze_image = base64_to_image(encoded)
        state.active_tab = target_tab

        reply = {'success': True, 'filename': name}
        return JSONResponse(reply)
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/matryoshka/toggle')
async def toggle_matryoshka(request: Request):
    """🪆 Switch Matryoshka mode on or off.

    Stores the 'active' value from the JSON body (default False) on
    state.matryoshka_mode and echoes the mode plus the configured depth.
    Any failure is reported as {'success': False, 'error': ...}.
    """
    try:
        body = await request.json()
        state.matryoshka_mode = body.get('active', False)
        reply = {
            'success': True,
            'matryoshka_active': state.matryoshka_mode,
            'depth': state.matryoshka_depth,
        }
        return JSONResponse(reply)
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/matryoshka/decode')
async def matryoshka_decode_api(request: Request):
    """🪆 Run a recursive Matryoshka decode on the currently loaded decode image.

    Body keys: 'depth' (defaults to state.matryoshka_depth) and 'password'
    (defaults to None). The nested result tree is stored on
    state.matryoshka_results and returned flattened in pre-order, with each
    preview cut to 500 characters. Any failure is reported as
    {'success': False, 'error': ...}.
    """
    try:
        body = await request.json()
        depth = body.get('depth', state.matryoshka_depth)
        password = body.get('password', None)

        if state.decode_image is None:
            return JSONResponse({'success': False, 'error': 'No image loaded'})

        results = matryoshka_decode(state.decode_image, max_depth=depth, password=password)
        state.matryoshka_results = results

        def iter_layers(layers, level=0):
            """Yield a JSON-safe summary per layer, parents before children."""
            for layer in layers:
                yield {
                    'depth': layer.get('depth', level),
                    'type': layer.get('type', 'unknown'),
                    'filename': layer.get('filename'),
                    'data_size': layer.get('data_size', 0),
                    'preview': layer.get('preview', '')[:500],
                    'has_nested': layer.get('has_nested', False),
                }
                if layer.get('nested_results'):
                    yield from iter_layers(layer['nested_results'], level + 1)

        # Flatten results for JSON response
        flat_results = list(iter_layers(results))

        return JSONResponse({
            'success': True,
            'layers_found': len(flat_results),
            'results': flat_results,
        })
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/matryoshka/add_carrier')
async def add_matryoshka_carrier(request: Request):
    """🪆 Append an uploaded carrier image to the Matryoshka carrier stack.

    The image is appended to state.matryoshka_carriers as an
    (image, filename) tuple. The response reports the new stack size, the
    image dimensions and a capacity estimate for an RGBA / 2-bit config.
    Any failure is reported as {'success': False, 'error': ...}.
    """
    try:
        body = await request.json()
        encoded = body.get('data', '')
        name = body.get('filename', 'carrier.png')

        carrier = base64_to_image(encoded)
        state.matryoshka_carriers.append((carrier, name))

        # Capacity estimate for this carrier under the default Matryoshka config
        estimate = _calculate_capacity_bytes(carrier, create_config(channels='RGBA', bits=2))

        reply = {
            'success': True,
            'carrier_count': len(state.matryoshka_carriers),
            'filename': name,
            'dimensions': f"{carrier.size[0]}x{carrier.size[1]}",
            'capacity': estimate,
        }
        return JSONResponse(reply)
    except Exception as exc:
        return JSONResponse({'success': False, 'error': str(exc)})
|
||
|
||
|
||
@app.post('/api/matryoshka/clear_carriers')
async def clear_matryoshka_carriers():
    """🪆 Empty the Matryoshka carrier stack and drop any stored encode result."""
    state.matryoshka_encode_result = None
    state.matryoshka_carriers = []
    reply = {'success': True, 'carrier_count': 0}
    return JSONResponse(reply)
|
||
|
||
|
||
@app.post('/api/matryoshka/encode')
async def matryoshka_encode_api(request: Request):
    """🪆 Perform recursive Matryoshka encode.

    Body keys: 'password' (optional), 'channels' (default 'RGBA') and
    'bits' (default 2). The payload is taken from state.encode_file_data
    (prefixed with a one-byte length and the UTF-8 filename when a filename
    is set) or, failing that, from state.encode_text.

    On success the resulting image is stored on
    state.matryoshka_encode_result and returned as a PNG data URL together
    with the per-layer info. Any failure is reported as
    {'success': False, 'error': ...}.
    """
    try:
        body = await request.json()
        password = body.get('password')
        channels = body.get('channels', 'RGBA')
        bits = body.get('bits', 2)

        if not state.matryoshka_carriers:
            return JSONResponse({'success': False, 'error': 'No carrier images added'})

        # Get payload - either from encode_file_data or encode_text
        if state.encode_file_data:
            payload = state.encode_file_data
            # Prepend filename if available
            if state.encode_file_name:
                fn_bytes = state.encode_file_name.encode('utf-8')
                # The length prefix is a single byte, so longer names cannot be
                # represented; fail with a clear message instead of the opaque
                # "bytes must be in range(0, 256)" raised by bytes([...]).
                if len(fn_bytes) > 255:
                    return JSONResponse({
                        'success': False,
                        'error': 'Filename is too long to embed (maximum 255 bytes when UTF-8 encoded).'
                    })
                payload = bytes([len(fn_bytes)]) + fn_bytes + payload
        elif hasattr(state, 'encode_text') and state.encode_text:
            payload = state.encode_text.encode('utf-8')
        else:
            return JSONResponse({'success': False, 'error': 'No payload to encode. Upload a file or enter text first.'})

        config = create_config(channels=channels, bits=bits)

        # Carriers are stored outermost-first in UI, but we need innermost-first for encoding
        # So reverse the list
        carriers_reversed = list(reversed(state.matryoshka_carriers))

        result_img, layer_info = matryoshka_encode(
            payload=payload,
            carriers=carriers_reversed,
            config=config,
            password=password
        )

        state.matryoshka_encode_result = result_img

        # Convert result to base64 for preview
        buffer = io.BytesIO()
        result_img.save(buffer, format='PNG')
        result_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        return JSONResponse({
            'success': True,
            'layers': len(state.matryoshka_carriers),
            'layer_info': layer_info,
            'result_b64': f"data:image/png;base64,{result_b64}"
        })
    except Exception as e:
        return JSONResponse({'success': False, 'error': str(e)})
|
||
|
||
|
||
@app.get('/api/matryoshka/download')
async def download_matryoshka_result():
    """🪆 Download the Matryoshka encoded result.

    Returns the image stored on state.matryoshka_encode_result as a PNG
    attachment named "matryoshka_encoded.png", or a
    {'success': False, 'error': ...} JSON body when nothing has been
    encoded yet.
    """
    # Imported here because only JSONResponse is among the imports visible
    # at the top of the file; without this the attachment path risks a NameError.
    from fastapi.responses import Response

    if state.matryoshka_encode_result is None:
        return JSONResponse({'success': False, 'error': 'No result to download'})

    buffer = io.BytesIO()
    state.matryoshka_encode_result.save(buffer, format='PNG')

    return Response(
        content=buffer.getvalue(),  # getvalue() returns the whole buffer regardless of position
        media_type='image/png',
        headers={'Content-Disposition': 'attachment; filename="matryoshka_encoded.png"'}
    )
|
||
|
||
|
||
# ============== MAIN PAGE ==============
|
||
|
||
@ui.page('/')
async def main_page():
    """Build the single-page UI served at '/'.

    Adds the theme CSS and font, lays out the header and the ENCODE /
    DECODE / ANALYZE tabs plus a hidden MATRYOSHKA tab, fills each tab panel
    through the matching create_*_panel() coroutine, and injects the
    client-side scripts for URL-hash tab syncing and the Matryoshka easter egg.
    """
    # NOTE(review): the nesting of the statements after the tab panels was
    # reconstructed from a copy of this file whose indentation was lost -
    # confirm it against the upstream source.

    # Add custom CSS
    ui.add_head_html(f'<style>{DARK_CSS}</style>')
    ui.add_head_html('<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&display=swap" rel="stylesheet">')

    # Matrix scanline overlay
    ui.html('<div class="matrix-bg"></div>', sanitize=False)

    with ui.column().classes('w-full max-w-6xl mx-auto p-4 gap-4'):
        create_header()

        # Tab navigation
        with ui.tabs().classes('w-full') as tabs:
            encode_tab = ui.tab('ENCODE', icon='lock').classes('text-green-400')
            decode_tab = ui.tab('DECODE', icon='lock_open').classes('text-cyan-400')
            analyze_tab = ui.tab('ANALYZE', icon='search').classes('text-yellow-400')
            # 🪆 Hidden Matryoshka tab - revealed by easter egg
            matryoshka_tab = ui.tab('🪆 MATRYOSHKA', icon='auto_awesome').classes('text-pink-400')
            # The DOM id lets the injected JavaScript below find and toggle this tab
            matryoshka_tab.props('id="matryoshka-tab"')
            matryoshka_tab.style('display: none;')  # Hidden until activated

        # Determine initial tab from state (set by API) or URL hash
        tab_map = {
            'encode': encode_tab,
            'decode': decode_tab,
            'analyze': analyze_tab,
            'matryoshka': matryoshka_tab,
        }
        # Unknown tab names fall back to the encode tab
        initial_tab = tab_map.get(state.active_tab, encode_tab)

        with ui.tab_panels(tabs, value=initial_tab).classes('w-full') as panels:

            # ==================== ENCODE PANEL ====================
            with ui.tab_panel(encode_tab):
                await create_encode_panel()

            # ==================== DECODE PANEL ====================
            with ui.tab_panel(decode_tab):
                await create_decode_panel()

            # ==================== ANALYZE PANEL ====================
            with ui.tab_panel(analyze_tab):
                await create_analyze_panel()

            # ==================== MATRYOSHKA PANEL ====================
            with ui.tab_panel(matryoshka_tab):
                await create_matryoshka_panel()

        # JavaScript to read URL hash on load and sync tabs
        ui.add_body_html("""
        <script>
        // On page load, check URL hash and click the correct tab
        document.addEventListener('DOMContentLoaded', function() {
            const hash = window.location.hash.replace('#', '');
            if (hash) {
                // Find and click the tab button
                const tabButtons = document.querySelectorAll('.q-tab');
                tabButtons.forEach(btn => {
                    if (btn.textContent.trim().toLowerCase().includes(hash.toLowerCase())) {
                        btn.click();
                    }
                });
            }
        });

        // Update hash when tab changes
        document.addEventListener('click', function(e) {
            const tab = e.target.closest('.q-tab');
            if (tab) {
                const tabName = tab.textContent.trim().toLowerCase();
                if (['encode', 'decode', 'analyze'].includes(tabName)) {
                    window.location.hash = tabName;
                }
            }
        });
        </script>
        """)

        # Footer
        ui.html('<div style="text-align: center; color: #666; margin-top: 20px;">.-.-.-.-<={LOVE PLINY}=>-.-.-.-.</div>', sanitize=False)

        # 🪆 MATRYOSHKA MODE - Easter egg trigger zone (bottom-left pixel)
        ui.html('<div class="matryoshka-trigger" id="matryoshka-trigger" title="🪆"></div>', sanitize=False)

        # Simple mode indicator (shown when active)
        ui.html('''
        <div id="matryoshka-indicator" class="hidden" style="
            position: fixed; bottom: 20px; right: 20px; z-index: 9999;
            background: linear-gradient(135deg, #1a0020 0%, #0d0015 100%);
            border: 2px solid #ff00ff; border-radius: 8px; padding: 8px 16px;
            cursor: pointer;
        " onclick="deactivateMatryoshka()">
            <span style="color: #ff00ff; font-weight: bold;">🪆 MATRYOSHKA MODE</span>
            <span style="color: #ff99ff; font-size: 10px; margin-left: 8px;">click to deactivate</span>
        </div>
        ''', sanitize=False)

        # JavaScript to handle easter egg activation
        # (defines activateMatryoshka / deactivateMatryoshka, wires the trigger
        # element, and provides the addMatryoshkaCarrier upload helper)
        ui.add_body_html("""
        <script>
        // 🪆 Matryoshka Mode Easter Egg
        let matryoshkaActive = false;

        function activateMatryoshka() {
            matryoshkaActive = true;

            // Show the hidden tab
            const tab = document.getElementById('matryoshka-tab');
            if (tab) {
                tab.style.display = '';
                // Click the tab to switch to it
                setTimeout(() => tab.click(), 100);
            }

            // Show indicator
            const indicator = document.getElementById('matryoshka-indicator');
            if (indicator) indicator.classList.remove('hidden');

            document.body.classList.add('matryoshka-active');

            // Send activation to backend
            fetch('/api/matryoshka/toggle', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({active: true})
            });

            console.log('🪆 MATRYOSHKA MODE ACTIVATED 🪆');
        }

        function deactivateMatryoshka() {
            matryoshkaActive = false;

            // Hide the tab
            const tab = document.getElementById('matryoshka-tab');
            if (tab) tab.style.display = 'none';

            // Hide indicator
            const indicator = document.getElementById('matryoshka-indicator');
            if (indicator) indicator.classList.add('hidden');

            document.body.classList.remove('matryoshka-active');

            // Switch back to encode tab
            const encodeTab = document.querySelector('.q-tab');
            if (encodeTab) encodeTab.click();

            fetch('/api/matryoshka/toggle', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({active: false})
            });

            console.log('🪆 Matryoshka mode deactivated');
        }

        (function() {
            let attempts = 0;
            const maxAttempts = 50;

            function setupMatryoshka() {
                const trigger = document.getElementById('matryoshka-trigger');

                if (!trigger) {
                    attempts++;
                    if (attempts < maxAttempts) {
                        setTimeout(setupMatryoshka, 100);
                    }
                    return;
                }

                console.log('🪆 Matryoshka trigger initialized');

                trigger.addEventListener('click', function(e) {
                    e.preventDefault();
                    e.stopPropagation();

                    if (!matryoshkaActive) {
                        activateMatryoshka();
                    } else {
                        deactivateMatryoshka();
                    }
                });

                // Touch events for mobile
                trigger.addEventListener('touchend', function(e) {
                    e.preventDefault();
                    trigger.click();
                });
            }

            if (document.readyState === 'loading') {
                document.addEventListener('DOMContentLoaded', setupMatryoshka);
            } else {
                setTimeout(setupMatryoshka, 500);
            }
        })();

        // 🪆 Matryoshka carrier upload handler
        function addMatryoshkaCarrier(file) {
            if (!file) return;
            const reader = new FileReader();
            reader.onload = async function(e) {
                const base64 = e.target.result;
                try {
                    const response = await fetch('/api/matryoshka/add_carrier', {
                        method: 'POST',
                        headers: {'Content-Type': 'application/json'},
                        body: JSON.stringify({data: base64, filename: file.name})
                    });
                    const result = await response.json();
                    if (result.success) {
                        // Reload to refresh carrier list
                        window.location.reload();
                    } else {
                        alert('Failed to add carrier: ' + result.error);
                    }
                } catch (err) {
                    alert('Upload error: ' + err.message);
                }
            };
            reader.readAsDataURL(file);
        }
        </script>
        """)
|
||
|
||
|
||
async def create_encode_panel():
|
||
"""Create the encoding interface"""
|
||
|
||
# Use global state for persistence
|
||
global state
|
||
|
||
with ui.row().classes('w-full gap-4'):
|
||
# Left column - Image upload and preview
|
||
with ui.column().classes('w-1/2 gap-2'):
|
||
ui.label('📷 CARRIER IMAGE').classes('text-green-400 text-lg font-bold')
|
||
|
||
# Show current image if loaded
|
||
if state.carrier_image:
|
||
b64 = image_to_base64(state.carrier_image)
|
||
ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-64 object-contain')
|
||
w, h = state.carrier_image.size
|
||
ui.label(f'{state.carrier_path or "Generated"} | {w}x{h}').classes('text-green-400 text-sm')
|
||
|
||
# Show capacity
|
||
try:
|
||
config = create_config(channels='RGB', bits=1)
|
||
cap = calculate_capacity(state.carrier_image, config)
|
||
ui.label(f"📊 Capacity: {cap['human']} ({cap['usable_bytes']:,} bytes)").classes('text-cyan-400')
|
||
except:
|
||
pass
|
||
else:
|
||
ui.image().classes('w-full max-h-64 object-contain')
|
||
ui.label('No image loaded').classes('text-gray-500 text-sm')
|
||
ui.label('📊 Capacity: --').classes('text-cyan-400')
|
||
|
||
# Custom file picker - uses API endpoint
|
||
picker_html, picker_js = create_file_picker('encode_carrier', '.png,.PNG,image/png', '/api/upload/carrier', tab='encode')
|
||
ui.html(picker_html, sanitize=False)
|
||
ui.add_body_html(picker_js)
|
||
|
||
# Generate image buttons
|
||
ui.label('Or generate a carrier:').classes('text-gray-500 text-sm mt-2')
|
||
|
||
# Size selector for generated images
|
||
with ui.row().classes('gap-2 items-center'):
|
||
gen_size_select = ui.select(
|
||
options=['512x512', '1024x1024', '2048x2048', '4096x4096'],
|
||
value='1024x1024',
|
||
label='Size'
|
||
).props('dense').classes('w-24')
|
||
|
||
with ui.row().classes('gap-2'):
|
||
async def gen_image(color: str):
|
||
size_str = gen_size_select.value or '1024x1024'
|
||
w, h = map(int, size_str.split('x'))
|
||
state.carrier_image = generate_blank_image(w, h, color)
|
||
state.carrier_path = f'{color}_{size_str}.png'
|
||
state.active_tab = 'encode'
|
||
ui.notify(f'Generated {color} image ({size_str})', type='positive')
|
||
await asyncio.sleep(0.3)
|
||
await ui.run_javascript('window.location.hash = "encode"; window.location.reload();')
|
||
|
||
ui.button('🎲 Noise', on_click=lambda: gen_image('noise')).props('dense').classes('text-xs')
|
||
ui.button('⬛ Black', on_click=lambda: gen_image('black')).props('dense').classes('text-xs')
|
||
ui.button('🌈 Gradient', on_click=lambda: gen_image('gradient')).props('dense').classes('text-xs')
|
||
ui.button('⬜ White', on_click=lambda: gen_image('white')).props('dense').classes('text-xs')
|
||
|
||
# Right column - Data and settings
|
||
with ui.column().classes('w-1/2 gap-2'):
|
||
ui.label('📝 PAYLOAD DATA').classes('text-green-400 text-lg font-bold')
|
||
|
||
# Data source toggle
|
||
with ui.row().classes('gap-2'):
|
||
data_source = ui.toggle(
|
||
{
|
||
'text': '💬 Text',
|
||
'file': '📁 File',
|
||
},
|
||
value='text'
|
||
).classes('w-full')
|
||
|
||
# Text input
|
||
text_container = ui.column().classes('w-full')
|
||
with text_container:
|
||
text_input = ui.textarea(
|
||
label='Secret message',
|
||
placeholder='Enter your hidden message here...'
|
||
).classes('w-full').props('rows=6 dark')
|
||
|
||
# File upload for payload
|
||
file_container = ui.column().classes('w-full hidden')
|
||
with file_container:
|
||
if state.encode_file_data and state.encode_file_name:
|
||
ui.label(f'📄 {state.encode_file_name} ({format_size(len(state.encode_file_data))})').classes('text-green-400')
|
||
else:
|
||
ui.label('No file selected').classes('text-gray-500')
|
||
|
||
payload_html, payload_js = create_file_picker('encode_payload', '*', '/api/upload/payload', tab='encode')
|
||
ui.html(payload_html, sanitize=False)
|
||
ui.add_body_html(payload_js)
|
||
|
||
def toggle_data_source():
|
||
if data_source.value == 'text':
|
||
text_container.classes(remove='hidden')
|
||
file_container.classes(add='hidden')
|
||
else:
|
||
text_container.classes(add='hidden')
|
||
file_container.classes(remove='hidden')
|
||
|
||
data_source.on('update:model-value', toggle_data_source)
|
||
|
||
ui.separator().classes('my-2')
|
||
|
||
# ===== ENCODING SETTINGS =====
|
||
ui.label('⚙️ ENCODING SETTINGS').classes('text-green-400 text-lg font-bold')
|
||
|
||
# Advanced mode toggle
|
||
advanced_toggle = ui.switch('🔧 Advanced Mode').classes('text-cyan-400')
|
||
|
||
# Basic settings (always visible)
|
||
with ui.row().classes('w-full gap-4'):
|
||
channel_select = ui.select(
|
||
label='Channels',
|
||
options=['RGB', 'RGBA', 'R', 'G', 'B'],
|
||
value='RGB'
|
||
).classes('w-1/2')
|
||
|
||
bits_select = ui.select(
|
||
label='Bits/Channel',
|
||
options=[1, 2],
|
||
value=1
|
||
).classes('w-1/2')
|
||
|
||
# Live capacity display
|
||
capacity_display = ui.html('', sanitize=False).classes('w-full')
|
||
|
||
def update_capacity_display():
|
||
"""Update capacity display based on current settings"""
|
||
if state.carrier_image is None:
|
||
capacity_display.set_content(
|
||
'<div style="color: #666; padding: 8px; border: 1px solid #333; border-radius: 4px;">'
|
||
'📊 Capacity: Load an image to see capacity'
|
||
'</div>'
|
||
)
|
||
return
|
||
|
||
try:
|
||
if advanced_toggle.value:
|
||
ch = adv_channel_select.value or 'RGB'
|
||
b = adv_bits_select.value or 1
|
||
else:
|
||
ch = channel_select.value or 'RGB'
|
||
b = bits_select.value or 1
|
||
|
||
config = create_config(channels=ch, bits=b)
|
||
cap = calculate_capacity(state.carrier_image, config)
|
||
|
||
# Calculate fill percentage if we have data
|
||
data_size = 0
|
||
if data_source.value == 'text' and text_input.value:
|
||
data_size = len(text_input.value.encode('utf-8'))
|
||
elif data_source.value == 'file' and state.encode_file_data:
|
||
data_size = len(state.encode_file_data) + len(state.encode_file_name or '') + 1
|
||
|
||
# Don't cap at 100% - we need to detect overflow!
|
||
fill_pct = (data_size / max(1, cap['usable_bytes'])) * 100
|
||
bar_width = min(100, fill_pct)
|
||
|
||
if fill_pct > 100:
|
||
color = '#ff3333'
|
||
bar_color = '#ff3333'
|
||
status = 'OVER CAPACITY'
|
||
icon = '⛔'
|
||
elif fill_pct > 80:
|
||
color = '#ffd000'
|
||
bar_color = '#ffd000'
|
||
status = 'NEARLY FULL'
|
||
icon = '⚠️'
|
||
elif fill_pct > 0:
|
||
color = '#00ff41'
|
||
bar_color = '#00ff41'
|
||
status = 'READY'
|
||
icon = '✓'
|
||
else:
|
||
color = '#555'
|
||
bar_color = '#333'
|
||
status = 'NO DATA'
|
||
icon = '○'
|
||
|
||
capacity_display.set_content(
|
||
f'<div class="capacity-meter">'
|
||
f'<div style="display: flex; justify-content: space-between; align-items: center;">'
|
||
f'<span style="color: #00ff41; font-size: 12px; font-weight: 600;">CAPACITY</span>'
|
||
f'<span style="color: #888; font-size: 11px;">{cap["usable_bytes"]:,} bytes</span>'
|
||
f'</div>'
|
||
f'<div class="capacity-bar-outer">'
|
||
f'<div class="capacity-bar-inner" style="width: {bar_width:.1f}%; background: {bar_color};"></div>'
|
||
f'</div>'
|
||
f'<div style="display: flex; justify-content: space-between; align-items: center; font-size: 11px;">'
|
||
f'<span style="color: {color};">{icon} {status}</span>'
|
||
f'<span style="color: #888;">{format_size(data_size)} / {cap["human"]} ({fill_pct:.1f}%)</span>'
|
||
f'</div>'
|
||
f'</div>'
|
||
)
|
||
|
||
# Store capacity for encode check
|
||
state.capacity_info = cap
|
||
|
||
except Exception as e:
|
||
capacity_display.set_content(f'<div style="color: #ff3333;">Error: {e}</div>')
|
||
|
||
# Update capacity on various changes
|
||
if state.carrier_image:
|
||
update_capacity_display()
|
||
|
||
# Advanced settings (hidden by default)
|
||
advanced_container = ui.column().classes('w-full hidden gap-2')
|
||
with advanced_container:
|
||
ui.label('🎯 Channel & Bit Configuration').classes('text-yellow-400 text-sm')
|
||
|
||
with ui.row().classes('w-full gap-4'):
|
||
# All channel presets
|
||
adv_channel_select = ui.select(
|
||
label='All Channel Presets',
|
||
options=list(CHANNEL_PRESETS.keys()),
|
||
value='RGB'
|
||
).classes('w-1/2')
|
||
|
||
# Full bits range
|
||
adv_bits_select = ui.select(
|
||
label='Bits (1-8)',
|
||
options=list(range(1, 9)),
|
||
value=1
|
||
).classes('w-1/2')
|
||
|
||
with ui.row().classes('w-full gap-4'):
|
||
strategy_select = ui.select(
|
||
label='Strategy',
|
||
options=['interleaved', 'sequential', 'spread', 'randomized'],
|
||
value='interleaved'
|
||
).classes('w-1/2')
|
||
|
||
bit_offset_select = ui.select(
|
||
label='Bit Offset (0=LSB, 7=MSB)',
|
||
options=list(range(8)),
|
||
value=0
|
||
).classes('w-1/2')
|
||
|
||
ui.separator().classes('my-2')
|
||
ui.label('🎲 Randomization').classes('text-yellow-400 text-sm')
|
||
|
||
with ui.row().classes('w-full gap-4'):
|
||
seed_input = ui.number(
|
||
label='Random Seed (manual)',
|
||
value=None,
|
||
placeholder='Optional'
|
||
).classes('w-1/2')
|
||
|
||
seed_password = ui.input(
|
||
label='Or derive seed from password',
|
||
placeholder='Enter passphrase...'
|
||
).classes('w-1/2')
|
||
|
||
def derive_seed_from_password():
    """Fill the manual seed field with a number hashed from the passphrase.

    Does nothing when the passphrase field is empty.
    """
    import hashlib

    passphrase = seed_password.value
    if not passphrase:
        return

    # Seed = first four bytes of SHA-256(passphrase), read big-endian.
    digest = hashlib.sha256(passphrase.encode()).digest()
    seed_value = int.from_bytes(digest[:4], 'big')
    seed_input.value = seed_value
    ui.notify(f'Derived seed: {seed_value}', type='info')
|
||
|
||
ui.button('🔑 Derive Seed', on_click=derive_seed_from_password).props('dense').classes('text-xs')
|
||
|
||
ui.separator().classes('my-2')
|
||
ui.label('🛡️ Stealth & Obfuscation').classes('text-yellow-400 text-sm')
|
||
|
||
with ui.row().classes('w-full gap-4'):
|
||
compress_toggle = ui.switch('Compress data', value=True).classes('w-1/2')
|
||
invert_toggle = ui.switch('XOR invert bits', value=False).classes('w-1/2')
|
||
|
||
ui.html(
|
||
'<div style="color: #666; font-size: 11px; margin-top: 4px;">'
|
||
'• Compression reduces payload size but adds header bytes<br>'
|
||
'• XOR invert flips all bits for additional obfuscation'
|
||
'</div>', sanitize=False
|
||
)
|
||
|
||
ui.separator().classes('my-2')
|
||
ui.label('📊 Encoding Statistics').classes('text-yellow-400 text-sm')
|
||
|
||
stats_display = ui.html('', sanitize=False)
|
||
|
||
def update_stats():
    """Render a capacity summary for the carrier under the advanced settings.

    Shows a placeholder when no carrier image is loaded.
    """
    carrier = state.carrier_image
    if carrier is None:
        stats_display.set_content('<div style="color: #666;">Load carrier to see stats</div>')
        return

    width, height = carrier.size
    pixel_count = width * height

    preset = adv_channel_select.value or 'RGB'
    depth = adv_bits_select.value or 1
    channel_count = len(CHANNEL_PRESETS.get(preset, []))
    bpp = channel_count * depth
    raw_bytes = (pixel_count * bpp) // 8

    # Pixels touched by a 1 KB (8192-bit) payload; 0 when the preset
    # resolves to no channels.
    kb_pixels = (1024 * 8) // bpp if bpp else 0

    rows = [
        f'Image: {width}x{height} = {pixel_count:,} pixels',
        f'Channels: {preset} ({channel_count} channels)',
        f'Bits/pixel: {bpp} ({depth} bits × {channel_count} ch)',
        f'Raw capacity: {raw_bytes:,} bytes',
        f'Pixels for 1KB: ~{kb_pixels:,} ({kb_pixels/pixel_count*100:.2f}% of image)',
    ]
    stats_display.set_content(
        '<div style="font-family: monospace; font-size: 11px; color: #888;">'
        + '<br>'.join(rows)
        + '</div>'
    )
|
||
|
||
# Initial stats update
|
||
update_stats()
|
||
|
||
# Update stats when settings change
|
||
adv_channel_select.on('update:model-value', lambda: (update_stats(), update_capacity_display()))
|
||
adv_bits_select.on('update:model-value', lambda: (update_stats(), update_capacity_display()))
|
||
|
||
def toggle_advanced():
    """Show or hide the advanced settings to match the advanced switch."""
    if not advanced_toggle.value:
        advanced_container.classes(add='hidden')
        update_capacity_display()
        return

    advanced_container.classes(remove='hidden')
    # Start the advanced selectors from whatever the basic ones hold.
    adv_channel_select.value = channel_select.value
    adv_bits_select.value = bits_select.value
    update_stats()
    update_capacity_display()
|
||
|
||
advanced_toggle.on('update:model-value', toggle_advanced)
|
||
|
||
# Also update capacity when basic settings change
|
||
channel_select.on('update:model-value', update_capacity_display)
|
||
bits_select.on('update:model-value', update_capacity_display)
|
||
|
||
# Encryption settings
|
||
ui.separator().classes('my-2')
|
||
ui.label('🔐 ENCRYPTION').classes('text-green-400 text-lg font-bold')
|
||
|
||
encrypt_toggle = ui.switch('Enable Encryption').classes('text-cyan-400')
|
||
|
||
encrypt_container = ui.column().classes('w-full hidden gap-2')
|
||
with encrypt_container:
|
||
password_input = ui.input(
|
||
label='Password',
|
||
password=True,
|
||
password_toggle_button=True
|
||
).classes('w-full')
|
||
|
||
crypto_methods = crypto.get_available_methods()
|
||
encrypt_method = ui.select(
|
||
label='Method',
|
||
options=crypto_methods,
|
||
value=crypto_methods[0] if crypto_methods else 'xor'
|
||
).classes('w-full')
|
||
|
||
def toggle_encrypt():
    """Reveal the password/method fields only while encryption is switched on."""
    if encrypt_toggle.value:
        encrypt_container.classes(remove='hidden')
        return
    encrypt_container.classes(add='hidden')
|
||
|
||
encrypt_toggle.on('update:model-value', toggle_encrypt)
|
||
|
||
ui.separator().classes('my-4')
|
||
|
||
# ENCODE BUTTON & OUTPUT
|
||
with ui.row().classes('w-full justify-center gap-4'):
|
||
async def do_encode():
    """Hide the selected payload in the carrier image and display the result.

    Reads the payload (text box or uploaded file), builds the steg config from
    the basic or advanced controls, checks capacity, optionally encrypts and
    XOR-inverts the payload, then encodes it and updates the result preview,
    the result stats and ``state.encode_result``. Every failure is reported
    through ``ui.notify`` rather than propagating.
    """
    if state.carrier_image is None:
        ui.notify('Please upload or generate a carrier image first', type='negative')
        return

    # Get data to encode
    if data_source.value == 'text':
        if not text_input.value:
            ui.notify('Please enter some text to hide', type='negative')
            return
        data = text_input.value.encode('utf-8')
    else:
        if not state.encode_file_data:
            ui.notify('Please upload a file to hide', type='negative')
            return
        # Prepend filename for extraction: one length byte, then the UTF-8 name.
        filename_bytes = state.encode_file_name.encode('utf-8')
        if len(filename_bytes) > 255:
            # A single length byte cannot describe a longer name.
            ui.notify('File name is too long to embed (max 255 bytes)', type='negative')
            return
        data = bytes([len(filename_bytes)]) + filename_bytes + state.encode_file_data

    # Build config
    advanced = bool(advanced_toggle.value)
    if advanced:
        ch = adv_channel_select.value or 'RGB'
        b = adv_bits_select.value or 1
        strategy = strategy_select.value or 'interleaved'
        # 0 is a valid seed, so test for "no value" explicitly.
        raw_seed = seed_input.value
        seed = int(raw_seed) if raw_seed not in (None, '') else None
        config = create_config(
            channels=ch,
            bits=b,
            strategy=strategy,
            bit_offset=bit_offset_select.value or 0,
            seed=seed,
            compress=compress_toggle.value,
        )
    else:
        ch = channel_select.value or 'RGB'
        b = bits_select.value or 1
        strategy = 'interleaved'
        config = create_config(channels=ch, bits=b)

    # Check capacity before proceeding
    cap = calculate_capacity(state.carrier_image, config)
    # Account for header overhead (~32 bytes + compression metadata)
    estimated_size = len(data) + 64  # Conservative estimate with header
    if estimated_size > cap['usable_bytes']:
        ui.notify(
            f'⛔ Data too large! Need {format_size(estimated_size)}, but capacity is {cap["human"]}. '
            f'Try: more bits, more channels, or larger image.',
            type='negative',
            timeout=8000
        )
        return

    encrypting = bool(encrypt_toggle.value)
    if encrypting and not password_input.value:
        ui.notify('Please enter an encryption password', type='negative')
        return
    inverting = advanced and bool(invert_toggle.value)

    try:
        # Encryption runs inside the try so a crypto error is reported to the
        # user like any other encoding failure.
        if encrypting:
            data = crypto.encrypt(data, password_input.value, encrypt_method.value)

        # XOR invert if enabled (advanced mode only)
        if inverting:
            data = bytes(byte ^ 0xFF for byte in data)

        result = encode(state.carrier_image, data, config)

        # Calculate encoding stats (payload bits only; header not counted)
        data_bits = len(data) * 8
        bits_per_pixel = len(CHANNEL_PRESETS.get(ch, [])) * b
        # Ceiling division: a partially filled pixel still counts as used.
        pixels_used = -(-data_bits // bits_per_pixel) if bits_per_pixel else 0
        total_pixels = state.carrier_image.size[0] * state.carrier_image.size[1]
        pct_used = (pixels_used / total_pixels) * 100 if total_pixels else 0

        # Show result
        b64 = image_to_base64(result)
        result_preview.set_source(f'data:image/png;base64,{b64}')
        result_container.classes(remove='hidden')

        # Update result stats
        result_stats.set_content(
            f'<div style="color: #00ff00; font-family: monospace; font-size: 12px;">'
            f'✓ Encoded {format_size(len(data))} into {pixels_used:,} pixels ({pct_used:.2f}% of image)<br>'
            f'Config: {ch} @ {b} bits | Strategy: {strategy}'
            f'{"<br>🔐 Encrypted" if encrypting else ""}'
            f'{"<br>🔀 XOR Inverted" if inverting else ""}'
            f'</div>'
        )

        # Store for download
        state.encode_result = result

        ui.notify('✓ Encoding successful!', type='positive')

    except Exception as e:
        ui.notify(f'Encoding failed: {str(e)}', type='negative')
|
||
|
||
ui.button('🔒 ENCODE', on_click=do_encode).classes(
|
||
'bg-green-600 hover:bg-green-500 text-black font-bold px-8 py-2 text-lg'
|
||
)
|
||
|
||
# Result display
|
||
result_container = ui.column().classes('w-full hidden gap-2 mt-4')
|
||
with result_container:
|
||
ui.label('✓ ENCODED IMAGE').classes('text-green-400 text-lg font-bold')
|
||
result_stats = ui.html('', sanitize=False).classes('w-full')
|
||
result_preview = ui.image().classes('w-full max-h-96 object-contain')
|
||
|
||
async def download_result():
    """Send the most recent encoded image to the browser as a PNG download."""
    encoded = state.encode_result
    if encoded is None:
        return

    # Serialise to PNG in memory; nothing is written to disk.
    with io.BytesIO() as png_buffer:
        encoded.save(png_buffer, format='PNG')
        png_bytes = png_buffer.getvalue()

    ui.download(png_bytes, 'stego_output.png')
|
||
|
||
ui.button('💾 Download Encoded Image', on_click=download_result).classes(
|
||
'bg-cyan-600 hover:bg-cyan-500 text-black font-bold'
|
||
)
|
||
|
||
|
||
async def create_decode_panel():
    """Create the decoding interface - Aperi'Solve inspired.

    Builds, top to bottom: the stego-image preview and upload picker, the
    decode options (password and manual channel/bits/strategy), the Smart
    Scan / Auto Decode / Manual Decode buttons, the Matryoshka decode button
    and its result tree, the scan-results table and the extracted-data output
    with its download buttons.

    Text that originates from the decoded image (previews, embedded
    filenames, error messages) is HTML-escaped before being handed to
    ``ui.html(..., sanitize=False)``.
    """
    global state

    # Imported under an alias so it cannot clash with local names such as `html`.
    from html import escape as html_escape

    # Store scan results for UI updates
    scan_results_data = {"results": [], "selected": None}

    # Scan status -> (text colour, icon, row background).
    status_styles = {
        "STEG_DETECTED": ("#00ff00", "✓", "rgba(0, 255, 0, 0.15)"),
        "STEG_HEADER": ("#00ff00", "✓", "rgba(0, 255, 0, 0.15)"),
        "TEXT_FOUND": ("#00ffff", "📝", "rgba(0, 255, 255, 0.15)"),
        "POSSIBLE_TEXT": ("#ffff00", "?", "rgba(255, 255, 0, 0.1)"),
        "MIXED_DATA": ("#ff9900", "~", "transparent"),
    }
    default_status_style = ("#666666", "·", "transparent")

    # Matryoshka layer type -> (colour, icon).
    layer_styles = {
        'nested_image': ('#ff00ff', '🖼️'),
        'nested_image_raw': ('#ff00ff', '🖼️'),
        'text': ('#00ff00', '📝'),
        'file': ('#00ffff', '📁'),
        'no_data_found': ('#666666', '∅'),
        'max_depth_reached': ('#ffff00', '⚠️'),
    }
    default_layer_style = ('#888888', '?')

    def decrypt_if_requested(data):
        """Return ``data`` decrypted with the entered password, if any.

        A failed decryption is reported as a warning and the bytes are
        returned unchanged, so the caller still shows whatever was extracted.
        """
        if not dec_password_input.value:
            return data
        try:
            return crypto.decrypt(data, dec_password_input.value)
        except Exception as e:
            ui.notify(f'Decryption failed: {e}', type='warning')
            return data

    with ui.row().classes('w-full gap-4'):
        # Left column - Image upload
        with ui.column().classes('w-1/2 gap-2'):
            ui.label('🖼️ STEGO IMAGE').classes('text-cyan-400 text-lg font-bold')

            # Show current image if loaded
            if state.decode_image:
                b64 = image_to_base64(state.decode_image)
                ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-48 object-contain')
                w, h = state.decode_image.size
                ui.label(f'Loaded | {w}x{h} | {w*h:,} pixels').classes('text-green-400 text-sm')

                # Quick detection status
                if state.detected_config:
                    ui.html(
                        f'<div style="color: #00ff00; background: rgba(0,255,0,0.1); padding: 8px; border: 1px solid #00ff00; border-radius: 4px;">'
                        f'✓ STEG v3 header detected<br>'
                        f'Channels: {state.detected_config["config"]["channels"]}<br>'
                        f'Payload: {format_size(state.detected_config["original_length"])}'
                        f'</div>', sanitize=False
                    )
                else:
                    ui.html(
                        '<div style="color: #ffff00; background: rgba(255,255,0,0.1); padding: 8px; border: 1px solid #666; border-radius: 4px;">'
                        '⚠ No STEG header - use Smart Scan to detect LSB data'
                        '</div>', sanitize=False
                    )
            else:
                ui.image().classes('w-full max-h-48 object-contain')
                ui.label('No image loaded').classes('text-gray-500 text-sm')

            # Custom file picker
            decode_html, decode_js = create_file_picker('decode_image', '.png,.PNG,image/png', '/api/upload/decode', tab='decode')
            ui.html(decode_html, sanitize=False)
            ui.add_body_html(decode_js)

        # Right column - Decode options
        with ui.column().classes('w-1/2 gap-2'):
            ui.label('⚙️ DECODE OPTIONS').classes('text-cyan-400 text-lg font-bold')

            # Decryption password (optional)
            ui.label('🔓 Decryption (optional)').classes('text-sm text-gray-400')
            dec_password_input = ui.input(
                label='Password (leave empty if unencrypted)',
                password=True,
                password_toggle_button=True
            ).classes('w-full')

            ui.separator().classes('my-2')

            # Manual decode settings
            ui.label('📋 Manual Config (for targeted decode)').classes('text-sm text-gray-400')
            with ui.row().classes('w-full gap-2'):
                dec_channel_select = ui.select(
                    label='Channels',
                    options=list(CHANNEL_PRESETS.keys()),
                    value='RGB'
                ).classes('w-1/3')

                dec_bits_select = ui.select(
                    label='Bits',
                    options=list(range(1, 9)),
                    value=1
                ).classes('w-1/3')

                dec_strategy_select = ui.select(
                    label='Strategy',
                    options=['interleaved', 'sequential', 'spread', 'randomized'],
                    value='interleaved'
                ).classes('w-1/3')

    ui.separator().classes('my-4')

    # ACTION BUTTONS
    with ui.row().classes('w-full justify-center gap-4'):
        async def do_smart_scan():
            """Run Aperi'Solve-style multi-config scan and render the results table."""
            if state.decode_image is None:
                ui.notify('Please upload an image first', type='negative')
                return

            ui.notify('🔍 Scanning all channel combinations...', type='info')

            try:
                password = dec_password_input.value or None
                results = smart_scan_image(state.decode_image, password)
                scan_results_data["results"] = results

                # Build results HTML table
                parts = [
                    '<div style="max-height: 400px; overflow-y: auto;">',
                    '<table style="width: 100%; border-collapse: collapse; font-size: 12px;">',
                    '<thead><tr style="background: #1a1a1a; color: #00ff00;">',
                ]
                for title, align in (
                    ('Config', 'left'),
                    ('Channels', 'left'),
                    ('Bits', 'center'),
                    ('Status', 'center'),
                    ('Conf', 'center'),
                    ('Preview', 'left'),
                ):
                    parts.append(
                        f'<th style="padding: 8px; text-align: {align}; border: 1px solid #333;">{title}</th>'
                    )
                parts.append('</tr></thead><tbody>')

                for r in results:
                    # Status color coding
                    status_color, status_icon, row_bg = status_styles.get(
                        r["status"], default_status_style
                    )

                    # Confidence bar
                    conf = r.get("confidence", 0)
                    conf_bar = f'<div style="width: 50px; height: 8px; background: #333; border-radius: 2px;"><div style="width: {conf}%; height: 100%; background: {status_color}; border-radius: 2px;"></div></div>'

                    # Preview comes from image data, i.e. it is untrusted:
                    # truncate first, then escape for HTML.
                    full_preview = str(r.get("preview") or "")
                    preview_escaped = html_escape(full_preview[:60])
                    if len(full_preview) > 60:
                        preview_escaped += "..."

                    channels_str = ",".join(r["channels"]) if isinstance(r["channels"], list) else str(r["channels"])

                    parts.append(f'<tr style="background: {row_bg}; border-bottom: 1px solid #333;">')
                    parts.append(f'<td style="padding: 6px; color: #00ff00; border: 1px solid #333;"><b>{r["name"]}</b></td>')
                    parts.append(f'<td style="padding: 6px; color: #888; border: 1px solid #333;">{channels_str}</td>')
                    parts.append(f'<td style="padding: 6px; color: #888; text-align: center; border: 1px solid #333;">{r["bits"]}</td>')
                    parts.append(f'<td style="padding: 6px; color: {status_color}; text-align: center; border: 1px solid #333;">{status_icon} {r["status"]}</td>')
                    parts.append(f'<td style="padding: 6px; text-align: center; border: 1px solid #333;">{conf_bar}</td>')
                    parts.append(f'<td style="padding: 6px; color: #aaa; font-family: monospace; font-size: 11px; border: 1px solid #333; max-width: 300px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;">{preview_escaped}</td>')
                    parts.append('</tr>')

                parts.append('</tbody></table></div>')

                # Summary
                text_found = sum(1 for r in results if r["status"] in ("TEXT_FOUND", "STEG_DETECTED", "STEG_HEADER"))
                possible = sum(1 for r in results if r["status"] == "POSSIBLE_TEXT")

                parts.append('<div style="margin-top: 12px; padding: 8px; background: #111; border: 1px solid #333;">')
                parts.append(f'<span style="color: #00ff00;">📊 Scanned {len(results)} configurations</span> | ')
                parts.append(f'<span style="color: #00ffff;">Found: {text_found} likely</span> | ')
                parts.append(f'<span style="color: #ffff00;">Possible: {possible}</span>')
                parts.append('</div>')

                scan_results_container.set_content(''.join(parts))
                scan_results_container.classes(remove='hidden')

                if text_found > 0:
                    ui.notify(f'✓ Found {text_found} potential hidden data!', type='positive')
                else:
                    ui.notify('Scan complete - no obvious text found', type='warning')

            except Exception as e:
                ui.notify(f'Scan failed: {str(e)}', type='negative')

        async def do_decode():
            """Standard decode with the manually selected channel/bits/strategy."""
            if state.decode_image is None:
                ui.notify('Please upload an image first', type='negative')
                return

            config = create_config(
                channels=dec_channel_select.value or 'RGB',
                bits=dec_bits_select.value or 1,
                strategy=dec_strategy_select.value or 'interleaved',
            )

            try:
                data = decrypt_if_requested(decode(state.decode_image, config))

                # Process extracted data
                process_extracted_data(data, output_text, file_download_btn)
                ui.notify('✓ Decode successful!', type='positive')

            except Exception as e:
                ui.notify(f'Decode failed: {str(e)}', type='negative')
                output_text.set_value(f"Error: {str(e)}\n\nTry using Smart Scan to detect the correct configuration.")

        async def do_auto_decode():
            """Auto-decode using detected STEG header"""
            if state.decode_image is None:
                ui.notify('Please upload an image first', type='negative')
                return

            try:
                # Use auto-detection (config=None)
                data = decrypt_if_requested(decode(state.decode_image, None))

                # Process extracted data
                process_extracted_data(data, output_text, file_download_btn)
                ui.notify('✓ Auto-decode successful!', type='positive')

            except Exception as e:
                ui.notify(f'Auto-decode failed: {str(e)}. Try Smart Scan.', type='negative')
                output_text.set_value(f"Auto-decode failed: {str(e)}\n\nThe image may not contain STEG v3 header.\nTry using Smart Scan to detect raw LSB data.")

        ui.button('🔍 SMART SCAN', on_click=do_smart_scan).classes(
            'bg-purple-600 hover:bg-purple-500 text-white font-bold px-6 py-2'
        ).props('unelevated')

        ui.button('🤖 AUTO DECODE', on_click=do_auto_decode).classes(
            'bg-cyan-600 hover:bg-cyan-500 text-black font-bold px-6 py-2'
        )

        ui.button('📋 MANUAL DECODE', on_click=do_decode).classes(
            'bg-gray-600 hover:bg-gray-500 text-white font-bold px-6 py-2'
        )

    # 🪆 MATRYOSHKA MODE BUTTON (only shows when matryoshka mode is active)
    matryoshka_btn_container = ui.row().classes('w-full justify-center gap-4 mt-2')

    with matryoshka_btn_container:
        async def do_matryoshka_decode():
            """🪆 Perform recursive Matryoshka decode and render the layer tree."""
            if state.decode_image is None:
                ui.notify('Please upload an image first', type='negative')
                return

            if not state.matryoshka_mode:
                ui.notify('🪆 Matryoshka mode not active! Click the bottom-left corner to activate.', type='warning')
                return

            ui.notify(f'🪆 Starting Matryoshka decode (depth: {state.matryoshka_depth})...', type='info')

            try:
                password = dec_password_input.value or None
                results = matryoshka_decode(state.decode_image, max_depth=state.matryoshka_depth, password=password)
                state.matryoshka_results = results

                # Build nested results display
                def build_results_html(res_list, indent=0):
                    chunks = []
                    for r in res_list:
                        depth = r.get('depth', indent)
                        doll_icon = '🪆' * (depth + 1)  # More dolls for deeper layers
                        color, icon = layer_styles.get(r.get('type'), default_layer_style)
                        # Fade the background with depth, never below 0
                        # (a negative alpha is not valid CSS).
                        alpha = max(0.0, 0.1 - depth * 0.02)
                        # Filename and preview come from extracted data: escape them.
                        label = html_escape(str(r.get('filename') or r.get('type', 'unknown')))
                        preview = html_escape(str(r.get('preview') or '')[:200])

                        chunks.append(
                            f'<div style="margin-left: {depth * 24}px; padding: 8px; margin-bottom: 4px; '
                            f'background: rgba(255,0,255,{alpha:.2f}); border-left: 3px solid {color}; '
                            f'border-radius: 4px;">'
                            f'<div style="color: {color}; font-weight: bold;">'
                            f'{doll_icon} Layer {depth} {icon} '
                            f'<span style="color: #888; font-weight: normal;">{label}</span>'
                            f'</div>'
                            f'<div style="color: #aaa; font-size: 11px; margin-top: 4px;">'
                            f'Size: {format_size(r.get("data_size", 0))}'
                            f'</div>'
                            f'<div style="color: #ccc; font-family: monospace; font-size: 11px; margin-top: 4px; '
                            f'max-height: 60px; overflow-y: auto; white-space: pre-wrap;">{preview}</div>'
                            f'</div>'
                        )

                        # Recurse into nested results
                        if r.get('nested_results'):
                            chunks.append(build_results_html(r['nested_results'], indent + 1))

                    return ''.join(chunks)

                results_html = build_results_html(results)

                # Count layers found
                def count_layers(res_list):
                    return len(res_list) + sum(
                        count_layers(r['nested_results'])
                        for r in res_list
                        if r.get('nested_results')
                    )

                total_layers = count_layers(results)

                matryoshka_results_container.set_content(
                    '<div class="matryoshka-tree">'
                    '<div style="color: #ff00ff; font-size: 14px; font-weight: bold; margin-bottom: 12px;">'
                    '🪆 MATRYOSHKA DECODE RESULTS '
                    f'<span style="color: #888; font-weight: normal;">({total_layers} layers found)</span>'
                    '</div>'
                    f'{results_html}'
                    '</div>'
                )
                matryoshka_results_container.classes(remove='hidden')

                # Also extract the deepest/last data for download
                def get_deepest_data(res_list):
                    for r in reversed(res_list):
                        if r.get('nested_results'):
                            deep = get_deepest_data(r['nested_results'])
                            if deep:
                                return deep
                        if r.get('raw_data'):
                            return r
                    return None

                deepest = get_deepest_data(results)
                if deepest and deepest.get('raw_data'):
                    raw = deepest['raw_data']
                    state.extracted_file = raw
                    # `or` rather than a .get default: the key may exist with a None value.
                    state.extracted_filename = deepest.get('filename') or 'extracted_data.bin'
                    file_download_btn.classes(remove='hidden')

                    # Also show in output text
                    try:
                        text = raw.decode('utf-8')
                    except UnicodeDecodeError:
                        output_text.set_value(f"🪆 Deepest layer: {deepest.get('filename') or 'binary data'} ({format_size(len(raw))})")
                    else:
                        output_text.set_value(f"🪆 Deepest layer content:\n\n{text}")

                ui.notify(f'🪆 Matryoshka decode complete! Found {total_layers} layers.', type='positive')

            except Exception as e:
                ui.notify(f'🪆 Matryoshka decode failed: {str(e)}', type='negative')
                matryoshka_results_container.set_content(
                    f'<div style="color: #ff3333; padding: 12px;">Error: {html_escape(str(e))}</div>'
                )

        matryoshka_decode_btn = ui.button('🪆 MATRYOSHKA DECODE', on_click=do_matryoshka_decode).classes(
            'bg-pink-600 hover:bg-pink-500 text-white font-bold px-6 py-2'
        ).props('unelevated').style('display: none;')

        # Show/hide based on matryoshka mode state
        ui.add_body_html('''
        <script>
        // Check matryoshka state and show/hide button
        function updateMatryoshkaButton() {
            const btn = document.querySelector('[class*="bg-pink-600"]');
            const panel = document.getElementById('matryoshka-panel');
            if (btn && panel) {
                if (!panel.classList.contains('hidden')) {
                    btn.style.display = 'inline-flex';
                } else {
                    btn.style.display = 'none';
                }
            }
        }

        // Check periodically
        setInterval(updateMatryoshkaButton, 500);
        </script>
        ''')

    # 🪆 MATRYOSHKA RESULTS CONTAINER
    matryoshka_results_container = ui.html('', sanitize=False).classes('w-full hidden mt-4')

    # SCAN RESULTS (placeholder text until a scan runs)
    ui.label('📊 SCAN RESULTS').classes('text-purple-400 text-lg font-bold mt-4')
    scan_results_container = ui.html(
        '<div style="color: #666; padding: 20px; text-align: center;">'
        'Click "Smart Scan" to analyze all channel combinations'
        '</div>', sanitize=False
    ).classes('w-full')

    ui.separator().classes('my-4')

    # OUTPUT SECTION
    ui.label('📤 EXTRACTED DATA').classes('text-cyan-400 text-lg font-bold')
    output_text = ui.textarea(
        label='Decoded output',
        placeholder='Extracted data will appear here...'
    ).classes('w-full terminal').props('rows=8 readonly dark')

    with ui.row().classes('gap-2'):
        async def download_file():
            """Download the last extracted file, if there is one."""
            if state.extracted_file is None:
                return
            ui.download(state.extracted_file, state.extracted_filename)

        file_download_btn = ui.button('💾 Download Extracted File', on_click=download_file).classes(
            'bg-green-600 hover:bg-green-500 text-black font-bold hidden'
        )

        async def download_raw():
            """Download the current contents of the output box as UTF-8 text."""
            if not output_text.value:
                return
            ui.download(output_text.value.encode('utf-8'), 'extracted_text.txt')

        ui.button('📄 Download as Text', on_click=download_raw).classes(
            'bg-gray-600 hover:bg-gray-500 text-white font-bold'
        )
|
||
|
||
|
||
def process_extracted_data(data: bytes, output_text, file_download_btn):
    """Process and display extracted data.

    ``data`` is interpreted as the first of these that fits:

    1. An embedded file: one length byte, that many bytes of UTF-8 filename,
       then the file content. The file is stored on ``state`` and the
       download button is shown.
    2. UTF-8 text: shown in ``output_text``; the download button is hidden.
    3. Anything else: a hex preview of the first 256 bytes is shown and the
       raw bytes are stored on ``state`` as ``extracted_data.bin``.

    Args:
        data: Bytes extracted (and possibly decrypted) from the image.
        output_text: Widget exposing ``set_value(str)``.
        file_download_btn: Widget exposing ``classes(add=..., remove=...)``.
    """
    embedded = _split_embedded_file(data)
    if embedded is not None:
        filename, file_data = embedded
        state.extracted_file = file_data
        state.extracted_filename = filename
        output_text.set_value(
            f"📁 Extracted file: {filename}\n"
            f"   Size: {format_size(len(file_data))}\n\n"
            f"(Click 'Download Extracted File' to save)"
        )
        file_download_btn.classes(remove='hidden')
        return

    # Try as plain text
    try:
        text = data.decode('utf-8')
    except UnicodeDecodeError:
        # Binary data - show hex preview of the first 256 bytes
        formatted = data[:256].hex(' ')
        output_text.set_value(
            f"Binary data ({format_size(len(data))}):\n\n{formatted}"
            + ("..." if len(data) > 256 else "")
        )
        state.extracted_file = data
        state.extracted_filename = 'extracted_data.bin'
        file_download_btn.classes(remove='hidden')
        return

    output_text.set_value(text)
    file_download_btn.classes(add='hidden')


def _split_embedded_file(data: bytes):
    """Return ``(filename, content)`` if ``data`` starts with a plausible filename header, else ``None``."""
    if len(data) <= 2:
        return None

    name_len = data[0]
    # Accept name lengths 1..199 only, and only when the whole declared name
    # is actually present (a truncated header is not a file).
    if not 0 < name_len < 200 or len(data) < 1 + name_len:
        return None

    try:
        filename = data[1:1 + name_len].decode('utf-8')
    except UnicodeDecodeError:
        return None

    # The name is untrusted and later used as a download name: require an
    # extension dot and refuse path separators and control characters.
    if '.' not in filename or '/' in filename or '\\' in filename:
        return None
    if any(ord(ch) < 32 or ord(ch) == 127 for ch in filename):
        return None

    return filename, data[1 + name_len:]
|
||
|
||
|
||
async def create_matryoshka_panel():
|
||
"""Create the Matryoshka (nested steganography) interface"""
|
||
|
||
global state
|
||
|
||
ui.html('''
|
||
<div style="text-align: center; margin-bottom: 20px;">
|
||
<div style="color: #ff00ff; font-size: 24px; font-weight: bold;">🪆 MATRYOSHKA MODE 🪆</div>
|
||
<div style="color: #ff99ff; font-size: 12px;">Russian Nesting Doll Steganography</div>
|
||
</div>
|
||
''', sanitize=False)
|
||
|
||
with ui.row().classes('w-full gap-6'):
|
||
# ==================== ENCODE SECTION ====================
|
||
with ui.column().classes('w-1/2 gap-3'):
|
||
ui.label('🔐 NESTED ENCODE').classes('text-pink-400 text-lg font-bold')
|
||
|
||
ui.html('<div style="color: #999; font-size: 11px; margin-bottom: 8px;">Add carrier images (outermost first). Each layer wraps the previous.</div>', sanitize=False)
|
||
|
||
# Carrier list with capacity info
|
||
carrier_container = ui.column().classes('w-full gap-1 p-3').style(
|
||
'background: rgba(255,0,255,0.05); border: 1px solid rgba(255,0,255,0.3); border-radius: 8px; min-height: 120px;'
|
||
)
|
||
|
||
def refresh_carriers():
    """Rebuild the carrier list from ``state.matryoshka_carriers``.

    Each row shows the layer number (the first carrier gets the highest
    number), the name cut to 25 characters, the image size and the capacity
    in KB under the RGBA / 2-bit config.
    """
    carrier_container.clear()
    carriers = state.matryoshka_carriers

    with carrier_container:
        if not carriers:
            ui.html('<div style="color: #666; text-align: center; padding: 20px;">No carriers added yet</div>', sanitize=False)
            return

        config = create_config(channels='RGBA', bits=2)
        # Layer numbers count down from len(carriers) to 1.
        layer_numbers = range(len(carriers), 0, -1)
        for layer_num, (img, name) in zip(layer_numbers, carriers):
            cap = _calculate_capacity_bytes(img, config)
            row = ui.row().classes('w-full items-center gap-2 p-2').style(
                'background: rgba(255,0,255,0.1); border-radius: 4px; margin-bottom: 4px;'
            )
            with row:
                ui.html(f'<span style="color: #ff00ff; font-weight: bold;">L{layer_num}</span>', sanitize=False)
                ui.label(f'{name[:25]}').classes('text-pink-300 text-sm flex-grow')
                ui.label(f'{img.size[0]}x{img.size[1]}').classes('text-gray-500 text-xs')
                ui.label(f'{cap//1024}KB').classes('text-pink-400 text-xs')
|
||
|
||
refresh_carriers()
|
||
|
||
# Carrier upload
|
||
carrier_upload_html, carrier_upload_js = create_drop_zone(
|
||
'matryoshka_carrier', 'image/png,image/jpeg,image/webp',
|
||
'/api/matryoshka/add_carrier', 'matryoshka'
|
||
)
|
||
ui.html(carrier_upload_html, sanitize=False)
|
||
ui.add_body_html(carrier_upload_js)
|
||
|
||
with ui.row().classes('w-full gap-2'):
|
||
def clear_carriers():
    """Drop every queued carrier image and redraw the now-empty list."""
    state.matryoshka_carriers = []
    refresh_carriers()
    ui.notify('🪆 Carriers cleared', type='info')
|
||
|
||
ui.button('🗑️ Clear Carriers', on_click=clear_carriers).props('flat dense').classes('text-pink-400')
|
||
|
||
ui.separator().classes('my-2')
|
||
|
||
# Payload info
|
||
ui.html('<div style="color: #ff99ff; font-size: 12px; font-weight: bold;">PAYLOAD</div>', sanitize=False)
|
||
with ui.row().classes('w-full items-center gap-2 p-2').style('background: rgba(0,0,0,0.3); border-radius: 4px;'):
|
||
if state.encode_file_data:
|
||
ui.icon('description').classes('text-pink-400')
|
||
ui.label(f'{state.encode_file_name or "file"} ({len(state.encode_file_data)} bytes)').classes('text-pink-300 text-sm')
|
||
else:
|
||
ui.icon('text_fields').classes('text-gray-500')
|
||
ui.label('Set payload in ENCODE tab first').classes('text-gray-500 text-sm')
|
||
|
||
# Encode button
|
||
async def do_encode():
    """Kick off nested (Matryoshka) encoding from the browser.

    Checks that at least one carrier and a payload exist, then has the client
    POST to ``/api/matryoshka/encode`` and, on success, navigate to
    ``/api/matryoshka/download``. Server-reported errors and request failures
    are both shown with ``alert``.
    """
    if not state.matryoshka_carriers:
        ui.notify('Add carrier images first!', type='warning')
        return

    # A payload is an uploaded file or text; `encode_text` may not exist on
    # `state`, hence the defaulted getattr.
    if not state.encode_file_data and not getattr(state, 'encode_text', None):
        ui.notify('Set payload in ENCODE tab first', type='warning')
        return

    ui.notify('🪆 Encoding nested layers...', type='info')
    ui.run_javascript('''
        fetch('/api/matryoshka/encode', {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({channels: 'RGBA', bits: 2})
        })
        .then(r => r.json())
        .then(data => {
            if (data.success) {
                window.location.href = '/api/matryoshka/download';
            } else {
                alert('Error: ' + data.error);
            }
        })
        .catch(err => alert('Error: ' + err));
    ''')
|
||
|
||
ui.button('🪆 ENCODE MATRYOSHKA', on_click=do_encode).props('unelevated').classes(
|
||
'w-full mt-4 bg-pink-900 text-pink-200'
|
||
).style('font-size: 14px; font-weight: bold;')
|
||
|
||
# ==================== DECODE SECTION ====================
|
||
with ui.column().classes('w-1/2 gap-3'):
|
||
ui.label('🔓 NESTED DECODE').classes('text-pink-400 text-lg font-bold')
|
||
|
||
ui.html('<div style="color: #999; font-size: 11px; margin-bottom: 8px;">Upload an image and recursively extract all nested layers.</div>', sanitize=False)
|
||
|
||
# Depth control
|
||
with ui.row().classes('w-full items-center gap-3'):
|
||
ui.label('Max Depth:').classes('text-pink-400')
|
||
depth_slider = ui.slider(min=1, max=11, value=state.matryoshka_depth, step=1).classes('flex-grow').props('color="pink"')
|
||
depth_label = ui.label(f'{state.matryoshka_depth}').classes('text-pink-300 font-bold text-lg')
|
||
|
||
def update_depth(e):
    """Copy the slider position into ``state`` and refresh the depth label.

    The event argument is unused; the value is read from the slider itself.
    """
    state.matryoshka_depth = int(depth_slider.value)
    shown_depth = str(state.matryoshka_depth)
    depth_label.set_text(shown_depth)
|
||
|
||
depth_slider.on('update:model-value', update_depth)
|
||
|
||
# Image upload for decode
|
||
decode_upload_html, decode_upload_js = create_drop_zone(
|
||
'matryoshka_decode', 'image/png',
|
||
'/api/upload/decode', 'matryoshka'
|
||
)
|
||
ui.html(decode_upload_html, sanitize=False)
|
||
ui.add_body_html(decode_upload_js)
|
||
|
||
# Current decode image preview
|
||
if state.decode_image:
|
||
b64 = image_to_base64(state.decode_image)
|
||
ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-48 object-contain mt-2')
|
||
|
||
# Decode button
|
||
async def do_decode():
    """Start a recursive Matryoshka decode of the uploaded image.

    Shows a warning and returns early when no decode image is loaded.
    Otherwise it notifies the user and runs a browser-side ``fetch`` that
    POSTs the current ``state.matryoshka_depth`` to
    ``/api/matryoshka/decode``; on success the browser reports the number
    of layers found and reloads the page.
    """
    if not state.decode_image:
        ui.notify('Upload an image first', type='warning')
        return

    ui.notify('🪆 Decoding nested layers...', type='info')
    # Braces are doubled because this is an f-string. The trailing .catch
    # surfaces network failures and non-JSON replies, which would otherwise
    # be unhandled promise rejections with no feedback.
    ui.run_javascript(f'''
        fetch('/api/matryoshka/decode', {{
            method: 'POST',
            headers: {{'Content-Type': 'application/json'}},
            body: JSON.stringify({{depth: {state.matryoshka_depth}}})
        }})
        .then(r => r.json())
        .then(data => {{
            if (data.success) {{
                alert('🪆 Found ' + data.layers_found + ' nested layers!');
                window.location.reload();
            }} else {{
                alert('Error: ' + data.error);
            }}
        }})
        .catch(err => alert('Error: ' + err));
    ''')
|
||
|
||
ui.button('🪆 DECODE MATRYOSHKA', on_click=do_decode).props('unelevated').classes(
|
||
'w-full mt-4 bg-pink-900 text-pink-200'
|
||
).style('font-size: 14px; font-weight: bold;')
|
||
|
||
# Results display
|
||
if state.matryoshka_results:
|
||
ui.separator().classes('my-3')
|
||
ui.label('📦 EXTRACTED LAYERS').classes('text-pink-400 font-bold')
|
||
|
||
for result in state.matryoshka_results:
|
||
with ui.card().classes('w-full p-2 mt-2').style('background: rgba(255,0,255,0.1); border: 1px solid rgba(255,0,255,0.3);'):
|
||
depth = result.get('depth', 0)
|
||
rtype = result.get('type', 'unknown')
|
||
preview = result.get('preview', '')[:200]
|
||
|
||
ui.html(f'''
|
||
<div style="color: #ff00ff; font-weight: bold;">Layer {depth}: {rtype}</div>
|
||
<div style="color: #ccc; font-size: 11px; margin-top: 4px; word-break: break-all;">{preview}</div>
|
||
''', sanitize=False)
|
||
|
||
|
||
def _render_analysis_report(analysis, detection):
    """Build the HTML ``<pre>`` report shown in the analysis panel.

    Args:
        analysis: Result of ``analyze_image``; the keys read here are
            ``dimensions``, ``total_pixels``, ``format``, ``detection``,
            ``capacity_by_config`` and ``channels``.
        detection: Result of ``detect_encoding``; a falsy value means no
            header was found, otherwise ``config`` and ``original_length``
            are read.

    Returns:
        The report as an HTML string.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    def esc(value):
        # Format exactly as an f-string would, then neutralise HTML
        # metacharacters: the report is rendered with sanitize=False and
        # these values are derived from an uploaded image.
        return escape(f'{value}')

    divider = '╠══════════════════════════════════════╣\n'

    parts = [
        '<pre style="color: #00ff00;">',
        '╔══════════════════════════════════════╗\n',
        '║ STEGANALYSIS REPORT ║\n',
        divider,
        f'║ Dimensions: {analysis["dimensions"]["width"]}x{analysis["dimensions"]["height"]}\n',
        f'║ Total Pixels: {analysis["total_pixels"]:,}\n',
        f'║ Format: {esc(analysis["format"] or "PNG")}\n',
        divider,
    ]

    # Header detection result
    if detection:
        config = detection["config"]
        parts += [
            '<span style="color: #00ff00;">║ ✓ STEG HEADER DETECTED</span>\n',
            f'║ Channels: {esc(config["channels"])}\n',
            f'║ Bits: {esc(config["bits_per_channel"])}\n',
            f'║ Strategy: {esc(config["strategy"])}\n',
            f'║ Payload: {esc(format_size(detection["original_length"]))}\n',
        ]
    else:
        parts.append('<span style="color: #666;">║ No STEG header detected</span>\n')

    parts.append(divider)

    # Statistical analysis: red for HIGH, yellow for MEDIUM, green otherwise
    det = analysis['detection']
    level_colors = {'HIGH': '#ff4444', 'MEDIUM': '#ffff00'}
    color = level_colors.get(det['level'], '#00ff00')
    parts += [
        f'<span style="color: {color};">║ Detection Level: {esc(det["level"])}</span>\n',
        f'║ Confidence: {det["confidence"]*100:.1f}%\n',
        f'║ {esc(det["recommendation"])}\n',
        divider,
        '║ CAPACITY BY CONFIG:\n',
    ]
    parts.extend(
        f'║ {esc(cfg)}: {esc(cap)}\n'
        for cfg, cap in analysis['capacity_by_config'].items()
    )

    parts.append(divider)
    parts.append('║ CHANNEL ANALYSIS:\n')
    for ch_name, ch_data in analysis['channels'].items():
        chi = ch_data['chi_square_indicator']
        if chi > 0.5:
            chi_color = '#ff4444'
        elif chi > 0.2:
            chi_color = '#ffff00'
        else:
            chi_color = '#00ff00'
        parts.append(f'║ {esc(ch_name)}: μ={ch_data["mean"]:.1f} σ={ch_data["std"]:.1f} ')
        parts.append(f'<span style="color: {chi_color};">χ²={chi:.3f}</span>\n')

    parts.append('╚══════════════════════════════════════╝')
    parts.append('</pre>')
    return ''.join(parts)


async def create_analyze_panel():
    """Create the analysis interface.

    Lays out two columns: the left one previews ``state.analyze_image``
    (or an empty placeholder) above a PNG file picker targeting
    ``/api/upload/analyze``; the right one holds the report area. Below
    them, an ANALYZE button runs ``analyze_image`` and ``detect_encoding``
    on the loaded image and renders the resulting report.
    """
    with ui.row().classes('w-full gap-4'):
        # Left column - Image upload
        with ui.column().classes('w-1/2 gap-2'):
            ui.label('🔍 IMAGE TO ANALYZE').classes('text-yellow-400 text-lg font-bold')

            # Show current image if loaded
            if state.analyze_image:
                b64 = image_to_base64(state.analyze_image)
                ui.image(f'data:image/png;base64,{b64}').classes('w-full max-h-64 object-contain')
                w, h = state.analyze_image.size
                ui.label(f'Loaded | {w}x{h}').classes('text-green-400 text-sm')
            else:
                ui.image().classes('w-full max-h-64 object-contain')
                ui.label('No image loaded').classes('text-gray-500 text-sm')

            # Custom file picker - uses API endpoint
            analyze_html, analyze_js = create_file_picker(
                'analyze_image', '.png,.PNG,image/png', '/api/upload/analyze', tab='analyze'
            )
            ui.html(analyze_html, sanitize=False)
            ui.add_body_html(analyze_js)

        # Right column - Results
        with ui.column().classes('w-1/2 gap-2'):
            ui.label('📊 ANALYSIS RESULTS').classes('text-yellow-400 text-lg font-bold')

            analysis_output = ui.html('', sanitize=False).classes('w-full terminal p-4')

    ui.separator().classes('my-4')

    with ui.row().classes('w-full justify-center'):
        async def do_analyze():
            """Analyze the loaded image and show the report, or notify on failure."""
            if state.analyze_image is None:
                ui.notify('Please upload an image first', type='negative')
                return

            # UI boundary: any failure is reported to the user, not raised.
            try:
                analysis = analyze_image(state.analyze_image)
                detection = detect_encoding(state.analyze_image)
                analysis_output.set_content(_render_analysis_report(analysis, detection))
                ui.notify('Analysis complete!', type='positive')
            except Exception as e:
                ui.notify(f'Analysis failed: {e}', type='negative')

        ui.button('🔬 ANALYZE', on_click=do_analyze).classes(
            'bg-yellow-600 hover:bg-yellow-500 text-black font-bold px-8 py-2 text-lg'
        )
|
||
|
||
|
||
# ============== MAIN ==============
|
||
|
||
def main(port: int = 8080):
    """Run the web UI.

    Prints the ASCII banner and a start-up hint, then calls ``ui.run``.

    Args:
        port: Port passed to ``ui.run`` and shown in the printed URL, so
            the hint always matches the server. Defaults to 8080.
    """
    print(STEGO_ASCII)
    print("\n🦕 Starting STEGOSAURUS WRECKS Web UI...")
    print(f" Open http://localhost:{port} in your browser\n")

    ui.run(
        title='STEGOSAURUS WRECKS',
        favicon='🦕',
        dark=True,
        port=port,
        reload=False,
    )
|
||
|
||
|
||
# Start the UI when run as a script. "__mp_main__" is also accepted,
# presumably for imports from a multiprocessing child (confirm against NiceGUI docs).
if __name__ == "__main__" or __name__ == "__mp_main__":
    main()
|