From 98a9293166caafa095b78137b0e23e6918c2398f Mon Sep 17 00:00:00 2001 From: Johan Martensson Date: Sun, 22 Mar 2026 05:31:28 +0000 Subject: [PATCH] fix: replace concurrent yfinance fetches with single batch download to avoid rate limiting --- backend/services/data_fetcher.py | 131 +++++-------------------- backend/services/fetchers/financial.py | 107 +++++++++++++------- 2 files changed, 99 insertions(+), 139 deletions(-) diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 24e90ac..bce5ff9 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -13,23 +13,19 @@ Heavy logic has been extracted into services/fetchers/: - infrastructure.py — internet outages, data centers, CCTV, KiwiSDR - geo.py — ships, airports, frontlines, GDELT, LiveUAMap """ - import logging import concurrent.futures from datetime import datetime from dotenv import load_dotenv - load_dotenv() from apscheduler.schedulers.background import BackgroundScheduler +from datetime import datetime from services.cctv_pipeline import init_db # Shared state — all fetcher modules read/write through this from services.fetchers._store import ( - latest_data, - source_timestamps, - _mark_fresh, - _data_lock, # noqa: F401 — re-exported for main.py + latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — re-exported for main.py ) # Domain-specific fetcher modules (already extracted) @@ -42,65 +38,19 @@ from services.fetchers.news import fetch_news # noqa: F401 # Newly extracted fetcher modules from services.fetchers.financial import fetch_defense_stocks, fetch_oil_prices # noqa: F401 from services.fetchers.earth_observation import ( # noqa: F401 - fetch_earthquakes, - fetch_firms_fires, - fetch_space_weather, - fetch_weather, + fetch_earthquakes, fetch_firms_fires, fetch_space_weather, fetch_weather, ) from services.fetchers.infrastructure import ( # noqa: F401 - fetch_internet_outages, - fetch_datacenters, - fetch_military_bases, - fetch_power_plants, - fetch_cctv, - fetch_kiwisdr, + fetch_internet_outages, fetch_datacenters, fetch_military_bases, fetch_power_plants, + fetch_cctv, fetch_kiwisdr, ) from services.fetchers.geo import ( # noqa: F401 - fetch_ships, - fetch_airports, - find_nearest_airport, - cached_airports, - fetch_frontlines, - fetch_gdelt, - fetch_geopolitics, - update_liveuamap, + fetch_ships, fetch_airports, find_nearest_airport, cached_airports, + fetch_frontlines, fetch_gdelt, fetch_geopolitics, update_liveuamap, ) logger = logging.getLogger(__name__) - -def run_cctv_ingest_cycle(): - """Populate the CCTV database, then refresh the in-memory cache.""" - logger.info("CCTV ingest cycle starting...") - from services.cctv_pipeline import ( - AustinTXIngestor, - LTASingaporeIngestor, - MadridCCTVIngestor, - MalagaCCTVIngestor, - NYCDOTIngestor, - SpainDGTIngestor, - TFLJamCamIngestor, - VigoCCTVIngestor, - VitoriaGasteizCCTVIngestor, - ) - - for ingestor_cls in ( - TFLJamCamIngestor, - LTASingaporeIngestor, - AustinTXIngestor, - NYCDOTIngestor, - SpainDGTIngestor, - MadridCCTVIngestor, - MalagaCCTVIngestor, - VigoCCTVIngestor, - VitoriaGasteizCCTVIngestor, - ): - ingestor_cls().ingest() - - fetch_cctv() - logger.info("CCTV ingest cycle complete.") - - # --------------------------------------------------------------------------- # Scheduler & Orchestration # --------------------------------------------------------------------------- @@ -117,10 +67,9 @@ def update_fast_data(): futures = [executor.submit(func) for func in fast_funcs] concurrent.futures.wait(futures) with _data_lock: - latest_data["last_updated"] = datetime.utcnow().isoformat() + latest_data['last_updated'] = datetime.utcnow().isoformat() logger.info("Fast-tier update complete.") - def update_slow_data(): """Slow-tier: contextual + enrichment data that refreshes less often (every 5–10 min).""" logger.info("Slow-tier data update starting...") @@ -146,7 +95,6 @@ def update_slow_data(): concurrent.futures.wait(futures) logger.info("Slow-tier update complete.") - def update_all_data(): """Full refresh — all tiers run IN PARALLEL for fastest startup.""" logger.info("Full data update starting (parallel)...") @@ -157,73 +105,46 @@ def update_all_data(): concurrent.futures.wait([f0, f1, f2]) logger.info("Full data update complete.") - _scheduler = None - def start_scheduler(): global _scheduler init_db() _scheduler = BackgroundScheduler(daemon=True) # Fast tier — every 60 seconds - _scheduler.add_job( - update_fast_data, - "interval", - seconds=60, - id="fast_tier", - max_instances=1, - misfire_grace_time=30, - ) + _scheduler.add_job(update_fast_data, 'interval', seconds=60, id='fast_tier', max_instances=1, misfire_grace_time=30) # Slow tier — every 5 minutes - _scheduler.add_job( - update_slow_data, - "interval", - minutes=5, - id="slow_tier", - max_instances=1, - misfire_grace_time=120, - ) + _scheduler.add_job(update_slow_data, 'interval', minutes=5, id='slow_tier', max_instances=1, misfire_grace_time=120) # Very slow — every 15 minutes - _scheduler.add_job( - fetch_gdelt, - "interval", - minutes=15, - id="gdelt", - max_instances=1, - misfire_grace_time=120, - ) - _scheduler.add_job( - update_liveuamap, - "interval", - minutes=15, - id="liveuamap", - max_instances=1, - misfire_grace_time=120, - ) + _scheduler.add_job(fetch_gdelt, 'interval', minutes=15, id='gdelt', max_instances=1, misfire_grace_time=120) + _scheduler.add_job(update_liveuamap, 'interval', minutes=15, id='liveuamap', max_instances=1, misfire_grace_time=120) - # CCTV pipeline refresh — hydrate DB-backed cameras immediately, then every 10 minutes. - _scheduler.add_job( - run_cctv_ingest_cycle, - "interval", - minutes=10, - id="cctv_ingest", - max_instances=1, - misfire_grace_time=120, - next_run_time=datetime.utcnow(), + # CCTV pipeline refresh — every 10 minutes + # Instantiate once and reuse — avoids re-creating DB connections on every tick + from services.cctv_pipeline import ( + TFLJamCamIngestor, LTASingaporeIngestor, + AustinTXIngestor, NYCDOTIngestor, ) + _cctv_tfl = TFLJamCamIngestor() + _cctv_lta = LTASingaporeIngestor() + _cctv_atx = AustinTXIngestor() + _cctv_nyc = NYCDOTIngestor() + _now = datetime.now() + _scheduler.add_job(_cctv_tfl.ingest, 'interval', minutes=10, id='cctv_tfl', max_instances=1, misfire_grace_time=120, next_run_time=_now) + _scheduler.add_job(_cctv_lta.ingest, 'interval', minutes=10, id='cctv_lta', max_instances=1, misfire_grace_time=120, next_run_time=_now) + _scheduler.add_job(_cctv_atx.ingest, 'interval', minutes=10, id='cctv_atx', max_instances=1, misfire_grace_time=120, next_run_time=_now) + _scheduler.add_job(_cctv_nyc.ingest, 'interval', minutes=10, id='cctv_nyc', max_instances=1, misfire_grace_time=120, next_run_time=_now) _scheduler.start() logger.info("Scheduler started.") - def stop_scheduler(): if _scheduler: _scheduler.shutdown(wait=False) - def get_latest_data(): with _data_lock: return dict(latest_data) diff --git a/backend/services/fetchers/financial.py b/backend/services/fetchers/financial.py index f131b7a..f207930 100644 --- a/backend/services/fetchers/financial.py +++ b/backend/services/fetchers/financial.py @@ -1,9 +1,8 @@ """Financial data fetchers — defense stocks and oil prices. -Uses yfinance for ticker data with concurrent execution for performance. +Uses yfinance batch download to minimise Yahoo Finance requests and avoid rate limiting. """ import logging -import concurrent.futures import yfinance as yf from services.fetchers._store import latest_data, _data_lock, _mark_fresh from services.fetchers.retry import with_retry @@ -11,48 +10,88 @@ from services.fetchers.retry import with_retry logger = logging.getLogger(__name__) -def _fetch_single_ticker(symbol: str, period: str = "2d"): - """Fetch a single yfinance ticker. Returns (symbol, data_dict) or (symbol, None).""" +def _batch_fetch(symbols: list[str], period: str = "5d") -> dict: + """Fetch multiple tickers in a single yfinance request. Returns {symbol: {price, change_percent, up}}.""" try: - ticker = yf.Ticker(symbol) - hist = ticker.history(period=period) - if len(hist) >= 1: - current_price = hist['Close'].iloc[-1] - prev_close = hist['Close'].iloc[0] if len(hist) > 1 else current_price - change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0 - return symbol, { - "price": round(float(current_price), 2), - "change_percent": round(float(change_percent), 2), - "up": bool(change_percent >= 0) - } + hist = yf.download(symbols, period=period, auto_adjust=True, progress=False) + if hist.empty: + return {} + close = hist["Close"] + result = {} + for sym in symbols: + try: + col = close[sym] if len(symbols) > 1 else close + col = col.dropna() + if len(col) < 1: + continue + current = float(col.iloc[-1]) + prev = float(col.iloc[0]) if len(col) > 1 else current + change = ((current - prev) / prev * 100) if prev else 0 + result[sym] = { + "price": round(current, 2), + "change_percent": round(change, 2), + "up": bool(change >= 0), + } + except Exception as e: + logger.warning(f"Could not parse {sym}: {e}") + return result except Exception as e: - logger.warning(f"Could not fetch data for {symbol}: {e}") - return symbol, None + logger.warning(f"Batch fetch failed: {e}") + return {} -@with_retry(max_retries=1, base_delay=1) +_STOCK_TICKERS = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"] +_OIL_MAP = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"} +_ALL_TICKERS = _STOCK_TICKERS + list(_OIL_MAP.values()) + +_MARKET_COOLDOWN_SECONDS = 1800 # fetch at most once every 30 minutes +_last_market_fetch: float = 0.0 + + +def _fetch_all_market_data(): + """Single yfinance download for all market tickers to avoid rate limiting.""" + raw = _batch_fetch(_ALL_TICKERS, period="5d") + stocks = {sym: raw[sym] for sym in _STOCK_TICKERS if sym in raw} + oil = {name: raw[sym] for name, sym in _OIL_MAP.items() if sym in raw} + return stocks, oil + + +@with_retry(max_retries=2, base_delay=10) def fetch_defense_stocks(): - tickers = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"] + global _last_market_fetch + import time + if time.time() - _last_market_fetch < _MARKET_COOLDOWN_SECONDS: + return try: - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool: - results = pool.map(lambda t: _fetch_single_ticker(t, "2d"), tickers) - stocks_data = {sym: data for sym, data in results if data} - with _data_lock: - latest_data['stocks'] = stocks_data - _mark_fresh("stocks") + stocks, oil = _fetch_all_market_data() + if stocks: + _last_market_fetch = time.time() + with _data_lock: + latest_data['stocks'] = stocks + if oil: + latest_data['oil'] = oil + _mark_fresh("stocks") + if oil: + _mark_fresh("oil") + logger.info(f"Markets: {len(stocks)} stocks, {len(oil)} oil tickers") + else: + logger.warning("Markets: empty result from yfinance (rate limited?)") except Exception as e: - logger.error(f"Error fetching stocks: {e}") + logger.error(f"Error fetching market data: {e}") -@with_retry(max_retries=1, base_delay=1) +@with_retry(max_retries=1, base_delay=10) def fetch_oil_prices(): - tickers = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"} + # Oil is now fetched together with stocks in fetch_defense_stocks to use a single request. + # This function is kept for scheduler compatibility but is a no-op if stocks already ran. + with _data_lock: + if latest_data.get('oil'): + return # Already populated by fetch_defense_stocks try: - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool: - results = pool.map(lambda item: (_fetch_single_ticker(item[1], "5d")[1], item[0]), tickers.items()) - oil_data = {name: data for data, name in results if data} - with _data_lock: - latest_data['oil'] = oil_data - _mark_fresh("oil") + _, oil = _fetch_all_market_data() + if oil: + with _data_lock: + latest_data['oil'] = oil + _mark_fresh("oil") except Exception as e: logger.error(f"Error fetching oil: {e}")