# Mirror of https://github.com/elder-plinius/STEGOSAURUS-WRECKS.git
# Synced 2026-05-01 00:27:55 +02:00 (Python, 1666 lines, 52 KiB)
"""
|
|
STEGOSAURUS WRECKS - Comprehensive Analysis Tools
|
|
Complete toolkit for steganography detection across all file types
|
|
|
|
This module provides 264+ analysis functions covering:
|
|
- Images: PNG, JPEG, GIF, BMP, WebP, TIFF, ICO, HEIC, AVIF, SVG
|
|
- Audio: WAV, MP3, FLAC, OGG
|
|
- Video: AVI, MKV
|
|
- Documents: PDF, Office
|
|
- Archives: ZIP, RAR
|
|
- Fonts: TTF, OTF, WOFF
|
|
"""
|
|
|
|
import struct
|
|
import zlib
|
|
import io
|
|
import re
|
|
import json
|
|
import hashlib
|
|
import binascii
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Tuple, Union, BinaryIO
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
import math
|
|
|
|
# Optional imports - gracefully handle missing dependencies
|
|
try:
|
|
import numpy as np
|
|
HAS_NUMPY = True
|
|
except ImportError:
|
|
HAS_NUMPY = False
|
|
|
|
try:
|
|
from PIL import Image, ExifTags
|
|
HAS_PIL = True
|
|
except ImportError:
|
|
HAS_PIL = False
|
|
|
|
|
|
# ============== CORE INFRASTRUCTURE ==============
|
|
|
|
@dataclass
class AnalysisResult:
    """Standard result format for all analysis functions.

    Every analysis tool ultimately produces one of these, either directly
    or via AnalysisToolRegistry.execute(), which wraps plain dict results.
    """
    success: bool                        # True when the tool ran without raising
    action: str                          # registry action name that produced this result
    file_type: str                       # detected/declared file type ("unknown" if n/a)
    data: Dict[str, Any] = field(default_factory=dict)   # tool-specific payload
    findings: List[str] = field(default_factory=list)    # human-readable notes
    suspicious: bool = False             # heuristic steganography indicator
    confidence: float = 0.0              # confidence in the `suspicious` verdict
    raw_data: Optional[bytes] = None     # extracted payload bytes, if any
    error: Optional[str] = None          # error message when success is False

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (raw bytes reduced to a flag)."""
        return {
            "success": self.success,
            "action": self.action,
            "file_type": self.file_type,
            "data": self.data,
            "findings": self.findings,
            "suspicious": self.suspicious,
            "confidence": self.confidence,
            # raw_data may be large/binary, so only report its presence
            "has_raw_data": self.raw_data is not None,
            "error": self.error
        }
|
|
|
|
|
|
class FileType(Enum):
    """File formats recognized by detect_file_type(), keyed by short name."""
    PNG = "png"
    JPEG = "jpeg"
    GIF = "gif"
    BMP = "bmp"
    WEBP = "webp"
    TIFF = "tiff"
    ICO = "ico"
    HEIC = "heic"
    AVIF = "avif"
    SVG = "svg"
    WAV = "wav"
    MP3 = "mp3"
    FLAC = "flac"
    OGG = "ogg"
    AVI = "avi"
    MKV = "mkv"
    PDF = "pdf"
    OFFICE = "office"
    ZIP = "zip"
    RAR = "rar"
    FONT = "font"     # TrueType/OpenType/WOFF family
    AIFF = "aiff"
    AU = "au"
    MIDI = "midi"
    PCAP = "pcap"
    SQLITE = "sqlite"
    GZIP = "gzip"
    TAR = "tar"
    UNKNOWN = "unknown"
|
|
|
|
|
|
# Magic bytes for file type detection: signature at offset 0 -> type.
# RIFF and PK\x03\x04 are containers; detect_file_type() disambiguates them.
MAGIC_SIGNATURES = {
    b'\x89PNG\r\n\x1a\n': FileType.PNG,
    b'\xff\xd8\xff': FileType.JPEG,
    b'GIF87a': FileType.GIF,
    b'GIF89a': FileType.GIF,
    b'BM': FileType.BMP,
    b'RIFF': FileType.WAV,  # Could also be AVI - check further
    b'\xff\xfb': FileType.MP3,
    b'\xff\xfa': FileType.MP3,
    b'\xff\xf3': FileType.MP3,
    b'\xff\xf2': FileType.MP3,
    b'ID3': FileType.MP3,
    b'fLaC': FileType.FLAC,
    b'OggS': FileType.OGG,
    b'%PDF': FileType.PDF,
    b'PK\x03\x04': FileType.ZIP,  # Could be Office - check further
    b'Rar!\x1a\x07': FileType.RAR,
    b'\x1aE\xdf\xa3': FileType.MKV,
    b'\x00\x00\x01\x00': FileType.ICO,
    b'\x00\x00\x02\x00': FileType.ICO,  # CUR format
    b'\x1f\x8b': FileType.GZIP,
    b'MThd': FileType.MIDI,
    b'.snd': FileType.AU,
    b'\xa1\xb2\xc3\xd4': FileType.PCAP,
    b'\xd4\xc3\xb2\xa1': FileType.PCAP,  # Little-endian PCAP
    b'SQLite format 3': FileType.SQLITE,
}

# Secondary markers seen inside containers (RIFF body / ISO-BMFF ftyp boxes).
WEBP_SIGNATURES = [b'WEBP']
HEIC_SIGNATURES = [b'ftyp', b'heic', b'heix', b'hevc', b'mif1']
AVIF_SIGNATURES = [b'ftypavif', b'ftypavis']
|
|
|
|
|
|
def detect_file_type(data: bytes) -> FileType:
    """Identify a file's type from its magic-byte signature.

    Checks fixed offset-0 signatures first (disambiguating RIFF and ZIP
    containers), then the positional/heuristic formats: ISO-BMFF (HEIC,
    AVIF), TIFF, AIFF, tar, SVG and font containers.
    """
    if len(data) < 12:
        return FileType.UNKNOWN

    # Pass 1: fixed signatures at offset 0, with container disambiguation.
    for signature, candidate in MAGIC_SIGNATURES.items():
        if not data.startswith(signature):
            continue
        if signature == b'RIFF' and len(data) >= 12:
            riff_kind = data[8:12]
            if riff_kind == b'WAVE':
                return FileType.WAV
            if riff_kind == b'AVI ':
                return FileType.AVI
            if riff_kind == b'WEBP':
                return FileType.WEBP
        elif signature == b'PK\x03\x04':
            # Office documents are ZIPs containing well-known member names.
            head = data[:2000]
            office_markers = (b'[Content_Types].xml', b'word/', b'xl/', b'ppt/')
            if any(marker in head for marker in office_markers):
                return FileType.OFFICE
            return FileType.ZIP
        # Unrecognized RIFF sub-type falls back to the table's default (WAV).
        return candidate

    # ISO-BMFF (ftyp box at offset 4): HEIC/AVIF brand families.
    if len(data) >= 12 and data[4:8] == b'ftyp':
        major_brand = data[8:12]
        if major_brand in (b'heic', b'heix', b'hevc', b'mif1'):
            return FileType.HEIC
        if major_brand in (b'avif', b'avis'):
            return FileType.AVIF

    # TIFF: byte-order marker (II little / MM big endian) plus version 42.
    if data[:4] in (b'II\x2a\x00', b'MM\x00\x2a'):
        return FileType.TIFF

    # AIFF/AIFC live inside an IFF FORM container.
    if data[:4] == b'FORM' and len(data) >= 12 and data[8:12] in (b'AIFF', b'AIFC'):
        return FileType.AIFF

    # POSIX tar: "ustar" magic sits at offset 257.
    if len(data) >= 265 and data[257:262] == b'ustar':
        return FileType.TAR

    # SVG: an early <svg> tag, or an XML prolog followed by <svg>.
    if b'<svg' in data[:1000] or (b'<?xml' in data[:100] and b'<svg' in data[:2000]):
        return FileType.SVG

    # Font containers: TrueType/OpenType variants plus WOFF/WOFF2.
    if data[:4] in (b'\x00\x01\x00\x00', b'OTTO', b'true', b'typ1', b'wOFF', b'wOF2'):
        return FileType.FONT

    return FileType.UNKNOWN
|
|
|
|
|
|
def calculate_entropy(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte (0.0-8.0)."""
    if not data:
        return 0.0

    total = len(data)
    # Tally the frequency of each byte value actually present.
    frequencies: Dict[int, int] = {}
    for value in data:
        frequencies[value] = frequencies.get(value, 0) + 1

    # H = -sum(p * log2(p)) over the observed distribution.
    return -sum(
        (count / total) * math.log2(count / total)
        for count in frequencies.values()
    )
