Merge pull request #96 from johan-martensson/fix/financial-batch-fetch

fix: replace concurrent yfinance fetches with single batch download
This commit is contained in:
Shadowbroker
2026-03-22 01:48:14 -06:00
committed by GitHub
2 changed files with 99 additions and 139 deletions
+26 -105
View File
@@ -13,23 +13,19 @@ Heavy logic has been extracted into services/fetchers/:
- infrastructure.py — internet outages, data centers, CCTV, KiwiSDR
- geo.py — ships, airports, frontlines, GDELT, LiveUAMap
"""
import logging
import concurrent.futures
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from services.cctv_pipeline import init_db
# Shared state — all fetcher modules read/write through this
from services.fetchers._store import (
latest_data,
source_timestamps,
_mark_fresh,
_data_lock, # noqa: F401 — re-exported for main.py
latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — re-exported for main.py
)
# Domain-specific fetcher modules (already extracted)
@@ -42,65 +38,19 @@ from services.fetchers.news import fetch_news # noqa: F401
# Newly extracted fetcher modules
from services.fetchers.financial import fetch_defense_stocks, fetch_oil_prices # noqa: F401
from services.fetchers.earth_observation import ( # noqa: F401
fetch_earthquakes,
fetch_firms_fires,
fetch_space_weather,
fetch_weather,
fetch_earthquakes, fetch_firms_fires, fetch_space_weather, fetch_weather,
)
from services.fetchers.infrastructure import ( # noqa: F401
fetch_internet_outages,
fetch_datacenters,
fetch_military_bases,
fetch_power_plants,
fetch_cctv,
fetch_kiwisdr,
fetch_internet_outages, fetch_datacenters, fetch_military_bases, fetch_power_plants,
fetch_cctv, fetch_kiwisdr,
)
from services.fetchers.geo import ( # noqa: F401
fetch_ships,
fetch_airports,
find_nearest_airport,
cached_airports,
fetch_frontlines,
fetch_gdelt,
fetch_geopolitics,
update_liveuamap,
fetch_ships, fetch_airports, find_nearest_airport, cached_airports,
fetch_frontlines, fetch_gdelt, fetch_geopolitics, update_liveuamap,
)
logger = logging.getLogger(__name__)
def run_cctv_ingest_cycle():
    """Populate the CCTV database, then refresh the in-memory cache."""
    logger.info("CCTV ingest cycle starting...")
    # Imported lazily so the heavy pipeline module only loads when a cycle runs.
    from services.cctv_pipeline import (
        AustinTXIngestor,
        LTASingaporeIngestor,
        MadridCCTVIngestor,
        MalagaCCTVIngestor,
        NYCDOTIngestor,
        SpainDGTIngestor,
        TFLJamCamIngestor,
        VigoCCTVIngestor,
        VitoriaGasteizCCTVIngestor,
    )
    # Run every ingestor in a fixed order; each is constructed fresh per cycle.
    ingestor_classes = (
        TFLJamCamIngestor,
        LTASingaporeIngestor,
        AustinTXIngestor,
        NYCDOTIngestor,
        SpainDGTIngestor,
        MadridCCTVIngestor,
        MalagaCCTVIngestor,
        VigoCCTVIngestor,
        VitoriaGasteizCCTVIngestor,
    )
    for cls in ingestor_classes:
        cls().ingest()
    # Re-read the DB into the shared in-memory store after ingestion.
    fetch_cctv()
    logger.info("CCTV ingest cycle complete.")
# ---------------------------------------------------------------------------
# Scheduler & Orchestration
# ---------------------------------------------------------------------------
@@ -117,10 +67,9 @@ def update_fast_data():
futures = [executor.submit(func) for func in fast_funcs]
concurrent.futures.wait(futures)
with _data_lock:
latest_data["last_updated"] = datetime.utcnow().isoformat()
latest_data['last_updated'] = datetime.utcnow().isoformat()
logger.info("Fast-tier update complete.")
def update_slow_data():
"""Slow-tier: contextual + enrichment data that refreshes less often (every 5–10 min)."""
logger.info("Slow-tier data update starting...")
@@ -146,7 +95,6 @@ def update_slow_data():
concurrent.futures.wait(futures)
logger.info("Slow-tier update complete.")
def update_all_data():
"""Full refresh — all tiers run IN PARALLEL for fastest startup."""
logger.info("Full data update starting (parallel)...")
@@ -157,73 +105,46 @@ def update_all_data():
concurrent.futures.wait([f0, f1, f2])
logger.info("Full data update complete.")
_scheduler = None


def start_scheduler():
    """Initialise the DB, register all periodic jobs, and start the scheduler.

    Tiers: fast (60 s), slow (5 min), very slow (15 min), CCTV ingest (10 min,
    first run immediately at startup).
    """
    global _scheduler
    init_db()
    _scheduler = BackgroundScheduler(daemon=True)
    # Fast tier — every 60 seconds
    _scheduler.add_job(update_fast_data, 'interval', seconds=60, id='fast_tier',
                       max_instances=1, misfire_grace_time=30)
    # Slow tier — every 5 minutes
    _scheduler.add_job(update_slow_data, 'interval', minutes=5, id='slow_tier',
                       max_instances=1, misfire_grace_time=120)
    # Very slow — every 15 minutes
    _scheduler.add_job(fetch_gdelt, 'interval', minutes=15, id='gdelt',
                       max_instances=1, misfire_grace_time=120)
    _scheduler.add_job(update_liveuamap, 'interval', minutes=15, id='liveuamap',
                       max_instances=1, misfire_grace_time=120)
    # CCTV pipeline refresh — hydrate DB-backed cameras immediately, then every
    # 10 minutes. NOTE(review): scheduling individual ingestors here would
    # silently drop the Spanish feeds (SpainDGT/Madrid/Malaga/Vigo/Vitoria)
    # and never call fetch_cctv() to refresh the in-memory cache, so the full
    # run_cctv_ingest_cycle is scheduled instead. utcnow() is used for
    # consistency with the rest of this module's timestamps.
    _scheduler.add_job(run_cctv_ingest_cycle, 'interval', minutes=10,
                       id='cctv_ingest', max_instances=1,
                       misfire_grace_time=120, next_run_time=datetime.utcnow())
    _scheduler.start()
    logger.info("Scheduler started.")
def stop_scheduler():
    """Shut down the background scheduler without waiting for running jobs."""
    if _scheduler:
        # wait=False: return immediately; in-flight jobs are abandoned.
        _scheduler.shutdown(wait=False)
def get_latest_data():
    """Return a point-in-time shallow copy of the shared data dict.

    Taken under _data_lock so callers never observe a half-applied update.
    The copy is shallow — nested values are still shared with the store.
    """
    with _data_lock:
        return dict(latest_data)
+73 -34
View File
@@ -1,9 +1,8 @@
"""Financial data fetchers — defense stocks and oil prices.
Uses yfinance for ticker data with concurrent execution for performance.
Uses yfinance batch download to minimise Yahoo Finance requests and avoid rate limiting.
"""
import logging
import time
import concurrent.futures

import yfinance as yf

from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
@@ -11,48 +10,88 @@ from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
def _batch_fetch(symbols: list[str], period: str = "5d") -> dict:
    """Fetch multiple tickers in a single yfinance request.

    Returns {symbol: {"price", "change_percent", "up"}}; symbols that fail to
    download or parse are simply omitted. change_percent is measured from the
    first to the last available close within *period*.
    """
    try:
        hist = yf.download(symbols, period=period, auto_adjust=True, progress=False)
        if hist.empty:
            return {}
        close = hist["Close"]
        result = {}
        for sym in symbols:
            try:
                # With >1 symbol, yf.download returns one "Close" column per
                # symbol; with exactly one, "Close" is a single series.
                col = close[sym] if len(symbols) > 1 else close
                col = col.dropna()
                if len(col) < 1:
                    continue
                current = float(col.iloc[-1])
                # Fall back to the current price when only one row exists,
                # yielding a 0% change rather than an error.
                prev = float(col.iloc[0]) if len(col) > 1 else current
                change = ((current - prev) / prev * 100) if prev else 0
                result[sym] = {
                    "price": round(current, 2),
                    "change_percent": round(change, 2),
                    "up": bool(change >= 0),
                }
            except Exception as e:
                # One bad symbol must not poison the whole batch.
                logger.warning(f"Could not parse {sym}: {e}")
        return result
    except Exception as e:
        logger.warning(f"Batch fetch failed: {e}")
        return {}
# Ticker universe: defense stocks plus oil futures, fetched in ONE batched
# yfinance download to stay under Yahoo Finance rate limits.
_STOCK_TICKERS = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"]
_OIL_MAP = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"}  # display name -> ticker
_ALL_TICKERS = _STOCK_TICKERS + list(_OIL_MAP.values())
_MARKET_COOLDOWN_SECONDS = 1800  # fetch at most once every 30 minutes
_last_market_fetch: float = 0.0  # epoch seconds of last SUCCESSFUL fetch
def _fetch_all_market_data():
    """Single yfinance download for all market tickers to avoid rate limiting.

    Returns (stocks, oil): stocks keyed by ticker symbol, oil keyed by its
    human-readable name. Missing symbols are dropped from both dicts.
    """
    quotes = _batch_fetch(_ALL_TICKERS, period="5d")
    stock_quotes = {symbol: quotes[symbol]
                    for symbol in _STOCK_TICKERS if symbol in quotes}
    oil_quotes = {label: quotes[ticker]
                  for label, ticker in _OIL_MAP.items() if ticker in quotes}
    return stock_quotes, oil_quotes
@with_retry(max_retries=2, base_delay=10)
def fetch_defense_stocks():
    """Fetch defense stocks AND oil prices via one batched yfinance download.

    Throttled to at most one real fetch per _MARKET_COOLDOWN_SECONDS. The
    cooldown timestamp only advances on a successful (non-empty) result, so a
    rate-limited attempt is retried on the next scheduler tick.
    """
    global _last_market_fetch
    if time.time() - _last_market_fetch < _MARKET_COOLDOWN_SECONDS:
        return  # within cooldown — keep serving the cached values
    try:
        stocks, oil = _fetch_all_market_data()
        if stocks:
            _last_market_fetch = time.time()
            with _data_lock:
                latest_data['stocks'] = stocks
                if oil:
                    latest_data['oil'] = oil
                # _mark_fresh is called while holding _data_lock, matching the
                # pre-existing convention used by fetch_oil_prices.
                _mark_fresh("stocks")
                if oil:
                    _mark_fresh("oil")
            logger.info(f"Markets: {len(stocks)} stocks, {len(oil)} oil tickers")
        else:
            logger.warning("Markets: empty result from yfinance (rate limited?)")
    except Exception as e:
        logger.error(f"Error fetching market data: {e}")
@with_retry(max_retries=1, base_delay=10)
def fetch_oil_prices():
    """Refresh oil prices if they are missing.

    Oil is normally populated together with stocks by fetch_defense_stocks
    (single batched request). This job is kept for scheduler compatibility and
    is a no-op whenever oil data is already present.
    """
    with _data_lock:
        if latest_data.get('oil'):
            return  # already populated by fetch_defense_stocks
    try:
        _, oil = _fetch_all_market_data()
        if oil:
            with _data_lock:
                latest_data['oil'] = oil
                _mark_fresh("oil")
    except Exception as e:
        logger.error(f"Error fetching oil: {e}")