Add entity trail and profile commands for OpenClaw agent dossiers.

Expose observed aircraft/vessel paths, route enrichment, VIP metadata, datalink, and nearby context so agents can reconstruct movement without full telemetry dumps.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-26 00:23:40 -06:00
parent 8c4619179e
commit 4cb6492b22
11 changed files with 997 additions and 11 deletions
+54 -2
View File
@@ -1687,9 +1687,53 @@ async def agent_tool_manifest(request: Request):
"radius_km": {"type": "float", "required": False, "description": "Context radius in km (default 100, max 1000)"},
"limit": {"type": "integer", "required": False, "description": "Max records per evidence group (default 10, max 50)"},
},
"returns": "{status, claim_level, entity, center, radius_km, signals, evidence: {proximate_entities, context_layers}, recommended_next}",
"returns": "{status, claim_level, entity, center, radius_km, signals, movement, evidence: {proximate_entities, context_layers}, recommended_next}",
"example": {"cmd": "correlate_entity", "args": {"entity_type": "aircraft", "callsign": "AF1", "radius_km": 150}},
},
{
"name": "get_entity_trail",
"type": "read",
"description": "Resolve an aircraft or vessel and return its observed movement trail, route enrichment (from entity fields or route database), bearing/heading, and optional ACARS datalink hints. Trails accumulate while ShadowBroker is running.",
"parameters": {
"query": {"type": "string", "required": False, "description": "Natural-language name, owner, callsign, tail, MMSI, or vessel name"},
"entity_type": {"type": "string", "required": False, "description": "Optional type hint: aircraft or ship"},
"callsign": {"type": "string", "required": False, "description": "Aircraft or vessel callsign"},
"registration": {"type": "string", "required": False, "description": "Aircraft tail number / registration"},
"icao24": {"type": "string", "required": False, "description": "Aircraft ICAO24 hex identifier"},
"mmsi": {"type": "string", "required": False, "description": "Ship MMSI"},
"imo": {"type": "string", "required": False, "description": "Ship IMO number"},
"name": {"type": "string", "required": False, "description": "Known aircraft/vessel name"},
"owner": {"type": "string", "required": False, "description": "Owner or operator"},
"max_points": {"type": "integer", "required": False, "description": "Max trail points returned (default 80, max 200)"},
"include_datalink": {"type": "boolean", "required": False, "description": "Include ACARS/VDL summaries when available (default true)"},
},
"returns": "{status, entity, trail: [{lat,lng,alt_ft,ts}], route, movement, datalink_hints, notes}",
"example": {"cmd": "get_entity_trail", "args": {"registration": "N424PX", "icao24": "a50bd6"}},
},
{
"name": "get_entity_profile",
"type": "read",
"description": "One-shot aircraft or vessel dossier: identity, VIP/plane-alert metadata, position, trail, route, holding/emissions, ACARS datalink, nearby correlations/jamming/SAR, and related news.",
"parameters": {
"query": {"type": "string", "required": False, "description": "Natural-language name, owner, callsign, tail, MMSI, or vessel name"},
"entity_type": {"type": "string", "required": False, "description": "Optional type hint: aircraft or ship"},
"callsign": {"type": "string", "required": False, "description": "Aircraft or vessel callsign"},
"registration": {"type": "string", "required": False, "description": "Aircraft tail number / registration"},
"icao24": {"type": "string", "required": False, "description": "Aircraft ICAO24 hex identifier"},
"mmsi": {"type": "string", "required": False, "description": "Ship MMSI"},
"imo": {"type": "string", "required": False, "description": "Ship IMO number"},
"name": {"type": "string", "required": False, "description": "Known aircraft/vessel name"},
"owner": {"type": "string", "required": False, "description": "Owner or operator"},
"max_trail_points": {"type": "integer", "required": False, "description": "Max trail points (default 80)"},
"include_datalink": {"type": "boolean", "required": False, "description": "Include ACARS summaries (default true)"},
"include_datalink_messages": {"type": "boolean", "required": False, "description": "Include full summarized datalink messages (default false)"},
"include_news": {"type": "boolean", "required": False, "description": "Search news for owner/operator (default true)"},
"include_nearby_context": {"type": "boolean", "required": False, "description": "Include nearby correlations, GPS jamming, SAR (default true)"},
"context_radius_km": {"type": "float", "required": False, "description": "Radius for nearby context (default 120 km)"},
},
"returns": "{status, identity, position, trail, route, movement, aircraft_state, datalink, nearby_context, related_news, recommended_next}",
"example": {"cmd": "get_entity_profile", "args": {"registration": "N424PX", "owner": "Katzman"}},
},
{
"name": "search_telemetry",
"type": "read",
@@ -2413,7 +2457,15 @@ async def api_capabilities(request: Request):
},
"correlate_entity": {
"args": {"query": "str (optional)", "entity_type": "str (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "radius_km": "float (default 100)", "limit": "int (default 10)"},
"description": "Resolve an entity and return nearby context/correlation evidence. Co-location is reported as a lead, not proof.",
"description": "Resolve an entity and return nearby context/correlation evidence plus observed movement trail summary. Co-location is reported as a lead, not proof.",
},
"get_entity_trail": {
"args": {"query": "str (optional)", "entity_type": "str (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "max_points": "int (default 80)", "include_datalink": "bool (default true)"},
"description": "Observed aircraft/vessel trail, route enrichment, bearing, and ACARS hints. Use instead of Time Machine for live-session history.",
},
"get_entity_profile": {
"args": {"query": "str (optional)", "entity_type": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "owner": "str (optional)", "include_datalink": "bool (default true)", "include_news": "bool (default true)", "include_nearby_context": "bool (default true)"},
"description": "Preferred one-shot dossier for aircraft/vessels — identity, trail, route, VIP tags, datalink, jamming/correlations, news.",
},
"search_telemetry": {
"args": {"query": "str", "layers": "list[str] (optional)", "limit": "int (default 25)"},
+291
View File
@@ -0,0 +1,291 @@
"""Bundled entity intelligence profile for OpenClaw agents."""
from __future__ import annotations
from typing import Any
from services.entity_trail import get_entity_trail
from services.fetchers._store import get_latest_data_subset_refs
from services.telemetry import find_entity, search_news
_AIRCRAFT_LAYERS = (
"tracked_flights",
"military_flights",
"private_jets",
"private_flights",
"commercial_flights",
)
def _pick_str(entity: dict[str, Any], *keys: str) -> str:
for key in keys:
value = entity.get(key)
if value not in (None, ""):
return str(value).strip()
return ""
def _full_record(entity: dict[str, Any]) -> dict[str, Any]:
"""Re-load the enriched store record (holding, emissions, alert_tags, etc.)."""
icao = _pick_str(entity, "icao24").lower()
mmsi = _pick_str(entity, "mmsi")
if icao:
snap = get_latest_data_subset_refs(*_AIRCRAFT_LAYERS)
for layer in _AIRCRAFT_LAYERS:
for item in snap.get(layer) or []:
if not isinstance(item, dict):
continue
if str(item.get("icao24") or "").lower() == icao:
merged = dict(item)
merged.setdefault("source_layer", layer)
return merged
if mmsi:
snap = get_latest_data_subset_refs("ships")
items = snap.get("ships") or []
if isinstance(items, dict):
items = items.get("vessels", []) or items.get("items", [])
for item in items if isinstance(items, list) else []:
if not isinstance(item, dict):
continue
if str(item.get("mmsi") or "") == mmsi:
merged = dict(item)
merged.setdefault("source_layer", "ships")
return merged
return dict(entity)
def _identity_block(entity: dict[str, Any], *, is_ship: bool) -> dict[str, Any]:
block: dict[str, Any] = {
"label": _pick_str(entity, "label", "callsign", "name", "tracked_name"),
"callsign": _pick_str(entity, "callsign", "flight", "call"),
"registration": _pick_str(entity, "registration", "r"),
"icao24": _pick_str(entity, "icao24"),
"mmsi": _pick_str(entity, "mmsi"),
"imo": _pick_str(entity, "imo"),
"name": _pick_str(entity, "name", "shipName", "tracked_name", "yacht_name"),
"type": _pick_str(entity, "type", "t", "aircraft_type", "shipType"),
"owner": _pick_str(entity, "owner", "operator", "alert_operator", "yacht_owner"),
"source_layer": _pick_str(entity, "source_layer", "layer"),
"country": _pick_str(entity, "country", "flag"),
}
if not is_ship:
tags = entity.get("alert_tags") or entity.get("intel_tags")
if tags:
block["tags"] = tags if isinstance(tags, list) else str(tags)
for key in (
"alert_category",
"alert_operator",
"alert_color",
"alert_type",
"alert_link",
"alert_wiki",
"alert_socials",
"tracked_name",
"intel_tags",
"squawk",
):
value = entity.get(key)
if value not in (None, "", [], {}):
block[key] = value
else:
for key in ("tracked_name", "tracked_category", "yacht_owner", "yacht_name", "yacht_category"):
value = entity.get(key)
if value not in (None, ""):
block[key] = value
return block
def _position_block(entity: dict[str, Any]) -> dict[str, Any]:
return {
"lat": entity.get("lat") or entity.get("latitude"),
"lng": entity.get("lng") or entity.get("lon") or entity.get("longitude"),
"alt_ft": entity.get("alt") or entity.get("altitude") or entity.get("alt_baro"),
"speed_knots": entity.get("speed_knots") or entity.get("speed") or entity.get("gs") or entity.get("sog"),
"heading_deg": entity.get("heading") or entity.get("true_track") or entity.get("track") or entity.get("course"),
"on_ground": bool(entity.get("on_ground")) if "on_ground" in entity else None,
}
def _aircraft_state(entity: dict[str, Any]) -> dict[str, Any]:
state: dict[str, Any] = {}
if entity.get("holding") is not None:
state["holding"] = bool(entity.get("holding"))
emissions = entity.get("emissions")
if isinstance(emissions, dict) and emissions:
state["emissions"] = {
key: emissions[key]
for key in (
"fuel_gph",
"co2_kg_per_hour",
"fuel_gallons_burned",
"co2_kg_emitted",
"observation_seconds",
)
if emissions.get(key) is not None
}
return state
def _datalink_block(
*,
icao24: str,
registration: str,
callsign: str,
include_messages: bool,
message_limit: int,
) -> dict[str, Any]:
try:
from services.fetchers.airframes import lookup_datalink_messages
result = lookup_datalink_messages(
icao24=icao24,
registration=registration,
callsign=callsign,
allow_live=False,
)
except Exception:
return {"configured": False, "messages": [], "hints": [], "hidden_count": 0}
messages = result.get("messages") or []
hints = [
str(msg.get("summary") or "").strip()
for msg in messages
if isinstance(msg, dict) and str(msg.get("summary") or "").strip()
][:5]
block: dict[str, Any] = {
"configured": bool(result.get("configured")),
"hints": hints,
"hidden_count": int(result.get("hidden_count") or 0),
"queued_refresh": bool(result.get("queued_refresh") or result.get("priority_scan")),
}
if include_messages:
block["messages"] = messages[: max(1, min(message_limit, 20))]
return block
def _nearby_context(
*,
lat: float,
lng: float,
radius_km: float,
is_ship: bool,
) -> dict[str, Any]:
from services.telemetry import _nearby_items_from_layers
layers = ["correlations", "gps_jamming", "sar_anomalies", "internet_outages"]
if is_ship:
layers.append("fishing_activity")
context = _nearby_items_from_layers(
lat=lat,
lng=lng,
radius_km=radius_km,
layers=tuple(layers),
limit_per_layer=5,
)
return {layer: items for layer, items in context.items() if items}
def get_entity_profile(
*,
query: str = "",
entity_type: str = "",
callsign: str = "",
registration: str = "",
icao24: str = "",
mmsi: str = "",
imo: str = "",
name: str = "",
owner: str = "",
max_trail_points: int = 80,
include_datalink: bool = True,
include_datalink_messages: bool = False,
datalink_message_limit: int = 8,
include_news: bool = True,
news_limit: int = 5,
context_radius_km: float = 120,
include_nearby_context: bool = True,
) -> dict[str, Any]:
"""One-shot dossier: identity, position, trail, route, enrichment, and context."""
trail_pack = get_entity_trail(
query=query,
entity_type=entity_type,
callsign=callsign,
registration=registration,
icao24=icao24,
mmsi=mmsi,
imo=imo,
name=name,
owner=owner,
max_points=max_trail_points,
include_datalink=False,
)
if trail_pack.get("status") == "unresolved":
return {
"status": "unresolved",
"lookup": trail_pack.get("lookup"),
"recommended_next": [
"Try registration, ICAO24, MMSI, callsign, or owner.",
"Use track_entity to get alerts when the entity reappears.",
],
}
entity = _full_record(trail_pack.get("entity") or {})
is_ship = trail_pack.get("entity_kind") == "ship"
identity = _identity_block(entity, is_ship=is_ship)
position = _position_block(entity)
profile: dict[str, Any] = {
"status": trail_pack.get("status"),
"entity_kind": trail_pack.get("entity_kind"),
"lookup": trail_pack.get("lookup"),
"identity": identity,
"position": position,
"trail": trail_pack.get("trail") or [],
"route": trail_pack.get("route") or {},
"movement": trail_pack.get("movement") or {},
"notes": trail_pack.get("notes") or [],
}
if not is_ship:
aircraft_state = _aircraft_state(entity)
if aircraft_state:
profile["aircraft_state"] = aircraft_state
if include_datalink:
profile["datalink"] = _datalink_block(
icao24=_pick_str(entity, "icao24") or icao24,
registration=_pick_str(entity, "registration") or registration,
callsign=_pick_str(entity, "callsign", "flight") or callsign,
include_messages=include_datalink_messages,
message_limit=datalink_message_limit,
)
lat = position.get("lat")
lng = position.get("lng")
if include_nearby_context and lat is not None and lng is not None:
profile["nearby_context"] = _nearby_context(
lat=float(lat),
lng=float(lng),
radius_km=max(10.0, min(float(context_radius_km or 120), 500.0)),
is_ship=is_ship,
)
if include_news:
news_query = (
_pick_str(entity, "alert_operator", "owner", "operator", "tracked_name", "name")
or _pick_str(entity, "registration", "callsign")
or query
)
if news_query:
profile["related_news"] = search_news(query=news_query, limit=max(1, min(news_limit, 15)))
profile["recommended_next"] = [
"Use correlate_entity for nearby-event evidence packs.",
"Use track_entity for forward monitoring without re-querying.",
"Use get_entity_trail when you only need movement history.",
]
if not profile.get("route"):
profile["recommended_next"].insert(
0,
"Route unknown — check datalink hints or wait for callsign route database match.",
)
return profile
+300
View File
@@ -0,0 +1,300 @@
"""Resolve live movement history (trail + route) for aircraft and vessels."""
from __future__ import annotations
import math
from typing import Any
from services.telemetry import find_entity
def _coerce_float(value: Any) -> float | None:
try:
if value is None:
return None
return float(value)
except (TypeError, ValueError):
return None
def _norm_key(value: str) -> str:
return str(value or "").strip().lower()
def _is_known_route_name(value: str) -> bool:
normalized = str(value or "").strip().upper()
return bool(normalized and normalized != "UNKNOWN")
def _bearing_deg(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
phi1, phi2 = math.radians(lat1), math.radians(lat2)
d_lambda = math.radians(lng2 - lng1)
y = math.sin(d_lambda) * math.cos(phi2)
x = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(d_lambda)
return (math.degrees(math.atan2(y, x)) + 360.0) % 360.0
def _compact_trail_points(points: list, *, max_points: int = 80) -> list[dict[str, Any]]:
if not points:
return []
if len(points) <= max_points:
selected = points
else:
step = max(1, len(points) // max_points)
selected = points[::step]
if selected[-1] is not points[-1]:
selected.append(points[-1])
out: list[dict[str, Any]] = []
for point in selected:
if not isinstance(point, (list, tuple)) or len(point) < 2:
continue
lat = _coerce_float(point[0])
lng = _coerce_float(point[1])
if lat is None or lng is None:
continue
item: dict[str, Any] = {
"lat": round(lat, 5),
"lng": round(lng, 5),
}
if len(point) >= 3:
alt = _coerce_float(point[2])
if alt is not None:
item["alt_ft"] = round(alt, 1)
if len(point) >= 4:
ts = _coerce_float(point[3])
if ts is not None:
item["ts"] = round(ts, 1)
out.append(item)
return out
def _route_from_entity(entity: dict[str, Any]) -> dict[str, Any]:
origin_name = str(entity.get("origin_name") or "").strip()
dest_name = str(entity.get("dest_name") or "").strip()
origin_loc = entity.get("origin_loc")
dest_loc = entity.get("dest_loc")
if _is_known_route_name(origin_name) and _is_known_route_name(dest_name):
return {
"origin_name": origin_name,
"dest_name": dest_name,
"origin_loc": origin_loc,
"dest_loc": dest_loc,
"source": "entity_field",
}
return {}
def _route_from_database(callsign: str) -> dict[str, Any]:
from services.fetchers.route_database import lookup_route
route = lookup_route(callsign)
if not route:
return {}
return {
"origin_name": route.get("orig_name"),
"dest_name": route.get("dest_name"),
"origin_loc": route.get("orig_loc"),
"dest_loc": route.get("dest_loc"),
"source": "route_database",
}
def _datalink_hints(*, icao24: str = "", registration: str = "", callsign: str = "") -> list[str]:
try:
from services.fetchers.airframes import lookup_datalink_messages
result = lookup_datalink_messages(
icao24=icao24,
registration=registration,
callsign=callsign,
allow_live=False,
)
except Exception:
return []
hints: list[str] = []
for message in result.get("messages") or []:
if not isinstance(message, dict):
continue
summary = str(message.get("summary") or "").strip()
if summary:
hints.append(summary)
if len(hints) >= 5:
break
return hints
def _flight_trail(icao24: str) -> list:
from services.fetchers.flights import get_flight_trail
return get_flight_trail(icao24)
def _ship_trail(mmsi: int | str) -> list:
from services.ais_stream import get_vessel_trail
try:
return get_vessel_trail(int(mmsi))
except (TypeError, ValueError):
return []
def _movement_summary(points: list[dict[str, Any]]) -> dict[str, Any]:
if not points:
return {}
first = points[0]
last = points[-1]
summary: dict[str, Any] = {
"first_point": first,
"last_point": last,
"point_count": len(points),
}
if first.get("ts") and last.get("ts"):
duration_s = max(0.0, float(last["ts"]) - float(first["ts"]))
summary["duration_minutes"] = round(duration_s / 60.0, 1)
summary["first_seen_at"] = first["ts"]
summary["last_seen_at"] = last["ts"]
if len(points) >= 2:
summary["bearing_deg"] = round(
_bearing_deg(first["lat"], first["lng"], last["lat"], last["lng"]),
1,
)
if len(points) >= 3:
prev = points[-2]
summary["current_heading_deg"] = round(
_bearing_deg(prev["lat"], prev["lng"], last["lat"], last["lng"]),
1,
)
return summary
def get_entity_trail(
*,
query: str = "",
entity_type: str = "",
callsign: str = "",
registration: str = "",
icao24: str = "",
mmsi: str = "",
imo: str = "",
name: str = "",
owner: str = "",
max_points: int = 80,
include_datalink: bool = True,
) -> dict[str, Any]:
"""Return movement history and route context for a resolved aircraft or vessel."""
lookup = find_entity(
query=query,
entity_type=entity_type,
callsign=callsign,
registration=registration,
icao24=icao24,
mmsi=mmsi,
imo=imo,
name=name,
owner=owner,
limit=3,
fallback_search=True,
)
entity = lookup.get("best_match") if isinstance(lookup.get("best_match"), dict) else None
if not entity:
return {
"status": "unresolved",
"lookup": lookup,
"entity": None,
"trail": [],
"route": {},
"movement": {},
"datalink_hints": [],
"notes": [
"No matching aircraft or vessel in live layers.",
"Trails accumulate while ShadowBroker is running; they are not pre-flight history.",
],
}
group = _norm_key(entity.get("group") or entity.get("entity_group") or "")
source_layer = _norm_key(entity.get("source_layer") or "")
is_ship = group == "maritime" or source_layer == "ships" or bool(entity.get("mmsi"))
raw_points: list = []
entity_id = ""
if is_ship:
mmsi_value = entity.get("mmsi")
if mmsi_value is not None:
entity_id = str(mmsi_value)
raw_points = _ship_trail(mmsi_value)
else:
hex_id = str(entity.get("icao24") or "").strip().lower()
entity_id = hex_id
if hex_id:
raw_points = _flight_trail(hex_id)
max_points = max(10, min(int(max_points or 80), 200))
trail = _compact_trail_points(raw_points, max_points=max_points)
movement = _movement_summary(trail)
route = _route_from_entity(entity)
if not route:
callsign_value = str(
entity.get("callsign") or entity.get("flight") or entity.get("call") or callsign or ""
).strip()
if callsign_value:
route = _route_from_database(callsign_value)
datalink_hints: list[str] = []
if include_datalink and not is_ship:
datalink_hints = _datalink_hints(
icao24=str(entity.get("icao24") or icao24 or ""),
registration=str(entity.get("registration") or registration or ""),
callsign=str(entity.get("callsign") or callsign or ""),
)
notes = [
"Trail points are observed positions since this ShadowBroker instance started tracking the entity.",
"Use Time Machine snapshots for longer historical playback when enabled.",
]
if not trail:
notes.insert(
0,
"No trail points yet — the entity may have just appeared or trail retention expired.",
)
elif not route:
notes.append("Route origin/destination unknown; infer direction from trail bearing only.")
status = "trail_available" if trail else "resolved_without_trail"
return {
"status": status,
"lookup": lookup,
"entity": entity,
"entity_id": entity_id,
"entity_kind": "ship" if is_ship else "aircraft",
"trail": trail,
"route": route,
"movement": movement,
"datalink_hints": datalink_hints,
"notes": notes,
}
def movement_context_for_entity(entity: dict[str, Any], *, max_points: int = 40) -> dict[str, Any]:
"""Compact movement block for correlate_entity and dossier helpers."""
if not isinstance(entity, dict):
return {}
result = get_entity_trail(
icao24=str(entity.get("icao24") or ""),
mmsi=str(entity.get("mmsi") or ""),
registration=str(entity.get("registration") or ""),
callsign=str(entity.get("callsign") or entity.get("flight") or ""),
entity_type="ship" if entity.get("mmsi") else "aircraft",
max_points=max_points,
include_datalink=True,
)
return {
"trail_point_count": len(result.get("trail") or []),
"trail": result.get("trail") or [],
"route": result.get("route") or {},
"movement": result.get("movement") or {},
"datalink_hints": result.get("datalink_hints") or [],
"notes": result.get("notes") or [],
}
+3 -1
View File
@@ -810,9 +810,11 @@ def _classify_and_publish(all_adsb_flights):
seen_hexes.add(hex_id)
tracked_hexes = {t.get("icao24", "").lower() for t in tracked_snapshot}
private_hexes = {t.get("icao24", "").lower() for t in private_jets_snapshot}
priority_hexes = tracked_hexes | private_hexes
stale_keys = []
for k, v in flight_trails.items():
cutoff = now_ts - 1800 if k in tracked_hexes else now_ts - 300
cutoff = now_ts - 14400 if k in priority_hexes else now_ts - 300
if v["last_seen"] < cutoff:
stale_keys.append(k)
for k in stale_keys:
+58
View File
@@ -59,6 +59,8 @@ READ_COMMANDS = frozenset({
"find_ships",
"find_entity",
"correlate_entity",
"get_entity_trail",
"get_entity_profile",
"brief_area",
"what_changed",
"search_telemetry",
@@ -843,6 +845,62 @@ def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]:
return {"ok": True, "data": compact, "format": "compressed_v1"}
return {"ok": True, "data": result}
if cmd == "get_entity_trail":
from services.entity_trail import get_entity_trail
result = get_entity_trail(
query=str(args.get("query", "") or ""),
entity_type=str(args.get("entity_type", "") or args.get("type", "") or ""),
callsign=str(args.get("callsign", "") or ""),
registration=str(args.get("registration", "") or args.get("tail_number", "") or ""),
icao24=str(args.get("icao24", "") or ""),
mmsi=str(args.get("mmsi", "") or ""),
imo=str(args.get("imo", "") or ""),
name=str(args.get("name", "") or ""),
owner=str(args.get("owner", "") or args.get("operator", "") or ""),
max_points=args.get("max_points", 80),
include_datalink=bool(args.get("include_datalink", True)),
)
if _wants_compact(args):
compact = dict(result)
if isinstance(compact.get("entity"), dict):
compact["entity"] = _compact_result_entry(compact["entity"])
return {"ok": True, "data": compact, "format": "compressed_v1"}
return {"ok": True, "data": result}
if cmd == "get_entity_profile":
from services.entity_profile import get_entity_profile
result = get_entity_profile(
query=str(args.get("query", "") or ""),
entity_type=str(args.get("entity_type", "") or args.get("type", "") or ""),
callsign=str(args.get("callsign", "") or ""),
registration=str(args.get("registration", "") or args.get("tail_number", "") or ""),
icao24=str(args.get("icao24", "") or ""),
mmsi=str(args.get("mmsi", "") or ""),
imo=str(args.get("imo", "") or ""),
name=str(args.get("name", "") or ""),
owner=str(args.get("owner", "") or args.get("operator", "") or ""),
max_trail_points=args.get("max_trail_points", args.get("max_points", 80)),
include_datalink=bool(args.get("include_datalink", True)),
include_datalink_messages=bool(args.get("include_datalink_messages", False)),
datalink_message_limit=args.get("datalink_message_limit", 8),
include_news=bool(args.get("include_news", True)),
news_limit=args.get("news_limit", 5),
context_radius_km=args.get("context_radius_km", 120),
include_nearby_context=bool(args.get("include_nearby_context", True)),
)
if _wants_compact(args):
compact = dict(result)
if isinstance(compact.get("identity"), dict):
compact["identity"] = _compact_result_entry(compact["identity"])
if isinstance(compact.get("position"), dict):
compact["position"] = _compact_result_entry(compact["position"])
if isinstance(compact.get("related_news"), dict):
compact["related_news"] = _compact_query_result(compact["related_news"])
return {"ok": True, "data": compact, "format": "compressed_v1"}
return {"ok": True, "data": result}
if cmd == "search_telemetry":
from services.telemetry import search_telemetry
result = search_telemetry(
+45 -5
View File
@@ -33,6 +33,8 @@ LATENCY_TIER_MS: dict[str, int] = {
"brief_area": 30,
"get_layer_slice": 50,
"correlate_entity": 15,
"get_entity_trail": 20,
"get_entity_profile": 35,
"entity_expand": 40,
"osint_lookup": 200,
"run_playbook": 120,
@@ -175,7 +177,7 @@ def routing_manifest() -> dict[str, Any]:
},
{
"intent": "known person/aircraft",
"use": "find_entity(query=...) or find_flights(owner=...)",
"use": "get_entity_profile(query=...) or find_entity(query=...)",
},
{
"intent": "news / telegram topic",
@@ -640,20 +642,58 @@ def plan_playbook(name: str, args: dict[str, Any] | None = None) -> dict[str, An
return {
"ok": True,
"playbook": playbook,
"description": "Resolve entity for tracking",
"description": "Resolve entity and return full movement profile",
"batch": [
{
"cmd": "find_entity",
"cmd": "get_entity_profile",
"args": {
"query": query,
"entity_type": params.get("entity_type", ""),
"fallback_search": True,
"compact": True,
"include_datalink": True,
"include_nearby_context": True,
},
}
],
}
if playbook == "jet_recon":
query = str(params.get("query", "") or params.get("registration", "") or params.get("owner", "") or "").strip()
if not query:
return {"ok": False, "detail": "jet_recon requires query, registration, or owner"}
return {
"ok": True,
"playbook": playbook,
"description": "VIP/aircraft dossier: profile + correlation evidence",
"batch": [
{
"cmd": "get_entity_profile",
"args": {
"query": query,
"entity_type": params.get("entity_type", "aircraft"),
"registration": params.get("registration", ""),
"icao24": params.get("icao24", ""),
"owner": params.get("owner", ""),
"compact": True,
"include_datalink": True,
"include_news": True,
},
},
{
"cmd": "correlate_entity",
"args": {
"query": query,
"entity_type": params.get("entity_type", "aircraft"),
"registration": params.get("registration", ""),
"icao24": params.get("icao24", ""),
"owner": params.get("owner", ""),
"radius_km": params.get("radius_km", 150),
"compact": True,
},
},
],
}
if playbook == "area_brief":
lat = params.get("lat")
lng = params.get("lng")
@@ -694,7 +734,7 @@ def plan_playbook(name: str, args: dict[str, Any] | None = None) -> dict[str, An
spec = PLAYBOOKS.get(playbook)
if not spec:
known = sorted(PLAYBOOKS) + ["track_snapshot", "area_brief", "entity_recon"]
known = sorted(PLAYBOOKS) + ["track_snapshot", "jet_recon", "area_brief", "entity_recon"]
return {"ok": False, "detail": f"unknown playbook: {playbook}", "known": known}
return {
+5
View File
@@ -1930,6 +1930,9 @@ def correlate_entity(
})
status = "context_found" if signals else "no_nearby_context"
from services.entity_trail import movement_context_for_entity
movement = movement_context_for_entity(best, max_points=40)
return {
"status": status,
"claim_level": "evidence_pack_not_verdict",
@@ -1938,11 +1941,13 @@ def correlate_entity(
"center": {"lat": lat, "lng": lng},
"radius_km": radius,
"signals": signals,
"movement": movement,
"evidence": {
"proximate_entities": proximate_entities,
"context_layers": context,
},
"recommended_next": [
"Use get_entity_trail for full observed path and route enrichment.",
"Use track_entity to keep monitoring this exact entity.",
"Use watch_area on the returned center if the area matters more than the entity.",
"Treat co-location as a lead, not proof of intent or causation.",
+104
View File
@@ -0,0 +1,104 @@
"""Tests for get_entity_profile bundled dossier."""
import pytest
from unittest.mock import patch
@pytest.fixture()
def sample_store():
from services.fetchers._store import latest_data, _data_lock
with _data_lock:
backup = {
"tracked_flights": list(latest_data.get("tracked_flights") or []),
"news": list(latest_data.get("news") or []),
}
latest_data["tracked_flights"] = [
{
"callsign": "OXE2116",
"registration": "N36NE",
"icao24": "a0f011",
"operator": "Patriots",
"alert_operator": "Patriots",
"alert_category": "Sports",
"alert_color": "orange",
"alert_tags": "NFL, New England Patriots",
"type": "Boeing 767-323ER",
"lat": 39.24,
"lng": -96.96,
"alt": 37000,
"heading": 270,
"holding": False,
"emissions": {
"fuel_gph": 1200,
"co2_kg_per_hour": 11500,
"fuel_gallons_burned": 2400,
"co2_kg_emitted": 23000,
},
}
]
latest_data["news"] = [
{"title": "Patriots travel day", "summary": "Team jet departed", "lat": 39.0, "lng": -97.0}
]
try:
yield
finally:
with _data_lock:
latest_data["tracked_flights"] = backup["tracked_flights"]
latest_data["news"] = backup["news"]
def test_get_entity_profile_aircraft(sample_store):
from services.entity_profile import get_entity_profile
from services.fetchers import flights
flights.flight_trails["a0f011"] = {
"points": [[39.0, -97.0, 35000, 1000.0], [39.24, -96.96, 37000, 1120.0]],
"last_seen": 1120.0,
}
result = get_entity_profile(registration="N36NE", include_nearby_context=False)
assert result["status"] in {"trail_available", "resolved_without_trail"}
assert result["identity"]["alert_operator"] == "Patriots"
assert result["identity"]["alert_category"] == "Sports"
assert result["aircraft_state"]["emissions"]["fuel_gallons_burned"] == 2400
assert len(result["trail"]) == 2
assert "related_news" in result
@patch("services.fetchers.airframes.lookup_datalink_messages")
def test_get_entity_profile_includes_datalink(mock_lookup, sample_store):
from services.entity_profile import get_entity_profile
mock_lookup.return_value = {
"configured": True,
"messages": [{"summary": "Position / route report · KABQ→KDEN", "text": "POS..."}],
"hidden_count": 0,
}
result = get_entity_profile(registration="N36NE", include_nearby_context=False, include_news=False)
assert result["datalink"]["configured"] is True
assert "KABQ" in result["datalink"]["hints"][0]
def test_openclaw_get_entity_profile_command(sample_store):
from services.openclaw_channel import _dispatch_command
from services.fetchers import flights
flights.flight_trails["a0f011"] = {
"points": [[39.0, -97.0, 35000, 1000.0]],
"last_seen": 1000.0,
}
result = _dispatch_command("get_entity_profile", {"registration": "N36NE", "compact": True})
assert result["ok"] is True
assert result["data"]["identity"]["registration"] == "N36NE"
def test_jet_recon_playbook():
from services.openclaw_routing import plan_playbook
plan = plan_playbook("jet_recon", {"registration": "N424PX"})
assert plan["ok"] is True
assert plan["batch"][0]["cmd"] == "get_entity_profile"
assert plan["batch"][1]["cmd"] == "correlate_entity"
+132
View File
@@ -0,0 +1,132 @@
"""Tests for entity trail resolution."""
import pytest
from unittest.mock import patch
@pytest.fixture()
def sample_store():
from services.fetchers._store import latest_data, _data_lock
with _data_lock:
backup = {"tracked_flights": list(latest_data.get("tracked_flights") or [])}
latest_data["tracked_flights"] = [
{
"callsign": "AF1",
"registration": "82-8000",
"icao24": "adfdf8",
"alert_operator": "POTUS",
"type": "B744",
"lat": 38.95,
"lng": -77.45,
},
{
"callsign": "OXE2116",
"registration": "N36NE",
"icao24": "a0f011",
"operator": "Patriots",
"type": "Boeing 767-323ER",
"lat": 39.24,
"lng": -96.96,
},
]
latest_data["ships"] = [
{
"mmsi": 366999999,
"name": "BRAVO EUGENIA",
"lat": 25.77,
"lng": -80.13,
"shipType": 37,
}
]
try:
yield
finally:
with _data_lock:
latest_data["tracked_flights"] = backup["tracked_flights"]
def test_get_entity_trail_aircraft_with_mock_trail(sample_store):
from services.entity_trail import get_entity_trail
from services.fetchers import flights
flights.flight_trails["a0f011"] = {
"points": [
[39.0, -97.0, 35000, 1000.0],
[39.2, -96.9, 36000, 1060.0],
[39.24, -96.96, 37000, 1120.0],
],
"last_seen": 1120.0,
}
result = get_entity_trail(registration="N36NE", include_datalink=False)
assert result["status"] == "trail_available"
assert result["entity_kind"] == "aircraft"
assert len(result["trail"]) == 3
assert result["movement"]["bearing_deg"] is not None
assert "Trail points are observed" in result["notes"][0]
def test_get_entity_trail_ship(sample_store):
from services.entity_trail import get_entity_trail
from services import ais_stream
ais_stream._vessel_trails[366999999] = {
"points": [
[25.7, -80.1, 12, 2000.0],
[25.8, -80.2, 12, 2120.0],
],
"last_seen": 2120.0,
}
result = get_entity_trail(mmsi="366999999", entity_type="ship", include_datalink=False)
assert result["status"] == "trail_available"
assert result["entity_kind"] == "ship"
assert len(result["trail"]) == 2
@patch("services.fetchers.route_database.lookup_route")
def test_get_entity_trail_enriches_route_from_database(mock_lookup, sample_store):
from services.entity_trail import get_entity_trail
mock_lookup.return_value = {
"orig_name": "JFK: Kennedy",
"dest_name": "LAX: Los Angeles",
"orig_loc": [-73.78, 40.64],
"dest_loc": [-118.41, 33.94],
}
result = get_entity_trail(callsign="AF1", include_datalink=False)
assert result["route"]["source"] == "route_database"
assert "Kennedy" in result["route"]["origin_name"]
def test_correlate_entity_includes_movement(sample_store, monkeypatch):
from services import telemetry
from services.fetchers import flights
flights.flight_trails["adfdf8"] = {
"points": [[38.9, -77.5, 10000, 500.0], [38.95, -77.45, 12000, 560.0]],
"last_seen": 560.0,
}
result = telemetry.correlate_entity(callsign="AF1", entity_type="aircraft", radius_km=80, limit=5)
assert result["movement"]["trail_point_count"] == 2
assert len(result["movement"]["trail"]) == 2
def test_openclaw_get_entity_trail_command(sample_store, monkeypatch):
from services.openclaw_channel import _dispatch_command
from services.fetchers import flights
flights.flight_trails["a0f011"] = {
"points": [[39.0, -97.0, 35000, 1000.0]],
"last_seen": 1000.0,
}
result = _dispatch_command(
"get_entity_trail",
{"query": "patriots jet", "compact": True},
)
assert result["ok"] is True
assert result["data"]["status"] in {"trail_available", "resolved_without_trail"}
+1 -1
View File
@@ -85,7 +85,7 @@ def test_plan_playbook_track_snapshot_requires_query():
assert plan["ok"] is False
plan_ok = plan_playbook("track_snapshot", {"query": "patriots jet"})
assert plan_ok["ok"] is True
assert plan_ok["batch"][0]["cmd"] == "find_entity"
assert plan_ok["batch"][0]["cmd"] == "get_entity_profile"
def test_expensive_commands_set():
+4 -2
View File
@@ -25,7 +25,7 @@ three-tool surface:
| `await sb.run_playbook("hot_snapshot")` | Pre-batched snapshots (morning brief, monitor poll, status) |
| `await sb.channel_status()` | Liveness (~5 ms) — never `/api/health` |
**Latency tiers:** `find_entity` / `find_flights` / `search_news` / `entities_near`**⚡ &lt;30 ms**.
**Latency tiers:** `get_entity_profile` / `find_entity` / `get_entity_trail` / `find_flights` / `search_news` / `entities_near`**⚡ &lt;30 ms**.
`search_telemetry` / `get_telemetry` / `get_report`**🔴 seconds** — blocked unless `confirm_expensive=true`.
```python
@@ -46,7 +46,7 @@ batch = await sb.send_batch([
])
```
**Playbooks:** `hot_snapshot`, `morning_brief`, `status_check`, `monitor_heartbeat`, `track_snapshot`, `area_brief`, `entity_recon`.
**Playbooks:** `hot_snapshot`, `morning_brief`, `status_check`, `monitor_heartbeat`, `track_snapshot`, `jet_recon`, `area_brief`, `entity_recon`.
**Anti-patterns:** `search_telemetry` for known tail numbers; `get_telemetry` for routine polls; sequential `send_command` loops; empty `layers: []` on `get_layer_slice`.
@@ -157,6 +157,8 @@ The channel operates over HMAC-authenticated HTTP with body-integrity binding:
| `await sb.get_layer_slice(["ships", "gdelt"])` | Only the requested layers, with per-layer incremental | **Primary fetch method** — automatically skips layers you already have |
| `await sb.send_command("get_summary")` | Lightweight counts-only summary | Discover what data exists before pulling anything |
| `await sb.ask("...")` | **Route + execute** | **Default** for natural-language reads |
| `await sb.send_command("get_entity_profile", {...})` | **Preferred aircraft/vessel dossier** — identity, VIP tags, trail, route, ACARS, jamming/correlations, news | Tail, owner, callsign, MMSI |
| `await sb.send_command("get_entity_trail", {...})` | Observed path + route + ACARS hints only | When you don't need full dossier |
| `await sb.send_command("find_entity", {...})` | Exact-first entity resolver | Parsed person/tail/callsign/MMSI — skips fuzzy unless `fallback_search=true` |
| `await sb.send_command("find_flights", {...})` | Targeted flight search | When you know the domain (callsign, tail number) |
| `await sb.send_command("route_query", {...})` | Routing plan only | Inspect recommended command before executing |