|
|
|
|
|
|
def calculate_chi_square(data: bytes) -> float:
    """Chi-square statistic of the byte histogram against a uniform model."""
    if not data:
        return 0.0

    histogram = [0] * 256
    for value in data:
        histogram[value] += 1

    # Under uniform randomness every bin would hold len(data)/256 entries.
    expected = len(data) / 256
    deviation = 0.0
    for observed in histogram:
        diff = observed - expected
        deviation += diff * diff / expected
    return deviation
|
|
|
|
|
|
def find_strings(data: bytes, min_length: int = 4) -> List[Tuple[int, str]]:
    """Extract runs of printable ASCII (length >= min_length) with offsets."""
    found: List[Tuple[int, str]] = []
    run: List[str] = []
    run_start = 0

    def flush() -> None:
        # Record the current run if it is long enough.
        if len(run) >= min_length:
            found.append((run_start, ''.join(run)))

    for offset, value in enumerate(data):
        if 32 <= value < 127:
            if not run:
                run_start = offset
            run.append(chr(value))
        else:
            flush()
            run.clear()

    # A printable run may extend to the very end of the data.
    flush()
    return found
|
|
|
|
|
|
def hex_dump(data: bytes, offset: int = 0, length: int = 256) -> str:
    """Render a 16-bytes-per-row hex/ASCII dump of data[offset:offset+length]."""
    window = data[offset:offset + length]
    rows = []

    for row_start in range(0, len(window), 16):
        row = window[row_start:row_start + 16]
        hex_cols = ' '.join(format(b, '02x') for b in row)
        # Non-printable bytes render as '.' in the ASCII gutter.
        text_cols = ''.join(chr(b) if 32 <= b < 127 else '.' for b in row)
        rows.append(f'{offset + row_start:08x} {hex_cols:<48} {text_cols}')

    return '\n'.join(rows)
|
|
|
|
|
|
# ============== BIT PLANE ANALYSIS ==============
|
|
|
|
def extract_bit_plane(data: bytes, bit: int) -> bytes:
    """Isolate bit plane *bit* of every byte and pack the bits MSB-first."""
    if not HAS_NUMPY:
        # Pure-Python path: gather 8 plane bits at a time into one output byte.
        packed = bytearray()
        for start in range(0, len(data), 8):
            out = 0
            for shift, value in enumerate(data[start:start + 8]):
                out |= ((value >> bit) & 1) << (7 - shift)
            packed.append(out)
        return bytes(packed)

    arr = np.frombuffer(data, dtype=np.uint8)
    plane_bits = (arr >> bit) & 1
    # Zero-pad to a byte boundary so packing mirrors the fallback path.
    pad = (8 - len(plane_bits) % 8) % 8
    return np.packbits(np.pad(plane_bits, (0, pad), mode='constant')).tobytes()
|
|
|
|
|
|
def analyze_bit_planes(data: bytes) -> Dict[str, Any]:
    """Summarize entropy and byte diversity of each of the 8 bit planes."""
    summary: Dict[str, Any] = {}
    for bit in range(8):
        plane = extract_bit_plane(data, bit)
        summary[f'plane_{bit}'] = {
            'entropy': calculate_entropy(plane),
            'unique_bytes': len(set(plane)),
            'sample': plane[:64].hex(),  # first 64 packed bytes as a preview
        }
    return summary
|
|
|
|
|
|
# ============== LSB EXTRACTION ==============
|
|
|
|
def extract_lsb(data: bytes, bits: int = 1, channels: str = "RGB") -> bytes:
    """Extract the low *bits* bits of every byte and repack them LSB-first.

    Args:
        data: Raw sample/pixel bytes to mine for hidden data.
        bits: Number of low-order bits taken from each byte (1-8).
        channels: Kept for interface compatibility; this raw-byte variant
            cannot distinguish channels, so the value is ignored (see
            png_extract_lsb for channel-aware extraction).

    Returns:
        The harvested bits packed into bytes, LSB-first within each output
        byte; a trailing partial byte is zero-padded.
    """
    if not data:
        return b''

    # Collect the requested low-order bits of every byte, in order.
    harvested = []
    for value in data:
        for bit_pos in range(bits):
            harvested.append((value >> bit_pos) & 1)

    # Repack: output bit j of each byte comes from harvested[i + j].
    packed = bytearray()
    for i in range(0, len(harvested), 8):
        out = 0
        for j in range(8):
            if i + j < len(harvested):
                out |= harvested[i + j] << j
        packed.append(out)

    return bytes(packed)
