Merge branch 'tiled-data-caching'

Will Freeman
2024-12-27 20:24:24 -07:00
7 changed files with 196 additions and 36 deletions

.gitignore

@@ -47,6 +47,7 @@ serverless/*/src/*
!serverless/*/src/alpr_counts.py
!serverless/*/src/alpr_clusters.py
!serverless/*/src/requirements.txt
!serverless/*/src/Dockerfile
# TODO: need a better way to handle python packages


@@ -0,0 +1,12 @@
# Use the official AWS Lambda Python 3.9 base image
FROM amazon/aws-lambda-python:3.9

# Copy function code
COPY alpr_clusters.py ${LAMBDA_TASK_ROOT}

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Set the CMD to your handler
CMD ["alpr_clusters.lambda_handler"]

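A quick way to sanity-check this image is to run it locally and post an empty event to the Runtime Interface Emulator bundled in the AWS base image. The image tag, port mapping, and empty payload below are assumptions for illustration, not part of this commit, and a local invocation will hit Overpass and attempt real S3 writes unless credentials are scoped to a test environment.

# Illustrative local smoke test, assuming the image was built and started with
# something like: docker build -t alpr-clusters . && docker run -p 9000:8080 alpr-clusters
import requests

# The AWS Lambda base images expose the Runtime Interface Emulator on the
# container's port 8080 at this fixed invocation path.
RIE_URL = "http://localhost:9000/2015-03-31/functions/function/invocations"

resp = requests.post(RIE_URL, json={})  # the handler ignores the event payload
resp.raise_for_status()
print(resp.json())  # expect {"statusCode": 200, "body": "Successfully clustered."}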

@@ -1,10 +1,45 @@
import boto3
import requests
import json
from sklearn.cluster import DBSCAN
import numpy as np
from collections import defaultdict
from typing import Any
def get_clusters():
import boto3
import os
import time
import numpy as np
import requests
from sklearn.cluster import DBSCAN
from typing import Tuple
import re
from concurrent.futures import ThreadPoolExecutor

def terraform_rate_expression_to_minutes(rate_expression: str) -> int:
    match = re.match(r"rate\((\d+)\s*(day|hour|minute)s?\)", rate_expression)
    if not match:
        raise ValueError(f"Invalid rate expression: {rate_expression}")
    value, unit = int(match.group(1)), match.group(2)
    if unit == "day":
        return value * 24 * 60
    elif unit == "hour":
        return value * 60
    elif unit == "minute":
        return value
    else:
        raise ValueError(f"Unsupported time unit: {unit}")

UPDATE_RATE_MINS = terraform_rate_expression_to_minutes(os.getenv("UPDATE_RATE_MINS", "rate(60 minutes)"))
GRACE_PERIOD_MINS = 2  # pad the expiration slightly in case the S3 uploads take a while
TILE_SIZE_DEGREES = 20
WHITELISTED_TAGS = [
    "operator",
    "manufacturer",
    "direction",
    "brand",
]

def get_all_nodes():
    # Set up the Overpass API query
    query = """
    [out:json];
@@ -12,31 +47,40 @@ def get_clusters():
    out body;
    """
    url = "http://overpass-api.de/api/interpreter"
    response = requests.get(
        url, params={"data": query}, headers={"User-Agent": "DeFlock/1.0"}
    )
    response.raise_for_status()
    return response.json()["elements"]

def get_clusters(nodes: list[Any]):
    # Request data from Overpass API
    print("Requesting data from Overpass API.")
    url = "http://overpass-api.de/api/interpreter"
    response = requests.get(url, params={'data': query}, headers={'User-Agent': 'DeFlock/1.0'})
    data = response.json()
    print("Data received. Parsing nodes...")
    # Parse nodes and extract lat/lon for clustering
    coordinates = []
    node_ids = []
    for element in data['elements']:
        if element['type'] == 'node':
            coordinates.append([element['lat'], element['lon']])
            node_ids.append(element['id'])
    for element in nodes:
        if element["type"] == "node":
            coordinates.append([element["lat"], element["lon"]])
            node_ids.append(element["id"])
    # Convert coordinates to NumPy array for DBSCAN
    coordinates = np.array(coordinates)
    # Define the clustering radius (10 miles in meters)
    # Define the clustering radius (50 miles, converted to radians for the haversine metric)
    radius_miles = 50
    radius_km = radius_miles * 1.60934  # 1 mile = 1.60934 km
    radius_in_radians = radius_km / 6371.0  # Earth's radius in km
    # Perform DBSCAN clustering
    db = DBSCAN(eps=radius_in_radians, min_samples=1, algorithm='ball_tree', metric='haversine').fit(np.radians(coordinates))
    db = DBSCAN(
        eps=radius_in_radians, min_samples=1, algorithm="ball_tree", metric="haversine"
    ).fit(np.radians(coordinates))
    labels = db.labels_
    # Prepare clusters and calculate centroids
@@ -45,34 +89,108 @@ def get_clusters():
        cluster_points = coordinates[labels == label]
        centroid = np.mean(cluster_points, axis=0)
        first_node_id = node_ids[labels.tolist().index(label)]
        # Store in clusters dict with centroid and first node ID
        clusters[label] = {
            "lat": centroid[0],
            "lon": centroid[1],
            "id": first_node_id
        }
        clusters[label] = {"lat": centroid[0], "lon": centroid[1], "id": first_node_id}
    output = {"clusters": list(clusters.values())}
    print("Clustering complete.")
    return output

def segment_regions(nodes: list[Any], tile_size_degrees: int) -> dict[str, list[Any]]:
    print("Segmenting regions...")
    tile_dict = defaultdict(list)
    for node in nodes:
        lat, lon = node["lat"], node["lon"]
        tile_lat = int(np.floor(lat / tile_size_degrees)) * tile_size_degrees
        tile_lon = int(np.floor(lon / tile_size_degrees)) * tile_size_degrees
        bare_node = {
            "id": node["id"],
            "lat": lat,
            "lon": lon,
            "tags": {k: v for k, v in node["tags"].items() if k in WHITELISTED_TAGS},
        }
        tile_dict[f"{tile_lat}/{tile_lon}"].append(bare_node)
    print("Region segmentation complete.")
    return tile_dict

def lambda_handler(event, context):
    alpr_clusters = get_clusters()
    nodes = get_all_nodes()
    alpr_clusters = get_clusters(nodes)
    regions_dict = segment_regions(nodes=nodes, tile_size_degrees=TILE_SIZE_DEGREES)
    s3 = boto3.client('s3')
    bucket = 'deflock-clusters'
    key = 'alpr_clusters.json'
    print("Uploading data to S3...")
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(alpr_clusters),
        ContentType='application/json'
    )
    s3 = boto3.client("s3")
    bucket = "deflock-clusters"
    bucket_new = "cdn.deflock.me"
    key = "alpr_clusters.json"
    return {
        'statusCode': 200,
        'body': 'Successfully fetched ALPR counts.',
    }
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(alpr_clusters),
        ContentType="application/json",
    )
    # TODO: handle outdated index files when their referenced files are deleted
    epoch = int(time.time())
    tile_index = {
        "expiration_utc": epoch + (UPDATE_RATE_MINS + GRACE_PERIOD_MINS) * 60,
        "regions": list(regions_dict.keys()),
        "tile_url": "https://cdn.deflock.me/regions/{lat}/{lon}.json?v=" + str(epoch),
        "tile_size_degrees": TILE_SIZE_DEGREES,
    }
    print("Uploading regions to S3...")

    def upload_json_to_s3(bucket, key, body):
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=body,
            ContentType="application/json",
        )

    # Use ThreadPoolExecutor for concurrent uploads
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for latlng_string, elements in regions_dict.items():
            lat, lon = latlng_string.split("/")
            key = f"regions/{lat}/{lon}.json"
            body = json.dumps(elements)
            futures.append(executor.submit(upload_json_to_s3, bucket_new, key, body))
        # add index file
        futures.append(executor.submit(upload_json_to_s3, bucket_new, "regions/index.json", json.dumps(tile_index)))
        # Wait for all futures to complete
        for future in futures:
            future.result()
    print("Regions uploaded to S3. Done!")
    return {
        "statusCode": 200,
        "body": "Successfully clustered.",
    }

if __name__ == "__main__":
    from pathlib import Path
    nodes_file_path = Path("nodes.json")
    if nodes_file_path.exists():
        nodes = json.load(nodes_file_path.open())
    else:
        nodes = get_all_nodes()
        with nodes_file_path.open("w") as f:
            json.dump(nodes, f)
    regions_dict = segment_regions(nodes=nodes, tile_size_degrees=5)
    with open("regions_dict.json", "w") as f:
        json.dump(regions_dict, f)

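For reference, a client can recover the tile for any point from the index the handler writes: floor the latitude and longitude down to the tile's south-west corner, exactly as segment_regions() does, and substitute them into tile_url. The sketch below is illustrative only; it assumes https://cdn.deflock.me serves the regions/ keys directly and uses an arbitrary sample coordinate.

# Hypothetical client-side lookup of the tile containing a point.
import math
import requests

index = requests.get("https://cdn.deflock.me/regions/index.json").json()
size = index["tile_size_degrees"]

def tile_corner(value: float, size: int) -> int:
    # Mirror segment_regions(): floor to the tile's south-west corner.
    return int(math.floor(value / size)) * size

lat, lon = 35.08, -106.65  # sample point
url = index["tile_url"].format(lat=tile_corner(lat, size), lon=tile_corner(lon, size))
nodes = requests.get(url).json()
print(f"{len(nodes)} ALPRs in tile {tile_corner(lat, size)}/{tile_corner(lon, size)}")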

@@ -13,5 +13,6 @@ module "alpr_clusters" {
  module_name = "alpr_clusters"
  source = "./modules/alpr_clusters"
  deflock_stats_bucket = var.deflock_stats_bucket
  deflock_cdn_bucket = var.deflock_cdn_bucket
  rate = "rate(1 day)"
}

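The rate set here flows into the Lambda's UPDATE_RATE_MINS environment variable, which the handler parses to stamp an expiration on regions/index.json. A small standalone worked example of that path, mirroring terraform_rate_expression_to_minutes() above:

# Worked example: "rate(1 day)" -> 1440 minutes -> index expiration epoch.
import re
import time

def rate_to_minutes(expr: str) -> int:
    value, unit = re.match(r"rate\((\d+)\s*(day|hour|minute)s?\)", expr).groups()
    return int(value) * {"day": 24 * 60, "hour": 60, "minute": 1}[unit]

update_mins = rate_to_minutes("rate(1 day)")  # 1440
grace_mins = 2                                # GRACE_PERIOD_MINS in the Lambda
expiration_utc = int(time.time()) + (update_mins + grace_mins) * 60
print(update_mins, expiration_utc)            # clients treat the index as stale after this epoch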

@@ -15,6 +15,11 @@ resource "aws_iam_role" "lambda_role" {
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic_execution" {
  role = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_policy" "lambda_s3_write_policy" {
  name = "${var.module_name}_lambda_s3_write_policy"
  description = "Policy for Lambda to write to S3 bucket ${var.deflock_stats_bucket}"
@@ -28,7 +33,10 @@ resource "aws_iam_policy" "lambda_s3_write_policy" {
"s3:PutObjectAcl"
]
Effect = "Allow"
Resource = "arn:aws:s3:::${var.deflock_stats_bucket}/${var.output_filename}"
Resource = [
"arn:aws:s3:::${var.deflock_cdn_bucket}/*",
"arn:aws:s3:::${var.deflock_stats_bucket}/*"
]
}
]
})
@@ -44,7 +52,13 @@ resource "aws_lambda_function" "overpass_lambda" {
  role = aws_iam_role.lambda_role.arn
  package_type = "Image"
  image_uri = "${aws_ecr_repository.lambda_repository.repository_url}:latest"
  timeout = 90
  timeout = 180
  memory_size = 512
  environment {
    variables = {
      UPDATE_RATE_MINS = var.rate
    }
  }
}

resource "aws_cloudwatch_event_rule" "lambda_rule" {
@@ -70,3 +84,8 @@ resource "aws_lambda_permission" "allow_cloudwatch_to_call_lambda" {
resource "aws_ecr_repository" "lambda_repository" {
name = "${var.module_name}-lambda"
}
resource "aws_cloudwatch_log_group" "lambda_log_group" {
name = "/aws/lambda/${aws_lambda_function.overpass_lambda.function_name}"
retention_in_days = 14
}

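With the wider S3 policy and the new log group in place, a quick post-deploy check is to confirm a run actually wrote both objects. The sketch below is illustrative; the bucket names are the Terraform defaults and it assumes local credentials with read access to both buckets.

# Hypothetical post-deploy check: did the last run write the stats file and the tile index?
import json
import time
import boto3

s3 = boto3.client("s3")

clusters = s3.head_object(Bucket="deflock-clusters", Key="alpr_clusters.json")
index_obj = s3.get_object(Bucket="cdn.deflock.me", Key="regions/index.json")
index = json.loads(index_obj["Body"].read())

print("clusters last modified:", clusters["LastModified"])
print("index expires in", index["expiration_utc"] - int(time.time()), "seconds")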

@@ -11,6 +11,10 @@ variable "deflock_stats_bucket" {
description = "S3 bucket for the ALPR clusters JSON file"
}
variable "deflock_cdn_bucket" {
description = "S3 bucket for the CDN"
}
variable "rate" {
description = "Rate at which to run the Lambda function"
}


@@ -2,3 +2,8 @@ variable "deflock_stats_bucket" {
description = "S3 bucket for the ALPR counts JSON file"
default = "deflock-clusters"
}
variable "deflock_cdn_bucket" {
description = "S3 bucket for the CDN"
default = "cdn.deflock.me"
}