import tempfile
import os
from PIL import Image
from playwright.sync_api import sync_playwright
import open_clip
import torch
from skimage.metrics import structural_similarity as ssim
import numpy as np
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
from lap import lapjv
from multiprocessing import Pool
# from selenium import webdriver
# from selenium.webdriver.firefox.options import Options

os.makedirs("./images", exist_ok=True)
def solve_assignment_lapjv(cost_matrix):
    """Solve the linear assignment problem for one cost matrix; returns the column index per row."""
    _, col_idx, _ = lapjv(cost_matrix)
    return col_idx
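
# A minimal sketch of what solve_assignment_lapjv returns (illustrative values):
#   cost = np.array([[4, 1, 3], [2, 0, 5], [3, 2, 2]], dtype=np.float32)
#   col_idx = solve_assignment_lapjv(cost)   # -> [1, 0, 2]
#   cost[np.arange(3), col_idx].sum()        # -> 5.0, the minimum one-to-one matching cost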
def process_imgs(image1, image2, max_size):
    """Pad both images to a shared canvas, resize so the longer side fits max_size, and return int16 arrays."""
    # Get the original sizes
    width1, height1 = image1.size
    width2, height2 = image2.size
    # Determine the new dimensions (max of both images' width and height)
    new_width = max(width1, width2)
    new_height = max(height1, height2)

    # Pad an image to the new dimensions with random values
    def pad_image(image, new_width, new_height):
        # Random background in the image's own mode, so "L" inputs stay single-channel
        # (a fixed 3-channel background would silently promote grayscale to RGB)
        if image.mode == "L":
            random_padding = np.random.randint(0, 256, (new_height, new_width), dtype=np.uint8)
        else:
            random_padding = np.random.randint(0, 256, (new_height, new_width, 3), dtype=np.uint8)
        padded_image = Image.fromarray(random_padding)
        # Paste the original image onto the padded background (top-left corner)
        padded_image.paste(image, (0, 0))
        return padded_image

    padded_image1 = pad_image(image1, new_width, new_height)
    padded_image2 = pad_image(image2, new_width, new_height)
    # Scale factor so the resized canvas fits within max_size x max_size
    aspect_ratio = min(max_size / new_width, max_size / new_height)
    new_size = (int(new_width * aspect_ratio), int(new_height * aspect_ratio))
    # Resize the padded images to the specified max size
    resized_image1 = padded_image1.resize(new_size, Image.LANCZOS)
    resized_image2 = padded_image2.resize(new_size, Image.LANCZOS)
    # Convert to numpy arrays with dtype int16 (uint8 would overflow when differencing)
    array1 = np.array(resized_image1).astype(np.int16)
    array2 = np.array(resized_image2).astype(np.int16)
    return array1, array2
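
# Shape sketch (hypothetical sizes): a 100x80 and a 90x120 RGB image are both padded
# to a 100x120 canvas, then scaled by min(64/100, 64/120); process_imgs(a, b, 64)
# would return two int16 arrays of shape (64, 53, 3) (height, width, channels).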
def calculate_emd_sim(img_array1, img_array2):
    """img_array1 is the original image, img_array2 is the generated image."""
    if len(img_array1.shape) == 2:
        # Grayscale: one assignment problem over all pixel pairs
        flat_array1 = img_array1.flatten()
        flat_array2 = img_array2.flatten()
        cost_matrix = np.abs(flat_array1[:, None] - flat_array2[None, :]).astype(np.float32)
        _, col_idx, _ = lapjv(cost_matrix)
        total_min_cost = cost_matrix[np.arange(len(flat_array1)), col_idx].sum()
        # Worst case: every original pixel moves as far as it can (to 0 or 255)
        max_cost = np.maximum(flat_array1, 255 - flat_array1).sum()
        normalized_min_cost = total_min_cost / max_cost
    else:
        # RGB: solve one assignment problem per channel, in parallel
        red1, green1, blue1 = img_array1[:, :, 0], img_array1[:, :, 1], img_array1[:, :, 2]
        red2, green2, blue2 = img_array2[:, :, 0], img_array2[:, :, 1], img_array2[:, :, 2]
        flat_red1, flat_green1, flat_blue1 = red1.flatten(), green1.flatten(), blue1.flatten()
        flat_red2, flat_green2, flat_blue2 = red2.flatten(), green2.flatten(), blue2.flatten()
        cost_matrix_red = np.abs(flat_red1[:, None] - flat_red2[None, :]).astype(np.float32)
        cost_matrix_green = np.abs(flat_green1[:, None] - flat_green2[None, :]).astype(np.float32)
        cost_matrix_blue = np.abs(flat_blue1[:, None] - flat_blue2[None, :]).astype(np.float32)
        with Pool(processes=3) as pool:
            results = pool.map(solve_assignment_lapjv, [cost_matrix_red, cost_matrix_green, cost_matrix_blue])
        col_ind_red, col_ind_green, col_ind_blue = results
        min_cost_red_lapjv = cost_matrix_red[np.arange(len(flat_red1)), col_ind_red].sum()
        min_cost_green_lapjv = cost_matrix_green[np.arange(len(flat_green1)), col_ind_green].sum()
        min_cost_blue_lapjv = cost_matrix_blue[np.arange(len(flat_blue1)), col_ind_blue].sum()
        total_min_cost_lapjv = min_cost_red_lapjv + min_cost_green_lapjv + min_cost_blue_lapjv
        max_cost = (np.maximum(flat_red1, 255 - flat_red1).sum()
                    + np.maximum(flat_green1, 255 - flat_green1).sum()
                    + np.maximum(flat_blue1, 255 - flat_blue1).sum())
        normalized_min_cost = total_min_cost_lapjv / max_cost
    return 1 - normalized_min_cost
def emd_similarity(image1_path, image2_path, max_size=64, mode="L"):
    """Not symmetric: the first image is the original, and the score is normalized against it."""
    image1 = Image.open(image1_path).convert(mode) if isinstance(image1_path, str) else image1_path.convert(mode)
    image2 = Image.open(image2_path).convert(mode) if isinstance(image2_path, str) else image2_path.convert(mode)
    array1, array2 = process_imgs(image1, image2, max_size)
    similarity = calculate_emd_sim(array1, array2)
    return similarity
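
# Example usage (illustrative; file paths are hypothetical):
#   sim = emd_similarity("ref_screenshot.png", "gen_screenshot.png", max_size=32, mode="L")
#   # sim is in [0, 1]; 1.0 means a zero-cost pixel matching against the original.
# Keep max_size small: the cost matrix has (width * height)^2 entries, so 64x64
# inputs already mean a ~4096 x 4096 float32 matrix per channel.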
class CLIPScorer:
    def __init__(self, model_name='ViT-B-32-quickgelu', pretrained='openai'):
        """
        Initializes the CLIPScorer with the specified model.

        Args:
            model_name (str): The name of the CLIP model to use.
            pretrained (str): Which pre-trained weights to load.
        """
        # GPU/MPS detection is kept for reference, but CPU is forced for compatibility:
        # self.device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
        self.device = "cpu"
        self.model, _, self.preprocess = open_clip.create_model_and_transforms(model_name, pretrained=pretrained)
        self.model.to(self.device)
        self.model.eval()

    def clip_score(self, img1: Image.Image, img2: Image.Image) -> float:
        """
        Calculates the CLIP score (cosine similarity) between two images.

        Args:
            img1 (Image.Image): The first image as a PIL Image.
            img2 (Image.Image): The second image as a PIL Image.

        Returns:
            float: The cosine similarity score between the two images.
        """
        # Preprocess the images
        image1 = self.preprocess(img1).unsqueeze(0).to(self.device)
        image2 = self.preprocess(img2).unsqueeze(0).to(self.device)
        # Get the image features from CLIP
        with torch.no_grad():
            image1_features = self.model.encode_image(image1)
            image2_features = self.model.encode_image(image2)
        # Normalize the features to unit length
        image1_features /= image1_features.norm(dim=-1, keepdim=True)
        image2_features /= image2_features.norm(dim=-1, keepdim=True)
        # Cosine similarity between the two image features
        cosine_similarity = torch.nn.functional.cosine_similarity(image1_features, image2_features)
        return cosine_similarity.item()
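
# Example usage (illustrative sketch; image files are hypothetical):
#   scorer = CLIPScorer()
#   a = Image.open("a.png").convert("RGB")
#   b = Image.open("b.png").convert("RGB")
#   scorer.clip_score(a, b)  # ~1.0 for near-identical renders, lower as content diverges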
clip_scorer = CLIPScorer()
def ssim_score(img1, img2):
    # Pad to a shared canvas and resize (longest side 512) so both arrays have the same shape
    img1, img2 = process_imgs(img1, img2, 512)
    return ssim(img1, img2, channel_axis=-1, data_range=255)
def mae_score(img1, img2):
    """Mean absolute error, a pixel-based metric (lower is better, 0-255 range)."""
    img1, img2 = process_imgs(img1, img2, 512)
    mae = np.mean(np.abs(img1 - img2))
    return mae
def clip_mae(img1, img2):
    """Combined metric: clip_score minus MAE normalized to [0, 1] (clip - mae / 255)."""
    mae = mae_score(img1, img2)
    clip = clip_scorer.clip_score(img1, img2)
    return clip - mae / 255
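
# Intuition: clip_score is roughly in [-1, 1] and mae / 255 is in [0, 1], so this
# rewards semantic similarity while penalizing raw pixel error; identical renders
# score about 1.0 (clip ~ 1, mae ~ 0).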
CSS_PATH="/apdcephfs_qy3/share_301812049/jarviswang/wt/yx/VLM-R1/src/open-r1-multimodal/src/open_r1/tailwind.min.css"
PLACEHOLDER_PATH="/apdcephfs_qy3/share_301812049/jarviswang/wt/yx/VLM-R1/src/open-r1-multimodal/src/open_r1/placeholder.jpg"
def preprocess_html(html_str: str) -> str:
    """Rewrite remote asset references to local copies so rendering is deterministic and offline."""
    # css_path = os.path.join(os.path.dirname(__file__), "tailwind.min.css")
    # placeholder_path = os.path.join(os.path.dirname(__file__), "placeholder.jpg")
    html_str = html_str.replace("placeholder.jpg", PLACEHOLDER_PATH)
    html_str = html_str.replace("https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css", CSS_PATH)
    return html_str
def generate_screenshot(html_content, path):
    """Render HTML in headless Chromium and save a full-page PNG screenshot to `path`."""
    html_content = preprocess_html(html_content)
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        # Set consistent rendering parameters
        page.set_viewport_size({"width": 1280, "height": 720})
        page.route("**/*", lambda route: route.continue_())  # Allow external resources
        # Render and screenshot
        page.set_content(html_content, timeout=100000)
        page.wait_for_load_state("networkidle", timeout=100000)
        page.screenshot(
            path=path,
            full_page=True,
            type="png",
        )
        browser.close()
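
# Example usage (illustrative; assumes Chromium was installed via `playwright install chromium`):
#   generate_screenshot("<html><body><h1>Hello</h1></body></html>", "./images/hello.png")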
def rendered_score(ref_html, gen_html, score_func, verbose=True):
    """Calculate visual similarity score between two HTML documents using screenshots."""
    if not verbose:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as ref_file, \
             tempfile.NamedTemporaryFile(delete=False, suffix=".png") as gen_file:
            ref_path = ref_file.name
            gen_path = gen_file.name
    else:
        # Name outputs by content hash so reruns of the same pair reuse file names
        ref_name = str(abs(hash(ref_html)))[:6]
        gen_name = f"{ref_name}_{str(abs(hash(gen_html)))[:6]}"
        ref_path = f"./images/{ref_name}.png"
        gen_path = f"./images/{gen_name}.png"
        with open(f"./images/{gen_name}.html", "w") as f:
            f.write(gen_html)
    try:
        # Generate screenshots synchronously
        generate_screenshot(ref_html, ref_path)
        generate_screenshot(gen_html, gen_path)
        # Calculate similarity score(s)
        with Image.open(ref_path) as ref_img, Image.open(gen_path) as gen_img:
            if isinstance(score_func, list):
                return {func.__name__: func(ref_img, gen_img) for func in score_func}
            return {score_func.__name__: score_func(ref_img, gen_img)}
    finally:
        # Screenshots are intentionally kept for inspection; uncomment to clean up:
        # os.unlink(ref_path)
        # os.unlink(gen_path)
        pass
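
# Example usage (illustrative sketch):
#   scores = rendered_score(ref_html, gen_html, [mae_score, clip_scorer.clip_score])
#   # -> {"mae_score": ..., "clip_score": ...}; a single function yields a one-entry dict.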
import json
import tqdm
from functools import partial
def process_item(i, gt, exp_json, clip_scorer):
    gt_html = gt[i]
    exp_html = exp_json[i]
    # calculate score
    score = rendered_score(gt_html, exp_html, [mae_score, clip_scorer.clip_score])
    return score
if __name__ == "__main__":
    # test_path = "../LLaMA-Factory/data/CodeMLLM/websight/test.json"
    test_path = "./test_selected.json"
    exp_name = "original"
    exp_path = "mrweb_3b_original.jsonl"

    # Load data
    with open(test_path, "r") as f:
        gt = json.load(f)
    gt = [x["conversations"][1]["value"] for x in gt]
    with open(exp_path, "r") as f:
        exp_json = [json.loads(line)["predict"] for line in f]
    # # Load other experiments' jsonl files
    # with open("mrweb_3b_original.jsonl", "r") as f:
    #     base_json = [json.loads(line)["predict"] for line in f]
    # with open("mrweb_3b_sft_2000.jsonl", "r") as f:
    #     sft_json = [json.loads(line)["predict"] for line in f]

    # Initialize CLIP scorer (loaded once and shared by all workers)
    clip_scorer = CLIPScorer()

    # Use a thread pool so every worker shares the single loaded CLIP model
    from multiprocessing.pool import ThreadPool
    num_processes = 6
    with ThreadPool(processes=num_processes) as pool:
        func = partial(process_item, gt=gt, exp_json=exp_json, clip_scorer=clip_scorer)
        results = list(tqdm.tqdm(
            pool.imap(func, range(len(gt))),
            total=len(gt)))

    # Summarize average results
    keys = results[0].keys()
    summary = {key: [] for key in keys}
    for result in results:
        for key in keys:
            summary[key].append(result[key])
    summary = {key: np.mean(value) for key, value in summary.items()}
    print(f"Average results for {exp_name}:")
    for key, value in summary.items():
        print(f"{key}: {value:.4f}")