data: sync plane-alert VIP tracking with real names only

Import oligarchs, royals, and curated celebrities from plane-alert-db while excluding joke tag labels from tracked_names. Sync plane_alert_db.json metadata, add import scripts, and map oligarch/royal/celebrity colors in the legend.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-24 20:01:35 -06:00
parent f30ffe2c77
commit a0c79c2044
9 changed files with 7086 additions and 4140 deletions
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+6 -2
View File
@@ -49,11 +49,11 @@ _CATEGORY_COLOR: dict[str, str] = {
"Head of State": "#ff1493",
"Royal Aircraft": "#ff1493",
"Don't you know who I am?": "#ff1493",
"As Seen on TV": "#ff1493",
"Bizjets": "#ff1493",
"Vanity Plate": "#ff1493",
"Football": "#ff1493",
# ORANGE — Joe Cool
# ORANGE — corporate / novelty / Joe Cool / As Seen on TV
"As Seen on TV": "orange",
"Joe Cool": "orange",
# WHITE — Climate Crisis
"Climate Crisis": "white",
@@ -338,6 +338,10 @@ def enrich_with_tracked_names(flight: dict) -> dict:
flight["alert_color"] = "blue"
elif is_med:
flight["alert_color"] = "#32cd32"
elif match.get("category") == "Oligarch":
flight["alert_color"] = "red"
elif match.get("category") in {"Royal", "Celebrity", "People"}:
flight["alert_color"] = "#ff1493"
elif "alert_color" not in flight:
flight["alert_color"] = "pink"
+1 -1
View File
@@ -113,7 +113,7 @@ const LEGEND: LegendCategory[] = [
{ svg: heli('#32CD32'), label: 'Medical / Fire / Rescue (lime)' },
{ svg: airliner('yellow'), label: 'Military / Intelligence (yellow)' },
{ svg: airliner('#222'), label: 'PIA — Privacy / Stealth (black)' },
{ svg: airliner('#FF8C00'), label: 'Private Flights / Joe Cool (orange)' },
{ svg: airliner('#FF8C00'), label: 'As Seen on TV / Joe Cool (orange)' },
{ svg: airliner('white'), label: 'Climate Crisis (white)' },
{ svg: airliner('#9B59B6'), label: 'Private Jets / Historic / Other (purple)' },
],
+2
View File
@@ -3,8 +3,10 @@ export const trackedCategories: string[] = [
'Celebrity',
'Formula 1',
'Government',
'Oligarch',
'Other',
'People',
'Royal',
'Sports',
'State/Law',
'Test Aircraft',
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""Merge curated plane-alert-db rows into backend/data/tracked_names.json.
Only real people, companies, and organizations — never plane-alert joke tags
(The Gambler, Genomes, Aaaaaaaand its gone, etc.).
"""
from __future__ import annotations
import csv
import json
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SB_PATH = ROOT / "backend" / "data" / "tracked_names.json"
PAD = Path.home() / "Downloads" / "plane-alert-db-main" / "plane-alert-db-main"
STRICT_CATS = {
"Don't you know who I am?",
"Oligarch",
"Royal Aircraft",
"Football",
}
GENERIC_TAGS = {
"bizjet", "bizjets", "pusher prop", "man made climate change", "government",
"royalty", "pga", "nfl", "nba", "basketball", "war eagle", "volunteers",
"original nuttah", "jumpers for goalposts", "money money money", "safe return",
"do a barrel roll", "biplane", "aerospace", "medical", "defense",
"the gambler", "the house always wins", "house always wins", "snake eyes",
"bunch of bankers", "scrooge mcduck", "aaaaaaaand its gone", "aaaaaaand its gone",
"genomes", "football", "zoomies", "you can't see me", "too much money",
"venture capital", "honda jet", "basic cable", "as seen on tv", "joe cool",
}
COMPANY_HINTS = re.compile(
r"\b(inc|llc|ltd|corp|company|co\.|bank|group|holdings|international|"
r"university|airlines|aviation|systems|foundation|tribe|resorts|casino|"
r"palace|entertainment|insurance|credit union|banco|sa|ag|gmbh|plc)\b",
re.I,
)
MERGE_ALIASES: dict[str, str] = {
"falcon landing llc": "Elon Musk",
"christian ronaldo": "Cristiano Ronaldo",
"elon musk": "Elon Musk",
"marc benioff": "Mark Benioff",
"p. diddy": "P. Diddy",
"baller": "P. Diddy",
"empire state of mind": "Jay Z",
"judy sheindlin": "Judge Judy",
"doge": "Vivek Ramaswamy",
"a&m records": "Jerry Moss",
"wings of grace": "Folorunso Alakija",
"reliance commercial dealers ltd": "Mukesh Ambani",
"monaco royal family": "Monaco Royal Family",
"the royal squadron": "UK Royal Family (RAF)",
"the kings helicopter flight": "UK Royal Family (RAF)",
}
def norm_reg(s: str) -> str:
return (s or "").strip().upper()
def row_get(row: dict[str, str], *keys: str) -> str:
for key in keys:
if row.get(key):
return str(row[key]).strip()
return ""
def sb_category(cat: str, display: str, operator: str) -> str:
if cat == "Oligarch":
return "Oligarch"
if cat in {"Royal Aircraft"} or "royal" in display.lower():
return "Royal"
if cat == "Football":
return "Sports"
if COMPANY_HINTS.search(operator) or COMPANY_HINTS.search(display):
return "Business"
return "Celebrity"
def is_likely_person_name(text: str) -> bool:
t = text.strip()
if not t or t.lower() in GENERIC_TAGS:
return False
if any(ch in t for ch in "?!"):
return False
if COMPANY_HINTS.search(t):
return False
words = t.split()
if len(words) < 2 or len(words) > 5:
return False
# Require each word to look name-like (Title case or Mc/Mac/O').
for w in words:
if not re.match(r"^[A-Z][\w'.-]*$|^(Mc|Mac|O')[A-Z]", w):
return False
return True
def pick_display_name(operator: str, tag1: str, tag2: str, tag3: str, cat: str) -> str | None:
op_key = operator.strip().lower()
if op_key in MERGE_ALIASES:
return MERGE_ALIASES[op_key]
op = operator.strip()
if cat == "Football":
return op or None
if cat == "Royal Aircraft":
return op or None
if cat == "Oligarch":
if is_likely_person_name(op):
return op
for tag in (tag2, tag3, tag1):
if is_likely_person_name(tag):
return tag.strip()
return op or None
if cat == "Don't you know who I am?":
if is_likely_person_name(op):
return op
for tag in (tag2, tag3, tag1):
if is_likely_person_name(tag):
return tag.strip()
if op and not op.lower() in GENERIC_TAGS:
return op
return None
return None
def load_rows() -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
for fname in ("plane-alert-db.csv", "plane-alert-civ.csv"):
path = PAD / fname
if not path.exists():
continue
with path.open(encoding="utf-8", errors="replace") as f:
rows.extend(list(csv.DictReader(f)))
return rows
def main() -> None:
with SB_PATH.open(encoding="utf-8") as f:
sb = json.load(f)
details: dict = sb.setdefault("details", {})
names_list: list[dict[str, str]] = sb.setdefault("names", [])
existing_names = {n["name"] for n in names_list}
sb_regs: set[str] = set()
for info in details.values():
for reg in info.get("registrations", []):
r = norm_reg(reg)
if r:
sb_regs.add(r)
added_entries = 0
added_regs = 0
merged_regs = 0
seen: set[tuple[str, str]] = set()
for row in load_rows():
cat = row_get(row, "Category")
if cat not in STRICT_CATS:
continue
reg = norm_reg(row_get(row, "$Registration", "Registration"))
if not reg or (reg, cat) in seen:
continue
seen.add((reg, cat))
operator = row_get(row, "$Operator", "Operator")
tag1 = row_get(row, "$Tag 1", "Tag 1")
tag2 = row_get(row, "#Tag 2", "$#Tag 2")
tag3 = row_get(row, "#Tag 3", "$#Tag 3")
display = pick_display_name(operator, tag1, tag2, tag3, cat)
if not display or reg in sb_regs:
continue
category = sb_category(cat, display, operator)
if display in details:
regs = details[display].setdefault("registrations", [])
if reg not in regs:
regs.append(reg)
merged_regs += 1
sb_regs.add(reg)
continue
details[display] = {
"category": category,
"registrations": [reg],
}
if display not in existing_names:
names_list.append({"name": display, "category": category})
existing_names.add(display)
added_entries += 1
added_regs += 1
sb_regs.add(reg)
uk_key = "UK Royal Family (RAF)"
uk_regs = ["G-XWBG", "GZ-100", "ZE700", "ZE701", "ZE707", "ZE708", "G-XXEC"]
if uk_key in details:
details[uk_key]["category"] = "Royal"
regs = details[uk_key].setdefault("registrations", [])
for r in uk_regs:
if r not in regs:
regs.append(r)
merged_regs += 1
else:
details[uk_key] = {"category": "Royal", "registrations": uk_regs}
if uk_key not in existing_names:
names_list.append({"name": uk_key, "category": "Royal"})
added_entries += 1
names_list.sort(key=lambda x: x["name"].lower())
with SB_PATH.open("w", encoding="utf-8") as f:
json.dump(sb, f, indent=2, ensure_ascii=False)
f.write("\n")
print(f"New tracked entries: {added_entries}")
print(f"New registrations: {added_regs}")
print(f"Merged into existing: {merged_regs}")
print(f"Total details entries: {len(details)}")
print(f"Total registrations: {sum(len(v.get('registrations',[])) for v in details.values())}")
if __name__ == "__main__":
main()
+143
View File
@@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""Compare plane-alert-db CSVs to Shadowbroker tracked_names.json."""
from __future__ import annotations
import csv
import json
import re
from pathlib import Path
SB = Path(__file__).resolve().parents[1] / "backend" / "data" / "tracked_names.json"
PAD = Path.home() / "Downloads" / "plane-alert-db-main" / "plane-alert-db-main"
CELEB_CATS = {
"Don't you know who I am?",
"As Seen on TV",
"Joe Cool",
"Vanity Plate",
"Football",
"Head of State",
"Royal Aircraft",
"Oligarch",
"Bizjets",
}
PURE_CELEB_CATS = {
"Don't you know who I am?",
"As Seen on TV",
"Joe Cool",
"Vanity Plate",
"Football",
}
def norm_name(s: str) -> str:
return re.sub(r"\s+", " ", (s or "").strip().lower())
def load_csv(path: Path) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
with path.open(encoding="utf-8", errors="replace") as f:
for row in csv.DictReader(f):
rows.append(row)
return rows
def row_field(row: dict[str, str], *keys: str) -> str:
for key in keys:
if row.get(key):
return str(row[key]).strip()
return ""
def main() -> None:
with SB.open(encoding="utf-8") as f:
sb = json.load(f)
sb_regs: set[str] = set()
sb_names: set[str] = set()
for name, info in sb.get("details", {}).items():
sb_names.add(norm_name(name))
for reg in info.get("registrations", []):
r = reg.strip().upper()
if r:
sb_regs.add(r)
rows: list[dict[str, str]] = []
for fname in ("plane-alert-db.csv", "plane-alert-civ.csv"):
path = PAD / fname
if path.exists():
rows.extend(load_csv(path))
seen: set[tuple[str, str, str]] = set()
new_by_cat: dict[str, list[dict[str, str]]] = {}
for row in rows:
cat = row_field(row, "Category")
if cat not in CELEB_CATS:
continue
reg = row_field(row, "$Registration", "Registration").upper()
op = row_field(row, "$Operator", "Operator")
icao = row_field(row, "$ICAO", "ICAO").upper()
if not reg and not op:
continue
key = (reg, norm_name(op), cat)
if key in seen:
continue
seen.add(key)
in_sb = False
if reg and reg in sb_regs:
in_sb = True
if norm_name(op) in sb_names:
in_sb = True
if not in_sb and op:
opn = norm_name(op)
for sn in sb_names:
if len(sn) >= 6 and (sn in opn or opn in sn):
in_sb = True
break
if in_sb:
continue
entry = {
"registration": reg,
"operator": op,
"category": cat,
"type": row_field(row, "$Type", "Type"),
"icao": icao,
"tag1": row_field(row, "$Tag 1", "Tag 1"),
}
new_by_cat.setdefault(cat, []).append(entry)
print("=== Shadowbroker tracked ===")
print(f" names in details: {len(sb_names)}")
print(f" registrations: {len(sb_regs)}")
print()
print("=== NEW celebrity/VIP-ish entries (not in Shadowbroker) ===")
total = 0
for cat in sorted(new_by_cat, key=lambda c: -len(new_by_cat[c])):
items = new_by_cat[cat]
total += len(items)
print(f"\n## {cat} ({len(items)})")
for e in sorted(items, key=lambda x: x["operator"])[:30]:
reg = e["registration"] or "(no reg)"
tag = f" | {e['tag1']}" if e["tag1"] else ""
print(f" {reg:12} {e['operator'][:60]}{tag}")
if len(items) > 30:
print(f" ... and {len(items) - 30} more")
print(f"\n=== TOTAL NEW (all VIP categories): {total} ===")
pure_items = [e for c in PURE_CELEB_CATS for e in new_by_cat.get(c, [])]
print(f"\n=== HIGH-SIGNAL CELEB / NOTABLE ({len(pure_items)}) ===")
for e in sorted(pure_items, key=lambda x: (x["category"], x["operator"])):
reg = e["registration"] or "????"
tag = f" ({e['tag1']})" if e["tag1"] else ""
print(f"[{e['category']}] {reg}{e['operator']}{tag}")
if __name__ == "__main__":
main()
+223
View File
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""Extract plane-alert-db entries missing from tracked_names.json."""
from __future__ import annotations
import csv
import json
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SB = ROOT / "backend" / "data" / "tracked_names.json"
PAD = Path.home() / "Downloads" / "plane-alert-db-main" / "plane-alert-db-main"
# Categories to import into tracked_names
IMPORT_CATS = {
"Don't you know who I am?",
"Oligarch",
"Royal Aircraft",
"Football",
"Head of State",
"Dictator Alert",
}
# As Seen on TV / Bizjets only when operator looks like a person (heuristic)
PERSON_CATS = {"As Seen on TV", "Bizjets", "Vanity Plate"}
# Skip obvious corps / generic operators
CORP_RE = re.compile(
r"\b(inc|llc|ltd|corp|company|co\.|group|holdings|university|air force|"
r"airlines|aviation|services|systems|international|global|partners|"
r"foundation|bank|pharma|laboratories|transportation|motors|enterprises)\b",
re.I,
)
CELEB_HINTS = re.compile(
r"\b(actor|actress|singer|rapper|musician|celebrity|nfl|nba|f1|formula|"
r"royal|prince|princess|king|queen|duke|sheik|sultan|oligarch|billionaire|"
r"mogul|tycoon|founder|ceo|president|senator|governor|judge|athlete|"
r"footballer|golfer|tennis|director|producer|host|comedian|model|"
r"influencer|youtuber|podcast|chef|author|writer|artist|designer)\b",
re.I,
)
KNOWN_PERSON_NAMES = {
"elon musk", "jay z", "jay-z", "kanye", "west", "kim kardashian", "taylor swift",
"beyonce", "drake", "rihanna", "oprah", "gates", "bezos", "zuckerberg",
"buffett", "dalio", "icahn", "ackman", "soros", "thiel", "musk", "cruise",
"dicaprio", "pitt", "jolie", "clooney", "hanks", "spielberg", "lucas",
"branson", "trump", "biden", "obama", "clinton", "bush", "romney",
"ramaswamy", "benioff", "blavatnik", "abramovich", "abramov", "potanin",
"fridman", "deripaska", "kerimov", "tinkov", "mordashov", "rybolovlev",
"lisin", "vekselberg", "medvedchuk", "alekperov", "mikhelson", "diddy",
"combs", "sean combs", "ronaldo", "messi", "mbappe", "beckham", "jordan",
"lebron", "brady", "mahomes", "kroenke", "kraft", "jones", "snyder",
"sheindlin", "judge judy", "elton john", "moss", "ambani", "adani",
"lowry", "ecclestone", "hamilton", "verstappen", "schumacher", "woods",
"nicklaus", "federer", "nadal", "djokovic", "osaka", "williams", "serena",
"venus", "sharapova", "mcgregor", "mayweather", "paul", "logan paul",
"jake paul", "mrbeast", "pewdiepie", "charlie munger", "larry ellison",
"michael dell", "tim cook", "satya nadella", "sundar pichai", "jensen huang",
"gisele", "tom brady", "gwyneth", "howard stern", "howard marks",
"steven cohen", "ken griffin", "david tepper", "ray dalio", "peter thiel",
"paul allen", "steve ballmer", "mark cuban", "richard branson", "larry page",
"sergey brin", "eric schmidt", "reid hoffman", "marc andreessen",
"chamath", "naval", "andretti", "penske", "hendrick", "rick hendrick",
}
def norm_reg(s: str) -> str:
return (s or "").strip().upper()
def norm_name(s: str) -> str:
return re.sub(r"\s+", " ", (s or "").strip())
def looks_like_person(operator: str, tag1: str, tag2: str, tag3: str) -> bool:
blob = " ".join([operator, tag1, tag2, tag3]).strip()
if not blob or len(blob) < 3:
return False
low = blob.lower()
if CORP_RE.search(low) and not any(h in low for h in KNOWN_PERSON_NAMES):
# allow "Falcon Landing LLC" when tag says Elon Musk
if not any(h in low for h in KNOWN_PERSON_NAMES):
return False
if any(h in low for h in KNOWN_PERSON_NAMES):
return True
if CELEB_HINTS.search(low):
return True
# Two+ capitalized words, no corp suffix — weak person signal
words = operator.split()
if 2 <= len(words) <= 4 and operator == operator.title() and not CORP_RE.search(low):
return True
return False
def sb_category_for(cat: str, operator: str) -> str:
low = operator.lower()
if cat in {"Oligarch", "Dictator Alert"}:
return "Oligarch"
if cat == "Royal Aircraft" or "royal" in low:
return "Royal"
if cat == "Football":
return "Sports"
if cat in {"Head of State"}:
return "Government"
if any(x in low for x in ("nfl", "nba", "mlb", "football", "basketball", "soccer", "f1", "formula")):
return "Sports"
return "Celebrity"
def row_get(row: dict[str, str], *keys: str) -> str:
for k in keys:
if row.get(k):
return str(row[k]).strip()
return ""
def main() -> None:
with SB.open(encoding="utf-8") as f:
sb = json.load(f)
sb_regs: set[str] = set()
sb_names: dict[str, str] = {}
for name, info in sb.get("details", {}).items():
for reg in info.get("registrations", []):
r = norm_reg(reg)
if r:
sb_regs.add(r)
sb_names[r] = name
additions: dict[str, dict] = {}
merge: dict[str, list[str]] = {}
csv_paths = [
PAD / "plane-alert-db.csv",
PAD / "plane-alert-civ.csv",
PAD / "plane-alert-gov.csv",
PAD / "plane-alert-mil.csv",
]
seen: set[tuple[str, str]] = set()
person_hits = 0
for path in csv_paths:
if not path.exists():
continue
with path.open(encoding="utf-8", errors="replace") as f:
for row in csv.DictReader(f):
cat = row_get(row, "Category")
reg = norm_reg(row_get(row, "$Registration", "Registration"))
op = norm_reg(row_get(row, "$Operator", "Operator"))
op_display = norm_name(row_get(row, "$Operator", "Operator"))
tag1 = row_get(row, "$Tag 1", "Tag 1")
tag2 = row_get(row, "#Tag 2", "$#Tag 2")
tag3 = row_get(row, "#Tag 3", "$#Tag 3")
if not reg:
continue
if (reg, cat) in seen:
continue
seen.add((reg, cat))
include = cat in IMPORT_CATS
if not include and cat in PERSON_CATS:
if looks_like_person(op_display, tag1, tag2, tag3):
include = True
person_hits += 1
if not include:
continue
if reg in sb_regs:
continue
# Prefer tag person name over shell company
display = op_display
for tag in (tag1, tag2, tag3):
if tag and any(h in tag.lower() for h in KNOWN_PERSON_NAMES):
display = tag
break
if tag and len(tag.split()) <= 4 and tag[0].isupper() and "llc" not in tag.lower():
if cat == "Don't you know who I am?" and tag not in {"Bizjet", "Pusher Prop"}:
display = tag
key = display
if key in sb.get("details", {}):
merge.setdefault(key, []).append(reg)
else:
entry = additions.setdefault(
key,
{"category": sb_category_for(cat, display), "registrations": []},
)
if reg not in entry["registrations"]:
entry["registrations"].append(reg)
print(f"New named entries: {len(additions)}")
print(f"Merge into existing: {len(merge)}")
print(f"Person-heuristic hits (ASTV/Bizjets): {person_hits}")
print()
by_cat: dict[str, list[tuple[str, list[str]]]] = {}
for name, info in sorted(additions.items()):
by_cat.setdefault(info["category"], []).append((name, info["registrations"]))
for cat in sorted(by_cat):
items = by_cat[cat]
print(f"## {cat} ({len(items)})")
for name, regs in items[:40]:
print(f" {name}: {', '.join(regs)}")
if len(items) > 40:
print(f" ... +{len(items)-40} more")
print()
out = ROOT / "scripts" / "plane_alert_additions.json"
out.write_text(
json.dumps({"additions": additions, "merge": merge}, indent=2),
encoding="utf-8",
)
print(f"Wrote {out}")
if __name__ == "__main__":
main()
+190
View File
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""Sync plane_alert_db.json from upstream CSV and add explicit celeb/royal tails.
Does NOT import plane-alert joke tags into tracked_names.json.
"""
from __future__ import annotations
import csv
import json
import subprocess
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SB_PATH = ROOT / "backend" / "data" / "tracked_names.json"
PADB_PATH = ROOT / "backend" / "data" / "plane_alert_db.json"
PAD = Path.home() / "Downloads" / "plane-alert-db-main" / "plane-alert-db-main"
MANUAL_TRACKED: list[tuple[str, str, list[str]]] = [
("Michael Dell", "Celebrity", ["N28ZD"]),
("Lady Moura", "Celebrity", ["VP-CNR"]),
("Lewis Hamilton", "Celebrity", ["G-OFOM"]),
("Mario Andretti", "Celebrity", ["N500MA"]),
("Frank Lowry", "Celebrity", ["N613LF"]),
("Mukesh Ambani", "Celebrity", ["VT-AKV"]),
("Judge Judy", "Celebrity", ["N555QB"]),
("Monaco Royal Family", "Royal", ["3A-MGA"]),
("PGA Tour", "Sports", ["N795HG"]),
]
def norm_reg(s: str) -> str:
return (s or "").strip().upper()
def norm_icao(s: str) -> str:
return (s or "").strip().upper()
def row_get(row: dict[str, str], *keys: str) -> str:
for key in keys:
if row.get(key):
return str(row[key]).strip()
return ""
def load_git_baseline() -> set[str]:
raw = subprocess.check_output(
["git", "-C", str(ROOT), "show", "HEAD:backend/data/tracked_names.json"],
)
data = json.loads(raw)
return set(data.get("details", {}).keys())
def wiki_from_link(link: str) -> str:
if not link:
return ""
if "wikipedia.org/wiki/" in link:
return link.rsplit("/wiki/", 1)[-1].split("#")[0]
return ""
def steal_reg(details: dict, reg: str, protect_names: set[str]) -> None:
reg = norm_reg(reg)
for name, info in list(details.items()):
if name in protect_names:
continue
regs = info.get("registrations", [])
kept = [r for r in regs if norm_reg(r) != reg]
if len(kept) != len(regs):
if kept:
info["registrations"] = kept
else:
del details[name]
def ensure_entry(
details: dict,
names_list: list,
name: str,
category: str,
reg: str,
*,
protect_names: set[str] | None = None,
) -> bool:
reg = norm_reg(reg)
if not reg:
return False
steal_reg(details, reg, protect_names or set())
entry = details.setdefault(name, {"category": category, "registrations": []})
entry["category"] = category
if reg not in {norm_reg(r) for r in entry["registrations"]}:
entry["registrations"].append(reg)
if name not in {n["name"] for n in names_list}:
names_list.append({"name": name, "category": category})
return True
def sync_plane_alert_db() -> tuple[int, int]:
if not PADB_PATH.exists():
return 0, 0
with PADB_PATH.open(encoding="utf-8") as f:
db: dict = json.load(f)
updated = 0
added = 0
for fname in (
"plane-alert-db.csv",
"plane-alert-civ.csv",
"plane-alert-gov.csv",
"plane-alert-mil.csv",
"plane-alert-pol.csv",
):
path = PAD / fname
if not path.exists():
continue
with path.open(encoding="utf-8", errors="replace") as f:
for row in csv.DictReader(f):
icao = norm_icao(row_get(row, "$ICAO", "ICAO"))
if not icao:
continue
reg = row_get(row, "$Registration", "Registration")
operator = row_get(row, "$Operator", "Operator")
ac_type = row_get(row, "$Type", "Type")
category = row_get(row, "Category")
tag1 = row_get(row, "$Tag 1", "Tag 1")
tag2 = row_get(row, "#Tag 2", "$#Tag 2")
tag3 = row_get(row, "#Tag 3", "$#Tag 3")
link = row_get(row, "$#Link", "#Link", "$#Link ")
tags = ", ".join(t for t in (tag1, tag2, tag3) if t)
record = {
"registration": reg,
"operator": operator,
"ac_type": ac_type,
"category": category,
"tags": tags,
"link": link,
}
wiki = wiki_from_link(link)
if wiki:
record["wiki"] = wiki
if icao in db:
if db[icao] != record:
db[icao] = record
updated += 1
else:
db[icao] = record
added += 1
with PADB_PATH.open("w", encoding="utf-8") as f:
json.dump(db, f, indent=4, ensure_ascii=False)
f.write("\n")
return added, updated
def main() -> None:
baseline_keys = load_git_baseline()
with SB_PATH.open(encoding="utf-8") as f:
sb = json.load(f)
details: dict = sb.setdefault("details", {})
names_list: list[dict] = sb.setdefault("names", [])
manual_added = 0
for name, category, regs in MANUAL_TRACKED:
for reg in regs:
if ensure_entry(details, names_list, name, category, reg, protect_names=baseline_keys):
manual_added += 1
for key in baseline_keys:
if key not in details:
raise RuntimeError(f"Baseline tracked name lost: {key}")
names_list.sort(key=lambda x: x["name"].lower())
with SB_PATH.open("w", encoding="utf-8") as f:
json.dump(sb, f, indent=2, ensure_ascii=False)
f.write("\n")
pad_added, pad_updated = sync_plane_alert_db()
print(f"Manual celeb regs added: {manual_added}")
print(f"plane_alert_db.json: +{pad_added} updated {pad_updated}")
print(f"tracked_names details: {len(details)}")
print(f"tracked_names registrations: {sum(len(v.get('registrations',[])) for v in details.values())}")
if __name__ == "__main__":
main()