From c05b0dbe4771fa8182f35d55049e1502b929773b Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Mon, 3 Jun 2024 17:20:34 -0400 Subject: [PATCH 01/12] Add sql registry async refresh Signed-off-by: Stanley Opara --- .../feast/infra/registry/caching_registry.py | 10 +- sdk/python/feast/infra/registry/sql.py | 4 +- sdk/python/feast/repo_config.py | 3 + .../registration/test_universal_registry.py | 111 ++++++++++++++++-- 4 files changed, 116 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 0f66012808..33306a4266 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,4 +1,5 @@ import logging +import threading from abc import abstractmethod from datetime import datetime, timedelta from threading import Lock @@ -21,9 +22,7 @@ class CachingRegistry(BaseRegistry): def __init__( - self, - project: str, - cache_ttl_seconds: int, + self, project: str, cache_ttl_seconds: int, allow_async_cache: bool = False ): self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) @@ -32,6 +31,9 @@ def __init__( self.cached_registry_proto_ttl = timedelta( seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0 ) + self.allow_async_cache = allow_async_cache + if allow_async_cache: + threading.Timer(cache_ttl_seconds, self.refresh).start() @abstractmethod def _get_data_source(self, name: str, project: str) -> DataSource: @@ -289,6 +291,8 @@ def refresh(self, project: Optional[str] = None): self.cached_registry_proto_created = datetime.utcnow() def _refresh_cached_registry_if_necessary(self): + if self.allow_async_cache: + return with self._refresh_lock: expired = ( self.cached_registry_proto is None diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 26f9da19e1..fb47175836 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -191,7 +191,9 @@ def __init__( ) metadata.create_all(self.engine) super().__init__( - project=project, cache_ttl_seconds=registry_config.cache_ttl_seconds + project=project, + cache_ttl_seconds=registry_config.cache_ttl_seconds, + allow_async_cache=registry_config.allow_async_cache, ) def teardown(self): diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 6ef81794bf..93b0b008d7 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -9,6 +9,7 @@ BaseModel, ConfigDict, Field, + StrictBool, StrictInt, StrictStr, ValidationError, @@ -130,6 +131,8 @@ class RegistryConfig(FeastBaseModel): sqlalchemy_config_kwargs: Dict[str, Any] = {} """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """ + allow_async_cache: StrictBool = False + class RepoConfig(FeastBaseModel): """Repo config. Typically loaded from `feature_store.yaml`""" diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 65d07aca45..d4e8396785 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -135,6 +135,35 @@ def pg_registry(): container.start() + registry_config = _given_registry_config_for_pg_sql(container) + + yield SqlRegistry(registry_config, "project", None) + + container.stop() + + +@pytest.fixture(scope="session") +def pg_registry_async(): + container = ( + DockerContainer("postgres:latest") + .with_exposed_ports(5432) + .with_env("POSTGRES_USER", POSTGRES_USER) + .with_env("POSTGRES_PASSWORD", POSTGRES_PASSWORD) + .with_env("POSTGRES_DB", POSTGRES_DB) + ) + + container.start() + + registry_config = _given_registry_config_for_pg_sql(container, 2, True) + + yield SqlRegistry(registry_config, "project", None) + + container.stop() + + +def _given_registry_config_for_pg_sql( + container, cache_ttl_seconds=2, allow_async_cache=False +): log_string_to_wait_for = "database system is ready to accept connections" waited = wait_for_logs( container=container, @@ -146,23 +175,42 @@ def pg_registry(): container_port = container.get_exposed_port(5432) container_host = container.get_container_host_ip() - registry_config = RegistryConfig( + return RegistryConfig( registry_type="sql", + cache_ttl_seconds=cache_ttl_seconds, + allow_async_cache=allow_async_cache, path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}", sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True}, ) + +@pytest.fixture(scope="session") +def mysql_registry(): + container = MySqlContainer("mysql:latest") + container.start() + + registry_config = _given_registry_config_for_mysql(container) + yield SqlRegistry(registry_config, "project", None) container.stop() @pytest.fixture(scope="session") -def mysql_registry(): +def mysql_registry_async(): container = MySqlContainer("mysql:latest") container.start() - # testing for the database to exist and ready to connect and start testing. + registry_config = _given_registry_config_for_mysql(container, 2, True) + + yield SqlRegistry(registry_config, "project", None) + + container.stop() + + +def _given_registry_config_for_mysql( + container, cache_ttl_seconds=2, allow_async_cache=False +): import sqlalchemy engine = sqlalchemy.create_engine( @@ -170,16 +218,14 @@ def mysql_registry(): ) engine.connect() - registry_config = RegistryConfig( + return RegistryConfig( registry_type="sql", path=container.get_connection_url(), + cache_ttl_seconds=cache_ttl_seconds, + allow_async_cache=allow_async_cache, sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True}, ) - yield SqlRegistry(registry_config, "project", None) - - container.stop() - @pytest.fixture(scope="session") def sqlite_registry(): @@ -265,6 +311,17 @@ def mock_remote_registry(): lazy_fixture("sqlite_registry"), ] +async_sql_fixtures = [ + pytest.param( + lazy_fixture("pg_registry_async"), + marks=pytest.mark.xdist_group(name="pg_registry_async"), + ), + pytest.param( + lazy_fixture("mysql_registry_async"), + marks=pytest.mark.xdist_group(name="mysql_registry_async"), + ), +] + @pytest.mark.integration @pytest.mark.parametrize("test_registry", all_fixtures) @@ -773,6 +830,44 @@ def test_registry_cache(test_registry): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + async_sql_fixtures, +) +def test_registry_cache_async(test_registry): + # Create Feature Views + batch_source = FileSource( + name="test_source", + file_format=ParquetFormat(), + path="file://feast/*", + timestamp_field="ts_col", + created_timestamp_column="timestamp", + ) + + project = "project" + + # Register data source + test_registry.apply_data_source(batch_source, project) + registry_data_sources_cached = test_registry.list_data_sources( + project, allow_cache=True + ) + # async ttl yet to expire, so cache miss + assert len(registry_data_sources_cached) == 0 + + # Wait for cache to be refreshed + time.sleep(4) + # Now objects exist + registry_data_sources_cached = test_registry.list_data_sources( + project, allow_cache=True + ) + assert len(registry_data_sources_cached) == 1 + registry_data_source = registry_data_sources_cached[0] + assert registry_data_source == batch_source + + test_registry.teardown() + + @pytest.mark.integration @pytest.mark.parametrize( "test_registry", From 2074959c65a235ea8686014b1c5dd1883f2678a3 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 5 Jun 2024 17:00:53 -0400 Subject: [PATCH 02/12] make refresh code a daemon thread Signed-off-by: Stanley Opara --- sdk/python/feast/infra/registry/caching_registry.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 33306a4266..3fb9f45810 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,3 +1,4 @@ +import atexit import logging import threading from abc import abstractmethod @@ -33,7 +34,7 @@ def __init__( ) self.allow_async_cache = allow_async_cache if allow_async_cache: - threading.Timer(cache_ttl_seconds, self.refresh).start() + self._start_async_refresh(cache_ttl_seconds) @abstractmethod def _get_data_source(self, name: str, project: str) -> DataSource: @@ -312,3 +313,12 @@ def _refresh_cached_registry_if_necessary(self): if expired: logger.info("Registry cache expired, so refreshing") self.refresh() + + def _start_async_refresh(self, cache_ttl_seconds): + self.registry_refresh_thread = threading.Timer(cache_ttl_seconds, self.refresh) + self.registry_refresh_thread.setDaemon(True) + self.registry_refresh_thread.start() + atexit.register(self._exit_handler) + + def _exit_handler(self): + self.registry_refresh_thread.cancel() From 956b7596a2c4e21d663b15dc253844fb0f21cea1 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Tue, 11 Jun 2024 16:07:06 -0400 Subject: [PATCH 03/12] Change RegistryConfig to cacheMode Signed-off-by: Stanley Opara --- .../feast/infra/registry/caching_registry.py | 47 +++++++++---------- sdk/python/feast/infra/registry/sql.py | 2 +- sdk/python/feast/repo_config.py | 3 +- .../registration/test_universal_registry.py | 16 +++---- 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 3fb9f45810..543c1897af 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -22,9 +22,7 @@ class CachingRegistry(BaseRegistry): - def __init__( - self, project: str, cache_ttl_seconds: int, allow_async_cache: bool = False - ): + def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str): self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) self.cached_registry_proto_created = datetime.utcnow() @@ -32,9 +30,9 @@ def __init__( self.cached_registry_proto_ttl = timedelta( seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0 ) - self.allow_async_cache = allow_async_cache - if allow_async_cache: - self._start_async_refresh(cache_ttl_seconds) + self.cache_mode = cache_mode + if cache_mode == "thread": + self._start_thread_async_refresh(cache_ttl_seconds) @abstractmethod def _get_data_source(self, name: str, project: str) -> DataSource: @@ -292,29 +290,28 @@ def refresh(self, project: Optional[str] = None): self.cached_registry_proto_created = datetime.utcnow() def _refresh_cached_registry_if_necessary(self): - if self.allow_async_cache: - return - with self._refresh_lock: - expired = ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None - ) or ( - self.cached_registry_proto_ttl.total_seconds() - > 0 # 0 ttl means infinity - and ( - datetime.utcnow() - > ( - self.cached_registry_proto_created - + self.cached_registry_proto_ttl + if self.cache_mode == "sync": + with self._refresh_lock: + expired = ( + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() + > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > ( + self.cached_registry_proto_created + + self.cached_registry_proto_ttl + ) ) ) - ) - if expired: - logger.info("Registry cache expired, so refreshing") - self.refresh() + if expired: + logger.info("Registry cache expired, so refreshing") + self.refresh() - def _start_async_refresh(self, cache_ttl_seconds): + def _start_thread_async_refresh(self, cache_ttl_seconds): self.registry_refresh_thread = threading.Timer(cache_ttl_seconds, self.refresh) self.registry_refresh_thread.setDaemon(True) self.registry_refresh_thread.start() diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index fb47175836..2b9676a577 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -193,7 +193,7 @@ def __init__( super().__init__( project=project, cache_ttl_seconds=registry_config.cache_ttl_seconds, - allow_async_cache=registry_config.allow_async_cache, + cache_mode=registry_config.cache_mode, ) def teardown(self): diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 93b0b008d7..fd38e9ff91 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -9,7 +9,6 @@ BaseModel, ConfigDict, Field, - StrictBool, StrictInt, StrictStr, ValidationError, @@ -131,7 +130,7 @@ class RegistryConfig(FeastBaseModel): sqlalchemy_config_kwargs: Dict[str, Any] = {} """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """ - allow_async_cache: StrictBool = False + cache_mode: StrictStr = "sync" class RepoConfig(FeastBaseModel): diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index d4e8396785..4c35650061 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -154,7 +154,7 @@ def pg_registry_async(): container.start() - registry_config = _given_registry_config_for_pg_sql(container, 2, True) + registry_config = _given_registry_config_for_pg_sql(container, 2, "thread") yield SqlRegistry(registry_config, "project", None) @@ -162,7 +162,7 @@ def pg_registry_async(): def _given_registry_config_for_pg_sql( - container, cache_ttl_seconds=2, allow_async_cache=False + container, cache_ttl_seconds=2, cache_mode="sync" ): log_string_to_wait_for = "database system is ready to accept connections" waited = wait_for_logs( @@ -178,7 +178,7 @@ def _given_registry_config_for_pg_sql( return RegistryConfig( registry_type="sql", cache_ttl_seconds=cache_ttl_seconds, - allow_async_cache=allow_async_cache, + cache_mode=cache_mode, path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}", sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True}, ) @@ -201,16 +201,14 @@ def mysql_registry_async(): container = MySqlContainer("mysql:latest") container.start() - registry_config = _given_registry_config_for_mysql(container, 2, True) + registry_config = _given_registry_config_for_mysql(container, 2, "thread") yield SqlRegistry(registry_config, "project", None) container.stop() -def _given_registry_config_for_mysql( - container, cache_ttl_seconds=2, allow_async_cache=False -): +def _given_registry_config_for_mysql(container, cache_ttl_seconds=2, cache_mode="sync"): import sqlalchemy engine = sqlalchemy.create_engine( @@ -222,7 +220,7 @@ def _given_registry_config_for_mysql( registry_type="sql", path=container.get_connection_url(), cache_ttl_seconds=cache_ttl_seconds, - allow_async_cache=allow_async_cache, + cache_mode=cache_mode, sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True}, ) @@ -835,7 +833,7 @@ def test_registry_cache(test_registry): "test_registry", async_sql_fixtures, ) -def test_registry_cache_async(test_registry): +def test_registry_cache_thread_async(test_registry): # Create Feature Views batch_source = FileSource( name="test_source", From da82ba07d609f3c782e328b9f74c1ff818e8c907 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Tue, 11 Jun 2024 19:41:06 -0400 Subject: [PATCH 04/12] Only run async when ttl > 0 Signed-off-by: Stanley Opara --- sdk/python/feast/infra/registry/caching_registry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 543c1897af..c43d322307 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -312,6 +312,8 @@ def _refresh_cached_registry_if_necessary(self): self.refresh() def _start_thread_async_refresh(self, cache_ttl_seconds): + if cache_ttl_seconds <= 0: + return self.registry_refresh_thread = threading.Timer(cache_ttl_seconds, self.refresh) self.registry_refresh_thread.setDaemon(True) self.registry_refresh_thread.start() From d4f58b613d98639ec301c62c498cc65be952e67f Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 12 Jun 2024 11:52:45 -0400 Subject: [PATCH 05/12] make refresh async run in a loop Signed-off-by: Stanley Opara --- sdk/python/feast/infra/registry/caching_registry.py | 5 ++++- .../integration/registration/test_universal_registry.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index c43d322307..cd5d02be3a 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -314,7 +314,10 @@ def _refresh_cached_registry_if_necessary(self): def _start_thread_async_refresh(self, cache_ttl_seconds): if cache_ttl_seconds <= 0: return - self.registry_refresh_thread = threading.Timer(cache_ttl_seconds, self.refresh) + self.refresh() + self.registry_refresh_thread = threading.Timer( + cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds] + ) self.registry_refresh_thread.setDaemon(True) self.registry_refresh_thread.start() atexit.register(self._exit_handler) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 4c35650061..8c17573cf6 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -854,7 +854,7 @@ def test_registry_cache_thread_async(test_registry): assert len(registry_data_sources_cached) == 0 # Wait for cache to be refreshed - time.sleep(4) + time.sleep(8) # Now objects exist registry_data_sources_cached = test_registry.list_data_sources( project, allow_cache=True From 30928236e3ff97676d0ad957980edf409ddab23c Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 12 Jun 2024 12:07:34 -0400 Subject: [PATCH 06/12] make refresh async run in a loop Signed-off-by: Stanley Opara --- .../tests/integration/registration/test_universal_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 8c17573cf6..4c35650061 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -854,7 +854,7 @@ def test_registry_cache_thread_async(test_registry): assert len(registry_data_sources_cached) == 0 # Wait for cache to be refreshed - time.sleep(8) + time.sleep(4) # Now objects exist registry_data_sources_cached = test_registry.list_data_sources( project, allow_cache=True From 9a5cc160f80eb543fbd535beb8b439843566e61d Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 12 Jun 2024 15:49:47 -0400 Subject: [PATCH 07/12] Reorder async refresh call Signed-off-by: Stanley Opara --- sdk/python/feast/infra/registry/caching_registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index cd5d02be3a..ed5449b9f2 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -33,6 +33,7 @@ def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str): self.cache_mode = cache_mode if cache_mode == "thread": self._start_thread_async_refresh(cache_ttl_seconds) + atexit.register(self._exit_handler) @abstractmethod def _get_data_source(self, name: str, project: str) -> DataSource: @@ -312,15 +313,14 @@ def _refresh_cached_registry_if_necessary(self): self.refresh() def _start_thread_async_refresh(self, cache_ttl_seconds): + self.refresh() if cache_ttl_seconds <= 0: return - self.refresh() self.registry_refresh_thread = threading.Timer( cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds] ) self.registry_refresh_thread.setDaemon(True) self.registry_refresh_thread.start() - atexit.register(self._exit_handler) def _exit_handler(self): self.registry_refresh_thread.cancel() From 4593127ffcdbe60f8162f7ee6644cdb65c3779a9 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Mon, 17 Jun 2024 15:17:00 -0400 Subject: [PATCH 08/12] Add documentation Signed-off-by: Stanley Opara --- sdk/python/feast/repo_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fd38e9ff91..2c2070b7d8 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -131,6 +131,7 @@ class RegistryConfig(FeastBaseModel): """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """ cache_mode: StrictStr = "sync" + """ str: Cache mode type, Possible options are sync and thread(asynchronous caching using threading library)""" class RepoConfig(FeastBaseModel): From 87986969254430ea48376f37f5c74c74c6a3c866 Mon Sep 17 00:00:00 2001 From: stanconia <36575269+stanconia@users.noreply.github.com> Date: Tue, 2 Jul 2024 10:27:38 -0400 Subject: [PATCH 09/12] Update test_universal_registry.py Signed-off-by: Stanley Opara --- .../integration/registration/test_universal_registry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 57f5b20e7b..1fbcd7e297 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -125,7 +125,7 @@ def minio_registry() -> Registry: logger = logging.getLogger(__name__) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def pg_registry(): container = ( DockerContainer("postgres:latest") @@ -144,7 +144,7 @@ def pg_registry(): container.stop() -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def pg_registry_async(): container = ( DockerContainer("postgres:latest") @@ -188,7 +188,7 @@ def _given_registry_config_for_pg_sql( ) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def mysql_registry(): container = MySqlContainer("mysql:latest") container.start() @@ -200,7 +200,7 @@ def mysql_registry(): container.stop() -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def mysql_registry_async(): container = MySqlContainer("mysql:latest") container.start() From 00a150139f8f9dce1c498ea7a956481f67f78017 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 3 Jul 2024 09:54:00 -0400 Subject: [PATCH 10/12] Force rerun of tests Signed-off-by: Stanley Opara --- .../tests/integration/registration/test_universal_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 1fbcd7e297..e66ba36da2 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -1060,7 +1060,7 @@ def test_registry_cache(test_registry): async_sql_fixtures, ) def test_registry_cache_thread_async(test_registry): - # Create Feature Views + # Create Feature View batch_source = FileSource( name="test_source", file_format=ParquetFormat(), From 31953824dc16b6b07f3f8e155a374eddad2778a1 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 3 Jul 2024 13:35:48 -0400 Subject: [PATCH 11/12] Force rerun of tests Signed-off-by: Stanley Opara --- .../tests/integration/registration/test_universal_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index e66ba36da2..b0738c8419 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -1076,7 +1076,7 @@ def test_registry_cache_thread_async(test_registry): registry_data_sources_cached = test_registry.list_data_sources( project, allow_cache=True ) - # async ttl yet to expire, so cache miss + # async ttl yet to expire, so there will be a cache miss assert len(registry_data_sources_cached) == 0 # Wait for cache to be refreshed From b7df69233e62ecb2a6e854b51a0a3bae2771f308 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Fri, 5 Jul 2024 12:10:37 -0400 Subject: [PATCH 12/12] Format repo config file Signed-off-by: Stanley Opara --- sdk/python/feast/repo_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index ec31556adf..137023ef22 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -142,6 +142,7 @@ def validate_path(cls, path: str, values: ValidationInfo) -> str: return path.replace("postgresql://", "postgresql+psycopg://") return path + class RepoConfig(FeastBaseModel): """Repo config. Typically loaded from `feature_store.yaml`"""