mirror of https://github.com/leigest519/ScreenCoder.git (synced 2026-02-13 02:02:48 +00:00)
Initial commit
utils.py (new file, 306 lines)
@@ -0,0 +1,306 @@
import os
import time
from openai import OpenAI
from volcenginesdkarkruntime import Ark
import base64
import io
from PIL import Image, ImageDraw
import cv2
import numpy as np


def encode_image(image):
    """Base64-encode an image given either a file path or a PIL Image."""
    if isinstance(image, str):
        try:
            with open(image, "rb") as image_file:
                encoding = base64.b64encode(image_file.read()).decode('utf-8')
        except Exception as e:
            print(e)
            # Fallback: read as text and encode to bytes first, since
            # base64.b64encode() requires a bytes-like object.
            with open(image, "r", encoding="utf-8") as image_file:
                encoding = base64.b64encode(image_file.read().encode('utf-8')).decode('utf-8')
        return encoding
    else:
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
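
# A minimal usage sketch (the file name is illustrative, not from this repo):
#
#     encoding = encode_image("data/screenshot.png")       # from a path
#     encoding = encode_image(Image.new("RGB", (64, 64)))   # from a PIL Image
#
# Either way the result is a base64 string suitable for a
# data:image/png;base64 URL, which is how the Bot classes below attach
# images to chat messages.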

def image_mask(image_path: str, bbox_normalized: tuple[int, int, int, int]) -> Image.Image:
    """Creates a mask on the image in the specified normalized bounding box."""
    image = Image.open(image_path)
    masked_image = image.copy()

    w, h = image.size

    # Convert normalized coordinates to pixel coordinates for drawing
    bbox_pixels = (
        int(bbox_normalized[0] * w / 1000),
        int(bbox_normalized[1] * h / 1000),
        int(bbox_normalized[2] * w / 1000),
        int(bbox_normalized[3] * h / 1000)
    )

    draw = ImageDraw.Draw(masked_image)
    draw.rectangle(bbox_pixels, fill=(255, 255, 255))  # Pure white

    return masked_image

def projection_analysis(image_path: str, bbox_normalized: tuple[int, int, int, int]) -> dict:
    """
    Performs projection analysis on a specified normalized bounding box area.
    All returned coordinates are also normalized.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Failed to read image {image_path}")
        return {}

    h, w = image.shape[:2]

    # Convert normalized bbox to pixel coordinates for cropping
    bbox_pixels = (
        int(bbox_normalized[0] * w / 1000),
        int(bbox_normalized[1] * h / 1000),
        int(bbox_normalized[2] * w / 1000),
        int(bbox_normalized[3] * h / 1000)
    )

    x1, y1, x2, y2 = bbox_pixels
    roi = image[y1:y2, x1:x2]

    if roi.size == 0:
        print(f"Error: Invalid bbox region {bbox_pixels}")
        return {}

    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Perform projection analysis (this part operates on pixels within the ROI)
    horizontal_projection = np.sum(binary, axis=1)
    vertical_projection = np.sum(binary, axis=0)

    # Find groups and convert their coordinates back to normalized space
    horizontal_groups = _find_groups_and_normalize(horizontal_projection, 'horizontal', bbox_normalized, w, h)
    vertical_groups = _find_groups_and_normalize(vertical_projection, 'vertical', bbox_normalized, w, h)

    return {
        'horizontal_groups': horizontal_groups,
        'vertical_groups': vertical_groups,
        'bbox_normalized': bbox_normalized,
    }
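
# Coordinate convention: all bboxes live on a resolution-independent 0-1000
# ("per-mille") grid. For example, on a hypothetical 1280x720 screenshot the
# normalized box (0, 0, 500, 1000) maps to the pixel box (0, 0, 640, 720),
# i.e. the left half of the image:
#
#     result = projection_analysis("data/screenshot.png", (0, 0, 500, 1000))
#     # result['horizontal_groups'] holds row bands, result['vertical_groups']
#     # holds column bands; both are (x1, x2, y1, y2) tuples in 0-1000 space.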

def _find_groups_and_normalize(projection: np.ndarray, direction: str,
                               bbox_normalized: tuple[int, int, int, int],
                               image_width: int, image_height: int,
                               min_group_size_px: int = 5, threshold_ratio: float = 0.1) -> list:
    """
    Finds contiguous groups from projection data and returns them in normalized
    coordinates, as (x1, x2, y1, y2) tuples in the 0-1000 space.
    """
    threshold = np.max(projection) * threshold_ratio
    non_zero_indices = np.where(projection > threshold)[0]

    if len(non_zero_indices) == 0:
        return []

    # Merge consecutive indices into contiguous pixel-space runs
    groups_px = []
    start_px = non_zero_indices[0]
    for i in range(1, len(non_zero_indices)):
        if non_zero_indices[i] > non_zero_indices[i - 1] + 1:
            if non_zero_indices[i - 1] - start_px >= min_group_size_px:
                groups_px.append((start_px, non_zero_indices[i - 1]))
            start_px = non_zero_indices[i]
    if non_zero_indices[-1] - start_px >= min_group_size_px:
        groups_px.append((start_px, non_zero_indices[-1]))

    # Convert pixel groups (relative to ROI) to normalized coordinates (relative to full image)
    norm_groups = []
    roi_x1_norm, roi_y1_norm, roi_x2_norm, roi_y2_norm = bbox_normalized
    roi_w_norm = roi_x2_norm - roi_x1_norm
    roi_h_norm = roi_y2_norm - roi_y1_norm

    roi_w_px = int(roi_w_norm * image_width / 1000)
    roi_h_px = int(roi_h_norm * image_height / 1000)

    # Guard against degenerate ROIs that would divide by zero below
    if roi_w_px == 0 or roi_h_px == 0:
        return []

    for start_px, end_px in groups_px:
        if direction == 'horizontal':
            start_norm = roi_y1_norm + int(start_px * roi_h_norm / roi_h_px)
            end_norm = roi_y1_norm + int(end_px * roi_h_norm / roi_h_px)
            norm_groups.append((roi_x1_norm, roi_x2_norm, start_norm, end_norm))
        else:  # vertical
            start_norm = roi_x1_norm + int(start_px * roi_w_norm / roi_w_px)
            end_norm = roi_x1_norm + int(end_px * roi_w_norm / roi_w_px)
            norm_groups.append((start_norm, end_norm, roi_y1_norm, roi_y2_norm))

    return norm_groups

def visualize_projection_analysis(image_path: str, analysis_result: dict,
                                  save_path: str = None) -> str:
    """
    Visualizes the results of a completed projection analysis.
    This function takes the analysis result dictionary and draws it on the image.
    """
    if not analysis_result:
        print("Error: Analysis result is empty.")
        return ""

    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Failed to read image for visualization: {image_path}")
        return ""

    h, w = image.shape[:2]
    vis_image = image.copy()

    bbox_normalized = analysis_result.get('bbox_normalized')
    if not bbox_normalized:
        print("Error: 'bbox_normalized' not found in analysis result.")
        return ""

    # Convert normalized bbox to pixel coordinates for drawing the main ROI
    x1, y1, x2, y2 = (
        int(bbox_normalized[0] * w / 1000),
        int(bbox_normalized[1] * h / 1000),
        int(bbox_normalized[2] * w / 1000),
        int(bbox_normalized[3] * h / 1000)
    )
    cv2.rectangle(vis_image, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Green for main ROI

    # Groups are stored as (x1, x2, y1, y2) in normalized space, so scale the
    # x values by width and the y values by height before drawing.

    # Draw horizontal groups (Blue)
    for i, group_norm in enumerate(analysis_result.get('horizontal_groups', [])):
        g_x1 = int(group_norm[0] * w / 1000)
        g_x2 = int(group_norm[1] * w / 1000)
        g_y1 = int(group_norm[2] * h / 1000)
        g_y2 = int(group_norm[3] * h / 1000)
        cv2.rectangle(vis_image, (g_x1, g_y1), (g_x2, g_y2), (255, 0, 0), 1)
        cv2.putText(vis_image, f'H{i}', (g_x1, g_y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

    # Draw vertical groups (Red)
    for i, group_norm in enumerate(analysis_result.get('vertical_groups', [])):
        g_x1 = int(group_norm[0] * w / 1000)
        g_x2 = int(group_norm[1] * w / 1000)
        g_y1 = int(group_norm[2] * h / 1000)
        g_y2 = int(group_norm[3] * h / 1000)
        cv2.rectangle(vis_image, (g_x1, g_y1), (g_x2, g_y2), (0, 0, 255), 1)
        cv2.putText(vis_image, f'V{i}', (g_x1, g_y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

    if save_path is None:
        base_name = os.path.splitext(os.path.basename(image_path))[0]
        save_path = f"data/{base_name}_projection_analysis.png"

    # os.path.dirname() is empty for bare filenames; only create directories when one is given
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    if cv2.imwrite(save_path, vis_image):
        print(f"Projection analysis visualization saved to: {save_path}")
        return save_path
    else:
        print("Error: Failed to save visualization")
        return ""
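
# Chaining the two steps (a sketch; the input path is illustrative):
#
#     result = projection_analysis("data/screenshot.png", (0, 0, 1000, 1000))
#     if result:
#         visualize_projection_analysis("data/screenshot.png", result)
#     # writes data/screenshot_projection_analysis.png by default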


class Bot:
    def __init__(self, key_path, patience=3) -> None:
        # key_path may be a path to a file holding the API key, or the key itself
        if os.path.exists(key_path):
            with open(key_path, "r") as f:
                self.key = f.read().replace("\n", "")
        else:
            self.key = key_path
        self.patience = patience

    def ask(self, question, image_encoding=None, verbose=False):
        # Subclasses implement the actual API call with this signature
        raise NotImplementedError

    def try_ask(self, question, image_encoding=None, verbose=False):
        # Retry up to `patience` times, sleeping 5 seconds between attempts
        for _ in range(self.patience):
            try:
                return self.ask(question, image_encoding, verbose)
            except Exception as e:
                print(e, "waiting for 5 seconds")
                time.sleep(5)
        return None


class Doubao(Bot):
    def __init__(self, key_path, patience=3, model="doubao-1.5-thinking-vision-pro-250428") -> None:
        super().__init__(key_path, patience)
        self.client = Ark(api_key=self.key)
        self.model = model

    def ask(self, question, image_encoding=None, verbose=False):
        if image_encoding:
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{image_encoding}",
                            },
                        },
                    ],
                }
            ]
        else:
            messages = [{"role": "user", "content": question}]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=4096,
            temperature=0,
        )
        response = response.choices[0].message.content
        if verbose:
            print("####################################")
            print("question:\n", question)
            print("####################################")
            print("response:\n", response)
        return response


class Qwen_2_5_VL(Bot):
    def __init__(self, key_path, patience=3, model="qwen2.5-vl-32b-instruct") -> None:
        super().__init__(key_path, patience)
        self.client = OpenAI(api_key=self.key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
        self.name = model

    def ask(self, question, image_encoding=None, verbose=False):
        if image_encoding:
            content = {
                "role": "user",
                "content": [
                    {"type": "text", "text": question},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_encoding}"
                        }
                    }
                ]
            }
        else:
            content = {"role": "user", "content": question}

        response = self.client.chat.completions.create(
            model=self.name,
            messages=[content],
            max_tokens=4096,
            temperature=0,
            seed=42,
        )
        response = response.choices[0].message.content
        if verbose:
            print("####################################")
            print("question:\n", question)
            print("####################################")
            print("response:\n", response)
            print("seed used: 42")
        return response
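

# Hedged end-to-end sketch: the key file, prompt, and image path below are
# illustrative assumptions, not part of this commit.
if __name__ == "__main__":
    image_path = "data/screenshot.png"        # hypothetical input image
    bot = Qwen_2_5_VL("keys/dashscope.txt")   # hypothetical API key file
    answer = bot.try_ask(
        "Describe the layout of this UI screenshot.",
        image_encoding=encode_image(image_path),
        verbose=True,
    )
    print(answer)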