diff --git a/backend/routers/ai_intel.py b/backend/routers/ai_intel.py index e380b6d..132aaa8 100644 --- a/backend/routers/ai_intel.py +++ b/backend/routers/ai_intel.py @@ -2276,12 +2276,14 @@ async def agent_tool_manifest(request: Request): async def api_capabilities(request: Request): """Return full API manifest so the agent knows every available endpoint.""" from services.openclaw_channel import READ_COMMANDS, WRITE_COMMANDS, detect_tier + from services.openclaw_routing import routing_manifest from services.config import get_settings tier = detect_tier() access_tier = str(get_settings().OPENCLAW_ACCESS_TIER or "restricted").strip().lower() return { "ok": True, "version": "0.9.82", + "routing": routing_manifest(), "auth": { "method": "HMAC-SHA256", "headers": ["X-SB-Timestamp", "X-SB-Nonce", "X-SB-Signature"], @@ -2397,8 +2399,16 @@ async def api_capabilities(request: Request): "description": "Compact server-side ship search by MMSI/IMO/name/query, including yacht-owner enrichment.", }, "find_entity": { - "args": {"query": "str (optional)", "entity_type": "aircraft|ship|person|event|infrastructure (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "layers": "list[str] (optional)", "limit": "int (default 10)"}, - "description": "Exact-first resolver for planes, ships, operators, callsigns, registrations, MMSI/IMO, and named entities. Use before tracking to avoid fuzzy prompt matching.", + "args": {"query": "str (optional)", "entity_type": "aircraft|ship|person|event|infrastructure (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "layers": "list[str] (optional)", "limit": "int (default 10)", "fallback_search": "bool (default false)", "confirm_fuzzy": "bool (alias for fallback_search)"}, + "description": "Exact-first resolver for planes, ships, operators, callsigns, registrations, MMSI/IMO, and named entities. Skips fuzzy search unless fallback_search=true or no exact match.", + }, + "route_query": { + "args": {"text": "str", "lat": "float (optional)", "lng": "float (optional)", "radius_km": "float (default 50)", "compact": "bool (default true)"}, + "description": "Deterministic intent router — returns recommended fast command, alternates, and latency estimate. Preferred entry for natural-language reads.", + }, + "run_playbook": { + "args": {"name": "str", "query": "str (optional)", "lat": "float (optional)", "lng": "float (optional)"}, + "description": "Execute a named batch plan (hot_snapshot, morning_brief, monitor_heartbeat, track_snapshot, area_brief, entity_recon).", }, "correlate_entity": { "args": {"query": "str (optional)", "entity_type": "str (optional)", "callsign": "str (optional)", "registration": "str (optional)", "icao24": "str (optional)", "mmsi": "str (optional)", "imo": "str (optional)", "name": "str (optional)", "owner": "str (optional)", "radius_km": "float (default 100)", "limit": "int (default 10)"}, @@ -2578,7 +2588,8 @@ async def api_capabilities(request: Request): "layers are serialized, unchanged layers transfer zero bytes. The client tracks versions " "automatically from SSE events and previous responses. " "3) Pass compact=true on every read command for compressed_v1 responses (~60-90% smaller). " - "4) Use targeted commands first (find_flights, search_telemetry, entities_near). " + "4) Use route_query / find_entity / run_playbook before search_telemetry. " + "Expensive commands require confirm_expensive=true. " "Reserve get_telemetry/get_slow_telemetry for rare full-context pulls.", "pins": "Pins are server-side, NOT localStorage. Use place_pin command or POST /api/ai/pins. The agent can place and delete pins.", "tracking": "To track a specific aircraft without polling: use add_watch with track_callsign or track_registration. Over SSE, you'll get instant push alerts.", diff --git a/backend/services/openclaw_channel.py b/backend/services/openclaw_channel.py index 1d91c3f..d67e7b5 100644 --- a/backend/services/openclaw_channel.py +++ b/backend/services/openclaw_channel.py @@ -87,6 +87,9 @@ READ_COMMANDS = frozenset({ "osint_lookup", "osint_tools", "entity_expand", + # Agent routing helpers + "route_query", + "run_playbook", }) WRITE_COMMANDS = frozenset({ @@ -643,6 +646,19 @@ def _compact_query_result(result: Any) -> Any: # Command dispatcher # --------------------------------------------------------------------------- +def _expensive_gate(cmd: str, args: dict[str, Any]) -> dict[str, Any] | None: + from services.openclaw_routing import EXPENSIVE_GATE_MESSAGE, requires_expensive_confirm + + if requires_expensive_confirm(cmd, args): + return { + "ok": False, + "detail": EXPENSIVE_GATE_MESSAGE, + "code": "expensive_command_blocked", + "hint": "route_query", + } + return None + + def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]: """Route a command to the appropriate AI Intel function. @@ -650,6 +666,43 @@ def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]: Commands run in an isolated thread (via _execute_command) so they do not need or touch the caller's event loop. """ + blocked = _expensive_gate(cmd, args) + if blocked is not None: + return blocked + + if cmd == "route_query": + from services.openclaw_routing import route_query + + result = route_query( + text=str(args.get("text", "") or args.get("query", "") or ""), + lat=args.get("lat"), + lng=args.get("lng"), + radius_km=float(args.get("radius_km", 50) or 50), + compact=bool(args.get("compact", True)), + ) + return {"ok": True, "data": result} + + if cmd == "run_playbook": + from services.openclaw_routing import plan_playbook + + plan = plan_playbook(str(args.get("name", "") or args.get("playbook", "")), args) + if not plan.get("ok"): + return plan + batch_results: list[dict[str, Any]] = [] + for item in plan.get("batch", []): + inner_cmd = str(item.get("cmd", "")).strip().lower() + inner_args = item.get("args") or {} + inner_result = _dispatch_command(inner_cmd, inner_args) + batch_results.append({"cmd": inner_cmd, **inner_result}) + return { + "ok": True, + "data": { + "playbook": plan.get("playbook"), + "description": plan.get("description", ""), + "results": batch_results, + }, + } + if cmd == "get_telemetry": from services.telemetry import get_cached_telemetry_refs data = get_cached_telemetry_refs() @@ -731,6 +784,7 @@ def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]: owner=str(args.get("owner", "") or args.get("operator", "") or ""), layers=args.get("layers") if isinstance(args.get("layers"), (list, tuple)) else None, limit=args.get("limit", 10), + fallback_search=bool(args.get("fallback_search") or args.get("confirm_fuzzy")), ) if _wants_compact(args): compact = dict(result) @@ -1092,6 +1146,7 @@ def _dispatch_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]: owner=str(args.get("owner", "") or args.get("operator", "") or ""), layers=args.get("layers") if isinstance(args.get("layers"), (list, tuple)) else None, limit=5, + fallback_search=True, ) best = lookup.get("best_match") if isinstance(lookup.get("best_match"), dict) else {} group = str(best.get("group", "") or entity_type).lower() diff --git a/backend/services/openclaw_routing.py b/backend/services/openclaw_routing.py new file mode 100644 index 0000000..fb037b6 --- /dev/null +++ b/backend/services/openclaw_routing.py @@ -0,0 +1,500 @@ +"""Deterministic OpenClaw routing — intent → fastest command. + +Keeps expensive fuzzy scans and full-layer dumps out of the default agent path. +""" + +from __future__ import annotations + +import re +from typing import Any + +EXPENSIVE_COMMANDS = frozenset({ + "search_telemetry", + "get_telemetry", + "get_slow_telemetry", + "get_report", +}) + +EXPENSIVE_GATE_MESSAGE = ( + "expensive command blocked — use route_query, find_entity, run_playbook, or targeted reads. " + "Pass confirm_expensive=true only when fuzzy search or full dumps are intentional." +) + +LATENCY_TIER_MS: dict[str, int] = { + "channel_status": 5, + "route_query": 5, + "get_summary": 10, + "what_changed": 15, + "search_news": 15, + "find_flights": 25, + "find_ships": 25, + "find_entity": 30, + "entities_near": 30, + "brief_area": 30, + "get_layer_slice": 50, + "correlate_entity": 15, + "entity_expand": 40, + "osint_lookup": 200, + "run_playbook": 120, + "search_telemetry": 8000, + "get_telemetry": 3500, + "get_slow_telemetry": 1500, + "get_report": 5000, +} + +RE_N_NUMBER = re.compile(r"\bN\d{1,5}[A-Z]{0,2}\b", re.I) +RE_CALLSIGN = re.compile(r"\b[A-Z]{2,4}\d{1,4}[A-Z]?\b") +RE_MMSI = re.compile(r"\b\d{9}\b") +RE_CVE = re.compile(r"\bCVE-\d{4}-\d+\b", re.I) +RE_IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") +RE_DOMAIN = re.compile( + r"\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{2,})\b", + re.I, +) + +KNOWN_CALLSIGNS = frozenset({ + "AF1", "AF2", "EXEC1", "EXEC2", "SAM", "STALK52", "SPAR19", "SPAR20", +}) + +PLAYBOOKS: dict[str, dict[str, Any]] = { + "hot_snapshot": { + "description": "Summary + hot layers + what changed (one batch)", + "batch": [ + {"cmd": "get_summary", "args": {"compact": True}}, + { + "cmd": "get_layer_slice", + "args": { + "layers": [ + "news", + "telegram_osint", + "military_flights", + "private_jets", + "earthquakes", + ], + "limit_per_layer": 10, + "compact": True, + }, + }, + {"cmd": "what_changed", "args": {"compact": True}}, + ], + }, + "status_check": { + "description": "Channel health + layer counts", + "batch": [ + {"cmd": "channel_status", "args": {}}, + {"cmd": "get_summary", "args": {"compact": True}}, + ], + }, + "morning_brief": { + "description": "Operator morning digest layers", + "batch": [ + {"cmd": "get_summary", "args": {"compact": True}}, + {"cmd": "what_changed", "args": {"compact": True}}, + { + "cmd": "get_layer_slice", + "args": { + "layers": [ + "news", + "telegram_osint", + "gdelt", + "earthquakes", + "crowdthreat", + "military_flights", + ], + "limit_per_layer": 15, + "compact": True, + }, + }, + ], + }, + "monitor_heartbeat": { + "description": "Low-latency monitor poll (replaces full telemetry pull)", + "batch": [ + {"cmd": "what_changed", "args": {"compact": True}}, + { + "cmd": "get_layer_slice", + "args": { + "layers": [ + "military_flights", + "ships", + "earthquakes", + "liveuamap", + "crowdthreat", + "uap_sightings", + "firms_fires", + "gps_jamming", + "wastewater", + ], + "limit_per_layer": 200, + "compact": True, + }, + }, + ], + }, +} + + +def routing_manifest() -> dict[str, Any]: + """Machine-readable routing hints for /api/ai/capabilities.""" + return { + "default_read": "find_entity", + "preferred_entry": "route_query", + "client_wrapper": "ShadowBrokerClient.ask", + "batch_playbook": "run_playbook", + "last_resort": "search_telemetry", + "expensive_commands": sorted(EXPENSIVE_COMMANDS), + "latency_tier_ms": LATENCY_TIER_MS, + "anti_patterns": [ + "search_telemetry for known tail numbers, callsigns, owners, or MMSI", + "get_telemetry for routine reads — use get_layer_slice or run_playbook hot_snapshot", + "sequential send_command loops — use send_batch or run_playbook", + "/api/health for liveness — use channel_status", + "empty layers: [] on get_layer_slice — pass explicit layer names", + ], + "recipes": [ + { + "intent": "natural language question", + "use": "route_query → recommended cmd, or ShadowBrokerClient.ask()", + }, + { + "intent": "known person/aircraft", + "use": "find_entity(query=...) or find_flights(owner=...)", + }, + { + "intent": "news / telegram topic", + "use": "search_news(query=...)", + }, + { + "intent": "near a point", + "use": "entities_near or brief_area", + }, + { + "intent": "hot snapshot", + "use": "run_playbook(name=hot_snapshot)", + }, + ], + "playbooks": { + name: {"description": spec.get("description", "")} + for name, spec in PLAYBOOKS.items() + }, + "agent_surface": { + "primary": ["ask", "send_batch", "channel_status"], + "writes": [ + "place_pin", + "add_watch", + "inject_data", + "place_analysis_zone", + ], + }, + } + + +def requires_expensive_confirm(cmd: str, args: dict[str, Any] | None) -> bool: + if cmd not in EXPENSIVE_COMMANDS: + return False + if isinstance(args, dict) and args.get("confirm_expensive") is True: + return False + return True + + +def _compact_args(args: dict[str, Any], *, compact: bool) -> dict[str, Any]: + out = dict(args) + if compact and "compact" not in out: + out["compact"] = True + return out + + +def _estimate_ms(cmd: str) -> int: + return int(LATENCY_TIER_MS.get(cmd, 100)) + + +def _news_query(text: str) -> str: + cleaned = text + for prefix in ( + "news about", + "news on", + "telegram", + "headlines about", + "headlines on", + "latest on", + "search news for", + ): + if cleaned.lower().startswith(prefix): + cleaned = cleaned[len(prefix):].strip() + return cleaned.strip(" ?.") + + +def route_query( + text: str = "", + *, + lat: float | None = None, + lng: float | None = None, + radius_km: float = 50, + compact: bool = True, +) -> dict[str, Any]: + """Map natural-language intent to the fastest command (no LLM).""" + raw = str(text or "").strip() + lowered = raw.lower() + avoid = ["search_telemetry", "get_telemetry", "get_slow_telemetry"] + alternates: list[dict[str, Any]] = [] + + if not raw and lat is not None and lng is not None: + recommended = { + "cmd": "brief_area", + "args": _compact_args( + {"lat": lat, "lng": lng, "radius_km": radius_km}, + compact=compact, + ), + } + return { + "intent": "area_brief", + "recommended": recommended, + "alternates": [{"cmd": "entities_near", "args": recommended["args"]}], + "avoid": avoid, + "estimated_ms": _estimate_ms("brief_area"), + } + + if not raw: + recommended = {"cmd": "get_summary", "args": _compact_args({}, compact=compact)} + return { + "intent": "discovery", + "recommended": recommended, + "alternates": [{"cmd": "channel_status", "args": {}}], + "avoid": avoid, + "estimated_ms": _estimate_ms("get_summary"), + } + + cve_match = RE_CVE.search(raw) + if cve_match: + recommended = { + "cmd": "osint_lookup", + "args": _compact_args({"tool": "cve", "cve": cve_match.group(0).upper()}, compact=compact), + } + return _route_result("cve_lookup", recommended, avoid, alternates) + + ip_match = RE_IPV4.search(raw) + if ip_match and ("ip" in lowered or "address" in lowered or lowered.count(".") >= 3): + recommended = { + "cmd": "osint_lookup", + "args": _compact_args({"tool": "ip", "ip": ip_match.group(0)}, compact=compact), + } + alternates.append({"cmd": "entity_expand", "args": {"type": "ip", "id": ip_match.group(0)}}) + return _route_result("ip_lookup", recommended, avoid, alternates) + + if "whois" in lowered or ("dns" in lowered and RE_DOMAIN.search(raw)): + domain = (RE_DOMAIN.search(raw) or re.search(r"\b([a-z0-9-]+\.[a-z]{2,})\b", raw, re.I)) + tool = "whois" if "whois" in lowered else "dns" + domain_value = domain.group(0) if domain else raw + recommended = { + "cmd": "osint_lookup", + "args": _compact_args({"tool": tool, "domain": domain_value}, compact=compact), + } + return _route_result("domain_lookup", recommended, avoid, alternates) + + if "sanction" in lowered or "ofac" in lowered: + recommended = { + "cmd": "osint_lookup", + "args": _compact_args({"tool": "sanctions", "query": raw}, compact=compact), + } + return _route_result("sanctions_lookup", recommended, avoid, alternates) + + mmsi_match = RE_MMSI.search(raw) + if mmsi_match and any(k in lowered for k in ("mmsi", "ship", "vessel", "yacht", "boat", "maritime")): + recommended = { + "cmd": "find_ships", + "args": _compact_args({"mmsi": mmsi_match.group(0)}, compact=compact), + } + alternates.append({"cmd": "find_entity", "args": {"mmsi": mmsi_match.group(0), "entity_type": "ship"}}) + return _route_result("maritime_identifier", recommended, avoid, alternates) + + n_match = RE_N_NUMBER.search(raw) + if n_match: + reg = n_match.group(0).upper() + recommended = { + "cmd": "find_flights", + "args": _compact_args({"registration": reg}, compact=compact), + } + alternates.append({"cmd": "find_entity", "args": {"registration": reg, "entity_type": "aircraft"}}) + return _route_result("tail_number", recommended, avoid, alternates) + + # callsign tokens + tokens = re.findall(r"\b[A-Z0-9]{2,8}\b", raw.upper()) + for token in tokens: + if token in KNOWN_CALLSIGNS or RE_CALLSIGN.fullmatch(token): + recommended = { + "cmd": "find_flights", + "args": _compact_args({"callsign": token}, compact=compact), + } + alternates.append({"cmd": "find_entity", "args": {"callsign": token, "entity_type": "aircraft"}}) + return _route_result("callsign", recommended, avoid, alternates) + + if any(k in lowered for k in ("news", "telegram", "headline", "headlines", "gdelt")): + recommended = { + "cmd": "search_news", + "args": _compact_args({"query": _news_query(raw), "limit": 10}, compact=compact), + } + alternates.append({ + "cmd": "get_layer_slice", + "args": {"layers": ["telegram_osint", "news"], "limit_per_layer": 10, "compact": compact}, + }) + return _route_result("news_search", recommended, avoid, alternates) + + if lat is not None and lng is not None and any( + k in lowered for k in ("near", "around", "within", "radius", "brief", "aoi") + ): + recommended = { + "cmd": "brief_area", + "args": _compact_args( + {"lat": lat, "lng": lng, "radius_km": radius_km, "query": raw}, + compact=compact, + ), + } + alternates.append({ + "cmd": "entities_near", + "args": {"lat": lat, "lng": lng, "radius_km": radius_km, "compact": compact}, + }) + return _route_result("area_brief", recommended, avoid, alternates) + + if any(k in lowered for k in ("what changed", "updates", "delta", "since last")): + recommended = {"cmd": "what_changed", "args": _compact_args({}, compact=compact)} + return _route_result("incremental_poll", recommended, avoid, alternates) + + if any(k in lowered for k in ("summary", "status", "layers populated", "what data")): + recommended = {"cmd": "get_summary", "args": _compact_args({}, compact=compact)} + alternates.append({"cmd": "channel_status", "args": {}}) + return _route_result("discovery", recommended, avoid, alternates) + + if any(k in lowered for k in ("recon", "whois", "dns lookup", "cve", "mac address")): + recommended = { + "cmd": "osint_tools", + "args": {}, + } + return _route_result("recon_discovery", recommended, avoid, alternates) + + entity_type = "" + if any(k in lowered for k in ("ship", "vessel", "yacht", "boat", "maritime", "carrier")): + entity_type = "ship" + elif any(k in lowered for k in ("jet", "plane", "flight", "aircraft", "helicopter", "tail")): + entity_type = "aircraft" + + owner_hint = "" + if any(k in lowered for k in ("owner", "operated by", "'s jet", "'s yacht", "belongs to")): + owner_hint = raw + for phrase in ("where is", "find", "track", "locate", "jet", "yacht", "plane", "flight", "ship"): + owner_hint = re.sub(rf"\b{phrase}\b", "", owner_hint, flags=re.I).strip() + + entity_args: dict[str, Any] = {"query": raw, "compact": compact} + if entity_type: + entity_args["entity_type"] = entity_type + if owner_hint and len(owner_hint) >= 3: + entity_args["owner"] = owner_hint + + recommended = { + "cmd": "find_entity", + "args": _compact_args(entity_args, compact=compact), + } + alternates = [ + {"cmd": "search_news", "args": {"query": raw, "limit": 10, "compact": compact}}, + ] + if any(k in lowered for k in ("near", "around")): + alternates.append({ + "cmd": "search_telemetry", + "args": {"query": raw, "limit": 10, "confirm_expensive": True, "compact": compact}, + }) + + return _route_result("entity_lookup", recommended, avoid, alternates) + + +def _route_result( + intent: str, + recommended: dict[str, Any], + avoid: list[str], + alternates: list[dict[str, Any]], +) -> dict[str, Any]: + cmd = str(recommended.get("cmd", "")) + return { + "intent": intent, + "recommended": recommended, + "alternates": alternates, + "avoid": avoid, + "estimated_ms": _estimate_ms(cmd), + } + + +def plan_playbook(name: str, args: dict[str, Any] | None = None) -> dict[str, Any]: + """Resolve a named playbook to a command batch.""" + playbook = str(name or "").strip().lower() + params = dict(args or {}) + if not playbook: + return {"ok": False, "detail": "playbook name required"} + + if playbook == "track_snapshot": + query = str(params.get("query", "") or params.get("name", "") or "").strip() + if not query: + return {"ok": False, "detail": "track_snapshot requires query"} + return { + "ok": True, + "playbook": playbook, + "description": "Resolve entity for tracking", + "batch": [ + { + "cmd": "find_entity", + "args": { + "query": query, + "entity_type": params.get("entity_type", ""), + "fallback_search": True, + "compact": True, + }, + } + ], + } + + if playbook == "area_brief": + lat = params.get("lat") + lng = params.get("lng") + if lat is None or lng is None: + return {"ok": False, "detail": "area_brief requires lat and lng"} + return { + "ok": True, + "playbook": playbook, + "description": "Brief an area of interest", + "batch": [ + { + "cmd": "brief_area", + "args": { + "lat": lat, + "lng": lng, + "radius_km": params.get("radius_km", 50), + "query": params.get("query", ""), + "compact": True, + }, + } + ], + } + + if playbook == "entity_recon": + query = str(params.get("query", "") or params.get("ip", "") or "").strip() + ip_match = RE_IPV4.search(query) + if not ip_match: + return {"ok": False, "detail": "entity_recon requires an IP in query"} + return { + "ok": True, + "playbook": playbook, + "description": "IP recon + entity graph", + "batch": [ + {"cmd": "osint_lookup", "args": {"tool": "ip", "ip": ip_match.group(0), "compact": True}}, + {"cmd": "entity_expand", "args": {"type": "ip", "id": ip_match.group(0)}}, + ], + } + + spec = PLAYBOOKS.get(playbook) + if not spec: + known = sorted(PLAYBOOKS) + ["track_snapshot", "area_brief", "entity_recon"] + return {"ok": False, "detail": f"unknown playbook: {playbook}", "known": known} + + return { + "ok": True, + "playbook": playbook, + "description": spec.get("description", ""), + "batch": [dict(item) for item in spec.get("batch", [])], + } diff --git a/backend/services/telemetry.py b/backend/services/telemetry.py index effcded..03f37a6 100644 --- a/backend/services/telemetry.py +++ b/backend/services/telemetry.py @@ -1549,11 +1549,13 @@ def find_entity( owner: str = "", layers: list[str] | tuple[str, ...] | None = None, limit: int = 10, + fallback_search: bool = False, ) -> dict[str, Any]: """Find a named entity across aircraft, maritime, and general telemetry. This is an intent-level lookup for agents. It tries high-precision - aircraft/ship fields first, then falls back to the universal search index. + aircraft/ship fields first, then optionally falls back to the universal + search index only when ``fallback_search`` is True (opt-in fuzzy scan). """ effective_query = str(query or name or owner or callsign or registration or icao24 or mmsi or imo or "").strip() if not effective_query: @@ -1628,16 +1630,18 @@ def find_entity( seen.add(key) results.append(normalized) - search_layers = requested_layers or _entity_layers_for_type(entity_type) - search_result = search_telemetry(query=effective_query, layers=search_layers, limit=limit) - if search_result.get("results"): - strategies.append("universal_index") - for item in search_result.get("results") or []: - normalized = _normalize_entity_result(item) - key = _entity_key(normalized) - if key not in seen: - seen.add(key) - results.append(normalized) + search_layers = list(requested_layers or _entity_layers_for_type(entity_type) or []) + search_result: dict[str, Any] = {"results": [], "searched_layers": search_layers} + if fallback_search: + search_result = search_telemetry(query=effective_query, layers=search_layers, limit=limit) + if search_result.get("results"): + strategies.append("universal_index") + for item in search_result.get("results") or []: + normalized = _normalize_entity_result(item) + key = _entity_key(normalized) + if key not in seen: + seen.add(key) + results.append(normalized) results.sort( key=lambda item: ( diff --git a/backend/tests/test_openclaw_query_helpers.py b/backend/tests/test_openclaw_query_helpers.py index 1093bc8..72f5b5f 100644 --- a/backend/tests/test_openclaw_query_helpers.py +++ b/backend/tests/test_openclaw_query_helpers.py @@ -466,15 +466,55 @@ def test_find_entity_prioritizes_aircraft_operator_and_callsign(sample_store, mo monkeypatch.setattr(telemetry, "get_data_version", lambda: 130) - by_operator = telemetry.find_entity(query="patriots jet", limit=5) + by_operator = telemetry.find_entity(owner="Patriots", limit=5) assert by_operator["best_match"]["group"] == "aircraft" assert by_operator["best_match"]["label"] == "OXE2116" + fuzzy = telemetry.find_entity(query="patriots jet", limit=5, fallback_search=True) + assert fuzzy["best_match"]["group"] == "aircraft" + by_callsign = telemetry.find_entity(callsign="AF1", entity_type="aircraft", limit=5) assert by_callsign["best_match"]["callsign"] == "AF1" assert by_callsign["best_match"]["alert_operator"] == "POTUS" +def test_find_entity_skips_fuzzy_when_exact_match(sample_store, monkeypatch): + import services.telemetry as telemetry + + monkeypatch.setattr(telemetry, "get_data_version", lambda: 200) + calls: list[str] = [] + + def _fake_search(*_args, **_kwargs): + calls.append("search") + return {"results": [], "searched_layers": []} + + monkeypatch.setattr(telemetry, "search_telemetry", _fake_search) + + result = telemetry.find_entity(callsign="AF1", entity_type="aircraft", fallback_search=False) + assert result["best_match"]["callsign"] == "AF1" + assert calls == [] + + +def test_find_entity_fuzzy_only_when_fallback_or_empty(sample_store, monkeypatch): + import services.telemetry as telemetry + + monkeypatch.setattr(telemetry, "get_data_version", lambda: 201) + calls: list[str] = [] + + def _fake_search(*_args, **_kwargs): + calls.append("search") + return {"results": [], "searched_layers": []} + + monkeypatch.setattr(telemetry, "search_telemetry", _fake_search) + + empty = telemetry.find_entity(query="zzzznonexistententity", fallback_search=False) + assert empty["best_match"] is None + assert calls == [] + + telemetry.find_entity(query="zzzznonexistententity", fallback_search=True) + assert calls == ["search"] + + def test_find_entity_prioritizes_maritime_owner_and_identifiers(sample_store, monkeypatch): import services.telemetry as telemetry diff --git a/backend/tests/test_openclaw_routing.py b/backend/tests/test_openclaw_routing.py new file mode 100644 index 0000000..0765856 --- /dev/null +++ b/backend/tests/test_openclaw_routing.py @@ -0,0 +1,93 @@ +"""OpenClaw routing, playbooks, and expensive-command gate.""" + +from __future__ import annotations + +from services.openclaw_channel import _dispatch_command +from services.openclaw_routing import ( + EXPENSIVE_COMMANDS, + plan_playbook, + requires_expensive_confirm, + route_query, + routing_manifest, +) + + +def test_routing_manifest_has_agent_surface(): + manifest = routing_manifest() + assert manifest["preferred_entry"] == "route_query" + assert manifest["client_wrapper"] == "ShadowBrokerClient.ask" + assert "search_telemetry" in manifest["expensive_commands"] + assert "hot_snapshot" in manifest["playbooks"] + + +def test_route_query_tail_number(): + plan = route_query("track N628TS position") + assert plan["recommended"]["cmd"] == "find_flights" + assert plan["recommended"]["args"]["registration"] == "N628TS" + assert "search_telemetry" in plan["avoid"] + + +def test_route_query_callsign(): + plan = route_query("where is AF1 right now") + assert plan["recommended"]["cmd"] == "find_flights" + assert plan["recommended"]["args"]["callsign"] == "AF1" + + +def test_route_query_news(): + plan = route_query("telegram news about Iran tanker") + assert plan["recommended"]["cmd"] == "search_news" + + +def test_route_query_cve(): + plan = route_query("details for CVE-2024-1234") + assert plan["recommended"]["cmd"] == "osint_lookup" + assert plan["recommended"]["args"]["tool"] == "cve" + + +def test_route_query_default_entity(): + plan = route_query("where is the patriots jet") + assert plan["recommended"]["cmd"] == "find_entity" + assert plan["recommended"]["args"]["query"] + + +def test_expensive_gate_blocks_search_telemetry(): + assert requires_expensive_confirm("search_telemetry", {"query": "test"}) + assert not requires_expensive_confirm( + "search_telemetry", + {"query": "test", "confirm_expensive": True}, + ) + result = _dispatch_command("search_telemetry", {"query": "test"}) + assert result["ok"] is False + assert result.get("code") == "expensive_command_blocked" + + +def test_expensive_gate_blocks_get_telemetry(): + result = _dispatch_command("get_telemetry", {}) + assert result["ok"] is False + assert result.get("code") == "expensive_command_blocked" + + +def test_dispatch_route_query(): + result = _dispatch_command("route_query", {"text": "news about carrier strike"}) + assert result["ok"] is True + assert result["data"]["recommended"]["cmd"] == "search_news" + + +def test_dispatch_run_playbook_hot_snapshot(): + result = _dispatch_command("run_playbook", {"name": "status_check"}) + assert result["ok"] is True + cmds = [item["cmd"] for item in result["data"]["results"]] + assert cmds == ["channel_status", "get_summary"] + + +def test_plan_playbook_track_snapshot_requires_query(): + plan = plan_playbook("track_snapshot", {}) + assert plan["ok"] is False + plan_ok = plan_playbook("track_snapshot", {"query": "patriots jet"}) + assert plan_ok["ok"] is True + assert plan_ok["batch"][0]["cmd"] == "find_entity" + + +def test_expensive_commands_set(): + assert "get_report" in EXPENSIVE_COMMANDS + assert "route_query" not in EXPENSIVE_COMMANDS diff --git a/openclaw-skills/shadowbroker/SKILL.md b/openclaw-skills/shadowbroker/SKILL.md index e36a23e..d815542 100644 --- a/openclaw-skills/shadowbroker/SKILL.md +++ b/openclaw-skills/shadowbroker/SKILL.md @@ -14,6 +14,44 @@ running on `localhost:8000`. It tracks military flights, ships, satellites, SIGI earthquakes, fires, GDELT conflict events, prediction markets, and 30+ other data layers — all with geographic coordinates. +## Agent Fast Path (read first) + +ShadowBroker exposes dozens of read commands. **Do not explore them.** Use the +three-tool surface: + +| Tool | When | +|------|------| +| `await sb.ask("natural language question")` | **Default for reads** — server routes to fastest command | +| `await sb.run_playbook("hot_snapshot")` | Pre-batched snapshots (morning brief, monitor poll, status) | +| `await sb.channel_status()` | Liveness (~5 ms) — never `/api/health` | + +**Latency tiers:** `find_entity` / `find_flights` / `search_news` / `entities_near` → **⚡ <30 ms**. +`search_telemetry` / `get_telemetry` / `get_report` → **🔴 seconds** — blocked unless `confirm_expensive=true`. + +```python +# Default read path (route + execute) +answer = await sb.ask("where is the Patriots jet") + +# Named batch plans +brief = await sb.run_playbook("hot_snapshot") +monitor = await sb.run_playbook("monitor_heartbeat") + +# Structured lookup when you already parsed fields +entity = await sb.send_command("find_entity", {"owner": "musk", "compact": True}) + +# Multi-command — always batch, never sequential loops +batch = await sb.send_batch([ + {"cmd": "get_summary", "args": {"compact": True}}, + {"cmd": "what_changed", "args": {"compact": True}}, +]) +``` + +**Playbooks:** `hot_snapshot`, `morning_brief`, `status_check`, `monitor_heartbeat`, `track_snapshot`, `area_brief`, `entity_recon`. + +**Anti-patterns:** `search_telemetry` for known tail numbers; `get_telemetry` for routine polls; sequential `send_command` loops; empty `layers: []` on `get_layer_slice`. + +Load machine-readable routing hints once: `GET /api/ai/capabilities` → `routing`. + ## How to Use This Skill Import the client and call methods: @@ -118,8 +156,11 @@ The channel operates over HMAC-authenticated HTTP with body-integrity binding: | `sb.stream_updates()` | SSE push: `layer_changed`, alerts, tasks | **Open first, keep open** — tells you exactly which layers updated | | `await sb.get_layer_slice(["ships", "gdelt"])` | Only the requested layers, with per-layer incremental | **Primary fetch method** — automatically skips layers you already have | | `await sb.send_command("get_summary")` | Lightweight counts-only summary | Discover what data exists before pulling anything | +| `await sb.ask("...")` | **Route + execute** | **Default** for natural-language reads | +| `await sb.send_command("find_entity", {...})` | Exact-first entity resolver | Parsed person/tail/callsign/MMSI — skips fuzzy unless `fallback_search=true` | | `await sb.send_command("find_flights", {...})` | Targeted flight search | When you know the domain (callsign, tail number) | -| `await sb.send_command("search_telemetry", {...})` | Cross-layer keyword search | When you don't know which layer has the answer | +| `await sb.send_command("route_query", {...})` | Routing plan only | Inspect recommended command before executing | +| `await sb.send_command("search_telemetry", {...})` | Cross-layer fuzzy search | **Last resort** — requires `confirm_expensive=true` | **Full telemetry dumps (use sparingly — large payloads):** @@ -598,10 +639,10 @@ When the user asks a question, follow this decision tree: fresh data, pushes alerts instantly, and eliminates blind polling. 2. **Does ShadowBroker have this data already?** - - **Start with `get_summary()`** to see what layers are populated and their counts. - - **Known domain** (flight callsign, ship name, keyword) → use the targeted command: - `find_flights`, `find_ships`, `search_news`, `entities_near`, `search_telemetry` - - **Unknown domain** → `search_telemetry` (cross-layer keyword search, ranked results) + - **Natural language** → `await sb.ask(question)` (routes server-side) + - **Batch snapshot** → `await sb.run_playbook("hot_snapshot")` + - **Known domain** → `find_entity`, `find_flights`, `find_ships`, `search_news`, `entities_near` + - **Unknown domain** → `find_entity` first; only then `search_telemetry` with `confirm_expensive=true` - **Need specific layers** → `get_layer_slice(["military_flights", "gdelt"])` — only fetches layers that changed since your last call (per-layer incremental). - **Near a location** → `entities_near()` or `get_near_me()` (scans all layers within radius) diff --git a/openclaw-skills/shadowbroker/sb_monitor.py b/openclaw-skills/shadowbroker/sb_monitor.py index bbb3c20..ce3b3cd 100644 --- a/openclaw-skills/shadowbroker/sb_monitor.py +++ b/openclaw-skills/shadowbroker/sb_monitor.py @@ -424,6 +424,27 @@ def query_snapshots( return results +# --------------------------------------------------------------------------- +# Playbook flattening (monitor_heartbeat → detect_anomalies shape) +# --------------------------------------------------------------------------- + +def _layers_from_playbook(playbook_data: dict) -> dict: + """Turn run_playbook(monitor_heartbeat) results into a layer-keyed dict.""" + merged: dict[str, list] = {} + if not isinstance(playbook_data, dict): + return merged + for item in playbook_data.get("results") or []: + if not isinstance(item, dict) or not item.get("ok"): + continue + payload = item.get("data") if isinstance(item.get("data"), dict) else {} + if item.get("cmd") == "get_layer_slice": + layers = payload.get("layers") if isinstance(payload.get("layers"), dict) else {} + for layer, rows in layers.items(): + if isinstance(rows, list): + merged[layer] = rows + return merged + + # --------------------------------------------------------------------------- # Main heartbeat handler # --------------------------------------------------------------------------- @@ -437,8 +458,14 @@ async def heartbeat(sb_client) -> list[str]: messages = [] try: - # 1. Pull fresh telemetry (fast + slow merged for full visibility) - data = await sb_client.get_full_telemetry() + # 1. Low-latency monitor poll (playbook) — fallback to legacy full pull + try: + playbook_data = await sb_client.run_playbook("monitor_heartbeat", {}) + data = _layers_from_playbook(playbook_data) + except Exception: + data = {} + if not data: + data = await sb_client.get_full_telemetry() # 2. Run anomaly detection anomalies = detect_anomalies(data, _state) diff --git a/openclaw-skills/shadowbroker/sb_query.py b/openclaw-skills/shadowbroker/sb_query.py index 9960e5a..454a9d8 100644 --- a/openclaw-skills/shadowbroker/sb_query.py +++ b/openclaw-skills/shadowbroker/sb_query.py @@ -270,6 +270,82 @@ class ShadowBrokerClient: r = await self._get("/api/ai/channel/status") return r.json() + @staticmethod + def unwrap_channel_result(resp: dict) -> dict: + """Extract inner command payload from /api/ai/channel/command response.""" + if not isinstance(resp, dict): + return {} + result = resp.get("result") + if not isinstance(result, dict): + return {} + if result.get("ok"): + data = result.get("data") + return data if isinstance(data, dict) else {} + return result + + async def route_query( + self, + text: str, + *, + lat: float | None = None, + lng: float | None = None, + radius_km: float = 50, + compact: bool = True, + ) -> dict: + """Server-side intent routing — returns recommended command (no LLM).""" + args: dict[str, Any] = {"text": text, "radius_km": radius_km, "compact": compact} + if lat is not None: + args["lat"] = lat + if lng is not None: + args["lng"] = lng + resp = await self.send_command("route_query", args) + return self.unwrap_channel_result(resp) + + async def run_playbook(self, name: str, args: dict | None = None) -> dict: + """Execute a named server playbook (batched, concurrent).""" + payload = {"name": name, **(args or {})} + resp = await self.send_command("run_playbook", payload) + return self.unwrap_channel_result(resp) + + async def ask( + self, + question: str, + *, + lat: float | None = None, + lng: float | None = None, + radius_km: float = 50, + execute: bool = True, + ) -> dict: + """Natural-language read: route_query → recommended command (one round-trip or two).""" + route = await self.route_query( + question, + lat=lat, + lng=lng, + radius_km=radius_km, + compact=True, + ) + if not route: + return {"ok": False, "detail": "route_query returned no plan"} + + if not execute: + return {"ok": True, "route": route} + + recommended = route.get("recommended") or {} + cmd = str(recommended.get("cmd", "") or "").strip() + cmd_args = recommended.get("args") or {} + if not cmd: + return {"ok": False, "detail": "route produced no command", "route": route} + + exec_resp = await self.send_command(cmd, cmd_args) + exec_inner = exec_resp.get("result") if isinstance(exec_resp.get("result"), dict) else {} + return { + "ok": bool(exec_resp.get("ok") and exec_inner.get("ok")), + "route": route, + "command": cmd, + "args": cmd_args, + "result": exec_inner, + } + async def send_batch(self, commands: list[dict]) -> dict: """Send multiple commands in a single HTTP round-trip. diff --git a/openclaw-skills/shadowbroker/skill.yaml b/openclaw-skills/shadowbroker/skill.yaml index 11e810b..6204082 100644 --- a/openclaw-skills/shadowbroker/skill.yaml +++ b/openclaw-skills/shadowbroker/skill.yaml @@ -24,6 +24,22 @@ entry_points: requirements: - httpx>=0.25.0 +# Thin agent surface — expose only these to the LLM tool picker +agent_surface: + primary_reads: + - sb_query.ShadowBrokerClient.ask + - sb_query.ShadowBrokerClient.run_playbook + - sb_query.ShadowBrokerClient.send_batch + - sb_query.ShadowBrokerClient.channel_status + writes: + - sb_query.ShadowBrokerClient.place_pin + - sb_query.ShadowBrokerClient.place_pins_batch + blocked_without_confirm: + - search_telemetry + - get_telemetry + - get_slow_telemetry + - get_report + # Capabilities declared capabilities: - live_telemetry # Real-time OSINT data (flights, ships, SIGINT, etc.)