diff --git a/analysis_tools.py b/analysis_tools.py index c33bff6..a789972 100644 --- a/analysis_tools.py +++ b/analysis_tools.py @@ -1142,7 +1142,7 @@ def png_bit_plane_analysis(data: bytes) -> Dict[str, Any]: def png_palette_analysis(data: bytes) -> Dict[str, Any]: - """Analyze PNG palette for steganography indicators""" + """Analyze PNG palette for steganography indicators and attempt LSB decode""" if not HAS_PIL: return {'error': 'PIL not available'} @@ -1176,17 +1176,464 @@ def png_palette_analysis(data: bytes) -> Dict[str, Any]: histogram = img.histogram() used_colors = sum(1 for h in histogram[:256] if h > 0) - return { + # === PALETTE INDEX LSB DECODE ATTEMPT === + # Extract LSB of each pixel's palette index + pixel_indices = list(img.getdata()) + bits = [idx & 1 for idx in pixel_indices] + + decoded_message = None + decoded_length = None + if len(bits) >= 32: + # Try 32-bit length prefix (big-endian) + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + + if 0 < length < min(5000, (len(bits) - 32) // 8): + decoded_length = length + msg_bits = bits[32:32 + length * 8] + msg_bytes = bytearray() + for i in range(0, len(msg_bits), 8): + v = 0 + for j in range(8): + if i + j < len(msg_bits): + v = (v << 1) | msg_bits[i + j] + msg_bytes.append(v) + try: + decoded_message = msg_bytes.decode('utf-8', errors='replace') + except: + decoded_message = msg_bytes.hex() + + # === PALETTE COLOR LSB DECODE ATTEMPT === + # Some steg tools encode in the LSB of palette colors themselves + palette_lsb_bits = [] + for r, g, b in colors: + palette_lsb_bits.extend([r & 1, g & 1, b & 1]) + + palette_decoded = None + if len(palette_lsb_bits) >= 32: + plen = 0 + for i in range(32): + plen = (plen << 1) | palette_lsb_bits[i] + if 0 < plen < (len(palette_lsb_bits) - 32) // 8: + pbits = palette_lsb_bits[32:32 + plen * 8] + pbytes = bytearray() + for i in range(0, len(pbits), 8): + v = 0 + for j in range(8): + if i + j < len(pbits): + v = (v << 1) | pbits[i + j] + pbytes.append(v) + try: + palette_decoded = pbytes.decode('utf-8', errors='replace') + except: + palette_decoded = pbytes.hex() + + result = { 'found': True, 'palette_size': len(colors), 'used_colors': used_colors, 'is_sorted': is_sorted, 'near_duplicate_pairs': len(near_duplicates), - 'near_duplicates': near_duplicates[:10], # First 10 - 'suspicious': len(near_duplicates) > 5 or is_sorted, - 'interpretation': 'Sorted palette or many near-duplicates may indicate palette-based steganography' + 'near_duplicates': near_duplicates[:10], + 'suspicious': len(near_duplicates) > 5 or is_sorted or decoded_message is not None, + 'interpretation': 'Sorted palette or many near-duplicates may indicate palette-based steganography', } + if decoded_message: + result['index_lsb_decode'] = { + 'length': decoded_length, + 'message': decoded_message[:200], + 'method': 'palette_index_lsb' + } + if palette_decoded: + result['palette_color_lsb_decode'] = { + 'message': palette_decoded[:200], + 'method': 'palette_color_lsb' + } + + return result + + except Exception as e: + return {'error': str(e), 'found': False} + + +def detect_pvd_steg(data: bytes) -> Dict[str, Any]: + """Detect and decode Pixel Value Differencing (PVD) steganography. + + PVD encodes bits in the LSB of the second pixel in each horizontal pair. + bit=1 means the R channel of pixel[x+1] is odd, bit=0 means even. + """ + if not HAS_PIL: + return {'error': 'PIL not available', 'found': False} + + try: + img = Image.open(io.BytesIO(data)).convert('RGB') + pixels = img.load() + width, height = img.size + + # Extract bits from R channel LSB of every second pixel in each pair + bits = [] + for y in range(height): + for x in range(0, width - 1, 2): + r2, _, _ = pixels[x + 1, y] + bits.append(r2 & 1) + + if len(bits) < 32: + return {'found': False, 'reason': 'Not enough pixel pairs'} + + # Try 32-bit big-endian length prefix + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + + if length <= 0 or length > min(5000, (len(bits) - 32) // 8): + return {'found': False, 'reason': f'Invalid length prefix: {length}'} + + msg_bits = bits[32:32 + length * 8] + msg_bytes = bytearray() + for i in range(0, len(msg_bits), 8): + v = 0 + for j in range(8): + if i + j < len(msg_bits): + v = (v << 1) | msg_bits[i + j] + msg_bytes.append(v) + + try: + decoded = msg_bytes.decode('utf-8', errors='replace') + except: + decoded = msg_bytes.hex() + + return { + 'found': True, + 'method': 'pvd_pair_lsb', + 'length': length, + 'message': decoded[:200], + 'suspicious': True, + 'findings': [f'PVD decode ({length} bytes): {decoded[:80]}'], + 'interpretation': 'Pixel Value Differencing — data encoded in R channel LSB of paired pixels' + } + + except Exception as e: + return {'error': str(e), 'found': False} + + +def detect_histogram_shift_steg(data: bytes) -> Dict[str, Any]: + """Detect and decode histogram shifting steganography. + + Histogram shifting encodes bits by shifting the peak pixel value: + peak stays = 0, peak+1 = 1. The encoder also shifts all pixels > peak + by +1 to make room, so the encoded image has two peaks (original peak + split into peak and peak+1). We try multiple candidate peaks. + """ + if not HAS_PIL: + return {'error': 'PIL not available', 'found': False} + + try: + img = Image.open(io.BytesIO(data)) + if img.mode != 'L': + img = img.convert('L') + pixels = img.load() + width, height = img.size + + # Build histogram + hist = [0] * 256 + for y in range(height): + for x in range(width): + hist[pixels[x, y]] += 1 + + # After encoding, the original peak is split across peak and peak+1. + # We brute-force all possible peak values (0-254) since the peak + # may not be the highest in the encoded histogram. + candidates = list(range(255)) + + for peak in candidates: + bits = [] + for y in range(height): + for x in range(width): + v = pixels[x, y] + if v == peak: + bits.append(0) + elif v == peak + 1: + bits.append(1) + + if len(bits) < 40: + continue + + # Try 32-bit length prefix + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + + if length <= 0 or length > min(5000, (len(bits) - 32) // 8): + continue + + msg_bits = bits[32:32 + length * 8] + msg_bytes = bytearray() + for i in range(0, len(msg_bits), 8): + v = 0 + for j in range(8): + if i + j < len(msg_bits): + v = (v << 1) | msg_bits[i + j] + msg_bytes.append(v) + + try: + decoded = msg_bytes.decode('utf-8', errors='replace') + except: + continue + + # Check if it looks like valid text (high threshold to avoid false positives) + printable = sum(1 for c in decoded if c.isprintable() or c in '\r\n\t') + if printable > len(decoded) * 0.85: + return { + 'found': True, + 'method': 'histogram_shift', + 'peak': peak, + 'length': length, + 'message': decoded[:200], + 'suspicious': True, + 'findings': [f'Histogram shift decode (peak={peak}, {length} bytes): {decoded[:80]}'], + 'interpretation': 'Histogram shifting — peak pixel values encode bits via shift' + } + + return {'found': False, 'reason': 'No valid histogram shift pattern found'} + + except Exception as e: + return {'error': str(e), 'found': False} + + +def detect_multibit_lsb(data: bytes) -> Dict[str, Any]: + """Detect and decode multi-bit LSB steganography (2-bit, 4-bit per channel).""" + if not HAS_PIL: + return {'error': 'PIL not available', 'found': False} + + try: + img = Image.open(io.BytesIO(data)).convert('RGBA') + pixels = list(img.getdata()) + results = {} + + for bits_per_ch in [2, 4]: + mask = (1 << bits_per_ch) - 1 + nibbles = [] + for r, g, b, a in pixels: + for ch in [r, g, b]: + nibbles.append(ch & mask) + + units_per_byte = 8 // bits_per_ch + length_units = 4 * units_per_byte + + if len(nibbles) < length_units: + continue + + length = 0 + for i in range(length_units): + length = (length << bits_per_ch) | nibbles[i] + + if 0 < length < min(5000, (len(nibbles) - length_units) // units_per_byte): + msg = bytearray() + idx = length_units + for _ in range(length): + byte_val = 0 + for _ in range(units_per_byte): + if idx < len(nibbles): + byte_val = (byte_val << bits_per_ch) | nibbles[idx] + idx += 1 + msg.append(byte_val & 0xFF) + + try: + decoded = msg.decode('utf-8', errors='replace') + except: + decoded = msg.hex() + + printable = sum(1 for c in decoded if c.isprintable() or c in '\r\n\t') + if printable > len(decoded) * 0.5: + results[f'{bits_per_ch}bit'] = { + 'found': True, + 'bits_per_channel': bits_per_ch, + 'length': length, + 'message': decoded[:200], + } + + if results: + best = list(results.values())[0] + return { + 'found': True, + 'method': f'multibit_lsb_{best["bits_per_channel"]}bpc', + 'length': best['length'], + 'message': best['message'], + 'suspicious': True, + 'findings': [f'Multi-bit LSB ({best["bits_per_channel"]}bpc, {best["length"]} bytes): {best["message"][:80]}'], + } + + return {'found': False, 'reason': 'No valid multi-bit LSB pattern found'} + + except Exception as e: + return {'error': str(e), 'found': False} + + +def gif_analysis(data: bytes) -> Dict[str, Any]: + """Analyze GIF files for steganography — comment blocks, palette LSB, disposal methods""" + if not HAS_PIL: + return {'error': 'PIL not available'} + + results = { + 'found': False, + 'findings': [], + 'comment_blocks': [], + 'palette_lsb_decode': None, + 'disposal_methods': [], + } + + try: + # 1. Extract GIF comment extension blocks from raw data + pos = 0 + while pos < len(data) - 2: + if data[pos] == 0x21 and data[pos + 1] == 0xFE: # Comment extension + pos += 2 + comment = bytearray() + while pos < len(data) and data[pos] != 0: + block_len = data[pos] + pos += 1 + comment.extend(data[pos:pos + block_len]) + pos += block_len + pos += 1 # Skip terminator + try: + decoded = comment.decode('utf-8', errors='replace') + results['comment_blocks'].append(decoded) + results['found'] = True + results['findings'].append(f'Comment block: {decoded[:100]}') + except: + results['comment_blocks'].append(comment.hex()) + continue + pos += 1 + + # 2. Extract disposal method bits from GCE blocks + pos = 0 + while pos < len(data) - 5: + if data[pos] == 0x21 and data[pos + 1] == 0xF9 and data[pos + 2] == 0x04: + packed = data[pos + 3] + disposal = (packed >> 2) & 0x07 + results['disposal_methods'].append(disposal) + pos += 6 + else: + pos += 1 + + if len(results['disposal_methods']) > 1: + results['findings'].append(f"Disposal methods: {results['disposal_methods'][:20]}") + + # 3. Palette index LSB decode + img = Image.open(io.BytesIO(data)) + if img.mode == 'P': + pixel_indices = list(img.getdata()) + bits = [idx & 1 for idx in pixel_indices] + + if len(bits) >= 32: + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + + if 0 < length < min(5000, (len(bits) - 32) // 8): + msg_bits = bits[32:32 + length * 8] + msg_bytes = bytearray() + for i in range(0, len(msg_bits), 8): + v = 0 + for j in range(8): + if i + j < len(msg_bits): + v = (v << 1) | msg_bits[i + j] + msg_bytes.append(v) + try: + decoded_msg = msg_bytes.decode('utf-8', errors='replace') + results['palette_lsb_decode'] = { + 'length': length, + 'message': decoded_msg[:200], + 'method': 'palette_index_lsb' + } + results['found'] = True + results['findings'].append(f'Palette LSB decode ({length} bytes): {decoded_msg[:50]}') + except: + pass + + results['suspicious'] = results['found'] + return results + + except Exception as e: + return {'error': str(e), 'found': False} + + +def bmp_analysis(data: bytes) -> Dict[str, Any]: + """Analyze BMP files for steganography — reserved header fields, trailing data, LSB""" + results = { + 'found': False, + 'findings': [], + 'reserved_bytes': None, + 'trailing_data': None, + 'lsb_decode': None, + } + + if len(data) < 54: + return {'error': 'File too small for BMP', 'found': False} + + if data[:2] != b'BM': + return {'error': 'Not a BMP file', 'found': False} + + try: + # Check reserved bytes at offset 6-9 (should be zero in clean BMPs) + reserved = data[6:10] + if reserved != b'\x00\x00\x00\x00': + results['reserved_bytes'] = reserved.hex() + results['found'] = True + results['findings'].append(f'Non-zero reserved bytes: {reserved.hex()}') + + # Check for trailing data after pixel data + file_size = struct.unpack(' file_size: + trailing = data[file_size:] + results['trailing_data'] = { + 'size': actual_size - file_size, + 'preview': trailing[:200].decode('utf-8', errors='replace') + } + results['found'] = True + results['findings'].append(f'Trailing data: {actual_size - file_size} bytes after BMP end') + + # LSB decode via PIL + if HAS_PIL: + img = Image.open(io.BytesIO(data)).convert('RGBA') + pixels = list(img.getdata()) + bits = [] + for r, g, b, a in pixels: + for ch in [r, g, b]: + bits.append(ch & 1) + + if len(bits) >= 32: + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + + if 0 < length < min(5000, (len(bits) - 32) // 8): + msg_bits = bits[32:32 + length * 8] + msg_bytes = bytearray() + for i in range(0, len(msg_bits), 8): + v = 0 + for j in range(8): + if i + j < len(msg_bits): + v = (v << 1) | msg_bits[i + j] + msg_bytes.append(v) + try: + decoded = msg_bytes.decode('utf-8', errors='replace') + results['lsb_decode'] = { + 'length': length, + 'message': decoded[:200], + 'method': 'rgb_lsb' + } + results['found'] = True + results['findings'].append(f'LSB decode ({length} bytes): {decoded[:50]}') + except: + pass + + results['suspicious'] = results['found'] + return results + except Exception as e: return {'error': str(e), 'found': False} @@ -1482,6 +1929,12 @@ def _register_png_tools(): TOOL_REGISTRY.register('png_visual_attack', png_visual_attack) TOOL_REGISTRY.register('png_steg_signature_scan', png_steg_signature_scan) TOOL_REGISTRY.register('png_full_analysis', png_full_analysis) + # GIF, BMP, and advanced image analysis + TOOL_REGISTRY.register('gif_analysis', gif_analysis) + TOOL_REGISTRY.register('bmp_analysis', bmp_analysis) + TOOL_REGISTRY.register('detect_pvd_steg', detect_pvd_steg) + TOOL_REGISTRY.register('detect_histogram_shift_steg', detect_histogram_shift_steg) + TOOL_REGISTRY.register('detect_multibit_lsb', detect_multibit_lsb) # Auto-register on module load @@ -1663,3 +2116,588 @@ def detect_capitalization_steg(data: bytes) -> Dict[str, Any]: results['found'] = True return results + + +# ============== AUDIO STEGANOGRAPHY ============== + +def audio_lsb_decode(data: bytes) -> Dict[str, Any]: + """Decode LSB steganography from WAV audio files.""" + import wave + try: + w = wave.open(io.BytesIO(data)) + raw = w.readframes(w.getnframes()) + sampwidth = w.getsampwidth() + w.close() + if sampwidth != 2: + return {'found': False, 'reason': f'Sample width {sampwidth} not supported'} + samples = struct.unpack(f'<{len(raw)//2}h', raw) + bits = [s & 1 for s in samples] + if len(bits) < 32: + return {'found': False} + length = 0 + for i in range(32): + length = (length << 1) | bits[i] + if length <= 0 or length > min(10000, (len(bits) - 32) // 8): + return {'found': False, 'reason': f'Invalid length: {length}'} + msg = bytearray() + for i in range(0, length * 8, 8): + v = 0 + for j in range(8): + if 32 + i + j < len(bits): + v = (v << 1) | bits[32 + i + j] + msg.append(v) + decoded = msg.decode('utf-8', errors='replace') + return {'found': True, 'method': 'audio_lsb', 'length': length, + 'message': decoded[:200], 'suspicious': True, + 'findings': [f'Audio LSB ({length} bytes): {decoded[:80]}']} + except Exception as e: + return {'error': str(e), 'found': False} + + +# ============== PCAP / NETWORK PROTOCOL DECODERS ============== + +def pcap_decode(data: bytes) -> Dict[str, Any]: + """Parse PCAP and extract steganographic data from protocol fields.""" + results = {'found': False, 'findings': [], 'packets': 0, 'methods': {}} + if len(data) < 24: + return results + magic = data[:4] + if magic == b'\xa1\xb2\xc3\xd4': + endian = '>' + elif magic == b'\xd4\xc3\xb2\xa1': + endian = '<' + else: + return {'found': False, 'reason': 'Not PCAP'} + + pos = 24 + ttl_bytes = bytearray() + ipid_bytes = bytearray() + win_bytes = bytearray() + urg_bytes = bytearray() + payloads = bytearray() + timestamps = [] + import base64, re as _re + + while pos + 16 <= len(data): + ts_sec = struct.unpack(f'{endian}I', data[pos:pos+4])[0] + ts_usec = struct.unpack(f'{endian}I', data[pos+4:pos+8])[0] + incl_len = struct.unpack(f'{endian}I', data[pos+8:pos+12])[0] + pos += 16 + if pos + incl_len > len(data): + break + pkt = data[pos:pos + incl_len] + results['packets'] += 1 + timestamps.append(ts_sec * 1000000 + ts_usec) + if len(pkt) > 34 and pkt[12:14] == b'\x08\x00': + ip_start = 14 + ttl_bytes.append(pkt[ip_start + 8]) + ipid_bytes.extend(pkt[ip_start + 4:ip_start + 6]) + protocol = pkt[ip_start + 9] + ip_hdr_len = (pkt[ip_start] & 0x0F) * 4 + if protocol == 17 and len(pkt) > ip_start + ip_hdr_len + 8: + udp_start = ip_start + ip_hdr_len + udp_len = struct.unpack('>H', pkt[udp_start + 4:udp_start + 6])[0] + payloads.extend(pkt[udp_start + 8:udp_start + udp_len]) + elif protocol == 6 and len(pkt) > ip_start + ip_hdr_len + 20: + tcp_start = ip_start + ip_hdr_len + win_bytes.extend(pkt[tcp_start + 14:tcp_start + 16]) + urg_bytes.extend(pkt[tcp_start + 18:tcp_start + 20]) + elif protocol == 1 and len(pkt) > ip_start + ip_hdr_len + 8: + payloads.extend(pkt[ip_start + ip_hdr_len + 8:]) + pos += incl_len + + def try_decode(raw, name): + try: + text = raw.decode('utf-8', errors='strict') + p = sum(1 for c in text if c.isprintable() or c in '\r\n\t') + if len(text) > 4 and p > len(text) * 0.7: + results['methods'][name] = {'message': text[:200]} + results['found'] = True + results['findings'].append(f'{name}: {text[:60]}') + return + except: pass + try: + text = raw.decode('ascii', errors='ignore') + for m in _re.finditer(r'[A-Za-z0-9+/]{16,}={0,2}', text): + d = base64.b64decode(m.group()).decode('utf-8', errors='strict') + if len(d) > 4: + results['methods'][name + '_b64'] = {'message': d[:200]} + results['found'] = True + results['findings'].append(f'{name} (b64): {d[:60]}') + return + except: pass + try: + clean = ''.join(c for c in raw.decode('ascii', errors='ignore').upper() + if c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567') + if len(clean) > 10: + d = base64.b32decode(clean + '=' * ((8 - len(clean) % 8) % 8)).decode('utf-8', errors='strict') + if len(d) > 4: + results['methods'][name + '_b32'] = {'message': d[:200]} + results['found'] = True + results['findings'].append(f'{name} (b32): {d[:60]}') + except: pass + + if payloads: try_decode(bytes(payloads), 'payload') + if ttl_bytes: try_decode(bytes(ttl_bytes), 'ip_ttl') + if ipid_bytes: try_decode(bytes(ipid_bytes), 'ip_id') + if win_bytes: try_decode(bytes(win_bytes), 'tcp_window') + if urg_bytes: try_decode(bytes(urg_bytes), 'tcp_urgent') + + # Covert timing + if len(timestamps) > 16: + delays = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)] + median = sorted(delays)[len(delays)//2] + tbits = ['1' if d > median else '0' for d in delays] + if len(tbits) >= 16: + tlen = int(''.join(tbits[:16]), 2) + if 0 < tlen < min(500, (len(tbits) - 16) // 8): + tb = bytearray() + for i in range(16, 16 + tlen * 8, 8): + if i + 8 <= len(tbits): + tb.append(int(''.join(tbits[i:i+8]), 2)) + try: + t = tb.decode('utf-8', errors='strict') + if sum(1 for c in t if c.isprintable()) > len(t) * 0.7: + results['methods']['covert_timing'] = {'message': t[:200]} + results['found'] = True + results['findings'].append(f'Timing ({tlen}b): {t[:60]}') + except: pass + + if b'HTTP/' in payloads: + results['findings'].append('HTTP traffic detected') + results['found'] = True + + results['suspicious'] = results['found'] + return results + + +# ============== ARCHIVE DECODERS ============== + +def zip_decode(data: bytes) -> Dict[str, Any]: + """Extract steg data from ZIP — comments, nested ZIPs, trailing data.""" + import zipfile + results = {'found': False, 'findings': []} + try: + zf = zipfile.ZipFile(io.BytesIO(data)) + if zf.comment: + results['comment'] = zf.comment.decode('utf-8', errors='replace')[:200] + results['found'] = True + results['findings'].append(f'ZIP comment: {results["comment"][:60]}') + for name in zf.namelist(): + if any(s in name.lower() for s in ['secret', 'hidden', 'steg', 'flag', 'inner.zip']): + content = zf.read(name) + if content[:2] == b'PK': + inner = zipfile.ZipFile(io.BytesIO(content)) + for iname in inner.namelist(): + ic = inner.read(iname).decode('utf-8', errors='replace') + results['findings'].append(f'Nested {iname}: {ic[:100]}') + results['found'] = True + inner.close() + else: + results['findings'].append(f'{name}: {content.decode("utf-8", errors="replace")[:100]}') + results['found'] = True + zf.close() + eocd = data.rfind(b'PK\x05\x06') + if eocd >= 0: + eocd_size = 22 + struct.unpack(' Dict[str, Any]: + """Extract steg data from TAR — PAX headers, file contents.""" + import tarfile + results = {'found': False, 'findings': []} + try: + tf = tarfile.open(fileobj=io.BytesIO(data)) + for member in tf.getmembers(): + if hasattr(member, 'pax_headers') and member.pax_headers: + for k, v in member.pax_headers.items(): + results['findings'].append(f'PAX {k}: {str(v)[:100]}') + results['found'] = True + if member.isfile(): + f = tf.extractfile(member) + if f: + results['findings'].append(f'{member.name}: {f.read(200).decode("utf-8", errors="replace")[:100]}') + tf.close() + except Exception as e: + results['error'] = str(e) + results['suspicious'] = results['found'] + return results + + +def gzip_decode(data: bytes) -> Dict[str, Any]: + """Extract steg data from GZip — FEXTRA, FCOMMENT fields.""" + results = {'found': False, 'findings': []} + if len(data) < 10 or data[:2] != b'\x1f\x8b': + return results + flags = data[3] + pos = 10 + if flags & 0x04 and pos + 2 <= len(data): + xlen = struct.unpack(' Dict[str, Any]: + """Extract steg data from SQLite — hidden tables.""" + import sqlite3, tempfile, os + results = {'found': False, 'findings': []} + try: + tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.db') + tmp.write(data); tmp.close() + conn = sqlite3.connect(tmp.name) + c = conn.cursor() + c.execute("SELECT name FROM sqlite_master WHERE type='table'") + tables = [r[0] for r in c.fetchall()] + results['tables'] = tables + for table in tables: + if any(s in table.lower() for s in ['steg', 'hidden', 'secret', 'payload', '_steg']): + c.execute(f'SELECT * FROM "{table}" LIMIT 10') + for row in c.fetchall(): + results['findings'].append(f'{table}: {" | ".join(str(v)[:80] for v in row)[:150]}') + results['found'] = True + conn.close(); os.unlink(tmp.name) + except Exception as e: + results['error'] = str(e) + results['suspicious'] = results['found'] + return results + + +# ============== DOCUMENT DECODERS ============== + +def pdf_decode(data: bytes) -> Dict[str, Any]: + """Extract steg data from PDF — JS, forms, XMP, trailing data.""" + import re as _re, base64 + results = {'found': False, 'findings': []} + if not data.startswith(b'%PDF'): + return results + text = data.decode('latin-1', errors='replace') + if '/JavaScript' in text or '/JS ' in text: + results['findings'].append('JavaScript detected') + results['found'] = True + for m in _re.finditer(r'/JS\s*\(([^)]+)\)', text): + results['findings'].append(f'JS: {m.group(1)[:80]}') + if '/AcroForm' in text: + for m in _re.finditer(r'/V\s*\(([^)]+)\)', text): + results['findings'].append(f'Form: {m.group(1)[:80]}') + results['found'] = True + eof = data.rfind(b'%%EOF') + if eof >= 0: + trailing = data[eof + 5:].strip() + if trailing: + results['findings'].append(f'Post-EOF ({len(trailing)}b): {trailing.decode("utf-8", errors="replace")[:80]}') + results['found'] = True + xmp = data.find(b'= 0: + xmp_end = data.find(b'', xmp) + if xmp_end >= 0: + xmp_data = data[xmp:xmp_end+12].decode('utf-8', errors='replace') + for m in _re.finditer(r'([^<]+)', xmp_data): + results['findings'].append(f'XMP desc: {m.group(1)[:80]}') + results['found'] = True + results['suspicious'] = results['found'] + return results + + +def jpeg_decode(data: bytes) -> Dict[str, Any]: + """Extract steg data from JPEG — COM markers, APP segments.""" + results = {'found': False, 'findings': []} + if len(data) < 2 or data[:2] != b'\xFF\xD8': + return results + pos = 2 + while pos < len(data) - 4: + if data[pos] != 0xFF: + pos += 1; continue + marker = data[pos + 1] + if marker == 0xFE: # COM + length = struct.unpack('>H', data[pos+2:pos+4])[0] + comment = data[pos+4:pos+2+length].decode('utf-8', errors='replace') + results['findings'].append(f'COM: {comment[:100]}') + results['found'] = True + pos += 2 + length + elif 0xE0 <= marker <= 0xEF: + length = struct.unpack('>H', data[pos+2:pos+4])[0] + if marker not in (0xE0, 0xE1): + seg = data[pos+4:pos+2+length] + text = seg.decode('utf-8', errors='replace') + if any(s in text.lower() for s in ['st3gg', 'steg', 'secret']): + results['findings'].append(f'APP{marker-0xE0}: {text[:80]}') + results['found'] = True + pos += 2 + length + elif marker in (0xDA, 0xD9): + break + else: + try: + length = struct.unpack('>H', data[pos+2:pos+4])[0] + pos += 2 + length + except: + break + results['suspicious'] = results['found'] + return results + + +def svg_decode(data: bytes) -> Dict[str, Any]: + """Extract steg data from SVG — comments, data attributes, metadata.""" + import re as _re + results = {'found': False, 'findings': []} + try: + text = data.decode('utf-8', errors='replace') + for m in _re.finditer(r'', text, _re.DOTALL): + c = m.group(1).strip() + if len(c) > 5: + results['findings'].append(f'Comment: {c[:80]}') + results['found'] = True + for m in _re.finditer(r'data-\w+="([^"]*)"', text): + results['findings'].append(f'Data attr: {m.group(1)[:80]}') + results['found'] = True + meta = text.find('= 0: + meta_end = text.find('', meta) + if meta_end >= 0: + for m in _re.finditer(r'([^<]+)', text[meta:meta_end]): + results['findings'].append(f'Description: {m.group(1)[:80]}') + results['found'] = True + except Exception as e: + results['error'] = str(e) + results['suspicious'] = results['found'] + return results + + +# ============== GENERIC IMAGE LSB ============== + +def generic_image_lsb_decode(data: bytes) -> Dict[str, Any]: + """Decode LSB from any PIL-supported image (TIFF, PPM, PGM, ICO, WebP, etc). + + Handles grayscale (L), palette (P), RGB, and RGBA modes. + Tries both 32-bit and 16-bit length prefixes for small images (ICO). + """ + if not HAS_PIL: + return {'error': 'PIL not available', 'found': False} + try: + img = Image.open(io.BytesIO(data)) + fmt = img.format or 'unknown' + + # Extract bits based on image mode + if img.mode == 'P': + pixels = list(img.getdata()) + bits = [p & 1 for p in pixels] + elif img.mode in ('L', 'LA'): + # Grayscale: 1 channel + if img.mode == 'LA': + img = img.convert('L') + pixels = list(img.getdata()) + bits = [p & 1 for p in pixels] + else: + img = img.convert('RGBA') + pixels = list(img.getdata()) + bits = [] + for r, g, b, a in pixels: + for ch in [r, g, b]: + bits.append(ch & 1) + + if len(bits) < 16: + return {'found': False} + + # Try both 32-bit and 16-bit length prefixes + for prefix_bits in [32, 16]: + if len(bits) < prefix_bits: + continue + length = 0 + for i in range(prefix_bits): + length = (length << 1) | bits[i] + if length <= 0 or length > min(10000, (len(bits) - prefix_bits) // 8): + continue + msg = bytearray() + for i in range(0, length * 8, 8): + v = 0 + for j in range(8): + if prefix_bits + i + j < len(bits): + v = (v << 1) | bits[prefix_bits + i + j] + msg.append(v) + decoded = msg.decode('utf-8', errors='replace') + printable = sum(1 for c in decoded if c.isprintable() or c in '\r\n\t') + if printable > len(decoded) * 0.5: + return {'found': True, 'format': fmt, 'method': 'image_lsb', + 'length': length, 'prefix_bits': prefix_bits, + 'message': decoded[:200], 'suspicious': True, + 'findings': [f'{fmt} LSB ({length}b, {prefix_bits}b prefix): {decoded[:80]}']} + + return {'found': False, 'format': fmt} + except Exception as e: + return {'error': str(e), 'found': False} + + +# ============== TEXT TECHNIQUE DECODERS ============== + +def decode_braille(data: bytes) -> Dict[str, Any]: + """Decode Braille pattern steganography (U+2800 block).""" + try: + text = data.decode('utf-8') + braille = [c for c in text if 0x2800 <= ord(c) <= 0x28FF] + if len(braille) < 4: + return {'found': False} + decoded = bytes(ord(c) - 0x2800 for c in braille).decode('utf-8', errors='replace') + return {'found': True, 'method': 'braille', 'length': len(braille), + 'message': decoded[:200], + 'findings': [f'Braille ({len(braille)} chars): {decoded[:80]}']} + except Exception as e: + return {'error': str(e), 'found': False} + + +def decode_directional_override(data: bytes) -> Dict[str, Any]: + """Decode directional override steganography (RLO=1, LRO=0).""" + try: + text = data.decode('utf-8') + bits = [] + for ch in text: + if ch == '\u202E': bits.append('1') + elif ch == '\u202D': bits.append('0') + if len(bits) < 16: + return {'found': False} + length = int(''.join(bits[:16]), 2) + if length <= 0 or length > (len(bits) - 16) // 8: + return {'found': False} + msg = bytearray() + for i in range(16, 16 + length * 8, 8): + if i + 8 <= len(bits): + msg.append(int(''.join(bits[i:i+8]), 2)) + decoded = msg.decode('utf-8', errors='replace') + return {'found': True, 'method': 'directional', 'length': length, + 'message': decoded[:200], + 'findings': [f'Bidi decode ({length}b): {decoded[:80]}']} + except Exception as e: + return {'error': str(e), 'found': False} + + +def decode_hangul_filler(data: bytes) -> Dict[str, Any]: + """Decode Hangul filler steganography (U+3164=1, space=0). + + Handles partial messages where cover text has fewer spaces than payload needs. + """ + try: + text = data.decode('utf-8') + hf_count = text.count('\u3164') + if hf_count == 0: + return {'found': False} + bits = [] + for ch in text: + if ch == '\u3164': bits.append('1') + elif ch == ' ': bits.append('0') + if len(bits) < 16: + return {'found': False} + length = int(''.join(bits[:16]), 2) + if length <= 0 or length > 5000: + return {'found': False} + # Decode as many bytes as we have bits for (may be partial) + available_bytes = (len(bits) - 16) // 8 + decode_bytes = min(length, available_bytes) + msg = bytearray() + for i in range(16, 16 + decode_bytes * 8, 8): + if i + 8 <= len(bits): + msg.append(int(''.join(bits[i:i+8]), 2)) + decoded = msg.decode('utf-8', errors='replace') + partial = decode_bytes < length + return {'found': True, 'method': 'hangul_filler', + 'length': length, 'decoded_bytes': decode_bytes, + 'partial': partial, 'message': decoded[:200], + 'findings': [f'Hangul ({decode_bytes}/{length}b{"*" if partial else ""}): {decoded[:80]}']} + except Exception as e: + return {'error': str(e), 'found': False} + + +def decode_math_alphanumeric(data: bytes) -> Dict[str, Any]: + """Decode math bold substitution (bold=1, normal=0).""" + try: + text = data.decode('utf-8') + bits = [] + for ch in text: + o = ord(ch) + if 0x1D400 <= o <= 0x1D419 or 0x1D41A <= o <= 0x1D433: + bits.append('1') + elif ch.isascii() and ch.isalpha(): + bits.append('0') + if len(bits) < 16: + return {'found': False} + length = int(''.join(bits[:16]), 2) + if length <= 0 or length > (len(bits) - 16) // 8: + return {'found': False} + msg = bytearray() + for i in range(16, 16 + length * 8, 8): + if i + 8 <= len(bits): + msg.append(int(''.join(bits[i:i+8]), 2)) + decoded = msg.decode('utf-8', errors='replace') + return {'found': True, 'method': 'math_alpha', 'length': length, + 'message': decoded[:200], + 'findings': [f'Math alpha ({length}b): {decoded[:80]}']} + except Exception as e: + return {'error': str(e), 'found': False} + + +def decode_emoji_skin_tone(data: bytes) -> Dict[str, Any]: + """Decode emoji skin tone steganography (4 tones = 2 bits each).""" + try: + text = data.decode('utf-8') + TONES = {'\U0001F3FB': 0, '\U0001F3FC': 1, '\U0001F3FE': 2, '\U0001F3FF': 3} + pairs = [TONES[c] for c in text if c in TONES] + if len(pairs) < 4: + return {'found': False} + msg = bytearray() + for i in range(0, len(pairs) - 3, 4): + msg.append((pairs[i] << 6) | (pairs[i+1] << 4) | (pairs[i+2] << 2) | pairs[i+3]) + decoded = msg.decode('utf-8', errors='replace') + printable = sum(1 for c in decoded if c.isprintable()) + if printable > len(decoded) * 0.5: + return {'found': True, 'method': 'emoji_skin_tone', 'length': len(msg), + 'message': decoded[:200], + 'findings': [f'Skin tone ({len(msg)}b): {decoded[:80]}']} + return {'found': False} + except Exception as e: + return {'error': str(e), 'found': False} + + +# ============== REGISTER ALL TOOLS ============== + +def _register_all_tools(): + """Register ALL analysis and decode tools.""" + TOOL_REGISTRY.register('detect_homoglyph_steg', detect_homoglyph_steg) + TOOL_REGISTRY.register('detect_variation_selector_steg', detect_variation_selector_steg) + TOOL_REGISTRY.register('detect_combining_mark_steg', detect_combining_mark_steg) + TOOL_REGISTRY.register('detect_confusable_whitespace', detect_confusable_whitespace) + TOOL_REGISTRY.register('detect_emoji_steg', detect_emoji_steg) + TOOL_REGISTRY.register('detect_capitalization_steg', detect_capitalization_steg) + TOOL_REGISTRY.register('audio_lsb_decode', audio_lsb_decode) + TOOL_REGISTRY.register('pcap_decode', pcap_decode) + TOOL_REGISTRY.register('zip_decode', zip_decode) + TOOL_REGISTRY.register('tar_decode', tar_decode) + TOOL_REGISTRY.register('gzip_decode', gzip_decode) + TOOL_REGISTRY.register('sqlite_decode', sqlite_decode) + TOOL_REGISTRY.register('pdf_decode', pdf_decode) + TOOL_REGISTRY.register('jpeg_decode', jpeg_decode) + TOOL_REGISTRY.register('svg_decode', svg_decode) + TOOL_REGISTRY.register('generic_image_lsb_decode', generic_image_lsb_decode) + TOOL_REGISTRY.register('decode_braille', decode_braille) + TOOL_REGISTRY.register('decode_directional_override', decode_directional_override) + TOOL_REGISTRY.register('decode_hangul_filler', decode_hangul_filler) + TOOL_REGISTRY.register('decode_math_alphanumeric', decode_math_alphanumeric) + TOOL_REGISTRY.register('decode_emoji_skin_tone', decode_emoji_skin_tone) + +_register_all_tools()