mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-04-25 20:16:15 +02:00
fc9eff865e
New features: - In-app auto-updater with confirmation dialog, manual download fallback, restart polling, and protected file safety net - Ship layers split into 4 independent toggles (Military/Carriers, Cargo/Tankers, Civilian, Cruise/Passenger) with per-category counts - Stable entity IDs using MMSI/callsign instead of volatile array indices - Dismissible threat alert bubbles (session-scoped, survives data refresh) Performance: - GDELT title fetching is now non-blocking (background enrichment) - Removed duplicate startup fetch jobs - Docker healthcheck start_period 15s → 90s Bug fixes: - Removed fake intelligence assessment generator (OSINT-only policy) - Fixed carrier tracker GDELT 429/TypeError crash - Fixed ETag collision (full payload hash) - Added concurrent /api/refresh guard Contributors: @imqdcr (ship split + stable IDs), @csysp (dismissible alerts, PR #48) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Former-commit-id: a2c4c67da54345393f70a9b33b52e7e4fd6c049f
51 lines
1.7 KiB
Python
51 lines
1.7 KiB
Python
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _suppress_background_services():
|
|
"""Prevent real scheduler/stream/tracker from starting during tests."""
|
|
with patch("services.data_fetcher.start_scheduler"), \
|
|
patch("services.data_fetcher.stop_scheduler"), \
|
|
patch("services.ais_stream.start_ais_stream"), \
|
|
patch("services.ais_stream.stop_ais_stream"), \
|
|
patch("services.carrier_tracker.start_carrier_tracker"), \
|
|
patch("services.carrier_tracker.stop_carrier_tracker"):
|
|
yield
|
|
|
|
|
|
@pytest.fixture()
|
|
def client(_suppress_background_services):
|
|
"""HTTPX test client against the FastAPI app (no real network)."""
|
|
from httpx import ASGITransport, AsyncClient
|
|
from main import app
|
|
import asyncio
|
|
|
|
transport = ASGITransport(app=app)
|
|
|
|
async def _make_client():
|
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
|
return ac
|
|
|
|
# Return a sync-usable wrapper
|
|
class SyncClient:
|
|
def __init__(self):
|
|
self._loop = asyncio.new_event_loop()
|
|
self._transport = ASGITransport(app=app)
|
|
|
|
def get(self, url, **kw):
|
|
return self._loop.run_until_complete(self._get(url, **kw))
|
|
|
|
async def _get(self, url, **kw):
|
|
async with AsyncClient(transport=self._transport, base_url="http://test") as ac:
|
|
return await ac.get(url, **kw)
|
|
|
|
def put(self, url, **kw):
|
|
return self._loop.run_until_complete(self._put(url, **kw))
|
|
|
|
async def _put(self, url, **kw):
|
|
async with AsyncClient(transport=self._transport, base_url="http://test") as ac:
|
|
return await ac.put(url, **kw)
|
|
|
|
return SyncClient()
|