diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 572f7cc4b2f..efe8cb4e021 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -86,8 +86,16 @@ def online_read( for index, feature_name in enumerate( response_json["metadata"]["feature_names"] ): - if requested_features is not None and feature_name in requested_features: - if response_json["results"][index]["statuses"][feature_value_index] == "PRESENT": + if ( + requested_features is not None + and feature_name in requested_features + ): + if ( + response_json["results"][index]["statuses"][ + feature_value_index + ] + == "PRESENT" + ): message = python_values_to_proto_values( [ response_json["results"][index]["values"][ diff --git a/sdk/python/tests/integration/online_store/test_remote_online_store.py b/sdk/python/tests/integration/online_store/test_remote_online_store.py index 96ddec4c166..ec01a69321a 100644 --- a/sdk/python/tests/integration/online_store/test_remote_online_store.py +++ b/sdk/python/tests/integration/online_store/test_remote_online_store.py @@ -1,14 +1,13 @@ import os +import subprocess import tempfile from datetime import datetime -from multiprocessing import Process from textwrap import dedent import pytest from feast.feature_store import FeatureStore from feast.wait import wait_retry_backoff -from tests.conftest import start_test_local_server from tests.utils.cli_repo_creator import CliRunner from tests.utils.http_server import check_port_open, free_port @@ -109,7 +108,10 @@ def _assert_existing_feature_views_entity( def _assert_client_server_online_stores_are_matching( - client_store: FeatureStore, server_store: FeatureStore, features, entity_rows + client_store: FeatureStore, + server_store: FeatureStore, + features: list[str], + entity_rows: list, ): online_features_from_client = client_store.get_online_features( features=features, entity_rows=entity_rows @@ -138,7 +140,7 @@ def _create_server_store_spin_feature_server(temp_dir): return store, server_url, os.path.join(store.repo_path, "data", "registry.db") -def _default_store(temp_dir, project_name): +def _default_store(temp_dir, project_name) -> FeatureStore: runner = CliRunner() result = runner.run(["init", project_name], cwd=temp_dir) repo_path = os.path.join(temp_dir, project_name, "feature_repo") @@ -156,7 +158,7 @@ def _default_store(temp_dir, project_name): def _create_remote_client_feature_store( temp_dir, server_registry_path: str, feature_server_url: str -): +) -> FeatureStore: project_name = "REMOTE_ONLINE_CLIENT_PROJECT" runner = CliRunner() result = runner.run(["init", project_name], cwd=temp_dir) @@ -195,21 +197,30 @@ def _overwrite_remote_client_feature_store_yaml( def _start_feature_server(repo_path: str, server_port: int): - feast_server_process = Process( - target=start_test_local_server, args=(repo_path, server_port) + host = "0.0.0.0" + cmd = [ + "feast", + "-c" + repo_path, + "serve", + "--host", + host, + "--port", + str(server_port), + ] + feast_server_process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL ) - feast_server_process.start() - _time_out_sec: int = 45 + _time_out_sec: int = 60 # Wait for server to start wait_retry_backoff( - lambda: (None, check_port_open("localhost", server_port)), + lambda: (None, check_port_open(host, server_port)), timeout_secs=_time_out_sec, timeout_msg=f"Unable to start the feast server in {_time_out_sec} seconds for remote online store type, port={server_port}", ) yield f"http://localhost:{server_port}" - if feast_server_process.is_alive(): + if feast_server_process is not None: feast_server_process.kill() # wait server to free the port @@ -218,5 +229,6 @@ def _start_feature_server(repo_path: str, server_port: int): None, not check_port_open("localhost", server_port), ), - timeout_secs=30, + timeout_msg=f"Unable to stop the feast server in {_time_out_sec} seconds for remote online store type, port={server_port}", + timeout_secs=_time_out_sec, )