diff --git a/backend/routers/ai_intel.py b/backend/routers/ai_intel.py index 0c3c851..32ffeae 100644 --- a/backend/routers/ai_intel.py +++ b/backend/routers/ai_intel.py @@ -1687,9 +1687,53 @@ async def agent_tool_manifest(request: Request): "radius_km": {"type": "float", "required": False, "description": "Context radius in km (default 100, max 1000)"}, "limit": {"type": "integer", "required": False, "description": "Max records per evidence group (default 10, max 50)"}, }, - "returns": "{status, claim_level, entity, center, radius_km, signals, evidence: {proximate_entities, context_layers}, recommended_next}", + "returns": "{status, claim_level, entity, center, radius_km, signals, movement, evidence: {proximate_entities, context_layers}, recommended_next}", "example": {"cmd": "correlate_entity", "args": {"entity_type": "aircraft", "callsign": "AF1", "radius_km": 150}}, }, + { + "name": "get_entity_trail", + "type": "read", + "description": "Resolve an aircraft or vessel and return its observed movement trail, route enrichment (from entity fields or route database), bearing/heading, and optional ACARS datalink hints. Trails accumulate while ShadowBroker is running.", + "parameters": { + "query": {"type": "string", "required": False, "description": "Natural-language name, owner, callsign, tail, MMSI, or vessel name"}, + "entity_type": {"type": "string", "required": False, "description": "Optional type hint: aircraft or ship"}, + "callsign": {"type": "string", "required": False, "description": "Aircraft or vessel callsign"}, + "registration": {"type": "string", "required": False, "description": "Aircraft tail number / registration"}, + "icao24": {"type": "string", "required": False, "description": "Aircraft ICAO24 hex identifier"}, + "mmsi": {"type": "string", "required": False, "description": "Ship MMSI"}, + "imo": {"type": "string", "required": False, "description": "Ship IMO number"}, + "name": {"type": "string", "required": False, "description": "Known aircraft/vessel name"}, + "owner": {"type": "string", "required": False, "description": "Owner or operator"}, + "max_points": {"type": "integer", "required": False, "description": "Max trail points returned (default 80, max 200)"}, + "include_datalink": {"type": "boolean", "required": False, "description": "Include ACARS/VDL summaries when available (default true)"}, + }, + "returns": "{status, entity, trail: [{lat,lng,alt_ft,ts}], route, movement, datalink_hints, notes}", + "example": {"cmd": "get_entity_trail", "args": {"registration": "N424PX", "icao24": "a50bd6"}}, + }, + { + "name": "get_entity_profile", + "type": "read", + "description": "One-shot aircraft or vessel dossier: identity, VIP/plane-alert metadata, position, trail, route, holding/emissions, ACARS datalink, nearby correlations/jamming/SAR, and related news.", + "parameters": { + "query": {"type": "string", "required": False, "description": "Natural-language name, owner, callsign, tail, MMSI, or vessel name"}, + "entity_type": {"type": "string", "required": False, "description": "Optional type hint: aircraft or ship"}, + "callsign": {"type": "string", "required": False, "description": "Aircraft or vessel callsign"}, + "registration": {"type": "string", "required": False, "description": "Aircraft tail number / registration"}, + "icao24": {"type": "string", "required": False, "description": "Aircraft ICAO24 hex identifier"}, + "mmsi": {"type": "string", "required": False, "description": "Ship MMSI"}, + "imo": {"type": "string", "required": False, "description": "Ship IMO number"}, + "name": {"type": "string", "required": False, "description": "Known aircraft/vessel name"}, + "owner": {"type": "string", "required": False, "description": "Owner or operator"}, + "max_trail_points": {"type": "integer", "required": False, "description": "Max trail points (default 80)"}, + "include_datalink": {"type": "boolean", "required": False, "description": "Include ACARS summaries (default true)"}, + "include_datalink_messages": {"type": "boolean", "required": False, "description": "Include full summarized datalink messages (default false)"}, + "include_news": {"type": "boolean", "required": False, "description": "Search news for owner/operator (default true)"}, + "include_nearby_context": {"type": "boolean", "required": False, "description": "Include nearby correlations, GPS jamming, SAR (default true)"}, + "context_radius_km": {"type": "float", "required": False, "description": "Radius for nearby context (default 120 km)"}, + }, + "returns": "{status, identity, position, trail, route, movement, aircraft_state, datalink, nearby_context, related_news, recommended_next}", + "example": {"cmd": "get_entity_profile", "args": {"registration": "N424PX", "owner": "Katzman"}}, + }, { "name": "search_telemetry", "type": "read", @@ -2413,7 +2457,15 @@ async def api_capabilities(request: Request): }, "correlate_entity": { "args": {"query": "str (optional)", "entity_type": "str (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "radius_km": "float (default 100)", "limit": "int (default 10)"}, - "description": "Resolve an entity and return nearby context/correlation evidence. Co-location is reported as a lead, not proof.", + "description": "Resolve an entity and return nearby context/correlation evidence plus observed movement trail summary. Co-location is reported as a lead, not proof.", + }, + "get_entity_trail": { + "args": {"query": "str (optional)", "entity_type": "str (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "max_points": "int (default 80)", "include_datalink": "bool (default true)"}, + "description": "Observed aircraft/vessel trail, route enrichment, bearing, and ACARS hints. Use instead of Time Machine for live-session history.", + }, + "get_entity_profile": { + "args": {"query": "str (optional)", "entity_type": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "owner": "str (optional)", "include_datalink": "bool (default true)", "include_news": "bool (default true)", "include_nearby_context": "bool (default true)"}, + "description": "Preferred one-shot dossier for aircraft/vessels — identity, trail, route, VIP tags, datalink, jamming/correlations, news.", }, "search_telemetry": { "args": {"query": "str", "layers": "list[str] (optional)", "limit": "int (default 25)"}, diff --git a/backend/services/entity_profile.py b/backend/services/entity_profile.py new file mode 100644 index 0000000..e074ed3 --- /dev/null +++ b/backend/services/entity_profile.py @@ -0,0 +1,291 @@ +"""Bundled entity intelligence profile for OpenClaw agents.""" + +from __future__ import annotations + +from typing import Any + +from services.entity_trail import get_entity_trail +from services.fetchers._store import get_latest_data_subset_refs +from services.telemetry import find_entity, search_news + +_AIRCRAFT_LAYERS = ( + "tracked_flights", + "military_flights", + "private_jets", + "private_flights", + "commercial_flights", +) + + +def _pick_str(entity: dict[str, Any], *keys: str) -> str: + for key in keys: + value = entity.get(key) + if value not in (None, ""): + return str(value).strip() + return "" + + +def _full_record(entity: dict[str, Any]) -> dict[str, Any]: + """Re-load the enriched store record (holding, emissions, alert_tags, etc.).""" + icao = _pick_str(entity, "icao24").lower() + mmsi = _pick_str(entity, "mmsi") + if icao: + snap = get_latest_data_subset_refs(*_AIRCRAFT_LAYERS) + for layer in _AIRCRAFT_LAYERS: + for item in snap.get(layer) or []: + if not isinstance(item, dict): + continue + if str(item.get("icao24") or "").lower() == icao: + merged = dict(item) + merged.setdefault("source_layer", layer) + return merged + if mmsi: + snap = get_latest_data_subset_refs("ships") + items = snap.get("ships") or [] + if isinstance(items, dict): + items = items.get("vessels", []) or items.get("items", []) + for item in items if isinstance(items, list) else []: + if not isinstance(item, dict): + continue + if str(item.get("mmsi") or "") == mmsi: + merged = dict(item) + merged.setdefault("source_layer", "ships") + return merged + return dict(entity) + + +def _identity_block(entity: dict[str, Any], *, is_ship: bool) -> dict[str, Any]: + block: dict[str, Any] = { + "label": _pick_str(entity, "label", "callsign", "name", "tracked_name"), + "callsign": _pick_str(entity, "callsign", "flight", "call"), + "registration": _pick_str(entity, "registration", "r"), + "icao24": _pick_str(entity, "icao24"), + "mmsi": _pick_str(entity, "mmsi"), + "imo": _pick_str(entity, "imo"), + "name": _pick_str(entity, "name", "shipName", "tracked_name", "yacht_name"), + "type": _pick_str(entity, "type", "t", "aircraft_type", "shipType"), + "owner": _pick_str(entity, "owner", "operator", "alert_operator", "yacht_owner"), + "source_layer": _pick_str(entity, "source_layer", "layer"), + "country": _pick_str(entity, "country", "flag"), + } + if not is_ship: + tags = entity.get("alert_tags") or entity.get("intel_tags") + if tags: + block["tags"] = tags if isinstance(tags, list) else str(tags) + for key in ( + "alert_category", + "alert_operator", + "alert_color", + "alert_type", + "alert_link", + "alert_wiki", + "alert_socials", + "tracked_name", + "intel_tags", + "squawk", + ): + value = entity.get(key) + if value not in (None, "", [], {}): + block[key] = value + else: + for key in ("tracked_name", "tracked_category", "yacht_owner", "yacht_name", "yacht_category"): + value = entity.get(key) + if value not in (None, ""): + block[key] = value + return block + + +def _position_block(entity: dict[str, Any]) -> dict[str, Any]: + return { + "lat": entity.get("lat") or entity.get("latitude"), + "lng": entity.get("lng") or entity.get("lon") or entity.get("longitude"), + "alt_ft": entity.get("alt") or entity.get("altitude") or entity.get("alt_baro"), + "speed_knots": entity.get("speed_knots") or entity.get("speed") or entity.get("gs") or entity.get("sog"), + "heading_deg": entity.get("heading") or entity.get("true_track") or entity.get("track") or entity.get("course"), + "on_ground": bool(entity.get("on_ground")) if "on_ground" in entity else None, + } + + +def _aircraft_state(entity: dict[str, Any]) -> dict[str, Any]: + state: dict[str, Any] = {} + if entity.get("holding") is not None: + state["holding"] = bool(entity.get("holding")) + emissions = entity.get("emissions") + if isinstance(emissions, dict) and emissions: + state["emissions"] = { + key: emissions[key] + for key in ( + "fuel_gph", + "co2_kg_per_hour", + "fuel_gallons_burned", + "co2_kg_emitted", + "observation_seconds", + ) + if emissions.get(key) is not None + } + return state + + +def _datalink_block( + *, + icao24: str, + registration: str, + callsign: str, + include_messages: bool, + message_limit: int, +) -> dict[str, Any]: + try: + from services.fetchers.airframes import lookup_datalink_messages + + result = lookup_datalink_messages( + icao24=icao24, + registration=registration, + callsign=callsign, + allow_live=False, + ) + except Exception: + return {"configured": False, "messages": [], "hints": [], "hidden_count": 0} + + messages = result.get("messages") or [] + hints = [ + str(msg.get("summary") or "").strip() + for msg in messages + if isinstance(msg, dict) and str(msg.get("summary") or "").strip() + ][:5] + block: dict[str, Any] = { + "configured": bool(result.get("configured")), + "hints": hints, + "hidden_count": int(result.get("hidden_count") or 0), + "queued_refresh": bool(result.get("queued_refresh") or result.get("priority_scan")), + } + if include_messages: + block["messages"] = messages[: max(1, min(message_limit, 20))] + return block + + +def _nearby_context( + *, + lat: float, + lng: float, + radius_km: float, + is_ship: bool, +) -> dict[str, Any]: + from services.telemetry import _nearby_items_from_layers + + layers = ["correlations", "gps_jamming", "sar_anomalies", "internet_outages"] + if is_ship: + layers.append("fishing_activity") + context = _nearby_items_from_layers( + lat=lat, + lng=lng, + radius_km=radius_km, + layers=tuple(layers), + limit_per_layer=5, + ) + return {layer: items for layer, items in context.items() if items} + + +def get_entity_profile( + *, + query: str = "", + entity_type: str = "", + callsign: str = "", + registration: str = "", + icao24: str = "", + mmsi: str = "", + imo: str = "", + name: str = "", + owner: str = "", + max_trail_points: int = 80, + include_datalink: bool = True, + include_datalink_messages: bool = False, + datalink_message_limit: int = 8, + include_news: bool = True, + news_limit: int = 5, + context_radius_km: float = 120, + include_nearby_context: bool = True, +) -> dict[str, Any]: + """One-shot dossier: identity, position, trail, route, enrichment, and context.""" + trail_pack = get_entity_trail( + query=query, + entity_type=entity_type, + callsign=callsign, + registration=registration, + icao24=icao24, + mmsi=mmsi, + imo=imo, + name=name, + owner=owner, + max_points=max_trail_points, + include_datalink=False, + ) + if trail_pack.get("status") == "unresolved": + return { + "status": "unresolved", + "lookup": trail_pack.get("lookup"), + "recommended_next": [ + "Try registration, ICAO24, MMSI, callsign, or owner.", + "Use track_entity to get alerts when the entity reappears.", + ], + } + + entity = _full_record(trail_pack.get("entity") or {}) + is_ship = trail_pack.get("entity_kind") == "ship" + identity = _identity_block(entity, is_ship=is_ship) + position = _position_block(entity) + + profile: dict[str, Any] = { + "status": trail_pack.get("status"), + "entity_kind": trail_pack.get("entity_kind"), + "lookup": trail_pack.get("lookup"), + "identity": identity, + "position": position, + "trail": trail_pack.get("trail") or [], + "route": trail_pack.get("route") or {}, + "movement": trail_pack.get("movement") or {}, + "notes": trail_pack.get("notes") or [], + } + + if not is_ship: + aircraft_state = _aircraft_state(entity) + if aircraft_state: + profile["aircraft_state"] = aircraft_state + if include_datalink: + profile["datalink"] = _datalink_block( + icao24=_pick_str(entity, "icao24") or icao24, + registration=_pick_str(entity, "registration") or registration, + callsign=_pick_str(entity, "callsign", "flight") or callsign, + include_messages=include_datalink_messages, + message_limit=datalink_message_limit, + ) + + lat = position.get("lat") + lng = position.get("lng") + if include_nearby_context and lat is not None and lng is not None: + profile["nearby_context"] = _nearby_context( + lat=float(lat), + lng=float(lng), + radius_km=max(10.0, min(float(context_radius_km or 120), 500.0)), + is_ship=is_ship, + ) + + if include_news: + news_query = ( + _pick_str(entity, "alert_operator", "owner", "operator", "tracked_name", "name") + or _pick_str(entity, "registration", "callsign") + or query + ) + if news_query: + profile["related_news"] = search_news(query=news_query, limit=max(1, min(news_limit, 15))) + + profile["recommended_next"] = [ + "Use correlate_entity for nearby-event evidence packs.", + "Use track_entity for forward monitoring without re-querying.", + "Use get_entity_trail when you only need movement history.", + ] + if not profile.get("route"): + profile["recommended_next"].insert( + 0, + "Route unknown — check datalink hints or wait for callsign route database match.", + ) + return profile diff --git a/backend/services/entity_trail.py b/backend/services/entity_trail.py new file mode 100644 index 0000000..71fa67b --- /dev/null +++ b/backend/services/entity_trail.py @@ -0,0 +1,300 @@ +"""Resolve live movement history (trail + route) for aircraft and vessels.""" + +from __future__ import annotations + +import math +from typing import Any + +from services.telemetry import find_entity + + +def _coerce_float(value: Any) -> float | None: + try: + if value is None: + return None + return float(value) + except (TypeError, ValueError): + return None + + +def _norm_key(value: str) -> str: + return str(value or "").strip().lower() + + +def _is_known_route_name(value: str) -> bool: + normalized = str(value or "").strip().upper() + return bool(normalized and normalized != "UNKNOWN") + + +def _bearing_deg(lat1: float, lng1: float, lat2: float, lng2: float) -> float: + phi1, phi2 = math.radians(lat1), math.radians(lat2) + d_lambda = math.radians(lng2 - lng1) + y = math.sin(d_lambda) * math.cos(phi2) + x = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(d_lambda) + return (math.degrees(math.atan2(y, x)) + 360.0) % 360.0 + + +def _compact_trail_points(points: list, *, max_points: int = 80) -> list[dict[str, Any]]: + if not points: + return [] + if len(points) <= max_points: + selected = points + else: + step = max(1, len(points) // max_points) + selected = points[::step] + if selected[-1] is not points[-1]: + selected.append(points[-1]) + + out: list[dict[str, Any]] = [] + for point in selected: + if not isinstance(point, (list, tuple)) or len(point) < 2: + continue + lat = _coerce_float(point[0]) + lng = _coerce_float(point[1]) + if lat is None or lng is None: + continue + item: dict[str, Any] = { + "lat": round(lat, 5), + "lng": round(lng, 5), + } + if len(point) >= 3: + alt = _coerce_float(point[2]) + if alt is not None: + item["alt_ft"] = round(alt, 1) + if len(point) >= 4: + ts = _coerce_float(point[3]) + if ts is not None: + item["ts"] = round(ts, 1) + out.append(item) + return out + + +def _route_from_entity(entity: dict[str, Any]) -> dict[str, Any]: + origin_name = str(entity.get("origin_name") or "").strip() + dest_name = str(entity.get("dest_name") or "").strip() + origin_loc = entity.get("origin_loc") + dest_loc = entity.get("dest_loc") + if _is_known_route_name(origin_name) and _is_known_route_name(dest_name): + return { + "origin_name": origin_name, + "dest_name": dest_name, + "origin_loc": origin_loc, + "dest_loc": dest_loc, + "source": "entity_field", + } + return {} + + +def _route_from_database(callsign: str) -> dict[str, Any]: + from services.fetchers.route_database import lookup_route + + route = lookup_route(callsign) + if not route: + return {} + return { + "origin_name": route.get("orig_name"), + "dest_name": route.get("dest_name"), + "origin_loc": route.get("orig_loc"), + "dest_loc": route.get("dest_loc"), + "source": "route_database", + } + + +def _datalink_hints(*, icao24: str = "", registration: str = "", callsign: str = "") -> list[str]: + try: + from services.fetchers.airframes import lookup_datalink_messages + + result = lookup_datalink_messages( + icao24=icao24, + registration=registration, + callsign=callsign, + allow_live=False, + ) + except Exception: + return [] + + hints: list[str] = [] + for message in result.get("messages") or []: + if not isinstance(message, dict): + continue + summary = str(message.get("summary") or "").strip() + if summary: + hints.append(summary) + if len(hints) >= 5: + break + return hints + + +def _flight_trail(icao24: str) -> list: + from services.fetchers.flights import get_flight_trail + + return get_flight_trail(icao24) + + +def _ship_trail(mmsi: int | str) -> list: + from services.ais_stream import get_vessel_trail + + try: + return get_vessel_trail(int(mmsi)) + except (TypeError, ValueError): + return [] + + +def _movement_summary(points: list[dict[str, Any]]) -> dict[str, Any]: + if not points: + return {} + first = points[0] + last = points[-1] + summary: dict[str, Any] = { + "first_point": first, + "last_point": last, + "point_count": len(points), + } + if first.get("ts") and last.get("ts"): + duration_s = max(0.0, float(last["ts"]) - float(first["ts"])) + summary["duration_minutes"] = round(duration_s / 60.0, 1) + summary["first_seen_at"] = first["ts"] + summary["last_seen_at"] = last["ts"] + if len(points) >= 2: + summary["bearing_deg"] = round( + _bearing_deg(first["lat"], first["lng"], last["lat"], last["lng"]), + 1, + ) + if len(points) >= 3: + prev = points[-2] + summary["current_heading_deg"] = round( + _bearing_deg(prev["lat"], prev["lng"], last["lat"], last["lng"]), + 1, + ) + return summary + + +def get_entity_trail( + *, + query: str = "", + entity_type: str = "", + callsign: str = "", + registration: str = "", + icao24: str = "", + mmsi: str = "", + imo: str = "", + name: str = "", + owner: str = "", + max_points: int = 80, + include_datalink: bool = True, +) -> dict[str, Any]: + """Return movement history and route context for a resolved aircraft or vessel.""" + lookup = find_entity( + query=query, + entity_type=entity_type, + callsign=callsign, + registration=registration, + icao24=icao24, + mmsi=mmsi, + imo=imo, + name=name, + owner=owner, + limit=3, + fallback_search=True, + ) + entity = lookup.get("best_match") if isinstance(lookup.get("best_match"), dict) else None + if not entity: + return { + "status": "unresolved", + "lookup": lookup, + "entity": None, + "trail": [], + "route": {}, + "movement": {}, + "datalink_hints": [], + "notes": [ + "No matching aircraft or vessel in live layers.", + "Trails accumulate while ShadowBroker is running; they are not pre-flight history.", + ], + } + + group = _norm_key(entity.get("group") or entity.get("entity_group") or "") + source_layer = _norm_key(entity.get("source_layer") or "") + is_ship = group == "maritime" or source_layer == "ships" or bool(entity.get("mmsi")) + + raw_points: list = [] + entity_id = "" + if is_ship: + mmsi_value = entity.get("mmsi") + if mmsi_value is not None: + entity_id = str(mmsi_value) + raw_points = _ship_trail(mmsi_value) + else: + hex_id = str(entity.get("icao24") or "").strip().lower() + entity_id = hex_id + if hex_id: + raw_points = _flight_trail(hex_id) + + max_points = max(10, min(int(max_points or 80), 200)) + trail = _compact_trail_points(raw_points, max_points=max_points) + movement = _movement_summary(trail) + + route = _route_from_entity(entity) + if not route: + callsign_value = str( + entity.get("callsign") or entity.get("flight") or entity.get("call") or callsign or "" + ).strip() + if callsign_value: + route = _route_from_database(callsign_value) + + datalink_hints: list[str] = [] + if include_datalink and not is_ship: + datalink_hints = _datalink_hints( + icao24=str(entity.get("icao24") or icao24 or ""), + registration=str(entity.get("registration") or registration or ""), + callsign=str(entity.get("callsign") or callsign or ""), + ) + + notes = [ + "Trail points are observed positions since this ShadowBroker instance started tracking the entity.", + "Use Time Machine snapshots for longer historical playback when enabled.", + ] + if not trail: + notes.insert( + 0, + "No trail points yet — the entity may have just appeared or trail retention expired.", + ) + elif not route: + notes.append("Route origin/destination unknown; infer direction from trail bearing only.") + + status = "trail_available" if trail else "resolved_without_trail" + return { + "status": status, + "lookup": lookup, + "entity": entity, + "entity_id": entity_id, + "entity_kind": "ship" if is_ship else "aircraft", + "trail": trail, + "route": route, + "movement": movement, + "datalink_hints": datalink_hints, + "notes": notes, + } + + +def movement_context_for_entity(entity: dict[str, Any], *, max_points: int = 40) -> dict[str, Any]: + """Compact movement block for correlate_entity and dossier helpers.""" + if not isinstance(entity, dict): + return {} + result = get_entity_trail( + icao24=str(entity.get("icao24") or ""), + mmsi=str(entity.get("mmsi") or ""), + registration=str(entity.get("registration") or ""), + callsign=str(entity.get("callsign") or entity.get("flight") or ""), + entity_type="ship" if entity.get("mmsi") else "aircraft", + max_points=max_points, + include_datalink=True, + ) + return { + "trail_point_count": len(result.get("trail") or []), + "trail": result.get("trail") or [], + "route": result.get("route") or {}, + "movement": result.get("movement") or {}, + "datalink_hints": result.get("datalink_hints") or [], + "notes": result.get("notes") or [], + } diff --git a/backend/services/fetchers/flights.py b/backend/services/fetchers/flights.py index eb49ba6..0412bf0 100644 --- a/backend/services/fetchers/flights.py +++ b/backend/services/fetchers/flights.py @@ -810,9 +810,11 @@ def _classify_and_publish(all_adsb_flights): seen_hexes.add(hex_id) tracked_hexes = {t.get("icao24", "").lower() for t in tracked_snapshot} + private_hexes = {t.get("icao24", "").lower() for t in private_jets_snapshot} + priority_hexes = tracked_hexes | private_hexes stale_keys = [] for k, v in flight_trails.items(): - cutoff = now_ts - 1800 if k in tracked_hexes else now_ts - 300 + cutoff = now_ts - 14400 if k in priority_hexes else now_ts - 300 if v["last_seen"] < cutoff: stale_keys.append(k) for k in stale_keys: diff --git a/backend/services/openclaw_channel.py b/backend/services/openclaw_channel.py index 2be6387..ad6d877 100644 --- a/backend/services/openclaw_channel.py +++ b/backend/services/openclaw_channel.py @@ -59,6 +59,8 @@ READ_COMMANDS = frozenset({ "find_ships", "find_entity", "correlate_entity", + "get_entity_trail", + "get_entity_profile", "brief_area", "what_changed", "search_telemetry", @@ -843,6 +845,62 @@ def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]: return {"ok": True, "data": compact, "format": "compressed_v1"} return {"ok": True, "data": result} + if cmd == "get_entity_trail": + from services.entity_trail import get_entity_trail + + result = get_entity_trail( + query=str(args.get("query", "") or ""), + entity_type=str(args.get("entity_type", "") or args.get("type", "") or ""), + callsign=str(args.get("callsign", "") or ""), + registration=str(args.get("registration", "") or args.get("tail_number", "") or ""), + icao24=str(args.get("icao24", "") or ""), + mmsi=str(args.get("mmsi", "") or ""), + imo=str(args.get("imo", "") or ""), + name=str(args.get("name", "") or ""), + owner=str(args.get("owner", "") or args.get("operator", "") or ""), + max_points=args.get("max_points", 80), + include_datalink=bool(args.get("include_datalink", True)), + ) + if _wants_compact(args): + compact = dict(result) + if isinstance(compact.get("entity"), dict): + compact["entity"] = _compact_result_entry(compact["entity"]) + return {"ok": True, "data": compact, "format": "compressed_v1"} + return {"ok": True, "data": result} + + if cmd == "get_entity_profile": + from services.entity_profile import get_entity_profile + + result = get_entity_profile( + query=str(args.get("query", "") or ""), + entity_type=str(args.get("entity_type", "") or args.get("type", "") or ""), + callsign=str(args.get("callsign", "") or ""), + registration=str(args.get("registration", "") or args.get("tail_number", "") or ""), + icao24=str(args.get("icao24", "") or ""), + mmsi=str(args.get("mmsi", "") or ""), + imo=str(args.get("imo", "") or ""), + name=str(args.get("name", "") or ""), + owner=str(args.get("owner", "") or args.get("operator", "") or ""), + max_trail_points=args.get("max_trail_points", args.get("max_points", 80)), + include_datalink=bool(args.get("include_datalink", True)), + include_datalink_messages=bool(args.get("include_datalink_messages", False)), + datalink_message_limit=args.get("datalink_message_limit", 8), + include_news=bool(args.get("include_news", True)), + news_limit=args.get("news_limit", 5), + context_radius_km=args.get("context_radius_km", 120), + include_nearby_context=bool(args.get("include_nearby_context", True)), + ) + if _wants_compact(args): + compact = dict(result) + if isinstance(compact.get("identity"), dict): + compact["identity"] = _compact_result_entry(compact["identity"]) + if isinstance(compact.get("position"), dict): + compact["position"] = _compact_result_entry(compact["position"]) + if isinstance(compact.get("related_news"), dict): + compact["related_news"] = _compact_query_result(compact["related_news"]) + return {"ok": True, "data": compact, "format": "compressed_v1"} + return {"ok": True, "data": result} + if cmd == "search_telemetry": from services.telemetry import search_telemetry result = search_telemetry( diff --git a/backend/services/openclaw_routing.py b/backend/services/openclaw_routing.py index 2747078..8810ae1 100644 --- a/backend/services/openclaw_routing.py +++ b/backend/services/openclaw_routing.py @@ -33,6 +33,8 @@ LATENCY_TIER_MS: dict[str, int] = { "brief_area": 30, "get_layer_slice": 50, "correlate_entity": 15, + "get_entity_trail": 20, + "get_entity_profile": 35, "entity_expand": 40, "osint_lookup": 200, "run_playbook": 120, @@ -175,7 +177,7 @@ def routing_manifest() -> dict[str, Any]: }, { "intent": "known person/aircraft", - "use": "find_entity(query=...) or find_flights(owner=...)", + "use": "get_entity_profile(query=...) or find_entity(query=...)", }, { "intent": "news / telegram topic", @@ -640,20 +642,58 @@ def plan_playbook(name: str, args: dict[str, Any] | None = None) -> dict[str, An return { "ok": True, "playbook": playbook, - "description": "Resolve entity for tracking", + "description": "Resolve entity and return full movement profile", "batch": [ { - "cmd": "find_entity", + "cmd": "get_entity_profile", "args": { "query": query, "entity_type": params.get("entity_type", ""), - "fallback_search": True, "compact": True, + "include_datalink": True, + "include_nearby_context": True, }, } ], } + if playbook == "jet_recon": + query = str(params.get("query", "") or params.get("registration", "") or params.get("owner", "") or "").strip() + if not query: + return {"ok": False, "detail": "jet_recon requires query, registration, or owner"} + return { + "ok": True, + "playbook": playbook, + "description": "VIP/aircraft dossier: profile + correlation evidence", + "batch": [ + { + "cmd": "get_entity_profile", + "args": { + "query": query, + "entity_type": params.get("entity_type", "aircraft"), + "registration": params.get("registration", ""), + "icao24": params.get("icao24", ""), + "owner": params.get("owner", ""), + "compact": True, + "include_datalink": True, + "include_news": True, + }, + }, + { + "cmd": "correlate_entity", + "args": { + "query": query, + "entity_type": params.get("entity_type", "aircraft"), + "registration": params.get("registration", ""), + "icao24": params.get("icao24", ""), + "owner": params.get("owner", ""), + "radius_km": params.get("radius_km", 150), + "compact": True, + }, + }, + ], + } + if playbook == "area_brief": lat = params.get("lat") lng = params.get("lng") @@ -694,7 +734,7 @@ def plan_playbook(name: str, args: dict[str, Any] | None = None) -> dict[str, An spec = PLAYBOOKS.get(playbook) if not spec: - known = sorted(PLAYBOOKS) + ["track_snapshot", "area_brief", "entity_recon"] + known = sorted(PLAYBOOKS) + ["track_snapshot", "jet_recon", "area_brief", "entity_recon"] return {"ok": False, "detail": f"unknown playbook: {playbook}", "known": known} return { diff --git a/backend/services/telemetry.py b/backend/services/telemetry.py index 60f970a..0d567df 100644 --- a/backend/services/telemetry.py +++ b/backend/services/telemetry.py @@ -1930,6 +1930,9 @@ def correlate_entity( }) status = "context_found" if signals else "no_nearby_context" + from services.entity_trail import movement_context_for_entity + + movement = movement_context_for_entity(best, max_points=40) return { "status": status, "claim_level": "evidence_pack_not_verdict", @@ -1938,11 +1941,13 @@ def correlate_entity( "center": {"lat": lat, "lng": lng}, "radius_km": radius, "signals": signals, + "movement": movement, "evidence": { "proximate_entities": proximate_entities, "context_layers": context, }, "recommended_next": [ + "Use get_entity_trail for full observed path and route enrichment.", "Use track_entity to keep monitoring this exact entity.", "Use watch_area on the returned center if the area matters more than the entity.", "Treat co-location as a lead, not proof of intent or causation.", diff --git a/backend/tests/test_entity_profile.py b/backend/tests/test_entity_profile.py new file mode 100644 index 0000000..454c842 --- /dev/null +++ b/backend/tests/test_entity_profile.py @@ -0,0 +1,104 @@ +"""Tests for get_entity_profile bundled dossier.""" + +import pytest +from unittest.mock import patch + + +@pytest.fixture() +def sample_store(): + from services.fetchers._store import latest_data, _data_lock + + with _data_lock: + backup = { + "tracked_flights": list(latest_data.get("tracked_flights") or []), + "news": list(latest_data.get("news") or []), + } + latest_data["tracked_flights"] = [ + { + "callsign": "OXE2116", + "registration": "N36NE", + "icao24": "a0f011", + "operator": "Patriots", + "alert_operator": "Patriots", + "alert_category": "Sports", + "alert_color": "orange", + "alert_tags": "NFL, New England Patriots", + "type": "Boeing 767-323ER", + "lat": 39.24, + "lng": -96.96, + "alt": 37000, + "heading": 270, + "holding": False, + "emissions": { + "fuel_gph": 1200, + "co2_kg_per_hour": 11500, + "fuel_gallons_burned": 2400, + "co2_kg_emitted": 23000, + }, + } + ] + latest_data["news"] = [ + {"title": "Patriots travel day", "summary": "Team jet departed", "lat": 39.0, "lng": -97.0} + ] + try: + yield + finally: + with _data_lock: + latest_data["tracked_flights"] = backup["tracked_flights"] + latest_data["news"] = backup["news"] + + +def test_get_entity_profile_aircraft(sample_store): + from services.entity_profile import get_entity_profile + from services.fetchers import flights + + flights.flight_trails["a0f011"] = { + "points": [[39.0, -97.0, 35000, 1000.0], [39.24, -96.96, 37000, 1120.0]], + "last_seen": 1120.0, + } + + result = get_entity_profile(registration="N36NE", include_nearby_context=False) + assert result["status"] in {"trail_available", "resolved_without_trail"} + assert result["identity"]["alert_operator"] == "Patriots" + assert result["identity"]["alert_category"] == "Sports" + assert result["aircraft_state"]["emissions"]["fuel_gallons_burned"] == 2400 + assert len(result["trail"]) == 2 + assert "related_news" in result + + +@patch("services.fetchers.airframes.lookup_datalink_messages") +def test_get_entity_profile_includes_datalink(mock_lookup, sample_store): + from services.entity_profile import get_entity_profile + + mock_lookup.return_value = { + "configured": True, + "messages": [{"summary": "Position / route report · KABQ→KDEN", "text": "POS..."}], + "hidden_count": 0, + } + + result = get_entity_profile(registration="N36NE", include_nearby_context=False, include_news=False) + assert result["datalink"]["configured"] is True + assert "KABQ" in result["datalink"]["hints"][0] + + +def test_openclaw_get_entity_profile_command(sample_store): + from services.openclaw_channel import _dispatch_command + from services.fetchers import flights + + flights.flight_trails["a0f011"] = { + "points": [[39.0, -97.0, 35000, 1000.0]], + "last_seen": 1000.0, + } + + result = _dispatch_command("get_entity_profile", {"registration": "N36NE", "compact": True}) + assert result["ok"] is True + assert result["data"]["identity"]["registration"] == "N36NE" + + +def test_jet_recon_playbook(): + from services.openclaw_routing import plan_playbook + + plan = plan_playbook("jet_recon", {"registration": "N424PX"}) + assert plan["ok"] is True + assert plan["batch"][0]["cmd"] == "get_entity_profile" + assert plan["batch"][1]["cmd"] == "correlate_entity" diff --git a/backend/tests/test_entity_trail.py b/backend/tests/test_entity_trail.py new file mode 100644 index 0000000..717bbcb --- /dev/null +++ b/backend/tests/test_entity_trail.py @@ -0,0 +1,132 @@ +"""Tests for entity trail resolution.""" + +import pytest +from unittest.mock import patch + + +@pytest.fixture() +def sample_store(): + from services.fetchers._store import latest_data, _data_lock + + with _data_lock: + backup = {"tracked_flights": list(latest_data.get("tracked_flights") or [])} + latest_data["tracked_flights"] = [ + { + "callsign": "AF1", + "registration": "82-8000", + "icao24": "adfdf8", + "alert_operator": "POTUS", + "type": "B744", + "lat": 38.95, + "lng": -77.45, + }, + { + "callsign": "OXE2116", + "registration": "N36NE", + "icao24": "a0f011", + "operator": "Patriots", + "type": "Boeing 767-323ER", + "lat": 39.24, + "lng": -96.96, + }, + ] + latest_data["ships"] = [ + { + "mmsi": 366999999, + "name": "BRAVO EUGENIA", + "lat": 25.77, + "lng": -80.13, + "shipType": 37, + } + ] + try: + yield + finally: + with _data_lock: + latest_data["tracked_flights"] = backup["tracked_flights"] + + +def test_get_entity_trail_aircraft_with_mock_trail(sample_store): + from services.entity_trail import get_entity_trail + from services.fetchers import flights + + flights.flight_trails["a0f011"] = { + "points": [ + [39.0, -97.0, 35000, 1000.0], + [39.2, -96.9, 36000, 1060.0], + [39.24, -96.96, 37000, 1120.0], + ], + "last_seen": 1120.0, + } + + result = get_entity_trail(registration="N36NE", include_datalink=False) + assert result["status"] == "trail_available" + assert result["entity_kind"] == "aircraft" + assert len(result["trail"]) == 3 + assert result["movement"]["bearing_deg"] is not None + assert "Trail points are observed" in result["notes"][0] + + +def test_get_entity_trail_ship(sample_store): + from services.entity_trail import get_entity_trail + from services import ais_stream + + ais_stream._vessel_trails[366999999] = { + "points": [ + [25.7, -80.1, 12, 2000.0], + [25.8, -80.2, 12, 2120.0], + ], + "last_seen": 2120.0, + } + + result = get_entity_trail(mmsi="366999999", entity_type="ship", include_datalink=False) + assert result["status"] == "trail_available" + assert result["entity_kind"] == "ship" + assert len(result["trail"]) == 2 + + +@patch("services.fetchers.route_database.lookup_route") +def test_get_entity_trail_enriches_route_from_database(mock_lookup, sample_store): + from services.entity_trail import get_entity_trail + + mock_lookup.return_value = { + "orig_name": "JFK: Kennedy", + "dest_name": "LAX: Los Angeles", + "orig_loc": [-73.78, 40.64], + "dest_loc": [-118.41, 33.94], + } + + result = get_entity_trail(callsign="AF1", include_datalink=False) + assert result["route"]["source"] == "route_database" + assert "Kennedy" in result["route"]["origin_name"] + + +def test_correlate_entity_includes_movement(sample_store, monkeypatch): + from services import telemetry + from services.fetchers import flights + + flights.flight_trails["adfdf8"] = { + "points": [[38.9, -77.5, 10000, 500.0], [38.95, -77.45, 12000, 560.0]], + "last_seen": 560.0, + } + + result = telemetry.correlate_entity(callsign="AF1", entity_type="aircraft", radius_km=80, limit=5) + assert result["movement"]["trail_point_count"] == 2 + assert len(result["movement"]["trail"]) == 2 + + +def test_openclaw_get_entity_trail_command(sample_store, monkeypatch): + from services.openclaw_channel import _dispatch_command + from services.fetchers import flights + + flights.flight_trails["a0f011"] = { + "points": [[39.0, -97.0, 35000, 1000.0]], + "last_seen": 1000.0, + } + + result = _dispatch_command( + "get_entity_trail", + {"query": "patriots jet", "compact": True}, + ) + assert result["ok"] is True + assert result["data"]["status"] in {"trail_available", "resolved_without_trail"} diff --git a/backend/tests/test_openclaw_routing.py b/backend/tests/test_openclaw_routing.py index bdea34a..67dbe2b 100644 --- a/backend/tests/test_openclaw_routing.py +++ b/backend/tests/test_openclaw_routing.py @@ -85,7 +85,7 @@ def test_plan_playbook_track_snapshot_requires_query(): assert plan["ok"] is False plan_ok = plan_playbook("track_snapshot", {"query": "patriots jet"}) assert plan_ok["ok"] is True - assert plan_ok["batch"][0]["cmd"] == "find_entity" + assert plan_ok["batch"][0]["cmd"] == "get_entity_profile" def test_expensive_commands_set(): diff --git a/openclaw-skills/shadowbroker/SKILL.md b/openclaw-skills/shadowbroker/SKILL.md index dff1158..4bd6062 100644 --- a/openclaw-skills/shadowbroker/SKILL.md +++ b/openclaw-skills/shadowbroker/SKILL.md @@ -25,7 +25,7 @@ three-tool surface: | `await sb.run_playbook("hot_snapshot")` | Pre-batched snapshots (morning brief, monitor poll, status) | | `await sb.channel_status()` | Liveness (~5 ms) — never `/api/health` | -**Latency tiers:** `find_entity` / `find_flights` / `search_news` / `entities_near` → **⚡ <30 ms**. +**Latency tiers:** `get_entity_profile` / `find_entity` / `get_entity_trail` / `find_flights` / `search_news` / `entities_near` → **⚡ <30 ms**. `search_telemetry` / `get_telemetry` / `get_report` → **🔴 seconds** — blocked unless `confirm_expensive=true`. ```python @@ -46,7 +46,7 @@ batch = await sb.send_batch([ ]) ``` -**Playbooks:** `hot_snapshot`, `morning_brief`, `status_check`, `monitor_heartbeat`, `track_snapshot`, `area_brief`, `entity_recon`. +**Playbooks:** `hot_snapshot`, `morning_brief`, `status_check`, `monitor_heartbeat`, `track_snapshot`, `jet_recon`, `area_brief`, `entity_recon`. **Anti-patterns:** `search_telemetry` for known tail numbers; `get_telemetry` for routine polls; sequential `send_command` loops; empty `layers: []` on `get_layer_slice`. @@ -157,6 +157,8 @@ The channel operates over HMAC-authenticated HTTP with body-integrity binding: | `await sb.get_layer_slice(["ships", "gdelt"])` | Only the requested layers, with per-layer incremental | **Primary fetch method** — automatically skips layers you already have | | `await sb.send_command("get_summary")` | Lightweight counts-only summary | Discover what data exists before pulling anything | | `await sb.ask("...")` | **Route + execute** | **Default** for natural-language reads | +| `await sb.send_command("get_entity_profile", {...})` | **Preferred aircraft/vessel dossier** — identity, VIP tags, trail, route, ACARS, jamming/correlations, news | Tail, owner, callsign, MMSI | +| `await sb.send_command("get_entity_trail", {...})` | Observed path + route + ACARS hints only | When you don't need full dossier | | `await sb.send_command("find_entity", {...})` | Exact-first entity resolver | Parsed person/tail/callsign/MMSI — skips fuzzy unless `fallback_search=true` | | `await sb.send_command("find_flights", {...})` | Targeted flight search | When you know the domain (callsign, tail number) | | `await sb.send_command("route_query", {...})` | Routing plan only | Inspect recommended command before executing |