mirror of https://github.com/leigest519/ScreenCoder.git
synced 2026-02-12 17:52:47 +00:00
315 lines · 12 KiB · Python
import tempfile
import os

from PIL import Image
from playwright.sync_api import sync_playwright

import open_clip
import torch
from skimage.metrics import structural_similarity as ssim
import numpy as np

import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from lap import lapjv
from multiprocessing import Pool

# from selenium import webdriver
# from selenium.webdriver.firefox.options import Options

os.makedirs("./images", exist_ok=True)

def solve_assignment_lapjv(cost_matrix):
    """Solve the linear assignment problem with the Jonker-Volgenant solver.

    Returns, for each row of the cost matrix, the index of the assigned column.
    """
    _, col_idx, _ = lapjv(cost_matrix)
    return col_idx

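# Illustrative toy check for the solver (the values are made up, not from the
# original code):
#
#   toy_cost = np.array([[1.0, 2.0], [2.0, 1.0]])
#   solve_assignment_lapjv(toy_cost)  # -> array([0, 1]): row i keeps column i
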
def process_imgs(image1, image2, max_size):
    # Get the original sizes
    width1, height1 = image1.size
    width2, height2 = image2.size

    # Determine the new dimensions (max of both images' width and height)
    new_width = max(width1, width2)
    new_height = max(height1, height2)

    # Pad images to the new dimensions with random values
    def pad_image(image, new_width, new_height):
        # Create a random padded background with the new dimensions.
        # Match the input mode so grayscale inputs stay 2-D (the grayscale
        # EMD branch in calculate_emd_sim relies on this).
        if image.mode == "L":
            random_padding = np.random.randint(0, 256, (new_height, new_width), dtype=np.uint8)
        else:
            random_padding = np.random.randint(0, 256, (new_height, new_width, 3), dtype=np.uint8)
        padded_image = Image.fromarray(random_padding)

        # Paste the original image onto the padded background (top-left corner)
        padded_image.paste(image, (0, 0))

        return padded_image

    padded_image1 = pad_image(image1, new_width, new_height)
    padded_image2 = pad_image(image2, new_width, new_height)

    # Calculate the scale factor that fits the padded size into max_size
    aspect_ratio = min(max_size / new_width, max_size / new_height)
    new_size = (int(new_width * aspect_ratio), int(new_height * aspect_ratio))

    # Resize the padded images to the specified max size
    resized_image1 = padded_image1.resize(new_size, Image.LANCZOS)
    resized_image2 = padded_image2.resize(new_size, Image.LANCZOS)

    # resized_image1.show()
    # resized_image2.show()

    # Convert to int16 so later pixel differences do not overflow uint8
    array1 = np.array(resized_image1).astype(np.int16)
    array2 = np.array(resized_image2).astype(np.int16)

    return array1, array2

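# Illustrative shapes (hypothetical sizes): padding a 100x80 and a 90x120 RGB
# image yields two 100x120 canvases, scaled so the longer side fits max_size:
#
#   a, b = process_imgs(Image.new("RGB", (100, 80)), Image.new("RGB", (90, 120)), 64)
#   # a.shape == b.shape == (64, 53, 3)
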
def calculate_emd_sim(img_array1, img_array2):
    """img_array1 is the original image, img_array2 is the generated image."""
    if len(img_array1.shape) == 2:
        flat_array1 = img_array1.flatten()
        flat_array2 = img_array2.flatten()

        cost_matrix = np.abs(flat_array1[:, None] - flat_array2[None, :]).astype(np.float32)
        _, col_idx, _ = lapjv(cost_matrix)

        total_min_cost = cost_matrix[np.arange(len(flat_array1)), col_idx].sum()
        max_cost = np.maximum(flat_array1, 255 - flat_array1).sum()
        normalized_min_cost = total_min_cost / max_cost

    else:
        red1, green1, blue1 = img_array1[:, :, 0], img_array1[:, :, 1], img_array1[:, :, 2]
        red2, green2, blue2 = img_array2[:, :, 0], img_array2[:, :, 1], img_array2[:, :, 2]

        flat_red1, flat_green1, flat_blue1 = red1.flatten(), green1.flatten(), blue1.flatten()
        flat_red2, flat_green2, flat_blue2 = red2.flatten(), green2.flatten(), blue2.flatten()

        # One assignment problem per channel
        cost_matrix_red = np.abs(flat_red1[:, None] - flat_red2[None, :]).astype(np.float32)
        cost_matrix_green = np.abs(flat_green1[:, None] - flat_green2[None, :]).astype(np.float32)
        cost_matrix_blue = np.abs(flat_blue1[:, None] - flat_blue2[None, :]).astype(np.float32)

        # Solve the three channels in parallel
        with Pool(processes=3) as pool:
            results = pool.map(solve_assignment_lapjv, [cost_matrix_red, cost_matrix_green, cost_matrix_blue])
        col_ind_red, col_ind_green, col_ind_blue = results

        min_cost_red_lapjv = cost_matrix_red[np.arange(len(flat_red1)), col_ind_red].sum()
        min_cost_green_lapjv = cost_matrix_green[np.arange(len(flat_green1)), col_ind_green].sum()
        min_cost_blue_lapjv = cost_matrix_blue[np.arange(len(flat_blue1)), col_ind_blue].sum()

        total_min_cost_lapjv = min_cost_red_lapjv + min_cost_green_lapjv + min_cost_blue_lapjv
        # Worst case: every original pixel moves as far as possible from its value
        max_cost = (
            np.maximum(flat_red1, 255 - flat_red1).sum()
            + np.maximum(flat_green1, 255 - flat_green1).sum()
            + np.maximum(flat_blue1, 255 - flat_blue1).sum()
        )
        normalized_min_cost = total_min_cost_lapjv / max_cost

    # return {"cost": total_min_cost_lapjv, "normalized_sim": 1 - normalized_min_cost}
    return 1 - normalized_min_cost

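# Note on cost: with max_size=64 each channel is an assignment problem over
# roughly 64*64 = 4096 pixels, i.e. a 4096x4096 cost matrix per channel, which
# is why the solver runs per channel and the images are aggressively downscaled.
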
def emd_similarity(image1_path, image2_path, max_size=64, mode="L"):
    """Not symmetric: the first image is the original image, and the score is
    normalized with respect to the original image."""
    image1 = Image.open(image1_path).convert(mode) if isinstance(image1_path, str) else image1_path.convert(mode)
    image2 = Image.open(image2_path).convert(mode) if isinstance(image2_path, str) else image2_path.convert(mode)

    array1, array2 = process_imgs(image1, image2, max_size)
    similarity = calculate_emd_sim(array1, array2)

    return similarity

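# Illustrative usage (the file paths are hypothetical):
#
#   score = emd_similarity("ref.png", "gen.png", max_size=64, mode="L")
#   # 1.0 is a perfect match; lower values mean higher pixel-transport cost.
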
class CLIPScorer:
    def __init__(self, model_name='ViT-B-32-quickgelu', pretrained='openai'):
        """
        Initializes the CLIPScorer with the specified model.

        Args:
            model_name (str): The name of the CLIP model to use.
            pretrained (str): Specifies which pre-trained weights to load.
        """
        self.device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
        self.device = "cpu"  # Force CPU for compatibility
        self.model, _, self.preprocess = open_clip.create_model_and_transforms(model_name, pretrained=pretrained)
        self.model.to(self.device)
        self.model.eval()  # inference only

    def clip_score(self, img1: Image.Image, img2: Image.Image) -> float:
        """
        Calculates the CLIP score (cosine similarity) between two images.

        Args:
            img1 (Image.Image): The first image as a PIL Image.
            img2 (Image.Image): The second image as a PIL Image.

        Returns:
            float: The cosine similarity score between the two images.
        """
        # Preprocess the images
        image1 = self.preprocess(img1).unsqueeze(0).to(self.device)
        image2 = self.preprocess(img2).unsqueeze(0).to(self.device)

        # Get the image features from CLIP using open_clip
        with torch.no_grad():
            image1_features = self.model.encode_image(image1)
            image2_features = self.model.encode_image(image2)

        # Normalize the features to unit length
        image1_features /= image1_features.norm(dim=-1, keepdim=True)
        image2_features /= image2_features.norm(dim=-1, keepdim=True)

        # Calculate cosine similarity between the two image features
        cosine_similarity = torch.nn.functional.cosine_similarity(image1_features, image2_features)
        return cosine_similarity.item()

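# Illustrative usage (the image paths are hypothetical):
#
#   scorer = CLIPScorer()
#   with Image.open("ref.png") as a, Image.open("gen.png") as b:
#       print(scorer.clip_score(a, b))  # cosine similarity of CLIP embeddings
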
clip_scorer = CLIPScorer()


def ssim_score(img1, img2):
    # Pad both images to a common size, then downscale before comparing
    img1, img2 = process_imgs(img1, img2, 512)
    return ssim(img1, img2, channel_axis=-1, data_range=255)

def mae_score(img1, img2):
    """Mean absolute error, a pixel-based metric."""
    img1, img2 = process_imgs(img1, img2, 512)
    # max_mae = np.mean(np.maximum(img1, 255 - img1))
    mae = np.mean(np.abs(img1 - img2))
    # return {"mae": mae, "normalized_mae": 1 - mae / max_mae}
    return mae


def clip_mae(img1, img2):
    """Combined score: clip - mae/255."""
    mae = mae_score(img1, img2)
    clip = clip_scorer.clip_score(img1, img2)
    return clip - mae / 255

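# Worked example of the combination: identical images give clip ≈ 1.0 and
# mae == 0, so clip_mae ≈ 1.0; a maximal pixel mismatch subtracts up to 1.0.
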
# Environment-specific asset paths; adjust for your own setup.
CSS_PATH = "/apdcephfs_qy3/share_301812049/jarviswang/wt/yx/VLM-R1/src/open-r1-multimodal/src/open_r1/tailwind.min.css"
PLACEHOLDER_PATH = "/apdcephfs_qy3/share_301812049/jarviswang/wt/yx/VLM-R1/src/open-r1-multimodal/src/open_r1/placeholder.jpg"


def preprocess_html(html_str: str) -> str:
    """Rewrite remote/relative asset references to local copies so rendering
    is deterministic and does not depend on the network."""
    # css_path = os.path.join(os.path.dirname(__file__), "tailwind.min.css")
    # placeholder_path = os.path.join(os.path.dirname(__file__), "placeholder.jpg")
    html_str = html_str.replace("placeholder.jpg", PLACEHOLDER_PATH)
    html_str = html_str.replace("https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css", CSS_PATH)
    return html_str

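# Illustrative effect (hypothetical snippet):
#
#   preprocess_html('<img src="placeholder.jpg">')
#   # -> '<img src="...placeholder.jpg">' with the local PLACEHOLDER_PATH substituted.
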
def generate_screenshot(html_content, path):
    html_content = preprocess_html(html_content)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        # Set consistent rendering parameters
        page.set_viewport_size({"width": 1280, "height": 720})
        page.route("**/*", lambda route: route.continue_())  # Allow external resources

        # Render and screenshot
        page.set_content(html_content, timeout=100000)
        page.wait_for_load_state("networkidle", timeout=100000)
        page.screenshot(
            path=path,
            full_page=True,
            type="png",
        )
        browser.close()

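# Illustrative usage (the output path is hypothetical):
#
#   generate_screenshot("<html><body><h1>Hello</h1></body></html>", "./images/demo.png")
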
def rendered_score(ref_html, gen_html, score_func, verbose=True):
    """Calculate visual similarity score between two HTML documents using screenshots."""
    if not verbose:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as ref_file, \
             tempfile.NamedTemporaryFile(delete=False, suffix=".png") as gen_file:
            ref_path = ref_file.name
            gen_path = gen_file.name
    else:
        # Persist screenshots (and the generated HTML) under ./images for inspection
        ref_name = str(abs(hash(ref_html)))[:6]
        gen_name = f"{ref_name}_{str(abs(hash(gen_html)))[:6]}"
        ref_path = f"./images/{ref_name}.png"
        gen_path = f"./images/{gen_name}.png"
        with open(f"./images/{gen_name}.html", "w") as f:
            f.write(gen_html)

    try:
        # Generate screenshots synchronously
        generate_screenshot(ref_html, ref_path)
        generate_screenshot(gen_html, gen_path)

        # Calculate similarity score(s)
        with Image.open(ref_path) as ref_img, Image.open(gen_path) as gen_img:
            if isinstance(score_func, list):
                score = {}
                for func in score_func:
                    score.update({func.__name__: func(ref_img, gen_img)})
                return score

            return {score_func.__name__: score_func(ref_img, gen_img)}

    finally:
        # Cleanup temp files (currently kept on disk)
        # os.unlink(ref_path)
        # os.unlink(gen_path)
        pass

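# Illustrative usage (the HTML strings are hypothetical):
#
#   scores = rendered_score(ref_html, gen_html, [mae_score, ssim_score], verbose=False)
#   # -> {"mae_score": ..., "ssim_score": ...}
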
import json

import tqdm

from functools import partial


def process_item(i, gt, exp_json, clip_scorer):
    gt_html = gt[i]
    exp_html = exp_json[i]

    # Render both documents and score the pair
    score = rendered_score(gt_html, exp_html, [mae_score, clip_scorer.clip_score])
    return score

if __name__ == "__main__":
    # test_path = "../LLaMA-Factory/data/CodeMLLM/websight/test.json"
    test_path = "./test_selected.json"
    exp_name = "original"
    exp_path = "mrweb_3b_original.jsonl"

    # Load ground-truth HTML
    with open(test_path, "r") as f:
        gt = json.load(f)
    gt = [x["conversations"][1]["value"] for x in gt]

    # Load model predictions
    with open(exp_path, "r") as f:
        exp_json = [json.loads(line)["predict"] for line in f]

    # # Load jsonl files
    # with open("mrweb_3b_original.jsonl", "r") as f:
    #     base_json = [json.loads(line)["predict"] for line in f]
    # with open("mrweb_3b_sft_2000.jsonl", "r") as f:
    #     sft_json = [json.loads(line)["predict"] for line in f]

    # Initialize CLIP scorer (once per process)
    clip_scorer = CLIPScorer()

    # Use a thread pool: rendering is I/O-bound and the threads can share the
    # single CLIP model instead of re-loading it per worker
    from multiprocessing.pool import ThreadPool

    num_processes = 6
    with ThreadPool(processes=num_processes) as pool:
        func = partial(process_item, gt=gt, exp_json=exp_json, clip_scorer=clip_scorer)
        results = list(tqdm.tqdm(
            pool.imap(func, range(len(gt))),
            total=len(gt)))

    # Summarize average results
    keys = results[0].keys()
    summary = {key: [] for key in keys}
    for result in results:
        for key in keys:
            summary[key].append(result[key])
    summary = {key: np.mean(value) for key, value in summary.items()}
    print(f"Average results for {exp_name}:")
    for key, value in summary.items():
        print(f"{key}: {value:.4f}")