Skip to content

Commit

Permalink
add tags filtering capability to 'list' for objects
Browse files Browse the repository at this point in the history
Signed-off-by: Tommy Hughes <[email protected]>
  • Loading branch information
tchughesiv committed Jun 17, 2024
1 parent 849f590 commit 91decd6
Show file tree
Hide file tree
Showing 19 changed files with 555 additions and 157 deletions.
6 changes: 6 additions & 0 deletions protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ message GetEntityRequest {
message ListEntitiesRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListEntitiesResponse {
Expand Down Expand Up @@ -146,6 +147,7 @@ message GetDataSourceRequest {
message ListDataSourcesRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListDataSourcesResponse {
Expand Down Expand Up @@ -179,6 +181,7 @@ message GetFeatureViewRequest {
message ListFeatureViewsRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListFeatureViewsResponse {
Expand All @@ -202,6 +205,7 @@ message GetStreamFeatureViewRequest {
message ListStreamFeatureViewsRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListStreamFeatureViewsResponse {
Expand All @@ -219,6 +223,7 @@ message GetOnDemandFeatureViewRequest {
message ListOnDemandFeatureViewsRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListOnDemandFeatureViewsResponse {
Expand All @@ -242,6 +247,7 @@ message GetFeatureServiceRequest {
message ListFeatureServicesRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListFeatureServicesResponse {
Expand Down
37 changes: 26 additions & 11 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
from feast.utils import maybe_local_tz

_logger = logging.getLogger(__name__)
tagsOption = click.option(
"--tags",
help="Filter by tags (e.g. 'key:value, key:value, ...')",
multiple=True,
)


class NoOptionDefaultFormat(click.Command):
Expand Down Expand Up @@ -226,14 +231,16 @@ def data_source_describe(ctx: click.Context, name: str):


@data_sources_cmd.command(name="list")
@tagsOption
@click.pass_context
def data_source_list(ctx: click.Context):
def data_source_list(ctx: click.Context, tags: Optional[str]):
"""
List all data sources
"""
store = create_feature_store(ctx)
table = []
for datasource in store.list_data_sources():
tags_filter = utils.tags_str_to_dict(tags)
for datasource in store.list_data_sources(tags=tags_filter):
table.append([datasource.name, datasource.__class__])

from tabulate import tabulate
Expand Down Expand Up @@ -272,14 +279,16 @@ def entity_describe(ctx: click.Context, name: str):


@entities_cmd.command(name="list")
@tagsOption
@click.pass_context
def entity_list(ctx: click.Context):
def entity_list(ctx: click.Context, tags: Optional[str]):
"""
List all entities
"""
store = create_feature_store(ctx)
table = []
for entity in store.list_entities():
tags_filter = utils.tags_str_to_dict(tags)
for entity in store.list_entities(tags=tags_filter):
table.append([entity.name, entity.description, entity.value_type])

from tabulate import tabulate
Expand Down Expand Up @@ -320,14 +329,16 @@ def feature_service_describe(ctx: click.Context, name: str):


@feature_services_cmd.command(name="list")
@tagsOption
@click.pass_context
def feature_service_list(ctx: click.Context):
def feature_service_list(ctx: click.Context, tags: Optional[str]):
"""
List all feature services
"""
store = create_feature_store(ctx)
feature_services = []
for feature_service in store.list_feature_services():
tags_filter = utils.tags_str_to_dict(tags)
for feature_service in store.list_feature_services(tags=tags_filter):
feature_names = []
for projection in feature_service.feature_view_projections:
feature_names.extend(
Expand Down Expand Up @@ -371,16 +382,18 @@ def feature_view_describe(ctx: click.Context, name: str):


@feature_views_cmd.command(name="list")
@tagsOption
@click.pass_context
def feature_view_list(ctx: click.Context):
def feature_view_list(ctx: click.Context, tags: Optional[str]):
"""
List all feature views
"""
store = create_feature_store(ctx)
table = []
tags_filter = utils.tags_str_to_dict(tags)
for feature_view in [
*store.list_feature_views(),
*store.list_on_demand_feature_views(),
*store.list_batch_feature_views(tags=tags_filter),
*store.list_on_demand_feature_views(tags=tags_filter),
]:
entities = set()
if isinstance(feature_view, FeatureView):
Expand Down Expand Up @@ -434,14 +447,16 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):


@on_demand_feature_views_cmd.command(name="list")
@tagsOption
@click.pass_context
def on_demand_feature_view_list(ctx: click.Context):
def on_demand_feature_view_list(ctx: click.Context, tags: Optional[str]):
"""
[Experimental] List all on demand feature views
"""
store = create_feature_store(ctx)
table = []
for on_demand_feature_view in store.list_on_demand_feature_views():
tags_filter = utils.tags_str_to_dict(tags)
for on_demand_feature_view in store.list_on_demand_feature_views(tags=tags_filter):
table.append([on_demand_feature_view.name])

from tabulate import tabulate
Expand Down
86 changes: 64 additions & 22 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,41 +215,52 @@ def refresh_registry(self):

self._registry = registry

def list_entities(self, allow_cache: bool = False) -> List[Entity]:
def list_entities(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[Entity]:
"""
Retrieves the list of entities from the registry.
Args:
allow_cache: Whether to allow returning entities from a cached registry.
tags: Filter by tags.
Returns:
A list of entities.
"""
return self._list_entities(allow_cache)
return self._list_entities(allow_cache, tags=tags)

def _list_entities(
self, allow_cache: bool = False, hide_dummy_entity: bool = True
self,
allow_cache: bool = False,
hide_dummy_entity: bool = True,
tags: Optional[dict[str, str]] = None,
) -> List[Entity]:
all_entities = self._registry.list_entities(
self.project, allow_cache=allow_cache
self.project, allow_cache=allow_cache, tags=tags
)
return [
entity
for entity in all_entities
if entity.name != DUMMY_ENTITY_NAME or not hide_dummy_entity
]

def list_feature_services(self) -> List[FeatureService]:
def list_feature_services(
self, tags: Optional[dict[str, str]] = None
) -> List[FeatureService]:
"""
Retrieves the list of feature services from the registry.
Args:
tags: Filter by tags.
Returns:
A list of feature services.
"""
return self._registry.list_feature_services(self.project)
return self._registry.list_feature_services(self.project, tags=tags)

def list_all_feature_views(
self, allow_cache: bool = False
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]:
"""
Retrieves the list of feature views from the registry.
Expand All @@ -260,14 +271,17 @@ def list_all_feature_views(
Returns:
A list of feature views.
"""
return self._list_all_feature_views(allow_cache)
return self._list_all_feature_views(allow_cache, tags=tags)

def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]:
def list_feature_views(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[FeatureView]:
"""
Retrieves the list of feature views from the registry.
Args:
allow_cache: Whether to allow returning entities from a cached registry.
tags: Filter by tags.
Returns:
A list of feature views.
Expand All @@ -276,31 +290,48 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]:
"list_feature_views will make breaking changes. Please use list_batch_feature_views instead. "
"list_feature_views will behave like list_all_feature_views in the future."
)
return self._list_feature_views(allow_cache)
return self._list_feature_views(allow_cache=allow_cache, tags=tags)

def list_batch_feature_views(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[FeatureView]:
"""
Retrieves the list of feature views from the registry.
Args:
allow_cache: Whether to allow returning entities from a cached registry.
tags: Filter by tags.
Returns:
A list of feature views.
"""
return self._list_batch_feature_views(allow_cache=allow_cache, tags=tags)

def _list_all_feature_views(
self,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]:
all_feature_views = (
self._list_feature_views(allow_cache)
+ self._list_stream_feature_views(allow_cache)
+ self.list_on_demand_feature_views(allow_cache)
self._list_feature_views(allow_cache, tags=tags)
+ self._list_stream_feature_views(allow_cache, tags=tags)
+ self.list_on_demand_feature_views(allow_cache, tags=tags)
)
return all_feature_views

def _list_feature_views(
self,
allow_cache: bool = False,
hide_dummy_entity: bool = True,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureView]:
logging.warning(
"_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. "
"_list_feature_views will behave like _list_all_feature_views in the future."
)
feature_views = []
for fv in self._registry.list_feature_views(
self.project, allow_cache=allow_cache
self.project, allow_cache=allow_cache, tags=tags
):
if (
hide_dummy_entity
Expand All @@ -316,10 +347,11 @@ def _list_batch_feature_views(
self,
allow_cache: bool = False,
hide_dummy_entity: bool = True,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureView]:
feature_views = []
for fv in self._registry.list_feature_views(
self.project, allow_cache=allow_cache
self.project, allow_cache=allow_cache, tags=tags
):
if (
hide_dummy_entity
Expand All @@ -335,10 +367,11 @@ def _list_stream_feature_views(
self,
allow_cache: bool = False,
hide_dummy_entity: bool = True,
tags: Optional[dict[str, str]] = None,
) -> List[StreamFeatureView]:
stream_feature_views = []
for sfv in self._registry.list_stream_feature_views(
self.project, allow_cache=allow_cache
self.project, allow_cache=allow_cache, tags=tags
):
if hide_dummy_entity and sfv.entities[0] == DUMMY_ENTITY_NAME:
sfv.entities = []
Expand All @@ -347,40 +380,49 @@ def _list_stream_feature_views(
return stream_feature_views

def list_on_demand_feature_views(
self, allow_cache: bool = False
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[OnDemandFeatureView]:
"""
Retrieves the list of on demand feature views from the registry.
Args:
allow_cache: Whether to allow returning entities from a cached registry.
tags: Filter by tags.
Returns:
A list of on demand feature views.
"""
return self._registry.list_on_demand_feature_views(
self.project, allow_cache=allow_cache
self.project, allow_cache=allow_cache, tags=tags
)

def list_stream_feature_views(
self, allow_cache: bool = False
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[StreamFeatureView]:
"""
Retrieves the list of stream feature views from the registry.
Returns:
A list of stream feature views.
"""
return self._list_stream_feature_views(allow_cache)
return self._list_stream_feature_views(allow_cache, tags=tags)

def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
def list_data_sources(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[DataSource]:
"""
Retrieves the list of data sources from the registry.
Args:
allow_cache: Whether to allow returning data sources from a cached registry.
tags: Filter by tags.
Returns:
A list of data sources.
"""
return self._registry.list_data_sources(self.project, allow_cache=allow_cache)
return self._registry.list_data_sources(
self.project, allow_cache=allow_cache, tags=tags
)

def get_entity(self, name: str, allow_registry_cache: bool = False) -> Entity:
"""
Expand Down
Loading

0 comments on commit 91decd6

Please sign in to comment.