fix: alpr_clusters.py type annotations (#57)

This commit is contained in:
Payton Ward
2025-11-21 10:09:33 -06:00
committed by GitHub
parent 3136040f3d
commit a2dc11c5be

View File

@@ -7,11 +7,10 @@ import os
import time import time
import numpy as np import numpy as np
import requests import requests
from typing import Tuple
import re import re
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
def terraform_rate_expression_to_minutes(rate_expression: str) -> Tuple[int, int]: def terraform_rate_expression_to_minutes(rate_expression: str) -> int:
match = re.match(r"rate\((\d+)\s*(day|hour|minute)s?\)", rate_expression) match = re.match(r"rate\((\d+)\s*(day|hour|minute)s?\)", rate_expression)
if not match: if not match:
raise ValueError(f"Invalid rate expression: {rate_expression}") raise ValueError(f"Invalid rate expression: {rate_expression}")
@@ -26,7 +25,7 @@ def terraform_rate_expression_to_minutes(rate_expression: str) -> Tuple[int, int
return value return value
else: else:
raise ValueError(f"Unsupported time unit: {unit}") raise ValueError(f"Unsupported time unit: {unit}")
UPDATE_RATE_MINS = terraform_rate_expression_to_minutes(os.getenv("UPDATE_RATE_MINS", "rate(60 minutes)")) UPDATE_RATE_MINS = terraform_rate_expression_to_minutes(os.getenv("UPDATE_RATE_MINS", "rate(60 minutes)"))
GRACE_PERIOD_MINS = int(2) # XXX: set expiration a few minutes after in case put object takes a while GRACE_PERIOD_MINS = int(2) # XXX: set expiration a few minutes after in case put object takes a while
TILE_SIZE_DEGREES = int(20) TILE_SIZE_DEGREES = int(20)
@@ -54,7 +53,7 @@ def get_all_nodes():
response.raise_for_status() response.raise_for_status()
return response.json()["elements"] return response.json()["elements"]
def segment_regions(nodes: Any, tile_size_degrees: int) -> dict[Any]: def segment_regions(nodes: Any, tile_size_degrees: int) -> dict[str, list[Any]]:
print("Segmenting regions...") print("Segmenting regions...")
tile_dict = defaultdict(list) tile_dict = defaultdict(list)
for node in nodes: for node in nodes:
@@ -108,7 +107,7 @@ def lambda_handler(event, context):
key = f"regions/{lat}/{lon}.json" key = f"regions/{lat}/{lon}.json"
body = json.dumps(elements) body = json.dumps(elements)
futures.append(executor.submit(upload_json_to_s3, bucket_new, key, body)) futures.append(executor.submit(upload_json_to_s3, bucket_new, key, body))
# add index file # add index file
futures.append(executor.submit(upload_json_to_s3, bucket_new, "regions/index.json", json.dumps(tile_index))) futures.append(executor.submit(upload_json_to_s3, bucket_new, "regions/index.json", json.dumps(tile_index)))