fix: address whole-project code review (visible all/batch, engine consolidation, I/O)

Nine findings from a high-effort project-wide review, fixed and verified
(571 passed, ruff/pyright clean):

Correctness:
- all/batch now remove Doubao/Jimeng/Samsung visible text marks: the visible
  step routes through the registry (new cli._remove_visible_auto) instead of a
  hardcoded GeminiEngine, so they no longer leave the wordmark intact.
- batch always reads the original source (dropped the out_path-reuse that
  re-processed already-cleaned outputs on a re-run).
- img2img_runner only retries the diffusion call on the deprecated-callback
  TypeError; any other TypeError now propagates instead of double-running.
- gemini detect/remove and the reverse-alpha engines normalize channels via a
  new image_io.to_bgr, fixing a grayscale/BGRA crash in the FP-gate path.
- _png_late_metadata advances its cursor by the clamped length, so a malformed
  chunk length no longer aborts the late AI-label scan.

Cleanup / efficiency:
- Consolidate the ~90%-identical Doubao/Jimeng/Samsung engines into a shared
  config-driven _text_mark_engine.TextMarkEngine base; each engine is now a thin
  subclass (TextMarkConfig + test shims). Behavior is byte-exact (the three
  engine test suites pass unchanged). Registry adapters collapse to one
  _text_mark(...) row each. Gemini stays a separate engine.
- scan_head is memoized per (path, size, mtime), so identify() reads the file
  head once instead of ~8 times.
- invisible_engine post-processing decodes/encodes the output once (chained in
  memory) instead of 2-4 times across stages.
- Remove the orphaned get_model_id_for_profile (+ CONTROLNET_PROFILE); derive
  the --strength help from the strength constants (strength_default_help) so it
  cannot drift; share the --pipeline/--strength click options; simplify the
  retired --auto resolver.

