fix: restore CCTV layer ingestion and map rendering

This commit is contained in:
Wa1iD
2026-03-20 22:05:05 +01:00
parent f0b6f9a8d1
commit 231f0afc4e
4 changed files with 340 additions and 145 deletions
+163 -120
View File
@@ -1,69 +1,89 @@
import sqlite3
import requests
from services.network_utils import fetch_with_curl
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from pathlib import Path
from typing import Any, Dict, List
from services.network_utils import fetch_with_curl
logger = logging.getLogger(__name__)
DB_PATH = "cctv.db"
DB_PATH = Path(__file__).resolve().parent.parent / "data" / "cctv.db"
def _connect() -> sqlite3.Connection:
    """Open a connection to the CCTV database, creating its parent directory first."""
    db_dir = DB_PATH.parent
    db_dir.mkdir(parents=True, exist_ok=True)
    return sqlite3.connect(str(DB_PATH))
def init_db():
    """Create the ``cameras`` table if it does not exist yet.

    Safe to call repeatedly (CREATE TABLE IF NOT EXISTS). The connection is
    always closed via ``finally``, even if table creation fails.
    """
    conn = _connect()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS cameras (
                id TEXT PRIMARY KEY,
                source_agency TEXT,
                lat REAL,
                lon REAL,
                direction_facing TEXT,
                media_url TEXT,
                refresh_rate_seconds INTEGER,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    finally:
        conn.close()
class BaseCCTVIngestor(ABC):
    """Base class for CCTV feed ingestors.

    Subclasses implement :meth:`fetch_data` to return camera dicts;
    :meth:`ingest` upserts them into SQLite. A fresh connection is opened
    per ``ingest()`` call so the method is safe to run from any thread —
    SQLite connections must not be shared across threads.
    """

    @abstractmethod
    def fetch_data(self) -> List[Dict[str, Any]]:
        """Return a list of camera dicts (id, lat, lon, media_url, ...)."""
        pass

    def ingest(self):
        """Fetch cameras and upsert them into the database.

        Errors are logged, never raised — one failing source must not take
        down the whole ingest cycle. The connection is rolled back on
        failure and always closed.
        """
        conn = None
        try:
            init_db()
            cameras = self.fetch_data()
            conn = _connect()
            cursor = conn.cursor()
            for cam in cameras:
                cursor.execute(
                    """
                    INSERT INTO cameras
                    (id, source_agency, lat, lon, direction_facing, media_url, refresh_rate_seconds)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(id) DO UPDATE SET
                        media_url=excluded.media_url,
                        last_updated=CURRENT_TIMESTAMP
                    """,
                    (
                        cam.get("id"),
                        cam.get("source_agency"),
                        cam.get("lat"),
                        cam.get("lon"),
                        cam.get("direction_facing", "Unknown"),
                        cam.get("media_url"),
                        cam.get("refresh_rate_seconds", 60),
                    ),
                )
            conn.commit()
            logger.info(
                f"Successfully ingested {len(cameras)} cameras from {self.__class__.__name__}"
            )
        except Exception as e:
            try:
                if conn is not None:
                    conn.rollback()
            except Exception:
                pass
            logger.error(f"Failed to ingest cameras in {self.__class__.__name__}: {e}")
        finally:
            if conn is not None:
                conn.close()
class TFLJamCamIngestor(BaseCCTVIngestor):
    """Transport for London JamCam traffic cameras."""

    def fetch_data(self) -> List[Dict[str, Any]]:
        url = "https://api.tfl.gov.uk/Place/Type/JamCam"
        response = fetch_with_curl(url, timeout=15)
        response.raise_for_status()
        data = response.json()
        cameras = []
        for item in data:
            # TfL returns URLs without protocols sometimes or with a base path
            vid_url = None
            img_url = None
            for prop in item.get("additionalProperties", []):
                if prop.get("key") == "videoUrl":
                    vid_url = prop.get("value")
                elif prop.get("key") == "imageUrl":
                    img_url = prop.get("value")
            # Prefer the live video stream; fall back to the still image.
            media = vid_url if vid_url else img_url
            if media:
                cameras.append(
                    {
                        "id": f"TFL-{item.get('id')}",
                        "source_agency": "TfL",
                        "lat": item.get("lat"),
                        "lon": item.get("lon"),
                        "direction_facing": item.get("commonName", "Unknown"),
                        "media_url": media,
                        "refresh_rate_seconds": 15,
                    }
                )
        return cameras
class LTASingaporeIngestor(BaseCCTVIngestor):
    """Singapore Land Transport Authority (LTA) traffic-image cameras."""

    def fetch_data(self) -> List[Dict[str, Any]]:
        # Singapore Land Transport Authority (LTA) Traffic Images API
        url = "https://api.data.gov.sg/v1/transport/traffic-images"
        response = fetch_with_curl(url, timeout=15)
        response.raise_for_status()
        data = response.json()
        cameras = []
        if "items" in data and len(data["items"]) > 0:
            for item in data["items"][0].get("cameras", []):
                loc = item.get("location", {})
                # Only keep entries with a full location and an image URL.
                if "latitude" in loc and "longitude" in loc and "image" in item:
                    cameras.append(
                        {
                            "id": f"SGP-{item.get('camera_id', 'UNK')}",
                            "source_agency": "Singapore LTA",
                            "lat": loc.get("latitude"),
                            "lon": loc.get("longitude"),
                            "direction_facing": f"Camera {item.get('camera_id')}",
                            "media_url": item.get("image"),
                            "refresh_rate_seconds": 60,
                        }
                    )
        return cameras
class AustinTXIngestor(BaseCCTVIngestor):
    """City of Austin open-data traffic cameras."""

    def fetch_data(self) -> List[Dict[str, Any]]:
        # City of Austin Traffic Cameras Open Data
        url = "https://data.austintexas.gov/resource/b4k4-adkb.json?$limit=2000"
        response = fetch_with_curl(url, timeout=15)
        response.raise_for_status()
        data = response.json()
        cameras = []
        for item in data:
            cam_id = item.get("camera_id")
            if not cam_id:
                continue
            loc = item.get("location", {})
            coords = loc.get("coordinates", [])
            # coords is usually [lon, lat]
            if len(coords) == 2:
                cameras.append(
                    {
                        "id": f"ATX-{cam_id}",
                        "source_agency": "Austin TxDOT",
                        "lat": coords[1],
                        "lon": coords[0],
                        "direction_facing": item.get(
                            "location_name", "Austin TX Camera"
                        ),
                        "media_url": f"https://cctv.austinmobility.io/image/{cam_id}.jpg",
                        "refresh_rate_seconds": 60,
                    }
                )
        return cameras
class NYCDOTIngestor(BaseCCTVIngestor):
    """New York City DOT traffic cameras."""

    def fetch_data(self) -> List[Dict[str, Any]]:
        url = "https://webcams.nyctmc.org/api/cameras"
        response = fetch_with_curl(url, timeout=15)
        response.raise_for_status()
        data = response.json()
        cameras = []
        for item in data:
            cam_id = item.get("id")
            if not cam_id:
                continue
            lat = item.get("latitude")
            lon = item.get("longitude")
            # Skip entries with missing/zero coordinates.
            if lat and lon:
                cameras.append(
                    {
                        "id": f"NYC-{cam_id}",
                        "source_agency": "NYC DOT",
                        "lat": lat,
                        "lon": lon,
                        "direction_facing": item.get("name", "NYC Camera"),
                        "media_url": f"https://webcams.nyctmc.org/api/cameras/{cam_id}/image",
                        "refresh_rate_seconds": 30,
                    }
                )
        return cameras
class GlobalOSMCrawlingIngestor(BaseCCTVIngestor):
def fetch_data(self) -> List[Dict[str, Any]]:
# This will pull physical street surveillance cameras across all global hotspots
@@ -193,77 +227,87 @@ class GlobalOSMCrawlingIngestor(BaseCCTVIngestor):
("25.1,55.2,25.3,55.4", "Dubai"),
("19.3,-99.2,19.5,-99.0", "Mexico City"),
("-23.6,-46.7,-23.4,-46.5", "Sao Paulo"),
("39.6,-105.1,39.9,-104.8", "Denver")
("39.6,-105.1,39.9,-104.8", "Denver"),
]
query_parts = [
f'node["man_made"="surveillance"]({bbox});' for bbox, city in regions
]
query_parts = [f'node["man_made"="surveillance"]({bbox});' for bbox, city in regions]
query = "".join(query_parts)
url = f"https://overpass-api.de/api/interpreter?data=[out:json];({query});out%202000;"
try:
response = fetch_with_curl(url, timeout=15)
response.raise_for_status()
data = response.json()
cameras = []
for item in data.get('elements', []):
for item in data.get("elements", []):
lat = item.get("lat")
lon = item.get("lon")
cam_id = item.get("id")
if lat and lon:
# Find which city this belongs to
source_city = "Global OSINT"
for bbox, city in regions:
s, w, n, e = map(float, bbox.split(','))
s, w, n, e = map(float, bbox.split(","))
if s <= lat <= n and w <= lon <= e:
source_city = f"OSINT: {city}"
break
# Attempt to parse camera direction for a cool realistic bearing angle if OSM mapped it
direction_str = item.get("tags", {}).get("camera:direction", "0")
try:
bearing = int(float(direction_str))
except (ValueError, TypeError):
bearing = 0
mapbox_key = "YOUR_MAPBOX_TOKEN_HERE"
mapbox_url = f"https://api.mapbox.com/styles/v1/mapbox/satellite-streets-v12/static/{lon},{lat},18,{bearing},60/600x400?access_token={mapbox_key}"
cameras.append({
"id": f"OSM-{cam_id}",
"source_agency": source_city,
"lat": lat,
"lon": lon,
"direction_facing": item.get("tags", {}).get("surveillance:type", "Street Level Camera"),
"media_url": mapbox_url,
"refresh_rate_seconds": 3600
})
cameras.append(
{
"id": f"OSM-{cam_id}",
"source_agency": source_city,
"lat": lat,
"lon": lon,
"direction_facing": item.get("tags", {}).get(
"surveillance:type", "Street Level Camera"
),
"media_url": mapbox_url,
"refresh_rate_seconds": 3600,
}
)
return cameras
except Exception:
return []
def _detect_media_type(url: str) -> str:
"""Detect the media type from a camera URL for proper frontend rendering."""
if not url:
return "image"
url_lower = url.lower()
if any(ext in url_lower for ext in ['.mp4', '.webm', '.ogg']):
if any(ext in url_lower for ext in [".mp4", ".webm", ".ogg"]):
return "video"
if any(kw in url_lower for kw in ['.mjpg', '.mjpeg', 'mjpg', 'axis-cgi/mjpg', 'mode=motion']):
if any(
kw in url_lower
for kw in [".mjpg", ".mjpeg", "mjpg", "axis-cgi/mjpg", "mode=motion"]
):
return "mjpeg"
if '.m3u8' in url_lower or 'hls' in url_lower:
if ".m3u8" in url_lower or "hls" in url_lower:
return "hls"
if any(kw in url_lower for kw in ['embed', 'maps/embed', 'iframe']):
if any(kw in url_lower for kw in ["embed", "maps/embed", "iframe"]):
return "embed"
if 'mapbox.com' in url_lower or 'satellite' in url_lower:
if "mapbox.com" in url_lower or "satellite" in url_lower:
return "satellite"
return "image"
def get_all_cameras() -> List[Dict[str, Any]]:
    """Return every camera row as a dict, with a derived ``media_type`` field.

    Ensures the schema exists first (so a fresh deployment returns an empty
    list instead of raising), and always closes the connection.
    """
    init_db()
    conn = _connect()
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM cameras")
        rows = cursor.fetchall()
    finally:
        conn.close()
    cameras = []
    for row in rows:
        cam = dict(row)
        # Classify the URL so the frontend can pick the right player widget.
        cam["media_type"] = _detect_media_type(cam.get("media_url", ""))
        cameras.append(cam)
    return cameras
