diff --git a/.gitignore b/.gitignore
index 27a7411..0759349 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@
 serverless/*/src/*
 !serverless/*/src/alpr_counts.py
 !serverless/*/src/alpr_clusters.py
 !serverless/*/src/requirements.txt
+!serverless/*/src/Dockerfile
 
 # TODO: need a better way to handle python packages
diff --git a/serverless/alpr_clusters/src/Dockerfile b/serverless/alpr_clusters/src/Dockerfile
new file mode 100644
index 0000000..d80bd11
--- /dev/null
+++ b/serverless/alpr_clusters/src/Dockerfile
@@ -0,0 +1,12 @@
+# Use the official AWS Lambda Python 3.9 base image
+FROM amazon/aws-lambda-python:3.9
+
+# Copy function code
+COPY alpr_clusters.py ${LAMBDA_TASK_ROOT}
+
+# Install dependencies
+COPY requirements.txt .
+RUN pip install -r requirements.txt
+
+# Set the CMD to your handler
+CMD ["alpr_clusters.lambda_handler"]
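A quick local smoke test of this image (illustrative; the local tag name is arbitrary). The aws-lambda-python base image ships the Lambda runtime interface emulator, so the container can be invoked locally; a real invocation queries the Overpass API and then writes to S3, so AWS credentials must be available for the upload step to succeed.

    docker build -t alpr-clusters serverless/alpr_clusters/src
    docker run --rm -p 9000:8080 alpr-clusters
    # in a second shell:
    curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{}'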
Parsing nodes...") # Parse nodes and extract lat/lon for clustering coordinates = [] node_ids = [] - for element in data['elements']: - if element['type'] == 'node': - coordinates.append([element['lat'], element['lon']]) - node_ids.append(element['id']) + for element in nodes: + if element["type"] == "node": + coordinates.append([element["lat"], element["lon"]]) + node_ids.append(element["id"]) # Convert coordinates to NumPy array for DBSCAN coordinates = np.array(coordinates) - # Define the clustering radius (10 miles in meters) + # Define the clustering radius (50 miles in meters) radius_miles = 50 radius_km = radius_miles * 1.60934 # 1 mile = 1.60934 km radius_in_radians = radius_km / 6371.0 # Earth's radius in km # Perform DBSCAN clustering - db = DBSCAN(eps=radius_in_radians, min_samples=1, algorithm='ball_tree', metric='haversine').fit(np.radians(coordinates)) + db = DBSCAN( + eps=radius_in_radians, min_samples=1, algorithm="ball_tree", metric="haversine" + ).fit(np.radians(coordinates)) labels = db.labels_ # Prepare clusters and calculate centroids @@ -45,34 +89,108 @@ def get_clusters(): cluster_points = coordinates[labels == label] centroid = np.mean(cluster_points, axis=0) first_node_id = node_ids[labels.tolist().index(label)] - + # Store in clusters dict with centroid and first node ID - clusters[label] = { - "lat": centroid[0], - "lon": centroid[1], - "id": first_node_id - } + clusters[label] = {"lat": centroid[0], "lon": centroid[1], "id": first_node_id} output = {"clusters": list(clusters.values())} print("Clustering complete.") return output +def segment_regions(nodes: Any, tile_size_degrees: int) -> dict[Any]: + print("Segmenting regions...") + tile_dict = defaultdict(list) + for node in nodes: + lat, lon = node["lat"], node["lon"] + tile_lat = int(np.floor(lat / tile_size_degrees)) * tile_size_degrees + tile_lon = int(np.floor(lon / tile_size_degrees)) * tile_size_degrees + bare_node = { + "id": node["id"], + "lat": lat, + "lon": lon, + "tags": {k: v for k, v in node["tags"].items() if k in WHITELISTED_TAGS}, + } + tile_dict[f"{tile_lat}/{tile_lon}"].append(bare_node) + print("Region segmentation complete.") + + return tile_dict + def lambda_handler(event, context): - alpr_clusters = get_clusters() + nodes = get_all_nodes() + alpr_clusters = get_clusters(nodes) + regions_dict = segment_regions(nodes=nodes, tile_size_degrees=TILE_SIZE_DEGREES) - s3 = boto3.client('s3') - bucket = 'deflock-clusters' - key = 'alpr_clusters.json' + print("Uploading data to S3...") - s3.put_object( - Bucket=bucket, - Key=key, - Body=json.dumps(alpr_clusters), - ContentType='application/json' - ) + s3 = boto3.client("s3") + bucket = "deflock-clusters" + bucket_new = "cdn.deflock.me" + key = "alpr_clusters.json" - return { - 'statusCode': 200, - 'body': 'Successfully fetched ALPR counts.', - } + s3.put_object( + Bucket=bucket, + Key=key, + Body=json.dumps(alpr_clusters), + ContentType="application/json", + ) + + # TODO: handle outdated index files when their referenced files are deleted + epoch = int(time.time()) + tile_index = { + "expiration_utc": epoch + (UPDATE_RATE_MINS + GRACE_PERIOD_MINS) * 60, + "regions": list(regions_dict.keys()), + "tile_url": "https://cdn.deflock.me/regions/{lat}/{lon}.json?v=" + str(epoch), + "tile_size_degrees": TILE_SIZE_DEGREES, + } + + print("Uploading regions to S3...") + + def upload_json_to_s3(bucket, key, body): + s3.put_object( + Bucket=bucket, + Key=key, + Body=body, + ContentType="application/json", + ) + + # Use ThreadPoolExecutor for concurrent uploads + 
diff --git a/terraform/main.tf b/terraform/main.tf
index e9b2896..b219fe1 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -13,5 +13,6 @@ module "alpr_clusters" {
   module_name          = "alpr_clusters"
   source               = "./modules/alpr_clusters"
   deflock_stats_bucket = var.deflock_stats_bucket
+  deflock_cdn_bucket   = var.deflock_cdn_bucket
   rate                 = "rate(1 day)"
 }
diff --git a/terraform/modules/alpr_clusters/main.tf b/terraform/modules/alpr_clusters/main.tf
index 2488460..fc3e989 100644
--- a/terraform/modules/alpr_clusters/main.tf
+++ b/terraform/modules/alpr_clusters/main.tf
@@ -15,6 +15,11 @@ resource "aws_iam_role" "lambda_role" {
   })
 }
 
+resource "aws_iam_role_policy_attachment" "lambda_basic_execution" {
+  role       = aws_iam_role.lambda_role.name
+  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
+}
+
 resource "aws_iam_policy" "lambda_s3_write_policy" {
   name        = "${var.module_name}_lambda_s3_write_policy"
   description = "Policy for Lambda to write to S3 bucket ${var.deflock_stats_bucket}"
@@ -28,7 +33,10 @@ resource "aws_iam_policy" "lambda_s3_write_policy" {
           "s3:PutObjectAcl"
         ]
         Effect = "Allow"
-        Resource = "arn:aws:s3:::${var.deflock_stats_bucket}/${var.output_filename}"
+        Resource = [
+          "arn:aws:s3:::${var.deflock_cdn_bucket}/*",
+          "arn:aws:s3:::${var.deflock_stats_bucket}/*"
+        ]
       }
     ]
   })
@@ -44,7 +52,13 @@ resource "aws_lambda_function" "overpass_lambda" {
   role          = aws_iam_role.lambda_role.arn
   package_type  = "Image"
   image_uri     = "${aws_ecr_repository.lambda_repository.repository_url}:latest"
-  timeout       = 90
+  timeout       = 180
+  memory_size   = 512
+  environment {
+    variables = {
+      UPDATE_RATE_MINS = var.rate
+    }
+  }
 }
 
 resource "aws_cloudwatch_event_rule" "lambda_rule" {
@@ -70,3 +84,8 @@ resource "aws_lambda_permission" "allow_cloudwatch_to_call_lambda" {
 resource "aws_ecr_repository" "lambda_repository" {
   name = "${var.module_name}-lambda"
 }
+
+resource "aws_cloudwatch_log_group" "lambda_log_group" {
+  name              = "/aws/lambda/${aws_lambda_function.overpass_lambda.function_name}"
+  retention_in_days = 14
+}
diff --git a/terraform/modules/alpr_clusters/variables.tf b/terraform/modules/alpr_clusters/variables.tf
index 98d92fa..ab6b9b1 100644
--- a/terraform/modules/alpr_clusters/variables.tf
+++ b/terraform/modules/alpr_clusters/variables.tf
@@ -11,6 +11,10 @@ variable "deflock_stats_bucket" {
   description = "S3 bucket for the ALPR clusters JSON file"
 }
 
+variable "deflock_cdn_bucket" {
+  description = "S3 bucket for the CDN"
+}
+
 variable "rate" {
   description = "Rate at which to run the Lambda function"
 }
description = "Rate at which to run the Lambda function" } diff --git a/terraform/variables.tf b/terraform/variables.tf index b0f2d59..28ab153 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -2,3 +2,8 @@ variable "deflock_stats_bucket" { description = "S3 bucket for the ALPR counts JSON file" default = "deflock-clusters" } + +variable "deflock_cdn_bucket" { + description = "S3 bucket for the CDN" + default = "cdn.deflock.me" +}