Net -835 lines. Tests added for the registry-routed visible pass, to_bgr,
the polish/model/guidance wiring, and strength_default_help. CLAUDE.md updated
for the new base module, the engine/registry changes, image_io.to_bgr, and the
scan_head cache.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Victor Kuznetsov
2026-06-09 13:21:13 -07:00
parent b1189549b8
commit 2fcd00ced0
17 changed files with 777 additions and 1264 deletions
+7 -6
View File
File diff suppressed because one or more lines are too long
@@ -0,0 +1,349 @@
"""Shared base for the reverse-alpha visible text-mark engines.
The Doubao "豆包AI生成", Jimeng "★ 即梦AI", and Samsung "✦ Contenuti generati
dall'AI" marks are the SAME algorithm: anchor a bottom-corner box by width-relative
geometry, extract the light low-saturation glyph candidate, detect by matching the
bundled alpha-glyph silhouette via ``TM_CCOEFF_NORMED``, and remove by inverting the
alpha blend ``original = (wm - a*logo)/(1-a)`` (always trying fixed AND NCC-aligned
placement, keeping the lower-residual one) plus a thin footprint inpaint.
They differ ONLY in a bounded set of tuned values captured by :class:`TextMarkConfig`:
the constants, the bundled asset, the corner (Doubao/Jimeng bottom-right, Samsung
bottom-left), and a few structural knobs (the morphology-open kernel size and the
minimum glyph width used by the alignment / template-match). Each engine module is a
thin :class:`TextMarkEngine` subclass plus the test-facing module constants/helpers.
Gemini stays a SEPARATE engine (``gemini_engine``): its multi-size fixed-slot sparkle
model is genuinely different, not a tuned variant of this one.
"""
# cv2/numpy boundary: third-party libs ship no usable element types; relax the
# unknown-type rules for this file only.
# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
import cv2
import numpy as np
from remove_ai_watermarks import image_io
if TYPE_CHECKING:
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class TextMarkConfig:
"""All per-mark tuning for a reverse-alpha text-mark engine."""
name: str # short label for log lines (e.g. "Doubao")
asset_name: str # bundled alpha PNG under assets/ (e.g. "doubao_alpha.png")
corner: Literal["br", "bl"] # bottom-right (Doubao/Jimeng) or bottom-left (Samsung)
margin_floor: int # min margin in px for locate (4 for br marks, 2 for Samsung)
# locate geometry (fraction of image WIDTH)
width_frac: float
height_frac: float
margin_x_frac: float # right margin (br) or left margin (bl)
margin_bottom_frac: float
# glyph appearance
max_saturation: float
logo_min_luma: float
tophat_delta: float
morph_open_size: int # MORPH_OPEN kernel side (5 for br marks, 3 for Samsung)
# detection
detect_min_coverage: float
detect_ncc_threshold: float
# alpha-map geometry (fraction of WIDTH) emitted by scripts/visible_alpha_solve.py
alpha_width_frac: float
alpha_height_frac: float
alpha_margin_x_frac: float
alpha_margin_bottom_frac: float
alpha_align_search: tuple[float, float, int] # np.linspace(start, stop, num) scale search
min_gw: int # minimum glyph width for the template match / align search (8 br, 16 Samsung)
alpha_logo_bgr: tuple[float, float, float] = (255.0, 255.0, 255.0)
# residual inpaint over the glyph footprint (thin)
residual_alpha_floor: float = 0.05
residual_dilate: int = 5
residual_inpaint_radius: int = 2
@dataclass
class TextMarkLocation:
"""Located watermark box, in absolute pixel coordinates."""
x: int
y: int
w: int
h: int
is_fallback: bool = True # geometry anchor (no template match) -> always True for now
@property
def bbox(self) -> tuple[int, int, int, int]:
return self.x, self.y, self.w, self.h
@dataclass
class TextMarkDetection:
"""Result of visible text-mark detection."""
detected: bool = False
confidence: float = 0.0
region: tuple[int, int, int, int] = (0, 0, 0, 0)
coverage: float = 0.0 # fraction of the box occupied by glyph pixels
# Alpha / silhouette templates, cached per asset name (the originals cached per
# module global; this keys by asset so the three engines share the loader without
# re-reading). Only SUCCESSFUL loads are cached, so a missing asset is retried.
_alpha_cache: dict[str, NDArray[Any]] = {}
_silhouette_cache: dict[str, NDArray[Any]] = {}
def load_alpha_template(asset_name: str) -> NDArray[Any] | None:
"""Lazily load the bundled alpha template (float [0,1]) for ``asset_name``, or None."""
cached = _alpha_cache.get(asset_name)
if cached is not None:
return cached
path = Path(__file__).parent / "assets" / asset_name
img = image_io.imread(str(path), cv2.IMREAD_GRAYSCALE)
if img is None:
return None
_alpha_cache[asset_name] = img.astype(np.float32) / 255.0
return _alpha_cache[asset_name]
def glyph_silhouette(asset_name: str) -> NDArray[Any] | None:
"""Binary glyph silhouette (255 = glyph) from the bundled alpha map, or None."""
cached = _silhouette_cache.get(asset_name)
if cached is not None:
return cached
at = load_alpha_template(asset_name)
if at is None:
return None
_silhouette_cache[asset_name] = (at > 0.15).astype(np.uint8) * 255
return _silhouette_cache[asset_name]
def template_match_score(box_mask: NDArray[Any], image_width: int, config: TextMarkConfig) -> float:
"""Zero-mean normalized correlation of the alpha-template glyph silhouette
(scaled to the mark's expected size) against the candidate ``box_mask``.
``TM_CCOEFF_NORMED`` keys on glyph SHAPE, not coverage, so a dense textured
corner does not score highly -- only the actual glyph shape does.
"""
sil = glyph_silhouette(config.asset_name)
if sil is None or box_mask.size == 0:
return 0.0
gw = min(box_mask.shape[1] - 1, max(config.min_gw, int(config.alpha_width_frac * image_width)))
gh = min(box_mask.shape[0] - 1, max(4, int(config.alpha_height_frac * image_width)))
if gw < config.min_gw or gh < 4:
return 0.0
template = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
return float(cv2.matchTemplate(box_mask, template, cv2.TM_CCOEFF_NORMED).max())
class TextMarkEngine:
"""Reverse-alpha visible text-mark remover (locate -> mask -> detect -> reverse-alpha)."""
def __init__(self, config: TextMarkConfig) -> None:
self.config = config
# ── Templates (delegate to the asset-keyed module cache) ────────────
def _alpha_template(self) -> NDArray[Any] | None:
return load_alpha_template(self.config.asset_name)
def _glyph_silhouette(self) -> NDArray[Any] | None:
return glyph_silhouette(self.config.asset_name)
def _template_match_score(self, box_mask: NDArray[Any], image_width: int) -> float:
return template_match_score(box_mask, image_width, self.config)
# ── Locate ──────────────────────────────────────────────────────────
def locate(self, image: NDArray[Any]) -> TextMarkLocation:
"""Anchor the watermark box in the configured bottom corner by geometry."""
c = self.config
h, w = image.shape[:2]
wm_w = max(40, int(w * c.width_frac))
wm_h = max(16, int(w * c.height_frac))
margin_x = max(c.margin_floor, int(w * c.margin_x_frac))
margin_b = max(c.margin_floor, int(w * c.margin_bottom_frac))
x = max(0, w - margin_x - wm_w) if c.corner == "br" else min(margin_x, max(0, w - wm_w))
y = max(0, h - margin_b - wm_h)
wm_w = min(wm_w, w - x)
wm_h = min(wm_h, h - y)
return TextMarkLocation(x=x, y=y, w=wm_w, h=wm_h, is_fallback=True)
# ── Mask ────────────────────────────────────────────────────────────
def extract_mask(self, image: NDArray[Any], loc: TextMarkLocation) -> NDArray[Any]:
"""Build a full-image uint8 mask (255 = watermark glyph) for the box.
Polarity-aware: the mark is a light, low-saturation gray rendered brighter
than the local background (white top-hat), so a white-paper document is left
untouched (nothing brighter than its surroundings is masked there).
"""
c = self.config
h, w = image.shape[:2]
x, y, bw, bh = loc.bbox
# A degenerate ROI (a sliver from an extremely wide/short image) cannot hold
# the mark and would feed cv2's GaussianBlur/morphology a ~1-px-tall array,
# which can fault native code on some platforms. Skip the cv2 pipeline.
if bh < 16 or bw < 16:
return np.zeros((h, w), np.uint8)
# Normalize the ROI to 3-channel BGR (grayscale / BGRA would break axis=2).
roi = image_io.to_bgr(image[y : y + bh, x : x + bw]).astype(np.float32)
luma = roi.mean(axis=2)
sat = roi.max(axis=2) - roi.min(axis=2)
grayish = sat < c.max_saturation
# Local background model: a strong Gaussian blur (sigma ~ box height); the
# white top-hat (luma - local_bg) lights up bright thin strokes regardless
# of the absolute background level.
sigma = max(4.0, bh * 0.4)
local_bg = cv2.GaussianBlur(luma, (0, 0), sigmaX=sigma, sigmaY=sigma)
tophat = luma - local_bg
cand = grayish & (tophat > c.tophat_delta) & (luma > c.logo_min_luma)
glyph = cand.astype(np.uint8) * 255
glyph = cv2.morphologyEx(glyph, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
k = c.morph_open_size
glyph = cv2.morphologyEx(glyph, cv2.MORPH_OPEN, np.ones((k, k), np.uint8))
mask = np.zeros((h, w), np.uint8)
mask[y : y + bh, x : x + bw] = glyph
return mask
# ── Detect ──────────────────────────────────────────────────────────
def detect(self, image: NDArray[Any]) -> TextMarkDetection:
"""Detect the mark by matching the alpha-template glyph silhouette against
the corner candidate (``TM_CCOEFF_NORMED``); keys on glyph SHAPE, not coverage."""
c = self.config
det = TextMarkDetection()
if image is None or image.size == 0:
return det
loc = self.locate(image)
mask = self.extract_mask(image, loc)
x, y, bw, bh = loc.bbox
box = mask[y : y + bh, x : x + bw]
coverage = float((box > 0).sum()) / float(max(1, bw * bh))
det.region = loc.bbox
det.coverage = coverage
if coverage >= c.detect_min_coverage:
score = self._template_match_score(box, image.shape[1])
det.confidence = score
det.detected = score >= c.detect_ncc_threshold
logger.debug("%s detect: coverage=%.3f ncc=%.2f detected=%s", c.name, coverage, score, det.detected)
return det
# ── Reverse-alpha (recovery + thin residual inpaint) ────────────────
def reverse_alpha_available(self, image: NDArray[Any]) -> bool:
"""True if the bundled alpha map is loadable (NCC alignment places it at any
resolution; the caller still gates on ``detect`` so a clean corner is untouched)."""
return image is not None and image.size > 0 and self._alpha_template() is not None
def _fixed_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Place the template by fixed width-relative geometry (pixel-exact at the
captured width)."""
c = self.config
at = self._alpha_template()
if at is None:
return None
h, w = image.shape[:2]
# Clamp both dims so a wide/short image cannot overflow the slice assignment.
gw = min(w, max(1, int(c.alpha_width_frac * w)))
gh = min(h, max(1, int(c.alpha_height_frac * w)))
if c.corner == "br":
ax = max(0, w - int(c.alpha_margin_x_frac * w) - gw)
else: # bottom-left
ax = min(max(0, int(c.alpha_margin_x_frac * w)), max(0, w - gw))
ay = max(0, h - int(c.alpha_margin_bottom_frac * w) - gh)
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _aligned_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Register the captured template to the actual mark via a TM_CCOEFF_NORMED
scale + position search. Returns ``(alpha_map, glyph_bbox)`` or None."""
c = self.config
at = self._alpha_template()
sil = self._glyph_silhouette()
if at is None or sil is None:
return None
h, w = image.shape[:2]
loc = self.locate(image)
bx, by, bw, bh = loc.bbox
box_mask = self.extract_mask(image, loc)[by : by + bh, bx : bx + bw]
expected = c.alpha_width_frac * w
best: tuple[float, int, int, int, int] | None = None
for scale in np.linspace(*c.alpha_align_search):
gw, gh = int(expected * scale), int(c.alpha_height_frac * w * scale)
if gw < c.min_gw or gh < 4 or gw >= bw or gh >= bh:
continue
t = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
_, score, _, top_left = cv2.minMaxLoc(cv2.matchTemplate(box_mask, t, cv2.TM_CCOEFF_NORMED))
if best is None or score > best[0]:
best = (score, gw, gh, top_left[0], top_left[1])
if best is None:
return None
_, gw, gh, ox, oy = best
ax, ay = bx + ox, by + oy
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _apply_reverse_alpha(self, image: NDArray[Any], amap: NDArray[Any]) -> NDArray[Any]:
"""Invert the alpha blend with ``amap``: ``original = (wm - a*logo)/(1-a)``."""
a3 = np.clip(amap, 0.0, 1.0)[:, :, None]
logo = np.array(self.config.alpha_logo_bgr, np.float32)
return np.clip((image.astype(np.float32) - a3 * logo) / np.clip(1.0 - a3, 0.25, 1.0), 0, 255).astype(np.uint8)
def remove_watermark_reverse_alpha(self, image: NDArray[Any], *, residual_inpaint: bool = True) -> NDArray[Any]:
"""Recover the original pixels by inverting the alpha blend, then clear the
residual outline with a thin inpaint over the glyph footprint.
Placement: fixed geometry AND the NCC-aligned placement are always tried and
the one leaving the least residual mark (lowest re-``detect`` confidence) is
kept -- the mark re-rasterizes a few px per image, so fixed geometry alone is
not reliable. A single capture cannot pixel-cancel the mark on every image, so
a deliberately THIN residual inpaint (``residual_*``) follows: reverse-alpha
has already recovered the true background under the mark, so the inpaint only
finishes the residual edges instead of smearing the whole footprint. Call only
when :meth:`reverse_alpha_available` and the mark is detected.
"""
c = self.config
# Normalize to 3-channel BGR (the reverse-alpha math assumes a 3-channel logo).
image = image_io.to_bgr(image)
# An image too small to hold the mark would make the geometry boxes degenerate
# and feed cv2.resize a ~1-px-tall target; skip cv2 entirely.
h, w = image.shape[:2]
if h < 32 or w < 64:
return image.copy()
maps = [m for m in (self._fixed_alpha_map(image), self._aligned_alpha_map(image)) if m is not None]
if not maps:
return image.copy()
best_out: NDArray[Any] | None = None
best_amap: NDArray[Any] | None = None
best_residual = float("inf")
for amap, _region in maps:
out = self._apply_reverse_alpha(image, amap)
residual = self.detect(out).confidence
if residual < best_residual:
best_residual, best_out, best_amap = residual, out, amap
if best_out is None or best_amap is None: # pragma: no cover - maps is non-empty
return image.copy()
if residual_inpaint:
kernel = np.ones((c.residual_dilate, c.residual_dilate), np.uint8)
rm = cv2.dilate((best_amap > c.residual_alpha_floor).astype(np.uint8) * 255, kernel)
best_out = cv2.inpaint(best_out, rm, c.residual_inpaint_radius, cv2.INPAINT_NS)
return best_out
+34 -38
View File
@@ -29,7 +29,6 @@ if TYPE_CHECKING:
from numpy.typing import NDArray
from remove_ai_watermarks.gemini_engine import DetectionResult
# --- plain-text output layer (replaces rich: no colors, no markup, no boxes) ---
@@ -291,15 +290,32 @@ def _warn_if_esrgan_unavailable(upscaler: str) -> None:
console.print(" Note: --upscaler esrgan needs the 'esrgan' extra; falling back to Lanczos.")
def _watermark_region(det: DetectionResult, width: int, height: int) -> tuple[int, int, int, int]:
"""Pick a watermark bbox: detector's region if confident, else the default config slot."""
if det.confidence > 0.15:
return det.region
from remove_ai_watermarks.gemini_engine import get_watermark_config
def _remove_visible_auto(
image: NDArray[Any],
*,
inpaint: bool = True,
inpaint_method: str = "ns",
inpaint_strength: float = 0.85,
) -> tuple[NDArray[Any], str | None]:
"""Remove the strongest auto-detected visible mark via the registry.
config = get_watermark_config(width, height)
px, py = config.get_position(width, height)
return (px, py, config.logo_size, config.logo_size)
Routes the ``all``/``batch`` visible step through the same registry path the
standalone ``visible`` command uses, so EVERY registered mark is handled (the
Gemini sparkle AND the Doubao/Jimeng/Samsung text marks), not just the sparkle.
Returns ``(result, label-or-None)``; when no ``in_auto`` mark fires the image is
returned unchanged with ``None``. ``inpaint*`` tune the Gemini edge-residual
cleanup only (the text engines ignore them).
"""
from remove_ai_watermarks import watermark_registry
best = watermark_registry.best_auto_mark(image)
if best is None:
return image, None
method: Literal["telea", "ns"] = "ns" if inpaint_method == "ns" else "telea"
result, _ = watermark_registry.get_mark(best.key).remove(
image, inpaint_method=method, inpaint=inpaint, inpaint_strength=inpaint_strength, force=False
)
return result, best.label
def _read_bgr_and_alpha(path: Path) -> tuple[NDArray[Any] | None, NDArray[Any] | None]:
@@ -893,8 +909,6 @@ def cmd_all(
If invisible watermark deps are not installed, skips step 2 with a warning.
"""
from remove_ai_watermarks.gemini_engine import GeminiEngine
_banner()
source = _validate_image(source)
_warn_if_esrgan_unavailable(upscaler)
@@ -918,7 +932,6 @@ def cmd_all(
# -- Step 1: Visible watermark --------------------------------
console.print("\n 1) Visible watermark removal")
engine = GeminiEngine()
image, alpha = _read_bgr_and_alpha(source)
if image is None:
console.print(f"Error: Failed to read image: {source}")
@@ -928,15 +941,10 @@ def cmd_all(
console.print(f" Input: {source.name} ({w}x{h})")
with console.status("Removing visible watermark..."):
det = engine.detect_watermark(image)
if det.detected:
result = engine.remove_watermark(image)
if inpaint:
region = _watermark_region(det, w, h)
result = engine.inpaint_residual(result, region, method=inpaint_method)
console.print(" Visible watermark removed")
result, removed_label = _remove_visible_auto(image, inpaint=inpaint, inpaint_method=inpaint_method)
if removed_label is not None:
console.print(f" Visible watermark removed ({removed_label})")
else:
result = image.copy()
console.print(" Skipped (no visible watermark detected)")
# Save to temp file for invisible engine input (preserve alpha if present)
@@ -1058,27 +1066,15 @@ def _process_batch_image(
saved_alpha: NDArray[Any] | None = None
if mode in ("visible", "all"):
from remove_ai_watermarks.gemini_engine import GeminiEngine
if "_vis_engine" not in ctx.obj:
ctx.obj["_vis_engine"] = GeminiEngine()
engine = ctx.obj["_vis_engine"]
read_path = img_path
if mode == "all" and out_path.exists():
read_path = out_path
image, alpha = _read_bgr_and_alpha(read_path)
# Always read the ORIGINAL source: the visible pass is the first step, so a
# stale out_path from a previous run must not be re-processed as if it were
# the input. (The invisible step below reads out_path for `all` -- that chain
# is within a single run.)
image, alpha = _read_bgr_and_alpha(img_path)
if image is None:
raise ValueError("Failed to read image")
det = engine.detect_watermark(image)
if det.detected:
result = engine.remove_watermark(image)
if inpaint:
h, w = image.shape[:2]
region = _watermark_region(det, w, h)
result = engine.inpaint_residual(result, region)
else:
result = image.copy()
result, _ = _remove_visible_auto(image, inpaint=inpaint)
_write_bgr_with_alpha(out_path, result, alpha)
saved_alpha = alpha
+64 -359
View File
@@ -4,417 +4,122 @@ Doubao (ByteDance) stamps every generated image with a visible "豆包AI生成"
(Doubao AI generated) text strip in the bottom-right corner -- the explicit AIGC
label mandated by China's TC260 standard, a near-white semi-transparent overlay.
Like the Gemini sparkle and the Jimeng wordmark, it is a fixed overlay, so removal
starts from **reverse-alpha blending** against a captured alpha map
(``remove_watermark_reverse_alpha``): ``original = (wm - a*logo)/(1-a)``. The alpha
map is rebuilt by ``scripts/visible_alpha_solve.py`` from black/gray Doubao captures
(the careful gray-self solve; logo is pure white) and bundled as
``assets/doubao_alpha.png``. The mark re-rasterizes a few px off per image, so
removal ALWAYS NCC-aligns the template to the actual mark and then clears the
residual edges with a deliberately THIN inpaint over the glyph footprint (an
earlier under-estimated alpha + fixed-no-inpaint left a readable outline that the
detector did not flag -- see the reverse-alpha section below).
Detection (``detect``) is shape-consistent: it matches that same alpha glyph
silhouette against the corner via normalized correlation, so it keys on the actual
"豆包AI生成" shape rather than coverage/structure heuristics.
``locate`` (geometry box, scales with image WIDTH) and ``extract_mask`` (the
candidate glyph mask the detector correlates) mirror the Jimeng engine.
Arbitrary-region inpainting still lives in ``region_eraser`` / the ``erase``
command. Fast, offline, no GPU.
Removal is **reverse-alpha blending** against a captured alpha map
(``original = (wm - a*logo)/(1-a)``), always NCC-aligned to the actual mark plus a
thin residual inpaint over the glyph footprint. This is one of the three text-mark
engines that share :class:`remove_ai_watermarks._text_mark_engine.TextMarkEngine`;
this module supplies only Doubao's tuned :class:`TextMarkConfig` (bottom-right corner,
``assets/doubao_alpha.png`` rebuilt by ``scripts/visible_alpha_solve.py``). Arbitrary-
region inpainting still lives in ``region_eraser`` / the ``erase`` command.
"""
# The module-level _alpha_template / _glyph_silhouette / _template_match_score below
# are thin test-facing shims (imported by tests/), so pyright's src-only pass sees them
# as unused; the use is cross-module.
# pyright: reportUnusedFunction=false
# cv2/numpy boundary: third-party libs ship no usable element types; relax the
# unknown-type rules for this file only.
# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import cv2
import numpy as np
from remove_ai_watermarks import _text_mark_engine
from remove_ai_watermarks._text_mark_engine import TextMarkConfig, TextMarkDetection, TextMarkEngine
if TYPE_CHECKING:
from pathlib import Path
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
# Geometry as a fraction of image WIDTH. The Doubao mark scales with width and
# is anchored bottom-right. The box must be GENEROUSLY wider than the mark and
# reach close to the corner -- the mark is re-rasterized a few px off per image,
# and the NCC alignment search only registers within this box, so a tight box
# (the old 0.185 / margin 0.012) let a corner-ward shift fall partly outside it
# and the alignment missed. The glyph mask tightens the actual removal.
# Locate geometry as a fraction of image WIDTH (the mark scales with width, anchored
# bottom-right). The box is GENEROUSLY wider than the mark and reaches close to the
# corner so a per-image re-rasterization shift stays inside the NCC alignment search.
WM_WIDTH_FRAC = 0.22
WM_HEIGHT_FRAC = 0.075
MARGIN_RIGHT_FRAC = 0.004
MARGIN_BOTTOM_FRAC = 0.004
# Glyph appearance: the label is a low-saturation light gray, rendered brighter
# than the surrounding content (the common case: a generated photo/illustration).
# We detect it as a local bright feature (white top-hat: brighter than a blurred
# local background) intersected with the grayish + minimum-brightness tests.
# This is polarity-correct for bright-on-darker backgrounds and, crucially,
# leaves white-paper documents untouched (there the mark is not brighter than
# its surroundings, so nothing is masked rather than damaging the document text).
# Glyph appearance: a light, low-saturation gray rendered brighter than the local
# background (white top-hat), so a white-paper document is left untouched.
MAX_SATURATION = 55 # max channel spread to count a pixel as "grayish"
LOGO_MIN_LUMA = 150 # glyphs are at least this bright in absolute terms
TOPHAT_DELTA = 12 # glyph must exceed the local background by this many levels
# Detection is reverse-alpha-consistent: the mark is recognized by matching the
# bundled alpha-template glyph silhouette (assets/doubao_alpha.png -- the exact
# shape we invert) against the extracted candidate mask via zero-mean normalized
# correlation (cv2 TM_CCOEFF_NORMED). It keys on the actual "豆包AI生成" glyph
# SHAPE, not on coverage/structure heuristics, so a merely-textured corner does
# not fire (the old coverage detector false-positived on ~28% of images; #23).
# Corpus-tuned: real marks score median ~0.61, arbitrary corners <=0.17 (p99);
# threshold 0.4 -> false positives 7/1243 (0.6%). A small coverage floor skips
# the template match on a near-empty candidate box.
# Shape-consistent detection: match the bundled alpha glyph silhouette against the
# corner candidate via TM_CCOEFF_NORMED (keys on glyph SHAPE, not coverage; #23).
DETECT_MIN_COVERAGE = 0.04
DETECT_NCC_THRESHOLD = 0.4
# ── Reverse-alpha (recovery + thin residual inpaint) ─────────────────
# The Doubao mark is a fixed semi-transparent white overlay, so given its alpha
# map the original pixels are recovered by inverting the blend: (wm - a*logo)/(1-a).
# The alpha map is rebuilt by scripts/visible_alpha_solve.py from the black/gray
# Doubao captures (data/doubao_capture/): the CAREFUL solve -- a = (I - B)/(255 - B)
# on the gray capture with B a per-channel cubic background fit, mean over channels,
# full halo extent, unblurred. The earlier build (a coarser solve) under-estimated
# the alpha and left a clearly READABLE "豆包AI生成" outline on real samples
# (issue #13 follow-up: the detector was fooled by the outline -- conf 0.0 -- so the
# test passed while the result was visibly bad; suspect the captured alpha map, not
# the method). The mark is re-rasterized and a few px off per image, so removal
# does NOT trust fixed geometry: it ALWAYS tries fixed AND `_aligned_alpha_map`'s
# TM_CCOEFF_NORMED scale+position search and keeps the lower-residual placement,
# then a deliberately THIN residual inpaint clears the leftover edges without
# smearing the recovered texture. Geometry below is emitted by the solver -- keep in
# sync when the asset is rebuilt.
# Reverse-alpha geometry, emitted by scripts/visible_alpha_solve.py at the captured
# width. Removal always tries fixed AND NCC-aligned placement and keeps the lower
# residual, then a thin footprint inpaint clears the leftover edges.
_ALPHA_NATIVE_WIDTH = 2048
_ALPHA_LOGO_BGR: tuple[float, float, float] = (255.0, 255.0, 255.0)
_ALPHA_WIDTH_FRAC = 0.1636 # asset width / image width -- the alignment scale seed
_ALPHA_HEIGHT_FRAC = 0.0405
# Margins (of image WIDTH) of the captured mark -- the geometry record / where to
# seed; alignment refines the actual position, so these are not load-bearing.
_ALPHA_MARGIN_RIGHT_FRAC = 0.0132
_ALPHA_MARGIN_BOTTOM_FRAC = 0.0166
# Alignment scale search (np.linspace args) around the width-scaled glyph size.
_ALPHA_ALIGN_SEARCH = (0.88, 1.12, 25)
# Residual inpaint over the glyph footprint -- thin (NS, small radius) so it clears
# the leftover edges without the smear a wide full-footprint pass caused.
_RESIDUAL_ALPHA_FLOOR = 0.05
_RESIDUAL_DILATE = 5
_RESIDUAL_INPAINT_RADIUS = 2
_alpha_template_cache: NDArray[Any] | None = None
_CONFIG = TextMarkConfig(
name="Doubao",
asset_name="doubao_alpha.png",
corner="br",
margin_floor=4,
width_frac=WM_WIDTH_FRAC,
height_frac=WM_HEIGHT_FRAC,
margin_x_frac=MARGIN_RIGHT_FRAC,
margin_bottom_frac=MARGIN_BOTTOM_FRAC,
max_saturation=MAX_SATURATION,
logo_min_luma=LOGO_MIN_LUMA,
tophat_delta=TOPHAT_DELTA,
morph_open_size=5,
detect_min_coverage=DETECT_MIN_COVERAGE,
detect_ncc_threshold=DETECT_NCC_THRESHOLD,
alpha_width_frac=_ALPHA_WIDTH_FRAC,
alpha_height_frac=_ALPHA_HEIGHT_FRAC,
alpha_margin_x_frac=_ALPHA_MARGIN_RIGHT_FRAC,
alpha_margin_bottom_frac=_ALPHA_MARGIN_BOTTOM_FRAC,
alpha_align_search=_ALPHA_ALIGN_SEARCH,
min_gw=8,
alpha_logo_bgr=_ALPHA_LOGO_BGR,
residual_alpha_floor=_RESIDUAL_ALPHA_FLOOR,
residual_dilate=_RESIDUAL_DILATE,
residual_inpaint_radius=_RESIDUAL_INPAINT_RADIUS,
)
# Doubao-specific aliases for the shared detection result/engine.
DoubaoDetection = TextMarkDetection
def _alpha_template() -> NDArray[Any] | None:
"""Lazily load the bundled Doubao alpha template (float [0,1]), or None."""
global _alpha_template_cache
if _alpha_template_cache is None:
from pathlib import Path
from remove_ai_watermarks import image_io
path = Path(__file__).parent / "assets" / "doubao_alpha.png"
img = image_io.imread(str(path), cv2.IMREAD_GRAYSCALE)
if img is None:
return None
_alpha_template_cache = img.astype(np.float32) / 255.0
return _alpha_template_cache
@dataclass(frozen=True)
class DoubaoLocation:
"""Located watermark box (bottom-right), in absolute pixel coordinates."""
x: int
y: int
w: int
h: int
is_fallback: bool = True # geometry anchor (no template match) -> always True for now
@property
def bbox(self) -> tuple[int, int, int, int]:
return self.x, self.y, self.w, self.h
@dataclass
class DoubaoDetection:
"""Result of visible Doubao watermark detection."""
detected: bool = False
confidence: float = 0.0
region: tuple[int, int, int, int] = (0, 0, 0, 0)
coverage: float = 0.0 # fraction of the box occupied by glyph pixels
_silhouette_cache: NDArray[Any] | None = None
"""The bundled Doubao alpha template (float [0,1]), or None."""
return _text_mark_engine.load_alpha_template(_CONFIG.asset_name)
def _glyph_silhouette() -> NDArray[Any] | None:
"""Binary "豆包AI生成" silhouette (255 = glyph) from the bundled alpha map,
used as the detection template. None if the alpha asset is missing."""
global _silhouette_cache
if _silhouette_cache is None:
at = _alpha_template()
if at is None:
return None
_silhouette_cache = (at > 0.15).astype(np.uint8) * 255
return _silhouette_cache
"""Binary "豆包AI生成" silhouette (255 = glyph) from the alpha map, or None."""
return _text_mark_engine.glyph_silhouette(_CONFIG.asset_name)
def _template_match_score(box_mask: NDArray[Any], image_width: int) -> float:
"""Zero-mean normalized correlation of the alpha-template glyph silhouette
(scaled to the mark's expected size) against the candidate ``box_mask``.
TM_CCOEFF_NORMED keys on glyph SHAPE, not coverage, so a dense textured
corner does not score highly -- only the actual "豆包AI生成" shape does.
"""
sil = _glyph_silhouette()
if sil is None or box_mask.size == 0:
return 0.0
gw = min(box_mask.shape[1] - 1, max(8, int(_ALPHA_WIDTH_FRAC * image_width)))
gh = min(box_mask.shape[0] - 1, max(4, int(_ALPHA_HEIGHT_FRAC * image_width)))
if gw < 8 or gh < 4:
return 0.0
template = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
return float(cv2.matchTemplate(box_mask, template, cv2.TM_CCOEFF_NORMED).max())
"""TM_CCOEFF_NORMED of the Doubao glyph silhouette against ``box_mask``."""
return _text_mark_engine.template_match_score(box_mask, image_width, _CONFIG)
class DoubaoEngine:
"""Remove the visible Doubao "豆包AI生成" watermark (locate -> mask -> inpaint)."""
class DoubaoEngine(TextMarkEngine):
"""Remove the visible Doubao "豆包AI生成" watermark (locate -> mask -> reverse-alpha)."""
def __init__(
self,
*,
width_frac: float = WM_WIDTH_FRAC,
height_frac: float = WM_HEIGHT_FRAC,
margin_right_frac: float = MARGIN_RIGHT_FRAC,
margin_bottom_frac: float = MARGIN_BOTTOM_FRAC,
) -> None:
self.width_frac = width_frac
self.height_frac = height_frac
self.margin_right_frac = margin_right_frac
self.margin_bottom_frac = margin_bottom_frac
# ── Locate ────────────────────────────────────────────────────────
def locate(self, image: NDArray[Any]) -> DoubaoLocation:
"""Anchor the watermark box in the bottom-right corner by geometry."""
h, w = image.shape[:2]
wm_w = max(40, int(w * self.width_frac))
wm_h = max(16, int(w * self.height_frac))
margin_r = max(4, int(w * self.margin_right_frac))
margin_b = max(4, int(w * self.margin_bottom_frac))
x = max(0, w - margin_r - wm_w)
y = max(0, h - margin_b - wm_h)
wm_w = min(wm_w, w - x)
wm_h = min(wm_h, h - y)
return DoubaoLocation(x=x, y=y, w=wm_w, h=wm_h, is_fallback=True)
# ── Mask ──────────────────────────────────────────────────────────
def extract_mask(self, image: NDArray[Any], loc: DoubaoLocation) -> NDArray[Any]:
"""Build a full-image uint8 mask (255 = watermark glyph) for the box.
Polarity-aware: the mark is a light, low-saturation gray. On a dark
background it is the bright region; on a light background it is the
off-white gray below paper-white. Both cases are captured by the logo
luminance band intersected with the grayish constraint, plus a
brighter-than-local-background test on dark backgrounds.
"""
h, w = image.shape[:2]
x, y, bw, bh = loc.bbox
# A degenerate ROI (a sliver from an extremely wide/short image) cannot hold
# the mark and would feed cv2's GaussianBlur/morphology a ~1-px-tall array,
# which can fault the native code on some platforms (observed: a Windows
# access violation via the always-align removal's residual `detect`). Skip
# the cv2 pipeline and return an empty mask there.
if bh < 16 or bw < 16:
return np.zeros((h, w), np.uint8)
# Normalize the ROI to 3-channel BGR: a 2D grayscale or 4-channel BGRA
# input would otherwise break the axis=2 channel reductions below.
roi = image[y : y + bh, x : x + bw]
if roi.ndim == 2:
roi = cv2.cvtColor(roi, cv2.COLOR_GRAY2BGR)
elif roi.shape[2] == 4:
roi = cv2.cvtColor(roi, cv2.COLOR_BGRA2BGR)
roi = roi.astype(np.float32)
luma = roi.mean(axis=2)
sat = roi.max(axis=2) - roi.min(axis=2)
grayish = sat < MAX_SATURATION
# Local background model: a strong Gaussian blur (sigma ~ box height)
# approximates the content under the glyphs. The white top-hat
# (luma - local_bg) lights up bright thin strokes regardless of the
# absolute background level.
sigma = max(4.0, bh * 0.4)
local_bg = cv2.GaussianBlur(luma, (0, 0), sigmaX=sigma, sigmaY=sigma)
tophat = luma - local_bg
cand = grayish & (tophat > TOPHAT_DELTA) & (luma > LOGO_MIN_LUMA)
glyph = cand.astype(np.uint8) * 255
# Connect glyph parts, then drop isolated specks (5x5 open clears the
# scattered grayish pixels that random/textured corners produce).
glyph = cv2.morphologyEx(glyph, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
glyph = cv2.morphologyEx(glyph, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))
mask = np.zeros((h, w), np.uint8)
mask[y : y + bh, x : x + bw] = glyph
return mask
# ── Detect ────────────────────────────────────────────────────────
def detect(self, image: NDArray[Any]) -> DoubaoDetection:
"""Detect the visible Doubao mark by matching the alpha-template glyph
silhouette against the corner candidate (TM_CCOEFF_NORMED).
Keys on the "豆包AI生成" SHAPE, not coverage, so a textured corner does
not fire. ``confidence`` is the correlation score; ``detected`` is it
clearing ``DETECT_NCC_THRESHOLD``.
"""
det = DoubaoDetection()
if image is None or image.size == 0:
return det
loc = self.locate(image)
mask = self.extract_mask(image, loc)
x, y, bw, bh = loc.bbox
box = mask[y : y + bh, x : x + bw]
coverage = float((box > 0).sum()) / float(max(1, bw * bh))
det.region = loc.bbox
det.coverage = coverage
if coverage >= DETECT_MIN_COVERAGE:
score = _template_match_score(box, image.shape[1])
det.confidence = score
det.detected = score >= DETECT_NCC_THRESHOLD
logger.debug("Doubao detect: coverage=%.3f ncc=%.2f detected=%s", coverage, score, det.detected)
return det
# ── Reverse-alpha (exact recovery) ────────────────────────────────
def reverse_alpha_available(self, image: NDArray[Any]) -> bool:
"""True if the bundled alpha map is loadable. Sub-pixel NCC alignment
(see ``_aligned_alpha_map``) places it on the actual mark at ANY
resolution, so there is no width gate -- the caller still gates on
``detect`` so a clean corner is never touched."""
return image is not None and image.size > 0 and _alpha_template() is not None
def _fixed_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Place the template by fixed width-relative geometry -- pixel-exact at
the captured width (used there instead of integer-pixel NCC alignment)."""
at = _alpha_template()
if at is None:
return None
h, w = image.shape[:2]
# Glyph box scales with WIDTH; on a wide/short image the height-from-width
# box can exceed the image height. Clamp both dims so the slice assignment
# below cannot overflow (a degenerate 2048x1 input otherwise raised
# ValueError on the broadcast). Normal images are unaffected.
gw = min(w, max(1, int(_ALPHA_WIDTH_FRAC * w)))
gh = min(h, max(1, int(_ALPHA_HEIGHT_FRAC * w)))
ax = max(0, w - int(_ALPHA_MARGIN_RIGHT_FRAC * w) - gw)
ay = max(0, h - int(_ALPHA_MARGIN_BOTTOM_FRAC * w) - gh)
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _aligned_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Build a full-image alpha map with the captured template registered to
the actual mark via a TM_CCOEFF_NORMED scale + position search -- so the
single capture works off the captured width (a pure width-scale ghosts).
Returns ``(alpha_map, glyph_bbox)`` or None."""
at = _alpha_template()
sil = _glyph_silhouette()
if at is None or sil is None:
return None
h, w = image.shape[:2]
loc = self.locate(image)
bx, by, bw, bh = loc.bbox
box_mask = self.extract_mask(image, loc)[by : by + bh, bx : bx + bw]
expected = _ALPHA_WIDTH_FRAC * w
best: tuple[float, int, int, int, int] | None = None
for scale in np.linspace(*_ALPHA_ALIGN_SEARCH):
gw, gh = int(expected * scale), int(_ALPHA_HEIGHT_FRAC * w * scale)
if gw < 8 or gh < 4 or gw >= bw or gh >= bh:
continue
t = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
_, score, _, top_left = cv2.minMaxLoc(cv2.matchTemplate(box_mask, t, cv2.TM_CCOEFF_NORMED))
if best is None or score > best[0]:
best = (score, gw, gh, top_left[0], top_left[1])
if best is None:
return None
_, gw, gh, ox, oy = best
ax, ay = bx + ox, by + oy
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _apply_reverse_alpha(self, image: NDArray[Any], amap: NDArray[Any]) -> NDArray[Any]:
"""Invert the alpha blend with ``amap``: ``original = (wm - a*logo)/(1-a)``."""
a3 = np.clip(amap, 0.0, 1.0)[:, :, None]
logo = np.array(_ALPHA_LOGO_BGR, np.float32)
return np.clip((image.astype(np.float32) - a3 * logo) / np.clip(1.0 - a3, 0.25, 1.0), 0, 255).astype(np.uint8)
def remove_watermark_reverse_alpha(self, image: NDArray[Any], *, residual_inpaint: bool = True) -> NDArray[Any]:
"""Recover the original pixels by inverting the alpha blend
``original = (wm - a*logo)/(1-a)``, then clear the residual edges with a
thin inpaint over the glyph footprint.
Placement: fixed geometry AND the NCC-aligned placement are always tried and
the one leaving the least residual mark (lowest re-``detect`` confidence) is
kept -- the mark is re-rasterized and a few px off per image, so fixed
geometry alone leaves a visible outline (it did on the doubao-1.png sample).
A single capture cannot pixel-cancel the mark on every image, so a
deliberately THIN residual inpaint (``_RESIDUAL_*``) follows: reverse-alpha
has already recovered the true background under the mark, so the inpaint only
finishes the leftover edges instead of smearing the whole footprint.
Call only when :meth:`reverse_alpha_available` and the mark is detected.
"""
# Normalize to 3-channel BGR so a 2D grayscale or 4-channel BGRA input
# does not break the reverse-alpha math (which assumes a 3-channel logo).
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
elif image.shape[2] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
# An image too small to hold the mark would make the geometry boxes
# degenerate and feed cv2.resize a ~1-px-tall target / GaussianBlur a sliver
# ROI, which faults natively on Windows (access violation / "Unknown C++
# exception"). No real watermarked image is this small; skip cv2 entirely.
h, w = image.shape[:2]
if h < 32 or w < 64:
return image.copy()
maps = [c for c in (self._fixed_alpha_map(image), self._aligned_alpha_map(image)) if c is not None]
if not maps:
return image.copy()
best_out: NDArray[Any] | None = None
best_amap: NDArray[Any] | None = None
best_residual = float("inf")
for amap, _region in maps:
out = self._apply_reverse_alpha(image, amap)
residual = self.detect(out).confidence
if residual < best_residual:
best_residual, best_out, best_amap = residual, out, amap
if best_out is None or best_amap is None: # pragma: no cover - maps is non-empty
return image.copy()
if residual_inpaint:
kernel = np.ones((_RESIDUAL_DILATE, _RESIDUAL_DILATE), np.uint8)
rm = cv2.dilate((best_amap > _RESIDUAL_ALPHA_FLOOR).astype(np.uint8) * 255, kernel)
best_out = cv2.inpaint(best_out, rm, _RESIDUAL_INPAINT_RADIUS, cv2.INPAINT_NS)
return best_out
def __init__(self) -> None:
super().__init__(_CONFIG)
def load_image_bgr(path: str | Path) -> NDArray[Any]:
"""Read an image as BGR ndarray (helper for scripts/tests)."""
from remove_ai_watermarks import image_io
img = image_io.imread(path, cv2.IMREAD_COLOR)
img = image_io.imread(path)
if img is None:
raise FileNotFoundError(f"Failed to read image: {path}")
return img
+11 -9
View File
@@ -28,6 +28,8 @@ from typing import TYPE_CHECKING, Any, Literal
import cv2
import numpy as np
from remove_ai_watermarks import image_io
if TYPE_CHECKING:
from collections.abc import Iterator
@@ -290,6 +292,11 @@ class GeminiEngine:
if image is None or image.size == 0:
return result
# Normalize to 3-channel BGR: the multi-scale search tolerates grayscale, but
# the FP-gate / alpha-gain helpers (_core_and_bg) reduce over axis=2 and would
# crash on a 2D/BGRA input reaching this public entry point (e.g. via the
# registry detect adapter or the library API).
image = image_io.to_bgr(image)
h, w = image.shape[:2]
base_size = force_size or get_watermark_size(w, h)
result.size = base_size
@@ -481,17 +488,10 @@ class GeminiEngine:
Cleaned BGR image as numpy array, or an unmodified copy when no
watermark is detected.
"""
result = image.copy()
# Normalize to 3-channel BGR up front: 2D grayscale (no channel axis) and
# 4-channel BGRA both reach this public entry point and would otherwise
# crash on the channel-count checks / downstream 3-channel math.
if result.ndim == 2:
result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
elif result.shape[2] == 4:
result = cv2.cvtColor(result, cv2.COLOR_BGRA2BGR)
elif result.shape[2] == 1:
result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
result = image_io.to_bgr(image.copy())
size = force_size or get_watermark_size(result.shape[1], result.shape[0])
@@ -554,7 +554,9 @@ class GeminiEngine:
Returns:
Cleaned BGR image.
"""
result = image.copy()
# Same channel normalization as remove_watermark: the reverse-alpha blend
# assumes 3-channel BGR (a grayscale/BGRA input would mis-broadcast).
result = image_io.to_bgr(image.copy())
x, y, rw, rh = region
# Check standard sizes
+19
View File
@@ -49,6 +49,25 @@ def imread(path: str | Path, flags: int | None = None) -> NDArray[Any] | None:
return cv2.imdecode(data, flags)
def to_bgr(image: NDArray[Any]) -> NDArray[Any]:
"""Return a 3-channel BGR view of ``image``, promoting grayscale and BGRA.
The cv2-based engines (sparkle + the reverse-alpha text marks) assume a
3-channel BGR array for their channel reductions (``mean(axis=2)``, the
per-pixel logo subtraction). A 2D grayscale or 4-channel BGRA input -- a real
Gemini-app export is opaque RGBA -- would otherwise crash or mis-broadcast.
Centralizes the shape coercion that was inlined across the engines. A 3-channel
input is returned unchanged (no copy).
"""
import cv2
if image.ndim == 2 or image.shape[2] == 1:
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
if image.shape[2] == 4:
return cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
return image
def imwrite(path: str | Path, img: NDArray[Any]) -> bool:
"""Unicode-safe ``cv2.imwrite``.
+26 -44
View File
@@ -261,8 +261,14 @@ class InvisibleEngine:
vendor=vendor,
)
# Post-processing: optional Humanizer, then restore original resolution.
if humanize > 0.0:
# Post-processing chain: decode the diffusion output ONCE, apply the
# optional stages in memory in order (humanize -> restore original
# resolution -> unsharp -> adaptive polish), and write ONCE. Previously
# each stage independently imread/imwrote the full-res output, so a run
# with several stages PNG-decoded+re-encoded the same image 2-4 times.
# PNG is lossless, so the single-write output is byte-identical.
needs_restore = target is not None # the input was resized before diffusion
if humanize > 0.0 or unsharp > 0.0 or adaptive_polish or needs_restore:
import cv2
from remove_ai_watermarks import image_io
@@ -271,67 +277,43 @@ class InvisibleEngine:
if out_cv is None:
return out_path
if self._progress_callback:
self._progress_callback(f"Applying Analog Humanizer (grain: {humanize})...")
from remove_ai_watermarks.humanizer import apply_analog_humanizer
if humanize > 0.0:
if self._progress_callback:
self._progress_callback(f"Applying Analog Humanizer (grain: {humanize})...")
from remove_ai_watermarks.humanizer import apply_analog_humanizer
out_cv = apply_analog_humanizer(out_cv, grain_intensity=humanize, chromatic_shift=1)
out_cv = apply_analog_humanizer(out_cv, grain_intensity=humanize, chromatic_shift=1)
# Restore original resolution
# Restore original resolution if the input was resized for diffusion.
if (out_cv.shape[1], out_cv.shape[0]) != orig_size:
if self._progress_callback:
self._progress_callback(
f"Upscaling result back to original resolution {orig_size[0]}x{orig_size[1]}..."
)
# Using INTER_LANCZOS4 for high-quality upscaling back to original
out_cv = cv2.resize(out_cv, orig_size, interpolation=cv2.INTER_LANCZOS4)
image_io.imwrite(out_path, out_cv)
else:
# No humanize: still restore the original size if it was capped.
import cv2
from remove_ai_watermarks import image_io
out_cv = image_io.imread(out_path, cv2.IMREAD_COLOR)
if out_cv is not None and (out_cv.shape[1], out_cv.shape[0]) != orig_size:
if self._progress_callback:
self._progress_callback(
f"Upscaling result back to original resolution {orig_size[0]}x{orig_size[1]}..."
)
out_cv = cv2.resize(out_cv, orig_size, interpolation=cv2.INTER_LANCZOS4)
image_io.imwrite(out_path, out_cv)
# Final sharpening.
if unsharp > 0.0:
import cv2
from remove_ai_watermarks import image_io
from remove_ai_watermarks.humanizer import unsharp_mask
out_cv = image_io.imread(out_path, cv2.IMREAD_COLOR)
if out_cv is not None:
if unsharp > 0.0:
if self._progress_callback:
self._progress_callback(f"Sharpening (unsharp mask: {unsharp})...")
image_io.imwrite(out_path, unsharp_mask(out_cv, amount=unsharp))
from remove_ai_watermarks.humanizer import unsharp_mask
# Adaptive polish (CLI default): restore the input's detail level in the
# softened output, sparing text/edges. Self-limiting where there is no deficit.
if adaptive_polish:
import cv2
import numpy as np
out_cv = unsharp_mask(out_cv, amount=unsharp)
from remove_ai_watermarks import humanizer, image_io
# Adaptive polish (CLI default): restore the input's detail level in the
# softened output, sparing text/edges. Self-limiting where no deficit.
if adaptive_polish:
import numpy as np
from remove_ai_watermarks import humanizer
out_cv = image_io.imread(out_path, cv2.IMREAD_COLOR)
if out_cv is not None:
ref = cv2.cvtColor(np.array(reference_pil.convert("RGB")), cv2.COLOR_RGB2BGR)
if (ref.shape[1], ref.shape[0]) != (out_cv.shape[1], out_cv.shape[0]):
ref = cv2.resize(ref, (out_cv.shape[1], out_cv.shape[0]), interpolation=cv2.INTER_LANCZOS4)
if self._progress_callback:
self._progress_callback("Adaptive polish (sharpen + grain to the input's detail level)...")
image_io.imwrite(out_path, humanizer.adaptive_polish(out_cv, ref, seed=seed))
out_cv = humanizer.adaptive_polish(out_cv, ref, seed=seed)
image_io.imwrite(out_path, out_cv)
return out_path
finally:
+66 -372
View File
@@ -1,415 +1,109 @@
"""Jimeng (即梦AI) visible watermark removal engine.
"""Jimeng / Dreamina visible watermark removal engine.
Jimeng / Dreamina (ByteDance's image generator, distinct from Doubao) stamps a
visible "★ 即梦AI" wordmark -- a four-point sparkle icon followed by the 即梦AI
characters -- in the bottom-right corner: a near-white semi-transparent overlay,
the explicit AIGC label under China's TC260 standard.
Jimeng (即梦AI, ByteDance) stamps generated images with a visible "★ 即梦AI" wordmark
in the bottom-right corner -- a near-white semi-transparent overlay, the same overlay
class as the Doubao text strip.
Like the Gemini sparkle and the Doubao strip, it is a fixed overlay, so removal
starts from **reverse-alpha blending** against a captured alpha map
(``remove_watermark_reverse_alpha``): ``original = (wm - a*logo)/(1-a)``. The logo
is pure white (255,255,255); the alpha map was solved from the GRAY Jimeng capture
(see data/jimeng_capture/), bundled as ``assets/jimeng_alpha.png`` -- a careful
build (cubic-background fit, mean over channels, full halo extent, unblurred) that
drops the self-residual to ~1.3. Gray is the chosen background because the mark
sits on bright photo content in real use, not on black.
Unlike the Doubao mark, Jimeng re-rasterizes its mark per generation AND jitters
its position a few px (the alpha maps solved from independent captures correlate
0.998 but not 1.0), so a single 2048 alpha map does not pixel-cancel the mark on
every image/resolution the way Doubao's deterministic overlay does. Removal
therefore NCC-aligns the alpha to the actual mark (always, not only off-native),
reverse-alphas, then clears the residual with a deliberately THIN inpaint over the
glyph footprint. The reverse-alpha pre-step recovers the true background (including
edges) under the semi-transparent mark, so the thin inpaint only finishes the
residual edges rather than smearing the whole footprint -- a wide full-footprint
pass blurred the texture/edges under the mark. Verified clean on the solid captures
(native 2048) and on a real 1440-wide Jimeng download (off-native, table edge kept).
Detection (``detect``) matches the bundled "即梦AI" glyph silhouette against the
corner candidate via normalized correlation, so it keys on the actual mark shape
(real marks score >=0.81, the Doubao strip 0.21, other AI output 0.0) rather than
coverage heuristics, and does not hijack ``--mark auto`` on a Doubao image.
``locate`` (geometry box, scales with image WIDTH) and ``extract_mask`` (the
candidate glyph mask the detector correlates) mirror the Doubao engine. Fast,
offline, no GPU. Arbitrary-region inpainting still lives in ``region_eraser`` /
the ``erase`` command.
Removal is **reverse-alpha blending** against a captured alpha map
(``original = (wm - a*logo)/(1-a)``), always NCC-aligned to the actual mark plus a thin
residual inpaint over the glyph footprint. This is one of the three text-mark engines
that share :class:`remove_ai_watermarks._text_mark_engine.TextMarkEngine`; this module
supplies only Jimeng's tuned :class:`TextMarkConfig` (bottom-right corner,
``assets/jimeng_alpha.png`` rebuilt by ``scripts/visible_alpha_solve.py`` from the gray
capture). Jimeng images are also caught by the China TC260 AIGC metadata label, so this
is the visible-mark *removal* path, not a new ``identify`` signal.
"""
# The module-level _alpha_template / _glyph_silhouette / _template_match_score below
# are thin test-facing shims (imported by tests/), so pyright's src-only pass sees them
# as unused; the use is cross-module.
# pyright: reportUnusedFunction=false
# cv2/numpy boundary: third-party libs ship no usable element types; relax the
# unknown-type rules for this file only.
# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import cv2
import numpy as np
from remove_ai_watermarks import _text_mark_engine
from remove_ai_watermarks._text_mark_engine import TextMarkConfig, TextMarkDetection, TextMarkEngine
if TYPE_CHECKING:
from pathlib import Path
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
# Geometry as a fraction of image WIDTH. The Jimeng mark scales with width and is
# anchored bottom-right. The box is intentionally generous (the glyph mask
# tightens it); values cover the measured 2048 captures plus a real 1440 download.
# Locate geometry as a fraction of image WIDTH (mark scales with width, bottom-right).
WM_WIDTH_FRAC = 0.27
WM_HEIGHT_FRAC = 0.092
MARGIN_RIGHT_FRAC = 0.008
MARGIN_BOTTOM_FRAC = 0.010
# Glyph appearance: a low-saturation light gray rendered brighter than the
# surrounding content (white top-hat: brighter than a blurred local background)
# intersected with the grayish + minimum-brightness tests. Same polarity logic as
# the Doubao engine: leaves white-paper documents untouched (the mark is not
# brighter than its surroundings there, so nothing is masked).
MAX_SATURATION = 55 # max channel spread to count a pixel as "grayish"
LOGO_MIN_LUMA = 150 # glyphs are at least this bright in absolute terms
TOPHAT_DELTA = 12 # glyph must exceed the local background by this many levels
# Glyph appearance: a light, low-saturation gray brighter than the local background.
MAX_SATURATION = 55
LOGO_MIN_LUMA = 150
TOPHAT_DELTA = 12
# Detection matches the bundled alpha-template glyph silhouette
# (assets/jimeng_alpha.png) against the candidate via zero-mean normalized
# correlation (cv2 TM_CCOEFF_NORMED). Real Jimeng marks score >=0.83, the Doubao
# strip 0.22, other AI output 0.0 -> threshold 0.45 separates cleanly while
# keeping `--mark auto` from confusing Jimeng with Doubao. A small coverage floor
# skips the template match on a near-empty candidate box.
# Shape-consistent detection. Threshold 0.45 cleanly separates real Jimeng marks
# (>=0.81) from the Doubao strip (0.21), so the two ByteDance marks do not cross-fire.
DETECT_MIN_COVERAGE = 0.02
DETECT_NCC_THRESHOLD = 0.45
# ── Reverse-alpha (recovery, Gemini/Doubao-style) ────────────────────
# The Jimeng mark is a fixed semi-transparent white overlay; given its alpha map
# the original pixels are recovered by inverting the blend. The logo is pure white
# (the white capture confirms L=255 and a pair-solve of L lands at ~254.6). The
# alpha map was solved from the GRAY capture: a = (I - B)/(255 - B) with B a
# per-capture CUBIC background fit over the non-glyph pixels, averaged over the
# three channels, kept at full halo extent (down to a~0.02) and UNBLURRED. Gray
# (background ~132, mark contrast ~120) is chosen over black because it is the
# best proxy for real content, where the mark sits on bright photo areas, not on
# black; the careful build drops the gray self-residual to ~1.3 (the earlier
# max-channel / quadratic-bg / blurred / halo-truncated build was visibly worse --
# the mask, not the method, was the limit). The bundled asset
# (assets/jimeng_alpha.png) is the alpha template (a*255) at the captured width.
# The mark scales with image WIDTH; a pure width-scale is only sub-pixel-accurate
# at the captured width, so removal also registers the template to the actual mark
# via a TM_CCOEFF_NORMED scale+position search (`_aligned_alpha_map`) off it.
# Reverse-alpha geometry, emitted by scripts/visible_alpha_solve.py from the gray
# capture at the captured width.
_ALPHA_NATIVE_WIDTH = 2048
_ALPHA_LOGO_BGR: tuple[float, float, float] = (255.0, 255.0, 255.0)
# Geometry below is emitted by scripts/visible_alpha_solve.py for the bundled
# asset -- keep them in sync when the asset is rebuilt.
_ALPHA_WIDTH_FRAC = 0.2021 # asset width / image width -- the alignment scale seed
_ALPHA_HEIGHT_FRAC = 0.0576
# Margins (of image WIDTH) of the captured mark -- the geometry record / where to
# seed; alignment refines the actual position, so these are not load-bearing.
_ALPHA_MARGIN_RIGHT_FRAC = 0.0288
_ALPHA_MARGIN_BOTTOM_FRAC = 0.0288
# Alignment scale search (np.linspace args) around the width-scaled glyph size --
# fine enough that a per-image scale/position jitter does not leave a thick
# edge-misalignment outline (a coarse step left ~4px slop at the mark ends).
_ALPHA_ALIGN_SEARCH = (0.90, 1.12, 23)
# Residual inpaint footprint: unlike Doubao, Jimeng's per-image render variation
# leaves a faint outline even at native, so the glyph footprint (alpha above this)
# is always inpainted after reverse-alpha (dilated by this kernel, INPAINT_NS).
# Kept deliberately THIN -- the careful alpha map (cubic-background, mean-channel,
# full-halo solve) knocks the mark down far enough that a tight footprint clears
# it, so the inpaint does not smear the texture/edges under the mark the way a
# wide full-footprint pass did.
_RESIDUAL_ALPHA_FLOOR = 0.05
_RESIDUAL_DILATE = 5
_RESIDUAL_INPAINT_RADIUS = 2
_alpha_template_cache: NDArray[Any] | None = None
_CONFIG = TextMarkConfig(
name="Jimeng",
asset_name="jimeng_alpha.png",
corner="br",
margin_floor=4,
width_frac=WM_WIDTH_FRAC,
height_frac=WM_HEIGHT_FRAC,
margin_x_frac=MARGIN_RIGHT_FRAC,
margin_bottom_frac=MARGIN_BOTTOM_FRAC,
max_saturation=MAX_SATURATION,
logo_min_luma=LOGO_MIN_LUMA,
tophat_delta=TOPHAT_DELTA,
morph_open_size=5,
detect_min_coverage=DETECT_MIN_COVERAGE,
detect_ncc_threshold=DETECT_NCC_THRESHOLD,
alpha_width_frac=_ALPHA_WIDTH_FRAC,
alpha_height_frac=_ALPHA_HEIGHT_FRAC,
alpha_margin_x_frac=_ALPHA_MARGIN_RIGHT_FRAC,
alpha_margin_bottom_frac=_ALPHA_MARGIN_BOTTOM_FRAC,
alpha_align_search=_ALPHA_ALIGN_SEARCH,
min_gw=8,
alpha_logo_bgr=_ALPHA_LOGO_BGR,
residual_alpha_floor=_RESIDUAL_ALPHA_FLOOR,
residual_dilate=_RESIDUAL_DILATE,
residual_inpaint_radius=_RESIDUAL_INPAINT_RADIUS,
)
JimengDetection = TextMarkDetection
def _alpha_template() -> NDArray[Any] | None:
"""Lazily load the bundled Jimeng alpha template (float [0,1]), or None."""
global _alpha_template_cache
if _alpha_template_cache is None:
from pathlib import Path
from remove_ai_watermarks import image_io
path = Path(__file__).parent / "assets" / "jimeng_alpha.png"
img = image_io.imread(str(path), cv2.IMREAD_GRAYSCALE)
if img is None:
return None
_alpha_template_cache = img.astype(np.float32) / 255.0
return _alpha_template_cache
@dataclass(frozen=True)
class JimengLocation:
"""Located watermark box (bottom-right), in absolute pixel coordinates."""
x: int
y: int
w: int
h: int
is_fallback: bool = True # geometry anchor (no template match) -> always True for now
@property
def bbox(self) -> tuple[int, int, int, int]:
return self.x, self.y, self.w, self.h
@dataclass
class JimengDetection:
"""Result of visible Jimeng watermark detection."""
detected: bool = False
confidence: float = 0.0
region: tuple[int, int, int, int] = (0, 0, 0, 0)
coverage: float = 0.0 # fraction of the box occupied by glyph pixels
_silhouette_cache: NDArray[Any] | None = None
"""The bundled Jimeng alpha template (float [0,1]), or None."""
return _text_mark_engine.load_alpha_template(_CONFIG.asset_name)
def _glyph_silhouette() -> NDArray[Any] | None:
"""Binary "即梦AI" silhouette (255 = glyph) from the bundled alpha map, used
as the detection template. None if the alpha asset is missing."""
global _silhouette_cache
if _silhouette_cache is None:
at = _alpha_template()
if at is None:
return None
_silhouette_cache = (at > 0.15).astype(np.uint8) * 255
return _silhouette_cache
"""Binary "即梦AI" silhouette (255 = glyph) from the alpha map, or None."""
return _text_mark_engine.glyph_silhouette(_CONFIG.asset_name)
def _template_match_score(box_mask: NDArray[Any], image_width: int) -> float:
"""Zero-mean normalized correlation of the alpha-template glyph silhouette
(scaled to the mark's expected size) against the candidate ``box_mask``."""
sil = _glyph_silhouette()
if sil is None or box_mask.size == 0:
return 0.0
gw = min(box_mask.shape[1] - 1, max(8, int(_ALPHA_WIDTH_FRAC * image_width)))
gh = min(box_mask.shape[0] - 1, max(4, int(_ALPHA_HEIGHT_FRAC * image_width)))
if gw < 8 or gh < 4:
return 0.0
template = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
return float(cv2.matchTemplate(box_mask, template, cv2.TM_CCOEFF_NORMED).max())
"""TM_CCOEFF_NORMED of the Jimeng glyph silhouette against ``box_mask``."""
return _text_mark_engine.template_match_score(box_mask, image_width, _CONFIG)
class JimengEngine:
"""Remove the visible Jimeng "即梦AI" watermark (locate -> mask -> reverse-alpha)."""
class JimengEngine(TextMarkEngine):
"""Remove the visible Jimeng "即梦AI" watermark (locate -> mask -> reverse-alpha)."""
def __init__(
self,
*,
width_frac: float = WM_WIDTH_FRAC,
height_frac: float = WM_HEIGHT_FRAC,
margin_right_frac: float = MARGIN_RIGHT_FRAC,
margin_bottom_frac: float = MARGIN_BOTTOM_FRAC,
) -> None:
self.width_frac = width_frac
self.height_frac = height_frac
self.margin_right_frac = margin_right_frac
self.margin_bottom_frac = margin_bottom_frac
# ── Locate ────────────────────────────────────────────────────────
def locate(self, image: NDArray[Any]) -> JimengLocation:
"""Anchor the watermark box in the bottom-right corner by geometry."""
h, w = image.shape[:2]
wm_w = max(40, int(w * self.width_frac))
wm_h = max(16, int(w * self.height_frac))
margin_r = max(4, int(w * self.margin_right_frac))
margin_b = max(4, int(w * self.margin_bottom_frac))
x = max(0, w - margin_r - wm_w)
y = max(0, h - margin_b - wm_h)
wm_w = min(wm_w, w - x)
wm_h = min(wm_h, h - y)
return JimengLocation(x=x, y=y, w=wm_w, h=wm_h, is_fallback=True)
# ── Mask ──────────────────────────────────────────────────────────
def extract_mask(self, image: NDArray[Any], loc: JimengLocation) -> NDArray[Any]:
"""Build a full-image uint8 mask (255 = watermark glyph) for the box.
Polarity-aware: the mark is a light, low-saturation gray rendered brighter
than the local background (white top-hat), so a white-paper document is
left untouched (nothing brighter than its surroundings is masked there).
"""
h, w = image.shape[:2]
x, y, bw, bh = loc.bbox
# A degenerate ROI (a sliver from an extremely wide/short image) cannot hold
# the mark and would feed cv2's GaussianBlur/morphology a ~1-px-tall array,
# which can fault the native code on some platforms (observed: a Windows
# access violation via the always-align removal's residual `detect`). Skip
# the cv2 pipeline and return an empty mask there.
if bh < 16 or bw < 16:
return np.zeros((h, w), np.uint8)
# Normalize the ROI to 3-channel BGR: a 2D grayscale or 4-channel BGRA
# input would otherwise break the axis=2 channel reductions below.
roi = image[y : y + bh, x : x + bw]
if roi.ndim == 2:
roi = cv2.cvtColor(roi, cv2.COLOR_GRAY2BGR)
elif roi.shape[2] == 4:
roi = cv2.cvtColor(roi, cv2.COLOR_BGRA2BGR)
roi = roi.astype(np.float32)
luma = roi.mean(axis=2)
sat = roi.max(axis=2) - roi.min(axis=2)
grayish = sat < MAX_SATURATION
sigma = max(4.0, bh * 0.4)
local_bg = cv2.GaussianBlur(luma, (0, 0), sigmaX=sigma, sigmaY=sigma)
tophat = luma - local_bg
cand = grayish & (tophat > TOPHAT_DELTA) & (luma > LOGO_MIN_LUMA)
glyph = cand.astype(np.uint8) * 255
glyph = cv2.morphologyEx(glyph, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
glyph = cv2.morphologyEx(glyph, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))
mask = np.zeros((h, w), np.uint8)
mask[y : y + bh, x : x + bw] = glyph
return mask
# ── Detect ────────────────────────────────────────────────────────
def detect(self, image: NDArray[Any]) -> JimengDetection:
"""Detect the visible Jimeng mark by matching the alpha-template glyph
silhouette against the corner candidate (TM_CCOEFF_NORMED)."""
det = JimengDetection()
if image is None or image.size == 0:
return det
loc = self.locate(image)
mask = self.extract_mask(image, loc)
x, y, bw, bh = loc.bbox
box = mask[y : y + bh, x : x + bw]
coverage = float((box > 0).sum()) / float(max(1, bw * bh))
det.region = loc.bbox
det.coverage = coverage
if coverage >= DETECT_MIN_COVERAGE:
score = _template_match_score(box, image.shape[1])
det.confidence = score
det.detected = score >= DETECT_NCC_THRESHOLD
logger.debug("Jimeng detect: coverage=%.3f ncc=%.2f detected=%s", coverage, score, det.detected)
return det
# ── Reverse-alpha (recovery + residual inpaint) ───────────────────
def reverse_alpha_available(self, image: NDArray[Any]) -> bool:
"""True if the bundled alpha map is loadable (NCC alignment places it at
any resolution; the caller still gates on ``detect``)."""
return image is not None and image.size > 0 and _alpha_template() is not None
def _fixed_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Place the template by fixed width-relative geometry."""
at = _alpha_template()
if at is None:
return None
h, w = image.shape[:2]
gw = min(w, max(1, int(_ALPHA_WIDTH_FRAC * w)))
gh = min(h, max(1, int(_ALPHA_HEIGHT_FRAC * w)))
ax = max(0, w - int(_ALPHA_MARGIN_RIGHT_FRAC * w) - gw)
ay = max(0, h - int(_ALPHA_MARGIN_BOTTOM_FRAC * w) - gh)
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _aligned_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Register the captured template to the actual mark via a
TM_CCOEFF_NORMED scale + position search -- so the single capture works
off the captured width. Returns ``(alpha_map, glyph_bbox)`` or None."""
at = _alpha_template()
sil = _glyph_silhouette()
if at is None or sil is None:
return None
h, w = image.shape[:2]
loc = self.locate(image)
bx, by, bw, bh = loc.bbox
box_mask = self.extract_mask(image, loc)[by : by + bh, bx : bx + bw]
expected = _ALPHA_WIDTH_FRAC * w
best: tuple[float, int, int, int, int] | None = None
for scale in np.linspace(*_ALPHA_ALIGN_SEARCH):
gw, gh = int(expected * scale), int(_ALPHA_HEIGHT_FRAC * w * scale)
if gw < 8 or gh < 4 or gw >= bw or gh >= bh:
continue
t = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
_, score, _, top_left = cv2.minMaxLoc(cv2.matchTemplate(box_mask, t, cv2.TM_CCOEFF_NORMED))
if best is None or score > best[0]:
best = (score, gw, gh, top_left[0], top_left[1])
if best is None:
return None
_, gw, gh, ox, oy = best
ax, ay = bx + ox, by + oy
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _apply_reverse_alpha(self, image: NDArray[Any], amap: NDArray[Any]) -> NDArray[Any]:
"""Invert the alpha blend with ``amap``: ``original = (wm - a*logo)/(1-a)``."""
a3 = np.clip(amap, 0.0, 1.0)[:, :, None]
logo = np.array(_ALPHA_LOGO_BGR, np.float32)
return np.clip((image.astype(np.float32) - a3 * logo) / np.clip(1.0 - a3, 0.25, 1.0), 0, 255).astype(np.uint8)
def remove_watermark_reverse_alpha(self, image: NDArray[Any], *, residual_inpaint: bool = True) -> NDArray[Any]:
"""Recover the original pixels by inverting the alpha blend, then clear
the residual outline with a thin inpaint over the glyph footprint.
Placement: fixed geometry AND the NCC-aligned placement are always tried
and the one leaving the least residual mark (lowest re-``detect``
confidence) is kept -- Jimeng jitters the mark a few px per image even at
the captured width, so fixed geometry alone is not reliable. A single 2048
alpha cannot pixel-cancel the mark re-rasterized at another resolution, so a
deliberately THIN residual inpaint (``_RESIDUAL_*``) follows: reverse-alpha
has already recovered the true background (edges included) under the mark,
so the inpaint only finishes the residual edges instead of smearing the
whole footprint. Call only when :meth:`reverse_alpha_available` and the mark
is detected.
"""
# Normalize to 3-channel BGR so a 2D grayscale or 4-channel BGRA input
# does not break the reverse-alpha math (which assumes a 3-channel logo).
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
elif image.shape[2] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
# An image too small to hold the mark would make the geometry boxes
# degenerate and feed cv2.resize a ~1-px-tall target / GaussianBlur a sliver
# ROI, which faults natively on Windows (access violation / "Unknown C++
# exception"). No real watermarked image is this small; skip cv2 entirely.
h, w = image.shape[:2]
if h < 32 or w < 64:
return image.copy()
# Always try fixed geometry AND the NCC-aligned placement and keep
# whichever leaves the least residual mark (re-detect confidence on the
# bare reverse-alpha). Unlike Doubao's deterministic overlay, Jimeng jitters
# the mark's position a few px PER IMAGE even at the captured width, so
# fixed geometry alone misses there too -- the NCC search registers the
# template to the actual mark; fixed stays as a fallback if the search has
# no saliency to lock onto (a flat/contrastless mark).
maps = [c for c in (self._fixed_alpha_map(image), self._aligned_alpha_map(image)) if c is not None]
if not maps:
return image.copy()
best_out: NDArray[Any] | None = None
best_amap: NDArray[Any] | None = None
best_residual = float("inf")
for amap, _region in maps:
out = self._apply_reverse_alpha(image, amap)
residual = self.detect(out).confidence
if residual < best_residual:
best_residual, best_out, best_amap = residual, out, amap
if best_out is None or best_amap is None: # pragma: no cover - maps is non-empty
return image.copy()
if residual_inpaint:
kernel = np.ones((_RESIDUAL_DILATE, _RESIDUAL_DILATE), np.uint8)
rm = cv2.dilate((best_amap > _RESIDUAL_ALPHA_FLOOR).astype(np.uint8) * 255, kernel)
best_out = cv2.inpaint(best_out, rm, _RESIDUAL_INPAINT_RADIUS, cv2.INPAINT_NS)
return best_out
def load_image_bgr(path: str | Path) -> NDArray[Any]:
"""Read an image as BGR ndarray (helper for scripts/tests)."""
from remove_ai_watermarks import image_io
img = image_io.imread(path, cv2.IMREAD_COLOR)
if img is None:
raise FileNotFoundError(f"Failed to read image: {path}")
return img
def __init__(self) -> None:
super().__init__(_CONFIG)
+27 -1
View File
@@ -9,6 +9,7 @@ For metadata-only operations, the heavy ML dependencies are NOT required.
from __future__ import annotations
import contextlib
import functools
import logging
import re
import struct
@@ -209,7 +210,10 @@ def _png_late_metadata(image_path: Path, window: int) -> bytes:
if chunk_type in _PNG_META_CHUNKS and data_start >= window:
f.seek(data_start)
out += f.read(safe_length)
pos = data_start + length + 4 # data + CRC
# Advance by the CLAMPED length: a malformed/inflated `length` that
# overshoots EOF must not push `pos` past the file and abort the scan
# (which would silently skip a genuine AI-label chunk after it).
pos = data_start + safe_length + 4 # data + CRC
except OSError as exc:
logger.debug("PNG late-metadata scan failed on %s: %s", image_path, exc)
return b""
@@ -227,7 +231,29 @@ def scan_head(image_path: Path, size: int = 1024 * 1024) -> bytes:
non-faststart MP4 manifest, or a PNG XMP packet appended after the pixels --
which a fixed first-MB read would miss. For other inputs, and for files that
fit within ``size``, it is exactly ``f.read(size)`` -- behavior-neutral.
The result is memoized per (path, size, mtime): one ``identify``/``get_ai_metadata``
call fans out to ~8 byte-scan detectors that each call this on the same file, so
the cache turns those repeated reads into one. The mtime key invalidates the entry
when the file changes; the small ``maxsize`` bounds memory to a few MB.
"""
try:
mtime = image_path.stat().st_mtime_ns
except OSError:
# No stat (e.g. a pipe, or a race): read uncached rather than fail.
return _scan_head_impl(image_path, size)
return _scan_head_cached(str(image_path), size, mtime)
@functools.lru_cache(maxsize=8)
def _scan_head_cached(path_str: str, size: int, _mtime_ns: int) -> bytes:
"""Cache shim: ``_mtime_ns`` is part of the key only (invalidates on change)."""
from pathlib import Path as _Path
return _scan_head_impl(_Path(path_str), size)
def _scan_head_impl(image_path: Path, size: int) -> bytes:
with open(image_path, "rb") as f:
head = f.read(size)
# Lazy import: isobmff imports this module's constants at top level.
@@ -52,7 +52,16 @@ def run_img2img(
)
done_ev.set()
return result.images[0]
except TypeError:
except TypeError as exc:
# The only TypeError we retry is the deprecated-callback case: `_call_pipeline`
# passes the legacy `callback`/`callback_steps` kwargs, and a diffusers version
# that removed them raises TypeError("... unexpected keyword argument
# 'callback'"). We then re-run once WITHOUT the progress callback. Any OTHER
# TypeError (e.g. a bad control_image/dtype in the forward pass) is a real error
# -- re-raise it instead of silently re-running the whole diffusion pass and
# masking the cause.
if "callback" not in str(exc):
raise
first_step.set()
result = _call_pipeline(
pipeline, image, strength, num_inference_steps, guidance_scale, generator, None, extra_kwargs
@@ -16,7 +16,6 @@ DEFAULT_MODEL_ID = "stabilityai/stable-diffusion-xl-base-1.0"
# profile is ``sdxl``; ``default`` is kept as an accepted alias (it was the profile's
# name before ``controlnet`` became the default-selected pipeline, 2026-06-09).
SDXL_PROFILE = "sdxl"
CONTROLNET_PROFILE = "controlnet"
_PROFILE_ALIASES = {"default": SDXL_PROFILE}
@@ -119,16 +118,3 @@ def vendor_for_strength(image_path: Path) -> Literal["openai", "google"] | None:
if "openai" in src:
return "openai"
return None
def get_model_id_for_profile(profile: str) -> str:
"""Map CLI model profile names to concrete Hugging Face model IDs.
Both ``sdxl`` and ``controlnet`` use the SDXL base checkpoint -- the canny
ControlNet (``CONTROLNET_CANNY_MODEL``) is an add-on loaded on top of it, not a
separate base model. The legacy ``default`` alias resolves to ``sdxl``.
"""
normalized = normalize_profile(profile)
if normalized in (SDXL_PROFILE, CONTROLNET_PROFILE):
return DEFAULT_MODEL_ID
raise ValueError(f"Unknown model profile '{profile}'. Use one of: sdxl, controlnet.")
+72 -350
View File
@@ -1,394 +1,116 @@
"""Samsung Galaxy AI visible watermark removal engine.
Samsung's on-device Generative AI photo edits (Generative Edit / Sketch to Image /
Portrait Studio on Galaxy phones) stamp a visible localized wordmark -- a sparkle
icon followed by a "generated with AI" string -- in the **bottom-left** corner: a
light, low-opacity semi-transparent white overlay. The string is locale-specific;
this engine is calibrated for the Italian "Contenuti generati dall'AI" variant
(issue #37, captures from @f-liva). Other locales need their own captured alpha
template, but the geometry and removal recipe are shared.
Samsung's on-device Generative AI photo edits burn a visible "✦ Contenuti generati
dall'AI" wordmark into the bottom-LEFT corner (the Italian locale variant calibrated
here; the string is locale-specific). It is a faint, near-white semi-transparent
overlay, the same overlay class as the Doubao/Jimeng marks but bottom-left.
Like the Gemini sparkle and the Doubao / Jimeng marks it is a fixed overlay, so
removal starts from **reverse-alpha blending** against a captured alpha map
(``remove_watermark_reverse_alpha``): ``original = (wm - a*logo)/(1-a)``. The logo
is pure white (255,255,255); the alpha map was solved from the GRAY Samsung capture
(see ``data/samsung_capture/``), bundled as ``assets/samsung_alpha.png`` -- the same
careful build as Jimeng/Doubao (cubic-background fit, mean over channels, full halo
extent, unblurred). The Samsung mark is faint (peak alpha ~0.38), so the glyph reads
as a soft light-gray strip.
The mark is anchored bottom-LEFT (Doubao/Jimeng are bottom-right) and scales with
image WIDTH (~0.32 of width). The flat calibration captures arrive at the phone's
flat-edit size (~1086 wide) while real photos are ~3000 wide, so a single alpha map
cannot pixel-cancel the upscaled, per-image re-rasterized mark; removal therefore
NCC-aligns the alpha to the actual mark (always), reverse-alphas, then clears the
residual with a deliberately THIN inpaint over the glyph footprint -- the exact
recipe Jimeng uses. Verified on the flat captures and a real ~2958-wide download.
Detection (``detect``) matches the bundled glyph silhouette against the corner
candidate via normalized correlation, keying on the actual mark shape rather than
coverage heuristics. Samsung edits also carry C2PA + the Galaxy ``genAIType``
marker (see ``metadata``/``identify``), so the visible path is the stripped-metadata
fallback / the *removal* path, not a new ``identify`` signal.
``locate`` (geometry box) and ``extract_mask`` (the candidate glyph mask the
detector correlates) mirror the Doubao/Jimeng engines. Fast, offline, no GPU.
Arbitrary-region inpainting still lives in ``region_eraser`` / the ``erase`` command.
Removal is **reverse-alpha blending** against a captured alpha map
(``original = (wm - a*logo)/(1-a)``), always NCC-aligned to the actual mark plus a thin
residual inpaint over the glyph footprint. This is one of the three text-mark engines
that share :class:`remove_ai_watermarks._text_mark_engine.TextMarkEngine`; this module
supplies only Samsung's tuned :class:`TextMarkConfig` (bottom-LEFT corner, a lower glyph
luma since the mark is faint, ``assets/samsung_alpha.png`` solved from the flat captures
by ``scripts/visible_alpha_solve.py``). Samsung Galaxy AI edits are also caught by C2PA
+ the ``genAIType`` marker, so this is the visible-mark *removal* path; it also feeds
``identify`` as the medium-confidence ``visible_samsung`` signal via the registry.
"""
# The module-level _alpha_template / _glyph_silhouette / _template_match_score below
# are thin test-facing shims (imported by tests/), so pyright's src-only pass sees them
# as unused; the use is cross-module.
# pyright: reportUnusedFunction=false
# cv2/numpy boundary: third-party libs ship no usable element types; relax the
# unknown-type rules for this file only.
# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import cv2
import numpy as np
from remove_ai_watermarks import _text_mark_engine
from remove_ai_watermarks._text_mark_engine import TextMarkConfig, TextMarkDetection, TextMarkEngine
if TYPE_CHECKING:
from pathlib import Path
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
# Geometry as a fraction of image WIDTH. The Samsung mark scales with width and is
# anchored bottom-LEFT. The box is intentionally generous (the glyph mask tightens
# it and the alignment search refines position); values cover the 1086 flat captures
# and the ~2958 real photos (both measured at width_frac ~0.31).
# Locate geometry as a fraction of image WIDTH (mark scales with width, bottom-LEFT).
WM_WIDTH_FRAC = 0.40
WM_HEIGHT_FRAC = 0.060
MARGIN_LEFT_FRAC = 0.004
MARGIN_BOTTOM_FRAC = 0.002
# Glyph appearance: a low-saturation light gray rendered brighter than the
# surrounding content (white top-hat), same polarity logic as Doubao/Jimeng so a
# white-paper document is left untouched. LOGO_MIN_LUMA is lower than Jimeng's
# because the Samsung mark is fainter (peak alpha ~0.38), so on a mid/dark
# background the glyph luma is lower; the top-hat + NCC shape gate keep precision.
MAX_SATURATION = 55 # max channel spread to count a pixel as "grayish"
LOGO_MIN_LUMA = 110 # glyphs are at least this bright in absolute terms
TOPHAT_DELTA = 8 # glyph must exceed the local background by this many levels
# Glyph appearance: a light, low-saturation gray. LOGO_MIN_LUMA is lower than Jimeng's
# because the mark is faint (peak alpha ~0.38), so on a mid/dark background its glyph
# luma is lower; a white-paper document is still left untouched.
MAX_SATURATION = 55
LOGO_MIN_LUMA = 110
TOPHAT_DELTA = 8
# Detection matches the bundled alpha-template glyph silhouette
# (assets/samsung_alpha.png) against the candidate via zero-mean normalized
# correlation (cv2 TM_CCOEFF_NORMED). A small coverage floor skips the template
# match on a near-empty candidate box. The threshold is validated against the real
# capture set and the other visible marks (Doubao/Jimeng/Gemini must not cross-fire).
# Shape-consistent detection. Threshold 0.40; real marks ~0.79, and Doubao/Jimeng score
# 0.0 here (and Samsung 0.0 on theirs) -- no cross-fire (the corner also differs).
DETECT_MIN_COVERAGE = 0.01
DETECT_NCC_THRESHOLD = 0.40
# ── Reverse-alpha (recovery, Gemini/Doubao/Jimeng-style) ─────────────
# The Samsung mark is a fixed semi-transparent white overlay; given its alpha map
# the original pixels are recovered by inverting the blend. The logo is pure white
# (the white capture confirms it). The alpha map was solved from the GRAY capture by
# scripts/visible_alpha_solve.py (cubic-background fit, mean over channels, full halo,
# unblurred); the bundled asset (assets/samsung_alpha.png) is that template (a*255)
# at the captured width. The mark scales with image WIDTH, and the flat captures are
# ~2.7x smaller than real photos, so a pure width-scale is only approximate; removal
# also registers the template to the actual mark via a TM_CCOEFF_NORMED scale+position
# search (`_aligned_alpha_map`).
# Reverse-alpha geometry, solved by scripts/visible_alpha_solve.py from the flat gray
# capture (native width 1086). Real photos are ~2958 wide, so the captured glyph is
# upscaled; width-scale + NCC-align removes it cleanly (a flat capture at the real
# resolution would make the alpha pixel-sharp -- an open quality upgrade).
_ALPHA_NATIVE_WIDTH = 1086
_ALPHA_LOGO_BGR: tuple[float, float, float] = (255.0, 255.0, 255.0)
# Geometry below is emitted by scripts/visible_alpha_solve.py for the bundled
# asset -- keep them in sync when the asset is rebuilt.
_ALPHA_WIDTH_FRAC = 0.3195 # asset width / image width -- the alignment scale seed
_ALPHA_HEIGHT_FRAC = 0.0378
# Margins (of image WIDTH) of the captured mark -- the geometry record / where to
# seed; alignment refines the actual position, so these are not load-bearing.
_ALPHA_MARGIN_LEFT_FRAC = 0.0110
_ALPHA_MARGIN_BOTTOM_FRAC = 0.0064
# Alignment scale search (np.linspace args) around the width-scaled glyph size --
# wider than Jimeng's because the flat captures are far off the real-photo width, so
# the per-image scale can drift more from the width-scaled seed.
# Wider scale search: the flat capture is far off the real-photo width.
_ALPHA_ALIGN_SEARCH = (0.85, 1.18, 23)
# Residual inpaint footprint: a single capture upscaled to the real-photo width
# cannot pixel-cancel the re-rasterized mark, so the glyph footprint (alpha above
# this) is always inpainted after reverse-alpha (dilated by this kernel, INPAINT_NS).
# Kept deliberately THIN -- reverse-alpha already recovers the true background under
# the semi-transparent mark, so the inpaint only finishes the residual edges.
_RESIDUAL_ALPHA_FLOOR = 0.05
_RESIDUAL_DILATE = 5
_RESIDUAL_INPAINT_RADIUS = 2
_alpha_template_cache: NDArray[Any] | None = None
_CONFIG = TextMarkConfig(
name="Samsung Galaxy AI",
asset_name="samsung_alpha.png",
corner="bl",
margin_floor=2,
width_frac=WM_WIDTH_FRAC,
height_frac=WM_HEIGHT_FRAC,
margin_x_frac=MARGIN_LEFT_FRAC,
margin_bottom_frac=MARGIN_BOTTOM_FRAC,
max_saturation=MAX_SATURATION,
logo_min_luma=LOGO_MIN_LUMA,
tophat_delta=TOPHAT_DELTA,
morph_open_size=3,
detect_min_coverage=DETECT_MIN_COVERAGE,
detect_ncc_threshold=DETECT_NCC_THRESHOLD,
alpha_width_frac=_ALPHA_WIDTH_FRAC,
alpha_height_frac=_ALPHA_HEIGHT_FRAC,
alpha_margin_x_frac=_ALPHA_MARGIN_LEFT_FRAC,
alpha_margin_bottom_frac=_ALPHA_MARGIN_BOTTOM_FRAC,
alpha_align_search=_ALPHA_ALIGN_SEARCH,
min_gw=16,
alpha_logo_bgr=_ALPHA_LOGO_BGR,
residual_alpha_floor=_RESIDUAL_ALPHA_FLOOR,
residual_dilate=_RESIDUAL_DILATE,
residual_inpaint_radius=_RESIDUAL_INPAINT_RADIUS,
)
SamsungDetection = TextMarkDetection
def _alpha_template() -> NDArray[Any] | None:
"""Lazily load the bundled Samsung alpha template (float [0,1]), or None."""
global _alpha_template_cache
if _alpha_template_cache is None:
from pathlib import Path
from remove_ai_watermarks import image_io
path = Path(__file__).parent / "assets" / "samsung_alpha.png"
img = image_io.imread(str(path), cv2.IMREAD_GRAYSCALE)
if img is None:
return None
_alpha_template_cache = img.astype(np.float32) / 255.0
return _alpha_template_cache
@dataclass(frozen=True)
class SamsungLocation:
"""Located watermark box (bottom-left), in absolute pixel coordinates."""
x: int
y: int
w: int
h: int
is_fallback: bool = True # geometry anchor (no template match) -> always True for now
@property
def bbox(self) -> tuple[int, int, int, int]:
return self.x, self.y, self.w, self.h
@dataclass
class SamsungDetection:
"""Result of visible Samsung Galaxy AI watermark detection."""
detected: bool = False
confidence: float = 0.0
region: tuple[int, int, int, int] = (0, 0, 0, 0)
coverage: float = 0.0 # fraction of the box occupied by glyph pixels
_silhouette_cache: NDArray[Any] | None = None
"""The bundled Samsung alpha template (float [0,1]), or None."""
return _text_mark_engine.load_alpha_template(_CONFIG.asset_name)
def _glyph_silhouette() -> NDArray[Any] | None:
"""Binary glyph silhouette (255 = glyph) from the bundled alpha map, used as the
detection template. None if the alpha asset is missing. The threshold is a
fraction of the (faint) peak alpha so the thin strokes survive."""
global _silhouette_cache
if _silhouette_cache is None:
at = _alpha_template()
if at is None:
return None
_silhouette_cache = (at > 0.10).astype(np.uint8) * 255
return _silhouette_cache
"""Binary "Contenuti generati dall'AI" silhouette (255 = glyph), or None."""
return _text_mark_engine.glyph_silhouette(_CONFIG.asset_name)
def _template_match_score(box_mask: NDArray[Any], image_width: int) -> float:
"""Zero-mean normalized correlation of the alpha-template glyph silhouette
(scaled to the mark's expected size) against the candidate ``box_mask``."""
sil = _glyph_silhouette()
if sil is None or box_mask.size == 0:
return 0.0
gw = min(box_mask.shape[1] - 1, max(16, int(_ALPHA_WIDTH_FRAC * image_width)))
gh = min(box_mask.shape[0] - 1, max(4, int(_ALPHA_HEIGHT_FRAC * image_width)))
if gw < 16 or gh < 4:
return 0.0
template = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
return float(cv2.matchTemplate(box_mask, template, cv2.TM_CCOEFF_NORMED).max())
"""TM_CCOEFF_NORMED of the Samsung glyph silhouette against ``box_mask``."""
return _text_mark_engine.template_match_score(box_mask, image_width, _CONFIG)
class SamsungEngine:
"""Remove the visible Samsung Galaxy AI watermark (locate -> mask -> reverse-alpha)."""
class SamsungEngine(TextMarkEngine):
"""Remove the visible Samsung Galaxy AI text mark (locate -> mask -> reverse-alpha)."""
def __init__(
self,
*,
width_frac: float = WM_WIDTH_FRAC,
height_frac: float = WM_HEIGHT_FRAC,
margin_left_frac: float = MARGIN_LEFT_FRAC,
margin_bottom_frac: float = MARGIN_BOTTOM_FRAC,
) -> None:
self.width_frac = width_frac
self.height_frac = height_frac
self.margin_left_frac = margin_left_frac
self.margin_bottom_frac = margin_bottom_frac
# ── Locate ────────────────────────────────────────────────────────
def locate(self, image: NDArray[Any]) -> SamsungLocation:
"""Anchor the watermark box in the bottom-left corner by geometry."""
h, w = image.shape[:2]
wm_w = max(40, int(w * self.width_frac))
wm_h = max(16, int(w * self.height_frac))
margin_l = max(2, int(w * self.margin_left_frac))
margin_b = max(2, int(w * self.margin_bottom_frac))
x = min(margin_l, max(0, w - wm_w))
y = max(0, h - margin_b - wm_h)
wm_w = min(wm_w, w - x)
wm_h = min(wm_h, h - y)
return SamsungLocation(x=x, y=y, w=wm_w, h=wm_h, is_fallback=True)
# ── Mask ──────────────────────────────────────────────────────────
def extract_mask(self, image: NDArray[Any], loc: SamsungLocation) -> NDArray[Any]:
"""Build a full-image uint8 mask (255 = watermark glyph) for the box.
Polarity-aware: the mark is a light, low-saturation gray rendered brighter
than the local background (white top-hat), so a white-paper document is left
untouched (nothing brighter than its surroundings is masked there).
"""
h, w = image.shape[:2]
x, y, bw, bh = loc.bbox
# A degenerate ROI (a sliver from an extremely wide/short image) cannot hold
# the mark and would feed cv2's GaussianBlur/morphology a ~1-px-tall array,
# which can fault the native code on some platforms (mirrors the Doubao/Jimeng
# guard). Skip the cv2 pipeline and return an empty mask there.
if bh < 16 or bw < 16:
return np.zeros((h, w), np.uint8)
# Normalize the ROI to 3-channel BGR: a 2D grayscale or 4-channel BGRA input
# would otherwise break the axis=2 channel reductions below.
roi = image[y : y + bh, x : x + bw]
if roi.ndim == 2:
roi = cv2.cvtColor(roi, cv2.COLOR_GRAY2BGR)
elif roi.shape[2] == 4:
roi = cv2.cvtColor(roi, cv2.COLOR_BGRA2BGR)
roi = roi.astype(np.float32)
luma = roi.mean(axis=2)
sat = roi.max(axis=2) - roi.min(axis=2)
grayish = sat < MAX_SATURATION
sigma = max(4.0, bh * 0.4)
local_bg = cv2.GaussianBlur(luma, (0, 0), sigmaX=sigma, sigmaY=sigma)
tophat = luma - local_bg
cand = grayish & (tophat > TOPHAT_DELTA) & (luma > LOGO_MIN_LUMA)
glyph = cand.astype(np.uint8) * 255
glyph = cv2.morphologyEx(glyph, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
glyph = cv2.morphologyEx(glyph, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8))
mask = np.zeros((h, w), np.uint8)
mask[y : y + bh, x : x + bw] = glyph
return mask
# ── Detect ────────────────────────────────────────────────────────
def detect(self, image: NDArray[Any]) -> SamsungDetection:
"""Detect the visible Samsung mark by matching the alpha-template glyph
silhouette against the corner candidate (TM_CCOEFF_NORMED)."""
det = SamsungDetection()
if image is None or image.size == 0:
return det
loc = self.locate(image)
mask = self.extract_mask(image, loc)
x, y, bw, bh = loc.bbox
box = mask[y : y + bh, x : x + bw]
coverage = float((box > 0).sum()) / float(max(1, bw * bh))
det.region = loc.bbox
det.coverage = coverage
if coverage >= DETECT_MIN_COVERAGE:
score = _template_match_score(box, image.shape[1])
det.confidence = score
det.detected = score >= DETECT_NCC_THRESHOLD
logger.debug("Samsung detect: coverage=%.3f ncc=%.2f detected=%s", coverage, score, det.detected)
return det
# ── Reverse-alpha (recovery + residual inpaint) ───────────────────
def reverse_alpha_available(self, image: NDArray[Any]) -> bool:
"""True if the bundled alpha map is loadable (NCC alignment places it at any
resolution; the caller still gates on ``detect``)."""
return image is not None and image.size > 0 and _alpha_template() is not None
def _fixed_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Place the template by fixed width-relative geometry (bottom-left)."""
at = _alpha_template()
if at is None:
return None
h, w = image.shape[:2]
gw = min(w, max(1, int(_ALPHA_WIDTH_FRAC * w)))
gh = min(h, max(1, int(_ALPHA_HEIGHT_FRAC * w)))
ax = min(max(0, int(_ALPHA_MARGIN_LEFT_FRAC * w)), max(0, w - gw))
ay = max(0, h - int(_ALPHA_MARGIN_BOTTOM_FRAC * w) - gh)
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _aligned_alpha_map(self, image: NDArray[Any]) -> tuple[NDArray[Any], tuple[int, int, int, int]] | None:
"""Register the captured template to the actual mark via a TM_CCOEFF_NORMED
scale + position search -- so the single capture works off the captured
width. Returns ``(alpha_map, glyph_bbox)`` or None."""
at = _alpha_template()
sil = _glyph_silhouette()
if at is None or sil is None:
return None
h, w = image.shape[:2]
loc = self.locate(image)
bx, by, bw, bh = loc.bbox
box_mask = self.extract_mask(image, loc)[by : by + bh, bx : bx + bw]
expected = _ALPHA_WIDTH_FRAC * w
best: tuple[float, int, int, int, int] | None = None
for scale in np.linspace(*_ALPHA_ALIGN_SEARCH):
gw, gh = int(expected * scale), int(_ALPHA_HEIGHT_FRAC * w * scale)
if gw < 16 or gh < 4 or gw >= bw or gh >= bh:
continue
t = cv2.resize(sil, (gw, gh), interpolation=cv2.INTER_NEAREST)
_, score, _, top_left = cv2.minMaxLoc(cv2.matchTemplate(box_mask, t, cv2.TM_CCOEFF_NORMED))
if best is None or score > best[0]:
best = (score, gw, gh, top_left[0], top_left[1])
if best is None:
return None
_, gw, gh, ox, oy = best
ax, ay = bx + ox, by + oy
amap = np.zeros((h, w), np.float32)
amap[ay : ay + gh, ax : ax + gw] = cv2.resize(at, (gw, gh), interpolation=cv2.INTER_LINEAR)
return amap, (ax, ay, gw, gh)
def _apply_reverse_alpha(self, image: NDArray[Any], amap: NDArray[Any]) -> NDArray[Any]:
"""Invert the alpha blend with ``amap``: ``original = (wm - a*logo)/(1-a)``."""
a3 = np.clip(amap, 0.0, 1.0)[:, :, None]
logo = np.array(_ALPHA_LOGO_BGR, np.float32)
return np.clip((image.astype(np.float32) - a3 * logo) / np.clip(1.0 - a3, 0.25, 1.0), 0, 255).astype(np.uint8)
def remove_watermark_reverse_alpha(self, image: NDArray[Any], *, residual_inpaint: bool = True) -> NDArray[Any]:
"""Recover the original pixels by inverting the alpha blend, then clear the
residual outline with a thin inpaint over the glyph footprint.
Placement: fixed geometry AND the NCC-aligned placement are always tried and
the one leaving the least residual mark (lowest re-``detect`` confidence) is
kept -- the flat capture is far off the real-photo width and the mark
re-rasterizes per image, so fixed geometry alone is not reliable. A single
capture cannot pixel-cancel the upscaled mark, so a deliberately THIN residual
inpaint (``_RESIDUAL_*``) follows. Call only when
:meth:`reverse_alpha_available` and the mark is detected.
"""
# Normalize to 3-channel BGR so a 2D grayscale or 4-channel BGRA input does
# not break the reverse-alpha math (which assumes a 3-channel logo).
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
elif image.shape[2] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
# An image too small to hold the mark would make the geometry boxes degenerate
# and feed cv2.resize a ~1-px-tall target; skip cv2 entirely (mirrors Jimeng).
h, w = image.shape[:2]
if h < 32 or w < 64:
return image.copy()
maps = [c for c in (self._fixed_alpha_map(image), self._aligned_alpha_map(image)) if c is not None]
if not maps:
return image.copy()
best_out: NDArray[Any] | None = None
best_amap: NDArray[Any] | None = None
best_residual = float("inf")
for amap, _region in maps:
out = self._apply_reverse_alpha(image, amap)
residual = self.detect(out).confidence
if residual < best_residual:
best_residual, best_out, best_amap = residual, out, amap
if best_out is None or best_amap is None: # pragma: no cover - maps is non-empty
return image.copy()
if residual_inpaint:
kernel = np.ones((_RESIDUAL_DILATE, _RESIDUAL_DILATE), np.uint8)
rm = cv2.dilate((best_amap > _RESIDUAL_ALPHA_FLOOR).astype(np.uint8) * 255, kernel)
best_out = cv2.inpaint(best_out, rm, _RESIDUAL_INPAINT_RADIUS, cv2.INPAINT_NS)
return best_out
def load_image_bgr(path: str | Path) -> NDArray[Any]:
"""Read an image as BGR ndarray (helper for scripts/tests)."""
from remove_ai_watermarks import image_io
img = image_io.imread(path, cv2.IMREAD_COLOR)
if img is None:
raise FileNotFoundError(f"Failed to read image: {path}")
return img
def __init__(self) -> None:
super().__init__(_CONFIG)
+31 -57
View File
@@ -159,71 +159,45 @@ def _gemini_remove(
return result, det.region
def _doubao_detect(image: NDArray[Any]) -> MarkDetection:
d = _engine("doubao").detect(image)
return MarkDetection("doubao", "Doubao 豆包AI生成 text", "bottom-right", d.detected, d.confidence, d.region)
# The three text-mark engines (Doubao/Jimeng/Samsung) share the TextMarkEngine
# interface, so one parameterized adapter pair drives all of them -- a new
# reverse-alpha text mark is one `_text_mark(...)` row below, not another copy-paste
# of these bodies. Removal is reverse-alpha only: applied when the mark is detected
# (or forced) and the alpha asset loads, otherwise skipped (no hallucination on a
# clean corner).
def _text_mark_detect(key: str, label: str, location: str) -> Callable[[NDArray[Any]], MarkDetection]:
def detect(image: NDArray[Any]) -> MarkDetection:
d = _engine(key).detect(image)
return MarkDetection(key, label, location, d.detected, d.confidence, d.region)
return detect
def _doubao_remove(
image: NDArray[Any], _inpaint_method: InpaintMethod, _inpaint: bool, _strength: float, force: bool
) -> tuple[NDArray[Any], Region | None]:
# Reverse-alpha only: apply when the mark is present AND the resolution is in
# the alpha map's calibrated band. Outside it we do NOT inpaint (no
# hallucination) -- removal is skipped until a capture for that resolution.
engine = _engine("doubao")
det = engine.detect(image)
if (det.detected or force) and engine.reverse_alpha_available(image):
return engine.remove_watermark_reverse_alpha(image), (det.region if det.detected else None)
return image.copy(), None
def _text_mark_remove(key: str) -> Callable[..., tuple[NDArray[Any], Region | None]]:
def remove(
image: NDArray[Any], _inpaint_method: InpaintMethod, _inpaint: bool, _strength: float, force: bool
) -> tuple[NDArray[Any], Region | None]:
engine = _engine(key)
det = engine.detect(image)
if (det.detected or force) and engine.reverse_alpha_available(image):
return engine.remove_watermark_reverse_alpha(image), (det.region if det.detected else None)
return image.copy(), None
return remove
def _jimeng_detect(image: NDArray[Any]) -> MarkDetection:
d = _engine("jimeng").detect(image)
return MarkDetection("jimeng", "Jimeng 即梦AI wordmark", "bottom-right", d.detected, d.confidence, d.region)
def _jimeng_remove(
image: NDArray[Any], _inpaint_method: InpaintMethod, _inpaint: bool, _strength: float, force: bool
) -> tuple[NDArray[Any], Region | None]:
# Reverse-alpha (with an always-on residual inpaint over the glyph footprint,
# see the engine): apply when the mark is present and the alpha asset loads.
# Skipped otherwise (no hallucination on a clean corner).
engine = _engine("jimeng")
det = engine.detect(image)
if (det.detected or force) and engine.reverse_alpha_available(image):
return engine.remove_watermark_reverse_alpha(image), (det.region if det.detected else None)
return image.copy(), None
def _samsung_detect(image: NDArray[Any]) -> MarkDetection:
d = _engine("samsung").detect(image)
return MarkDetection("samsung", "Samsung Galaxy AI text", "bottom-left", d.detected, d.confidence, d.region)
def _samsung_remove(
image: NDArray[Any], _inpaint_method: InpaintMethod, _inpaint: bool, _strength: float, force: bool
) -> tuple[NDArray[Any], Region | None]:
# Reverse-alpha (with an always-on thin residual inpaint over the glyph
# footprint, see the engine): apply when the mark is present and the alpha asset
# loads. Skipped otherwise (no hallucination on a clean corner).
engine = _engine("samsung")
det = engine.detect(image)
if (det.detected or force) and engine.reverse_alpha_available(image):
return engine.remove_watermark_reverse_alpha(image), (det.region if det.detected else None)
return image.copy(), None
def _text_mark(key: str, label: str, location: str) -> KnownMark:
"""A reverse-alpha text-mark registry row (Doubao/Jimeng/Samsung)."""
return KnownMark(
key, label, location, True, "reverse-alpha", _text_mark_detect(key, label, location), _text_mark_remove(key)
)
_REGISTRY: tuple[KnownMark, ...] = (
KnownMark("gemini", "Google Gemini sparkle", "bottom-right", True, "reverse-alpha", _gemini_detect, _gemini_remove),
KnownMark(
"doubao", "Doubao 豆包AI生成 text", "bottom-right", True, "reverse-alpha", _doubao_detect, _doubao_remove
),
KnownMark(
"jimeng", "Jimeng 即梦AI wordmark", "bottom-right", True, "reverse-alpha", _jimeng_detect, _jimeng_remove
),
KnownMark(
"samsung", "Samsung Galaxy AI text", "bottom-left", True, "reverse-alpha", _samsung_detect, _samsung_remove
),
_text_mark("doubao", "Doubao 豆包AI生成 text", "bottom-right"),
_text_mark("jimeng", "Jimeng 即梦AI wordmark", "bottom-right"),
_text_mark("samsung", "Samsung Galaxy AI text", "bottom-left"),
)
+15
View File
@@ -374,6 +374,21 @@ class TestAllCommand:
result = runner.invoke(main, ["all", "/nonexistent/file.png"])
assert result.exit_code != 0
def test_all_visible_step_uses_registry(self, runner, sample_png, tmp_path):
"""Regression (#1): the `all` visible step must route through the registry
(best_auto_mark), so Doubao/Jimeng/Samsung text marks are handled -- not just
the Gemini sparkle via a hardcoded GeminiEngine."""
mock_cls, _mock_engine = _mock_invisible_engine()
output = tmp_path / "clean.png"
with (
patch("remove_ai_watermarks.cli.InvisibleEngine", mock_cls, create=True),
patch("remove_ai_watermarks.invisible_engine.InvisibleEngine", mock_cls),
patch("remove_ai_watermarks.watermark_registry.best_auto_mark", return_value=None) as mock_best,
):
result = runner.invoke(main, ["all", str(sample_png), "-o", str(output)])
assert result.exit_code == 0, result.output
mock_best.assert_called() # the registry auto-detector drove the visible pass
def test_all_preserves_rgba_across_invisible_step(self, runner, tmp_path):
"""Regression: ``all`` must keep transparency even when the invisible
step writes a 3-channel result (as the real diffusion engine does).
+13
View File
@@ -124,6 +124,19 @@ class TestGeminiEngine:
result = self.engine.remove_watermark_custom(image, (10, 10, 48, 48))
assert result.shape == image.shape
def test_detect_on_grayscale_does_not_crash(self):
# A 2D grayscale array reaching detect_watermark (registry adapter / library
# API) must not crash the FP-gate's axis=2 reduction; it is normalized to BGR.
gray = np.full((300, 300), 100, dtype=np.uint8)
result = self.engine.detect_watermark(gray)
assert result is not None
def test_remove_on_bgra_returns_3_channel(self):
bgra = np.zeros((300, 300, 4), dtype=np.uint8)
bgra[..., 3] = 255
result = self.engine.remove_watermark(bgra)
assert result.shape == (300, 300, 3)
def test_remove_watermark_custom_large_region(self, tmp_image_path):
image = cv2.imread(str(tmp_image_path), cv2.IMREAD_COLOR)
result = self.engine.remove_watermark_custom(image, (10, 10, 96, 96))
+26
View File
@@ -59,6 +59,32 @@ class TestUnicodeRoundTrip:
assert np.array_equal(out, src)
class TestToBgr:
def test_grayscale_2d_promoted_to_bgr(self) -> None:
gray = np.full((4, 5), 120, dtype=np.uint8)
out = image_io.to_bgr(gray)
assert out.shape == (4, 5, 3)
# GRAY2BGR replicates the channel, so all three match the source.
assert np.array_equal(out[..., 0], gray)
assert np.array_equal(out[..., 0], out[..., 2])
def test_single_channel_3d_promoted(self) -> None:
gray = np.full((4, 5, 1), 7, dtype=np.uint8)
assert image_io.to_bgr(gray).shape == (4, 5, 3)
def test_bgra_dropped_to_bgr(self) -> None:
bgra = np.zeros((4, 5, 4), dtype=np.uint8)
bgra[..., :3] = (10, 120, 240)
out = image_io.to_bgr(bgra)
assert out.shape == (4, 5, 3)
assert np.array_equal(out, bgra[..., :3])
def test_bgr_returned_unchanged(self) -> None:
bgr = _make_bgr()
out = image_io.to_bgr(bgr)
assert out is bgr # 3-channel: no copy
class TestFailureSemantics:
def test_missing_file_returns_none(self, tmp_path: Path) -> None:
assert image_io.imread(tmp_path / "does-not-exist-不存在.png") is None
+7 -13
View File
@@ -20,7 +20,6 @@ from remove_ai_watermarks.noai.watermark_profiles import (
GEMINI_STRENGTH,
OPENAI_STRENGTH,
UNKNOWN_STRENGTH,
get_model_id_for_profile,
normalize_profile,
resolve_strength,
strength_default_help,
@@ -111,24 +110,19 @@ class TestMpsErrorDetection:
class TestModelProfiles:
"""Tests for watermark_profiles.py."""
"""Tests for watermark_profiles.py profile-name normalization."""
def test_sdxl_profile(self):
assert get_model_id_for_profile("sdxl") == "stabilityai/stable-diffusion-xl-base-1.0"
def test_canonical_profiles_unchanged(self):
assert normalize_profile("sdxl") == "sdxl"
assert normalize_profile("controlnet") == "controlnet"
def test_default_alias_resolves_to_sdxl(self):
# "default" is the legacy alias for "sdxl" (back-compat for existing scripts).
assert get_model_id_for_profile("default") == "stabilityai/stable-diffusion-xl-base-1.0"
assert normalize_profile("default") == "sdxl"
assert normalize_profile("controlnet") == "controlnet"
def test_controlnet_profile(self):
# controlnet shares the SDXL base checkpoint (the ControlNet is an add-on).
assert get_model_id_for_profile("controlnet") == "stabilityai/stable-diffusion-xl-base-1.0"
def test_unknown_profile_raises(self):
with pytest.raises(ValueError, match="Unknown model profile"):
get_model_id_for_profile("nonexistent")
def test_normalize_is_case_and_whitespace_insensitive(self):
assert normalize_profile(" Default ") == "sdxl"
assert normalize_profile("CONTROLNET") == "controlnet"
class TestResolveStrength: