diff --git a/backend/config/news_feeds.json b/backend/config/news_feeds.json
index 791adf3..1af3a0f 100644
--- a/backend/config/news_feeds.json
+++ b/backend/config/news_feeds.json
@@ -1,12 +1,44 @@
 {
   "feeds": [
-    { "name": "NPR", "url": "https://feeds.npr.org/1004/rss.xml", "weight": 4 },
-    { "name": "BBC", "url": "http://feeds.bbci.co.uk/news/world/rss.xml", "weight": 3 },
-    { "name": "AlJazeera", "url": "https://www.aljazeera.com/xml/rss/all.xml", "weight": 2 },
-    { "name": "NYT", "url": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml", "weight": 1 },
-    { "name": "GDACS", "url": "https://www.gdacs.org/xml/rss.xml", "weight": 5 },
-    { "name": "NHK", "url": "https://www3.nhk.or.jp/nhkworld/rss/world.xml", "weight": 3 },
-    { "name": "CNA", "url": "https://www.channelnewsasia.com/rssfeed/8395986", "weight": 3 },
-    { "name": "Mercopress", "url": "https://en.mercopress.com/rss/", "weight": 3 }
+    {
+      "name": "NPR",
+      "url": "https://feeds.npr.org/1004/rss.xml",
+      "weight": 4
+    },
+    {
+      "name": "BBC",
+      "url": "http://feeds.bbci.co.uk/news/world/rss.xml",
+      "weight": 3
+    },
+    {
+      "name": "AlJazeera",
+      "url": "https://www.aljazeera.com/xml/rss/all.xml",
+      "weight": 2
+    },
+    {
+      "name": "NYT",
+      "url": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml",
+      "weight": 1
+    },
+    {
+      "name": "GDACS",
+      "url": "https://www.gdacs.org/xml/rss.xml",
+      "weight": 5
+    },
+    {
+      "name": "NHK",
+      "url": "https://www3.nhk.or.jp/nhkworld/rss/world.xml",
+      "weight": 3
+    },
+    {
+      "name": "CNA",
+      "url": "https://www.channelnewsasia.com/rssfeed/8395986",
+      "weight": 3
+    },
+    {
+      "name": "Mercopress",
+      "url": "https://en.mercopress.com/rss/",
+      "weight": 3
+    }
   ]
-}
+}
\ No newline at end of file
diff --git a/backend/main.py b/backend/main.py
index e7c2ff3..bfb6fba 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -77,6 +77,15 @@ async def force_refresh():
 async def live_data():
     return get_latest_data()
 
+def _etag_response(request: Request, payload: dict, prefix: str = "", default=None):
+    """Serialize once, hash the full body so any change bumps the ETag, return 304 or the full response."""
+    content = json_mod.dumps(payload, default=default)
+    etag = hashlib.md5(f"{prefix}{content}".encode()).hexdigest()[:16]
+    if request.headers.get("if-none-match") == etag:
+        return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
+    return Response(content=content, media_type="application/json",
+                    headers={"ETag": etag, "Cache-Control": "no-cache"})
+
 @app.get("/api/live-data/fast")
 async def live_data_fast(request: Request):
     d = get_latest_data()
@@ -87,25 +96,13 @@ async def live_data_fast(request: Request):
         "private_jets": d.get("private_jets", []),
         "tracked_flights": d.get("tracked_flights", []),
         "ships": d.get("ships", []),
-        "satellites": d.get("satellites", []),
         "cctv": d.get("cctv", []),
         "uavs": d.get("uavs", []),
         "liveuamap": d.get("liveuamap", []),
         "gps_jamming": d.get("gps_jamming", []),
         "freshness": dict(source_timestamps),
     }
-    # ETag includes last_updated timestamp so it changes on every data refresh,
-    # not just when item counts change (old bug: positions went stale)
-    last_updated = d.get("last_updated", "")
-    counts = "|".join(f"{k}:{len(v) if isinstance(v, list) else 0}" for k, v in payload.items() if k != "freshness")
-    etag = hashlib.md5(f"{last_updated}|{counts}".encode()).hexdigest()[:16]
-    if request.headers.get("if-none-match") == etag:
-        return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
-    return Response(
-        content=json_mod.dumps(payload),
-        media_type="application/json",
-        headers={"ETag": etag, "Cache-Control": "no-cache"}
-    )
+    return _etag_response(request, payload, prefix="fast|")
 
 @app.get("/api/live-data/slow")
 async def live_data_slow(request: Request):
@@ -129,17 +126,7 @@ async def live_data_slow(request: Request):
         "datacenters": d.get("datacenters", []),
         "freshness": dict(source_timestamps),
     }
-    # ETag based on last_updated + item counts
-    last_updated = d.get("last_updated", "")
-    counts = "|".join(f"{k}:{len(v) if isinstance(v, list) else 0}" for k, v in payload.items() if k != "freshness")
-    etag = hashlib.md5(f"slow|{last_updated}|{counts}".encode()).hexdigest()[:16]
-    if request.headers.get("if-none-match") == etag:
-        return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
-    return Response(
-        content=json_mod.dumps(payload, default=str),
-        media_type="application/json",
-        headers={"ETag": etag, "Cache-Control": "no-cache"}
-    )
+    return _etag_response(request, payload, prefix="slow|", default=str)
 
 @app.get("/api/debug-latest")
 async def debug_latest_data():
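Review note on _etag_response: hashing the full serialized body (rather than a prefix or item counts) is what guarantees a new ETag whenever any position changes, which is exactly the stale-data bug the old per-endpoint comment warned about. The conditional-request flow it implements is the standard If-None-Match round trip. A minimal client-side sketch, assuming a locally running instance on port 8000 (URL and poll interval are illustrative, not part of this PR):

import time
import requests

etag = None
for _ in range(3):  # three polls; a real client would loop forever
    headers = {"If-None-Match": etag} if etag else {}
    r = requests.get("http://localhost:8000/api/live-data/fast", headers=headers, timeout=10)
    if r.status_code == 304:
        pass  # unchanged since the last poll; reuse the cached payload
    elif r.ok:
        etag = r.headers.get("ETag")  # remember the tag for the next request
        payload = r.json()
    time.sleep(5)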
diff --git a/backend/services/ais_stream.py b/backend/services/ais_stream.py
index 0eb5bf3..22c2a2e 100644
--- a/backend/services/ais_stream.py
+++ b/backend/services/ais_stream.py
@@ -238,49 +238,51 @@ def _ais_stream_loop():
             logger.info("AIS Stream proxy started — receiving vessel data")
             msg_count = 0
+            ok_streak = 0  # Track consecutive successful messages for backoff reset
+            last_log_time = time.time()
             for raw_msg in iter(process.stdout.readline, ''):
                 if not _ws_running:
                     process.terminate()
                     break
-
+
                 raw_msg = raw_msg.strip()
                 if not raw_msg:
                     continue
-
+
                 try:
                     data = json.loads(raw_msg)
                 except json.JSONDecodeError:
                     continue
-
+
                 if "error" in data:
                     logger.error(f"AIS Stream error: {data['error']}")
                     continue
-
+
                 msg_type = data.get("MessageType", "")
                 metadata = data.get("MetaData", {})
                 message = data.get("Message", {})
-
+
                 mmsi = metadata.get("MMSI", 0)
                 if not mmsi:
                     continue
-
+
                 with _vessels_lock:
                     if mmsi not in _vessels:
                         _vessels[mmsi] = {"_updated": time.time()}
                     vessel = _vessels[mmsi]
-
+
                 # Update position from PositionReport or StandardClassBPositionReport
                 if msg_type in ("PositionReport", "StandardClassBPositionReport"):
                     report = message.get(msg_type, {})
                     lat = report.get("Latitude", metadata.get("latitude", 0))
                     lng = report.get("Longitude", metadata.get("longitude", 0))
-
+
                     # Skip invalid positions
                     if lat == 0 and lng == 0:
                         continue
                     if abs(lat) > 90 or abs(lng) > 180:
                         continue
-
+
                     with _vessels_lock:
                         vessel["lat"] = lat
                         vessel["lng"] = lng
@@ -292,12 +294,12 @@ def _ais_stream_loop():
                     # Use metadata name if we don't have one yet
                     if not vessel.get("name") or vessel["name"] == "UNKNOWN":
                         vessel["name"] = metadata.get("ShipName", "UNKNOWN").strip() or "UNKNOWN"
-
+
                 # Update static data from ShipStaticData
                 elif msg_type == "ShipStaticData":
                     static = message.get("ShipStaticData", {})
                     ais_type = static.get("Type", 0)
-
+
                     with _vessels_lock:
                         vessel["name"] = (static.get("Name", "") or metadata.get("ShipName", "UNKNOWN")).strip() or "UNKNOWN"
                         vessel["callsign"] = (static.get("CallSign", "") or "").strip()
@@ -306,21 +308,24 @@ def _ais_stream_loop():
                         vessel["ais_type_code"] = ais_type
                         vessel["type"] = classify_vessel(ais_type, mmsi)
                         vessel["_updated"] = time.time()
-
+
                 msg_count += 1
-                if msg_count % 5000 == 0:
+                ok_streak += 1
+
+                # Reset backoff after 200 consecutive successful messages
+                if ok_streak >= 200 and backoff > 1:
+                    backoff = 1
+                    ok_streak = 0
+
+                # Periodic logging + cache save (time-based instead of count-based to avoid taking the lock in the hot loop)
+                now = time.time()
+                if now - last_log_time >= 60:
                     with _vessels_lock:
-                        # Inline pruning: remove vessels not updated in 15 minutes
-                        prune_cutoff = time.time() - 900
-                        stale = [k for k, v in _vessels.items() if v.get("_updated", 0) < prune_cutoff]
-                        for k in stale:
-                            del _vessels[k]
                         count = len(_vessels)
-                    if stale:
-                        logger.info(f"AIS pruned {len(stale)} stale vessels")
                     logger.info(f"AIS Stream: processed {msg_count} messages, tracking {count} vessels")
-                    _save_cache()  # Auto-save every 5000 messages (~60 seconds)
-
+                    _save_cache()
+                    last_log_time = now
+
         except Exception as e:
             logger.error(f"AIS proxy connection error: {e}")
             if _ws_running:
@@ -328,8 +333,6 @@ def _ais_stream_loop():
                 time.sleep(backoff)
                 backoff = min(backoff * 2, 60)  # Double up to 60s max
             continue
-        # Reset backoff on successful connection (got at least some messages)
-        backoff = 1
 
 
 def _run_ais_loop():
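Worth calling out on the backoff change: the removed reset only ran after the read loop exited cleanly, so a connection that streamed for hours and then died still reconnected with whatever delay was left over from earlier failures. The streak-based reset forgives past failures while the link is demonstrably healthy. A self-contained sketch of the pattern, with a dummy read_message standing in for the real proxy read (probabilities and loop bounds are illustrative):

import random
import time

def read_message():
    # Stand-in for reading one line from the AIS proxy; drops out occasionally.
    if random.random() < 0.001:
        raise ConnectionError("stream dropped")

backoff = 1    # current reconnect delay in seconds
ok_streak = 0  # consecutive successful reads on this connection

for _ in range(3):  # three simulated connection attempts
    try:
        while True:
            read_message()
            ok_streak += 1
            # A long healthy streak proves the link is good, so forget past
            # failures now, not after a clean exit that may never happen.
            if ok_streak >= 200 and backoff > 1:
                backoff = 1
                ok_streak = 0
    except ConnectionError:
        time.sleep(backoff)
        backoff = min(backoff * 2, 60)  # exponential backoff, capped at 60s
        ok_streak = 0                   # the streak is per-connection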
diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py
index 9138d4c..4c2a6a9 100644
--- a/backend/services/data_fetcher.py
+++ b/backend/services/data_fetcher.py
@@ -15,6 +15,7 @@ import threading
 import io
 from apscheduler.schedulers.background import BackgroundScheduler
 import concurrent.futures
+import heapq
 from sgp4.api import Satrec, WGS72
 from sgp4.api import jday
 from datetime import datetime
@@ -81,6 +82,25 @@ opensky_client = OpenSkyClient(
 last_opensky_fetch = 0
 cached_opensky_flights = []
 
+# ---------------------------------------------------------------------------
+# Supplemental ADS-B sources for blind-spot gap-filling (Russia/China/Africa)
+# These aggregators have different feeder pools than adsb.lol and can surface
+# aircraft invisible to our primary source. Only gap-fill planes are kept.
+# ---------------------------------------------------------------------------
+_BLIND_SPOT_REGIONS = [
+    {"name": "Yekaterinburg", "lat": 56.8, "lon": 60.6, "radius_nm": 250},
+    {"name": "Novosibirsk", "lat": 55.0, "lon": 82.9, "radius_nm": 250},
+    {"name": "Krasnoyarsk", "lat": 56.0, "lon": 92.9, "radius_nm": 250},
+    {"name": "Vladivostok", "lat": 43.1, "lon": 131.9, "radius_nm": 250},
+    {"name": "Urumqi", "lat": 43.8, "lon": 87.6, "radius_nm": 250},
+    {"name": "Chengdu", "lat": 30.6, "lon": 104.1, "radius_nm": 250},
+    {"name": "Lagos-Accra", "lat": 6.5, "lon": 3.4, "radius_nm": 250},
+    {"name": "Addis Ababa", "lat": 9.0, "lon": 38.7, "radius_nm": 250},
+]
+_SUPPLEMENTAL_FETCH_INTERVAL = 120  # seconds — only query every 2 min
+last_supplemental_fetch = 0
+cached_supplemental_flights = []
+
 
 # In-memory store
@@ -480,27 +500,31 @@ def fetch_news():
     latest_data['news'] = news_items
     _mark_fresh("news")
 
+def _fetch_single_ticker(symbol: str, period: str = "2d"):
+    """Fetch a single yfinance ticker. Returns (symbol, data_dict) or (symbol, None)."""
+    try:
+        ticker = yf.Ticker(symbol)
+        hist = ticker.history(period=period)
+        if len(hist) >= 1:
+            current_price = hist['Close'].iloc[-1]
+            prev_close = hist['Close'].iloc[-2] if len(hist) > 1 else current_price  # previous trading day, not period start
+            change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0
+            return symbol, {
+                "price": round(float(current_price), 2),
+                "change_percent": round(float(change_percent), 2),
+                "up": bool(change_percent >= 0)
+            }
+    except Exception as e:
+        logger.warning(f"Could not fetch data for {symbol}: {e}")
+    return symbol, None
+
+
 def fetch_defense_stocks():
     tickers = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"]
-    stocks_data = {}
     try:
-        for t in tickers:
-            try:
-                ticker = yf.Ticker(t)
-                hist = ticker.history(period="2d")
-                if len(hist) >= 1:
-                    current_price = hist['Close'].iloc[-1]
-                    prev_close = hist['Close'].iloc[0] if len(hist) > 1 else current_price
-                    change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0
-
-                    stocks_data[t] = {
-                        "price": round(float(current_price), 2),
-                        "change_percent": round(float(change_percent), 2),
-                        "up": bool(change_percent >= 0)
-                    }
-            except Exception as e:
-                logger.warning(f"Could not fetch data for {t}: {e}")
-
+        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
+            results = pool.map(lambda t: _fetch_single_ticker(t, "2d"), tickers)
+        stocks_data = {sym: data for sym, data in results if data}
         latest_data['stocks'] = stocks_data
         _mark_fresh("stocks")
     except Exception as e:
@@ -509,25 +533,10 @@ def fetch_oil_prices():
     # CL=F is Crude Oil, BZ=F is Brent Crude
     tickers = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"}
-    oil_data = {}
     try:
-        for name, symbol in tickers.items():
-            try:
-                ticker = yf.Ticker(symbol)
-                hist = ticker.history(period="5d")
-                if len(hist) >= 2:
-                    current_price = hist['Close'].iloc[-1]
-                    prev_close = hist['Close'].iloc[-2]
-                    change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0
-
-                    oil_data[name] = {
-                        "price": round(float(current_price), 2),
-                        "change_percent": round(float(change_percent), 2),
-                        "up": bool(change_percent >= 0)
-                    }
-            except Exception as e:
-                logger.warning(f"Could not fetch data for {symbol}: {e}")
-
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
+            results = pool.map(lambda item: (item[0], _fetch_single_ticker(item[1], "5d")[1]), tickers.items())
+        oil_data = {name: data for name, data in results if data}
         latest_data['oil'] = oil_data
         _mark_fresh("oil")
     except Exception as e:
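A note on the ticker fan-out: both call sites rely on two properties of ThreadPoolExecutor.map, namely that results come back in input order and that worker exceptions surface only when the iterator is consumed, which _fetch_single_ticker pre-empts by catching everything and returning (symbol, None). A reduced sketch of the same shape with a stub fetch (no yfinance involved; names are illustrative):

import concurrent.futures

def fetch_one(symbol):
    # Stub for _fetch_single_ticker: never raises, returns (symbol, dict or None)
    return symbol, ({"price": 100.0} if symbol != "BAD" else None)

symbols = ["RTX", "LMT", "BAD", "NOC"]
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    results = pool.map(fetch_one, symbols)      # yields pairs in input order
    data = {sym: payload for sym, payload in results if payload}

assert list(data) == ["RTX", "LMT", "NOC"]      # BAD dropped, order preserved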
+ """ + global last_supplemental_fetch, cached_supplemental_flights + + now = time.time() + if now - last_supplemental_fetch < _SUPPLEMENTAL_FETCH_INTERVAL: + # Return cached results, but still filter against current seen_hex + return [f for f in cached_supplemental_flights + if f.get("hex", "").lower().strip() not in seen_hex] + + new_supplemental = [] + supplemental_hex = set() # track hex within supplemental to avoid internal dupes + + # --- airplanes.live (parallel, all hotspots) --- + def _fetch_airplaneslive(region): + try: + url = (f"https://api.airplanes.live/v2/point/" + f"{region['lat']}/{region['lon']}/{region['radius_nm']}") + res = fetch_with_curl(url, timeout=10) + if res.status_code == 200: + data = res.json() + return data.get("ac", []) + except Exception as e: + logger.debug(f"airplanes.live {region['name']} failed: {e}") + return [] + + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool: + results = list(pool.map(_fetch_airplaneslive, _BLIND_SPOT_REGIONS)) + for region_flights in results: + for f in region_flights: + h = f.get("hex", "").lower().strip() + if h and h not in seen_hex and h not in supplemental_hex: + f["supplemental_source"] = "airplanes.live" + new_supplemental.append(f) + supplemental_hex.add(h) + except Exception as e: + logger.warning(f"airplanes.live supplemental fetch failed: {e}") + + ap_count = len(new_supplemental) + + # --- adsb.fi (sequential, 1.1s between requests to respect 1 req/sec limit) --- + try: + for region in _BLIND_SPOT_REGIONS: + try: + url = (f"https://opendata.adsb.fi/api/v3/lat/" + f"{region['lat']}/lon/{region['lon']}/dist/{region['radius_nm']}") + res = fetch_with_curl(url, timeout=10) + if res.status_code == 200: + data = res.json() + for f in data.get("ac", []): + h = f.get("hex", "").lower().strip() + if h and h not in seen_hex and h not in supplemental_hex: + f["supplemental_source"] = "adsb.fi" + new_supplemental.append(f) + supplemental_hex.add(h) + except Exception as e: + logger.debug(f"adsb.fi {region['name']} failed: {e}") + time.sleep(1.1) # Rate limit: 1 req/sec + except Exception as e: + logger.warning(f"adsb.fi supplemental fetch failed: {e}") + + fi_count = len(new_supplemental) - ap_count + + cached_supplemental_flights = new_supplemental + last_supplemental_fetch = now + if new_supplemental: + _mark_fresh("supplemental_flights") + + logger.info(f"Supplemental: +{len(new_supplemental)} new aircraft from blind-spot " + f"hotspots (airplanes.live: {ap_count}, adsb.fi: {fi_count})") + + return new_supplemental + + def fetch_flights(): # OpenSky Network public API for flights. We want to demonstrate global coverage. flights = [] @@ -712,7 +802,22 @@ def fetch_flights(): all_adsb_flights.append(osf) seen_hex.add(h.lower().strip()) - + # ------------------------------------------------------------------- + # Supplemental Sources: airplanes.live + adsb.fi (blind-spot gap-fill) + # Only adds aircraft whose ICAO hex is NOT already in seen_hex. + # ------------------------------------------------------------------- + try: + gap_fill = _fetch_supplemental_sources(seen_hex) + for f in gap_fill: + all_adsb_flights.append(f) + h = f.get("hex", "").lower().strip() + if h: + seen_hex.add(h) + if gap_fill: + logger.info(f"Gap-fill: added {len(gap_fill)} aircraft to pipeline") + except Exception as e: + logger.warning(f"Supplemental source fetch failed (non-fatal): {e}") + if all_adsb_flights: # The user requested maximum flight density. Rendering all available aircraft. 
@@ -1333,9 +1438,8 @@ def fetch_firms_fires():
                     })
                 except (ValueError, TypeError):
                     continue
-        # Sort by FRP descending, keep top 5000 (most intense fires first)
-        all_rows.sort(key=lambda x: x["frp"], reverse=True)
-        fires = all_rows[:5000]
+        # Keep top 5000 by FRP (most intense fires first) — heapq.nlargest is O(n log k) vs an O(n log n) full sort
+        fires = heapq.nlargest(5000, all_rows, key=lambda x: x["frp"])
         logger.info(f"FIRMS fires: {len(fires)} hotspots (from {response.status_code})")
     except Exception as e:
         logger.error(f"Error fetching FIRMS fires: {e}")
@@ -1471,9 +1575,8 @@ def fetch_internet_outages():
                     r["lat"] = coords[0]
                     r["lng"] = coords[1]
                     geocoded.append(r)
-        # Sort by severity descending, cap at 100
-        geocoded.sort(key=lambda x: x["severity"], reverse=True)
-        outages = geocoded[:100]
+        # Keep top 100 by severity
+        outages = heapq.nlargest(100, geocoded, key=lambda x: x["severity"])
         logger.info(f"Internet outages: {len(outages)} regions affected")
     except Exception as e:
         logger.error(f"Error fetching internet outages: {e}")
@@ -2219,8 +2322,8 @@ def start_scheduler():
     scheduler.add_job(update_liveuamap, 'date', run_date=datetime.now())
     scheduler.add_job(update_liveuamap, 'interval', hours=12)
 
-    # Geopolitics (frontlines) more frequently than other slow data
-    scheduler.add_job(fetch_geopolitics, 'interval', minutes=5)
+    # Geopolitics (frontlines) aligned with the slow-data tier
+    scheduler.add_job(fetch_geopolitics, 'interval', minutes=30)
 
     scheduler.start()
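For reviewers unfamiliar with heapq.nlargest: per the stdlib docs it is equivalent to sorted(iterable, key=key, reverse=True)[:n], so the descending render order of fires and outages is unchanged; only the asymptotics improve. A quick sanity check on toy rows:

import heapq

rows = [{"frp": 3.2}, {"frp": 9.9}, {"frp": 7.1}, {"frp": 0.4}]
top2 = heapq.nlargest(2, rows, key=lambda x: x["frp"])
assert top2 == [{"frp": 9.9}, {"frp": 7.1}]  # descending, same as sort-then-slice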
""" + import re try: from urllib.parse import urlparse, unquote parsed = urlparse(url) @@ -100,43 +102,151 @@ def _url_to_headline(url): if not path: return domain - # Take the last path segment (usually the slug) - slug = path.split('/')[-1] - # Remove file extensions - for ext in ['.html', '.htm', '.php', '.asp', '.aspx', '.shtml']: - if slug.lower().endswith(ext): - slug = slug[:-len(ext)] - # If slug is purely numeric or a short ID, try the second-to-last segment - import re - if re.match(r'^[a-z]?\d{5,}$', slug, re.IGNORECASE): - segments = path.split('/') - if len(segments) >= 2: - slug = segments[-2] - for ext in ['.html', '.htm', '.php']: - if slug.lower().endswith(ext): - slug = slug[:-len(ext)] + # Try the last path segment first, then walk backwards + segments = [s for s in path.split('/') if s] + slug = '' + for seg in reversed(segments): + # Remove file extensions + for ext in ['.html', '.htm', '.php', '.asp', '.aspx', '.shtml']: + if seg.lower().endswith(ext): + seg = seg[:-len(ext)] + # Skip segments that are clearly not headlines + if _is_gibberish(seg): + continue + slug = seg + break + + if not slug: + return domain + # Remove common ID patterns at start/end - slug = re.sub(r'^[\d]+-', '', slug) # leading numbers like "13847569-" - slug = re.sub(r'-[\da-f]{6,}$', '', slug) # trailing hex IDs - slug = re.sub(r'[-_]c-\d+$', '', slug) # trailing "-c-21803431" - slug = re.sub(r'^p=\d+$', '', slug) # WordPress ?p=1234 + slug = re.sub(r'^[\d]+-', '', slug) # leading "13847569-" + slug = re.sub(r'-[\da-f]{6,}$', '', slug) # trailing hex IDs + slug = re.sub(r'[-_]c-\d+$', '', slug) # trailing "-c-21803431" + slug = re.sub(r'^p=\d+$', '', slug) # WordPress ?p=1234 # Convert slug separators to spaces slug = slug.replace('-', ' ').replace('_', ' ') - # Clean up multiple spaces slug = re.sub(r'\s+', ' ', slug).strip() - # If slug is still just a number or too short, fall back to domain - if len(slug) < 5 or re.match(r'^\d+$', slug): + # Final gibberish check after cleanup + if len(slug) < 8 or _is_gibberish(slug.replace(' ', '-')): return domain # Title case and truncate headline = slug.title() - if len(headline) > 80: - headline = headline[:77] + '...' - return f"{headline} ({domain})" + if len(headline) > 90: + headline = headline[:87] + '...' + return headline except Exception: return url[:60] + +def _is_gibberish(text): + """Detect if a URL segment is gibberish (hex IDs, UUIDs, numeric IDs, etc.) 
+def _is_gibberish(text):
+    """Detect if a URL segment is gibberish (hex IDs, UUIDs, numeric IDs, etc.)
+    rather than a real human-readable slug like 'us-strikes-iran'."""
+    import re
+    t = text.strip()
+    if not t:
+        return True
+    # Pure numbers
+    if re.match(r'^\d+$', t):
+        return True
+    # UUID pattern (with or without dashes)
+    if re.match(r'^[0-9a-f]{8}[_-]?[0-9a-f]{4}[_-]?[0-9a-f]{4}[_-]?[0-9a-f]{4}[_-]?[0-9a-f]{12}$', t, re.I):
+        return True
+    # Hex-heavy string: more than 40% hex digits among alphanumeric chars
+    alnum = re.sub(r'[^a-zA-Z0-9]', '', t)
+    if alnum:
+        hex_chars = sum(1 for c in alnum if c in '0123456789abcdefABCDEF')
+        if hex_chars / len(alnum) > 0.4 and len(alnum) > 6:
+            return True
+    # Mostly digits with a few alpha — catches short IDs like "p12345" that the hex check's length gate skips
+    digits = sum(1 for c in alnum if c.isdigit())
+    if alnum and digits / len(alnum) > 0.5:
+        return True
+    # Too short to be a headline slug
+    if len(t) < 5:
+        return True
+    # Query-param style segments
+    if '=' in t:
+        return True
+    return False
+
+
+# Persistent cache for article titles — survives across GDELT cache refreshes
+_article_title_cache = {}
+
+def _fetch_article_title(url):
+    """Fetch the real headline from an article's HTML
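A few spot checks of the new slug heuristics; these would make reasonable unit-test seeds. The import path is assumed from the file paths in this diff, and the expected headline matches the docstring example above:

from backend.services.geopolitics import _url_to_headline, _is_gibberish

assert _is_gibberish("13847569")             # pure numeric ID
assert _is_gibberish("a1b2c3d4e5")           # hex-heavy token
assert not _is_gibberish("us-strikes-iran")  # real slug survives
assert _url_to_headline(
    "https://nytimes.com/2026/03/us-strikes-iran-nuclear-sites.html"
) == "Us Strikes Iran Nuclear Sites"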