|
|
|
|
|
|
# ============== PATTERN DETECTION ==============
|
|
|
|
def detect_repeated_patterns(data: bytes, min_length: int = 4, max_length: int = 32) -> List[Dict[str, Any]]:
    """Find byte patterns (min_length..max_length long) occurring 3+ times.

    Returns up to 20 pattern records, most frequent first, each with the
    pattern hex, its length, occurrence count and the first 10 offsets.
    """
    hits: List[Dict[str, Any]] = []
    longest = min(max_length, len(data) // 2)

    for size in range(min_length, longest + 1):
        # Map each window of this size to every offset where it occurs.
        occurrences: Dict[bytes, List[int]] = {}
        for start in range(len(data) - size + 1):
            occurrences.setdefault(data[start:start + size], []).append(start)

        for candidate, where in occurrences.items():
            if len(where) < 3:  # require at least 3 occurrences
                continue
            hits.append({
                'pattern': candidate.hex(),
                'length': size,
                'count': len(where),
                'offsets': where[:10],  # cap reported offsets at 10
            })

    hits.sort(key=lambda entry: entry['count'], reverse=True)
    return hits[:20]
|
|
|
|
|
|
def detect_xor_patterns(data: bytes) -> Dict[str, Any]:
    """Heuristically detect single-byte and short repeating-key XOR encryption."""
    report: Dict[str, Any] = {
        'single_byte_keys': [],
        'repeating_key_likely': False,
        'key_length_candidates': []
    }

    sample = data[:256]
    # Brute-force every single-byte key; keep those whose plaintext looks textual.
    for candidate_key in range(256):
        plain = bytes(value ^ candidate_key for value in sample)
        text_like = sum(1 for value in plain if 32 <= value < 127 or value in (9, 10, 13))
        if text_like > len(plain) * 0.7:
            report['single_byte_keys'].append({
                'key': candidate_key,
                'key_hex': f'{candidate_key:02x}',
                'printable_ratio': text_like / len(plain),
                'sample': plain[:50].decode('ascii', errors='replace')
            })

    # Repeating-key test: bytes at a fixed phase of the true key length are all
    # XORed with one byte, so each phase "column" should have low entropy.
    window = data[:1024]
    for key_len in range(2, 17):
        phases = [bytearray() for _ in range(key_len)]
        for index, value in enumerate(window):
            phases[index % key_len].append(value)

        mean_entropy = sum(calculate_entropy(bytes(p)) for p in phases) / key_len
        if mean_entropy < 5.0:  # markedly below random (8 bits/byte)
            report['key_length_candidates'].append({
                'length': key_len,
                'avg_column_entropy': mean_entropy
            })

    if report['key_length_candidates']:
        report['repeating_key_likely'] = True

    return report
|
|
|
|
|
|
# ============== ENCODING DETECTION ==============
|
|
|
|
def detect_base64(data: bytes) -> Dict[str, Any]:
    """Detect and decode potential Base64 encoded content.

    Scans the ASCII-decodable portion of *data* for runs of 20+ Base64
    alphabet characters (with optional padding), attempts to decode each,
    and records offset, sizes and a printable-or-hex preview per segment.

    Returns:
        Dict with 'found' (bool) and 'segments' (one entry per decodable run).
    """
    import base64  # hoisted out of the match loop; kept local to the function

    results: Dict[str, Any] = {
        'found': False,
        'segments': []
    }

    # Lossy ASCII decode is safe: the Base64 alphabet is pure ASCII, so
    # dropping other bytes cannot create false alphabet characters.
    text = data.decode('ascii', errors='ignore')

    for match in re.finditer(r'[A-Za-z0-9+/]{20,}={0,2}', text):
        b64_str = match.group()
        try:
            decoded = base64.b64decode(b64_str)
        except (ValueError, binascii.Error):
            # Not valid Base64 (bad length/padding) - skip this run.
            continue

        printable = sum(1 for b in decoded if 32 <= b < 127 or b in (9, 10, 13))
        results['segments'].append({
            'offset': match.start(),
            'length': len(b64_str),
            'decoded_length': len(decoded),
            'printable_ratio': printable / len(decoded) if decoded else 0,
            # Show text when mostly printable, otherwise a short hex preview.
            'decoded_preview': decoded[:100].decode('utf-8', errors='replace')
            if printable > len(decoded) * 0.5 else decoded[:50].hex()
        })
        results['found'] = True

    return results
|
|
|
|
|
|
def detect_hex_strings(data: bytes) -> Dict[str, Any]:
    """Detect runs of hex-encoded bytes (16+ hex digits) and decode them.

    Returns:
        Dict with 'found' (bool) and 'segments' describing each run's
        offset, length, decoded size, printable ratio and (when mostly
        printable) a decoded text preview.
    """
    results: Dict[str, Any] = {
        'found': False,
        'segments': []
    }

    text = data.decode('ascii', errors='ignore')
    # Whole bytes only (pairs of hex digits), at least 8 bytes long.
    hex_pattern = r'(?:[0-9a-fA-F]{2}){8,}'

    for match in re.finditer(hex_pattern, text):
        hex_str = match.group()
        try:
            decoded = bytes.fromhex(hex_str)
        except ValueError:
            # Defensive: the regex should guarantee valid even-length hex.
            continue

        printable = sum(1 for b in decoded if 32 <= b < 127 or b in (9, 10, 13))
        results['segments'].append({
            'offset': match.start(),
            'length': len(hex_str),
            'decoded_length': len(decoded),
            'printable_ratio': printable / len(decoded) if decoded else 0,
            'decoded_preview': decoded[:100].decode('utf-8', errors='replace')
            if printable > len(decoded) * 0.5 else None
        })
        results['found'] = True

    return results
|
|
|
|
|
|
def detect_unicode_steg(data: bytes) -> Dict[str, Any]:
    """Detect Unicode-based steganography (zero-width chars, variation selectors).

    Returns:
        Dict with 'found', per-character 'zero_width_chars' counts, a
        'homoglyphs' list (reserved; currently never populated) and a
        total 'invisible_chars' count.
    """
    results: Dict[str, Any] = {
        'found': False,
        'zero_width_chars': [],
        'homoglyphs': [],  # placeholder for future homoglyph detection
        'invisible_chars': 0
    }

    try:
        text = data.decode('utf-8', errors='ignore')
    except UnicodeError:
        # errors='ignore' should never raise; kept as a defensive guard.
        return results

    # Invisible code points commonly used to hide bitstreams in text.
    zwc_chars = {
        '\u200b': 'ZERO WIDTH SPACE',
        '\u200c': 'ZERO WIDTH NON-JOINER',
        '\u200d': 'ZERO WIDTH JOINER',
        '\u2060': 'WORD JOINER',
        '\ufeff': 'ZERO WIDTH NO-BREAK SPACE (BOM)',
        '\u180e': 'MONGOLIAN VOWEL SEPARATOR',
    }

    for char, name in zwc_chars.items():
        count = text.count(char)
        if count > 0:
            results['zero_width_chars'].append({
                'char': repr(char),
                'name': name,
                'count': count
            })
            results['invisible_chars'] += count
            results['found'] = True

    # Variation selectors (U+FE00..U+FE0F) can also carry hidden payload bits.
    for char in text:
        if '\ufe00' <= char <= '\ufe0f':
            results['invisible_chars'] += 1
            results['found'] = True

    return results
|
|
|
|
|
|
def detect_whitespace_steg(data: bytes) -> Dict[str, Any]:
    """Detect whitespace steganography (tabs/spaces encoding).

    Counts trailing whitespace and mixed indentation across lines, then
    attempts to decode trailing whitespace as a bitstream (space=0, tab=1,
    8 bits per byte MSB-first, NUL-terminated).
    """
    results = {
        'found': False,
        'trailing_spaces': 0,
        'mixed_indentation': False,
        'suspicious_patterns': [],
        'potential_message': None
    }

    try:
        text = data.decode('utf-8', errors='ignore')
    except:
        # errors='ignore' should not raise; kept as a defensive guard.
        return results

    lines = text.split('\n')
    tab_indent_lines = 0
    space_indent_lines = 0

    for line in lines:
        # Count trailing whitespace
        stripped = line.rstrip()
        trailing = len(line) - len(stripped)
        if trailing > 0:
            results['trailing_spaces'] += trailing

        # Check indentation type
        if line.startswith('\t'):
            tab_indent_lines += 1
        elif line.startswith(' '):
            space_indent_lines += 1

    if tab_indent_lines > 0 and space_indent_lines > 0:
        results['mixed_indentation'] = True

    # Heuristic threshold: a handful of trailing spaces is normal noise.
    if results['trailing_spaces'] > 10:
        results['found'] = True

    # Try to decode as binary (space=0, tab=1 or similar)
    bits = []
    for line in lines:
        trailing = line[len(line.rstrip()):]
        for char in trailing:
            if char == ' ':
                bits.append('0')
            elif char == '\t':
                bits.append('1')

    if len(bits) >= 8:
        try:
            bit_string = ''.join(bits)
            message = bytearray()
            for i in range(0, len(bit_string) - 7, 8):
                byte_val = int(bit_string[i:i+8], 2)
                if byte_val == 0:
                    # NUL byte terminates the hidden message.
                    break
                message.append(byte_val)

            decoded = bytes(message).decode('utf-8', errors='ignore')
            # Only report if the entire decoded message is printable text.
            if decoded and all(32 <= ord(c) < 127 or c in '\r\n\t' for c in decoded):
                results['potential_message'] = decoded[:200]
        except:
            pass

    return results
|
|
|
|
|
|
# ============== TOOL REGISTRY ==============
|
|
|
|
class AnalysisToolRegistry:
    """Registry of all analysis tools organized by action name.

    Maps string action names to callables taking (data: bytes, **kwargs)
    and normalizes their return values into AnalysisResult objects.
    """

    def __init__(self):
        # action name -> analysis callable
        self._tools: Dict[str, callable] = {}
        self._register_all_tools()

    def _register_all_tools(self):
        """Register all analysis tools"""
        # Core detection tools
        self._tools['detect_base64'] = detect_base64
        self._tools['detect_hex_strings'] = detect_hex_strings
        self._tools['detect_unicode_steg'] = detect_unicode_steg
        self._tools['detect_whitespace_steg'] = detect_whitespace_steg
        self._tools['detect_xor_patterns'] = detect_xor_patterns
        self._tools['detect_repeated_patterns'] = detect_repeated_patterns

        # Analysis tools
        self._tools['analyze_entropy'] = lambda data: {'entropy': calculate_entropy(data)}
        self._tools['analyze_bit_planes'] = analyze_bit_planes

        # Will be populated by format-specific modules

    def register(self, action: str, func: callable):
        """Register a tool function"""
        self._tools[action] = func

    def get(self, action: str) -> Optional[callable]:
        """Get a tool function by action name"""
        return self._tools.get(action)

    def execute(self, action: str, data: bytes, **kwargs) -> AnalysisResult:
        """Execute an analysis tool and normalize its result.

        Unknown actions and exceptions raised by tools are reported as
        failed AnalysisResults rather than propagated. Plain-dict results
        are wrapped, with their 'found'/'suspicious' keys driving the
        suspicious flag; other return types land in data['result'].

        NOTE(review): kwargs are forwarded verbatim to the tool, so passing
        e.g. file_type to a tool that does not accept it surfaces as a
        failed result via the except handler - confirm intended.
        """
        func = self._tools.get(action)
        if not func:
            return AnalysisResult(
                success=False,
                action=action,
                file_type="unknown",
                error=f"Unknown action: {action}"
            )

        try:
            result = func(data, **kwargs)

            # Convert result to AnalysisResult if needed
            if isinstance(result, AnalysisResult):
                return result
            elif isinstance(result, dict):
                return AnalysisResult(
                    success=True,
                    action=action,
                    file_type=kwargs.get('file_type', 'unknown'),
                    data=result,
                    suspicious=result.get('found', False) or result.get('suspicious', False)
                )
            else:
                return AnalysisResult(
                    success=True,
                    action=action,
                    file_type=kwargs.get('file_type', 'unknown'),
                    data={'result': result}
                )
        except Exception as e:
            return AnalysisResult(
                success=False,
                action=action,
                file_type=kwargs.get('file_type', 'unknown'),
                error=str(e)
            )

    def list_tools(self) -> List[str]:
        """List all registered tools"""
        return sorted(self._tools.keys())
|
|
|
|
|
|
# Global registry instance; execute_action()/list_available_tools() below
# delegate to it so callers do not have to construct their own registry.
TOOL_REGISTRY = AnalysisToolRegistry()
|
|
|
|
|
|
def execute_action(action: str, data: bytes, **kwargs) -> AnalysisResult:
    """Execute an analysis action via the global TOOL_REGISTRY.

    Extra keyword arguments are forwarded to the tool function.
    """
    return TOOL_REGISTRY.execute(action, data, **kwargs)
|
|
|
|
|
|
def list_available_tools() -> List[str]:
    """List all analysis tool action names registered in TOOL_REGISTRY."""
    return TOOL_REGISTRY.list_tools()
|
|
|
|
|
|
# ============== PNG ANALYSIS TOOLS ==============
|
|
|
|
# PNG file signature (8 bytes).
PNG_MAGIC = b'\x89PNG\r\n\x1a\n'

# Known chunk type -> human-readable description (PNG spec + APNG extension).
PNG_CHUNK_TYPES = {
    'IHDR': 'Image header',
    'PLTE': 'Palette',
    'IDAT': 'Image data',
    'IEND': 'Image end',
    'tEXt': 'Textual data',
    'zTXt': 'Compressed textual data',
    'iTXt': 'International textual data',
    'bKGD': 'Background color',
    'cHRM': 'Primary chromaticities',
    'gAMA': 'Gamma',
    'hIST': 'Palette histogram',
    'iCCP': 'ICC profile',
    'pHYs': 'Physical pixel dimensions',
    'sBIT': 'Significant bits',
    'sPLT': 'Suggested palette',
    'sRGB': 'Standard RGB color space',
    'tIME': 'Last modification time',
    'tRNS': 'Transparency',
    'eXIf': 'EXIF data',
    'acTL': 'Animation control (APNG)',
    'fcTL': 'Frame control (APNG)',
    'fdAT': 'Frame data (APNG)',
}
|
|
|
|
|
|
def png_parse_chunks(data: bytes) -> Dict[str, Any]:
    """Parse all PNG chunks and return detailed information.

    Walks the chunk stream after the 8-byte signature, verifying each
    chunk's CRC and decoding well-known payloads (IHDR, tEXt, zTXt, iTXt,
    tIME, pHYs). Stops at IEND and reports any trailing bytes, which are
    flagged as suspicious.
    """
    if not data.startswith(PNG_MAGIC):
        return {'error': 'Not a valid PNG file', 'valid': False}

    chunks = []
    pos = 8  # Skip magic bytes
    total_idat_size = 0
    chunk_type_counts = {}

    while pos < len(data):
        # Need at least length (4) + type (4) to read a chunk header.
        if pos + 8 > len(data):
            break

        chunk_length = struct.unpack('>I', data[pos:pos+4])[0]
        chunk_type = data[pos+4:pos+8].decode('ascii', errors='replace')

        # Truncated chunk: header promises more data than the file holds.
        if pos + 12 + chunk_length > len(data):
            chunks.append({
                'type': chunk_type,
                'offset': pos,
                'length': chunk_length,
                'error': 'Truncated chunk'
            })
            break

        chunk_data = data[pos+8:pos+8+chunk_length]
        stored_crc = struct.unpack('>I', data[pos+8+chunk_length:pos+12+chunk_length])[0]
        # CRC covers the type field plus the payload (per the PNG spec).
        calculated_crc = zlib.crc32(data[pos+4:pos+8+chunk_length]) & 0xffffffff

        chunk_info = {
            'type': chunk_type,
            'description': PNG_CHUNK_TYPES.get(chunk_type, 'Unknown/Private'),
            'offset': pos,
            'length': chunk_length,
            'crc_valid': stored_crc == calculated_crc,
            'crc_stored': f'{stored_crc:08x}',
            'crc_calculated': f'{calculated_crc:08x}',
        }

        # Track chunk type counts
        chunk_type_counts[chunk_type] = chunk_type_counts.get(chunk_type, 0) + 1

        # Track IDAT size
        if chunk_type == 'IDAT':
            total_idat_size += chunk_length

        # Parse IHDR (fixed 13-byte layout: dimensions + format flags)
        if chunk_type == 'IHDR' and chunk_length == 13:
            width, height, bit_depth, color_type, compression, filter_method, interlace = struct.unpack('>IIBBBBB', chunk_data)
            chunk_info['parsed'] = {
                'width': width,
                'height': height,
                'bit_depth': bit_depth,
                'color_type': color_type,
                'compression': compression,
                'filter': filter_method,
                'interlace': interlace
            }

        # Parse text chunks: tEXt is keyword\0text, both Latin-1.
        elif chunk_type == 'tEXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                text = chunk_data[null_pos+1:].decode('latin-1', errors='replace')
                chunk_info['parsed'] = {'keyword': keyword, 'text': text[:500]}

        # zTXt: keyword\0, compression-method byte, then zlib-compressed text.
        elif chunk_type == 'zTXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                try:
                    # +2 skips the NUL separator and the compression-method byte.
                    text = zlib.decompress(chunk_data[null_pos+2:]).decode('latin-1', errors='replace')
                    chunk_info['parsed'] = {'keyword': keyword, 'text': text[:500], 'compressed': True}
                except:
                    chunk_info['parsed'] = {'keyword': keyword, 'error': 'Decompression failed'}

        # iTXt: only the leading keyword is extracted here (full iTXt has
        # language tag and optional compression which are not parsed).
        elif chunk_type == 'iTXt':
            null_pos = chunk_data.find(b'\x00')
            if null_pos != -1:
                keyword = chunk_data[:null_pos].decode('latin-1', errors='replace')
                chunk_info['parsed'] = {'keyword': keyword}

        # Parse tIME (year is 2 bytes, rest 1 byte each)
        elif chunk_type == 'tIME' and chunk_length == 7:
            year, month, day, hour, minute, second = struct.unpack('>HBBBBB', chunk_data)
            chunk_info['parsed'] = {
                'timestamp': f'{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}'
            }

        # Parse pHYs (pixel density)
        elif chunk_type == 'pHYs' and chunk_length == 9:
            ppux, ppuy, unit = struct.unpack('>IIB', chunk_data)
            chunk_info['parsed'] = {
                'pixels_per_unit_x': ppux,
                'pixels_per_unit_y': ppuy,
                'unit': 'meter' if unit == 1 else 'unknown'
            }

        chunks.append(chunk_info)
        pos += 12 + chunk_length

        if chunk_type == 'IEND':
            break

    # Check for data after IEND (anything trailing the stream is suspect)
    after_iend = len(data) - pos

    return {
        'valid': True,
        'chunks': chunks,
        'chunk_count': len(chunks),
        'chunk_type_counts': chunk_type_counts,
        'total_idat_size': total_idat_size,
        'data_after_iend': after_iend,
        'suspicious': after_iend > 0
    }
|
|
|
|
|
|
def png_extract_text_chunks(data: bytes) -> Dict[str, Any]:
    """Collect every parsed tEXt/zTXt/iTXt metadata entry from a PNG."""
    parsed = png_parse_chunks(data)
    if not parsed.get('valid'):
        # Propagate the parser's error report unchanged.
        return parsed

    entries = [
        {
            'type': chunk['type'],
            'keyword': chunk['parsed'].get('keyword', ''),
            'text': chunk['parsed'].get('text', ''),
            'offset': chunk['offset'],
        }
        for chunk in parsed['chunks']
        if chunk['type'] in ('tEXt', 'zTXt', 'iTXt') and 'parsed' in chunk
    ]

    return {
        'found': bool(entries),
        'text_chunks': entries,
        'count': len(entries),
    }
|
|
|
|
|
|
def png_detect_appended_data(data: bytes) -> Dict[str, Any]:
    """Detect data appended after PNG IEND chunk.

    Walks the chunk stream to locate the true end of the PNG, then
    characterizes any trailing bytes: size, entropy, embedded file type
    and (when UTF-8 printable) a text preview.
    """
    if not data.startswith(PNG_MAGIC):
        return {'found': False, 'error': 'Not a valid PNG file'}

    # Parse through PNG chunks to find actual IEND position
    pos = 8  # Skip magic
    iend_end_pos = None

    while pos + 8 <= len(data):
        chunk_length = struct.unpack('>I', data[pos:pos+4])[0]
        chunk_type = data[pos+4:pos+8]

        # Chunk end = pos + 4 (length) + 4 (type) + chunk_length + 4 (CRC)
        chunk_end_pos = pos + 12 + chunk_length

        if chunk_type == b'IEND':
            iend_end_pos = chunk_end_pos
            break

        pos = chunk_end_pos

    if iend_end_pos is None:
        return {'found': False, 'error': 'No IEND chunk found'}

    if iend_end_pos >= len(data):
        # IEND reaches (or overruns) the end of file: nothing appended.
        return {'found': False, 'appended_size': 0}

    appended_data = data[iend_end_pos:]

    if len(appended_data) == 0:
        return {'found': False, 'appended_size': 0}

    # Analyze appended data
    result = {
        'found': True,
        'appended_size': len(appended_data),
        'offset': iend_end_pos,
        'entropy': calculate_entropy(appended_data),
        'preview_hex': appended_data[:64].hex(),
        # Any payload after IEND is inherently suspicious.
        'suspicious': True
    }

    # Check if appended data is another file
    file_type = detect_file_type(appended_data)
    if file_type != FileType.UNKNOWN:
        result['embedded_file_type'] = file_type.value

    # Check for printable text
    try:
        text = appended_data[:200].decode('utf-8')
        if all(c.isprintable() or c in '\r\n\t' for c in text):
            result['text_preview'] = text
    except:
        # Not valid UTF-8; skip the text preview.
        pass

    return result
|
|
|
|
|
|
def png_analyze_idat(data: bytes) -> Dict[str, Any]:
    """Analyze PNG IDAT chunks for anomalies.

    Reports per-chunk size/CRC info, flags unusually large gaps between
    successive IDAT chunks, and summarizes size statistics.
    """
    result = png_parse_chunks(data)
    if not result.get('valid'):
        return result

    idat_chunks = []
    prev_end = 0  # byte offset just past the previously seen IDAT chunk

    for chunk in result['chunks']:
        if chunk['type'] == 'IDAT':
            idat_chunks.append({
                'offset': chunk['offset'],
                'length': chunk['length'],
                'crc_valid': chunk['crc_valid']
            })

            # Check for gap between IDAT chunks.
            # NOTE(review): a 12-byte gap corresponds to one zero-length
            # chunk between IDATs; only larger gaps are flagged - confirm
            # the intended threshold against real interleaved chunks.
            if prev_end > 0 and chunk['offset'] != prev_end:
                gap = chunk['offset'] - prev_end
                if gap > 12:  # More than just the next chunk header
                    idat_chunks[-1]['gap_before'] = gap

            prev_end = chunk['offset'] + 12 + chunk['length']

    if not idat_chunks:
        return {'found': False, 'error': 'No IDAT chunks found'}

    total_size = sum(c['length'] for c in idat_chunks)
    sizes = [c['length'] for c in idat_chunks]

    return {
        'found': True,
        'chunk_count': len(idat_chunks),
        'total_size': total_size,
        'chunks': idat_chunks,
        'size_variance': max(sizes) - min(sizes) if len(sizes) > 1 else 0,
        'avg_chunk_size': total_size // len(idat_chunks),
        'all_crc_valid': all(c['crc_valid'] for c in idat_chunks),
        'suspicious': any('gap_before' in c for c in idat_chunks)
    }
|
|
|
|
|
|
def png_extract_lsb(data: bytes, bits: int = 1, channels: str = "RGB") -> Dict[str, Any]:
    """Extract LSB data from PNG image pixels.

    Args:
        data: Complete PNG file bytes.
        bits: Number of low-order bits extracted per channel sample.
        channels: Which channels to read, as a string of letters from "RGBA".

    Returns:
        Dict with the packed extracted bytes ('raw_data'), entropy of the
        first KiB, and heuristics: a 'STEG' header, an embedded file
        signature, or readable text each mark the result suspicious.
    """
    if not HAS_PIL:
        return {'error': 'PIL not available', 'found': False}

    try:
        img = Image.open(io.BytesIO(data))

        # Convert to RGBA for consistent processing
        if img.mode == 'P':
            img = img.convert('RGBA')
        elif img.mode == 'L':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGBA')

        pixels = list(img.getdata())

        # Extract bits from specified channels (unknown letters are ignored)
        channel_map = {'R': 0, 'G': 1, 'B': 2, 'A': 3}
        channel_indices = [channel_map[c] for c in channels.upper() if c in channel_map]

        extracted_bits = []
        mask = (1 << bits) - 1  # NOTE(review): computed but unused below

        for pixel in pixels:
            for ch_idx in channel_indices:
                if ch_idx < len(pixel):
                    for bit_pos in range(bits):
                        extracted_bits.append((pixel[ch_idx] >> bit_pos) & 1)

        # Pack into bytes, LSB-first within each output byte; a trailing
        # partial group of <8 bits is dropped by the range bound.
        result_bytes = bytearray()
        for i in range(0, len(extracted_bits) - 7, 8):
            byte_val = 0
            for j in range(8):
                byte_val |= extracted_bits[i + j] << j
            result_bytes.append(byte_val)

        raw_data = bytes(result_bytes)

        # Look for patterns
        result = {
            'found': True,
            'extracted_size': len(raw_data),
            'channels': channels,
            'bits_per_channel': bits,
            'entropy': calculate_entropy(raw_data[:1024]),
            'raw_data': raw_data
        }

        # Check for STEG magic
        if raw_data[:4] == b'STEG':
            result['steg_header_found'] = True
            result['suspicious'] = True

        # Check for file signatures
        file_type = detect_file_type(raw_data)
        if file_type != FileType.UNKNOWN:
            result['embedded_file_type'] = file_type.value
            result['suspicious'] = True

        # Check for readable text
        try:
            text = raw_data[:100].decode('utf-8')
            printable = sum(1 for c in text if c.isprintable() or c in '\r\n\t')
            if printable > len(text) * 0.7:
                result['text_preview'] = text
                result['suspicious'] = True
        except:
            # Extracted bytes are not valid UTF-8; no text preview.
            pass

        return result

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_chi_square_analysis(data: bytes) -> Dict[str, Any]:
    """Chi-square analysis to detect LSB manipulation.

    For each channel, compares the observed histogram of value pairs
    (2k, 2k+1) and of the LSB plane against the uniform distribution that
    LSB-embedded random data would produce.

    Returns:
        Dict with per-channel chi-square statistics and a 'suspicious'
        flag (LSB chi-square above the 95% threshold of 3.84).
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}

    try:
        img = Image.open(io.BytesIO(data))

        # Normalize palette/grayscale/exotic modes to RGB for channel access.
        if img.mode == 'P':
            img = img.convert('RGB')
        elif img.mode == 'L':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGB')

        pixels = np.array(img)
        results = {}

        # Analyze each channel
        channel_names = ['Red', 'Green', 'Blue', 'Alpha']
        for ch_idx in range(min(pixels.shape[2], 4)):
            channel = pixels[:, :, ch_idx].flatten()
            total = len(channel)

            # Pair analysis: histogram of value pairs (2k, 2k+1), i.e. values
            # differing only in their LSB. Vectorized with np.bincount
            # instead of the previous per-pixel Python loop.
            pairs = np.bincount(channel >> 1, minlength=128).astype(np.float64)

            # Expected uniform distribution over the 128 pair bins.
            expected = total / 128
            chi_sq = float(((pairs - expected) ** 2 / expected).sum()) if expected > 0 else 0.0

            # LSB plane balance: embedded random bits push the 0/1 ratio
            # toward exactly 50%.
            lsb_plane = channel & 1
            ones = int(np.sum(lsb_plane))
            zeros = total - ones
            expected_ones = total / 2
            lsb_chi_sq = ((ones - expected_ones) ** 2 / expected_ones +
                          (zeros - expected_ones) ** 2 / expected_ones)

            results[channel_names[ch_idx]] = {
                'chi_square_pairs': float(chi_sq),
                'chi_square_lsb': float(lsb_chi_sq),
                'lsb_ones_ratio': float(ones / total),
                'suspicious': lsb_chi_sq > 3.84  # 95% confidence threshold
            }

        overall_suspicious = any(r['suspicious'] for r in results.values())

        return {
            'found': True,
            'channels': results,
            'suspicious': overall_suspicious,
            'interpretation': 'Low chi-square LSB values may indicate LSB steganography' if overall_suspicious else 'No strong LSB manipulation detected'
        }

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_bit_plane_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze individual bit planes of PNG image"""
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}

    try:
        img = Image.open(io.BytesIO(data))
        if img.mode == 'P':
            img = img.convert('RGB')

        arr = np.array(img)

        # Grayscale images arrive as 2-D arrays; give them a channel axis.
        if arr.ndim == 2:
            arr = arr[:, :, np.newaxis]
            names = ['Gray']
        else:
            names = ['Red', 'Green', 'Blue', 'Alpha'][:arr.shape[2]]

        per_channel = {}
        for idx, label in enumerate(names):
            chan = arr[:, :, idx]
            plane_stats = {}

            for bit in range(8):
                plane = (chan >> bit) & 1

                # Pack the bit plane into bytes and measure its entropy.
                packed = np.packbits(plane.flatten())
                ent = calculate_entropy(packed.tobytes())

                # Fraction of set bits in this plane, as a percentage.
                pct_ones = np.mean(plane) * 100

                plane_stats[f'bit_{bit}'] = {
                    'entropy': float(ent),
                    'ones_percentage': float(pct_ones),
                    # Only the two lowest planes are plausible steg targets.
                    'suspicious': bit < 2 and (ent > 7.5 or abs(pct_ones - 50) < 1),
                }

            per_channel[label] = plane_stats

        # Flag the image if any individual plane looked suspicious.
        flagged = any(
            stats['suspicious']
            for planes in per_channel.values()
            for stats in planes.values()
        )

        return {
            'found': True,
            'channels': per_channel,
            'suspicious': flagged,
            'interpretation': 'High entropy in lower bit planes may indicate hidden data'
        }

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_palette_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze PNG palette for steganography indicators"""
    if not HAS_PIL:
        return {'error': 'PIL not available'}

    try:
        img = Image.open(io.BytesIO(data))

        if img.mode != 'P':
            return {'found': False, 'reason': 'Image is not palette-based'}

        raw_palette = img.getpalette()
        if not raw_palette:
            return {'found': False, 'reason': 'No palette found'}

        # Flat list of ints -> list of (r, g, b) triples.
        colors = [(raw_palette[i], raw_palette[i + 1], raw_palette[i + 2])
                  for i in range(0, len(raw_palette), 3)]

        # A value-sorted palette is a classic steg-tool fingerprint.
        sorted_flag = colors == sorted(colors)

        # Entries that differ only in their lowest bits suggest a palette
        # that was split to carry hidden bits.
        close_pairs = []
        for a in range(len(colors)):
            for b in range(a + 1, len(colors)):
                delta = sum(abs(x - y) for x, y in zip(colors[a], colors[b]))
                if 0 < delta <= 3:  # Very similar colors
                    close_pairs.append((a, b, delta))

        # How many palette entries are actually referenced by pixels.
        hist = img.histogram()
        active = sum(1 for count in hist[:256] if count > 0)

        return {
            'found': True,
            'palette_size': len(colors),
            'used_colors': active,
            'is_sorted': sorted_flag,
            'near_duplicate_pairs': len(close_pairs),
            'near_duplicates': close_pairs[:10],  # First 10
            'suspicious': len(close_pairs) > 5 or sorted_flag,
            'interpretation': 'Sorted palette or many near-duplicates may indicate palette-based steganography'
        }

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_filter_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze PNG filter bytes for anomalies.

    Decompresses the concatenated IDAT stream and inspects the filter byte
    that precedes each scanline.  A heavily skewed distribution (especially
    filter 0 everywhere) suggests the image was rewritten by a tool rather
    than produced by a normal encoder.

    Args:
        data: Raw PNG file bytes.

    Returns:
        Dict with the filter distribution and a 'suspicious' flag, or an
        'error' key on failure.
    """
    result = png_parse_chunks(data)
    if not result.get('valid'):
        return result

    # Concatenate all IDAT payloads (PNG allows the stream to be split)
    # and grab the parsed IHDR so we can compute the scanline size.
    idat_data = b''
    ihdr_data = None

    for chunk in result['chunks']:
        if chunk['type'] == 'IDAT':
            offset = chunk['offset']
            length = chunk['length']
            idat_data += data[offset+8:offset+8+length]
        elif chunk['type'] == 'IHDR' and 'parsed' in chunk:
            ihdr_data = chunk['parsed']

    if not ihdr_data:
        return {'error': 'No IHDR chunk found'}

    try:
        decompressed = zlib.decompress(idat_data)
    except zlib.error:
        # Narrowed from a bare except: only decompression failures belong here.
        return {'error': 'Failed to decompress IDAT'}

    width = ihdr_data['width']
    height = ihdr_data['height']
    bit_depth = ihdr_data['bit_depth']
    color_type = ihdr_data['color_type']

    # Samples per pixel by color type (0=gray, 2=RGB, 3=palette, 4=gray+A, 6=RGBA)
    samples = {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}.get(color_type, 3)

    # Scanline payload is ceil(width * samples * bit_depth / 8) bytes.  The
    # previous `width * max(1, samples*bit_depth // 8)` formula over-counted
    # for bit depths below 8 (e.g. 1-bit gray), misaligning every row.
    row_bytes = 1 + (width * samples * bit_depth + 7) // 8  # +1 for filter byte

    # NOTE(review): assumes a non-interlaced image — Adam7 interlacing uses
    # per-pass scanline sizes and would need separate handling.

    # First byte of each scanline is the filter type.
    filter_bytes = []
    for row in range(height):
        offset = row * row_bytes
        if offset < len(decompressed):
            filter_bytes.append(decompressed[offset])

    # Tally how often each filter type occurs.
    filter_counts = {}
    for f in filter_bytes:
        filter_counts[f] = filter_counts.get(f, 0) + 1

    filter_names = {0: 'None', 1: 'Sub', 2: 'Up', 3: 'Average', 4: 'Paeth'}

    return {
        'found': True,
        'row_count': len(filter_bytes),
        'filter_distribution': {filter_names.get(k, f'Unknown({k})'): v for k, v in filter_counts.items()},
        'unique_filters': len(filter_counts),
        'suspicious': 0 in filter_counts and filter_counts[0] > len(filter_bytes) * 0.9,
        'interpretation': 'Excessive use of filter 0 (None) may indicate modified image'
    }
