mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-04-23 19:16:06 +02:00
668ce16dc7
Gate messages now propagate via the Infonet hashchain as encrypted blobs — every node syncs them through normal chain sync while only Gate members with MLS keys can decrypt. Added mesh reputation system, peer push workers, voluntary Wormhole opt-in for node participation, fork recovery, killwormhole scripts, obfuscated terminology, and hardened the self-updater to protect encryption keys and chain state during updates. New features: Shodan search, train tracking, Sentinel Hub imagery, 8 new intelligence layers, CCTV expansion to 11,000+ cameras across 6 countries, Mesh Terminal CLI, prediction markets, desktop-shell scaffold, and comprehensive mesh test suite (215 frontend + backend tests passing). Community contributors: @wa1id, @AlborzNazari, @adust09, @Xpirix, @imqdcr, @csysp, @suranyami, @chr0n1x, @johan-martensson, @singularfailure, @smithbh, @OrfeoTerkuci, @deuza, @tm-const, @Elhard1, @ttulttul
564 lines
20 KiB
Python
564 lines
20 KiB
Python
"""Data fetcher orchestrator — schedules and coordinates all data source modules.
|
||
|
||
Heavy logic has been extracted into services/fetchers/:
|
||
- _store.py — shared state (latest_data, locks, timestamps)
|
||
- plane_alert.py — aircraft enrichment DB
|
||
- flights.py — commercial flights, routes, trails, GPS jamming
|
||
- military.py — military flights, UAV detection
|
||
- satellites.py — satellite tracking (SGP4)
|
||
- news.py — RSS news fetching, clustering, risk assessment
|
||
- yacht_alert.py — superyacht alert enrichment
|
||
- financial.py — defense stocks, oil prices
|
||
- earth_observation.py — earthquakes, FIRMS fires, space weather, weather radar
|
||
- infrastructure.py — internet outages, data centers, CCTV, KiwiSDR
|
||
- geo.py — ships, airports, frontlines, GDELT, LiveUAMap
|
||
"""
|
||
|
||
import logging
|
||
import concurrent.futures
|
||
import os
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from datetime import datetime
|
||
from services.cctv_pipeline import init_db
|
||
|
||
# Shared state — all fetcher modules read/write through this
|
||
from services.fetchers._store import (
|
||
latest_data,
|
||
source_timestamps,
|
||
_mark_fresh,
|
||
_data_lock, # noqa: F401 — re-exported for main.py
|
||
get_latest_data_subset,
|
||
)
|
||
|
||
# Domain-specific fetcher modules (already extracted)
|
||
from services.fetchers.flights import fetch_flights # noqa: F401
|
||
from services.fetchers.flights import _BLIND_SPOT_REGIONS # noqa: F401 — re-exported for tests
|
||
from services.fetchers.military import fetch_military_flights # noqa: F401
|
||
from services.fetchers.satellites import fetch_satellites # noqa: F401
|
||
from services.fetchers.news import fetch_news # noqa: F401
|
||
|
||
# Newly extracted fetcher modules
|
||
from services.fetchers.financial import fetch_financial_markets # noqa: F401
|
||
from services.fetchers.unusual_whales import fetch_unusual_whales # noqa: F401
|
||
from services.fetchers.earth_observation import ( # noqa: F401
|
||
fetch_earthquakes,
|
||
fetch_firms_fires,
|
||
fetch_firms_country_fires,
|
||
fetch_space_weather,
|
||
fetch_weather,
|
||
fetch_weather_alerts,
|
||
fetch_air_quality,
|
||
fetch_volcanoes,
|
||
fetch_viirs_change_nodes,
|
||
)
|
||
from services.fetchers.infrastructure import ( # noqa: F401
|
||
fetch_internet_outages,
|
||
fetch_ripe_atlas_probes,
|
||
fetch_datacenters,
|
||
fetch_military_bases,
|
||
fetch_power_plants,
|
||
fetch_cctv,
|
||
fetch_kiwisdr,
|
||
fetch_scanners,
|
||
fetch_satnogs,
|
||
fetch_tinygs,
|
||
fetch_psk_reporter,
|
||
)
|
||
from services.fetchers.geo import ( # noqa: F401
|
||
fetch_ships,
|
||
fetch_airports,
|
||
find_nearest_airport,
|
||
cached_airports,
|
||
fetch_frontlines,
|
||
fetch_gdelt,
|
||
fetch_geopolitics,
|
||
update_liveuamap,
|
||
fetch_fishing_activity,
|
||
)
|
||
from services.fetchers.prediction_markets import fetch_prediction_markets # noqa: F401
|
||
from services.fetchers.sigint import fetch_sigint # noqa: F401
|
||
from services.fetchers.trains import fetch_trains # noqa: F401
|
||
from services.fetchers.ukraine_alerts import fetch_ukraine_air_raid_alerts # noqa: F401
|
||
from services.fetchers.meshtastic_map import (
|
||
fetch_meshtastic_nodes,
|
||
load_meshtastic_cache_if_available,
|
||
) # noqa: F401
|
||
from services.fetchers.fimi import fetch_fimi # noqa: F401
|
||
from services.ais_stream import prune_stale_vessels # noqa: F401
|
||
|
||
logger = logging.getLogger(__name__)
|
||
_SLOW_FETCH_S = float(os.environ.get("FETCH_SLOW_THRESHOLD_S", "5"))
|
||
|
||
# Shared thread pool — reused across all fetch cycles instead of creating/destroying per tick
|
||
_SHARED_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
|
||
max_workers=20, thread_name_prefix="fetch"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Scheduler & Orchestration
|
||
# ---------------------------------------------------------------------------
|
||
def _run_tasks(label: str, funcs: list):
|
||
"""Run tasks concurrently and log any exceptions (do not fail silently)."""
|
||
if not funcs:
|
||
return
|
||
futures = {_SHARED_EXECUTOR.submit(func): (func.__name__, time.perf_counter()) for func in funcs}
|
||
for future in concurrent.futures.as_completed(futures):
|
||
name, start = futures[future]
|
||
try:
|
||
future.result()
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_success
|
||
|
||
record_success(name, duration_s=duration)
|
||
if duration > _SLOW_FETCH_S:
|
||
logger.warning(f"{label} task slow: {name} took {duration:.2f}s")
|
||
except Exception as e:
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_failure
|
||
|
||
record_failure(name, error=e, duration_s=duration)
|
||
logger.exception(f"{label} task failed: {name}")
|
||
|
||
|
||
def _run_task_with_health(func, name: str | None = None):
|
||
"""Run a single task with health tracking."""
|
||
task_name = name or getattr(func, "__name__", "task")
|
||
start = time.perf_counter()
|
||
try:
|
||
func()
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_success
|
||
|
||
record_success(task_name, duration_s=duration)
|
||
if duration > _SLOW_FETCH_S:
|
||
logger.warning(f"task slow: {task_name} took {duration:.2f}s")
|
||
except Exception as e:
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_failure
|
||
|
||
record_failure(task_name, error=e, duration_s=duration)
|
||
logger.exception(f"task failed: {task_name}")
|
||
|
||
|
||
def update_fast_data():
|
||
"""Fast-tier: moving entities that need frequent updates (every 60s)."""
|
||
logger.info("Fast-tier data update starting...")
|
||
fast_funcs = [
|
||
fetch_flights,
|
||
fetch_military_flights,
|
||
fetch_ships,
|
||
fetch_satellites,
|
||
fetch_sigint,
|
||
fetch_trains,
|
||
fetch_tinygs,
|
||
]
|
||
_run_tasks("fast-tier", fast_funcs)
|
||
with _data_lock:
|
||
latest_data["last_updated"] = datetime.utcnow().isoformat()
|
||
from services.fetchers._store import bump_data_version
|
||
bump_data_version()
|
||
logger.info("Fast-tier update complete.")
|
||
|
||
|
||
def update_slow_data():
|
||
"""Slow-tier: contextual + enrichment data that refreshes less often (every 5–10 min)."""
|
||
logger.info("Slow-tier data update starting...")
|
||
slow_funcs = [
|
||
fetch_news,
|
||
fetch_prediction_markets,
|
||
fetch_earthquakes,
|
||
fetch_firms_fires,
|
||
fetch_firms_country_fires,
|
||
fetch_weather,
|
||
fetch_space_weather,
|
||
fetch_internet_outages,
|
||
fetch_ripe_atlas_probes, # runs after IODA to deduplicate
|
||
fetch_cctv,
|
||
fetch_kiwisdr,
|
||
fetch_satnogs,
|
||
fetch_frontlines,
|
||
fetch_datacenters,
|
||
fetch_military_bases,
|
||
fetch_scanners,
|
||
fetch_psk_reporter,
|
||
fetch_weather_alerts,
|
||
fetch_air_quality,
|
||
fetch_fishing_activity,
|
||
fetch_power_plants,
|
||
fetch_ukraine_air_raid_alerts,
|
||
]
|
||
_run_tasks("slow-tier", slow_funcs)
|
||
# Run correlation engine after all data is fresh
|
||
try:
|
||
from services.correlation_engine import compute_correlations
|
||
with _data_lock:
|
||
snapshot = dict(latest_data)
|
||
correlations = compute_correlations(snapshot)
|
||
with _data_lock:
|
||
latest_data["correlations"] = correlations
|
||
except Exception as e:
|
||
logger.error("Correlation engine failed: %s", e)
|
||
from services.fetchers._store import bump_data_version
|
||
bump_data_version()
|
||
logger.info("Slow-tier update complete.")
|
||
|
||
|
||
def update_all_data(*, startup_mode: bool = False):
|
||
"""Full refresh.
|
||
|
||
On startup we prefer cached/DB-backed data first, then let scheduled jobs
|
||
perform some heavy top-ups after the app is already responsive.
|
||
"""
|
||
logger.info("Full data update starting (parallel)...")
|
||
# Preload Meshtastic map cache immediately (instant, from disk)
|
||
load_meshtastic_cache_if_available()
|
||
with _data_lock:
|
||
meshtastic_seeded = bool(latest_data.get("meshtastic_map_nodes"))
|
||
futures = {
|
||
_SHARED_EXECUTOR.submit(fetch_airports): ("fetch_airports", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(update_fast_data): ("update_fast_data", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(update_slow_data): ("update_slow_data", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(fetch_volcanoes): ("fetch_volcanoes", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(fetch_viirs_change_nodes): ("fetch_viirs_change_nodes", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(fetch_unusual_whales): ("fetch_unusual_whales", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(fetch_fimi): ("fetch_fimi", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(fetch_gdelt): ("fetch_gdelt", time.perf_counter()),
|
||
_SHARED_EXECUTOR.submit(update_liveuamap): ("update_liveuamap", time.perf_counter()),
|
||
}
|
||
if not startup_mode or not meshtastic_seeded:
|
||
futures[_SHARED_EXECUTOR.submit(fetch_meshtastic_nodes)] = (
|
||
"fetch_meshtastic_nodes",
|
||
time.perf_counter(),
|
||
)
|
||
else:
|
||
logger.info(
|
||
"Startup preload: Meshtastic cache already loaded, deferring remote map refresh to scheduled cadence"
|
||
)
|
||
for future in concurrent.futures.as_completed(futures):
|
||
name, start = futures[future]
|
||
try:
|
||
future.result()
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_success
|
||
|
||
record_success(name, duration_s=duration)
|
||
if duration > _SLOW_FETCH_S:
|
||
logger.warning(f"full-refresh task slow: {name} took {duration:.2f}s")
|
||
except Exception as e:
|
||
duration = time.perf_counter() - start
|
||
from services.fetch_health import record_failure
|
||
|
||
record_failure(name, error=e, duration_s=duration)
|
||
logger.exception(f"full-refresh task failed: {name}")
|
||
logger.info("Full data update complete.")
|
||
|
||
|
||
_scheduler = None
|
||
_STARTUP_CCTV_INGEST_DELAY_S = 30
|
||
_FINANCIAL_REFRESH_MINUTES = 30
|
||
|
||
|
||
def _oracle_resolution_sweep():
|
||
"""Hourly sweep: check if any markets with active predictions have concluded.
|
||
|
||
Resolution logic:
|
||
- If a market's end_date has passed AND it's no longer in the active API data → resolved
|
||
- For binary markets: final probability determines outcome (>50% = yes, <50% = no)
|
||
- For multi-outcome: the outcome with highest final probability wins
|
||
"""
|
||
try:
|
||
from services.mesh.mesh_oracle import oracle_ledger
|
||
|
||
active_titles = oracle_ledger.get_active_markets()
|
||
if not active_titles:
|
||
return
|
||
|
||
# Get current market data
|
||
with _data_lock:
|
||
markets = list(latest_data.get("prediction_markets", []))
|
||
|
||
# Build lookup of active API markets
|
||
api_titles = {m.get("title", "").lower(): m for m in markets}
|
||
|
||
import time as _time
|
||
|
||
now = _time.time()
|
||
resolved_count = 0
|
||
|
||
for title in active_titles:
|
||
api_market = api_titles.get(title.lower())
|
||
|
||
# If market still in API and end_date hasn't passed, skip
|
||
if api_market:
|
||
end_date = api_market.get("end_date")
|
||
if end_date:
|
||
try:
|
||
from datetime import datetime, timezone
|
||
|
||
dt = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
|
||
if dt.timestamp() > now:
|
||
continue # Market hasn't ended yet
|
||
except Exception:
|
||
continue
|
||
else:
|
||
continue # No end date, can't auto-resolve
|
||
|
||
# Market has concluded (past end_date or dropped from API)
|
||
# Determine outcome from last known data
|
||
if api_market:
|
||
outcomes = api_market.get("outcomes", [])
|
||
if outcomes and len(outcomes) > 2:
|
||
# Multi-outcome: highest pct wins
|
||
best = max(outcomes, key=lambda o: o.get("pct", 0))
|
||
outcome = best.get("name", "")
|
||
else:
|
||
# Binary: consensus > 50 = yes
|
||
pct = api_market.get("consensus_pct") or api_market.get("polymarket_pct") or 50
|
||
outcome = "yes" if float(pct) > 50 else "no"
|
||
else:
|
||
# Market dropped from API entirely — can't determine outcome, skip
|
||
logger.warning(
|
||
f"Oracle sweep: market '{title}' no longer in API, cannot auto-resolve"
|
||
)
|
||
continue
|
||
|
||
if not outcome:
|
||
continue
|
||
|
||
# Resolve both free predictions and market stakes
|
||
winners, losers = oracle_ledger.resolve_market(title, outcome)
|
||
stake_result = oracle_ledger.resolve_market_stakes(title, outcome)
|
||
resolved_count += 1
|
||
logger.info(
|
||
f"Oracle sweep resolved '{title}' → {outcome}: "
|
||
f"{winners}W/{losers}L free, "
|
||
f"{stake_result.get('winners', 0)}W/{stake_result.get('losers', 0)}L staked"
|
||
)
|
||
|
||
if resolved_count:
|
||
logger.info(f"Oracle sweep complete: {resolved_count} markets resolved")
|
||
# Also clean up old data periodically
|
||
oracle_ledger.cleanup_old_data()
|
||
|
||
except Exception as e:
|
||
logger.error(f"Oracle resolution sweep error: {e}")
|
||
|
||
|
||
def start_scheduler():
|
||
global _scheduler
|
||
init_db()
|
||
_scheduler = BackgroundScheduler(daemon=True)
|
||
|
||
# Fast tier — every 60 seconds
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(update_fast_data, "update_fast_data"),
|
||
"interval",
|
||
seconds=60,
|
||
id="fast_tier",
|
||
max_instances=1,
|
||
misfire_grace_time=30,
|
||
)
|
||
|
||
# Slow tier — every 5 minutes
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(update_slow_data, "update_slow_data"),
|
||
"interval",
|
||
minutes=5,
|
||
id="slow_tier",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
)
|
||
|
||
# Weather alerts — every 5 minutes (time-critical, separate from slow tier)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_weather_alerts, "fetch_weather_alerts"),
|
||
"interval",
|
||
minutes=5,
|
||
id="weather_alerts",
|
||
max_instances=1,
|
||
misfire_grace_time=60,
|
||
)
|
||
|
||
# Ukraine air raid alerts — every 2 minutes (time-critical)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_ukraine_air_raid_alerts, "fetch_ukraine_air_raid_alerts"),
|
||
"interval",
|
||
minutes=2,
|
||
id="ukraine_alerts",
|
||
max_instances=1,
|
||
misfire_grace_time=60,
|
||
)
|
||
|
||
# AIS vessel pruning — every 5 minutes (prevents unbounded memory growth)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(prune_stale_vessels, "prune_stale_vessels"),
|
||
"interval",
|
||
minutes=5,
|
||
id="ais_prune",
|
||
max_instances=1,
|
||
misfire_grace_time=60,
|
||
)
|
||
|
||
# GDELT — every 30 minutes (downloads 32 ZIP files per call, avoid rate limits)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_gdelt, "fetch_gdelt"),
|
||
"interval",
|
||
minutes=30,
|
||
id="gdelt",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(update_liveuamap, "update_liveuamap"),
|
||
"interval",
|
||
minutes=30,
|
||
id="liveuamap",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
)
|
||
|
||
# CCTV pipeline refresh — runs all ingestors, then refreshes in-memory data.
|
||
# Delay the first run slightly so startup serves cached/DB-backed data first.
|
||
from services.cctv_pipeline import (
|
||
TFLJamCamIngestor,
|
||
LTASingaporeIngestor,
|
||
AustinTXIngestor,
|
||
NYCDOTIngestor,
|
||
CaltransIngestor,
|
||
ColoradoDOTIngestor,
|
||
WSDOTIngestor,
|
||
GeorgiaDOTIngestor,
|
||
IllinoisDOTIngestor,
|
||
MichiganDOTIngestor,
|
||
WindyWebcamsIngestor,
|
||
DGTNationalIngestor,
|
||
MadridCityIngestor,
|
||
OSMTrafficCameraIngestor,
|
||
)
|
||
|
||
_cctv_ingestors = [
|
||
(TFLJamCamIngestor(), "cctv_tfl"),
|
||
(LTASingaporeIngestor(), "cctv_lta"),
|
||
(AustinTXIngestor(), "cctv_atx"),
|
||
(NYCDOTIngestor(), "cctv_nyc"),
|
||
(CaltransIngestor(), "cctv_caltrans"),
|
||
(ColoradoDOTIngestor(), "cctv_codot"),
|
||
(WSDOTIngestor(), "cctv_wsdot"),
|
||
(GeorgiaDOTIngestor(), "cctv_gdot"),
|
||
(IllinoisDOTIngestor(), "cctv_idot"),
|
||
(MichiganDOTIngestor(), "cctv_mdot"),
|
||
(WindyWebcamsIngestor(), "cctv_windy"),
|
||
(DGTNationalIngestor(), "cctv_dgt"),
|
||
(MadridCityIngestor(), "cctv_madrid"),
|
||
(OSMTrafficCameraIngestor(), "cctv_osm"),
|
||
]
|
||
|
||
def _run_cctv_ingest_cycle():
|
||
from services.fetchers._store import is_any_active
|
||
|
||
if not is_any_active("cctv"):
|
||
return
|
||
for ingestor, name in _cctv_ingestors:
|
||
_run_task_with_health(ingestor.ingest, name)
|
||
# Refresh in-memory CCTV data immediately after ingest
|
||
try:
|
||
from services.cctv_pipeline import get_all_cameras
|
||
from services.fetchers.infrastructure import fetch_cctv
|
||
fetch_cctv()
|
||
logger.info(f"CCTV ingest cycle complete — {len(get_all_cameras())} cameras in DB")
|
||
except Exception as e:
|
||
logger.warning(f"CCTV post-ingest refresh failed: {e}")
|
||
|
||
_scheduler.add_job(
|
||
_run_cctv_ingest_cycle,
|
||
"interval",
|
||
minutes=10,
|
||
id="cctv_ingest",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
next_run_time=datetime.utcnow() + timedelta(seconds=_STARTUP_CCTV_INGEST_DELAY_S),
|
||
)
|
||
|
||
# Financial tickers — every 30 minutes (Yahoo Finance rate-limits aggressively)
|
||
def _fetch_financial():
|
||
_run_task_with_health(fetch_financial_markets, "fetch_financial_markets")
|
||
|
||
_scheduler.add_job(
|
||
_fetch_financial,
|
||
"interval",
|
||
minutes=_FINANCIAL_REFRESH_MINUTES,
|
||
id="financial_tickers",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
next_run_time=datetime.utcnow() + timedelta(minutes=_FINANCIAL_REFRESH_MINUTES),
|
||
)
|
||
|
||
# Unusual Whales — every 15 minutes (congress trades, dark pool, flow alerts)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_unusual_whales, "fetch_unusual_whales"),
|
||
"interval",
|
||
minutes=15,
|
||
id="unusual_whales",
|
||
max_instances=1,
|
||
misfire_grace_time=120,
|
||
)
|
||
|
||
# Meshtastic map API — every 4 hours, fetch global node positions
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_meshtastic_nodes, "fetch_meshtastic_nodes"),
|
||
"interval",
|
||
hours=4,
|
||
id="meshtastic_map",
|
||
max_instances=1,
|
||
misfire_grace_time=600,
|
||
)
|
||
|
||
# Oracle resolution sweep — every hour, check if any markets with predictions have concluded
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(_oracle_resolution_sweep, "oracle_sweep"),
|
||
"interval",
|
||
hours=1,
|
||
id="oracle_sweep",
|
||
max_instances=1,
|
||
misfire_grace_time=300,
|
||
)
|
||
|
||
# VIIRS change detection — every 12 hours (monthly composites, no rush)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_viirs_change_nodes, "fetch_viirs_change_nodes"),
|
||
"interval",
|
||
hours=12,
|
||
id="viirs_change",
|
||
max_instances=1,
|
||
misfire_grace_time=600,
|
||
)
|
||
|
||
# FIMI disinformation index — every 12 hours (weekly editorial feed)
|
||
_scheduler.add_job(
|
||
lambda: _run_task_with_health(fetch_fimi, "fetch_fimi"),
|
||
"interval",
|
||
hours=12,
|
||
id="fimi",
|
||
max_instances=1,
|
||
misfire_grace_time=600,
|
||
)
|
||
|
||
_scheduler.start()
|
||
logger.info("Scheduler started.")
|
||
|
||
|
||
def stop_scheduler():
|
||
if _scheduler:
|
||
_scheduler.shutdown(wait=False)
|
||
|
||
|
||
def get_latest_data():
|
||
return get_latest_data_subset(*latest_data.keys())
|