Add Sentinel-2 road freight trends with Analyze Here UI.

Port DrishX truck-motion detection as an opt-in slow layer: on-demand map-center analysis, preset corridors, layer panel toggle, and Docker road-corridor extras.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-07 23:39:13 -06:00
parent 76f4deb3a7
commit b64b9e0962
33 changed files with 3127 additions and 21 deletions
+13
View File
@@ -77,6 +77,19 @@ ADMIN_KEY=
# pip install earthengine-api
# GEE_SERVICE_ACCOUNT_KEY=
# Copernicus CDSE — Sentinel-2 imagery (Settings → Imagery, or backend .env).
# Free OAuth app at https://dataspace.copernicus.eu/
# SENTINEL_CLIENT_ID=
# SENTINEL_CLIENT_SECRET=
# Sentinel-2 road corridor freight trends (DrishX engine port — opt-in slow layer).
# pip install -e backend[road-corridor] (or uv sync --extra road-corridor)
# ROAD_CORRIDOR_SAT_ENABLED=false
# ROAD_CORRIDOR_SCHEDULED_PRESETS=laredo_i35
# ROAD_CORRIDOR_MONTHS=2
# ROAD_CORRIDOR_MAX_FRAMES=6
# ROAD_CORRIDOR_REFRESH_HOURS=24
# Override the backend URL the frontend uses (leave blank for auto-detect)
# NEXT_PUBLIC_API_URL=http://192.168.1.50:8000
+3
View File
@@ -109,6 +109,9 @@ backend/data/*
# release. Used ONLY on first-ever startup to bootstrap carrier_cache.json;
# after that the cache reflects this install's own GDELT observations.
!backend/data/carrier_seed.json
# DrishX RF model weights (MIT — see backend/third_party/drishx/NOTICE.md)
!backend/data/drishx/
!backend/data/drishx/rf_model.pickle
# OS generated files
.DS_Store
+2 -1
View File
@@ -44,7 +44,8 @@ These sources have their own terms; consult each link before redistributing.
| aisstream.io | https://aisstream.io | Free-tier API terms (attribution required) | AIS vessel positions |
| Global Fishing Watch | https://globalfishingwatch.org | CC BY 4.0 (for public data) | Fishing activity events |
| Microsoft Planetary Computer | https://planetarycomputer.microsoft.com | Sentinel-2 / ESA Copernicus terms | Sentinel-2 imagery |
| Copernicus CDSE (Sentinel Hub) | https://dataspace.copernicus.eu | ESA Copernicus open data terms | SAR + optical imagery |
| Copernicus CDSE (Sentinel Hub) | https://dataspace.copernicus.eu | ESA Copernicus open data terms | SAR + optical imagery, optional road-corridor truck trends |
| DrishX / Fisser et al. 2022 | https://github.com/sparkyniner/DRISH-X-Satellite-powered-freight-intelligence- | MIT (engine); research methodology attribution | Sentinel-2 motion-smear truck detection on major roads (opt-in) |
| Shodan | https://www.shodan.io | Operator-supplied API key, Shodan ToS | Internet device search |
| Smithsonian GVP | https://volcano.si.edu | Attribution required | Volcanoes |
| OpenAQ | https://openaq.org | CC BY 4.0 | Air quality stations |
+1 -1
View File
@@ -45,7 +45,7 @@ COPY uv.lock /workspace/uv.lock
COPY backend/pyproject.toml /workspace/backend/pyproject.toml
# Install Python dependencies using the lockfile
RUN cd /workspace/backend && uv sync --frozen --no-dev \
RUN cd /workspace/backend && uv sync --frozen --no-dev --extra road-corridor \
&& playwright install --with-deps chromium
# Copy backend source code
+3
View File
@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:72b69418aa860a0d92ccae398a08722bc85e64a992b5515dd7bf9ae9f79f2fd1
size 107194128
+2
View File
@@ -365,6 +365,7 @@ wormhole_router = _load_optional_router("routers.wormhole")
ai_intel_router = _load_optional_router("routers.ai_intel")
sar_router = _load_optional_router("routers.sar")
infonet_router = _load_optional_router("routers.infonet")
road_corridors_router = _load_optional_router("routers.road_corridors")
# ---------------------------------------------------------------------------
@@ -3641,6 +3642,7 @@ app.include_router(wormhole_router)
app.include_router(ai_intel_router)
app.include_router(sar_router)
app.include_router(infonet_router)
app.include_router(road_corridors_router)
from services.data_fetcher import update_all_data
+11
View File
@@ -39,6 +39,17 @@ dependencies = [
"yfinance==1.3.0",
]
[project.optional-dependencies]
road-corridor = [
"geopandas>=1.0.0",
"imageio>=2.34.0",
"osmnx>=2.0.0",
"rasterio>=1.4.0",
"scikit-learn>=1.5.0",
"sentinelhub>=3.10.0",
"shapely>=2.0.0",
]
[dependency-groups]
dev = ["pytest>=9.0.3", "pytest-asyncio>=1.4.0", "ruff>=0.9.0", "black>=24.0.0"]
+6 -1
View File
@@ -758,7 +758,7 @@ async def live_data_slow(
"firms_fires", "datacenters", "military_bases", "power_plants", "viirs_change_nodes",
"scanners", "weather_alerts", "ukraine_alerts", "air_quality", "volcanoes",
"fishing_activity", "psk_reporter", "correlations", "uap_sightings", "wastewater",
"crowdthreat", "threat_level", "trending_markets",
"crowdthreat", "threat_level", "trending_markets", "road_corridor_trends",
)
freshness = get_source_timestamps_snapshot()
payload = {
@@ -799,6 +799,11 @@ async def live_data_slow(
"uap_sightings": (d.get("uap_sightings") or []) if active_layers.get("uap_sightings", True) else [],
"wastewater": (d.get("wastewater") or []) if active_layers.get("wastewater", True) else [],
"crowdthreat": (d.get("crowdthreat") or []) if active_layers.get("crowdthreat", True) else [],
"road_corridor_trends": (
d.get("road_corridor_trends") or {"updated_at": None, "corridors": []}
)
if active_layers.get("road_corridor_trends", False)
else {"updated_at": None, "corridors": []},
"freshness": freshness,
}
# Issue #288: bbox filter heavy/dense layers only when all four bounds
+105
View File
@@ -0,0 +1,105 @@
"""Road corridor Sentinel-2 freight trend endpoints (opt-in slow layer)."""
from fastapi import APIRouter, HTTPException, Query, Request
from pydantic import BaseModel, Field
from limiter import limiter
from services.road_corridor_sat.config import optional_deps_available, road_corridor_sat_enabled
from services.road_corridor_sat.credentials import sentinel_credentials_configured
from services.road_corridor_sat.jobs import enqueue_analyze, get_job, get_latest_job, job_to_dict
from services.road_corridor_sat.presets import CORRIDOR_PRESETS, get_preset
from services.road_corridor_sat.storage import build_trends_payload, preset_metadata
router = APIRouter()
def _status_payload() -> dict:
latest = get_latest_job()
return {
"enabled": road_corridor_sat_enabled(),
"deps_installed": optional_deps_available(),
"credentials_configured": sentinel_credentials_configured(),
"preset_count": len(CORRIDOR_PRESETS),
"attribution": "backend/third_party/drishx/NOTICE.md",
"active_job": job_to_dict(latest) if latest and latest.status in {"queued", "running"} else None,
}
def _require_analyze_ready() -> None:
if not optional_deps_available():
raise HTTPException(
status_code=503,
detail="Install optional road-corridor dependencies (uv sync --extra road-corridor)",
)
if not sentinel_credentials_configured():
raise HTTPException(
status_code=503,
detail="Set SENTINEL_CLIENT_ID and SENTINEL_CLIENT_SECRET in Imagery settings",
)
class AnalyzeRequest(BaseModel):
lat: float = Field(ge=-90, le=90)
lon: float = Field(ge=-180, le=180)
label: str | None = Field(default=None, max_length=120)
@router.get("/api/road-corridors/status")
@limiter.limit("60/minute")
async def road_corridors_status(request: Request) -> dict:
return {"ok": True, **_status_payload()}
@router.get("/api/road-corridors")
@limiter.limit("60/minute")
async def list_road_corridors(request: Request) -> dict:
return {
"ok": True,
"status": _status_payload(),
"presets": CORRIDOR_PRESETS,
"trends": build_trends_payload(),
}
@router.post("/api/road-corridors/analyze")
@limiter.limit("6/minute")
async def analyze_road_corridor_here(request: Request, payload: AnalyzeRequest) -> dict:
"""Start an on-demand Sentinel-2 corridor analysis at map center."""
_require_analyze_ready()
try:
job = enqueue_analyze(payload.lat, payload.lon, payload.label)
except RuntimeError as exc:
if str(exc) == "analysis_already_running":
active = get_latest_job()
raise HTTPException(
status_code=409,
detail="Analysis already in progress",
headers={"X-Job-Id": active.job_id if active else ""},
) from exc
raise
return {"ok": True, **job_to_dict(job)}
@router.get("/api/road-corridors/analyze/status")
@limiter.limit("120/minute")
async def analyze_road_corridor_status(
request: Request,
job_id: str | None = Query(default=None),
) -> dict:
job = get_job(job_id) if job_id else get_latest_job()
if job is None:
return {"ok": True, "job": None}
return {"ok": True, "job": job_to_dict(job)}
@router.get("/api/road-corridors/{preset_id}")
@limiter.limit("60/minute")
async def get_road_corridor(preset_id: str, request: Request) -> dict:
meta = preset_metadata(preset_id)
if meta is None:
raise HTTPException(status_code=404, detail="Unknown corridor preset")
preset = get_preset(preset_id)
if preset is None:
# Ad-hoc viewport runs are stored on disk but not in CORRIDOR_PRESETS.
return {"ok": True, "preset": None, "result": meta, "status": _status_payload()}
return {"ok": True, "preset": preset, "result": meta, "status": _status_payload()}
+11
View File
@@ -76,6 +76,7 @@ from services.fetchers.infrastructure import ( # noqa: F401
fetch_tinygs,
fetch_psk_reporter,
)
from services.fetchers.road_corridor_sat import fetch_road_corridor_trends # noqa: F401
from services.fetchers.geo import ( # noqa: F401
fetch_ships,
fetch_airports,
@@ -1060,6 +1061,16 @@ def start_scheduler():
misfire_grace_time=600,
)
# Sentinel-2 road corridor freight trends — daily (opt-in, heavy CDSE usage)
_scheduler.add_job(
lambda: _run_task_with_health(fetch_road_corridor_trends, "fetch_road_corridor_trends"),
"interval",
hours=24,
id="road_corridor_trends",
max_instances=1,
misfire_grace_time=3600,
)
# FIMI disinformation index — every 12 hours (weekly editorial feed)
_scheduler.add_job(
lambda: _run_task_with_health(fetch_fimi, "fetch_fimi"),
+3
View File
@@ -69,6 +69,7 @@ class DashboardData(TypedDict, total=False):
sar_scenes: List[Dict[str, Any]]
sar_anomalies: List[Dict[str, Any]]
sar_aoi_coverage: List[Dict[str, Any]]
road_corridor_trends: Dict[str, Any]
# In-memory store
@@ -119,6 +120,7 @@ latest_data: DashboardData = {
"sar_scenes": [],
"sar_anomalies": [],
"sar_aoi_coverage": [],
"road_corridor_trends": {"updated_at": None, "corridors": []},
}
# Per-source freshness timestamps
@@ -328,6 +330,7 @@ active_layers: dict[str, bool] = {
"ai_intel": True,
"crowdthreat": False,
"sar": True,
"road_corridor_trends": False,
}
@@ -0,0 +1,84 @@
"""Scheduled Sentinel-2 road corridor freight trend fetcher (opt-in, slow tier)."""
from __future__ import annotations
import logging
import os
from datetime import datetime, timezone
from services.fetchers._store import _data_lock, _mark_fresh, is_any_active, latest_data
logger = logging.getLogger(__name__)
_REFRESH_HOURS = float(os.environ.get("ROAD_CORRIDOR_REFRESH_HOURS", "24"))
def _hours_since(iso_ts: str) -> float | None:
try:
dt = datetime.fromisoformat(iso_ts.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - dt).total_seconds() / 3600.0
except ValueError:
return None
def _feature_ready() -> bool:
from services.road_corridor_sat.config import optional_deps_available, road_corridor_sat_enabled
from services.road_corridor_sat.credentials import sentinel_credentials_configured
if not road_corridor_sat_enabled():
return False
if not optional_deps_available():
logger.debug("road_corridor_trends skipped — optional deps not installed")
return False
if not sentinel_credentials_configured():
logger.debug("road_corridor_trends skipped — Sentinel credentials missing")
return False
return True
def refresh_road_corridor_store() -> None:
from services.road_corridor_sat.storage import build_trends_payload
payload = build_trends_payload()
with _data_lock:
latest_data["road_corridor_trends"] = payload
_mark_fresh("road_corridor_trends")
def fetch_road_corridor_trends(force: bool = False) -> None:
"""Refresh scheduled corridor presets (default: laredo_i35 every 24h)."""
if not is_any_active("road_corridor_trends"):
return
if not _feature_ready():
return
from services.road_corridor_sat.config import SCHEDULED_PRESET_IDS
from services.road_corridor_sat.pipeline import analyze_preset
from services.road_corridor_sat.presets import get_preset
from services.road_corridor_sat.storage import load_refresh_state
state = load_refresh_state()
for preset_id in SCHEDULED_PRESET_IDS:
preset = get_preset(preset_id)
if preset is None:
logger.warning("Unknown scheduled road corridor preset: %s", preset_id)
continue
last = state.get(preset_id)
if last and not force:
age_h = _hours_since(last)
if age_h is not None and age_h < _REFRESH_HOURS:
logger.info(
"road_corridor %s fresh (%.1fh < %.1fh) — skipping",
preset_id,
age_h,
_REFRESH_HOURS,
)
continue
try:
logger.info("road_corridor analysis starting for %s", preset_id)
analyze_preset(preset_id)
except Exception as exc:
logger.exception("road_corridor analysis failed for %s: %s", preset_id, exc)
refresh_road_corridor_store()
@@ -0,0 +1,5 @@
"""Sentinel-2 road corridor freight trend analysis (DrishX engine port)."""
from .config import optional_deps_available, road_corridor_sat_enabled
__all__ = ["optional_deps_available", "road_corridor_sat_enabled"]
@@ -0,0 +1,4 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())
+53
View File
@@ -0,0 +1,53 @@
"""CLI for manual road corridor analysis runs."""
from __future__ import annotations
import argparse
import logging
import sys
from .config import optional_deps_available, road_corridor_sat_enabled
from .credentials import sentinel_credentials_configured
from .pipeline import analyze_preset
from .presets import CORRIDOR_PRESETS
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Run Sentinel-2 road corridor truck trend analysis")
parser.add_argument("--preset", required=True, help="Preset id (e.g. laredo_i35)")
parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args(argv)
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
if not optional_deps_available():
print(
"Install optional deps: uv sync --extra road-corridor "
"(geopandas, osmnx, rasterio, sentinelhub, scikit-learn, imageio)",
file=sys.stderr,
)
return 2
if not road_corridor_sat_enabled() and not args.verbose:
print("Note: ROAD_CORRIDOR_SAT_ENABLED is off — CLI still runs for manual analysis.")
if not sentinel_credentials_configured():
print("Set SENTINEL_CLIENT_ID and SENTINEL_CLIENT_SECRET first.", file=sys.stderr)
return 2
valid = {p["id"] for p in CORRIDOR_PRESETS}
if args.preset not in valid:
print(f"Unknown preset {args.preset!r}. Choose from: {', '.join(sorted(valid))}", file=sys.stderr)
return 2
def progress(msg: str, pct: int | None = None) -> None:
suffix = f" ({pct}%)" if pct is not None else ""
print(f"{msg}{suffix}")
result = analyze_preset(args.preset, progress_cb=progress)
print(
f"Done: {result.get('total_detections', 0)} detections across "
f"{len(result.get('daily_counts') or [])} days — status={result.get('status')}"
)
return 0 if result.get("status") == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,41 @@
"""Configuration for Sentinel-2 road corridor trend analysis."""
from __future__ import annotations
import os
from pathlib import Path
_BACKEND_ROOT = Path(__file__).resolve().parents[2]
DATA_ROOT = Path(os.environ.get("ROAD_CORRIDOR_DATA_DIR", str(_BACKEND_ROOT / "data" / "road_corridors")))
CACHE_DIR = DATA_ROOT / "cache"
DETECTION_CROP_DIR = DATA_ROOT / "detection_crops"
STATE_PATH = DATA_ROOT / "_refresh_state.json"
DEFAULT_MONTHS = int(os.environ.get("ROAD_CORRIDOR_MONTHS", "2"))
DEFAULT_MAX_FRAMES = int(os.environ.get("ROAD_CORRIDOR_MAX_FRAMES", "6"))
SCHEDULED_PRESET_IDS = [
s.strip()
for s in os.environ.get("ROAD_CORRIDOR_SCHEDULED_PRESETS", "laredo_i35").split(",")
if s.strip()
]
def road_corridor_sat_enabled() -> bool:
return os.environ.get("ROAD_CORRIDOR_SAT_ENABLED", "").strip().lower() in {
"1",
"true",
"yes",
"on",
}
def optional_deps_available() -> bool:
try:
import geopandas # noqa: F401
import osmnx # noqa: F401
import rasterio # noqa: F401
import sentinelhub # noqa: F401
import sklearn # noqa: F401
return True
except ImportError:
return False
@@ -0,0 +1,37 @@
"""Reuse Shadowbroker Sentinel Hub / Copernicus CDSE credentials."""
from __future__ import annotations
import os
from .config import CACHE_DIR
def resolve_sentinel_credentials() -> tuple[str, str]:
client_id = (os.environ.get("SENTINEL_CLIENT_ID") or "").strip()
client_secret = (os.environ.get("SENTINEL_CLIENT_SECRET") or "").strip()
return client_id, client_secret
def sentinel_credentials_configured() -> bool:
client_id, client_secret = resolve_sentinel_credentials()
return bool(client_id and client_secret)
def build_sh_config():
from sentinelhub import SHConfig
client_id, client_secret = resolve_sentinel_credentials()
if not client_id or not client_secret:
raise RuntimeError(
"SENTINEL_CLIENT_ID and SENTINEL_CLIENT_SECRET are required for road corridor analysis"
)
config = SHConfig()
config.sh_client_id = client_id
config.sh_client_secret = client_secret
config.sh_base_url = "https://sh.dataspace.copernicus.eu"
config.sh_token_url = (
"https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
config.cache_dir = str(CACHE_DIR / "sentinelhub")
return config
+149
View File
@@ -0,0 +1,149 @@
"""In-memory job queue for on-demand Analyze Here runs."""
from __future__ import annotations
import logging
import threading
import uuid
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
_lock = threading.Lock()
_jobs: dict[str, AnalyzeJob] = {}
@dataclass
class AnalyzeJob:
job_id: str
lat: float
lon: float
status: str = "queued"
message: str = "Queued"
progress: int = 0
result: dict[str, Any] | None = None
error: str | None = None
def get_job(job_id: str) -> AnalyzeJob | None:
with _lock:
return _jobs.get(job_id)
def get_latest_job() -> AnalyzeJob | None:
with _lock:
if not _jobs:
return None
return max(_jobs.values(), key=lambda j: j.job_id)
def _running_job() -> AnalyzeJob | None:
with _lock:
for job in _jobs.values():
if job.status in {"queued", "running"}:
return job
return None
def _prune_jobs(max_keep: int = 8) -> None:
with _lock:
if len(_jobs) <= max_keep:
return
ordered = sorted(_jobs.items(), key=lambda item: item[0], reverse=True)
for job_id, _ in ordered[max_keep:]:
_jobs.pop(job_id, None)
def _worker(job_id: str, lat: float, lon: float, label: str | None) -> None:
from services.fetchers.road_corridor_sat import refresh_road_corridor_store
from .pipeline import analyze_corridor
from .viewport import adhoc_preset_id, bbox_around_point, default_label_for_point
job = get_job(job_id)
if job is None:
return
def progress(msg: str, pct: int | None = None) -> None:
with _lock:
current = _jobs.get(job_id)
if current is None:
return
current.message = msg
if pct is not None:
current.progress = pct
with _lock:
job.status = "running"
job.message = "Starting road corridor analysis"
job.progress = 0
try:
bbox = bbox_around_point(lat, lon)
preset_id = adhoc_preset_id(lat, lon)
corridor_label = label or default_label_for_point(lat, lon)
result = analyze_corridor(
preset_id=preset_id,
label=corridor_label,
bbox=bbox,
country="adhoc",
category="viewport",
progress_cb=progress,
)
refresh_road_corridor_store()
with _lock:
current = _jobs.get(job_id)
if current is None:
return
current.status = "ok" if result.get("status") == "ok" else "error"
current.result = result
current.error = result.get("error")
current.message = (
f"{result.get('total_detections', 0)} signatures · "
f"{len(result.get('daily_counts') or [])} days"
)
current.progress = 100
except Exception as exc:
logger.exception("road corridor analyze job %s failed", job_id)
with _lock:
current = _jobs.get(job_id)
if current is None:
return
current.status = "error"
current.error = str(exc)
current.message = "Analysis failed"
current.progress = 100
def enqueue_analyze(lat: float, lon: float, label: str | None = None) -> AnalyzeJob:
running = _running_job()
if running is not None:
raise RuntimeError("analysis_already_running")
job_id = uuid.uuid4().hex[:12]
job = AnalyzeJob(job_id=job_id, lat=lat, lon=lon)
with _lock:
_jobs[job_id] = job
_prune_jobs()
thread = threading.Thread(
target=_worker,
args=(job_id, lat, lon, label),
name=f"road-corridor-analyze-{job_id}",
daemon=True,
)
thread.start()
return job
def job_to_dict(job: AnalyzeJob) -> dict[str, Any]:
return {
"job_id": job.job_id,
"lat": job.lat,
"lon": job.lon,
"status": job.status,
"message": job.message,
"progress": job.progress,
"result": job.result,
"error": job.error,
}
@@ -0,0 +1,216 @@
"""Run Sentinel-2 road-corridor truck trend analysis for a bbox preset."""
from __future__ import annotations
import logging
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any
from .config import CACHE_DIR, DEFAULT_MAX_FRAMES, DEFAULT_MONTHS, DETECTION_CROP_DIR
from .storage import store_analysis_result
logger = logging.getLogger(__name__)
ProgressCb = Callable[[str, int | None], None]
_EVALSCRIPT = """//VERSION=3
function setup() {
return {
input: ["B02", "B03", "B04", "B08", "CLM"],
output: { id: "default", bands: 5, sampleType: "FLOAT32" }
};
}
function evaluatePixel(s) {
return [s.B04, s.B03, s.B02, s.B08, s.CLM];
}"""
def _noop_progress(_msg: str, _pct: int | None = None) -> None:
return None
def analyze_corridor(
*,
preset_id: str,
label: str,
bbox: list[float],
country: str = "",
category: str = "",
months: int = DEFAULT_MONTHS,
max_frames: int = DEFAULT_MAX_FRAMES,
progress_cb: ProgressCb | None = None,
) -> dict[str, Any]:
"""Synchronously analyze one corridor bbox and persist daily truck-count trends."""
from rasterio import features as rio_features
from rasterio import transform as rio_transform
from sentinelhub import BBox, CRS, DataCollection, MimeType, SentinelHubCatalog, SentinelHubRequest
from .credentials import build_sh_config
from .s2_truck_detect import S2TruckEngine
progress = progress_cb or _noop_progress
min_lat, min_lon, max_lat, max_lon = bbox
if abs(max_lat - min_lat) > 0.5 or abs(max_lon - min_lon) > 0.5:
raise ValueError("AOI too large. Max strategic sector is ~55 km x 55 km.")
CACHE_DIR.mkdir(parents=True, exist_ok=True)
engine = S2TruckEngine(
cache_dir=str(CACHE_DIR),
detection_dir=str(DETECTION_CROP_DIR),
)
config = build_sh_config()
progress(f"Road discovery for {label}", 10)
roads = engine.fetch_roads(bbox)
if roads.empty:
return store_analysis_result(
preset_id,
label=label,
bbox=bbox,
country=country,
category=category,
road_count=0,
frame_count=0,
detections=[],
status="error",
error="No major roads found in AOI.",
)
progress(f"Found {len(roads)} road segments — querying Copernicus catalog", 25)
sh_bbox = BBox(bbox=[min_lon, min_lat, max_lon, max_lat], crs=CRS.WGS84)
catalog = SentinelHubCatalog(config=config)
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=max(1, months) * 30)
cdse_collection = DataCollection.SENTINEL2_L2A.define_from(
"s2l2a",
service_url=config.sh_base_url,
)
search_results = list(
catalog.search(
cdse_collection,
bbox=sh_bbox,
datetime=(
f"{start_date.strftime('%Y-%m-%dT00:00:00Z')}/"
f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}"
),
filter="eo:cloud_cover < 60",
fields={"include": ["properties.datetime", "id"], "exclude": []},
)
)
unique_scenes: dict[str, Any] = {}
for res in search_results:
date_key = res["properties"]["datetime"][:10]
if date_key not in unique_scenes:
unique_scenes[date_key] = res
final_obs = [unique_scenes[d] for d in sorted(unique_scenes.keys(), reverse=True)]
final_obs = final_obs[: max(1, max_frames)]
if not final_obs:
return store_analysis_result(
preset_id,
label=label,
bbox=bbox,
country=country,
category=category,
road_count=len(roads),
frame_count=0,
detections=[],
status="error",
error=f"No clear imagery found in the last {months} months.",
)
def _fetch_frame(idx: int, res_obs: dict[str, Any]):
try:
date_str = res_obs["properties"]["datetime"]
req_sh = SentinelHubRequest(
evalscript=_EVALSCRIPT,
input_data=[
SentinelHubRequest.input_data(
data_collection=cdse_collection,
time_interval=(date_str, date_str),
)
],
responses=[SentinelHubRequest.output_response("default", MimeType.TIFF)],
bbox=sh_bbox,
config=config,
)
data_list = req_sh.get_data()
if not data_list:
return idx, date_str, None
return idx, date_str, data_list[0]
except Exception as exc:
logger.error("Sentinel frame %s failed: %s", idx, exc)
return idx, None, None
progress(f"Seed frame 1/{len(final_obs)}", 35)
_, seed_ts, seed_data = _fetch_frame(0, final_obs[0])
if seed_data is None:
return store_analysis_result(
preset_id,
label=label,
bbox=bbox,
country=country,
category=category,
road_count=len(roads),
frame_count=0,
detections=[],
status="error",
error="Failed to acquire seed spectral data.",
)
roads_buf = roads.to_crs(epsg=3857).buffer(20).to_crs(epsg=4326)
h, w = seed_data.shape[:2]
trans = rio_transform.from_bounds(min_lon, min_lat, max_lon, max_lat, w, h)
road_mask = rio_features.rasterize(
[(geom.__geo_interface__, 1) for geom in roads_buf.geometry],
out_shape=(h, w),
transform=trans,
fill=0,
all_touched=True,
)
detections: list[dict[str, Any]] = []
detections.extend(engine.detect_trucks(seed_data, bbox, final_obs[0]["properties"]["datetime"], road_mask))
if len(final_obs) > 1:
progress(f"Parallel frames ({len(final_obs) - 1} remaining)", 45)
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="road-corridor") as executor:
futures = {
executor.submit(_fetch_frame, i, final_obs[i]): i for i in range(1, len(final_obs))
}
done = 1
for future in as_completed(futures):
idx, date_str, frame_data = future.result()
done += 1
if frame_data is not None and date_str:
detections.extend(engine.detect_trucks(frame_data, bbox, date_str, road_mask))
progress(f"Frame {done}/{len(final_obs)}", 45 + int((done / len(final_obs)) * 50))
progress(f"Complete — {len(detections)} truck signatures", 100)
return store_analysis_result(
preset_id,
label=label,
bbox=bbox,
country=country,
category=category,
road_count=len(roads),
frame_count=len(final_obs),
detections=detections,
status="ok",
)
def analyze_preset(preset_id: str, progress_cb: ProgressCb | None = None) -> dict[str, Any]:
from .presets import get_preset
preset = get_preset(preset_id)
if preset is None:
raise KeyError(f"Unknown preset: {preset_id}")
return analyze_corridor(
preset_id=preset["id"],
label=preset["label"],
bbox=preset["bbox"],
country=preset["country"],
category=preset["category"],
progress_cb=progress_cb,
)
@@ -0,0 +1,59 @@
"""Preset freight / chokepoint corridors for scheduled trend analysis."""
from __future__ import annotations
from typing import TypedDict
class CorridorPreset(TypedDict):
id: str
label: str
bbox: list[float] # [min_lat, min_lon, max_lat, max_lon]
country: str
category: str
# Bboxes are small (~510 km) highway segments suitable for 10 m Sentinel-2 analysis.
CORRIDOR_PRESETS: list[CorridorPreset] = [
{
"id": "laredo_i35",
"label": "Laredo I-35 (USMexico freight)",
"bbox": [27.48, -99.58, 27.54, -99.48],
"country": "USA / Mexico",
"category": "border_crossing",
},
{
"id": "bandar_abbas_feeder",
"label": "Bandar Abbas port feeder (Highway 71)",
"bbox": [27.12, 56.22, 27.22, 56.38],
"country": "Iran",
"category": "port_feeder",
},
{
"id": "rotterdam_a15",
"label": "Rotterdam A15 port feeder",
"bbox": [51.88, 4.42, 51.96, 4.58],
"country": "Netherlands",
"category": "port_feeder",
},
{
"id": "mombasa_nairobi_a109",
"label": "MombasaNairobi A109 corridor",
"bbox": [-4.10, 39.55, -1.20, 37.00],
"country": "Kenya",
"category": "trade_corridor",
},
{
"id": "braunschweig_a7",
"label": "Braunschweig A7 (validation)",
"bbox": [52.25, 10.45, 52.32, 10.55],
"country": "Germany",
"category": "validation",
},
]
def get_preset(preset_id: str) -> CorridorPreset | None:
for preset in CORRIDOR_PRESETS:
if preset["id"] == preset_id:
return preset
return None
@@ -0,0 +1,731 @@
"""S2 truck motion detection core (DrishX / Fisser et al. 2022 — see third_party/drishx/NOTICE.md)."""
from __future__ import annotations
import logging
import os
import pickle
import time
from pathlib import Path
import imageio.v3 as imageio
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from shapely.geometry import LineString
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
SECONDS_OFFSET_B02_B04 = 1.01
OVERPASS_MIRRORS = [
"https://lz4.overpass-api.de/api/interpreter",
"https://z.overpass-api.de/api/interpreter",
"https://overpass.osm.ch/api/interpreter",
"https://overpass-api.de/api/interpreter",
]
_session = requests.Session()
_retry = Retry(
total=2,
backoff_factor=1.0,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"],
)
_adapter = HTTPAdapter(max_retries=_retry)
_session.mount("http://", _adapter)
_session.mount("https://", _adapter)
def _configure_osmnx(data_dir: str) -> None:
import osmnx as ox
ox.settings.requests_session = _session
ox.settings.requests_timeout = 30
ox.settings.overpass_rate_limit = False
ox.settings.max_query_area_size = 1_000_000_000_000
ox.settings.log_console = False
ox.settings.use_cache = True
ox.settings.cache_folder = os.path.join(data_dir, "osm_cache")
def _default_rf_model_path() -> str:
return str(Path(__file__).resolve().parents[2] / "data" / "drishx" / "rf_model.pickle")
# ─────────────────────────────────────────────────────────────────────────────
# Helper math — mirrors S2TD.array_utils.math
# ─────────────────────────────────────────────────────────────────────────────
def normalized_ratio(a, b):
"""(a - b) / (a + b), safe division."""
denom = a + b
with np.errstate(divide="ignore", invalid="ignore"):
result = np.where(denom != 0, (a - b) / denom, 0.0)
return result.astype(np.float32)
def rescale_s2(bands):
"""Rescale Sentinel-2 L2A reflectance values (typically 010000 int) to 01 float."""
bands = bands.astype(np.float32)
if np.nanmax(bands) > 10: # likely DN scale
bands /= 10000.0
return bands
# ─────────────────────────────────────────────────────────────────────────────
# Array subset — exact replica of S2TD.pick_arr_subset
# ─────────────────────────────────────────────────────────────────────────────
def pick_arr_subset(arr, y, x, size):
"""Pick a size×size window centred on (y, x) from a 2D or 3D array."""
size_low = size // 2
size_up = size // 2
if size_low + size_up < size:
size_up += 1
ymin = max(0, y - size_low)
ymax = max(0, y + size_up)
xmin = max(0, x - size_low)
xmax = max(0, x + size_up)
if arr.ndim == 2:
return arr[ymin:ymax, xmin:xmax]
elif arr.ndim == 3:
return arr[:, ymin:ymax, xmin:xmax]
return arr
# ─────────────────────────────────────────────────────────────────────────────
# Feature stack — exact 7 features as in S2TD._build_feature_stack (Table 1)
# ─────────────────────────────────────────────────────────────────────────────
def build_feature_stack(data):
"""
Build the 7-feature stack from Sentinel-2 bands.
Input `data` shape: (H, W, 5) with channels [B04(R), B03(G), B02(B), B08(NIR), CLM].
Feature order (Table 1, Fisser et al. 2022):
0: variance of (B04, B03, B02)
1: normalized_ratio(B04, B02) — red / blue
2: normalized_ratio(B03, B02) — green / blue
3: B04 - mean(B04)
4: B03 - mean(B03)
5: B02 - mean(B02)
6: B08 - mean(B08)
"""
R = data[:, :, 0].astype(np.float32) # B04
G = data[:, :, 1].astype(np.float32) # B03
B = data[:, :, 2].astype(np.float32) # B02
NIR = data[:, :, 3].astype(np.float32) # B08
CLM = data[:, :, 4]
# Rescale if needed
bands = np.stack([R, G, B, NIR], axis=0)
bands = rescale_s2(bands)
R, G, B, NIR = bands[0], bands[1], bands[2], bands[3]
# Cloud mask → NaN
cloud = CLM > 0
R[cloud] = np.nan
G[cloud] = np.nan
B[cloud] = np.nan
NIR[cloud] = np.nan
H, W = R.shape
fs = np.zeros((7, H, W), dtype=np.float32)
# Check for any valid data to avoid "Mean of empty slice" warnings
if np.any(~cloud):
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
# Feature 0: variance of visible bands
fs[0] = np.nanvar(np.stack([R, G, B], axis=0), axis=0, ddof=0)
# Features 12: normalized ratios
fs[1] = normalized_ratio(R, B)
fs[2] = normalized_ratio(G, B)
# Features 36: mean-centered bands
fs[3] = R - np.nanmean(R)
fs[4] = G - np.nanmean(G)
fs[5] = B - np.nanmean(B)
fs[6] = NIR - np.nanmean(NIR)
else:
# All pixels are cloud-masked
fs.fill(np.nan)
# Ensure NaN consistency
nan_mask = np.isnan(fs[3])
fs[:, nan_mask] = np.nan
return {
"feature_stack": fs,
"bands": {"R": R, "G": G, "B": B, "NIR": NIR},
"cloud_mask": cloud,
}
# ─────────────────────────────────────────────────────────────────────────────
# RF Model loading
# ─────────────────────────────────────────────────────────────────────────────
# Path to the trained Random Forest model from S2TruckDetect
RF_MODEL_PATH = _default_rf_model_path()
_rf_model = None
def load_rf_model(path=None):
"""Load the trained RF model from pickle. Returns None if not found."""
global _rf_model
p = path or RF_MODEL_PATH
if _rf_model is not None:
return _rf_model
if os.path.isfile(p):
try:
_rf_model = pickle.load(open(p, "rb"))
logger.info(f"Loaded trained RF model from {p}")
return _rf_model
except Exception as e:
logger.error(f"Failed to load RF model from {p}: {e}")
else:
logger.warning(f"RF model not found at {p} — will use proxy classifier (lower accuracy)")
return None
# ─────────────────────────────────────────────────────────────────────────────
# Classification — real RF (preferred) or proxy fallback
# ─────────────────────────────────────────────────────────────────────────────
def rf_classify(feature_stack, road_mask, rf_model):
"""
Classify pixels using the trained Random Forest model.
Exact replica of S2TD._predict + _postprocess_prediction.
:param feature_stack: (7, H, W) feature array
:param road_mask: (H, W) binary road mask
:param rf_model: trained sklearn RandomForestClassifier
:return: (probabilities (4, H, W), prediction (H, W) int8)
"""
H, W = feature_stack.shape[1], feature_stack.shape[2]
# Reshape to (n_pixels, 7) for sklearn
vars_reshaped = []
for band_idx in range(feature_stack.shape[0]):
vars_reshaped.append(feature_stack[band_idx].flatten())
vars_reshaped = np.array(vars_reshaped).swapaxes(0, 1) # (n_pixels, 7)
# Build NaN mask — exclude NaN and Inf pixels
nan_mask_flat = np.zeros_like(vars_reshaped)
for var_idx in range(vars_reshaped.shape[1]):
nan_mask_flat[:, var_idx] = ~np.isnan(vars_reshaped[:, var_idx])
not_nan = (np.nanmin(nan_mask_flat, axis=1).astype(bool)
& np.min(np.isfinite(vars_reshaped), axis=1).astype(bool))
# Run RF predict_proba on valid pixels only
if not np.any(not_nan):
# Graceful return if no valid pixels found (e.g., all cloud masked)
probabilities_shaped = np.zeros((4, H, W), dtype=np.float32)
classification = np.zeros((H, W), dtype=np.int8)
return probabilities_shaped, classification
predictions_flat = rf_model.predict_proba(vars_reshaped[not_nan])
# Map probabilities back to spatial grid
n_classes = predictions_flat.shape[1]
probabilities_shaped = np.zeros((n_classes, H * W), dtype=np.float32)
for idx in range(n_classes):
probabilities_shaped[idx, not_nan] = predictions_flat[:, idx]
probabilities_shaped = probabilities_shaped.reshape((n_classes, H, W))
# Zero out NaN positions
nan_2d = np.isnan(feature_stack[0])
probabilities_shaped[:, nan_2d] = 0
# Post-process: suppress low-confidence background (exact S2TD logic)
probabilities_shaped[1][probabilities_shaped[1] < 0.75] = 0
classification = np.nanargmax(probabilities_shaped, axis=0).astype(np.int8) + 1
classification[np.max(probabilities_shaped, axis=0) == 0] = 0
classification[nan_2d] = 0
# Apply road mask
rm = road_mask.astype(bool)
classification[~rm] = 0
return probabilities_shaped, classification
def proxy_classify(feature_stack, road_mask):
"""
Heuristic proxy when RF model is unavailable. Lower accuracy.
Produces:
probabilities: (4, H, W) — class probs for [background, blue, green, red]
prediction: (H, W) — int8 labels {0=nan, 1=background, 2=blue, 3=green, 4=red}
"""
fs = feature_stack # (7, H, W)
H, W = fs.shape[1], fs.shape[2]
probs = np.zeros((4, H, W), dtype=np.float32)
centered_R = fs[3]
centered_G = fs[4]
centered_B = fs[5]
var_feat = fs[0]
nratio_rb = fs[1]
nratio_gb = fs[2]
rm = road_mask.astype(bool)
nan_mask = np.isnan(centered_R)
blue_score = np.clip(-nratio_rb * 2 + centered_B * 5 + var_feat * 10, 0, None)
blue_score[~rm | nan_mask] = 0
green_score = np.clip(nratio_gb * 2 + centered_G * 5 + var_feat * 10, 0, None)
green_score[~rm | nan_mask] = 0
red_score = np.clip(nratio_rb * 2 + centered_R * 5 + var_feat * 10, 0, None)
red_score[~rm | nan_mask] = 0
total = blue_score + green_score + red_score + 1e-8
probs[1] = blue_score / total
probs[2] = green_score / total
probs[3] = red_score / total
probs[0] = 1.0 - np.max(probs[1:], axis=0)
probs[0][probs[0] < 0.75] = 0
classification = np.nanargmax(probs, axis=0).astype(np.int8) + 1
classification[np.max(probs, axis=0) == 0] = 0
classification[nan_mask] = 0
classification[~rm] = 0
return probs, classification
def classify(feature_stack, road_mask, rf_model=None):
"""
Unified classifier entry point.
Uses trained RF if model is provided, otherwise falls back to proxy.
"""
if rf_model is not None:
logger.debug("Using trained RF model for classification")
return rf_classify(feature_stack, road_mask, rf_model)
else:
logger.debug("Using proxy classifier (no RF model loaded)")
return proxy_classify(feature_stack, road_mask)
# ─────────────────────────────────────────────────────────────────────────────
# Object extraction — faithful port of S2TD ObjectExtractor
# ─────────────────────────────────────────────────────────────────────────────
class ObjectExtractor:
"""
Extracts truck objects from the RF prediction raster using recursive
neighbourhood clustering, matching the S2TD reference implementation.
"""
def __init__(self, probabilities, lat_arr, lon_arr):
"""
:param probabilities: (4, H, W) class probabilities
:param lat_arr: 1-D array of latitude per row
:param lon_arr: 1-D array of longitude per column
"""
self.probabilities = probabilities
self.lat = lat_arr
self.lon = lon_arr
def extract(self, predictions_arr):
"""Main extraction loop over all blue (class 2) seed pixels."""
preds = predictions_arr.copy()
probs = self.probabilities.copy()
preds[preds == 1] = 0 # zero out background
blue_ys, blue_xs = np.where(preds == 2)
detections = []
sub_size = 9
for i in range(len(blue_ys)):
y_blue, x_blue = int(blue_ys[i]), int(blue_xs[i])
if preds[y_blue, x_blue] == 0:
continue
subset_9 = pick_arr_subset(preds, y_blue, x_blue, sub_size).copy()
subset_3 = pick_arr_subset(preds, y_blue, x_blue, 3).copy()
subset_9_probs = pick_arr_subset(probs, y_blue, x_blue, sub_size).copy()
half_idx_y = y_blue if subset_9.shape[0] < sub_size else subset_9.shape[0] // 2
half_idx_x = x_blue if subset_9.shape[1] < sub_size else subset_9.shape[1] // 2
try:
current_value = subset_9[half_idx_y, half_idx_x]
except IndexError:
half_idx_y, half_idx_x = sub_size // 2, sub_size // 2
current_value = subset_9[half_idx_y, half_idx_x]
new_value = 100
if not all(v in subset_9 for v in [2, 3, 4]):
continue
cluster, seen_idx, seen_vals, _ = self._cluster_array(
arr=subset_9, probs=subset_9_probs,
point=[half_idx_y, half_idx_x],
new_value=new_value, current_value=current_value,
yet_seen_indices=[], yet_seen_values=[],
skipped_one=False,
)
if np.count_nonzero(cluster == new_value) < 3:
continue
det = self._postprocess_cluster(
cluster, preds, probs, subset_3,
y_blue, x_blue,
half_idx_y, half_idx_x,
new_value,
)
if det is not None:
preds = det["updated_preds"]
detections.append(det["detection"])
return detections
def _cluster_array(self, arr, probs, point, new_value, current_value,
yet_seen_indices, yet_seen_values, skipped_one):
"""Recursive neighbourhood clustering — matches S2TD._cluster_array."""
if len(yet_seen_indices) == 0:
yet_seen_indices.append(point)
yet_seen_values.append(current_value)
arr_mod = arr.copy()
arr_mod[point[0], point[1]] = 0
window_3x3 = pick_arr_subset(arr_mod, point[0], point[1], 3).copy()
if window_3x3.shape[0] >= 2 and window_3x3.shape[1] >= 2:
cy = min(1, window_3x3.shape[0] - 1)
cx = min(1, window_3x3.shape[1] - 1)
if window_3x3[cy, cx] == 2:
window_3x3[window_3x3 == 4] = 1 # eliminate reds near blue
y, x = point[0], point[1]
window_3x3_probs = pick_arr_subset(probs, y, x, 3)
windows = [window_3x3]
windows_probs = [window_3x3_probs]
if current_value == 4 or skipped_one:
windows = windows[0:1]
ys, xs = np.array([], dtype=int), np.array([], dtype=int)
window_idx = 0
offset_y, offset_x = 0, 0
while len(ys) == 0 and window_idx < len(windows):
window = windows[window_idx]
window_p = windows_probs[window_idx]
offset_y = window.shape[0] // 2
offset_x = window.shape[1] // 2
go_next = (current_value + 1) in window or current_value == 2
target_value = current_value + 1 if go_next else current_value
match = window == target_value
if np.count_nonzero(match) == 0:
target_value = current_value
match = window == target_value
ys_found, xs_found = np.where(match)
# Probability-based tie-breaking
if len(ys_found) > 1 and window_p.ndim == 3 and window_p.shape[0] > (target_value - 1):
wp_target = window_p[target_value - 1] * match
max_prob_mask = (wp_target == np.max(wp_target))
ys_found, xs_found = np.where(max_prob_mask)
ys, xs = ys_found, xs_found
window_idx += 1
ymin_w = max(0, point[0] - offset_y)
xmin_w = max(0, point[1] - offset_x)
for y_local, x_local in zip(ys, xs):
ny, nx = ymin_w + int(y_local), xmin_w + int(x_local)
if [ny, nx] in yet_seen_indices:
continue
if ny < 0 or ny >= arr.shape[0] or nx < 0 or nx >= arr.shape[1]:
continue
try:
cv = arr[ny, nx]
except IndexError:
continue
# Red already seen but this is green or blue → skip
if 4 in yet_seen_values and cv <= 3:
continue
arr_mod[ny, nx] = new_value
yet_seen_indices.append([ny, nx])
yet_seen_values.append(cv)
# Guard: avoid picking many more reds than blues and greens
n_blue = sum(1 for v in yet_seen_values if v == 2)
n_green = sum(1 for v in yet_seen_values if v == 3)
n_red = sum(1 for v in yet_seen_values if v == 4)
if n_red > n_blue and n_red > n_green:
break
arr_mod, yet_seen_indices, yet_seen_values, skipped_one = self._cluster_array(
arr_mod, probs, [ny, nx], new_value, cv,
yet_seen_indices, yet_seen_values, skipped_one,
)
arr_mod[point[0], point[1]] = new_value
return arr_mod, yet_seen_indices, yet_seen_values, skipped_one
def _postprocess_cluster(self, cluster, preds_copy, probs, subset_3,
y_blue, x_blue, half_idx_y, half_idx_x,
new_value):
"""Validate cluster and produce a detection dict — mirrors S2TD._postprocess_cluster."""
# Add neighbouring blues from the 3×3 window
ys_ba, xs_ba = np.where(subset_3 == 2)
ys_ba = ys_ba + half_idx_y - 1
xs_ba = xs_ba + half_idx_x - 1
for yb, xb in zip(ys_ba, xs_ba):
yb_c = int(np.clip(yb, 0, cluster.shape[0] - 1))
xb_c = int(np.clip(xb, 0, cluster.shape[1] - 1))
cluster[yb_c, xb_c] = new_value
cluster[cluster != new_value] = 0
cys, cxs = np.where(cluster == new_value)
if len(cys) == 0:
return None
# Map subset coords back to full array
ymin_sub = int(np.clip(y_blue - half_idx_y, 0, np.inf))
xmin_sub = int(np.clip(x_blue - half_idx_x, 0, np.inf))
cys_full = cys + ymin_sub
cxs_full = cxs + xmin_sub
ymin = int(np.min(cys_full))
xmin = int(np.min(cxs_full))
ymax = int(np.max(cys_full)) + 1 # +1: box extends to upper bound of pixel
xmax = int(np.max(cxs_full)) + 1
H, W = preds_copy.shape
ymin, ymax = max(0, ymin), min(H, ymax)
xmin, xmax = max(0, xmin), min(W, xmax)
box_preds = preds_copy[ymin:ymax, xmin:xmax].copy()
box_probs = probs[1:, ymin:ymax, xmin:xmax].copy() # classes 2,3,4 → indices 0,1,2
# Spectral probability scores (exact S2TD logic)
max_probs = []
for cls_offset, cls_val in enumerate([2, 3, 4]):
mask = (box_preds == cls_val)
vals = box_probs[cls_offset] * mask
mp = float(np.nanmax(vals)) if np.any(mask) else 0.0
max_probs.append(mp)
mean_max_spectral_probability = float(np.nanmean(max_probs))
mean_spectral_probability = float(np.nanmean(np.nanmax(box_probs, axis=0)))
# Validation checks
all_given = all(v in box_preds for v in [2, 3, 4])
large_enough = box_preds.shape[0] > 2 or box_preds.shape[1] > 2
too_large = box_preds.shape[0] > 5 or box_preds.shape[1] > 5
if too_large or not all_given or not large_enough:
return None
# Score: TWO terms — matches reference
score = mean_max_spectral_probability + mean_spectral_probability
if score <= 1.2:
return None
# Direction (blue → red vector)
by, bx = np.where(box_preds == 2)
ry, rx = np.where(box_preds == 4)
blue_idx = np.array([by[0], bx[0]], dtype=np.int8)
red_idx = np.array([ry[0], rx[0]], dtype=np.int8)
vector = (blue_idx - red_idx) * np.array([1, -1], dtype=np.int8)
heading = float(np.degrees(np.arctan2(vector[1], vector[0])) % 360)
# Speed
diameter = max(box_preds.shape) * 10 - 10
speed_kmh = float(np.sqrt(diameter * 20) / SECONDS_OFFSET_B02_B04 * 3.6)
# Geo-coordinates (centre of detection box)
lat_centre = float((self.lat[ymin] + self.lat[min(ymax, len(self.lat) - 1)]) / 2)
lon_centre = float((self.lon[xmin] + self.lon[min(xmax, len(self.lon) - 1)]) / 2)
# Zero out detected pixels to prevent re-detection
preds_copy[ymin:ymax, xmin:xmax] *= np.zeros_like(box_preds)
# Also zero 3×3 around blue pixels
blue_in_box = np.where(box_preds == 2)
for yb, xb in zip(blue_in_box[0], blue_in_box[1]):
y0, y1 = max(0, ymin + yb - 1), min(H, ymin + yb + 2)
x0, x1 = max(0, xmin + xb - 1), min(W, xmin + xb + 2)
preds_copy[y0:y1, x0:x1] *= (preds_copy[y0:y1, x0:x1] != 2).astype(np.int8)
crop_id = f"truck_{int(time.time() * 1000)}_{ymin}_{xmin}.png"
return {
"updated_preds": preds_copy,
"detection": {
"lat": lat_centre,
"lon": lon_centre,
"confidence": float(min(score / 2.4, 1.0)),
"s_score": round(score, 3),
"speed_kmh": round(speed_kmh, 1),
"heading": round(heading, 1),
"heading_desc": self._direction_to_compass(heading),
"id": crop_id,
"image_url": f"/detections/{crop_id}",
"box_shape": list(box_preds.shape),
"max_probs": {"blue": max_probs[0], "green": max_probs[1], "red": max_probs[2]},
},
}
@staticmethod
def _direction_to_compass(deg):
bins = np.arange(0, 359, 45, dtype=np.float32)
labels = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
return labels[int(np.argmin(np.abs(bins - deg)))]
# ─────────────────────────────────────────────────────────────────────────────
# ARGUS Engine
# ─────────────────────────────────────────────────────────────────────────────
class S2TruckEngine:
def __init__(self, *, cache_dir: str, detection_dir: str, rf_model_path: str | None = None):
self.cache_dir = cache_dir
self.detection_dir = detection_dir
os.makedirs(self.detection_dir, exist_ok=True)
_configure_osmnx(cache_dir)
self.rf_model = load_rf_model(rf_model_path)
def fetch_roads(self, bbox_coords, progress_cb=None):
"""Fetch major roads with automatic mirror rotation and fallbacks."""
import geopandas as gpd
import osmnx as ox
def log(msg, level="info", pct=None):
if level == "info":
logger.info(msg)
elif level == "warn":
logger.warning(msg)
if progress_cb:
progress_cb(msg, pct)
min_lat, min_lon, max_lat, max_lon = bbox_coords
center_lat = (min_lat + max_lat) / 2
center_lon = (min_lon + max_lon) / 2
lat_span = (max_lat - min_lat) * 111000
lon_span = (max_lon - min_lon) * 111000 * np.cos(np.radians(center_lat))
dist_m = int(max(lat_span, lon_span) * 0.6) + 1000
log(f"Starting road discovery (ROI: {center_lat:.4f}, {center_lon:.4f})", pct=5)
for i, mirror in enumerate(OVERPASS_MIRRORS):
log(f"Trying mirror {i+1}/{len(OVERPASS_MIRRORS)}: {mirror}", pct=10 + i * 5)
ox.settings.overpass_url = mirror
try:
graph = ox.graph_from_point(
(center_lat, center_lon), dist=dist_m,
network_type="drive", simplify=True,
retain_all=False, truncate_by_edge=True,
)
roads = ox.graph_to_gdfs(graph, nodes=False)
major_types = [
"motorway", "trunk", "primary", "secondary",
"motorway_link", "trunk_link", "primary_link",
]
roads = roads[roads["highway"].isin(major_types)].copy()
if not roads.empty:
logger.info(f"Fetched {len(roads)} major roads from {mirror}")
return roads
except Exception as e:
logger.warning(f"Mirror {mirror} failed: {e}")
time.sleep(1)
# Raw Overpass fallback
logger.warning("All mirrors failed. Trying raw Overpass query.")
try:
query = f"""
[out:json][timeout:60];
(way["highway"~"motorway|trunk|primary"]({min_lat},{min_lon},{max_lat},{max_lon}););
out body; >; out skel qt;
"""
resp = requests.post(OVERPASS_MIRRORS[0], data={"data": query}, timeout=60)
if resp.status_code == 200:
data = resp.json()
nodes = {n["id"]: (n["lon"], n["lat"]) for n in data["elements"] if n["type"] == "node"}
ways = []
for w in data["elements"]:
if w["type"] == "way" and "nodes" in w:
coords = [nodes[nid] for nid in w["nodes"] if nid in nodes]
if len(coords) > 1:
ways.append({"geometry": LineString(coords), "highway": w["tags"].get("highway")})
if ways:
roads = gpd.GeoDataFrame(ways, crs="EPSG:4326")
logger.info(f"Raw fallback: {len(roads)} roads")
return roads
except Exception as e:
logger.error(f"Raw fallback failed: {e}")
return gpd.GeoDataFrame()
def detect_trucks(self, data, bbox_coords, timestamp, road_mask):
"""
Detect trucks using corrected Fisser et al. methodology.
:param data: (H, W, 5) array — [B04, B03, B02, B08, CLM]
:param bbox_coords: [min_lat, min_lon, max_lat, max_lon]
:param timestamp: str ISO timestamp
:param road_mask: (H, W) binary mask of road pixels
:return: list of detection dicts
"""
min_lat, min_lon, max_lat, max_lon = bbox_coords
H, W = data.shape[:2]
# 1. Build feature stack (corrected order)
feat = build_feature_stack(data)
feature_stack = feat["feature_stack"]
# 2. Classify (real RF if loaded, proxy fallback otherwise)
probs, prediction = classify(feature_stack, road_mask, self.rf_model)
# 3. Lat/lon arrays for geo-referencing
lat_arr = np.linspace(max_lat, min_lat, H) # top to bottom
lon_arr = np.linspace(min_lon, max_lon, W) # left to right
# 4. Object extraction (corrected)
extractor = ObjectExtractor(probs, lat_arr, lon_arr)
detections = extractor.extract(prediction)
# 5. Add timestamp and save crops
for det in detections:
det["timestamp"] = timestamp
try:
self._save_crop(data, det, H, W, min_lat, min_lon, max_lat, max_lon)
except Exception as e:
logger.warning(f"Could not save crop for {det['id']}: {e}")
return detections
def _save_crop(self, data, det, H, W, min_lat, min_lon, max_lat, max_lon):
"""Save a 20×20 RGB crop centred on the detection."""
cy = int((max_lat - det["lat"]) / (max_lat - min_lat + 1e-9) * H)
cx = int((det["lon"] - min_lon) / (max_lon - min_lon + 1e-9) * W)
cy, cx = int(np.clip(cy, 0, H - 1)), int(np.clip(cx, 0, W - 1))
y0, y1 = max(0, cy - 10), min(H, cy + 10)
x0, x1 = max(0, cx - 10), min(W, cx + 10)
rgb = data[y0:y1, x0:x1, :3].astype(np.float32)
rgb = rescale_s2(rgb)
rgb = (np.clip(rgb, 0, 0.3) / 0.3 * 255).astype(np.uint8)
path = os.path.join(self.detection_dir, det["id"])
imageio.imwrite(path, rgb)
@@ -0,0 +1,145 @@
"""Disk persistence for road corridor trend runs."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .config import DATA_ROOT, STATE_PATH
from .presets import CORRIDOR_PRESETS, get_preset
logger = logging.getLogger(__name__)
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def preset_result_path(preset_id: str) -> Path:
return DATA_ROOT / f"{preset_id}.json"
def load_preset_result(preset_id: str) -> dict[str, Any] | None:
path = preset_result_path(preset_id)
if not path.is_file():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
logger.warning("Could not read road corridor result %s: %s", path, exc)
return None
def save_preset_result(preset_id: str, payload: dict[str, Any]) -> None:
DATA_ROOT.mkdir(parents=True, exist_ok=True)
path = preset_result_path(preset_id)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def load_refresh_state() -> dict[str, str]:
if not STATE_PATH.is_file():
return {}
try:
raw = json.loads(STATE_PATH.read_text(encoding="utf-8"))
return {str(k): str(v) for k, v in raw.items()}
except (OSError, json.JSONDecodeError):
return {}
def save_refresh_state(state: dict[str, str]) -> None:
DATA_ROOT.mkdir(parents=True, exist_ok=True)
STATE_PATH.write_text(json.dumps(state, indent=2), encoding="utf-8")
def mark_preset_refreshed(preset_id: str) -> None:
state = load_refresh_state()
state[preset_id] = _utc_now_iso()
save_refresh_state(state)
def list_corridor_summaries() -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for preset in CORRIDOR_PRESETS:
stored = load_preset_result(preset["id"])
if stored:
summaries.append(stored)
continue
summaries.append(
{
"preset_id": preset["id"],
"label": preset["label"],
"bbox": preset["bbox"],
"country": preset["country"],
"category": preset["category"],
"status": "never_run",
"daily_counts": [],
"total_detections": 0,
}
)
return summaries
def build_trends_payload() -> dict[str, Any]:
return {
"updated_at": _utc_now_iso(),
"corridors": list_corridor_summaries(),
}
def store_analysis_result(
preset_id: str,
*,
label: str,
bbox: list[float],
country: str,
category: str,
road_count: int,
frame_count: int,
detections: list[dict[str, Any]],
status: str = "ok",
error: str | None = None,
) -> dict[str, Any]:
daily: dict[str, int] = {}
for det in detections:
ts = str(det.get("timestamp", ""))[:10]
if ts:
daily[ts] = daily.get(ts, 0) + 1
daily_counts = [{"date": d, "count": daily[d]} for d in sorted(daily.keys())]
payload = {
"preset_id": preset_id,
"label": label,
"bbox": bbox,
"country": country,
"category": category,
"updated_at": _utc_now_iso(),
"road_count": road_count,
"frame_count": frame_count,
"total_detections": len(detections),
"daily_counts": daily_counts,
"status": status,
"error": error,
}
save_preset_result(preset_id, payload)
mark_preset_refreshed(preset_id)
return payload
def preset_metadata(preset_id: str) -> dict[str, Any] | None:
preset = get_preset(preset_id)
if preset is None:
return None
stored = load_preset_result(preset_id)
if stored:
return stored
return {
"preset_id": preset["id"],
"label": preset["label"],
"bbox": preset["bbox"],
"country": preset["country"],
"category": preset["category"],
"status": "never_run",
"daily_counts": [],
"total_detections": 0,
}
@@ -0,0 +1,19 @@
"""Map-viewport helpers for on-demand corridor analysis."""
from __future__ import annotations
import hashlib
def bbox_around_point(lat: float, lon: float, *, half_span_deg: float = 0.04) -> list[float]:
"""Square AOI around a map center (~45 km half-span, under the 0.5° engine cap)."""
span = min(max(half_span_deg, 0.02), 0.24)
return [lat - span, lon - span, lat + span, lon + span]
def adhoc_preset_id(lat: float, lon: float) -> str:
digest = hashlib.sha256(f"{lat:.4f},{lon:.4f}".encode()).hexdigest()[:12]
return f"adhoc_{digest}"
def default_label_for_point(lat: float, lon: float) -> str:
return f"Map center ({lat:.4f}, {lon:.4f})"
+147
View File
@@ -0,0 +1,147 @@
"""Tests for opt-in Sentinel-2 road corridor trend layer."""
from __future__ import annotations
import json
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from services.fetchers._store import active_layers, latest_data
from services.fetchers.road_corridor_sat import fetch_road_corridor_trends
from services.road_corridor_sat.presets import get_preset
class TestRoadCorridorGates:
def test_fetch_skips_when_layer_disabled(self, monkeypatch):
monkeypatch.setenv("ROAD_CORRIDOR_SAT_ENABLED", "true")
active_layers["road_corridor_trends"] = False
with patch("services.road_corridor_sat.pipeline.analyze_preset") as analyze:
fetch_road_corridor_trends(force=True)
analyze.assert_not_called()
def test_fetch_skips_when_feature_disabled(self, monkeypatch):
active_layers["road_corridor_trends"] = True
monkeypatch.delenv("ROAD_CORRIDOR_SAT_ENABLED", raising=False)
with patch("services.road_corridor_sat.pipeline.analyze_preset") as analyze:
fetch_road_corridor_trends(force=True)
analyze.assert_not_called()
def test_fetch_runs_when_enabled(self, monkeypatch, tmp_path):
monkeypatch.setenv("ROAD_CORRIDOR_SAT_ENABLED", "true")
monkeypatch.setenv("SENTINEL_CLIENT_ID", "test-id")
monkeypatch.setenv("SENTINEL_CLIENT_SECRET", "test-secret")
monkeypatch.setenv("ROAD_CORRIDOR_DATA_DIR", str(tmp_path))
active_layers["road_corridor_trends"] = True
fake_result = {
"preset_id": "laredo_i35",
"label": "Laredo I-35",
"status": "ok",
"daily_counts": [{"date": "2026-05-01", "count": 3}],
"total_detections": 3,
}
with patch("services.road_corridor_sat.config.optional_deps_available", return_value=True):
with patch(
"services.road_corridor_sat.pipeline.analyze_preset",
return_value=fake_result,
) as analyze:
fetch_road_corridor_trends(force=True)
analyze.assert_called_once_with("laredo_i35")
assert latest_data["road_corridor_trends"]["corridors"]
class TestAnalyzeHere:
def test_analyze_requires_credentials(self, monkeypatch):
from main import app
monkeypatch.setattr("routers.road_corridors.optional_deps_available", lambda: True)
monkeypatch.setattr("routers.road_corridors.sentinel_credentials_configured", lambda: False)
client = TestClient(app)
resp = client.post(
"/api/road-corridors/analyze",
json={"lat": 27.51, "lon": -99.53},
)
assert resp.status_code == 503
def test_analyze_starts_job(self, monkeypatch):
from main import app
monkeypatch.setattr("routers.road_corridors.optional_deps_available", lambda: True)
monkeypatch.setattr("routers.road_corridors.sentinel_credentials_configured", lambda: True)
def fake_enqueue(lat, lon, label=None):
from services.road_corridor_sat.jobs import AnalyzeJob
return AnalyzeJob(job_id="job123", lat=lat, lon=lon, status="queued")
monkeypatch.setattr("routers.road_corridors.enqueue_analyze", fake_enqueue)
client = TestClient(app)
resp = client.post(
"/api/road-corridors/analyze",
json={"lat": 27.51, "lon": -99.53},
)
assert resp.status_code == 200
body = resp.json()
assert body["job_id"] == "job123"
assert body["status"] == "queued"
class TestRoadCorridorApi:
def test_list_presets(self):
from main import app
client = TestClient(app)
resp = client.get("/api/road-corridors")
assert resp.status_code == 200
body = resp.json()
assert body["ok"] is True
ids = {p["id"] for p in body["presets"]}
assert "laredo_i35" in ids
def test_get_preset_detail(self):
from main import app
client = TestClient(app)
resp = client.get("/api/road-corridors/laredo_i35")
assert resp.status_code == 200
body = resp.json()
assert body["preset"]["id"] == "laredo_i35"
assert body["result"]["preset_id"] == "laredo_i35"
def test_unknown_preset_404(self):
from main import app
client = TestClient(app)
resp = client.get("/api/road-corridors/not-a-real-preset")
assert resp.status_code == 404
class TestStorage:
def test_store_analysis_result_roundtrip(self, tmp_path, monkeypatch):
monkeypatch.setattr("services.road_corridor_sat.storage.DATA_ROOT", tmp_path)
monkeypatch.setattr(
"services.road_corridor_sat.storage.STATE_PATH",
tmp_path / "_refresh_state.json",
)
from services.road_corridor_sat.storage import load_preset_result, store_analysis_result
preset = get_preset("laredo_i35")
assert preset is not None
store_analysis_result(
preset["id"],
label=preset["label"],
bbox=preset["bbox"],
country=preset["country"],
category=preset["category"],
road_count=4,
frame_count=2,
detections=[{"timestamp": "2026-05-01T12:00:00Z", "confidence": 0.9}],
)
loaded = load_preset_result("laredo_i35")
assert loaded is not None
assert loaded["total_detections"] == 1
assert loaded["daily_counts"] == [{"date": "2026-05-01", "count": 1}]
on_disk = json.loads((tmp_path / "laredo_i35.json").read_text(encoding="utf-8"))
assert on_disk["status"] == "ok"
+11
View File
@@ -0,0 +1,11 @@
# DrishX / S2 truck motion detection — third-party notice
Detection code in `backend/services/road_corridor_sat/` is adapted from:
- **DrishX** — MIT License — [sparkyniner/DRISH-X-Satellite-powered-freight-intelligence-](https://github.com/sparkyniner/DRISH-X-Satellite-powered-freight-intelligence-)
- **S2TruckDetect / Fisser et al. (2022)** — [Detecting Moving Trucks on Roads Using Sentinel-2 Data](https://ui.adsabs.harvard.edu/abs/2022RemS...14.1595F/abstract)
The trained Random Forest weights ship as `backend/data/drishx/rf_model.pickle` from the DrishX distribution.
Satellite imagery: [Copernicus Data Space Ecosystem](https://dataspace.copernicus.eu/) (free, ESA).
Road network: [OpenStreetMap](https://www.openstreetmap.org/) via Overpass.
+3
View File
@@ -185,6 +185,7 @@ export default function Dashboard() {
highres_satellite: false,
sentinel_hub: false,
viirs_nightlights: false,
road_corridor_trends: false,
// Hazards — no fire, rest ON
earthquakes: true,
firms: false,
@@ -446,6 +447,7 @@ export default function Dashboard() {
highres_satellite: false,
sentinel_hub: false,
viirs_nightlights: false,
road_corridor_trends: false,
psk_reporter: false,
tinygs: false,
datacenters: false,
@@ -589,6 +591,7 @@ export default function Dashboard() {
setTrackedScanner={setTrackedScanner}
isMinimized={leftDataMinimized}
onMinimizedChange={setLeftDataMinimized}
viewBoundsRef={viewBoundsRef}
/>
</ErrorBoundary>
) : (
@@ -0,0 +1,213 @@
'use client';
import React, { useCallback, useEffect, useRef, useState } from 'react';
import { Loader2, MapPin, Truck } from 'lucide-react';
import { API_BASE } from '@/lib/api';
import { LAYER_TOGGLE_EVENT } from '@/hooks/useDataPolling';
import { VIEWPORT_COMMITTED_EVENT } from '@/components/map/hooks/useViewportBounds';
import { useTranslation } from '@/i18n';
type ViewBounds = { south: number; west: number; north: number; east: number };
type RoadCorridorStatus = {
deps_installed: boolean;
credentials_configured: boolean;
active_job?: AnalyzeJob | null;
};
type AnalyzeJob = {
job_id: string;
status: string;
message: string;
progress: number;
error?: string | null;
result?: {
total_detections?: number;
daily_counts?: Array<{ date: string; count: number }>;
status?: string;
error?: string | null;
} | null;
};
function viewCenter(bounds: ViewBounds | null | undefined): { lat: number; lon: number } | null {
if (!bounds) return null;
const { south, west, north, east } = bounds;
if (![south, west, north, east].every((v) => Number.isFinite(v))) return null;
return { lat: (south + north) / 2, lon: (west + east) / 2 };
}
export default function RoadCorridorLayerControls({
viewBoundsRef,
}: {
viewBoundsRef?: React.RefObject<ViewBounds | null>;
}) {
const { t } = useTranslation();
const [status, setStatus] = useState<RoadCorridorStatus | null>(null);
const [job, setJob] = useState<AnalyzeJob | null>(null);
const [submitting, setSubmitting] = useState(false);
const [mapCenter, setMapCenter] = useState<{ lat: number; lon: number } | null>(() =>
viewCenter(viewBoundsRef?.current ?? null),
);
const pollRef = useRef<ReturnType<typeof setInterval> | null>(null);
useEffect(() => {
const syncCenter = () => setMapCenter(viewCenter(viewBoundsRef?.current ?? null));
syncCenter();
window.addEventListener(VIEWPORT_COMMITTED_EVENT, syncCenter);
return () => window.removeEventListener(VIEWPORT_COMMITTED_EVENT, syncCenter);
}, [viewBoundsRef]);
const stopPolling = useCallback(() => {
if (pollRef.current) {
clearInterval(pollRef.current);
pollRef.current = null;
}
}, []);
const pollJob = useCallback(
async (jobId?: string) => {
try {
const qs = jobId ? `?job_id=${encodeURIComponent(jobId)}` : '';
const res = await fetch(`${API_BASE}/api/road-corridors/analyze/status${qs}`);
if (!res.ok) return;
const body = await res.json();
const next = body.job as AnalyzeJob | null;
if (!next) return;
setJob(next);
if (next.status === 'ok' || next.status === 'error') {
stopPolling();
setSubmitting(false);
if (next.status === 'ok') {
window.dispatchEvent(new Event(LAYER_TOGGLE_EVENT));
}
}
} catch {
// ignore transient poll errors
}
},
[stopPolling],
);
const startPolling = useCallback(
(jobId: string) => {
stopPolling();
void pollJob(jobId);
pollRef.current = setInterval(() => void pollJob(jobId), 2500);
},
[pollJob, stopPolling],
);
useEffect(() => {
let cancelled = false;
(async () => {
try {
const res = await fetch(`${API_BASE}/api/road-corridors/status`);
if (!res.ok || cancelled) return;
const body = await res.json();
setStatus(body);
if (body.active_job?.status === 'queued' || body.active_job?.status === 'running') {
setJob(body.active_job);
setSubmitting(true);
startPolling(body.active_job.job_id);
}
} catch {
// backend may be offline during boot
}
})();
return () => {
cancelled = true;
stopPolling();
};
}, [startPolling, stopPolling]);
const ready = Boolean(status?.deps_installed && status?.credentials_configured);
const running = submitting || job?.status === 'queued' || job?.status === 'running';
const handleAnalyze = async () => {
const c = mapCenter ?? viewCenter(viewBoundsRef?.current ?? null);
if (!c || running) return;
setSubmitting(true);
setJob(null);
try {
const res = await fetch(`${API_BASE}/api/road-corridors/analyze`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ lat: c.lat, lon: c.lon }),
});
const body = await res.json().catch(() => ({}));
if (res.status === 409 && body.detail) {
const statusRes = await fetch(`${API_BASE}/api/road-corridors/analyze/status`);
const statusBody = await statusRes.json();
if (statusBody.job) {
setJob(statusBody.job);
startPolling(statusBody.job.job_id);
}
return;
}
if (!res.ok) {
setSubmitting(false);
setJob({
job_id: '',
status: 'error',
message: typeof body.detail === 'string' ? body.detail : t('roadCorridor.analyzeFailed'),
progress: 100,
error: typeof body.detail === 'string' ? body.detail : undefined,
});
return;
}
setJob(body as AnalyzeJob);
startPolling((body as AnalyzeJob).job_id);
} catch {
setSubmitting(false);
setJob({
job_id: '',
status: 'error',
message: t('roadCorridor.analyzeFailed'),
progress: 100,
});
}
};
let statusLine = t('roadCorridor.hintTrends');
if (!ready) {
statusLine = !status?.deps_installed
? t('roadCorridor.missingDeps')
: t('roadCorridor.missingCreds');
} else if (!mapCenter) {
statusLine = t('roadCorridor.panMapFirst');
} else if (running && job) {
statusLine = job.message || t('roadCorridor.analyzing');
} else if (job?.status === 'ok' && job.result) {
const days = job.result.daily_counts?.length ?? 0;
const total = job.result.total_detections ?? 0;
statusLine = `${total} truck signatures · ${days} day${days === 1 ? '' : 's'}`;
} else if (job?.status === 'error') {
statusLine = job.error || job.message || t('roadCorridor.analyzeFailed');
}
return (
<div className="ml-7 mt-2 flex flex-col gap-1.5" onClick={(e) => e.stopPropagation()}>
<button
type="button"
onClick={() => void handleAnalyze()}
disabled={!ready || !mapCenter || running}
className="flex items-center gap-1.5 text-[9px] font-mono tracking-wide text-amber-400 hover:text-amber-200 border border-amber-500/30 hover:border-amber-500/50 bg-amber-500/5 hover:bg-amber-500/10 disabled:opacity-40 disabled:hover:text-amber-400 disabled:hover:border-amber-500/30 disabled:hover:bg-amber-500/5 px-2.5 py-1 rounded transition w-fit"
>
{running ? <Loader2 size={10} className="animate-spin" /> : <MapPin size={10} />}
{running ? t('roadCorridor.analyzing') : t('roadCorridor.analyzeHere')}
</button>
<div className="flex items-start gap-1.5 text-[10px] font-mono text-[var(--text-muted)] leading-snug max-w-[220px]">
<Truck size={10} className="mt-0.5 shrink-0 text-amber-500/70" />
<span>{statusLine}</span>
</div>
{running && job && job.progress > 0 ? (
<div className="h-1 w-full max-w-[180px] bg-amber-950/40 rounded overflow-hidden">
<div
className="h-full bg-amber-500/70 transition-all duration-500"
style={{ width: `${Math.min(100, job.progress)}%` }}
/>
</div>
) : null}
</div>
);
}
+20 -3
View File
@@ -43,7 +43,9 @@ import {
Droplets,
Radar,
MapPin,
Truck,
} from 'lucide-react';
import RoadCorridorLayerControls from '@/components/RoadCorridorLayerControls';
import { API_BASE } from '@/lib/api';
import { useLiveUamapScraperOptIn } from '@/hooks/useLiveUamapScraperOptIn';
import ConfirmDialog from '@/components/ui/ConfirmDialog';
@@ -107,6 +109,7 @@ const FRESHNESS_MAP: Record<string, string> = {
wastewater: 'wastewater',
ai_intel: '',
crowdthreat: 'crowdthreat',
road_corridor_trends: 'road_corridor_trends',
};
// POTUS fleet ICAO hex codes for client-side filtering
@@ -650,6 +653,7 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
isMinimized: isMinimizedProp,
onMinimizedChange,
onOpenSarAoiEditor,
viewBoundsRef,
}: {
activeLayers: ActiveLayers;
setActiveLayers: React.Dispatch<React.SetStateAction<ActiveLayers>>;
@@ -675,6 +679,7 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
isMinimized?: boolean;
onMinimizedChange?: (minimized: boolean) => void;
onOpenSarAoiEditor?: () => void;
viewBoundsRef?: React.RefObject<{ south: number; west: number; north: number; east: number } | null>;
}) {
const data = useDataSnapshot() as import('@/types/dashboard').DashboardData;
const { t } = useTranslation();
@@ -1039,6 +1044,15 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
count: null,
icon: Moon,
},
{
id: 'road_corridor_trends',
name: t('layers.roadCorridorTrends'),
source: t('layers.roadCorridorSource'),
count:
data?.road_corridor_trends?.corridors?.filter((c) => (c.total_detections ?? 0) > 0)
.length ?? 0,
icon: Truck,
},
],
},
{
@@ -1401,21 +1415,21 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
<button
title={
Object.entries(activeLayers)
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights'].includes(k))
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends'].includes(k))
.every(([, v]) => v)
? 'Disable all layers'
: 'Enable all layers'
}
className={`${
Object.entries(activeLayers)
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights'].includes(k))
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends'].includes(k))
.every(([, v]) => v)
? 'text-cyan-400'
: 'text-[var(--text-muted)]'
} hover:text-cyan-400 transition-colors`}
onClick={(e) => {
e.stopPropagation();
const excluded = new Set(['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights']);
const excluded = new Set(['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends']);
const allOn = Object.entries(activeLayers)
.filter(([k]) => !excluded.has(k))
.every(([, v]) => v);
@@ -1839,6 +1853,9 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
</div>
)}
{/* SAR inline controls — AOI editor button */}
{active && layer.id === 'road_corridor_trends' && (
<RoadCorridorLayerControls viewBoundsRef={viewBoundsRef} />
)}
{active && layer.id === 'sar' && onOpenSarAoiEditor && (
<div
className="ml-7 mt-2 flex items-center gap-2"
+12 -1
View File
@@ -201,7 +201,18 @@
"crowdThreat": "CrowdThreat",
"shodanOverlay": "Shodan Overlay",
"aiIntel": "AI Intel",
"sar": "SAR"
"sar": "SAR",
"roadCorridorTrends": "Road Freight Trends",
"roadCorridorSource": "Copernicus S-2 · trends not live"
},
"roadCorridor": {
"analyzeHere": "ANALYZE HERE",
"analyzing": "Analyzing…",
"hintTrends": "Satellite truck-volume trends on major highways (~5 day revisit)",
"missingDeps": "Install backend road-corridor extras to analyze",
"missingCreds": "Add Sentinel creds in Settings → Imagery",
"panMapFirst": "Pan the map to choose an area",
"analyzeFailed": "Analysis failed"
},
"shodan": {
"title": "Shodan Connector",
+12 -1
View File
@@ -201,7 +201,18 @@
"crowdThreat": "CrowdThreat",
"shodanOverlay": "Couche Shodan",
"aiIntel": "Infos IA",
"sar": "SAR"
"sar": "SAR",
"roadCorridorTrends": "Tendances fret routier",
"roadCorridorSource": "Copernicus S-2 · tendances (pas en direct)"
},
"roadCorridor": {
"analyzeHere": "ANALYSER ICI",
"analyzing": "Analyse…",
"hintTrends": "Tendances camions par satellite sur autoroutes (~5 j de revisite)",
"missingDeps": "Installer les extras backend road-corridor",
"missingCreds": "Ajouter les identifiants Sentinel dans Réglages → Imagerie",
"panMapFirst": "Déplacez la carte pour choisir une zone",
"analyzeFailed": "Échec de l'analyse"
},
"shodan": {
"title": "Connecteur Shodan",
+12 -1
View File
@@ -201,7 +201,18 @@
"crowdThreat": "人群威胁",
"shodanOverlay": "Shodan 叠加",
"aiIntel": "AI 情报",
"sar": "SAR"
"sar": "SAR",
"roadCorridorTrends": "公路货运趋势",
"roadCorridorSource": "Copernicus S-2 · 趋势(非实时)"
},
"roadCorridor": {
"analyzeHere": "分析此处",
"analyzing": "分析中…",
"hintTrends": "主要高速公路卫星卡车量趋势(约 5 天重访)",
"missingDeps": "请安装 backend road-corridor 可选依赖",
"missingCreds": "请在设置 → 影像 中添加 Sentinel 凭据",
"panMapFirst": "请先平移地图以选择区域",
"analyzeFailed": "分析失败"
},
"shodan": {
"title": "Shodan 连接器",
+14
View File
@@ -921,6 +921,19 @@ export interface DashboardData {
sar_anomalies?: SarAnomaly[];
sar_aoi_coverage?: SarAoiCoverage[];
sar_aois?: SarAoi[];
road_corridor_trends?: {
updated_at?: string | null;
corridors?: Array<{
preset_id: string;
label: string;
status: string;
total_detections?: number;
daily_counts?: Array<{ date: string; count: number }>;
updated_at?: string;
error?: string | null;
}>;
};
}
// ─── SAR ─────────────────────────────────────────────────────────────────────
@@ -1030,6 +1043,7 @@ export interface ActiveLayers {
ai_intel: boolean;
crowdthreat: boolean;
sar: boolean;
road_corridor_trends: boolean;
}
export interface SelectedEntity {
Generated
+980 -12
View File
File diff suppressed because it is too large Load Diff