# --- Source listing metadata (captured 2026-04-02 14:42:09 -07:00): 2799 lines, 98 KiB, Python ---
"""
STEGOSAURUS WRECKS - Comprehensive Analysis Tools
Complete toolkit for steganography detection across all file types
This module provides 264+ analysis functions covering:
- Images: PNG, JPEG, GIF, BMP, WebP, TIFF, ICO, HEIC, AVIF, SVG
- Audio: WAV, MP3, FLAC, OGG
- Video: AVI, MKV
- Documents: PDF, Office
- Archives: ZIP, RAR
- Fonts: TTF, OTF, WOFF
"""
import struct
import zlib
import io
import re
import json
import hashlib
import binascii
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union, BinaryIO
from dataclasses import dataclass, field
from enum import Enum
import math
# Optional imports - gracefully handle missing dependencies
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
try:
from PIL import Image, ExifTags
HAS_PIL = True
except ImportError:
HAS_PIL = False
# ============== CORE INFRASTRUCTURE ==============
@dataclass
class AnalysisResult:
    """Uniform envelope returned by every analysis tool.

    Carries the outcome flag, the action that produced it, the detected
    file type, structured tool output, human-readable findings, a
    suspicion flag plus confidence score, optional raw extracted bytes,
    and an error message when the tool failed.
    """
    success: bool
    action: str
    file_type: str
    data: Dict[str, Any] = field(default_factory=dict)
    findings: List[str] = field(default_factory=list)
    suspicious: bool = False
    confidence: float = 0.0
    raw_data: Optional[bytes] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict; raw bytes are reported
        only as a presence flag, never inlined."""
        serialized = dict(
            success=self.success,
            action=self.action,
            file_type=self.file_type,
            data=self.data,
            findings=self.findings,
            suspicious=self.suspicious,
            confidence=self.confidence,
        )
        serialized["has_raw_data"] = self.raw_data is not None
        serialized["error"] = self.error
        return serialized
class FileType(Enum):
    """Every container/file format the toolkit can recognize.

    Values are the lowercase short names used in result dicts and
    returned by detect_file_type(); UNKNOWN is the fallback when no
    signature matches.
    """
    # Image formats
    PNG = "png"
    JPEG = "jpeg"
    GIF = "gif"
    BMP = "bmp"
    WEBP = "webp"
    TIFF = "tiff"
    ICO = "ico"
    HEIC = "heic"
    AVIF = "avif"
    SVG = "svg"
    # Audio formats
    WAV = "wav"
    MP3 = "mp3"
    FLAC = "flac"
    OGG = "ogg"
    # Video containers
    AVI = "avi"
    MKV = "mkv"
    # Documents
    PDF = "pdf"
    OFFICE = "office"
    # Archives
    ZIP = "zip"
    RAR = "rar"
    # Fonts — detect_file_type() maps TTF/OTF/WOFF/WOFF2 all to this value
    FONT = "font"
    # Additional audio / misc formats
    AIFF = "aiff"
    AU = "au"
    MIDI = "midi"
    PCAP = "pcap"
    SQLITE = "sqlite"
    GZIP = "gzip"
    TAR = "tar"
    UNKNOWN = "unknown"
# Magic bytes for file type detection: each key is a signature the file
# must start with. Ambiguous containers (RIFF, ZIP) get disambiguated
# further inside detect_file_type().
MAGIC_SIGNATURES = {
    b'\x89PNG\r\n\x1a\n': FileType.PNG,
    b'\xff\xd8\xff': FileType.JPEG,
    b'GIF87a': FileType.GIF,
    b'GIF89a': FileType.GIF,
    b'BM': FileType.BMP,
    b'RIFF': FileType.WAV,  # Could also be AVI - check further
    b'\xff\xfb': FileType.MP3,  # raw MPEG audio frame sync variants
    b'\xff\xfa': FileType.MP3,
    b'\xff\xf3': FileType.MP3,
    b'\xff\xf2': FileType.MP3,
    b'ID3': FileType.MP3,  # MP3 with a leading ID3v2 tag
    b'fLaC': FileType.FLAC,
    b'OggS': FileType.OGG,
    b'%PDF': FileType.PDF,
    b'PK\x03\x04': FileType.ZIP,  # Could be Office - check further
    b'Rar!\x1a\x07': FileType.RAR,
    b'\x1aE\xdf\xa3': FileType.MKV,  # EBML header (Matroska)
    b'\x00\x00\x01\x00': FileType.ICO,
    b'\x00\x00\x02\x00': FileType.ICO,  # CUR format
    b'\x1f\x8b': FileType.GZIP,
    b'MThd': FileType.MIDI,
    b'.snd': FileType.AU,
    b'\xa1\xb2\xc3\xd4': FileType.PCAP,  # big-endian PCAP
    b'\xd4\xc3\xb2\xa1': FileType.PCAP,  # Little-endian PCAP
    b'SQLite format 3': FileType.SQLITE,
}
# NOTE(review): the three lists below appear unused — detect_file_type()
# hardcodes the same RIFF form / ftyp brand markers inline. Confirm
# whether other modules consume them before removing.
WEBP_SIGNATURES = [b'WEBP']
HEIC_SIGNATURES = [b'ftyp', b'heic', b'heix', b'hevc', b'mif1']
AVIF_SIGNATURES = [b'ftypavif', b'ftypavis']
def detect_file_type(data: bytes) -> FileType:
    """Identify a file's format from its magic bytes.

    First tries the offset-zero signatures in MAGIC_SIGNATURES, with
    extra disambiguation for RIFF (WAV/AVI/WebP) and ZIP (Office docs).
    Then falls through to signatures that are not simple prefixes:
    ISO-BMFF ftyp brands (HEIC/AVIF), TIFF byte-order marks, AIFF FORM
    containers, TAR's magic at offset 257, SVG markup, and font tables.
    Returns FileType.UNKNOWN when nothing matches.
    """
    if len(data) < 12:
        return FileType.UNKNOWN

    for signature, candidate in MAGIC_SIGNATURES.items():
        if not data.startswith(signature):
            continue
        if signature == b'RIFF' and len(data) >= 12:
            # RIFF form type at offset 8 distinguishes the container.
            riff_form = data[8:12]
            if riff_form == b'WAVE':
                return FileType.WAV
            if riff_form == b'AVI ':
                return FileType.AVI
            if riff_form == b'WEBP':
                return FileType.WEBP
        elif signature == b'PK\x03\x04':
            # Office documents are ZIPs containing well-known entries.
            head = data[:2000]
            if (b'[Content_Types].xml' in head or b'word/' in head
                    or b'xl/' in head or b'ppt/' in head):
                return FileType.OFFICE
            return FileType.ZIP
        return candidate

    # ISO-BMFF: brand inside the ftyp box selects HEIC vs AVIF.
    if len(data) >= 12 and data[4:8] == b'ftyp':
        brand = data[8:12]
        if brand in (b'heic', b'heix', b'hevc', b'mif1'):
            return FileType.HEIC
        if brand in (b'avif', b'avis'):
            return FileType.AVIF

    # TIFF: 'II' little-endian or 'MM' big-endian byte-order mark.
    if data[:4] in (b'II\x2a\x00', b'MM\x00\x2a'):
        return FileType.TIFF

    # AIFF/AIFC live in a FORM container.
    if data[:4] == b'FORM' and len(data) >= 12 and data[8:12] in (b'AIFF', b'AIFC'):
        return FileType.AIFF

    # TAR keeps its 'ustar' magic at offset 257.
    if len(data) >= 265 and data[257:262] == b'ustar':
        return FileType.TAR

    # SVG: bare markup, or an XML prolog followed by an <svg> element.
    if b'<svg' in data[:1000] or (b'<?xml' in data[:100] and b'<svg' in data[:2000]):
        return FileType.SVG

    # Fonts: TrueType/OpenType sfnt tags plus WOFF/WOFF2.
    if data[:4] in (b'\x00\x01\x00\x00', b'OTTO', b'true', b'typ1', b'wOFF', b'wOF2'):
        return FileType.FONT

    return FileType.UNKNOWN
def calculate_entropy(data: bytes) -> float:
    """Shannon entropy of *data*, in bits per byte.

    Returns 0.0 for empty input; the maximum of 8.0 is reached when all
    256 byte values occur equally often.
    """
    if not data:
        return 0.0
    total = len(data)
    histogram = [0] * 256
    for value in data:
        histogram[value] += 1
    return -sum(
        (count / total) * math.log2(count / total)
        for count in histogram
        if count
    )
def calculate_chi_square(data: bytes) -> float:
    """Chi-square statistic of the byte histogram against a uniform
    distribution (expected count len(data)/256 per byte value).

    Returns 0.0 for empty input. Large values indicate the byte
    frequencies deviate strongly from uniform randomness.
    """
    if not data:
        return 0.0
    observed = [0] * 256
    for value in data:
        observed[value] += 1
    expected = len(data) / 256
    return sum((count - expected) ** 2 / expected for count in observed)
def find_strings(data: bytes, min_length: int = 4) -> List[Tuple[int, str]]:
    """Extract maximal runs of printable ASCII (0x20-0x7e) from *data*.

    Runs shorter than *min_length* are discarded. Returns (starting
    offset, decoded text) pairs in order of appearance.
    """
    runs: List[Tuple[int, str]] = []
    start: Optional[int] = None  # offset of the run in progress, if any
    for pos, value in enumerate(data):
        if 32 <= value < 127:
            if start is None:
                start = pos
            continue
        # Non-printable byte terminates any active run.
        if start is not None and pos - start >= min_length:
            runs.append((start, data[start:pos].decode('ascii')))
        start = None
    # Flush a run that extends to the end of the buffer.
    if start is not None and len(data) - start >= min_length:
        runs.append((start, data[start:].decode('ascii')))
    return runs
def hex_dump(data: bytes, offset: int = 0, length: int = 256) -> str:
    """Render up to *length* bytes of *data*, starting at *offset*, as a
    classic hex dump: absolute offset, 16 space-separated hex bytes, and
    an ASCII gutter with non-printables shown as '.'."""
    window = data[offset:offset + length]
    rows = []
    for row_start in range(0, len(window), 16):
        row = window[row_start:row_start + 16]
        hex_cols = ' '.join(format(b, '02x') for b in row)
        gutter = ''.join(chr(b) if 32 <= b < 127 else '.' for b in row)
        rows.append(f'{offset + row_start:08x} {hex_cols:<48} {gutter}')
    return '\n'.join(rows)
# ============== BIT PLANE ANALYSIS ==============
def extract_bit_plane(data: bytes, bit: int) -> bytes:
    """Isolate bit *bit* (0 = LSB) of every byte in *data* and pack the
    resulting bit stream MSB-first into bytes.

    Uses numpy when available; otherwise a pure-Python packing loop.
    The final byte is zero-padded when len(data) is not a multiple of 8.
    """
    if HAS_NUMPY:
        values = np.frombuffer(data, dtype=np.uint8)
        bit_stream = (values >> bit) & 1
        # Pad to a byte boundary so the packed length is well defined.
        pad = (8 - len(bit_stream) % 8) % 8
        padded_stream = np.pad(bit_stream, (0, pad), mode='constant')
        return np.packbits(padded_stream).tobytes()
    # Fallback without numpy: pack 8 source bytes into one output byte.
    packed = bytearray()
    for group_start in range(0, len(data), 8):
        accumulator = 0
        for position, value in enumerate(data[group_start:group_start + 8]):
            accumulator |= ((value >> bit) & 1) << (7 - position)
        packed.append(accumulator)
    return bytes(packed)
def analyze_bit_planes(data: bytes) -> Dict[str, Any]:
    """Summarize each of the 8 bit planes of *data*: entropy of the
    packed plane, number of distinct byte values, and a 64-byte hex
    sample for manual inspection."""
    def _summarize(plane: bytes) -> Dict[str, Any]:
        # Per-plane statistics over the MSB-first packed bit stream.
        return {
            'entropy': calculate_entropy(plane),
            'unique_bytes': len(set(plane)),
            'sample': plane[:64].hex(),
        }
    return {
        f'plane_{bit}': _summarize(extract_bit_plane(data, bit))
        for bit in range(8)
    }
# ============== LSB EXTRACTION ==============
def extract_lsb(data: bytes, bits: int = 1, channels: str = "RGB") -> bytes:
    """Extract the low *bits* bits of every byte in *data* and repack
    them LSB-first into a byte stream.

    Args:
        data: Raw sample/pixel bytes to mine.
        bits: Number of low-order bit positions taken from each byte
            (bit 0 first, then bit 1, ...).
        channels: Unused here; accepted for signature compatibility with
            the image-aware extractors — callers pass raw interleaved
            bytes, so every byte is treated identically.

    Returns:
        The packed bit stream; a trailing partial group is zero-padded
        into one final byte. Empty input yields b''.
    """
    if not data:
        return b''
    # Collect bit values in extraction order: for each source byte,
    # bit positions 0..bits-1. (Removed an unused `mask` local.)
    harvested = [
        (value >> position) & 1
        for value in data
        for position in range(bits)
    ]
    # Repack LSB-first: the first harvested bit becomes bit 0 of the
    # first output byte.
    packed = bytearray()
    for start in range(0, len(harvested), 8):
        byte_val = 0
        for shift, bit in enumerate(harvested[start:start + 8]):
            byte_val |= bit << shift
        packed.append(byte_val)
    return bytes(packed)
# ============== PATTERN DETECTION ==============
def detect_repeated_patterns(data: bytes, min_length: int = 4, max_length: int = 32) -> List[Dict[str, Any]]:
    """Find byte substrings that repeat at least three times in *data*.

    Scans every pattern length from *min_length* up to *max_length*
    (capped at half the data size), allowing overlapping occurrences.
    Returns up to 20 entries sorted by occurrence count (descending),
    each with the pattern hex, its length, total count, and the first
    10 offsets.
    """
    hits: List[Dict[str, Any]] = []
    upper = min(max_length, len(data) // 2)
    for size in range(min_length, upper + 1):
        occurrences: Dict[bytes, List[int]] = {}
        for start in range(len(data) - size + 1):
            occurrences.setdefault(data[start:start + size], []).append(start)
        for candidate, positions in occurrences.items():
            if len(positions) < 3:  # require at least 3 occurrences
                continue
            hits.append({
                'pattern': candidate.hex(),
                'length': size,
                'count': len(positions),
                'offsets': positions[:10],  # First 10 offsets
            })
    # Stable sort keeps shorter patterns first among equal counts,
    # matching insertion order.
    hits.sort(key=lambda entry: entry['count'], reverse=True)
    return hits[:20]
def detect_xor_patterns(data: bytes) -> Dict[str, Any]:
    """Heuristically detect XOR obfuscation in *data*.

    Two probes:
      * single-byte keys: XOR the first 256 bytes with every key 0..255
        and report keys whose output is mostly printable text;
      * repeating keys: split the first 1024 bytes into key_len columns
        (key_len 2..16); if the average per-column entropy is low, each
        column behaves like a single-byte XOR stream, suggesting a
        repeating key of that length.

    Returns a dict with 'single_byte_keys', 'key_length_candidates' and
    a 'repeating_key_likely' flag. Empty input returns the empty result
    immediately — previously it falsely flagged a repeating key, since
    every empty column has zero entropy.
    """
    results: Dict[str, Any] = {
        'single_byte_keys': [],
        'repeating_key_likely': False,
        'key_length_candidates': []
    }
    if not data:
        # Nothing to analyze; bail out before the column test, which
        # would otherwise report every key length as a candidate.
        return results
    # --- Probe 1: single-byte XOR keys over a 256-byte window ---
    window = data[:256]
    for key in range(256):
        decoded = bytes(b ^ key for b in window)
        # Count printable ASCII plus TAB/LF/CR as "text-like".
        printable = sum(1 for b in decoded if 32 <= b < 127 or b in (9, 10, 13))
        if printable > len(decoded) * 0.7:
            results['single_byte_keys'].append({
                'key': key,
                'key_hex': f'{key:02x}',
                'printable_ratio': printable / len(decoded),
                'sample': decoded[:50].decode('ascii', errors='replace')
            })
    # --- Probe 2: repeating-key length candidates ---
    for key_len in range(2, 17):
        columns = [[] for _ in range(key_len)]
        for i, b in enumerate(data[:1024]):
            columns[i % key_len].append(b)
        # A column XORed with one constant byte keeps the plaintext's
        # (low) entropy; random data would sit near 8 bits/byte.
        avg_entropy = sum(calculate_entropy(bytes(col)) for col in columns) / key_len
        if avg_entropy < 5.0:  # noticeably lower than random
            results['key_length_candidates'].append({
                'length': key_len,
                'avg_column_entropy': avg_entropy
            })
    if results['key_length_candidates']:
        results['repeating_key_likely'] = True
    return results
# ============== ENCODING DETECTION ==============
def detect_base64(data: bytes) -> Dict[str, Any]:
    """Scan *data* for runs that look like Base64 and try to decode them.

    A candidate is 20+ characters of the Base64 alphabet followed by up
    to two '=' padding chars. Each run that decodes cleanly is reported
    with its offset, sizes, printable ratio, and a preview (text when
    mostly printable, hex otherwise).

    Fixes over the previous version: removed the unused `b64_pattern`
    local, hoisted `import base64` out of the match loop, and narrowed
    the bare `except:` to the decode errors b64decode actually raises.
    """
    import base64  # local import retained; hoisted out of the loop

    results: Dict[str, Any] = {
        'found': False,
        'segments': []
    }
    text = data.decode('ascii', errors='ignore')
    for match in re.finditer(r'[A-Za-z0-9+/]{20,}={0,2}', text):
        b64_str = match.group()
        try:
            decoded = base64.b64decode(b64_str)
        except (binascii.Error, ValueError):
            # Not valid Base64 (bad length/padding) - skip this run.
            continue
        # Check if decoded content is meaningful (mostly printable).
        printable = sum(1 for b in decoded if 32 <= b < 127 or b in [9, 10, 13])
        results['segments'].append({
            'offset': match.start(),
            'length': len(b64_str),
            'decoded_length': len(decoded),
            'printable_ratio': printable / len(decoded) if decoded else 0,
            'decoded_preview': decoded[:100].decode('utf-8', errors='replace') if printable > len(decoded) * 0.5 else decoded[:50].hex()
        })
        results['found'] = True
    return results
def detect_hex_strings(data: bytes) -> Dict[str, Any]:
    """Scan *data* for long hex-encoded runs and decode them.

    A candidate is 8+ consecutive hex byte pairs (16+ hex digits). Each
    decoded run is reported with offset, sizes, printable ratio, and a
    text preview when it is mostly printable (None otherwise).

    Fix over the previous version: the bare `except:` is narrowed to
    ValueError, the only error bytes.fromhex raises.
    """
    results: Dict[str, Any] = {
        'found': False,
        'segments': []
    }
    text = data.decode('ascii', errors='ignore')
    # Match continuous even-length hex runs: each byte needs two digits.
    hex_pattern = r'(?:[0-9a-fA-F]{2}){8,}'
    for match in re.finditer(hex_pattern, text):
        hex_str = match.group()
        try:
            decoded = bytes.fromhex(hex_str)
        except ValueError:
            # Cannot occur for this pattern, but keep decoding defensive.
            continue
        printable = sum(1 for b in decoded if 32 <= b < 127 or b in [9, 10, 13])
        results['segments'].append({
            'offset': match.start(),
            'length': len(hex_str),
            'decoded_length': len(decoded),
            'printable_ratio': printable / len(decoded) if decoded else 0,
            'decoded_preview': decoded[:100].decode('utf-8', errors='replace') if printable > len(decoded) * 0.5 else None
        })
        results['found'] = True
    return results
def detect_unicode_steg(data: bytes) -> Dict[str, Any]:
    """Look for invisible-character steganography in UTF-8 text.

    Counts zero-width / joiner characters and variation selectors
    (U+FE00..U+FE0F). The 'homoglyphs' list is reserved in the result
    shape but not populated by this detector.
    """
    results: Dict[str, Any] = {
        'found': False,
        'zero_width_chars': [],
        'homoglyphs': [],
        'invisible_chars': 0
    }
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return results
    # Zero-width characters commonly abused for hidden payloads.
    named_invisibles = (
        ('\u200b', 'ZERO WIDTH SPACE'),
        ('\u200c', 'ZERO WIDTH NON-JOINER'),
        ('\u200d', 'ZERO WIDTH JOINER'),
        ('\u2060', 'WORD JOINER'),
        ('\ufeff', 'ZERO WIDTH NO-BREAK SPACE (BOM)'),
        ('\u180e', 'MONGOLIAN VOWEL SEPARATOR'),
    )
    for char, name in named_invisibles:
        occurrences = text.count(char)
        if occurrences:
            results['zero_width_chars'].append({
                'char': repr(char),
                'name': name,
                'count': occurrences
            })
            results['invisible_chars'] += occurrences
            results['found'] = True
    # Variation selectors are likewise invisible carriers.
    selector_count = sum(1 for char in text if '\ufe00' <= char <= '\ufe0f')
    if selector_count:
        results['invisible_chars'] += selector_count
        results['found'] = True
    return results
def detect_whitespace_steg(data: bytes) -> Dict[str, Any]:
    """Detect whitespace-based steganography in UTF-8 text.

    Counts trailing whitespace per line and flags mixed tab/space
    indentation. When more than 10 trailing whitespace characters are
    present, attempts to decode them as a bit stream (space=0, tab=1,
    MSB-first bytes, NUL-terminated) into a hidden printable message.
    """
    results: Dict[str, Any] = {
        'found': False,
        'trailing_spaces': 0,
        'mixed_indentation': False,
        'suspicious_patterns': [],
        'potential_message': None
    }
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return results
    lines = text.split('\n')
    tab_led = 0
    space_led = 0
    for line in lines:
        # Trailing whitespace is whatever rstrip() removes.
        results['trailing_spaces'] += len(line) - len(line.rstrip())
        if line.startswith('\t'):
            tab_led += 1
        elif line.startswith(' '):
            space_led += 1
    results['mixed_indentation'] = tab_led > 0 and space_led > 0
    if results['trailing_spaces'] > 10:
        results['found'] = True
        # Interpret trailing whitespace as bits: space -> 0, tab -> 1;
        # any other trailing character is ignored.
        bit_chars = []
        for line in lines:
            for ch in line[len(line.rstrip()):]:
                if ch == ' ':
                    bit_chars.append('0')
                elif ch == '\t':
                    bit_chars.append('1')
        if len(bit_chars) >= 8:
            try:
                stream = ''.join(bit_chars)
                decoded_bytes = bytearray()
                for start in range(0, len(stream) - 7, 8):
                    value = int(stream[start:start + 8], 2)
                    if value == 0:  # NUL terminates the message
                        break
                    decoded_bytes.append(value)
                candidate = bytes(decoded_bytes).decode('utf-8', errors='ignore')
                if candidate and all(32 <= ord(c) < 127 or c in '\r\n\t' for c in candidate):
                    results['potential_message'] = candidate[:200]
            except:
                pass
    return results
# ============== TOOL REGISTRY ==============
class AnalysisToolRegistry:
    """Name -> callable registry for analysis tools.

    Core detectors are installed at construction time; format-specific
    modules add their own via register(). execute() normalizes whatever
    a tool returns (AnalysisResult, dict, or anything else) into an
    AnalysisResult, and converts raised exceptions into failed results.
    """

    def __init__(self):
        self._tools: Dict[str, callable] = {}
        self._register_all_tools()

    def _register_all_tools(self):
        """Install the core, format-agnostic detectors."""
        core = {
            'detect_base64': detect_base64,
            'detect_hex_strings': detect_hex_strings,
            'detect_unicode_steg': detect_unicode_steg,
            'detect_whitespace_steg': detect_whitespace_steg,
            'detect_xor_patterns': detect_xor_patterns,
            'detect_repeated_patterns': detect_repeated_patterns,
            'analyze_entropy': lambda data: {'entropy': calculate_entropy(data)},
            'analyze_bit_planes': analyze_bit_planes,
        }
        self._tools.update(core)
        # Format-specific modules register their tools separately.

    def register(self, action: str, func: callable):
        """Expose *func* under the action name *action*."""
        self._tools[action] = func

    def get(self, action: str) -> Optional[callable]:
        """Return the callable registered for *action*, or None."""
        return self._tools.get(action)

    def execute(self, action: str, data: bytes, **kwargs) -> AnalysisResult:
        """Run *action* on *data*, always returning an AnalysisResult."""
        func = self._tools.get(action)
        if not func:
            return AnalysisResult(
                success=False,
                action=action,
                file_type="unknown",
                error=f"Unknown action: {action}"
            )
        try:
            outcome = func(data, **kwargs)
            # Normalize the tool's return value.
            if isinstance(outcome, AnalysisResult):
                return outcome
            if isinstance(outcome, dict):
                return AnalysisResult(
                    success=True,
                    action=action,
                    file_type=kwargs.get('file_type', 'unknown'),
                    data=outcome,
                    suspicious=outcome.get('found', False) or outcome.get('suspicious', False)
                )
            return AnalysisResult(
                success=True,
                action=action,
                file_type=kwargs.get('file_type', 'unknown'),
                data={'result': outcome}
            )
        except Exception as exc:
            return AnalysisResult(
                success=False,
                action=action,
                file_type=kwargs.get('file_type', 'unknown'),
                error=str(exc)
            )

    def list_tools(self) -> List[str]:
        """All registered action names, sorted alphabetically."""
        return sorted(self._tools)
# Global registry instance shared by the module-level convenience wrappers.
TOOL_REGISTRY = AnalysisToolRegistry()

def execute_action(action: str, data: bytes, **kwargs) -> AnalysisResult:
    """Run the registered tool *action* against *data* via the global
    registry; extra keyword arguments are forwarded to the tool."""
    return TOOL_REGISTRY.execute(action, data, **kwargs)

def list_available_tools() -> List[str]:
    """Return the sorted action names registered in the global registry."""
    return TOOL_REGISTRY.list_tools()
# ============== PNG ANALYSIS TOOLS ==============
# The 8-byte PNG file signature.
PNG_MAGIC = b'\x89PNG\r\n\x1a\n'
# Known PNG chunk types -> human-readable description; anything absent
# from this table is reported as 'Unknown/Private' by png_parse_chunks().
PNG_CHUNK_TYPES = {
    'IHDR': 'Image header',
    'PLTE': 'Palette',
    'IDAT': 'Image data',
    'IEND': 'Image end',
    'tEXt': 'Textual data',
    'zTXt': 'Compressed textual data',
    'iTXt': 'International textual data',
    'bKGD': 'Background color',
    'cHRM': 'Primary chromaticities',
    'gAMA': 'Gamma',
    'hIST': 'Palette histogram',
    'iCCP': 'ICC profile',
    'pHYs': 'Physical pixel dimensions',
    'sBIT': 'Significant bits',
    'sPLT': 'Suggested palette',
    'sRGB': 'Standard RGB color space',
    'tIME': 'Last modification time',
    'tRNS': 'Transparency',
    'eXIf': 'EXIF data',
    'acTL': 'Animation control (APNG)',
    'fcTL': 'Frame control (APNG)',
    'fdAT': 'Frame data (APNG)',
}
def png_parse_chunks(data: bytes) -> Dict[str, Any]:
    """Parse all PNG chunks and return detailed information.

    Walks the chunk stream after the 8-byte signature, validating each
    chunk's CRC and decoding well-known payloads (IHDR geometry,
    tEXt/zTXt/iTXt text, tIME, pHYs). Returns a summary dict with the
    chunk list, per-type counts, total IDAT payload size, and the number
    of bytes after IEND (flagged suspicious when non-zero).
    """
    if not data.startswith(PNG_MAGIC):
        return {'error': 'Not a valid PNG file', 'valid': False}
    chunks = []
    pos = 8  # Skip magic bytes
    total_idat_size = 0
    chunk_type_counts = {}
    while pos < len(data):
        # Need at least length (4B) + type (4B) to read a chunk header.
        if pos + 8 > len(data):
            break
        chunk_length = struct.unpack('>I', data[pos:pos+4])[0]
        chunk_type = data[pos+4:pos+8].decode('ascii', errors='replace')
        # Chunk layout: 4B length + 4B type + payload + 4B CRC.
        if pos + 12 + chunk_length > len(data):
            chunks.append({
                'type': chunk_type,
                'offset': pos,
                'length': chunk_length,
                'error': 'Truncated chunk'
            })
            break
        chunk_data = data[pos+8:pos+8+chunk_length]
        stored_crc = struct.unpack('>I', data[pos+8+chunk_length:pos+12+chunk_length])[0]
        # CRC covers the type field plus the payload.
        calculated_crc = zlib.crc32(data[pos+4:pos+8+chunk_length]) & 0xffffffff
        chunk_info = {
            'type': chunk_type,
            'description': PNG_CHUNK_TYPES.get(chunk_type, 'Unknown/Private'),
            'offset': pos,
            'length': chunk_length,
            'crc_valid': stored_crc == calculated_crc,
            'crc_stored': f'{stored_crc:08x}',
            'crc_calculated': f'{calculated_crc:08x}',
        }
        # Track chunk type counts
        chunk_type_counts[chunk_type] = chunk_type_counts.get(chunk_type, 0) + 1
        # Track IDAT size
        if chunk_type == 'IDAT':
            total_idat_size += chunk_length
        # Parse IHDR (fixed 13-byte layout)
        if chunk_type == 'IHDR' and chunk_length == 13:
            width, height, bit_depth, color_type, compression, filter_method, interlace = struct.unpack('>IIBBBBB', chunk_data)
            chunk_info['parsed'] = {
                'width': width,
                'height': height,
                'bit_depth': bit_depth,
                'color_type': color_type,
                'compression': compression,
                'filter': filter_method,
                'interlace': interlace
            }
        # Parse text chunks: keyword and text separated by a NUL byte.
        elif chunk_type == 'tEXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                text = chunk_data[null_pos+1:].decode('latin-1', errors='replace')
                chunk_info['parsed'] = {'keyword': keyword, 'text': text[:500]}
        elif chunk_type == 'zTXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                try:
                    # +2 skips the NUL separator and the compression-method byte.
                    text = zlib.decompress(chunk_data[null_pos+2:]).decode('latin-1', errors='replace')
                    chunk_info['parsed'] = {'keyword': keyword, 'text': text[:500], 'compressed': True}
                except:
                    chunk_info['parsed'] = {'keyword': keyword, 'error': 'Decompression failed'}
        elif chunk_type == 'iTXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                # NOTE(review): only the keyword is extracted here; the iTXt
                # language tag and text payload are not parsed.
                chunk_info['parsed'] = {'keyword': keyword}
        # Parse tIME (fixed 7-byte layout)
        elif chunk_type == 'tIME' and chunk_length == 7:
            year, month, day, hour, minute, second = struct.unpack('>HBBBBB', chunk_data)
            chunk_info['parsed'] = {
                'timestamp': f'{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}'
            }
        # Parse pHYs (fixed 9-byte layout)
        elif chunk_type == 'pHYs' and chunk_length == 9:
            ppux, ppuy, unit = struct.unpack('>IIB', chunk_data)
            chunk_info['parsed'] = {
                'pixels_per_unit_x': ppux,
                'pixels_per_unit_y': ppuy,
                'unit': 'meter' if unit == 1 else 'unknown'
            }
        chunks.append(chunk_info)
        pos += 12 + chunk_length
        # Stop at IEND; anything beyond it is trailing data.
        if chunk_type == 'IEND':
            break
    # Check for data after IEND
    after_iend = len(data) - pos
    return {
        'valid': True,
        'chunks': chunks,
        'chunk_count': len(chunks),
        'chunk_type_counts': chunk_type_counts,
        'total_idat_size': total_idat_size,
        'data_after_iend': after_iend,
        'suspicious': after_iend > 0
    }
def png_extract_text_chunks(data: bytes) -> Dict[str, Any]:
    """Collect parsed tEXt/zTXt/iTXt metadata entries from a PNG.

    Returns the parse-failure dict unchanged when the data is not a
    valid PNG; otherwise a dict with the entries, a count, and a
    'found' flag.
    """
    parsed = png_parse_chunks(data)
    if not parsed.get('valid'):
        return parsed
    entries = [
        {
            'type': chunk['type'],
            'keyword': chunk['parsed'].get('keyword', ''),
            'text': chunk['parsed'].get('text', ''),
            'offset': chunk['offset'],
        }
        for chunk in parsed['chunks']
        if chunk['type'] in ('tEXt', 'zTXt', 'iTXt') and 'parsed' in chunk
    ]
    return {
        'found': len(entries) > 0,
        'text_chunks': entries,
        'count': len(entries),
    }
def png_detect_appended_data(data: bytes) -> Dict[str, Any]:
    """Detect and characterize bytes appended after the PNG IEND chunk.

    Walks the chunk list to find where IEND ends, then reports the
    trailer's size, offset, entropy, hex preview, embedded file type
    (when the trailer starts with a known magic), and a text preview
    when the trailer is printable UTF-8.

    Fix over the previous version: the bare `except:` around the text
    preview is narrowed to UnicodeDecodeError, the only error strict
    decoding raises here.
    """
    if not data.startswith(PNG_MAGIC):
        return {'found': False, 'error': 'Not a valid PNG file'}
    # Walk chunks: each is 4B length + 4B type + payload + 4B CRC.
    pos = 8  # skip the 8-byte PNG signature
    iend_end_pos = None
    while pos + 8 <= len(data):
        chunk_length = struct.unpack('>I', data[pos:pos+4])[0]
        chunk_type = data[pos+4:pos+8]
        chunk_end_pos = pos + 12 + chunk_length
        if chunk_type == b'IEND':
            iend_end_pos = chunk_end_pos
            break
        pos = chunk_end_pos
    if iend_end_pos is None:
        return {'found': False, 'error': 'No IEND chunk found'}
    if iend_end_pos >= len(data):
        return {'found': False, 'appended_size': 0}
    appended_data = data[iend_end_pos:]
    if len(appended_data) == 0:
        return {'found': False, 'appended_size': 0}
    # Any trailer at all is worth flagging; characterize it.
    result = {
        'found': True,
        'appended_size': len(appended_data),
        'offset': iend_end_pos,
        'entropy': calculate_entropy(appended_data),
        'preview_hex': appended_data[:64].hex(),
        'suspicious': True
    }
    # A known magic at the start of the trailer means a whole file was appended.
    file_type = detect_file_type(appended_data)
    if file_type != FileType.UNKNOWN:
        result['embedded_file_type'] = file_type.value
    # Show a text preview when the trailer decodes as printable UTF-8.
    try:
        text = appended_data[:200].decode('utf-8')
        if all(c.isprintable() or c in '\r\n\t' for c in text):
            result['text_preview'] = text
    except UnicodeDecodeError:
        pass
    return result
def png_analyze_idat(data: bytes) -> Dict[str, Any]:
    """Inspect the IDAT chunk sequence of a PNG for anomalies.

    Reports CRC validity, size statistics, and gaps larger than a bare
    chunk header between consecutive IDAT chunks (possible injected
    data). Returns the parse-failure dict unchanged for invalid PNGs.
    """
    parsed = png_parse_chunks(data)
    if not parsed.get('valid'):
        return parsed
    idat_chunks = []
    prev_end = 0
    for chunk in parsed['chunks']:
        if chunk['type'] != 'IDAT':
            continue
        entry = {
            'offset': chunk['offset'],
            'length': chunk['length'],
            'crc_valid': chunk['crc_valid'],
        }
        idat_chunks.append(entry)
        # Back-to-back IDATs start exactly where the previous one ended.
        if prev_end > 0 and chunk['offset'] != prev_end:
            gap = chunk['offset'] - prev_end
            if gap > 12:  # more than a bare chunk header + CRC in between
                entry['gap_before'] = gap
        prev_end = chunk['offset'] + 12 + chunk['length']
    if not idat_chunks:
        return {'found': False, 'error': 'No IDAT chunks found'}
    sizes = [c['length'] for c in idat_chunks]
    total_size = sum(sizes)
    return {
        'found': True,
        'chunk_count': len(idat_chunks),
        'total_size': total_size,
        'chunks': idat_chunks,
        'size_variance': max(sizes) - min(sizes) if len(sizes) > 1 else 0,
        'avg_chunk_size': total_size // len(idat_chunks),
        'all_crc_valid': all(c['crc_valid'] for c in idat_chunks),
        'suspicious': any('gap_before' in c for c in idat_chunks)
    }
def png_extract_lsb(data: bytes, bits: int = 1, channels: str = "RGB") -> Dict[str, Any]:
    """Extract the low *bits* bits of the selected *channels* from every
    pixel of a PNG and inspect the recovered byte stream.

    Bits are taken pixel by pixel, channel by channel, LSB-first, and
    packed into bytes (a trailing partial byte is dropped). The result
    reports entropy plus any of: a 'STEG' magic header, an embedded
    file signature, or readable text.

    Fixes over the previous version: removed the unused `mask` local
    and narrowed the bare `except:` around the text probe to
    UnicodeDecodeError.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}
    try:
        img = Image.open(io.BytesIO(data))
        # Normalize to RGB/RGBA so channel indices are stable.
        if img.mode == 'P':
            img = img.convert('RGBA')
        elif img.mode == 'L':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGBA')
        pixels = list(img.getdata())
        # Map channel letters to tuple positions; unknown letters ignored.
        channel_map = {'R': 0, 'G': 1, 'B': 2, 'A': 3}
        channel_indices = [channel_map[c] for c in channels.upper() if c in channel_map]
        extracted_bits = []
        for pixel in pixels:
            for ch_idx in channel_indices:
                if ch_idx < len(pixel):
                    for bit_pos in range(bits):
                        extracted_bits.append((pixel[ch_idx] >> bit_pos) & 1)
        # Pack LSB-first; only complete 8-bit groups are emitted.
        result_bytes = bytearray()
        for i in range(0, len(extracted_bits) - 7, 8):
            byte_val = 0
            for j in range(8):
                byte_val |= extracted_bits[i + j] << j
            result_bytes.append(byte_val)
        raw_data = bytes(result_bytes)
        result = {
            'found': True,
            'extracted_size': len(raw_data),
            'channels': channels,
            'bits_per_channel': bits,
            'entropy': calculate_entropy(raw_data[:1024]),
            'raw_data': raw_data
        }
        # Marker used by the companion embedding tool.
        if raw_data[:4] == b'STEG':
            result['steg_header_found'] = True
            result['suspicious'] = True
        # A known magic means a whole file hides in the LSB plane.
        file_type = detect_file_type(raw_data)
        if file_type != FileType.UNKNOWN:
            result['embedded_file_type'] = file_type.value
            result['suspicious'] = True
        # Readable text hidden in the LSB plane?
        try:
            text = raw_data[:100].decode('utf-8')
            printable = sum(1 for c in text if c.isprintable() or c in '\r\n\t')
            if printable > len(text) * 0.7:
                result['text_preview'] = text
                result['suspicious'] = True
        except UnicodeDecodeError:
            pass
        return result
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_chi_square_analysis(data: bytes) -> Dict[str, Any]:
    """Chi-square analysis to detect LSB manipulation.

    Per channel, two statistics are computed:
      * 'pairs': chi-square of the value-pair histogram (values 2k and
        2k+1 collapsed via val // 2) against a uniform expectation;
      * 'lsb': chi-square of the LSB plane's ones/zeros split against a
        50/50 expectation, flagged above 3.84 (the 95% critical value
        for one degree of freedom).
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}
    try:
        img = Image.open(io.BytesIO(data))
        # Normalize palette/grayscale/other modes to RGB.
        if img.mode == 'P':
            img = img.convert('RGB')
        elif img.mode == 'L':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGB')
        pixels = np.array(img)
        results = {}
        # Analyze each channel
        channel_names = ['Red', 'Green', 'Blue', 'Alpha']
        for ch_idx in range(min(pixels.shape[2], 4)):
            channel = pixels[:, :, ch_idx].flatten()
            # Pair analysis: count pairs (2k, 2k+1)
            pairs = np.zeros(128)
            for val in channel:
                pair_idx = val // 2
                if pair_idx < 128:
                    pairs[pair_idx] += 1
            # Expected distribution
            total = len(channel)
            expected = total / 128
            # Chi-square for pairs
            chi_sq = sum((pairs[i] - expected) ** 2 / expected for i in range(128) if expected > 0)
            # Also analyze bit plane
            lsb_plane = channel & 1
            ones = np.sum(lsb_plane)
            zeros = total - ones
            expected_ones = total / 2
            lsb_chi_sq = ((ones - expected_ones) ** 2 / expected_ones +
                          (zeros - expected_ones) ** 2 / expected_ones)
            results[channel_names[ch_idx]] = {
                'chi_square_pairs': float(chi_sq),
                'chi_square_lsb': float(lsb_chi_sq),
                'lsb_ones_ratio': float(ones / total),
                'suspicious': lsb_chi_sq > 3.84  # 95% confidence threshold
            }
        overall_suspicious = any(r['suspicious'] for r in results.values())
        # NOTE(review): the interpretation text below mentions *low*
        # chi-square values, but the flag fires on lsb_chi_sq > 3.84
        # (large deviation from a 50/50 LSB split) — confirm the
        # intended direction of this heuristic.
        return {
            'found': True,
            'channels': results,
            'suspicious': overall_suspicious,
            'interpretation': 'Low chi-square LSB values may indicate LSB steganography' if overall_suspicious else 'No strong LSB manipulation detected'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_bit_plane_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze individual bit planes of PNG image.

    For each channel, measures the entropy and the proportion of 1-bits
    in each of the 8 bit planes. Only planes 0-1 are judged: very high
    entropy (> 7.5) or a near-exact 50% ones ratio there is flagged as
    a possible sign of LSB embedding.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}
    try:
        img = Image.open(io.BytesIO(data))
        if img.mode == 'P':
            img = img.convert('RGB')
        pixels = np.array(img)
        results = {}
        # Slice the name list to however many channels the array has.
        channel_names = ['Red', 'Green', 'Blue', 'Alpha'][:pixels.shape[2] if len(pixels.shape) > 2 else 1]
        if len(pixels.shape) == 2:  # Grayscale: promote to a 3-D array
            pixels = pixels.reshape(pixels.shape[0], pixels.shape[1], 1)
            channel_names = ['Gray']
        for ch_idx, ch_name in enumerate(channel_names):
            channel = pixels[:, :, ch_idx]
            planes = {}
            for bit in range(8):
                plane = (channel >> bit) & 1
                # Calculate entropy of bit plane (packed MSB-first into bytes)
                plane_bytes = np.packbits(plane.flatten())
                entropy = calculate_entropy(plane_bytes.tobytes())
                # Calculate percentage of 1s
                ones_pct = np.mean(plane) * 100
                planes[f'bit_{bit}'] = {
                    'entropy': float(entropy),
                    'ones_percentage': float(ones_pct),
                    # Only the two lowest planes are judged for embedding.
                    'suspicious': bit < 2 and (entropy > 7.5 or abs(ones_pct - 50) < 1)
                }
            results[ch_name] = planes
        # Determine overall suspicion
        suspicious = any(
            plane['suspicious']
            for channel_planes in results.values()
            for plane in channel_planes.values()
        )
        return {
            'found': True,
            'channels': results,
            'suspicious': suspicious,
            'interpretation': 'High entropy in lower bit planes may indicate hidden data'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_palette_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze PNG palette for steganography indicators and attempt LSB decode.

    Three probes on palette ('P' mode) images:
      1. palette structure: a fully sorted palette or many near-duplicate
         color pairs (channel-wise difference 1-3) can indicate
         palette-based embedding;
      2. palette-index LSB: read the LSB of each pixel's palette index,
         expecting a 32-bit big-endian length prefix followed by the
         message bytes (MSB-first);
      3. palette-color LSB: the same decoding applied to the LSBs of
         the palette's RGB components themselves.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available'}
    try:
        img = Image.open(io.BytesIO(data))
        if img.mode != 'P':
            return {'found': False, 'reason': 'Image is not palette-based'}
        palette = img.getpalette()
        if not palette:
            return {'found': False, 'reason': 'No palette found'}
        # Palette is RGB triplets
        colors = []
        for i in range(0, len(palette), 3):
            colors.append((palette[i], palette[i+1], palette[i+2]))
        # Check for sorted palette (common steg indicator)
        is_sorted = colors == sorted(colors)
        # Check for near-duplicate colors (LSB differences only);
        # O(n^2) over at most 256 palette entries, so bounded.
        near_duplicates = []
        for i, c1 in enumerate(colors):
            for j, c2 in enumerate(colors[i+1:], i+1):
                diff = sum(abs(a - b) for a, b in zip(c1, c2))
                if 0 < diff <= 3:  # Very similar colors
                    near_duplicates.append((i, j, diff))
        # Check color distribution
        histogram = img.histogram()
        used_colors = sum(1 for h in histogram[:256] if h > 0)
        # === PALETTE INDEX LSB DECODE ATTEMPT ===
        # Extract LSB of each pixel's palette index
        pixel_indices = list(img.getdata())
        bits = [idx & 1 for idx in pixel_indices]
        decoded_message = None
        decoded_length = None
        if len(bits) >= 32:
            # Try 32-bit length prefix (big-endian)
            length = 0
            for i in range(32):
                length = (length << 1) | bits[i]
            # Sanity-check the prefix: must fit in the remaining bits
            # and stay under a 5000-byte cap.
            if 0 < length < min(5000, (len(bits) - 32) // 8):
                decoded_length = length
                msg_bits = bits[32:32 + length * 8]
                msg_bytes = bytearray()
                for i in range(0, len(msg_bits), 8):
                    v = 0
                    for j in range(8):
                        if i + j < len(msg_bits):
                            v = (v << 1) | msg_bits[i + j]
                    msg_bytes.append(v)
                try:
                    decoded_message = msg_bytes.decode('utf-8', errors='replace')
                except:
                    decoded_message = msg_bytes.hex()
        # === PALETTE COLOR LSB DECODE ATTEMPT ===
        # Some steg tools encode in the LSB of palette colors themselves
        palette_lsb_bits = []
        for r, g, b in colors:
            palette_lsb_bits.extend([r & 1, g & 1, b & 1])
        palette_decoded = None
        if len(palette_lsb_bits) >= 32:
            plen = 0
            for i in range(32):
                plen = (plen << 1) | palette_lsb_bits[i]
            if 0 < plen < (len(palette_lsb_bits) - 32) // 8:
                pbits = palette_lsb_bits[32:32 + plen * 8]
                pbytes = bytearray()
                for i in range(0, len(pbits), 8):
                    v = 0
                    for j in range(8):
                        if i + j < len(pbits):
                            v = (v << 1) | pbits[i + j]
                    pbytes.append(v)
                try:
                    palette_decoded = pbytes.decode('utf-8', errors='replace')
                except:
                    palette_decoded = pbytes.hex()
        result = {
            'found': True,
            'palette_size': len(colors),
            'used_colors': used_colors,
            'is_sorted': is_sorted,
            'near_duplicate_pairs': len(near_duplicates),
            'near_duplicates': near_duplicates[:10],
            'suspicious': len(near_duplicates) > 5 or is_sorted or decoded_message is not None,
            'interpretation': 'Sorted palette or many near-duplicates may indicate palette-based steganography',
        }
        if decoded_message:
            result['index_lsb_decode'] = {
                'length': decoded_length,
                'message': decoded_message[:200],
                'method': 'palette_index_lsb'
            }
        if palette_decoded:
            result['palette_color_lsb_decode'] = {
                'message': palette_decoded[:200],
                'method': 'palette_color_lsb'
            }
        return result
    except Exception as e:
        return {'error': str(e), 'found': False}
def detect_pvd_steg(data: bytes) -> Dict[str, Any]:
    """Detect and decode Pixel Value Differencing (PVD) steganography.

    This simple PVD variant stores one bit per horizontal pixel pair in
    the LSB of the R channel of the second pixel: bit=1 means R of
    pixel[x+1] is odd, bit=0 means even. The payload is a 32-bit
    big-endian byte-length prefix followed by the message bits.

    Returns a dict with 'found'; on success also 'method', 'length',
    'message' (truncated to 200 chars), 'suspicious', 'findings' and
    'interpretation'. On failure a 'reason' or 'error' key explains why.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}
    try:
        img = Image.open(io.BytesIO(data)).convert('RGB')
        pixels = img.load()
        width, height = img.size
        # One bit per pair: LSB of the R channel of the second pixel.
        bits = []
        for y in range(height):
            for x in range(0, width - 1, 2):
                r2, _, _ = pixels[x + 1, y]
                bits.append(r2 & 1)
        if len(bits) < 32:
            return {'found': False, 'reason': 'Not enough pixel pairs'}
        # 32-bit big-endian length prefix.
        length = 0
        for i in range(32):
            length = (length << 1) | bits[i]
        if length <= 0 or length > min(5000, (len(bits) - 32) // 8):
            return {'found': False, 'reason': f'Invalid length prefix: {length}'}
        msg_bits = bits[32:32 + length * 8]
        msg_bytes = bytearray()
        for i in range(0, len(msg_bits), 8):
            v = 0
            for j in range(8):
                if i + j < len(msg_bits):
                    v = (v << 1) | msg_bits[i + j]
            msg_bytes.append(v)
        # decode(errors='replace') cannot raise, so the old hex-fallback
        # except branch was unreachable and has been removed.
        decoded = msg_bytes.decode('utf-8', errors='replace')
        return {
            'found': True,
            'method': 'pvd_pair_lsb',
            'length': length,
            'message': decoded[:200],
            'suspicious': True,
            'findings': [f'PVD decode ({length} bytes): {decoded[:80]}'],
            'interpretation': 'Pixel Value Differencing — data encoded in R channel LSB of paired pixels'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def detect_histogram_shift_steg(data: bytes) -> Dict[str, Any]:
    """Detect and decode histogram shifting steganography.

    Histogram shifting encodes bits relative to a peak pixel value:
    a pixel equal to the peak encodes 0, peak+1 encodes 1 (the encoder
    shifted all values > peak up by one to make room). Because the
    original peak is split across two values in the encoded image, every
    candidate peak 0-254 is brute-forced. The payload is a 32-bit
    big-endian byte-length prefix followed by the message bits.

    Returns {'found': True, ...} with the decoded message when a
    candidate peak yields mostly-printable UTF-8, else {'found': False}.

    Fixes over the previous version: the 256-bin histogram that was
    built but never consulted is gone, and the image is flattened once
    instead of re-reading every pixel for each of the 255 candidate
    peaks (was O(255 * W * H) pixel accesses).
    """
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}
    try:
        img = Image.open(io.BytesIO(data))
        if img.mode != 'L':
            img = img.convert('L')
        # Row-major flatten — same scan order as nested y/x loops over load().
        flat = list(img.getdata())
        for peak in range(255):
            bits = []
            for value in flat:
                if value == peak:
                    bits.append(0)
                elif value == peak + 1:
                    bits.append(1)
            if len(bits) < 40:
                continue
            # 32-bit big-endian length prefix.
            length = 0
            for i in range(32):
                length = (length << 1) | bits[i]
            if length <= 0 or length > min(5000, (len(bits) - 32) // 8):
                continue
            msg_bits = bits[32:32 + length * 8]
            msg_bytes = bytearray()
            for i in range(0, len(msg_bits), 8):
                v = 0
                for j in range(8):
                    if i + j < len(msg_bits):
                        v = (v << 1) | msg_bits[i + j]
                msg_bytes.append(v)
            # decode(errors='replace') never raises; the old except/continue
            # guard was unreachable.
            decoded = msg_bytes.decode('utf-8', errors='replace')
            # Check if it looks like valid text (high threshold to avoid false positives)
            printable = sum(1 for c in decoded if c.isprintable() or c in '\r\n\t')
            if printable > len(decoded) * 0.85:
                return {
                    'found': True,
                    'method': 'histogram_shift',
                    'peak': peak,
                    'length': length,
                    'message': decoded[:200],
                    'suspicious': True,
                    'findings': [f'Histogram shift decode (peak={peak}, {length} bytes): {decoded[:80]}'],
                    'interpretation': 'Histogram shifting — peak pixel values encode bits via shift'
                }
        return {'found': False, 'reason': 'No valid histogram shift pattern found'}
    except Exception as e:
        return {'error': str(e), 'found': False}
def detect_multibit_lsb(data: bytes) -> Dict[str, Any]:
    """Detect and decode multi-bit LSB steganography (2-bit, 4-bit per channel)."""
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}
    try:
        img = Image.open(io.BytesIO(data)).convert('RGBA')
        pixel_data = list(img.getdata())
        candidates = {}
        for depth in (2, 4):
            mask = (1 << depth) - 1
            # Low `depth` bits of R, G, B for every pixel, in raster order.
            units = []
            for r, g, b, a in pixel_data:
                units.append(r & mask)
                units.append(g & mask)
                units.append(b & mask)
            per_byte = 8 // depth
            prefix_units = 4 * per_byte  # units holding the 32-bit length prefix
            if len(units) < prefix_units:
                continue
            declared = 0
            for u in units[:prefix_units]:
                declared = (declared << depth) | u
            if not (0 < declared < min(5000, (len(units) - prefix_units) // per_byte)):
                continue
            payload = bytearray()
            cursor = prefix_units
            for _ in range(declared):
                acc = 0
                for _ in range(per_byte):
                    if cursor < len(units):
                        acc = (acc << depth) | units[cursor]
                        cursor += 1
                payload.append(acc & 0xFF)
            text = payload.decode('utf-8', errors='replace')
            printable = sum(1 for c in text if c.isprintable() or c in '\r\n\t')
            if printable > len(text) * 0.5:
                candidates[f'{depth}bit'] = {
                    'found': True,
                    'bits_per_channel': depth,
                    'length': declared,
                    'message': text[:200],
                }
        if candidates:
            # Dicts preserve insertion order, so 2bpc wins when both decode.
            best = next(iter(candidates.values()))
            return {
                'found': True,
                'method': f'multibit_lsb_{best["bits_per_channel"]}bpc',
                'length': best['length'],
                'message': best['message'],
                'suspicious': True,
                'findings': [f'Multi-bit LSB ({best["bits_per_channel"]}bpc, {best["length"]} bytes): {best["message"][:80]}'],
            }
        return {'found': False, 'reason': 'No valid multi-bit LSB pattern found'}
    except Exception as e:
        return {'error': str(e), 'found': False}
def gif_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze GIF files for steganography — comment blocks, palette LSB, disposal methods.

    Three independent checks:
      1. Raw scan for Comment Extension blocks (0x21 0xFE) and their
         sub-block payloads.
      2. Raw scan for Graphic Control Extension blocks (0x21 0xF9 0x04)
         collecting the 3-bit disposal-method field of each frame.
      3. PIL decode of palette-mode images, trying a 32-bit big-endian
         length prefix + message in the palette-index LSBs.

    Returns a dict with 'found', 'findings', 'comment_blocks',
    'palette_lsb_decode', 'disposal_methods' and 'suspicious'; on a
    decode failure returns {'error': ..., 'found': False}.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available'}
    results = {
        'found': False,
        'findings': [],
        'comment_blocks': [],
        'palette_lsb_decode': None,
        'disposal_methods': [],
    }
    try:
        # 1. Extract GIF comment extension blocks from raw data.
        #    NOTE(review): this scans the whole byte stream, so 0x21 0xFE
        #    inside compressed image data can yield false hits.
        pos = 0
        while pos < len(data) - 2:
            if data[pos] == 0x21 and data[pos + 1] == 0xFE: # Comment extension
                pos += 2
                comment = bytearray()
                # Comment data is a chain of length-prefixed sub-blocks,
                # terminated by a zero-length block.
                while pos < len(data) and data[pos] != 0:
                    block_len = data[pos]
                    pos += 1
                    comment.extend(data[pos:pos + block_len])
                    pos += block_len
                pos += 1 # Skip terminator
                try:
                    decoded = comment.decode('utf-8', errors='replace')
                    results['comment_blocks'].append(decoded)
                    results['found'] = True
                    results['findings'].append(f'Comment block: {decoded[:100]}')
                except:
                    results['comment_blocks'].append(comment.hex())
                continue
            pos += 1
        # 2. Extract disposal method bits from GCE blocks (packed byte,
        #    bits 2-4). A varying disposal sequence can itself carry data.
        pos = 0
        while pos < len(data) - 5:
            if data[pos] == 0x21 and data[pos + 1] == 0xF9 and data[pos + 2] == 0x04:
                packed = data[pos + 3]
                disposal = (packed >> 2) & 0x07
                results['disposal_methods'].append(disposal)
                pos += 6
            else:
                pos += 1
        if len(results['disposal_methods']) > 1:
            results['findings'].append(f"Disposal methods: {results['disposal_methods'][:20]}")
        # 3. Palette index LSB decode: 32-bit big-endian length prefix
        #    followed by the message bits, one bit per pixel index.
        img = Image.open(io.BytesIO(data))
        if img.mode == 'P':
            pixel_indices = list(img.getdata())
            bits = [idx & 1 for idx in pixel_indices]
            if len(bits) >= 32:
                length = 0
                for i in range(32):
                    length = (length << 1) | bits[i]
                # Sanity-bound the declared length to reject random noise.
                if 0 < length < min(5000, (len(bits) - 32) // 8):
                    msg_bits = bits[32:32 + length * 8]
                    msg_bytes = bytearray()
                    for i in range(0, len(msg_bits), 8):
                        v = 0
                        for j in range(8):
                            if i + j < len(msg_bits):
                                v = (v << 1) | msg_bits[i + j]
                        msg_bytes.append(v)
                    try:
                        decoded_msg = msg_bytes.decode('utf-8', errors='replace')
                        results['palette_lsb_decode'] = {
                            'length': length,
                            'message': decoded_msg[:200],
                            'method': 'palette_index_lsb'
                        }
                        results['found'] = True
                        results['findings'].append(f'Palette LSB decode ({length} bytes): {decoded_msg[:50]}')
                    except:
                        pass
        results['suspicious'] = results['found']
        return results
    except Exception as e:
        return {'error': str(e), 'found': False}
def bmp_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze BMP files for steganography — reserved header fields, trailing data, LSB.

    Three independent checks:
      1. The four reserved header bytes at offset 6 (zero in clean files).
      2. Bytes appended past the size declared in the file header.
      3. A 32-bit-length-prefixed payload in the RGB LSBs of the decoded
         pixels (requires PIL).
    """
    report = {
        'found': False,
        'findings': [],
        'reserved_bytes': None,
        'trailing_data': None,
        'lsb_decode': None,
    }
    if len(data) < 54:
        return {'error': 'File too small for BMP', 'found': False}
    if data[:2] != b'BM':
        return {'error': 'Not a BMP file', 'found': False}
    try:
        reserved = data[6:10]
        if reserved != b'\x00\x00\x00\x00':
            report['reserved_bytes'] = reserved.hex()
            report['found'] = True
            report['findings'].append(f'Non-zero reserved bytes: {reserved.hex()}')
        # Header-declared size vs. actual size reveals appended data.
        declared_size = struct.unpack('<I', data[2:6])[0]
        if len(data) > declared_size:
            extra = data[declared_size:]
            report['trailing_data'] = {
                'size': len(data) - declared_size,
                'preview': extra[:200].decode('utf-8', errors='replace')
            }
            report['found'] = True
            report['findings'].append(f'Trailing data: {len(data) - declared_size} bytes after BMP end')
        if HAS_PIL:
            img = Image.open(io.BytesIO(data)).convert('RGBA')
            stream = []
            for r, g, b, a in img.getdata():
                stream.extend((r & 1, g & 1, b & 1))
            if len(stream) >= 32:
                # 32-bit big-endian length prefix.
                declared = 0
                for bit in stream[:32]:
                    declared = (declared << 1) | bit
                if 0 < declared < min(5000, (len(stream) - 32) // 8):
                    payload_bits = stream[32:32 + declared * 8]
                    payload = bytearray()
                    for i in range(0, len(payload_bits), 8):
                        acc = 0
                        for j in range(8):
                            if i + j < len(payload_bits):
                                acc = (acc << 1) | payload_bits[i + j]
                        payload.append(acc)
                    text = payload.decode('utf-8', errors='replace')
                    report['lsb_decode'] = {
                        'length': declared,
                        'message': text[:200],
                        'method': 'rgb_lsb'
                    }
                    report['found'] = True
                    report['findings'].append(f'LSB decode ({declared} bytes): {text[:50]}')
        report['suspicious'] = report['found']
        return report
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_filter_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze PNG filter bytes for anomalies.

    Concatenates all IDAT chunks, inflates them, and reads the leading
    filter byte of each scanline. A near-exclusive use of filter 0
    (None) is unusual for encoder output and may indicate the pixel
    data was regenerated by a steg tool.

    NOTE(review): assumes a non-interlaced image (interlace method 0);
    Adam7 scanline layout is not handled.
    """
    result = png_parse_chunks(data)
    if not result.get('valid'):
        return result
    # Reassemble the compressed stream and grab the parsed IHDR fields.
    idat_data = b''
    ihdr_data = None
    for chunk in result['chunks']:
        if chunk['type'] == 'IDAT':
            offset = chunk['offset']
            length = chunk['length']
            idat_data += data[offset+8:offset+8+length]
        elif chunk['type'] == 'IHDR' and 'parsed' in chunk:
            ihdr_data = chunk['parsed']
    if not ihdr_data:
        return {'error': 'No IHDR chunk found'}
    try:
        decompressed = zlib.decompress(idat_data)
    except:
        return {'error': 'Failed to decompress IDAT'}
    width = ihdr_data['width']
    height = ihdr_data['height']
    bit_depth = ihdr_data['bit_depth']
    color_type = ihdr_data['color_type']
    # Samples per pixel based on color type
    samples = {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}.get(color_type, 3)
    # BUGFIX: a scanline is 1 filter byte + ceil(width*samples*depth/8)
    # data bytes. The old `width * max(1, (samples*depth)//8)` formula
    # over-counted for sub-byte bit depths (1/2/4), misaligning rows.
    row_bytes = 1 + (width * samples * bit_depth + 7) // 8
    # Extract the per-scanline filter byte.
    filter_bytes = []
    for row in range(height):
        offset = row * row_bytes
        if offset < len(decompressed):
            filter_bytes.append(decompressed[offset])
    # Analyze filter distribution
    filter_counts = {}
    for f in filter_bytes:
        filter_counts[f] = filter_counts.get(f, 0) + 1
    filter_names = {0: 'None', 1: 'Sub', 2: 'Up', 3: 'Average', 4: 'Paeth'}
    return {
        'found': True,
        'row_count': len(filter_bytes),
        'filter_distribution': {filter_names.get(k, f'Unknown({k})'): v for k, v in filter_counts.items()},
        'unique_filters': len(filter_counts),
        'suspicious': 0 in filter_counts and filter_counts[0] > len(filter_bytes) * 0.9,
        'interpretation': 'Excessive use of filter 0 (None) may indicate modified image'
    }
def png_detect_embedded_png(data: bytes) -> Dict[str, Any]:
    """Detect PNG files embedded within PNG (nested steganography).

    Scans for additional PNG magic sequences after the outer header and
    classifies each hit as sitting after the outer image's IEND chunk
    (classic file-append hiding) or within the image data itself.
    """
    results = {
        'found': False,
        'embedded_pngs': []
    }
    # The first IEND marks where the legitimate outer image ends.
    first_iend = data.find(b'IEND')
    search_start = 8  # Skip the outer PNG magic
    while True:
        pos = data.find(PNG_MAGIC, search_start)
        if pos == -1:
            break
        end_pos = data.find(b'IEND', pos)
        if end_pos != -1:
            # find() points at the chunk type field; type (4) + CRC (4)
            # = +8 bytes to the end of the chunk.
            # BUGFIX: was += 12, overshooting the embedded PNG by 4 bytes.
            end_pos += 8
            results['embedded_pngs'].append({
                'offset': pos,
                'size': end_pos - pos,
                # BUGFIX: the old test compared pos against
                # rfind(b'IEND', 0, pos), which is always < pos, so every
                # hit was labelled 'after_iend'.
                'location': 'after_iend' if (first_iend != -1 and pos > first_iend) else 'within_image'
            })
            results['found'] = True
        search_start = pos + 1
    results['count'] = len(results['embedded_pngs'])
    results['suspicious'] = results['found']
    return results
def png_color_histogram_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze color histogram for LSB steganography indicators.

    Runs a Pairs-of-Values (PoV) analysis per channel: LSB embedding
    tends to equalize the counts of each (2k, 2k+1) histogram bin pair,
    so a very small normalized pair difference is a steg indicator.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}
    try:
        img = Image.open(io.BytesIO(data))
        # BUGFIX: grayscale ('L') images produced a 2-D array and the
        # 3-index access below raised IndexError; normalize everything to
        # RGB (for RGBA this just drops the alpha band, R/G/B unchanged).
        if img.mode != 'RGB':
            img = img.convert('RGB')
        pixels = np.array(img)
        results = {}
        channel_names = ['Red', 'Green', 'Blue']
        for ch_idx, ch_name in enumerate(channel_names):
            channel = pixels[:, :, ch_idx].flatten()
            hist, _ = np.histogram(channel, bins=256, range=(0, 256))
            # Normalized difference of each (2k, 2k+1) bin pair.
            pair_diffs = []
            for i in range(0, 256, 2):
                if hist[i] + hist[i+1] > 0:
                    diff = abs(hist[i] - hist[i+1]) / (hist[i] + hist[i+1])
                    pair_diffs.append(diff)
            avg_pair_diff = np.mean(pair_diffs) if pair_diffs else 0
            results[ch_name] = {
                'unique_values': int(np.sum(hist > 0)),
                'avg_pair_difference': float(avg_pair_diff),
                'suspicious': avg_pair_diff < 0.05  # Very similar pairs suggest LSB
            }
        return {
            'found': True,
            'channels': results,
            'suspicious': any(r['suspicious'] for r in results.values()),
            'interpretation': 'Similar histogram pair values may indicate LSB steganography'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_visual_attack(data: bytes) -> Dict[str, Any]:
    """Generate visual attack metadata for bit plane analysis.

    Reports image size and which channel LSB planes are available for
    client-side reconstruction. Visual inspection of LSB planes can
    reveal hidden patterns.

    Fixes over the previous version: the scaled LSB planes and the
    combined RGB LSB array were computed (O(W*H) work and memory) but
    never returned — that dead work is removed; grayscale ('L') images
    no longer fail with IndexError on the 2-D pixel array.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}
    try:
        img = Image.open(io.BytesIO(data))
        # Normalize so pixels is always a 3-D (H, W, C) array.
        if img.mode != 'RGB':
            img = img.convert('RGB')
        pixels = np.array(img)
        channel_names = ['Red', 'Green', 'Blue'][:pixels.shape[2]]
        return {
            'found': True,
            'image_size': [int(pixels.shape[1]), int(pixels.shape[0])],
            'channel_lsb_available': channel_names,
            'interpretation': 'Visual inspection of LSB planes can reveal hidden patterns'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def png_steg_signature_scan(data: bytes) -> Dict[str, Any]:
    """Scan for known steganography tool signatures.

    Looks for tool marker strings both in the raw bytes and in the
    first kilobyte of data recovered by a 1-bit RGB LSB extraction.
    """
    signatures = {
        b'STEG': 'Stegosaurus Wrecks',
        b'openstego': 'OpenStego',
        b'steghide': 'Steghide',
        b'F5': 'F5 Algorithm',
        b'jphide': 'JPHide',
        b'outguess': 'OutGuess',
        b'invisible secrets': 'Invisible Secrets',
        b'camouflage': 'Camouflage',
        b'snow': 'SNOW',
        b'\x00\x00\x00\x01steg': 'Generic Steg Header',
    }

    def _render(sig):
        # Printable form for the report: ASCII text or hex fallback.
        return sig.decode('ascii', errors='replace') if sig.isascii() else sig.hex()

    hits = []
    for sig, tool_name in signatures.items():
        offset = data.find(sig)
        if offset != -1:
            hits.append({
                'signature': _render(sig),
                'tool': tool_name,
                'offset': offset
            })
    # Repeat the scan on LSB-extracted data.
    lsb_result = png_extract_lsb(data, bits=1, channels="RGB")
    if lsb_result.get('raw_data'):
        lsb_data = lsb_result['raw_data'][:1000]
        for sig, tool_name in signatures.items():
            if sig in lsb_data:
                hits.append({
                    'signature': _render(sig),
                    'tool': tool_name,
                    'location': 'LSB_extracted'
                })
    return {
        'found': len(hits) > 0,
        'signatures': hits,
        'suspicious': len(hits) > 0
    }
def png_full_analysis(data: bytes) -> Dict[str, Any]:
    """Run all PNG analysis tools and compile results into one report."""
    report = {
        'file_type': 'PNG',
        'analyses': {}
    }
    # (label, analyzer) pipeline — each runs independently.
    pipeline = (
        ('chunk_parse', png_parse_chunks),
        ('text_chunks', png_extract_text_chunks),
        ('appended_data', png_detect_appended_data),
        ('idat_analysis', png_analyze_idat),
        ('chi_square', png_chi_square_analysis),
        ('bit_planes', png_bit_plane_analysis),
        ('histogram', png_color_histogram_analysis),
        ('filter_analysis', png_filter_analysis),
        ('embedded_png', png_detect_embedded_png),
        ('steg_signatures', png_steg_signature_scan),
    )
    flagged = 0
    for label, analyzer in pipeline:
        try:
            outcome = analyzer(data)
            report['analyses'][label] = outcome
            if outcome.get('suspicious'):
                flagged += 1
        except Exception as e:
            # One failing analyzer must not abort the rest of the pipeline.
            report['analyses'][label] = {'error': str(e)}
    report['suspicious_indicators'] = flagged
    report['overall_suspicious'] = flagged >= 2
    report['summary'] = f"Found {flagged} suspicious indicators"
    return report
# Register PNG tools
def _register_png_tools():
    """Register all PNG analysis tools with the global registry."""
    # PNG analyzers first, then GIF/BMP and the advanced image steg
    # detectors; every tool is registered under its function name.
    _tools = (
        png_parse_chunks,
        png_extract_text_chunks,
        png_detect_appended_data,
        png_analyze_idat,
        png_extract_lsb,
        png_chi_square_analysis,
        png_bit_plane_analysis,
        png_palette_analysis,
        png_filter_analysis,
        png_detect_embedded_png,
        png_color_histogram_analysis,
        png_visual_attack,
        png_steg_signature_scan,
        png_full_analysis,
        gif_analysis,
        bmp_analysis,
        detect_pvd_steg,
        detect_histogram_shift_steg,
        detect_multibit_lsb,
    )
    for _tool in _tools:
        TOOL_REGISTRY.register(_tool.__name__, _tool)
# Auto-register on module load so every tool above is available in
# TOOL_REGISTRY as soon as this module is imported.
_register_png_tools()
# ============== ADVANCED TEXT STEGANOGRAPHY DETECTION ==============
def detect_homoglyph_steg(data: bytes) -> Dict[str, Any]:
    """Detect Cyrillic/Latin homoglyph substitution steganography.

    Scans the decoded text for Cyrillic code points that are visually
    identical to Latin letters; more than three hits flags the sample.
    Only the first five hits are detailed to bound the report size.
    """
    report = {'found': False, 'substitutions': 0, 'details': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    # Cyrillic code points that render like Latin letters.
    lookalikes = {
        '\u0430': 'a', '\u0441': 'c', '\u0435': 'e', '\u043e': 'o',
        '\u0440': 'p', '\u0455': 's', '\u0445': 'x', '\u0443': 'y',
        '\u0410': 'A', '\u0412': 'B', '\u0421': 'C', '\u0415': 'E',
        '\u041d': 'H', '\u041a': 'K', '\u041c': 'M', '\u041e': 'O',
        '\u0420': 'P', '\u0422': 'T', '\u0425': 'X',
    }
    for pos, char in enumerate(text):
        latin = lookalikes.get(char)
        if latin is None:
            continue
        report['substitutions'] += 1
        if report['substitutions'] <= 5:
            report['details'].append({
                'offset': pos,
                'cyrillic': repr(char),
                'looks_like': latin
            })
    report['found'] = report['substitutions'] > 3
    return report
def detect_variation_selector_steg(data: bytes) -> Dict[str, Any]:
    """Detect variation selector steganography (U+FE00..U+FE0F).

    Variation selectors are invisible and can encode hidden bits; more
    than three of them flags the sample. The first five occurrences are
    reported with their offset and VS number.
    """
    report = {'found': False, 'count': 0, 'selectors': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    for pos, char in enumerate(text):
        code = ord(char)
        if 0xFE00 <= code <= 0xFE0F:
            report['count'] += 1
            if report['count'] <= 5:
                report['selectors'].append({'offset': pos, 'selector': f'VS{code - 0xFDFF}'})
    report['found'] = report['count'] > 3
    return report
def detect_combining_mark_steg(data: bytes) -> Dict[str, Any]:
    """Detect steganography via invisible combining characters.

    Flags text containing more than three invisible joiner/operator
    code points (ZWJ, word joiner, invisible times, etc.), which can
    carry hidden bits without changing the rendered text.
    """
    report = {'found': False, 'count': 0, 'marks': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    invisible = {
        '\u034F': 'COMBINING GRAPHEME JOINER',
        '\u200D': 'ZERO WIDTH JOINER',
        '\u2060': 'WORD JOINER',
        '\u2061': 'FUNCTION APPLICATION',
        '\u2062': 'INVISIBLE TIMES',
        '\u2063': 'INVISIBLE SEPARATOR',
        '\u2064': 'INVISIBLE PLUS',
    }
    for pos, char in enumerate(text):
        label = invisible.get(char)
        if label is None:
            continue
        report['count'] += 1
        if report['count'] <= 5:  # cap the detail list
            report['marks'].append({
                'offset': pos,
                'name': label
            })
    report['found'] = report['count'] > 3
    return report
def detect_confusable_whitespace(data: bytes) -> Dict[str, Any]:
    """Detect steganography via Unicode whitespace variants.

    Counts non-ASCII space characters by name; more than three of them
    flags the sample, since mixing space variants is a classic carrier
    for hidden bits.
    """
    report = {'found': False, 'non_standard_spaces': 0, 'types': {}}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    variants = {
        '\u00A0': 'NO-BREAK SPACE',
        '\u2000': 'EN QUAD',
        '\u2001': 'EM QUAD',
        '\u2002': 'EN SPACE',
        '\u2003': 'EM SPACE',
        '\u2004': 'THREE-PER-EM SPACE',
        '\u2005': 'FOUR-PER-EM SPACE',
        '\u2006': 'SIX-PER-EM SPACE',
        '\u2007': 'FIGURE SPACE',
        '\u2008': 'PUNCTUATION SPACE',
        '\u2009': 'THIN SPACE',
        '\u200A': 'HAIR SPACE',
        '\u202F': 'NARROW NO-BREAK SPACE',
        '\u205F': 'MEDIUM MATHEMATICAL SPACE',
        '\u3000': 'IDEOGRAPHIC SPACE',
    }
    tallies = report['types']
    for char in text:
        name = variants.get(char)
        if name is None:
            continue
        report['non_standard_spaces'] += 1
        tallies[name] = tallies.get(name, 0) + 1
    report['found'] = report['non_standard_spaces'] > 3
    return report
def detect_emoji_steg(data: bytes) -> Dict[str, Any]:
    """Detect emoji substitution steganography patterns.

    A long run (>50) of symbol characters drawn from a small alphabet
    (<=20 distinct) is characteristic of emoji-pair substitution
    encodings, so such samples are flagged.
    """
    report = {'found': False, 'emoji_count': 0, 'pattern_detected': False}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    import unicodedata
    symbols = [ch for ch in text
               if unicodedata.category(ch).startswith(('So', 'Sk'))]
    report['emoji_count'] = len(symbols)
    # Many symbols from few distinct code points -> substitution alphabet.
    if len(symbols) > 50 and len(set(symbols)) <= 20:
        report['pattern_detected'] = True
        report['found'] = True
    return report
def detect_capitalization_steg(data: bytes) -> Dict[str, Any]:
    """Detect capitalization encoding steganography.

    Counts words that start with an uppercase letter mid-sentence (i.e.
    the previous word does not end a sentence). When more than 15% of a
    text of over 20 words is capitalized this way, the sample is flagged
    as a likely capitalization-bit encoding.
    """
    report = {'found': False, 'suspicious_caps': 0, 'total_words': 0}
    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        return report
    words = text.split()
    report['total_words'] = len(words)
    for idx in range(1, len(words)):
        word = words[idx]
        prev = words[idx - 1]
        # Uppercase start is only suspicious when not following ./!/?/:
        if word and word[0].isupper() and prev and prev[-1] not in '.!?:':
            report['suspicious_caps'] += 1
    if report['total_words'] > 20:
        if report['suspicious_caps'] / report['total_words'] > 0.15:
            report['found'] = True
    return report
# ============== AUDIO STEGANOGRAPHY ==============
def audio_lsb_decode(data: bytes) -> Dict[str, Any]:
    """Decode LSB steganography from WAV audio files.

    Expects 16-bit PCM samples. The payload layout is a 32-bit
    big-endian byte-length prefix followed by the message bits, one bit
    per sample LSB.

    Returns a dict with 'found'; on success also 'method', 'length',
    'message' (truncated to 200 chars), 'suspicious' and 'findings'.
    Parse failures come back as {'error': ..., 'found': False}.
    """
    import wave
    try:
        # Context manager guarantees the wave reader is closed even if
        # frame extraction raises (the old code leaked it on error).
        with wave.open(io.BytesIO(data)) as w:
            raw = w.readframes(w.getnframes())
            sampwidth = w.getsampwidth()
        if sampwidth != 2:
            return {'found': False, 'reason': f'Sample width {sampwidth} not supported'}
        samples = struct.unpack(f'<{len(raw)//2}h', raw)
        bits = [s & 1 for s in samples]
        if len(bits) < 32:
            return {'found': False}
        # 32-bit big-endian length prefix.
        length = 0
        for i in range(32):
            length = (length << 1) | bits[i]
        if length <= 0 or length > min(10000, (len(bits) - 32) // 8):
            return {'found': False, 'reason': f'Invalid length: {length}'}
        msg = bytearray()
        for i in range(0, length * 8, 8):
            v = 0
            for j in range(8):
                if 32 + i + j < len(bits):
                    v = (v << 1) | bits[32 + i + j]
            msg.append(v)
        decoded = msg.decode('utf-8', errors='replace')
        return {'found': True, 'method': 'audio_lsb', 'length': length,
                'message': decoded[:200], 'suspicious': True,
                'findings': [f'Audio LSB ({length} bytes): {decoded[:80]}']}
    except Exception as e:
        return {'error': str(e), 'found': False}
# ============== PCAP / NETWORK PROTOCOL DECODERS ==============
def pcap_decode(data: bytes) -> Dict[str, Any]:
    """Parse PCAP and extract steganographic data from protocol fields.

    Walks the classic (non-ng) PCAP record stream, collecting covert
    channels from Ethernet/IPv4 packets:
      - UDP and ICMP payload bytes,
      - IP TTL and IP-ID header fields,
      - TCP window and urgent-pointer fields,
      - inter-packet timing (delay above/below the median = 1/0 bit).
    Each collected byte stream is then tried as plain UTF-8, Base64 and
    Base32 text. Results land in 'methods' and 'findings'.
    """
    results = {'found': False, 'findings': [], 'packets': 0, 'methods': {}}
    # 24-byte global header is the minimum for a valid classic PCAP.
    if len(data) < 24:
        return results
    magic = data[:4]
    # Magic number tells us the byte order of all record headers.
    if magic == b'\xa1\xb2\xc3\xd4':
        endian = '>'
    elif magic == b'\xd4\xc3\xb2\xa1':
        endian = '<'
    else:
        return {'found': False, 'reason': 'Not PCAP'}
    pos = 24
    # Per-field covert-channel accumulators.
    ttl_bytes = bytearray()
    ipid_bytes = bytearray()
    win_bytes = bytearray()
    urg_bytes = bytearray()
    payloads = bytearray()
    timestamps = []
    import base64, re as _re
    # Each record: 16-byte header (ts_sec, ts_usec, incl_len, orig_len)
    # followed by incl_len captured bytes.
    while pos + 16 <= len(data):
        ts_sec = struct.unpack(f'{endian}I', data[pos:pos+4])[0]
        ts_usec = struct.unpack(f'{endian}I', data[pos+4:pos+8])[0]
        incl_len = struct.unpack(f'{endian}I', data[pos+8:pos+12])[0]
        pos += 16
        if pos + incl_len > len(data):
            break
        pkt = data[pos:pos + incl_len]
        results['packets'] += 1
        # Microsecond-resolution timestamp for the timing channel.
        timestamps.append(ts_sec * 1000000 + ts_usec)
        # Ethertype 0x0800 = IPv4; assumes a 14-byte Ethernet header.
        if len(pkt) > 34 and pkt[12:14] == b'\x08\x00':
            ip_start = 14
            ttl_bytes.append(pkt[ip_start + 8])
            ipid_bytes.extend(pkt[ip_start + 4:ip_start + 6])
            protocol = pkt[ip_start + 9]
            # IHL field (low nibble) gives the IP header length in words.
            ip_hdr_len = (pkt[ip_start] & 0x0F) * 4
            if protocol == 17 and len(pkt) > ip_start + ip_hdr_len + 8:
                # UDP: payload follows the 8-byte UDP header.
                udp_start = ip_start + ip_hdr_len
                udp_len = struct.unpack('>H', pkt[udp_start + 4:udp_start + 6])[0]
                payloads.extend(pkt[udp_start + 8:udp_start + udp_len])
            elif protocol == 6 and len(pkt) > ip_start + ip_hdr_len + 20:
                # TCP: harvest window and urgent-pointer fields.
                tcp_start = ip_start + ip_hdr_len
                win_bytes.extend(pkt[tcp_start + 14:tcp_start + 16])
                urg_bytes.extend(pkt[tcp_start + 18:tcp_start + 20])
            elif protocol == 1 and len(pkt) > ip_start + ip_hdr_len + 8:
                # ICMP: data after the 8-byte ICMP header.
                payloads.extend(pkt[ip_start + ip_hdr_len + 8:])
        pos += incl_len
    def try_decode(raw, name):
        # Try a byte stream as (1) plain UTF-8 text, (2) Base64 runs,
        # (3) Base32 — first successful interpretation wins.
        try:
            text = raw.decode('utf-8', errors='strict')
            p = sum(1 for c in text if c.isprintable() or c in '\r\n\t')
            if len(text) > 4 and p > len(text) * 0.7:
                results['methods'][name] = {'message': text[:200]}
                results['found'] = True
                results['findings'].append(f'{name}: {text[:60]}')
                return
        except: pass
        try:
            text = raw.decode('ascii', errors='ignore')
            for m in _re.finditer(r'[A-Za-z0-9+/]{16,}={0,2}', text):
                d = base64.b64decode(m.group()).decode('utf-8', errors='strict')
                if len(d) > 4:
                    results['methods'][name + '_b64'] = {'message': d[:200]}
                    results['found'] = True
                    results['findings'].append(f'{name} (b64): {d[:60]}')
                    return
        except: pass
        try:
            # Keep only Base32-alphabet characters, re-pad, then decode.
            clean = ''.join(c for c in raw.decode('ascii', errors='ignore').upper()
                            if c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
            if len(clean) > 10:
                d = base64.b32decode(clean + '=' * ((8 - len(clean) % 8) % 8)).decode('utf-8', errors='strict')
                if len(d) > 4:
                    results['methods'][name + '_b32'] = {'message': d[:200]}
                    results['found'] = True
                    results['findings'].append(f'{name} (b32): {d[:60]}')
        except: pass
    if payloads: try_decode(bytes(payloads), 'payload')
    if ttl_bytes: try_decode(bytes(ttl_bytes), 'ip_ttl')
    if ipid_bytes: try_decode(bytes(ipid_bytes), 'ip_id')
    if win_bytes: try_decode(bytes(win_bytes), 'tcp_window')
    if urg_bytes: try_decode(bytes(urg_bytes), 'tcp_urgent')
    # Covert timing: inter-packet delay above the median encodes 1,
    # below encodes 0; a 16-bit length prefix precedes the message.
    if len(timestamps) > 16:
        delays = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)]
        median = sorted(delays)[len(delays)//2]
        tbits = ['1' if d > median else '0' for d in delays]
        if len(tbits) >= 16:
            tlen = int(''.join(tbits[:16]), 2)
            if 0 < tlen < min(500, (len(tbits) - 16) // 8):
                tb = bytearray()
                for i in range(16, 16 + tlen * 8, 8):
                    if i + 8 <= len(tbits):
                        tb.append(int(''.join(tbits[i:i+8]), 2))
                try:
                    t = tb.decode('utf-8', errors='strict')
                    if sum(1 for c in t if c.isprintable()) > len(t) * 0.7:
                        results['methods']['covert_timing'] = {'message': t[:200]}
                        results['found'] = True
                        results['findings'].append(f'Timing ({tlen}b): {t[:60]}')
                except: pass
    # HTTP in the payload stream is worth surfacing on its own.
    if b'HTTP/' in payloads:
        results['findings'].append('HTTP traffic detected')
        results['found'] = True
    results['suspicious'] = results['found']
    return results
# ============== ARCHIVE DECODERS ==============
def zip_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from ZIP — comments, nested ZIPs, trailing data.

    Checks three hiding spots: the archive comment, member files with
    suspicious names (including one level of nested ZIP), and bytes
    appended after the End Of Central Directory record.

    Fix: both the outer and the nested ZipFile handles were leaked when
    a member failed to decompress; context managers now guarantee close.
    """
    import zipfile
    results = {'found': False, 'findings': []}
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            if zf.comment:
                results['comment'] = zf.comment.decode('utf-8', errors='replace')[:200]
                results['found'] = True
                results['findings'].append(f'ZIP comment: {results["comment"][:60]}')
            for name in zf.namelist():
                if any(s in name.lower() for s in ['secret', 'hidden', 'steg', 'flag', 'inner.zip']):
                    content = zf.read(name)
                    if content[:2] == b'PK':
                        # One level of nested archive.
                        with zipfile.ZipFile(io.BytesIO(content)) as inner:
                            for iname in inner.namelist():
                                ic = inner.read(iname).decode('utf-8', errors='replace')
                                results['findings'].append(f'Nested {iname}: {ic[:100]}')
                                results['found'] = True
                    else:
                        results['findings'].append(f'{name}: {content.decode("utf-8", errors="replace")[:100]}')
                        results['found'] = True
        # End Of Central Directory: 22 fixed bytes + comment length at
        # offset 20; anything past it is appended (hidden) data.
        eocd = data.rfind(b'PK\x05\x06')
        if eocd >= 0:
            eocd_size = 22 + struct.unpack('<H', data[eocd+20:eocd+22])[0]
            if eocd + eocd_size < len(data):
                trailing = data[eocd + eocd_size:]
                results['findings'].append(f'Trailing ({len(trailing)}b): {trailing.decode("utf-8", errors="replace")[:60]}')
                results['found'] = True
    except Exception as e:
        results['error'] = str(e)
    results['suspicious'] = results['found']
    return results
def tar_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from TAR — PAX headers, file contents.

    Members are only READ in memory, never extracted to the filesystem,
    so there is no path-traversal risk. 'found' is set only for PAX
    extended headers; plain file previews are listed but not flagged.

    Fix: the tarfile handle was leaked when member iteration raised;
    the context manager now guarantees close.
    """
    import tarfile
    results = {'found': False, 'findings': []}
    try:
        with tarfile.open(fileobj=io.BytesIO(data)) as tf:
            for member in tf.getmembers():
                # PAX extended headers are a classic key/value hiding spot.
                if hasattr(member, 'pax_headers') and member.pax_headers:
                    for k, v in member.pax_headers.items():
                        results['findings'].append(f'PAX {k}: {str(v)[:100]}')
                        results['found'] = True
                if member.isfile():
                    f = tf.extractfile(member)
                    if f:
                        results['findings'].append(f'{member.name}: {f.read(200).decode("utf-8", errors="replace")[:100]}')
    except Exception as e:
        results['error'] = str(e)
    results['suspicious'] = results['found']
    return results
def gzip_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from GZip — FEXTRA, FCOMMENT fields.

    Parses the member header per RFC 1952: the optional FEXTRA
    (flag 0x04), FNAME (0x08, skipped) and FCOMMENT (0x10) fields
    follow the 10-byte fixed header.

    Fix: data.index(0, ...) raised an uncaught ValueError on truncated
    headers; that is now reported via an 'error' key like the sibling
    decoders do.
    """
    results = {'found': False, 'findings': []}
    if len(data) < 10 or data[:2] != b'\x1f\x8b':
        return results
    flags = data[3]
    pos = 10
    try:
        if flags & 0x04 and pos + 2 <= len(data):
            # FEXTRA: little-endian length followed by the extra field.
            xlen = struct.unpack('<H', data[pos:pos+2])[0]
            pos += 2
            extra = data[pos:pos+xlen]
            results['findings'].append(f'FEXTRA ({xlen}b): {extra.decode("utf-8", errors="replace")[:60]}')
            results['found'] = True
            pos += xlen
        if flags & 0x08:
            # FNAME: zero-terminated original file name — skip it.
            end = data.index(0, pos); pos = end + 1
        if flags & 0x10:
            # FCOMMENT: zero-terminated free-text comment.
            end = data.index(0, pos)
            comment = data[pos:end].decode('utf-8', errors='replace')
            results['findings'].append(f'FCOMMENT: {comment[:60]}')
            results['found'] = True
    except ValueError:
        # No NUL terminator found — truncated or garbled header field.
        results['error'] = 'Truncated gzip header field'
    results['suspicious'] = results['found']
    return results
def sqlite_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from SQLite — rows from suspiciously named tables.

    sqlite3 cannot open a database from a bytes object directly, so the
    blob is written to a temporary file. Tables whose names contain
    'steg', 'hidden', 'secret' or 'payload' have up to 10 rows dumped
    into 'findings'.

    Fix: the temp file and the connection were leaked when any step
    raised; cleanup now happens in a finally block.
    """
    import sqlite3, tempfile, os
    results = {'found': False, 'findings': []}
    tmp_path = None
    conn = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
            tmp.write(data)
            tmp_path = tmp.name
        conn = sqlite3.connect(tmp_path)
        c = conn.cursor()
        c.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [r[0] for r in c.fetchall()]
        results['tables'] = tables
        for table in tables:
            if any(s in table.lower() for s in ['steg', 'hidden', 'secret', 'payload', '_steg']):
                # Table name is double-quoted and comes from sqlite_master,
                # not from user input.
                c.execute(f'SELECT * FROM "{table}" LIMIT 10')
                for row in c.fetchall():
                    results['findings'].append(f'{table}: {" | ".join(str(v)[:80] for v in row)[:150]}')
                    results['found'] = True
    except Exception as e:
        results['error'] = str(e)
    finally:
        # Always release the connection and delete the temp file.
        if conn is not None:
            conn.close()
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
    results['suspicious'] = results['found']
    return results
# ============== DOCUMENT DECODERS ==============
def pdf_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from PDF — JavaScript, form values, XMP, trailing data.

    Four checks: embedded JavaScript actions, AcroForm field values,
    bytes after the final %%EOF marker, and dc:description entries in
    XMP metadata.

    Fix: the unused `base64` import has been dropped.
    """
    import re as _re
    results = {'found': False, 'findings': []}
    if not data.startswith(b'%PDF'):
        return results
    # latin-1 maps every byte 1:1, so binary streams survive the decode.
    text = data.decode('latin-1', errors='replace')
    if '/JavaScript' in text or '/JS ' in text:
        results['findings'].append('JavaScript detected')
        results['found'] = True
        for m in _re.finditer(r'/JS\s*\(([^)]+)\)', text):
            results['findings'].append(f'JS: {m.group(1)[:80]}')
    if '/AcroForm' in text:
        for m in _re.finditer(r'/V\s*\(([^)]+)\)', text):
            results['findings'].append(f'Form: {m.group(1)[:80]}')
            results['found'] = True
    # Anything after the last %%EOF is appended (hidden) data.
    eof = data.rfind(b'%%EOF')
    if eof >= 0:
        trailing = data[eof + 5:].strip()
        if trailing:
            results['findings'].append(f'Post-EOF ({len(trailing)}b): {trailing.decode("utf-8", errors="replace")[:80]}')
            results['found'] = True
    xmp = data.find(b'<x:xmpmeta')
    if xmp >= 0:
        xmp_end = data.find(b'</x:xmpmeta>', xmp)
        if xmp_end >= 0:
            xmp_data = data[xmp:xmp_end+12].decode('utf-8', errors='replace')
            for m in _re.finditer(r'<dc:description>([^<]+)</dc:description>', xmp_data):
                results['findings'].append(f'XMP desc: {m.group(1)[:80]}')
                results['found'] = True
    results['suspicious'] = results['found']
    return results
def jpeg_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from JPEG — COM markers, APP segments.

    Walks the marker stream from SOI, recording COM comments and any
    APP2..APP15 segment containing steg-ish keywords, stopping at SOS/EOI.
    Returns a dict with 'found', 'findings', and 'suspicious'.

    Bug fixes: standalone markers (0xFF fill bytes, TEM 0x01, RSTn
    0xD0-0xD7) carry no length field and previously derailed the walk by
    being parsed as length-bearing segments; the bare ``except:`` is now
    a targeted ``struct.error`` handler.
    """
    results = {'found': False, 'findings': []}
    if len(data) < 2 or data[:2] != b'\xFF\xD8':
        return results
    pos = 2
    while pos < len(data) - 4:
        if data[pos] != 0xFF:
            pos += 1; continue
        marker = data[pos + 1]
        if marker == 0xFF:
            # 0xFF fill/padding byte before the real marker.
            pos += 1; continue
        if marker == 0x01 or 0xD0 <= marker <= 0xD7:
            # TEM / RSTn: standalone markers with no length field.
            pos += 2; continue
        if marker == 0xFE:  # COM
            length = struct.unpack('>H', data[pos+2:pos+4])[0]
            # Segment length includes the 2 length bytes themselves.
            comment = data[pos+4:pos+2+length].decode('utf-8', errors='replace')
            results['findings'].append(f'COM: {comment[:100]}')
            results['found'] = True
            pos += 2 + length
        elif 0xE0 <= marker <= 0xEF:
            length = struct.unpack('>H', data[pos+2:pos+4])[0]
            if marker not in (0xE0, 0xE1):
                # APP0/APP1 (JFIF/EXIF) are expected; others are scanned.
                seg = data[pos+4:pos+2+length]
                text = seg.decode('utf-8', errors='replace')
                if any(s in text.lower() for s in ['st3gg', 'steg', 'secret']):
                    results['findings'].append(f'APP{marker-0xE0}: {text[:80]}')
                    results['found'] = True
            pos += 2 + length
        elif marker in (0xDA, 0xD9):
            break  # SOS (entropy-coded data follows) or EOI: stop scanning
        else:
            try:
                length = struct.unpack('>H', data[pos+2:pos+4])[0]
                pos += 2 + length
            except struct.error:
                break  # truncated length field
    results['suspicious'] = results['found']
    return results
def svg_decode(data: bytes) -> Dict[str, Any]:
    """Extract steg data from SVG — comments, data attributes, metadata.

    Scans the decoded markup for XML comments long enough to carry a
    payload, any data-* attribute values, and Dublin Core descriptions
    inside the <metadata> element.  Returns a dict with 'found',
    'findings', and 'suspicious' (plus 'error' on decode failure).
    """
    import re as _re
    results = {'found': False, 'findings': []}
    try:
        markup = data.decode('utf-8', errors='replace')
        # XML comments: ignore trivially short ones.
        for match in _re.finditer(r'<!--(.*?)-->', markup, _re.DOTALL):
            body = match.group(1).strip()
            if len(body) > 5:
                results['findings'].append(f'Comment: {body[:80]}')
                results['found'] = True
        # Custom data-* attributes are a common stash for payloads.
        for match in _re.finditer(r'data-\w+="([^"]*)"', markup):
            results['findings'].append(f'Data attr: {match.group(1)[:80]}')
            results['found'] = True
        # dc:description entries inside <metadata>...</metadata>.
        start = markup.find('<metadata')
        if start >= 0:
            stop = markup.find('</metadata>', start)
            if stop >= 0:
                block = markup[start:stop]
                for match in _re.finditer(r'<dc:description>([^<]+)</dc:description>', block):
                    results['findings'].append(f'Description: {match.group(1)[:80]}')
                    results['found'] = True
    except Exception as e:
        results['error'] = str(e)
    results['suspicious'] = results['found']
    return results
# ============== GENERIC IMAGE LSB ==============
def generic_image_lsb_decode(data: bytes) -> Dict[str, Any]:
    """Decode LSB from any PIL-supported image (TIFF, PPM, PGM, ICO, WebP, etc).

    Extracts one bit per sample — palette index LSBs for 'P' mode, gray
    LSBs for 'L'/'LA', otherwise R/G/B LSBs after an RGBA conversion.
    The bit stream is interpreted as a big-endian length prefix (32-bit
    first, then 16-bit for tiny images like ICO) followed by the payload;
    a hit is reported only when the decoded text is mostly printable.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}
    try:
        img = Image.open(io.BytesIO(data))
        fmt = img.format or 'unknown'
        if img.mode == 'P':
            # Palette images: one bit per palette index.
            bits = [px & 1 for px in img.getdata()]
        elif img.mode in ('L', 'LA'):
            # Grayscale: drop alpha if present, one bit per gray sample.
            gray = img.convert('L') if img.mode == 'LA' else img
            bits = [px & 1 for px in gray.getdata()]
        else:
            # Everything else: normalise to RGBA, take R/G/B bits only.
            bits = []
            for r, g, b, _a in img.convert('RGBA').getdata():
                bits.extend((r & 1, g & 1, b & 1))
        if len(bits) < 16:
            return {'found': False}
        for prefix in (32, 16):
            if len(bits) < prefix:
                continue
            length = 0
            for bit in bits[:prefix]:
                length = (length << 1) | bit
            # Reject implausible lengths (zero, huge, or beyond the data).
            if length <= 0 or length > min(10000, (len(bits) - prefix) // 8):
                continue
            payload = bytearray()
            for byte_start in range(0, length * 8, 8):
                value = 0
                for offset in range(8):
                    idx = prefix + byte_start + offset
                    if idx < len(bits):
                        value = (value << 1) | bits[idx]
                payload.append(value)
            decoded = payload.decode('utf-8', errors='replace')
            printable = sum(1 for c in decoded if c.isprintable() or c in '\r\n\t')
            if printable > len(decoded) * 0.5:
                return {'found': True, 'format': fmt, 'method': 'image_lsb',
                        'length': length, 'prefix_bits': prefix,
                        'message': decoded[:200], 'suspicious': True,
                        'findings': [f'{fmt} LSB ({length}b, {prefix}b prefix): {decoded[:80]}']}
        return {'found': False, 'format': fmt}
    except Exception as e:
        return {'error': str(e), 'found': False}
# ============== TEXT TECHNIQUE DECODERS ==============
def decode_braille(data: bytes) -> Dict[str, Any]:
    """Decode Braille pattern steganography (U+2800 block).

    Each Braille cell encodes one byte as its offset from U+2800; at
    least four cells are required before a hit is reported.
    """
    try:
        text = data.decode('utf-8')
        cells = [ch for ch in text if 0x2800 <= ord(ch) <= 0x28FF]
        if len(cells) < 4:
            return {'found': False}
        payload = bytes(ord(ch) - 0x2800 for ch in cells)
        message = payload.decode('utf-8', errors='replace')
        return {
            'found': True,
            'method': 'braille',
            'length': len(cells),
            'message': message[:200],
            'findings': [f'Braille ({len(cells)} chars): {message[:80]}'],
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def decode_directional_override(data: bytes) -> Dict[str, Any]:
    """Decode directional override steganography (RLO=1, LRO=0).

    Collects U+202E/U+202D characters as a bit stream: a 16-bit
    big-endian length prefix followed by the payload bytes, MSB first.
    """
    try:
        text = data.decode('utf-8')
        bitmap = {'\u202E': '1', '\u202D': '0'}
        bits = [bitmap[ch] for ch in text if ch in bitmap]
        if len(bits) < 16:
            return {'found': False}
        length = int(''.join(bits[:16]), 2)
        # Length must be positive and fit within the remaining bits.
        if length <= 0 or length > (len(bits) - 16) // 8:
            return {'found': False}
        payload = bytearray()
        for start in range(16, 16 + length * 8, 8):
            if start + 8 <= len(bits):
                payload.append(int(''.join(bits[start:start + 8]), 2))
        message = payload.decode('utf-8', errors='replace')
        return {'found': True, 'method': 'directional', 'length': length,
                'message': message[:200],
                'findings': [f'Bidi decode ({length}b): {message[:80]}']}
    except Exception as e:
        return {'error': str(e), 'found': False}
def decode_hangul_filler(data: bytes) -> Dict[str, Any]:
    """Decode Hangul filler steganography (U+3164=1, space=0).

    A 16-bit length prefix precedes the payload.  If the cover text has
    fewer spaces than the declared payload needs, as many whole bytes as
    possible are decoded and the result is flagged as partial.
    """
    try:
        text = data.decode('utf-8')
        if '\u3164' not in text:
            return {'found': False}
        bitmap = {'\u3164': '1', ' ': '0'}
        bits = [bitmap[ch] for ch in text if ch in bitmap]
        if len(bits) < 16:
            return {'found': False}
        declared = int(''.join(bits[:16]), 2)
        if declared <= 0 or declared > 5000:
            return {'found': False}
        # Decode whatever fits in the available bits (may be partial).
        available = (len(bits) - 16) // 8
        take = min(declared, available)
        payload = bytearray()
        for start in range(16, 16 + take * 8, 8):
            if start + 8 <= len(bits):
                payload.append(int(''.join(bits[start:start + 8]), 2))
        message = payload.decode('utf-8', errors='replace')
        truncated = take < declared
        return {'found': True, 'method': 'hangul_filler',
                'length': declared, 'decoded_bytes': take,
                'partial': truncated, 'message': message[:200],
                'findings': [f'Hangul ({take}/{declared}b{"*" if truncated else ""}): {message[:80]}']}
    except Exception as e:
        return {'error': str(e), 'found': False}
def decode_math_alphanumeric(data: bytes) -> Dict[str, Any]:
    """Decode math bold substitution (bold=1, normal=0).

    Mathematical bold letters (U+1D400–U+1D433, A-Z then a-z) carry
    1-bits and plain ASCII letters carry 0-bits; a 16-bit length prefix
    precedes the payload bytes.
    """
    try:
        text = data.decode('utf-8')
        bits = []
        for ch in text:
            cp = ord(ch)
            if 0x1D400 <= cp <= 0x1D433:  # math bold A-Z / a-z
                bits.append('1')
            elif ch.isascii() and ch.isalpha():
                bits.append('0')
        if len(bits) < 16:
            return {'found': False}
        length = int(''.join(bits[:16]), 2)
        if length <= 0 or length > (len(bits) - 16) // 8:
            return {'found': False}
        payload = bytearray()
        for start in range(16, 16 + length * 8, 8):
            if start + 8 <= len(bits):
                payload.append(int(''.join(bits[start:start + 8]), 2))
        message = payload.decode('utf-8', errors='replace')
        return {'found': True, 'method': 'math_alpha', 'length': length,
                'message': message[:200],
                'findings': [f'Math alpha ({length}b): {message[:80]}']}
    except Exception as e:
        return {'error': str(e), 'found': False}
def decode_emoji_skin_tone(data: bytes) -> Dict[str, Any]:
    """Decode emoji skin tone steganography (4 tones = 2 bits each).

    Maps four of the Fitzpatrick modifiers to 2-bit values and packs
    every four modifiers into one byte (MSB first); a hit is reported
    only when the decoded text is mostly printable.
    """
    try:
        text = data.decode('utf-8')
        tone_value = {'\U0001F3FB': 0, '\U0001F3FC': 1, '\U0001F3FE': 2, '\U0001F3FF': 3}
        crumbs = [tone_value[ch] for ch in text if ch in tone_value]
        if len(crumbs) < 4:
            return {'found': False}
        payload = bytearray()
        for idx in range(0, len(crumbs) - 3, 4):
            a, b, c, d = crumbs[idx:idx + 4]
            payload.append((a << 6) | (b << 4) | (c << 2) | d)
        message = payload.decode('utf-8', errors='replace')
        visible = sum(1 for ch in message if ch.isprintable())
        if visible > len(message) * 0.5:
            return {'found': True, 'method': 'emoji_skin_tone', 'length': len(payload),
                    'message': message[:200],
                    'findings': [f'Skin tone ({len(payload)}b): {message[:80]}']}
        return {'found': False}
    except Exception as e:
        return {'error': str(e), 'found': False}
# ============== ADVANCED STEGANALYSIS ==============
def rs_analysis(data: bytes) -> Dict[str, Any]:
    """RS (Regular-Singular) Analysis — gold standard for LSB detection.

    Pairs adjacent samples per channel and compares smoothness before and
    after flipping the LSB of the first sample in each pair.  In a clean
    image the flip perturbs smoothness symmetrically; when the LSB plane
    already carries data the ratio skews, yielding an embedding-rate
    estimate.  More sensitive than chi-square at low embedding rates.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL/numpy required', 'found': False}
    try:
        rgb = np.array(Image.open(io.BytesIO(data)).convert('RGB'), dtype=np.int16)
        channel_stats = {}
        for idx, name in enumerate(('Red', 'Green', 'Blue')):
            samples = rgb[:, :, idx].flatten()
            pair_count = len(samples) // 2
            left = samples[:pair_count * 2:2]
            right = samples[1:pair_count * 2:2]
            d_plain = float(np.mean(np.abs(left - right)))
            d_lsb = float(np.mean(np.abs((left ^ 1) - right)))
            ratio = d_lsb / d_plain if d_plain > 0 else 1.0
            # Ratio excess above 1.0 maps (clamped) to an embedding rate.
            rate = max(0, min(1, (ratio - 1.0) * 2))
            channel_stats[name] = {
                'smoothness_original': round(d_plain, 4),
                'smoothness_flipped': round(d_lsb, 4),
                'rs_ratio': round(ratio, 4),
                'estimated_embedding_rate': round(rate, 4),
                'suspicious': ratio > 1.02 or rate > 0.05,
            }
        overall = max(s['estimated_embedding_rate'] for s in channel_stats.values())
        if overall > 0.1:
            verdict = "HIGH probability of LSB steg."
        elif overall > 0.03:
            verdict = "MODERATE indicators."
        else:
            verdict = "LOW — likely clean."
        return {
            'found': True, 'channels': channel_stats,
            'overall_embedding_rate': round(overall, 4),
            'suspicious': any(s['suspicious'] for s in channel_stats.values()),
            'interpretation': f"RS analysis: {overall:.1%} estimated embedding. {verdict}",
            'method': 'rs_analysis'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
def sample_pairs_analysis(data: bytes) -> Dict[str, Any]:
    """Sample Pairs Analysis (SPA) — detects LSB by pixel pair statistics.

    Classifies consecutive sample pairs per channel by their high 7 bits:
    X-pairs share the same upper value, Y-pairs differ by exactly one.
    LSB embedding disturbs the natural X/Y balance, so a small normalized
    |X-Y| gap maps to a high estimated embedding rate.  Complementary to
    RS analysis — it catches different disturbance patterns.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL/numpy required', 'found': False}
    try:
        rgb = np.array(Image.open(io.BytesIO(data)).convert('RGB'), dtype=np.int16)
        channel_stats = {}
        for idx, name in enumerate(('Red', 'Green', 'Blue')):
            samples = rgb[:, :, idx].flatten()
            count = len(samples) - 1
            first = samples[:count]
            second = samples[1:count + 1]
            hi_first = first >> 1   # drop the LSB: compare upper 7 bits
            hi_second = second >> 1
            x = int(np.sum(hi_first == hi_second))
            y = int(np.sum(np.abs(hi_first - hi_second) == 1))
            total = float(count)
            x_ratio = x / total
            y_ratio = y / total
            spa = abs(x_ratio - y_ratio) / (x_ratio + y_ratio) if (x_ratio + y_ratio) > 0 else 0
            est = max(0, min(1, 1.0 - spa * 3))
            channel_stats[name] = {
                'x_pairs': x, 'y_pairs': y, 'z_pairs': count - x - y,
                'spa_ratio': round(spa, 4),
                'estimated_embedding_rate': round(est, 4),
                'suspicious': spa < 0.1,
            }
        overall = max(s['estimated_embedding_rate'] for s in channel_stats.values())
        if overall > 0.5:
            verdict = "HIGH probability."
        elif overall > 0.2:
            verdict = "MODERATE."
        else:
            verdict = "LOW."
        return {
            'found': True, 'channels': channel_stats,
            'overall_embedding_rate': round(overall, 4),
            'suspicious': any(s['suspicious'] for s in channel_stats.values()),
            'interpretation': f"SPA: {overall:.1%} estimated embedding. {verdict}",
            'method': 'sample_pairs_analysis'
        }
    except Exception as e:
        return {'error': str(e), 'found': False}
# ============== REGISTER ALL TOOLS ==============
def _register_all_tools():
    """Register ALL analysis and decode tools.

    Each tool is registered under its own function name; registration
    order matches the original explicit list.
    """
    tools = (
        # Text/Unicode detectors
        detect_homoglyph_steg,
        detect_variation_selector_steg,
        detect_combining_mark_steg,
        detect_confusable_whitespace,
        detect_emoji_steg,
        detect_capitalization_steg,
        # Advanced steganalysis
        rs_analysis,
        sample_pairs_analysis,
        # Container / format decoders
        audio_lsb_decode,
        pcap_decode,
        zip_decode,
        tar_decode,
        gzip_decode,
        sqlite_decode,
        pdf_decode,
        jpeg_decode,
        svg_decode,
        generic_image_lsb_decode,
        # Text technique decoders
        decode_braille,
        decode_directional_override,
        decode_hangul_filler,
        decode_math_alphanumeric,
        decode_emoji_skin_tone,
    )
    for tool in tools:
        TOOL_REGISTRY.register(tool.__name__, tool)
_register_all_tools()