+96 -25
View File
@@ -13,10 +13,12 @@ Heavy logic has been extracted into services/fetchers/:
- infrastructure.py — internet outages, data centers, CCTV, KiwiSDR
- geo.py — ships, airports, frontlines, GDELT, LiveUAMap
"""
import logging
import concurrent.futures
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
from apscheduler.schedulers.background import BackgroundScheduler
@@ -24,7 +26,10 @@ from services.cctv_pipeline import init_db
# Shared state — all fetcher modules read/write through this
from services.fetchers._store import (
latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — re-exported for main.py
latest_data,
source_timestamps,
_mark_fresh,
_data_lock, # noqa: F401 — re-exported for main.py
)
# Domain-specific fetcher modules (already extracted)
@@ -37,19 +42,55 @@ from services.fetchers.news import fetch_news # noqa: F401
# Newly extracted fetcher modules
from services.fetchers.financial import fetch_defense_stocks, fetch_oil_prices # noqa: F401
from services.fetchers.earth_observation import ( # noqa: F401
fetch_earthquakes, fetch_firms_fires, fetch_space_weather, fetch_weather,
fetch_earthquakes,
fetch_firms_fires,
fetch_space_weather,
fetch_weather,
)
from services.fetchers.infrastructure import ( # noqa: F401
fetch_internet_outages, fetch_datacenters, fetch_military_bases, fetch_power_plants,
fetch_cctv, fetch_kiwisdr,
fetch_internet_outages,
fetch_datacenters,
fetch_military_bases,
fetch_power_plants,
fetch_cctv,
fetch_kiwisdr,
)
from services.fetchers.geo import ( # noqa: F401
fetch_ships, fetch_airports, find_nearest_airport, cached_airports,
fetch_frontlines, fetch_gdelt, fetch_geopolitics, update_liveuamap,
fetch_ships,
fetch_airports,
find_nearest_airport,
cached_airports,
fetch_frontlines,
fetch_gdelt,
fetch_geopolitics,
update_liveuamap,
)
logger = logging.getLogger(__name__)
def run_cctv_ingest_cycle():
    """Populate the CCTV database, then refresh the in-memory cache."""
    logger.info("CCTV ingest cycle starting...")
    # Imported lazily to avoid a circular import at module load time.
    from services.cctv_pipeline import (
        AustinTXIngestor,
        LTASingaporeIngestor,
        NYCDOTIngestor,
        TFLJamCamIngestor,
    )

    ingestor_classes = [
        TFLJamCamIngestor,
        LTASingaporeIngestor,
        AustinTXIngestor,
        NYCDOTIngestor,
    ]
    for cls in ingestor_classes:
        cls().ingest()
    fetch_cctv()
    logger.info("CCTV ingest cycle complete.")
# ---------------------------------------------------------------------------
# Scheduler & Orchestration
# ---------------------------------------------------------------------------
@@ -66,9 +107,10 @@ def update_fast_data():
futures = [executor.submit(func) for func in fast_funcs]
concurrent.futures.wait(futures)
with _data_lock:
latest_data['last_updated'] = datetime.utcnow().isoformat()
latest_data["last_updated"] = datetime.utcnow().isoformat()
logger.info("Fast-tier update complete.")
def update_slow_data():
"""Slow-tier: contextual + enrichment data that refreshes less often (every 510 min)."""
logger.info("Slow-tier data update starting...")
@@ -94,6 +136,7 @@ def update_slow_data():
concurrent.futures.wait(futures)
logger.info("Slow-tier update complete.")
def update_all_data():
"""Full refresh — all tiers run IN PARALLEL for fastest startup."""
logger.info("Full data update starting (parallel)...")
@@ -104,45 +147,73 @@ def update_all_data():
concurrent.futures.wait([f0, f1, f2])
logger.info("Full data update complete.")
_scheduler = None
def start_scheduler():
    """Initialise the DB schema and start all background refresh jobs.

    Registers fast (60 s), slow (5 min), very-slow (15 min) tiers and the
    CCTV ingest cycle (10 min, first run immediately via next_run_time).
    """
    global _scheduler
    init_db()
    _scheduler = BackgroundScheduler(daemon=True)
    # Fast tier — every 60 seconds
    _scheduler.add_job(
        update_fast_data,
        "interval",
        seconds=60,
        id="fast_tier",
        max_instances=1,
        misfire_grace_time=30,
    )
    # Slow tier — every 5 minutes
    _scheduler.add_job(
        update_slow_data,
        "interval",
        minutes=5,
        id="slow_tier",
        max_instances=1,
        misfire_grace_time=120,
    )
    # Very slow — every 15 minutes
    _scheduler.add_job(
        fetch_gdelt,
        "interval",
        minutes=15,
        id="gdelt",
        max_instances=1,
        misfire_grace_time=120,
    )
    _scheduler.add_job(
        update_liveuamap,
        "interval",
        minutes=15,
        id="liveuamap",
        max_instances=1,
        misfire_grace_time=120,
    )
    # CCTV pipeline refresh — hydrate DB-backed cameras immediately, then every 10 minutes.
    _scheduler.add_job(
        run_cctv_ingest_cycle,
        "interval",
        minutes=10,
        id="cctv_ingest",
        max_instances=1,
        misfire_grace_time=120,
        next_run_time=datetime.utcnow(),
    )
    _scheduler.start()
    logger.info("Scheduler started.")
def stop_scheduler():
    """Shut the background scheduler down without waiting for running jobs."""
    if _scheduler is not None:
        _scheduler.shutdown(wait=False)
def get_latest_data():
    """Return a shallow snapshot of the shared data dict, taken under the lock."""
    with _data_lock:
        snapshot = dict(latest_data)
    return snapshot
+79
View File
@@ -0,0 +1,79 @@
"""Regression tests for CCTV ingestion and persistence."""
import threading
from services import cctv_pipeline
class DummyIngestor(cctv_pipeline.BaseCCTVIngestor):
    """Test double that returns a canned camera list instead of hitting the network."""

    def __init__(self, cameras):
        # NOTE: does not call super().__init__(), so no base-class setup runs.
        self._cameras = cameras

    def fetch_data(self):
        # Fulfils the BaseCCTVIngestor abstract method with the canned payload.
        return self._cameras
def test_ingestor_can_run_from_another_thread(tmp_path, monkeypatch):
    """Ingest must succeed off the main thread and persist the camera row."""
    db_path = tmp_path / "data" / "cctv.db"
    monkeypatch.setattr(cctv_pipeline, "DB_PATH", db_path)

    camera = {
        "id": "cam-1",
        "source_agency": "Test",
        "lat": 51.5,
        "lon": -0.12,
        "direction_facing": "North",
        "media_url": "https://example.com/camera.jpg",
        "refresh_rate_seconds": 30,
    }
    ingestor = DummyIngestor([camera])

    worker = threading.Thread(target=ingestor.ingest)
    worker.start()
    worker.join()

    stored = cctv_pipeline.get_all_cameras()
    assert len(stored) == 1
    assert stored[0]["id"] == "cam-1"
    assert stored[0]["media_type"] == "image"
def test_ingest_updates_existing_rows_in_persistent_data_dir(tmp_path, monkeypatch):
    """Re-ingesting the same camera id must update the row, not duplicate it."""
    db_path = tmp_path / "persistent" / "cctv.db"
    monkeypatch.setattr(cctv_pipeline, "DB_PATH", db_path)

    def camera_payload(media_url):
        # Same camera id both times; only the media URL differs.
        return {
            "id": "cam-2",
            "source_agency": "Test",
            "lat": 40.71,
            "lon": -74.0,
            "direction_facing": "East",
            "media_url": media_url,
            "refresh_rate_seconds": 60,
        }

    DummyIngestor([camera_payload("https://example.com/old.jpg")]).ingest()
    DummyIngestor([camera_payload("https://example.com/live.m3u8")]).ingest()

    cameras = cctv_pipeline.get_all_cameras()
    assert db_path.exists()
    assert len(cameras) == 1
    assert cameras[0]["media_url"] == "https://example.com/live.m3u8"
    assert cameras[0]["media_type"] == "hls"
@@ -593,6 +593,8 @@ const MaplibreViewer = ({ data, activeLayers, onEntityClick, flyToLocation, sele
frontlineGeoJSON && 'ukraine-frontline-layer',
earthquakesGeoJSON && 'earthquakes-layer',
satellitesGeoJSON && 'satellites-layer',
cctvGeoJSON && 'cctv-clusters',
cctvGeoJSON && 'cctv-cluster-count',
cctvGeoJSON && 'cctv-layer',
kiwisdrGeoJSON && 'kiwisdr-clusters',
kiwisdrGeoJSON && 'kiwisdr-layer',