update clustering code to segment regions; keep clustering until the next release for backward compat

Will Freeman
2024-12-23 19:30:14 -08:00
parent 6f50bd32b7
commit be88a7950c
7 changed files with 116 additions and 20 deletions
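
For context, the new segmentation assigns every node to a 5-degree tile by flooring its coordinates to the tile grid, and the resulting "lat,lon" string becomes the S3 key for that region file. A minimal standalone sketch of the same derivation (the sample coordinate is made up for illustration):

import numpy as np

TILE_SIZE_DEGREES = 5

def tile_key(lat: float, lon: float, tile_size: int = TILE_SIZE_DEGREES) -> str:
    # np.floor rounds toward negative infinity, so southern/western
    # coordinates land in the correct tile (e.g. -122.42 -> -125).
    tile_lat = int(np.floor(lat / tile_size)) * tile_size
    tile_lon = int(np.floor(lon / tile_size)) * tile_size
    return f"{tile_lat},{tile_lon}"

print(tile_key(37.77, -122.42))  # -> "35,-125" (hypothetical coordinate)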

.gitignore
View File

@@ -47,5 +47,6 @@ serverless/*/src/*
!serverless/*/src/alpr_counts.py
!serverless/*/src/alpr_clusters.py
!serverless/*/src/requirements.txt
!serverless/*/src/Dockerfile
# TODO: need a better way to handle python packages

View File

@@ -0,0 +1,12 @@
# Use the official AWS Lambda Python 3.9 base image
FROM amazon/aws-lambda-python:3.9
# Copy function code
COPY alpr_clusters.py ${LAMBDA_TASK_ROOT}
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Set the CMD to your handler
CMD ["alpr_clusters.lambda_handler"]

View File

@@ -3,10 +3,34 @@ from collections import defaultdict
from typing import Any
import boto3
import os
import time
import numpy as np
import requests
from sklearn.cluster import DBSCAN
from typing import Tuple
import re
from concurrent.futures import ThreadPoolExecutor
def terraform_rate_expression_to_minutes(rate_expression: str) -> int:
match = re.match(r"rate\((\d+)\s*(day|hour|minute)s?\)", rate_expression)
if not match:
raise ValueError(f"Invalid rate expression: {rate_expression}")
value, unit = int(match.group(1)), match.group(2)
if unit == "day":
return value * 24 * 60
elif unit == "hour":
return value * 60
elif unit == "minute":
return value
else:
raise ValueError(f"Unsupported time unit: {unit}")
UPDATE_RATE_MINS = terraform_rate_expression_to_minutes(os.getenv("UPDATE_RATE_MINS", "rate(60 minutes)"))
GRACE_PERIOD_MINS = 2 # XXX: set expiration a few minutes after in case put object takes a while
TILE_SIZE_DEGREES = 5
def get_all_nodes():
# Set up the Overpass API query
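
A quick sanity check on the rate parsing above (illustrative only, not part of the commit): the Terraform module in this change passes rate = "rate(1 day)" through the UPDATE_RATE_MINS environment variable, and the helper converts such expressions to minutes.

assert terraform_rate_expression_to_minutes("rate(1 day)") == 24 * 60
assert terraform_rate_expression_to_minutes("rate(12 hours)") == 12 * 60
assert terraform_rate_expression_to_minutes("rate(30 minutes)") == 30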
@@ -67,28 +91,28 @@ def get_clusters(nodes: list[Any]):
return output
def segment_regions(nodes: Any, tile_size_degrees: int) -> list[dict[str, Any]]:
def segment_regions(nodes: Any, tile_size_degrees: int) -> dict[str, list[Any]]:
print("Segmenting regions...")
tile_dict = defaultdict(list)
for node in nodes["elements"]:
for node in nodes:
lat, lon = node["lat"], node["lon"]
tile_lat = int(np.floor(lat / tile_size_degrees)) * tile_size_degrees
tile_lon = int(np.floor(lon / tile_size_degrees)) * tile_size_degrees
tile_dict[(tile_lat, tile_lon)].append(node)
tile_list = []
for region, nodes in tile_dict.items():
tile_list.append({"lat": region[0], "lon": region[1], "nodes": nodes})
return tile_list
tile_dict[f"{tile_lat},{tile_lon}"].append(node)
print("Region segmentation complete.")
return tile_dict
def lambda_handler(event, context):
nodes = get_all_nodes()
alpr_clusters = get_clusters(nodes)
regions = segment_regions(nodes=nodes, tile_size_degrees=5)
regions_dict = segment_regions(nodes=nodes, tile_size_degrees=TILE_SIZE_DEGREES)
print("Uploading data to S3...")
s3 = boto3.client("s3")
bucket = "deflock-clusters"
bucket_new = "cdn.deflock.me"
key = "alpr_clusters.json"
s3.put_object(
@@ -98,18 +122,46 @@ def lambda_handler(event, context):
ContentType="application/json",
)
for region in regions:
lat, lon = region["lat"], region["lon"]
# TODO: handle outdated index files when their referenced files are deleted
epoch = int(time.time())
tile_index = {
"expiration_utc": epoch + (UPDATE_RATE_MINS + GRACE_PERIOD_MINS) * 60,
"regions": list(regions_dict.keys()),
"tile_url": "https://cdn.deflock.me/regions/{lat}/{lon}.json?v=" + str(epoch),
"tile_size_degrees": TILE_SIZE_DEGREES,
}
print("Uploading regions to S3...")
def upload_json_to_s3(bucket, key, body):
s3.put_object(
Bucket=bucket,
Key=f"regions/{lat}/{lon}.json",
Body=json.dumps(region["nodes"]),
Key=key,
Body=body,
ContentType="application/json",
)
# Use ThreadPoolExecutor for concurrent uploads
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for latlng_string, elements in regions_dict.items():
lat, lon = latlng_string.split(",")
key = f"regions/{lat}/{lon}.json"
body = json.dumps(elements)
futures.append(executor.submit(upload_json_to_s3, bucket_new, key, body))
# add index file
futures.append(executor.submit(upload_json_to_s3, bucket_new, "regions/index.json", json.dumps(tile_index)))
# Wait for all futures to complete
for future in futures:
future.result()
print("Regions uploaded to S3. Done!")
return {
"statusCode": 200,
"body": "Successfully fetched ALPR counts.",
"body": "Successfully clustered.",
}
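
How the index is consumed is not part of this commit; a hypothetical client-side sketch, assuming the index and tile files are served from cdn.deflock.me exactly as uploaded above:

import json
import math
import urllib.request

def fetch_tile_for(lat: float, lon: float):
    # Read the index to learn the tile size and the cache-busted URL template.
    with urllib.request.urlopen("https://cdn.deflock.me/regions/index.json") as resp:
        index = json.load(resp)
    size = index["tile_size_degrees"]
    tile_lat = math.floor(lat / size) * size
    tile_lon = math.floor(lon / size) * size
    # tile_url keeps literal {lat}/{lon} placeholders for the client to fill in.
    url = index["tile_url"].format(lat=tile_lat, lon=tile_lon)
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)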
@@ -124,6 +176,8 @@ if __name__ == "__main__":
with nodes_file_path.open("w") as f:
json.dump(nodes, f)
regions = segment_regions(nodes=nodes, tile_size_degrees=5)
with open("regions.json", "w") as f:
json.dump(regions, f)
regions_dict = segment_regions(nodes=nodes, tile_size_degrees=5)
with open("regions_dict.json", "w") as f:
json.dump(regions_dict, f)

View File

@@ -13,5 +13,6 @@ module "alpr_clusters" {
module_name = "alpr_clusters"
source = "./modules/alpr_clusters"
deflock_stats_bucket = var.deflock_stats_bucket
deflock_cdn_bucket = var.deflock_cdn_bucket
rate = "rate(1 day)"
}

View File

@@ -15,6 +15,11 @@ resource "aws_iam_role" "lambda_role" {
})
}
resource "aws_iam_role_policy_attachment" "lambda_basic_execution" {
role = aws_iam_role.lambda_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
resource "aws_iam_policy" "lambda_s3_write_policy" {
name = "${var.module_name}_lambda_s3_write_policy"
description = "Policy for Lambda to write to S3 bucket ${var.deflock_stats_bucket}"
@@ -28,7 +33,10 @@ resource "aws_iam_policy" "lambda_s3_write_policy" {
"s3:PutObjectAcl"
]
Effect = "Allow"
Resource = "arn:aws:s3:::${var.deflock_stats_bucket}/${var.output_filename}"
Resource = [
"arn:aws:s3:::${var.deflock_cdn_bucket}/*",
"arn:aws:s3:::${var.deflock_stats_bucket}/*"
]
}
]
})
@@ -44,7 +52,13 @@ resource "aws_lambda_function" "overpass_lambda" {
role = aws_iam_role.lambda_role.arn
package_type = "Image"
image_uri = "${aws_ecr_repository.lambda_repository.repository_url}:latest"
timeout = 90
timeout = 180
memory_size = 1024
environment {
variables = {
UPDATE_RATE_MINS = var.rate
}
}
}
resource "aws_cloudwatch_event_rule" "lambda_rule" {
@@ -70,3 +84,8 @@ resource "aws_lambda_permission" "allow_cloudwatch_to_call_lambda" {
resource "aws_ecr_repository" "lambda_repository" {
name = "${var.module_name}-lambda"
}
resource "aws_cloudwatch_log_group" "lambda_log_group" {
name = "/aws/lambda/${aws_lambda_function.overpass_lambda.function_name}"
retention_in_days = 14
}

View File

@@ -11,6 +11,10 @@ variable "deflock_stats_bucket" {
description = "S3 bucket for the ALPR clusters JSON file"
}
variable "deflock_cdn_bucket" {
description = "S3 bucket for the CDN"
}
variable "rate" {
description = "Rate at which to run the Lambda function"
}

View File

@@ -2,3 +2,8 @@ variable "deflock_stats_bucket" {
description = "S3 bucket for the ALPR counts JSON file"
default = "deflock-clusters"
}
variable "deflock_cdn_bucket" {
description = "S3 bucket for the CDN"
default = "cdn.deflock.me"
}