|
|
|
|
|
|
def png_detect_embedded_png(data: bytes) -> Dict[str, Any]:
    """Detect PNG files embedded within PNG (nested steganography).

    Scans for additional PNG magic numbers after the outer file's own
    header and records where each candidate starts and roughly ends.

    Args:
        data: Raw PNG file bytes.

    Returns:
        Dict with 'embedded_pngs' entries (offset/size/location), their
        count, and a 'suspicious' flag.
    """
    results = {
        'found': False,
        'embedded_pngs': []
    }

    # Position of the outer image's IEND chunk type.  Anything found after
    # it sits in trailing/appended data rather than inside the image stream.
    # (The old `pos > data.rfind(b'IEND', 0, pos)` test was always true, so
    # every hit was reported as 'after_iend'.)
    outer_iend = data.find(b'IEND')

    # Skip the outer PNG magic itself.
    search_start = 8

    while True:
        pos = data.find(PNG_MAGIC, search_start)
        if pos == -1:
            break

        # Find the embedded image's terminating IEND chunk.
        end_pos = data.find(b'IEND', pos)
        if end_pos != -1:
            # 'IEND' type (4 bytes) + CRC (4 bytes) closes the chunk; the
            # old code added 12 and over-counted the size by 4 bytes.
            end_pos += 8
            embedded_size = end_pos - pos

            results['embedded_pngs'].append({
                'offset': pos,
                'size': embedded_size,
                'location': 'after_iend' if outer_iend != -1 and pos > outer_iend else 'within_image'
            })
            results['found'] = True

        search_start = pos + 1

    results['count'] = len(results['embedded_pngs'])
    results['suspicious'] = results['found']

    return results
