From 231f0afc4e22f3ba0ed5a12e36232d8b8fafedf1 Mon Sep 17 00:00:00 2001 From: Wa1iD Date: Fri, 20 Mar 2026 22:05:05 +0100 Subject: [PATCH] fix: restore CCTV layer ingestion and map rendering --- backend/services/cctv_pipeline.py | 283 ++++++++++++--------- backend/services/data_fetcher.py | 121 +++++++-- backend/tests/test_cctv_pipeline.py | 79 ++++++ frontend/src/components/MaplibreViewer.tsx | 2 + 4 files changed, 340 insertions(+), 145 deletions(-) create mode 100644 backend/tests/test_cctv_pipeline.py diff --git a/backend/services/cctv_pipeline.py b/backend/services/cctv_pipeline.py index 37efcc2..1a3b89d 100644 --- a/backend/services/cctv_pipeline.py +++ b/backend/services/cctv_pipeline.py @@ -1,69 +1,89 @@ -import sqlite3 -import requests -from services.network_utils import fetch_with_curl import logging +import sqlite3 from abc import ABC, abstractmethod -from typing import List, Dict, Any +from pathlib import Path +from typing import Any, Dict, List + +from services.network_utils import fetch_with_curl logger = logging.getLogger(__name__) -DB_PATH = "cctv.db" +DB_PATH = Path(__file__).resolve().parent.parent / "data" / "cctv.db" + + +def _connect() -> sqlite3.Connection: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + return sqlite3.connect(str(DB_PATH)) + def init_db(): - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute(""" - CREATE TABLE IF NOT EXISTS cameras ( - id TEXT PRIMARY KEY, - source_agency TEXT, - lat REAL, - lon REAL, - direction_facing TEXT, - media_url TEXT, - refresh_rate_seconds INTEGER, - last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - conn.commit() - conn.close() + conn = _connect() + try: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS cameras ( + id TEXT PRIMARY KEY, + source_agency TEXT, + lat REAL, + lon REAL, + direction_facing TEXT, + media_url TEXT, + refresh_rate_seconds INTEGER, + last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.commit() + finally: + conn.close() + class BaseCCTVIngestor(ABC): - def __init__(self): - self.conn = sqlite3.connect(DB_PATH) - @abstractmethod def fetch_data(self) -> List[Dict[str, Any]]: pass def ingest(self): + conn = None try: + init_db() cameras = self.fetch_data() - cursor = self.conn.cursor() + conn = _connect() + cursor = conn.cursor() for cam in cameras: - cursor.execute(""" + cursor.execute( + """ INSERT INTO cameras (id, source_agency, lat, lon, direction_facing, media_url, refresh_rate_seconds) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET media_url=excluded.media_url, last_updated=CURRENT_TIMESTAMP - """, ( - cam.get("id"), - cam.get("source_agency"), - cam.get("lat"), - cam.get("lon"), - cam.get("direction_facing", "Unknown"), - cam.get("media_url"), - cam.get("refresh_rate_seconds", 60) - )) - self.conn.commit() - logger.info(f"Successfully ingested {len(cameras)} cameras from {self.__class__.__name__}") + """, + ( + cam.get("id"), + cam.get("source_agency"), + cam.get("lat"), + cam.get("lon"), + cam.get("direction_facing", "Unknown"), + cam.get("media_url"), + cam.get("refresh_rate_seconds", 60), + ), + ) + conn.commit() + logger.info( + f"Successfully ingested {len(cameras)} cameras from {self.__class__.__name__}" + ) except Exception as e: try: - self.conn.rollback() + if conn is not None: + conn.rollback() except Exception: pass logger.error(f"Failed to ingest cameras in {self.__class__.__name__}: {e}") + finally: + if conn is not None: + conn.close() + class TFLJamCamIngestor(BaseCCTVIngestor): def fetch_data(self) -> List[Dict[str, Any]]: @@ -71,114 +91,128 @@ class TFLJamCamIngestor(BaseCCTVIngestor): url = "https://api.tfl.gov.uk/Place/Type/JamCam" response = fetch_with_curl(url, timeout=15) response.raise_for_status() - + data = response.json() cameras = [] for item in data: # TfL returns URLs without protocols sometimes or with a base path vid_url = None img_url = None - - for prop in item.get('additionalProperties', []): - if prop.get('key') == 'videoUrl': - vid_url = prop.get('value') - elif prop.get('key') == 'imageUrl': - img_url = prop.get('value') - + + for prop in item.get("additionalProperties", []): + if prop.get("key") == "videoUrl": + vid_url = prop.get("value") + elif prop.get("key") == "imageUrl": + img_url = prop.get("value") + media = vid_url if vid_url else img_url if media: - cameras.append({ - "id": f"TFL-{item.get('id')}", - "source_agency": "TfL", - "lat": item.get('lat'), - "lon": item.get('lon'), - "direction_facing": item.get('commonName', 'Unknown'), - "media_url": media, - "refresh_rate_seconds": 15 - }) + cameras.append( + { + "id": f"TFL-{item.get('id')}", + "source_agency": "TfL", + "lat": item.get("lat"), + "lon": item.get("lon"), + "direction_facing": item.get("commonName", "Unknown"), + "media_url": media, + "refresh_rate_seconds": 15, + } + ) return cameras + class LTASingaporeIngestor(BaseCCTVIngestor): def fetch_data(self) -> List[Dict[str, Any]]: # Singapore Land Transport Authority (LTA) Traffic Images API url = "https://api.data.gov.sg/v1/transport/traffic-images" response = fetch_with_curl(url, timeout=15) response.raise_for_status() - + data = response.json() cameras = [] if "items" in data and len(data["items"]) > 0: for item in data["items"][0].get("cameras", []): loc = item.get("location", {}) if "latitude" in loc and "longitude" in loc and "image" in item: - cameras.append({ - "id": f"SGP-{item.get('camera_id', 'UNK')}", - "source_agency": "Singapore LTA", - "lat": loc.get("latitude"), - "lon": loc.get("longitude"), - "direction_facing": f"Camera {item.get('camera_id')}", - "media_url": item.get("image"), - "refresh_rate_seconds": 60 - }) + cameras.append( + { + "id": f"SGP-{item.get('camera_id', 'UNK')}", + "source_agency": "Singapore LTA", + "lat": loc.get("latitude"), + "lon": loc.get("longitude"), + "direction_facing": f"Camera {item.get('camera_id')}", + "media_url": item.get("image"), + "refresh_rate_seconds": 60, + } + ) return cameras - class AustinTXIngestor(BaseCCTVIngestor): def fetch_data(self) -> List[Dict[str, Any]]: # City of Austin Traffic Cameras Open Data url = "https://data.austintexas.gov/resource/b4k4-adkb.json?$limit=2000" response = fetch_with_curl(url, timeout=15) response.raise_for_status() - + data = response.json() cameras = [] for item in data: cam_id = item.get("camera_id") - if not cam_id: continue - + if not cam_id: + continue + loc = item.get("location", {}) coords = loc.get("coordinates", []) - + # coords is usually [lon, lat] if len(coords) == 2: - cameras.append({ - "id": f"ATX-{cam_id}", - "source_agency": "Austin TxDOT", - "lat": coords[1], - "lon": coords[0], - "direction_facing": item.get("location_name", "Austin TX Camera"), - "media_url": f"https://cctv.austinmobility.io/image/{cam_id}.jpg", - "refresh_rate_seconds": 60 - }) + cameras.append( + { + "id": f"ATX-{cam_id}", + "source_agency": "Austin TxDOT", + "lat": coords[1], + "lon": coords[0], + "direction_facing": item.get( + "location_name", "Austin TX Camera" + ), + "media_url": f"https://cctv.austinmobility.io/image/{cam_id}.jpg", + "refresh_rate_seconds": 60, + } + ) return cameras + class NYCDOTIngestor(BaseCCTVIngestor): def fetch_data(self) -> List[Dict[str, Any]]: url = "https://webcams.nyctmc.org/api/cameras" response = fetch_with_curl(url, timeout=15) response.raise_for_status() - + data = response.json() cameras = [] for item in data: cam_id = item.get("id") - if not cam_id: continue - + if not cam_id: + continue + lat = item.get("latitude") lon = item.get("longitude") if lat and lon: - cameras.append({ - "id": f"NYC-{cam_id}", - "source_agency": "NYC DOT", - "lat": lat, - "lon": lon, - "direction_facing": item.get("name", "NYC Camera"), - "media_url": f"https://webcams.nyctmc.org/api/cameras/{cam_id}/image", - "refresh_rate_seconds": 30 - }) + cameras.append( + { + "id": f"NYC-{cam_id}", + "source_agency": "NYC DOT", + "lat": lat, + "lon": lon, + "direction_facing": item.get("name", "NYC Camera"), + "media_url": f"https://webcams.nyctmc.org/api/cameras/{cam_id}/image", + "refresh_rate_seconds": 30, + } + ) return cameras + class GlobalOSMCrawlingIngestor(BaseCCTVIngestor): def fetch_data(self) -> List[Dict[str, Any]]: # This will pull physical street surveillance cameras across all global hotspots @@ -193,77 +227,87 @@ class GlobalOSMCrawlingIngestor(BaseCCTVIngestor): ("25.1,55.2,25.3,55.4", "Dubai"), ("19.3,-99.2,19.5,-99.0", "Mexico City"), ("-23.6,-46.7,-23.4,-46.5", "Sao Paulo"), - ("39.6,-105.1,39.9,-104.8", "Denver") + ("39.6,-105.1,39.9,-104.8", "Denver"), + ] + + query_parts = [ + f'node["man_made"="surveillance"]({bbox});' for bbox, city in regions ] - - query_parts = [f'node["man_made"="surveillance"]({bbox});' for bbox, city in regions] query = "".join(query_parts) url = f"https://overpass-api.de/api/interpreter?data=[out:json];({query});out%202000;" - + try: response = fetch_with_curl(url, timeout=15) response.raise_for_status() data = response.json() - + cameras = [] - for item in data.get('elements', []): + for item in data.get("elements", []): lat = item.get("lat") lon = item.get("lon") cam_id = item.get("id") - + if lat and lon: # Find which city this belongs to source_city = "Global OSINT" for bbox, city in regions: - s, w, n, e = map(float, bbox.split(',')) + s, w, n, e = map(float, bbox.split(",")) if s <= lat <= n and w <= lon <= e: source_city = f"OSINT: {city}" break - + # Attempt to parse camera direction for a cool realistic bearing angle if OSM mapped it direction_str = item.get("tags", {}).get("camera:direction", "0") try: bearing = int(float(direction_str)) except (ValueError, TypeError): bearing = 0 - + mapbox_key = "YOUR_MAPBOX_TOKEN_HERE" mapbox_url = f"https://api.mapbox.com/styles/v1/mapbox/satellite-streets-v12/static/{lon},{lat},18,{bearing},60/600x400?access_token={mapbox_key}" - - cameras.append({ - "id": f"OSM-{cam_id}", - "source_agency": source_city, - "lat": lat, - "lon": lon, - "direction_facing": item.get("tags", {}).get("surveillance:type", "Street Level Camera"), - "media_url": mapbox_url, - "refresh_rate_seconds": 3600 - }) + + cameras.append( + { + "id": f"OSM-{cam_id}", + "source_agency": source_city, + "lat": lat, + "lon": lon, + "direction_facing": item.get("tags", {}).get( + "surveillance:type", "Street Level Camera" + ), + "media_url": mapbox_url, + "refresh_rate_seconds": 3600, + } + ) return cameras except Exception: return [] - def _detect_media_type(url: str) -> str: """Detect the media type from a camera URL for proper frontend rendering.""" if not url: return "image" url_lower = url.lower() - if any(ext in url_lower for ext in ['.mp4', '.webm', '.ogg']): + if any(ext in url_lower for ext in [".mp4", ".webm", ".ogg"]): return "video" - if any(kw in url_lower for kw in ['.mjpg', '.mjpeg', 'mjpg', 'axis-cgi/mjpg', 'mode=motion']): + if any( + kw in url_lower + for kw in [".mjpg", ".mjpeg", "mjpg", "axis-cgi/mjpg", "mode=motion"] + ): return "mjpeg" - if '.m3u8' in url_lower or 'hls' in url_lower: + if ".m3u8" in url_lower or "hls" in url_lower: return "hls" - if any(kw in url_lower for kw in ['embed', 'maps/embed', 'iframe']): + if any(kw in url_lower for kw in ["embed", "maps/embed", "iframe"]): return "embed" - if 'mapbox.com' in url_lower or 'satellite' in url_lower: + if "mapbox.com" in url_lower or "satellite" in url_lower: return "satellite" return "image" + def get_all_cameras() -> List[Dict[str, Any]]: - conn = sqlite3.connect(DB_PATH) + init_db() + conn = _connect() conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM cameras") @@ -272,7 +316,6 @@ def get_all_cameras() -> List[Dict[str, Any]]: cameras = [] for row in rows: cam = dict(row) - cam['media_type'] = _detect_media_type(cam.get('media_url', '')) + cam["media_type"] = _detect_media_type(cam.get("media_url", "")) cameras.append(cam) return cameras - diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 9edfd30..2e1a90a 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -13,10 +13,12 @@ Heavy logic has been extracted into services/fetchers/: - infrastructure.py — internet outages, data centers, CCTV, KiwiSDR - geo.py — ships, airports, frontlines, GDELT, LiveUAMap """ + import logging import concurrent.futures from datetime import datetime from dotenv import load_dotenv + load_dotenv() from apscheduler.schedulers.background import BackgroundScheduler @@ -24,7 +26,10 @@ from services.cctv_pipeline import init_db # Shared state — all fetcher modules read/write through this from services.fetchers._store import ( - latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — re-exported for main.py + latest_data, + source_timestamps, + _mark_fresh, + _data_lock, # noqa: F401 — re-exported for main.py ) # Domain-specific fetcher modules (already extracted) @@ -37,19 +42,55 @@ from services.fetchers.news import fetch_news # noqa: F401 # Newly extracted fetcher modules from services.fetchers.financial import fetch_defense_stocks, fetch_oil_prices # noqa: F401 from services.fetchers.earth_observation import ( # noqa: F401 - fetch_earthquakes, fetch_firms_fires, fetch_space_weather, fetch_weather, + fetch_earthquakes, + fetch_firms_fires, + fetch_space_weather, + fetch_weather, ) from services.fetchers.infrastructure import ( # noqa: F401 - fetch_internet_outages, fetch_datacenters, fetch_military_bases, fetch_power_plants, - fetch_cctv, fetch_kiwisdr, + fetch_internet_outages, + fetch_datacenters, + fetch_military_bases, + fetch_power_plants, + fetch_cctv, + fetch_kiwisdr, ) from services.fetchers.geo import ( # noqa: F401 - fetch_ships, fetch_airports, find_nearest_airport, cached_airports, - fetch_frontlines, fetch_gdelt, fetch_geopolitics, update_liveuamap, + fetch_ships, + fetch_airports, + find_nearest_airport, + cached_airports, + fetch_frontlines, + fetch_gdelt, + fetch_geopolitics, + update_liveuamap, ) logger = logging.getLogger(__name__) + +def run_cctv_ingest_cycle(): + """Populate the CCTV database, then refresh the in-memory cache.""" + logger.info("CCTV ingest cycle starting...") + from services.cctv_pipeline import ( + AustinTXIngestor, + LTASingaporeIngestor, + NYCDOTIngestor, + TFLJamCamIngestor, + ) + + for ingestor_cls in ( + TFLJamCamIngestor, + LTASingaporeIngestor, + AustinTXIngestor, + NYCDOTIngestor, + ): + ingestor_cls().ingest() + + fetch_cctv() + logger.info("CCTV ingest cycle complete.") + + # --------------------------------------------------------------------------- # Scheduler & Orchestration # --------------------------------------------------------------------------- @@ -66,9 +107,10 @@ def update_fast_data(): futures = [executor.submit(func) for func in fast_funcs] concurrent.futures.wait(futures) with _data_lock: - latest_data['last_updated'] = datetime.utcnow().isoformat() + latest_data["last_updated"] = datetime.utcnow().isoformat() logger.info("Fast-tier update complete.") + def update_slow_data(): """Slow-tier: contextual + enrichment data that refreshes less often (every 5–10 min).""" logger.info("Slow-tier data update starting...") @@ -94,6 +136,7 @@ def update_slow_data(): concurrent.futures.wait(futures) logger.info("Slow-tier update complete.") + def update_all_data(): """Full refresh — all tiers run IN PARALLEL for fastest startup.""" logger.info("Full data update starting (parallel)...") @@ -104,45 +147,73 @@ def update_all_data(): concurrent.futures.wait([f0, f1, f2]) logger.info("Full data update complete.") + _scheduler = None + def start_scheduler(): global _scheduler init_db() _scheduler = BackgroundScheduler(daemon=True) # Fast tier — every 60 seconds - _scheduler.add_job(update_fast_data, 'interval', seconds=60, id='fast_tier', max_instances=1, misfire_grace_time=30) + _scheduler.add_job( + update_fast_data, + "interval", + seconds=60, + id="fast_tier", + max_instances=1, + misfire_grace_time=30, + ) # Slow tier — every 5 minutes - _scheduler.add_job(update_slow_data, 'interval', minutes=5, id='slow_tier', max_instances=1, misfire_grace_time=120) + _scheduler.add_job( + update_slow_data, + "interval", + minutes=5, + id="slow_tier", + max_instances=1, + misfire_grace_time=120, + ) # Very slow — every 15 minutes - _scheduler.add_job(fetch_gdelt, 'interval', minutes=15, id='gdelt', max_instances=1, misfire_grace_time=120) - _scheduler.add_job(update_liveuamap, 'interval', minutes=15, id='liveuamap', max_instances=1, misfire_grace_time=120) - - # CCTV pipeline refresh — every 10 minutes - # Instantiate once and reuse — avoids re-creating DB connections on every tick - from services.cctv_pipeline import ( - TFLJamCamIngestor, LTASingaporeIngestor, - AustinTXIngestor, NYCDOTIngestor, + _scheduler.add_job( + fetch_gdelt, + "interval", + minutes=15, + id="gdelt", + max_instances=1, + misfire_grace_time=120, + ) + _scheduler.add_job( + update_liveuamap, + "interval", + minutes=15, + id="liveuamap", + max_instances=1, + misfire_grace_time=120, + ) + + # CCTV pipeline refresh — hydrate DB-backed cameras immediately, then every 10 minutes. + _scheduler.add_job( + run_cctv_ingest_cycle, + "interval", + minutes=10, + id="cctv_ingest", + max_instances=1, + misfire_grace_time=120, + next_run_time=datetime.utcnow(), ) - _cctv_tfl = TFLJamCamIngestor() - _cctv_lta = LTASingaporeIngestor() - _cctv_atx = AustinTXIngestor() - _cctv_nyc = NYCDOTIngestor() - _scheduler.add_job(_cctv_tfl.ingest, 'interval', minutes=10, id='cctv_tfl', max_instances=1, misfire_grace_time=120) - _scheduler.add_job(_cctv_lta.ingest, 'interval', minutes=10, id='cctv_lta', max_instances=1, misfire_grace_time=120) - _scheduler.add_job(_cctv_atx.ingest, 'interval', minutes=10, id='cctv_atx', max_instances=1, misfire_grace_time=120) - _scheduler.add_job(_cctv_nyc.ingest, 'interval', minutes=10, id='cctv_nyc', max_instances=1, misfire_grace_time=120) _scheduler.start() logger.info("Scheduler started.") + def stop_scheduler(): if _scheduler: _scheduler.shutdown(wait=False) + def get_latest_data(): with _data_lock: return dict(latest_data) diff --git a/backend/tests/test_cctv_pipeline.py b/backend/tests/test_cctv_pipeline.py new file mode 100644 index 0000000..0f435af --- /dev/null +++ b/backend/tests/test_cctv_pipeline.py @@ -0,0 +1,79 @@ +"""Regression tests for CCTV ingestion and persistence.""" + +import threading + +from services import cctv_pipeline + + +class DummyIngestor(cctv_pipeline.BaseCCTVIngestor): + def __init__(self, cameras): + self._cameras = cameras + + def fetch_data(self): + return self._cameras + + +def test_ingestor_can_run_from_another_thread(tmp_path, monkeypatch): + db_path = tmp_path / "data" / "cctv.db" + monkeypatch.setattr(cctv_pipeline, "DB_PATH", db_path) + + ingestor = DummyIngestor( + [ + { + "id": "cam-1", + "source_agency": "Test", + "lat": 51.5, + "lon": -0.12, + "direction_facing": "North", + "media_url": "https://example.com/camera.jpg", + "refresh_rate_seconds": 30, + } + ] + ) + + thread = threading.Thread(target=ingestor.ingest) + thread.start() + thread.join() + + cameras = cctv_pipeline.get_all_cameras() + assert len(cameras) == 1 + assert cameras[0]["id"] == "cam-1" + assert cameras[0]["media_type"] == "image" + + +def test_ingest_updates_existing_rows_in_persistent_data_dir(tmp_path, monkeypatch): + db_path = tmp_path / "persistent" / "cctv.db" + monkeypatch.setattr(cctv_pipeline, "DB_PATH", db_path) + + DummyIngestor( + [ + { + "id": "cam-2", + "source_agency": "Test", + "lat": 40.71, + "lon": -74.0, + "direction_facing": "East", + "media_url": "https://example.com/old.jpg", + "refresh_rate_seconds": 60, + } + ] + ).ingest() + DummyIngestor( + [ + { + "id": "cam-2", + "source_agency": "Test", + "lat": 40.71, + "lon": -74.0, + "direction_facing": "East", + "media_url": "https://example.com/live.m3u8", + "refresh_rate_seconds": 60, + } + ] + ).ingest() + + cameras = cctv_pipeline.get_all_cameras() + assert db_path.exists() + assert len(cameras) == 1 + assert cameras[0]["media_url"] == "https://example.com/live.m3u8" + assert cameras[0]["media_type"] == "hls" diff --git a/frontend/src/components/MaplibreViewer.tsx b/frontend/src/components/MaplibreViewer.tsx index 3133866..74b6936 100644 --- a/frontend/src/components/MaplibreViewer.tsx +++ b/frontend/src/components/MaplibreViewer.tsx @@ -593,6 +593,8 @@ const MaplibreViewer = ({ data, activeLayers, onEntityClick, flyToLocation, sele frontlineGeoJSON && 'ukraine-frontline-layer', earthquakesGeoJSON && 'earthquakes-layer', satellitesGeoJSON && 'satellites-layer', + cctvGeoJSON && 'cctv-clusters', + cctvGeoJSON && 'cctv-cluster-count', cctvGeoJSON && 'cctv-layer', kiwisdrGeoJSON && 'kiwisdr-clusters', kiwisdrGeoJSON && 'kiwisdr-layer',