mirror of
https://github.com/invariantlabs-ai/invariant-gateway.git
synced 2026-05-15 20:48:17 +02:00
73 lines
2.1 KiB
Python
73 lines
2.1 KiB
Python
"""Task utilities for running async functions"""
|
|
|
|
import asyncio
|
|
import concurrent.futures
|
|
import threading
|
|
|
|
from contextlib import redirect_stdout
|
|
from typing import Any
|
|
|
|
from gateway.mcp.log import MCP_LOG_FILE, mcp_log
|
|
|
|
|
|
def run_task_in_background(async_func, *args, **kwargs):
|
|
"""
|
|
Runs an async function in a background thread with its own event loop.
|
|
This function does NOT block the calling thread as it immediately returns
|
|
after starting the background thread.
|
|
|
|
Args:
|
|
async_func: The async function to run
|
|
*args: Positional arguments to pass to the async function
|
|
**kwargs: Keyword arguments to pass to the async function
|
|
"""
|
|
|
|
def thread_target():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(async_func(*args, **kwargs))
|
|
except Exception as e:
|
|
mcp_log(
|
|
f"[ERROR] Error in async thread while running run_task_in_background: {e}"
|
|
)
|
|
finally:
|
|
loop.close()
|
|
|
|
# Create and start a daemon thread
|
|
thread = threading.Thread(target=thread_target, daemon=True)
|
|
thread.start()
|
|
|
|
|
|
def run_task_sync(async_func, *args, **kwargs) -> Any:
|
|
"""
|
|
Runs an asynchronous function synchronously in a separate
|
|
thread with its own event loop. This function blocks the calling
|
|
thread until completion or timeout (10 seconds).
|
|
|
|
Args:
|
|
async_func: The async function to run
|
|
*args: Positional arguments to pass to the async function
|
|
**kwargs: Keyword arguments to pass to the async function
|
|
|
|
Returns:
|
|
Any: The return value of the async function
|
|
"""
|
|
|
|
def run_in_new_loop():
|
|
loop = asyncio.new_event_loop()
|
|
try:
|
|
return loop.run_until_complete(
|
|
async_func(
|
|
*args,
|
|
**kwargs,
|
|
)
|
|
)
|
|
finally:
|
|
loop.close()
|
|
|
|
with redirect_stdout(MCP_LOG_FILE):
|
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
future = executor.submit(run_in_new_loop)
|
|
return future.result(timeout=10.0)
|