diff --git a/facefusion/apis/asset_store.py b/facefusion/apis/asset_store.py new file mode 100644 index 00000000..36fd3955 --- /dev/null +++ b/facefusion/apis/asset_store.py @@ -0,0 +1,23 @@ +import uuid +from typing import Dict, Optional + +ASSET_STORE: Dict[str, Dict[str, str]] = {} + + +def get_asset(asset_id : str) -> Optional[Dict[str, str]]: + return ASSET_STORE.get(asset_id) + + +def register_asset(path : str) -> str: + asset_id = str(uuid.uuid4()) + + ASSET_STORE[asset_id] =\ + { + 'id': asset_id, + 'path': path + } + return asset_id + + +def clear() -> None: + ASSET_STORE.clear() diff --git a/facefusion/apis/endpoints/state.py b/facefusion/apis/endpoints/state.py index c1090581..a108f66b 100644 --- a/facefusion/apis/endpoints/state.py +++ b/facefusion/apis/endpoints/state.py @@ -1,8 +1,9 @@ from starlette.requests import Request from starlette.responses import JSONResponse -from starlette.status import HTTP_200_OK +from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND -from facefusion import args_store, state_manager +from facefusion import args_store, state_manager, translator +from facefusion.apis import asset_store async def get_state(request : Request) -> JSONResponse: @@ -11,6 +12,17 @@ async def get_state(request : Request) -> JSONResponse: async def set_state(request : Request) -> JSONResponse: + action = request.query_params.get('action') + + if action == 'select': + asset_type = request.query_params.get('type') + + if asset_type == 'source': + return await select_source(request) + + if asset_type == 'target': + return await select_target(request) + body = await request.json() api_args = args_store.get_api_args() @@ -18,5 +30,48 @@ async def set_state(request : Request) -> JSONResponse: if key in api_args: state_manager.set_item(key, value) - __api_args__ = args_store.filter_api_args(state_manager.get_state()) #type:ignore[arg-type] - return JSONResponse(state_manager.collect_state(__api_args__), status_code = HTTP_200_OK) + api_args = args_store.filter_api_args(state_manager.get_state()) #type:ignore[arg-type,assignment] + return JSONResponse(state_manager.collect_state(api_args), status_code = HTTP_200_OK) #type:ignore[arg-type] + + +async def select_source(request : Request) -> JSONResponse: + body = await request.json() + asset_ids = body.get('asset_ids') + + if isinstance(asset_ids, list): + source_paths = [] + + for asset_id in asset_ids: + asset = asset_store.get_asset(asset_id) + + if asset: + source_paths.append(asset.get('path')) + + state_manager.set_item('source_paths', source_paths) + + api_args = args_store.filter_api_args(state_manager.get_state()) #type:ignore[arg-type] + return JSONResponse(state_manager.collect_state(api_args), status_code = HTTP_200_OK) + + return JSONResponse( + { + 'message': translator.get('source_asset_not_found', 'facefusion.apis') + }, status_code = HTTP_404_NOT_FOUND) + + +async def select_target(request : Request) -> JSONResponse: + body = await request.json() + asset_id = body.get('asset_id') + + if isinstance(asset_id, str): + asset = asset_store.get_asset(asset_id) + + if asset: + state_manager.set_item('target_path', asset.get('path')) + + api_args = args_store.filter_api_args(state_manager.get_state()) #type:ignore[arg-type] + return JSONResponse(state_manager.collect_state(api_args), status_code = HTTP_200_OK) + + return JSONResponse( + { + 'message': translator.get('target_asset_not_found', 'facefusion.apis') + }, status_code = HTTP_404_NOT_FOUND) diff --git a/facefusion/apis/locales.py b/facefusion/apis/locales.py index 884e99b5..1f1c6ceb 100644 --- a/facefusion/apis/locales.py +++ b/facefusion/apis/locales.py @@ -7,6 +7,8 @@ LOCALES : Locales =\ 'ok': 'ok', 'something_went_wrong': 'something went wrong', 'invalid_access_token': 'invalid access token', - 'invalid_refresh_token': 'invalid refresh token' + 'invalid_refresh_token': 'invalid refresh token', + 'source_asset_not_found': 'source asset not found', + 'target_asset_not_found': 'target asset not found' } } diff --git a/tests/test_api_state.py b/tests/test_api_state.py index fadb1a99..360f9336 100644 --- a/tests/test_api_state.py +++ b/tests/test_api_state.py @@ -5,10 +5,12 @@ from starlette.testclient import TestClient from facefusion import args_store, metadata, session_manager, state_manager from facefusion.apis.core import create_api +from facefusion.apis import asset_store @pytest.fixture(scope = 'module') def test_client() -> Iterator[TestClient]: + args_store.register_args([ 'source_paths', 'target_path' ], scopes = [ 'api' ]) args_store.register_args([ 'execution_providers' ], scopes = [ 'api' ]) state_manager.init_item('execution_providers', [ 'cpu' ]) @@ -19,6 +21,7 @@ def test_client() -> Iterator[TestClient]: @pytest.fixture(scope = 'function', autouse = True) def before_each() -> None: session_manager.SESSIONS.clear() + asset_store.clear() def test_get_state(test_client : TestClient) -> None: @@ -79,3 +82,82 @@ def test_set_state(test_client : TestClient) -> None: assert set_state_body.get('invalid') is None assert set_state_response.status_code == 200 + + +def test_select_source_assets(test_client : TestClient) -> None: + asset_id_1 = asset_store.register_asset('/path/to/source1.jpg') + asset_id_2 = asset_store.register_asset('/path/to/source2.jpg') + + select_response = test_client.put('/state?action=select&type=source', json = + { + 'asset_ids': [ asset_id_1, asset_id_2 ] + }) + + assert select_response.status_code == 401 + + create_session_response = test_client.post('/session', json = + { + 'client_version': metadata.get('version') + }) + create_session_body = create_session_response.json() + + select_response = test_client.put('/state?action=select&type=source', json = + { + 'asset_ids': 'invalid_string_not_list' + }, headers = + { + 'Authorization': 'Bearer ' + create_session_body.get('access_token') + }) + + assert select_response.status_code == 404 + + select_response = test_client.put('/state?action=select&type=source', json = + { + 'asset_ids': [ asset_id_1, asset_id_2 ] + }, headers = + { + 'Authorization': 'Bearer ' + create_session_body.get('access_token') + }) + select_body = select_response.json() + + assert select_body.get('source_paths') == [ '/path/to/source1.jpg', '/path/to/source2.jpg' ] + assert select_response.status_code == 200 + + +def test_select_target_assets(test_client : TestClient) -> None: + asset_id_1 = asset_store.register_asset('/path/to/target1.jpg') + + select_response = test_client.put('/state?action=select&type=target', json = + { + 'asset_id': asset_id_1 + }) + + assert select_response.status_code == 401 + + create_session_response = test_client.post('/session', json = + { + 'client_version': metadata.get('version') + }) + create_session_body = create_session_response.json() + + select_response = test_client.put('/state?action=select&type=target', json = + { + 'asset_id': 'invalid_asset_id' + }, headers = + { + 'Authorization': 'Bearer ' + create_session_body.get('access_token') + }) + + assert select_response.status_code == 404 + + select_response = test_client.put('/state?action=select&type=target', json = + { + 'asset_id': asset_id_1 + }, headers = + { + 'Authorization': 'Bearer ' + create_session_body.get('access_token') + }) + select_body = select_response.json() + + assert select_body.get('target_path') == '/path/to/target1.jpg' + assert select_response.status_code == 200