|
|
|
|
|
|
def png_color_histogram_analysis(data: bytes) -> Dict[str, Any]:
    """Analyze color histogram for LSB steganography indicators.

    Uses a Pairs-of-Values (PoV) test: LSB embedding tends to equalize the
    counts of histogram bins (2k, 2k+1), so unusually similar pair counts
    across a channel are a steganography indicator.

    Args:
        data: Raw PNG file bytes.

    Returns:
        Dict with per-channel PoV statistics, or an 'error' key on failure.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}

    try:
        img = Image.open(io.BytesIO(data))

        if img.mode == 'P':
            img = img.convert('RGB')

        pixels = np.array(img)

        # Grayscale images decode to 2-D arrays; the previous version
        # indexed pixels[:, :, 0] on them and always fell into the error
        # path.  Add a channel axis so they analyze correctly.
        if pixels.ndim == 2:
            pixels = pixels[:, :, np.newaxis]
            channel_names = ['Gray']
        else:
            channel_names = ['Red', 'Green', 'Blue'][:pixels.shape[2]]

        results = {}

        for ch_idx, ch_name in enumerate(channel_names):
            channel = pixels[:, :, ch_idx].flatten()

            # Full 256-bin histogram of the channel.
            hist, _ = np.histogram(channel, bins=256, range=(0, 256))

            # Pairs of Values (PoV) analysis: in natural images adjacent
            # bins have similar counts; LSB embedding drives the (2k, 2k+1)
            # pair counts toward exact equality.
            pair_diffs = []
            for i in range(0, 256, 2):
                if hist[i] + hist[i+1] > 0:
                    diff = abs(hist[i] - hist[i+1]) / (hist[i] + hist[i+1])
                    pair_diffs.append(diff)

            avg_pair_diff = np.mean(pair_diffs) if pair_diffs else 0

            results[ch_name] = {
                'unique_values': int(np.sum(hist > 0)),
                'avg_pair_difference': float(avg_pair_diff),
                'suspicious': avg_pair_diff < 0.05  # Very similar pairs suggest LSB
            }

        return {
            'found': True,
            'channels': results,
            'suspicious': any(r['suspicious'] for r in results.values()),
            'interpretation': 'Similar histogram pair values may indicate LSB steganography'
        }

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_visual_attack(data: bytes) -> Dict[str, Any]:
    """Generate visual attack images for bit plane analysis.

    Extracts the least-significant-bit plane of each color channel (scaled
    to 0/255) so the planes can be inspected for hidden visual patterns.

    Args:
        data: Raw PNG file bytes.

    Returns:
        Dict with the image size and the channels whose LSB planes were
        extracted, or an 'error' key on failure.
    """
    if not HAS_PIL or not HAS_NUMPY:
        return {'error': 'PIL or numpy not available'}

    try:
        img = Image.open(io.BytesIO(data))

        if img.mode == 'P':
            img = img.convert('RGB')

        pixels = np.array(img)

        # Grayscale images decode to 2-D arrays; add a channel axis so the
        # per-channel indexing below cannot raise (previously it did).
        if pixels.ndim == 2:
            pixels = pixels[:, :, np.newaxis]
            channel_names = ['Gray']
        else:
            channel_names = ['Red', 'Green', 'Blue']

        # LSB plane per channel, scaled to 0 or 255 for visual contrast.
        lsb_images = {}
        for ch_idx, ch_name in enumerate(channel_names):
            if ch_idx < pixels.shape[2]:
                lsb_images[ch_name] = (pixels[:, :, ch_idx] & 1) * 255

        # NOTE: the previous version also materialized each plane with
        # .tolist() and built a combined-RGB array, but neither was ever
        # returned; that dead work (large for big images) was dropped.

        return {
            'found': True,
            'image_size': [int(pixels.shape[1]), int(pixels.shape[0])],
            'channel_lsb_available': list(lsb_images.keys()),
            'interpretation': 'Visual inspection of LSB planes can reveal hidden patterns'
        }

    except Exception as e:
        return {'error': str(e), 'found': False}
|
|
|
|
|
|
def png_steg_signature_scan(data: bytes) -> Dict[str, Any]:
    """Scan for known steganography tool signatures"""
    signatures = {
        b'STEG': 'Stegosaurus Wrecks',
        b'openstego': 'OpenStego',
        b'steghide': 'Steghide',
        b'F5': 'F5 Algorithm',
        b'jphide': 'JPHide',
        b'outguess': 'OutGuess',
        b'invisible secrets': 'Invisible Secrets',
        b'camouflage': 'Camouflage',
        b'snow': 'SNOW',
        b'\x00\x00\x00\x01steg': 'Generic Steg Header',
    }

    def _render(sig: bytes) -> str:
        # ASCII signatures are shown as text, binary ones as hex.
        return sig.decode('ascii', errors='replace') if sig.isascii() else sig.hex()

    found = []

    # Pass 1: search the raw file bytes.
    for sig, tool_name in signatures.items():
        pos = data.find(sig)
        if pos != -1:
            found.append({
                'signature': _render(sig),
                'tool': tool_name,
                'offset': pos
            })

    # Pass 2: search the first 1000 bytes recovered from the RGB LSB planes.
    lsb_result = png_extract_lsb(data, bits=1, channels="RGB")
    if lsb_result.get('raw_data'):
        lsb_data = lsb_result['raw_data'][:1000]
        for sig, tool_name in signatures.items():
            if sig in lsb_data:
                found.append({
                    'signature': _render(sig),
                    'tool': tool_name,
                    'location': 'LSB_extracted'
                })

    return {
        'found': len(found) > 0,
        'signatures': found,
        'suspicious': len(found) > 0
    }
|
|
|
|
|
|
def png_full_analysis(data: bytes) -> Dict[str, Any]:
    """Run all PNG analysis tools and compile results"""
    # (result key, analysis function) pairs for every individual PNG check.
    analyses = [
        ('chunk_parse', png_parse_chunks),
        ('text_chunks', png_extract_text_chunks),
        ('appended_data', png_detect_appended_data),
        ('idat_analysis', png_analyze_idat),
        ('chi_square', png_chi_square_analysis),
        ('bit_planes', png_bit_plane_analysis),
        ('histogram', png_color_histogram_analysis),
        ('filter_analysis', png_filter_analysis),
        ('embedded_png', png_detect_embedded_png),
        ('steg_signatures', png_steg_signature_scan),
    ]

    report = {
        'file_type': 'PNG',
        'analyses': {}
    }
    flagged = 0

    for name, analyze in analyses:
        try:
            outcome = analyze(data)
        except Exception as e:
            # One broken analysis must not abort the full sweep.
            report['analyses'][name] = {'error': str(e)}
            continue
        report['analyses'][name] = outcome
        if outcome.get('suspicious'):
            flagged += 1

    report['suspicious_indicators'] = flagged
    report['overall_suspicious'] = flagged >= 2
    report['summary'] = f"Found {flagged} suspicious indicators"

    return report
|
|
|
|
|
|
# Register PNG tools
|
|
def _register_png_tools():
    """Register all PNG analysis tools with the global registry"""
    tools = (
        png_parse_chunks,
        png_extract_text_chunks,
        png_detect_appended_data,
        png_analyze_idat,
        png_extract_lsb,
        png_chi_square_analysis,
        png_bit_plane_analysis,
        png_palette_analysis,
        png_filter_analysis,
        png_detect_embedded_png,
        png_color_histogram_analysis,
        png_visual_attack,
        png_steg_signature_scan,
        png_full_analysis,
    )
    # Every tool registers under its own function name (the registry keys
    # used previously were exactly the function names).
    for tool in tools:
        TOOL_REGISTRY.register(tool.__name__, tool)


# Auto-register on module load
_register_png_tools()
|
|
|
|
|
|
# ============== ADVANCED TEXT STEGANOGRAPHY DETECTION ==============
|
|
|
|
def detect_homoglyph_steg(data: bytes) -> Dict[str, Any]:
    """Detect Cyrillic/Latin homoglyph substitution steganography."""
    results = {'found': False, 'substitutions': 0, 'details': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    # Cyrillic letters that render identically to Latin ones.
    lookalikes = {
        '\u0430': 'a', '\u0441': 'c', '\u0435': 'e', '\u043e': 'o',
        '\u0440': 'p', '\u0455': 's', '\u0445': 'x', '\u0443': 'y',
        '\u0410': 'A', '\u0412': 'B', '\u0421': 'C', '\u0415': 'E',
        '\u041d': 'H', '\u041a': 'K', '\u041c': 'M', '\u041e': 'O',
        '\u0420': 'P', '\u0422': 'T', '\u0425': 'X',
    }

    for pos, ch in enumerate(text):
        latin = lookalikes.get(ch)
        if latin is None:
            continue
        results['substitutions'] += 1
        # Keep only the first few hits as evidence.
        if results['substitutions'] <= 5:
            results['details'].append({
                'offset': pos,
                'cyrillic': repr(ch),
                'looks_like': latin,
            })

    results['found'] = results['substitutions'] > 3
    return results
|
|
|
|
|
|
def detect_variation_selector_steg(data: bytes) -> Dict[str, Any]:
    """Detect variation selector steganography.

    Covers both the basic block U+FE00-U+FE0F (VS1-VS16) and the
    Variation Selectors Supplement U+E0100-U+E01EF (VS17-VS256).  The
    supplementary selectors were previously missed, yet they are the
    range commonly abused to smuggle arbitrary bytes after a single
    visible character.

    Args:
        data: Raw bytes, decoded as UTF-8 (undecodable bytes ignored).

    Returns:
        Dict with a hit count, the first few selector sightings, and a
        'found' flag (more than 3 selectors present).
    """
    results = {'found': False, 'count': 0, 'selectors': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    for i, ch in enumerate(text):
        cp = ord(ch)
        if 0xFE00 <= cp <= 0xFE0F:
            label = f'VS{cp - 0xFDFF}'
        elif 0xE0100 <= cp <= 0xE01EF:
            # Supplementary selectors: U+E0100 is VS17.
            label = f'VS{cp - 0xE0100 + 17}'
        else:
            continue
        results['count'] += 1
        # Keep only the first few sightings as evidence.
        if results['count'] <= 5:
            results['selectors'].append({'offset': i, 'selector': label})

    if results['count'] > 3:
        results['found'] = True

    return results
|
|
|
|
|
|
def detect_combining_mark_steg(data: bytes) -> Dict[str, Any]:
    """Detect steganography via invisible combining characters."""
    results = {'found': False, 'count': 0, 'marks': []}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    # Invisible joiner/operator characters usable as covert bit carriers.
    invisibles = {
        '\u034F': 'COMBINING GRAPHEME JOINER',
        '\u200D': 'ZERO WIDTH JOINER',
        '\u2060': 'WORD JOINER',
        '\u2061': 'FUNCTION APPLICATION',
        '\u2062': 'INVISIBLE TIMES',
        '\u2063': 'INVISIBLE SEPARATOR',
        '\u2064': 'INVISIBLE PLUS',
    }

    for pos, ch in enumerate(text):
        name = invisibles.get(ch)
        if name is None:
            continue
        results['count'] += 1
        # Record only the first few occurrences as evidence.
        if results['count'] <= 5:
            results['marks'].append({'offset': pos, 'name': name})

    results['found'] = results['count'] > 3
    return results
|
|
|
|
|
|
def detect_confusable_whitespace(data: bytes) -> Dict[str, Any]:
    """Detect steganography via Unicode whitespace variants."""
    results = {'found': False, 'non_standard_spaces': 0, 'types': {}}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    # Unicode space characters that look like an ordinary ASCII space.
    variants = {
        '\u00A0': 'NO-BREAK SPACE',
        '\u2000': 'EN QUAD',
        '\u2001': 'EM QUAD',
        '\u2002': 'EN SPACE',
        '\u2003': 'EM SPACE',
        '\u2004': 'THREE-PER-EM SPACE',
        '\u2005': 'FOUR-PER-EM SPACE',
        '\u2006': 'SIX-PER-EM SPACE',
        '\u2007': 'FIGURE SPACE',
        '\u2008': 'PUNCTUATION SPACE',
        '\u2009': 'THIN SPACE',
        '\u200A': 'HAIR SPACE',
        '\u202F': 'NARROW NO-BREAK SPACE',
        '\u205F': 'MEDIUM MATHEMATICAL SPACE',
        '\u3000': 'IDEOGRAPHIC SPACE',
    }

    tallies = results['types']
    for ch in text:
        label = variants.get(ch)
        if label is not None:
            results['non_standard_spaces'] += 1
            tallies[label] = tallies.get(label, 0) + 1

    results['found'] = results['non_standard_spaces'] > 3
    return results
|
|
|
|
|
|
def detect_emoji_steg(data: bytes) -> Dict[str, Any]:
    """Detect emoji substitution steganography patterns.

    Emoji-substitution encoders map bits/symbols onto a small emoji
    alphabet, producing long runs drawn from few distinct emoji.

    Args:
        data: Raw bytes, decoded as UTF-8 (undecodable bytes ignored).

    Returns:
        Dict with the emoji count and whether a substitution-like pattern
        (more than 50 emoji from at most 20 distinct ones) was detected.
    """
    results = {'found': False, 'emoji_count': 0, 'pattern_detected': False}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    import unicodedata
    # 'So' (other symbol) covers most emoji; 'Sk' catches modifier symbols.
    emojis = [ch for ch in text if unicodedata.category(ch).startswith(('So', 'Sk'))]
    results['emoji_count'] = len(emojis)

    # Many emoji drawn from a small alphabet is characteristic of
    # substitution encoding.  (The previous outer `> 20` guard was dead
    # code: the `> 50` test already implies it.)
    if len(emojis) > 50 and len(set(emojis)) <= 20:
        results['pattern_detected'] = True
        results['found'] = True

    return results
|
|
|
|
|
|
def detect_capitalization_steg(data: bytes) -> Dict[str, Any]:
    """Detect capitalization encoding steganography.

    Capitalization encoders hide bits by uppercasing words mid-sentence;
    a high rate of capitalized words that do not follow sentence-ending
    punctuation is the signal.

    Args:
        data: Raw bytes, decoded as UTF-8 (undecodable bytes ignored).

    Returns:
        Dict with word/suspicious-capital counts and a 'found' flag
        (ratio above 15% over more than 20 words).
    """
    results = {'found': False, 'suspicious_caps': 0, 'total_words': 0}
    try:
        text = data.decode('utf-8', errors='ignore')
    except Exception:
        return results

    words = text.split()
    results['total_words'] = len(words)

    # Count capitalized words that are neither sentence-initial nor
    # preceded by sentence-ending punctuation.  (str.split() never yields
    # empty strings, so the old emptiness checks and the redundant
    # `i > 0` ternary inside the `i > 0` branch were dead code.)
    for i, word in enumerate(words):
        if i > 0 and word[0].isupper():
            prev = words[i - 1]
            if prev[-1] not in '.!?:':
                results['suspicious_caps'] += 1

    # A high ratio of unexpected capitals suggests an encoding scheme.
    if results['total_words'] > 20:
        ratio = results['suspicious_caps'] / results['total_words']
        if ratio > 0.15:  # More than 15% unexpected caps
            results['found'] = True

    return results
|