diff --git a/python/hopsworks/client/__init__.py b/python/hopsworks/client/__init__.py
index 1e4a7ea8f..004e49c8b 100644
--- a/python/hopsworks/client/__init__.py
+++ b/python/hopsworks/client/__init__.py
@@ -16,6 +16,7 @@
 #
 
 from hopsworks.client import external, hopsworks
 
+
 _client = None
 _python_version = None
diff --git a/python/hopsworks/client/base.py b/python/hopsworks/client/base.py
index 1d343aaa6..852259639 100644
--- a/python/hopsworks/client/base.py
+++ b/python/hopsworks/client/base.py
@@ -15,13 +15,12 @@
 #
 
 import os
-import furl
 from abc import ABC, abstractmethod
+import furl
 import requests
 import urllib3
-
-from hopsworks.client import exceptions, auth
+from hopsworks.client import auth, exceptions
 from hopsworks.decorators import connected
diff --git a/python/hopsworks/client/hopsworks.py b/python/hopsworks/client/hopsworks.py
index 884dc4000..514e3fe48 100644
--- a/python/hopsworks/client/hopsworks.py
+++ b/python/hopsworks/client/hopsworks.py
@@ -20,8 +20,8 @@
 from pathlib import Path
 
 import requests
+from hopsworks.client import auth, base
 
-from hopsworks.client import base, auth
 
 try:
     import jks
@@ -134,7 +134,7 @@ def _convert_jks_to_pem(self, jks_path, keystore_pw):
         ca_certs = ""
 
         # Convert CA Certificates into PEM format and append to string
-        for alias, c in ks.certs.items():
+        for _alias, c in ks.certs.items():
             ca_certs = ca_certs + self._bytes_to_pem_str(c.cert, "CERTIFICATE")
         return ca_certs
diff --git a/python/hopsworks/connection.py b/python/hopsworks/connection.py
index 1fe984030..61f2e3d6a 100644
--- a/python/hopsworks/connection.py
+++ b/python/hopsworks/connection.py
@@ -16,14 +16,14 @@
 import os
 import re
-import warnings
 import sys
+import warnings
 
-from requests.exceptions import ConnectionError
-
-from hopsworks.decorators import connected, not_connected
 from hopsworks import client, version
 from hopsworks.core import project_api, secret_api, variable_api
+from hopsworks.decorators import connected, not_connected
+from requests.exceptions import ConnectionError
+
 
 HOPSWORKS_PORT_DEFAULT = 443
 HOSTNAME_VERIFICATION_DEFAULT = True
@@ -210,7 +210,8 @@ def _check_compatibility(self):
             warnings.warn(
                 "The installed hopsworks client version {0} may not be compatible with the connected Hopsworks backend version {1}. \nTo ensure compatibility please install the latest bug fix release matching the minor version of your backend ({2}) by running 'pip install hopsworks=={2}.*'".format(
                     client_version, backend_version, major_minor_backend
-                )
+                ),
+                stacklevel=1,
             )
             sys.stderr.flush()
diff --git a/python/hopsworks/core/dataset_api.py b/python/hopsworks/core/dataset_api.py
index 285083cac..a0e84d235 100644
--- a/python/hopsworks/core/dataset_api.py
+++ b/python/hopsworks/core/dataset_api.py
@@ -14,18 +14,17 @@
 # limitations under the License.
 #
+import copy
+import logging
 import math
 import os
-import time
-from tqdm.auto import tqdm
 import shutil
-import logging
-import copy
+import time
+from concurrent.futures import ThreadPoolExecutor, wait
 
 from hopsworks import client
-from hopsworks.client.exceptions import RestAPIError
-from hopsworks.client.exceptions import DatasetException
-from concurrent.futures import ThreadPoolExecutor, wait
+from hopsworks.client.exceptions import DatasetException, RestAPIError
+from tqdm.auto import tqdm
 
 
 class Chunk:
diff --git a/python/hopsworks/core/environment_api.py b/python/hopsworks/core/environment_api.py
index 18c0c55d1..6a9ccf2ea 100644
--- a/python/hopsworks/core/environment_api.py
+++ b/python/hopsworks/core/environment_api.py
@@ -32,7 +32,13 @@ def __init__(
 
         self._environment_engine = environment_engine.EnvironmentEngine(project_id)
 
-    def create_environment(self, name: str, description: Optional[str] = None, base_environment_name: Optional[str] = "python-feature-pipeline", await_creation: Optional[bool] = True) -> environment.Environment:
+    def create_environment(
+        self,
+        name: str,
+        description: Optional[str] = None,
+        base_environment_name: Optional[str] = "python-feature-pipeline",
+        await_creation: Optional[bool] = True,
+    ) -> environment.Environment:
         """Create Python environment for the project
 
         ```python
@@ -66,13 +72,14 @@ def create_environment(self, name: str, description: Optional[str] = None, base_
             name,
         ]
         headers = {"content-type": "application/json"}
-        data = {"name": name,
-                "baseImage": {
-                    "name": base_environment_name,
-                    "description": description
-                }}
+        data = {
+            "name": name,
+            "baseImage": {"name": base_environment_name, "description": description},
+        }
         env = environment.Environment.from_response_json(
-            _client._send_request("POST", path_params, headers=headers, data=json.dumps(data)),
+            _client._send_request(
+                "POST", path_params, headers=headers, data=json.dumps(data)
+            ),
             self._project_id,
             self._project_name,
         )
@@ -148,4 +155,4 @@ def _delete(self, name):
             name,
         ]
         headers = {"content-type": "application/json"}
-        _client._send_request("DELETE", path_params, headers=headers),
+        _client._send_request("DELETE", path_params, headers=headers)
diff --git a/python/hopsworks/core/flink_cluster_api.py b/python/hopsworks/core/flink_cluster_api.py
index 825f7d42d..53b13b3ed 100644
--- a/python/hopsworks/core/flink_cluster_api.py
+++ b/python/hopsworks/core/flink_cluster_api.py
@@ -14,9 +14,11 @@
 # limitations under the License.
 #
 
-import os
 import json
-from hopsworks import client, flink_cluster, util, job
+import os
+
+from hopsworks import client, flink_cluster, job, util
+from hopsworks.client.exceptions import RestAPIError
 from hopsworks.core import job_api
@@ -69,7 +71,9 @@ def setup_cluster(self, name: str, config=None):
             # If the job already exists, retrieve it
             _flink_cluster = self.get_cluster(name)
             if _flink_cluster._job.job_type != "FLINK":
-                raise "This is not a Flink cluster. Please use different name to create new Flink cluster"
+                raise RestAPIError(
+                    "This is not a Flink cluster. Please use a different name to create a new Flink cluster"
+                )
             return _flink_cluster
         else:
             # If the job doesn't exists, create a new job
diff --git a/python/hopsworks/core/git_api.py b/python/hopsworks/core/git_api.py
index 6eaa8afd9..581b18243 100644
--- a/python/hopsworks/core/git_api.py
+++ b/python/hopsworks/core/git_api.py
@@ -14,23 +14,23 @@
 # limitations under the License.
 #
+import json
+import logging
+from typing import List, Union
+
 from hopsworks import (
     client,
-    git_repo,
-    git_op_execution,
-    util,
     git_commit,
     git_file_status,
+    git_op_execution,
+    git_repo,
+    util,
 )
 from hopsworks.client.exceptions import GitException
-from hopsworks.engine import git_engine
 from hopsworks.core import git_provider_api
-from typing import List, Union
+from hopsworks.engine import git_engine
 from hopsworks.git_file_status import GitFileStatus
-import json
-import logging
-
 
 class GitApi:
@@ -347,7 +347,7 @@ def _status(self, repo_id):
         status_dict = json.loads(git_op.command_result_message)
 
         file_status = None
-        if status_dict is not None and type(status_dict["status"]) is list:
+        if status_dict is not None and isinstance(status_dict["status"], list):
             file_status = []
             for status in status_dict["status"]:
                 file_status.append(
diff --git a/python/hopsworks/core/git_provider_api.py b/python/hopsworks/core/git_provider_api.py
index b06c95fb4..139109928 100644
--- a/python/hopsworks/core/git_provider_api.py
+++ b/python/hopsworks/core/git_provider_api.py
@@ -14,11 +14,11 @@
 # limitations under the License.
 #
 
+import json
+
 from hopsworks import client, git_provider
-from hopsworks.engine import git_engine
 from hopsworks.client.exceptions import GitException
-
-import json
+from hopsworks.engine import git_engine
 
 
 class GitProviderApi:
diff --git a/python/hopsworks/core/job_api.py b/python/hopsworks/core/job_api.py
index e40afe8c0..4a93f1bfd 100644
--- a/python/hopsworks/core/job_api.py
+++ b/python/hopsworks/core/job_api.py
@@ -16,7 +16,7 @@
 
 import json
 
-from hopsworks import client, job, util, job_schedule
+from hopsworks import client, job, job_schedule, util
 from hopsworks.client.exceptions import RestAPIError
diff --git a/python/hopsworks/core/kafka_api.py b/python/hopsworks/core/kafka_api.py
index f1ae2ece9..b597a89b9 100644
--- a/python/hopsworks/core/kafka_api.py
+++ b/python/hopsworks/core/kafka_api.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-from hopsworks import client, kafka_topic, kafka_schema, constants
-from hopsworks.client.exceptions import KafkaException
 import json
 import socket
+
+from hopsworks import client, constants, kafka_schema, kafka_topic
+from hopsworks.client.exceptions import KafkaException
 from hopsworks.client.external import Client
@@ -366,7 +367,7 @@ def get_default_config(self):
             constants.KAFKA_SSL_CONFIG.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG: "none",
         }
         _client = client.get_instance()
-        if type(_client) == Client:
+        if type(_client) is Client:
             config[constants.KAFKA_PRODUCER_CONFIG.BOOTSTRAP_SERVERS_CONFIG] = ",".join(
                 [
                     endpoint.replace("EXTERNAL://", "")
diff --git a/python/hopsworks/core/secret_api.py b/python/hopsworks/core/secret_api.py
index 169ac6ff1..bf47b6ad8 100644
--- a/python/hopsworks/core/secret_api.py
+++ b/python/hopsworks/core/secret_api.py
@@ -72,7 +72,9 @@ def get_secret(self, name: str, owner: str = None) -> secret.Secret:
                 "shared",
             ]
 
-        return secret.Secret.from_response_json(_client._send_request("GET", path_params, query_params=query_params))[0]
+        return secret.Secret.from_response_json(
+            _client._send_request("GET", path_params, query_params=query_params)
+        )[0]
 
     def get(self, name: str, owner: str = None) -> str:
         """Get the secret's value.
@@ -90,16 +92,20 @@ def get(self, name: str, owner: str = None) -> str:
             return self.get_secret(name=name, owner=owner).value
         except RestAPIError as e:
             if (
-                    e.response.json().get("errorCode", "") == 160048
-                    and e.response.status_code == 404
-                    and util.is_interactive()
+                e.response.json().get("errorCode", "") == 160048
+                and e.response.status_code == 404
+                and util.is_interactive()
             ):
-                secret_input = getpass.getpass(prompt="\nCould not find secret, enter value here to create it: ")
+                secret_input = getpass.getpass(
+                    prompt="\nCould not find secret, enter value here to create it: "
+                )
                 return self.create_secret(name, secret_input).value
             else:
                 raise e
 
-    def create_secret(self, name: str, value: str, project: str = None) -> secret.Secret:
+    def create_secret(
+        self, name: str, value: str, project: str = None
+    ) -> secret.Secret:
         """Create a new secret.
 
         ```python
diff --git a/python/hopsworks/engine/execution_engine.py b/python/hopsworks/engine/execution_engine.py
index 7a7af92ff..5ff14cee7 100644
--- a/python/hopsworks/engine/execution_engine.py
+++ b/python/hopsworks/engine/execution_engine.py
@@ -14,13 +14,13 @@
 # limitations under the License.
 #
 
-from hopsworks.core import dataset_api, execution_api
-import os
-import logging
+import logging
+import os
 import time
 import uuid
 
 from hopsworks.client.exceptions import JobExecutionException, RestAPIError
+from hopsworks.core import dataset_api, execution_api
 
 
 class ExecutionEngine:
diff --git a/python/hopsworks/engine/git_engine.py b/python/hopsworks/engine/git_engine.py
index f0aa74d3c..3fb506e91 100644
--- a/python/hopsworks/engine/git_engine.py
+++ b/python/hopsworks/engine/git_engine.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-from hopsworks.core import git_op_execution_api
-from hopsworks.client.exceptions import GitException
-import time
-import logging
+import logging
+import time
+
+from hopsworks.client.exceptions import GitException
+from hopsworks.core import git_op_execution_api
 
 
 class GitEngine:
diff --git a/python/hopsworks/environment.py b/python/hopsworks/environment.py
index 3d087cad0..f286bdf8c 100644
--- a/python/hopsworks/environment.py
+++ b/python/hopsworks/environment.py
@@ -133,16 +133,18 @@ def install_wheel(self, path: str, await_installation: Optional[bool] = True):
             "packageSource": "WHEEL",
         }
 
-        library_rest = self._library_api._install(
-            library_name, self.name, library_spec
-        )
+        library_rest = self._library_api._install(library_name, self.name, library_spec)
 
         if await_installation:
-            return self._environment_engine.await_library_command(self.name, library_name)
+            return self._environment_engine.await_library_command(
+                self.name, library_name
+            )
 
         return library_rest
 
-    def install_requirements(self, path: str, await_installation: Optional[bool] = True):
+    def install_requirements(
+        self, path: str, await_installation: Optional[bool] = True
+    ):
         """Install libraries specified in a requirements.txt file
 
         ```python
@@ -184,12 +186,12 @@ def install_requirements(self, path: str, await_installation: Optional[bool] = T
             "packageSource": "REQUIREMENTS_TXT",
         }
 
-        library_rest = self._library_api._install(
-            library_name, self.name, library_spec
-        )
+        library_rest = self._library_api._install(library_name, self.name, library_spec)
 
         if await_installation:
-            return self._environment_engine.await_library_command(self.name, library_name)
+            return self._environment_engine.await_library_command(
+                self.name, library_name
+            )
 
         return library_rest
diff --git a/python/hopsworks/flink_cluster.py b/python/hopsworks/flink_cluster.py
index 5f2936aad..443ead2f8 100644
--- a/python/hopsworks/flink_cluster.py
+++ b/python/hopsworks/flink_cluster.py
@@ -15,10 +15,10 @@
 #
 
 import time
-from hopsworks.engine import execution_engine
-from hopsworks.core import execution_api
-from hopsworks.core import flink_cluster_api
+
 from hopsworks import util
+from hopsworks.core import execution_api, flink_cluster_api
+from hopsworks.engine import execution_engine
 
 
 class FlinkCluster:
diff --git a/python/hopsworks/git_commit.py b/python/hopsworks/git_commit.py
index 9018162a4..53524b850 100644
--- a/python/hopsworks/git_commit.py
+++ b/python/hopsworks/git_commit.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-from hopsworks import util
-import humps
 import json
 
+import humps
+from hopsworks import util
+
 
 class GitCommit:
diff --git a/python/hopsworks/git_file_status.py b/python/hopsworks/git_file_status.py
index 016346623..5001974cd 100644
--- a/python/hopsworks/git_file_status.py
+++ b/python/hopsworks/git_file_status.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 #
 
-import humps
 import json
 
+import humps
 from hopsworks import util
diff --git a/python/hopsworks/git_provider.py b/python/hopsworks/git_provider.py
index f804fb28c..4a8649456 100644
--- a/python/hopsworks/git_provider.py
+++ b/python/hopsworks/git_provider.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-import humps
 import json
-from hopsworks.core import git_provider_api
+
+import humps
 from hopsworks import util
+from hopsworks.core import git_provider_api
 
 
 class GitProvider:
diff --git a/python/hopsworks/git_remote.py b/python/hopsworks/git_remote.py
index 1733bbac6..89712346b 100644
--- a/python/hopsworks/git_remote.py
+++ b/python/hopsworks/git_remote.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-import humps
 import json
-from hopsworks.core import git_remote_api
+
+import humps
 from hopsworks import util
+from hopsworks.core import git_remote_api
 
 
 class GitRemote:
diff --git a/python/hopsworks/git_repo.py b/python/hopsworks/git_repo.py
index bbdceb36f..a651d0e8f 100644
--- a/python/hopsworks/git_repo.py
+++ b/python/hopsworks/git_repo.py
@@ -14,11 +14,12 @@
 # limitations under the License.
 #
 
-import humps
 import json
-from hopsworks import user, git_commit, util
-from hopsworks.core import git_api, git_remote_api, dataset_api
 from typing import List, Union
+
+import humps
+from hopsworks import git_commit, user, util
+from hopsworks.core import dataset_api, git_api, git_remote_api
 from hopsworks.git_file_status import GitFileStatus
diff --git a/python/hopsworks/job_schedule.py b/python/hopsworks/job_schedule.py
index 48e022572..301b04122 100644
--- a/python/hopsworks/job_schedule.py
+++ b/python/hopsworks/job_schedule.py
@@ -14,10 +14,10 @@
 # limitations under the License.
 #
 
-import humps
 import json
 from datetime import datetime, timezone
 
+import humps
 from hopsworks import util
@@ -30,7 +30,7 @@ def __init__(
         next_execution_date_time=None,
         id=None,
         end_date_time=None,
-        **kwargs
+        **kwargs,
     ):
         self._id = id
         self._start_date_time = (
diff --git a/python/hopsworks/kafka_schema.py b/python/hopsworks/kafka_schema.py
index 539db920a..c57831809 100644
--- a/python/hopsworks/kafka_schema.py
+++ b/python/hopsworks/kafka_schema.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-import humps
 import json
-from hopsworks.core import kafka_api
+
+import humps
 from hopsworks import util
+from hopsworks.core import kafka_api
 
 
 class KafkaSchema:
diff --git a/python/hopsworks/kafka_topic.py b/python/hopsworks/kafka_topic.py
index 0ad0fbe2d..fc5a8a71c 100644
--- a/python/hopsworks/kafka_topic.py
+++ b/python/hopsworks/kafka_topic.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 #
 
-import humps
 import json
-from hopsworks.core import kafka_api
+
+import humps
 from hopsworks import util
+from hopsworks.core import kafka_api
 
 
 class KafkaTopic:
diff --git a/python/hopsworks/library.py b/python/hopsworks/library.py
index ac73f261a..b0891f298 100644
--- a/python/hopsworks/library.py
+++ b/python/hopsworks/library.py
@@ -15,7 +15,6 @@
 #
 
 import humps
-
 from hopsworks import command
diff --git a/python/hopsworks/project.py b/python/hopsworks/project.py
index 79ccff369..d975eb987 100644
--- a/python/hopsworks/project.py
+++ b/python/hopsworks/project.py
@@ -133,7 +133,7 @@ def get_feature_store(
         from hsfs import connection
 
         _client = client.get_instance()
-        if type(_client) == Client:  # If external client
+        if type(_client) is Client:  # If external client
             if _client._host == constants.HOSTS.APP_HOST and engine is None:
                 engine = "python"
             return connection(
@@ -168,7 +168,7 @@ def get_model_registry(self):
         from hsml import connection
 
         _client = client.get_instance()
-        if type(_client) == Client:  # If external client
+        if type(_client) is Client:  # If external client
             return connection(
                 host=_client._host,
                 port=_client._port,
@@ -198,7 +198,7 @@ def get_model_serving(self):
         from hsml import connection
 
         _client = client.get_instance()
-        if type(_client) == Client:  # If external client
+        if type(_client) is Client:  # If external client
             return connection(
                 host=_client._host,
                 port=_client._port,
@@ -215,7 +215,7 @@ def get_kafka_api(self):
             `KafkaApi`: The Kafka Api handle
         """
         _client = client.get_instance()
-        if type(_client) == Client:
+        if type(_client) is Client:
             _client.download_certs(self.name)
         return self._kafka_api
 
@@ -226,7 +226,7 @@ def get_opensearch_api(self):
             `OpenSearchApi`: The OpenSearch Api handle
         """
         _client = client.get_instance()
-        if type(_client) == Client:
+        if type(_client) is Client:
             _client.download_certs(self.name)
         return self._opensearch_api
 
diff --git a/python/hopsworks/user.py b/python/hopsworks/user.py
index 3b08cb277..51a862fa3 100644
--- a/python/hopsworks/user.py
+++ b/python/hopsworks/user.py
@@ -15,8 +15,8 @@
 #
 
 import json
-import humps
 
+import humps
 from hopsworks import util
diff --git a/python/hopsworks/util.py b/python/hopsworks/util.py
index 35785783f..b5f46f29b 100644
--- a/python/hopsworks/util.py
+++ b/python/hopsworks/util.py
@@ -81,6 +81,8 @@ def get_hostname_replaced_url(sub_path: str):
     url_parsed = client.get_instance().replace_public_host(urlparse(href))
     return url_parsed.geturl()
 
+
 def is_interactive():
     import __main__ as main
-    return not hasattr(main, '__file__')
+
+    return not hasattr(main, "__file__")
diff --git a/python/hsfs/client/base.py b/python/hsfs/client/base.py
index 25850833e..eeb6eb369 100644
--- a/python/hsfs/client/base.py
+++ b/python/hsfs/client/base.py
@@ -19,7 +19,6 @@
 import os
 import textwrap
 import time
-from abc import ABC
 from pathlib import Path
 
 import furl
@@ -39,7 +38,7 @@
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
 
-class Client(ABC):
+class Client:
     TOKEN_FILE = "token.jwt"
     TOKEN_EXPIRED_RETRY_INTERVAL = 0.6
     TOKEN_EXPIRED_MAX_RETRIES = 10
diff --git a/python/hsfs/core/explicit_provenance.py b/python/hsfs/core/explicit_provenance.py
index 2ce4f8c80..450a00310 100644
--- a/python/hsfs/core/explicit_provenance.py
+++ b/python/hsfs/core/explicit_provenance.py
@@ -415,9 +415,7 @@ def default(self, obj):
                 }
             elif isinstance(
                 obj,
-                (
-                    storage_connector.StorageConnector
-                ),
+                (storage_connector.StorageConnector),
             ):
                 return {
                     "name": obj.name,
diff --git a/python/hsfs/core/feature_logging.py b/python/hsfs/core/feature_logging.py
index b29a7317d..bdf68d2ca 100644
--- a/python/hsfs/core/feature_logging.py
+++ b/python/hsfs/core/feature_logging.py
@@ -6,25 +6,32 @@
 
 
 class FeatureLogging:
-
-    def __init__(self, id: int,
-                 transformed_features: "feature_group.FeatureGroup",
-                 untransformed_features: "feature_group.FeatureGroup"):
+    def __init__(
+        self,
+        id: int,
+        transformed_features: "feature_group.FeatureGroup",
+        untransformed_features: "feature_group.FeatureGroup",
+    ):
         self._id = id
         self._transformed_features = transformed_features
         self._untransformed_features = untransformed_features
 
     @classmethod
-    def from_response_json(cls, json_dict: Dict[str, Any]) -> 'FeatureLogging':
+    def from_response_json(cls, json_dict: Dict[str, Any]) -> "FeatureLogging":
         from hsfs.feature_group import FeatureGroup  # avoid circular import
+
         json_decamelized = humps.decamelize(json_dict)
-        transformed_features = json_decamelized.get('transformed_log')
-        untransformed_features = json_decamelized.get('untransformed_log')
+        transformed_features = json_decamelized.get("transformed_log")
+        untransformed_features = json_decamelized.get("untransformed_log")
         if transformed_features:
             transformed_features = FeatureGroup.from_response_json(transformed_features)
         if untransformed_features:
-            untransformed_features = FeatureGroup.from_response_json(untransformed_features)
-        return cls(json_decamelized.get('id'), transformed_features, untransformed_features)
+            untransformed_features = FeatureGroup.from_response_json(
+                untransformed_features
+            )
+        return cls(
+            json_decamelized.get("id"), transformed_features, untransformed_features
+        )
 
     @property
     def transformed_features(self) -> "feature_group.FeatureGroup":
@@ -40,9 +47,9 @@ def id(self) -> str:
 
     def to_dict(self):
         return {
-            'id': self._id,
-            'transformed_log': self._transformed_features,
-            'untransformed_log': self._untransformed_features,
+            "id": self._id,
+            "transformed_log": self._transformed_features,
+            "untransformed_log": self._untransformed_features,
         }
 
     def json(self) -> Dict[str, Any]:
@@ -50,4 +57,3 @@
 
     def __repr__(self):
         return self.json()
-
diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py
index 0be48a72a..236d0f2b5 100644
--- a/python/hsfs/core/feature_view_engine.py
+++ b/python/hsfs/core/feature_view_engine.py
@@ -797,6 +797,11 @@ def get_batch_data(
         else:
             return feature_dataframe
 
+    def transform_batch_data(self, features, transformation_functions):
+        return engine.get_instance()._apply_transformation_function(
+            transformation_functions, dataset=features, inplace=False
+        )
+
     def add_tag(
         self, feature_view_obj, name: str, value, training_dataset_version=None
     ):
diff --git a/python/hsfs/core/opensearch.py b/python/hsfs/core/opensearch.py
index 3865c7ab0..6e1ca5091 100644
--- a/python/hsfs/core/opensearch.py
+++ b/python/hsfs/core/opensearch.py
@@ -54,7 +54,8 @@ def error_handler_wrapper(*args, **kw):
             caused_by = e.info.get("error") and e.info["error"].get("caused_by")
             if caused_by and caused_by["type"] == "illegal_argument_exception":
                 raise OpenSearchClientSingleton()._create_vector_database_exception(
-                    caused_by["reason"]) from e
+                    caused_by["reason"]
+                ) from e
             raise VectorDatabaseException(
                 VectorDatabaseException.OTHERS,
                 f"Error in Opensearch request: {e}",
@@ -100,16 +101,19 @@ def get_options(cls, options: dict):
             attribute values of the OpensearchRequestOption class, and values
             are obtained either from the provided options or default values if not available.
         """
-        default_option = (cls.DEFAULT_OPTION_MAP
-                          if cls.get_version() < (2, 3)
-                          else cls.DEFAULT_OPTION_MAP_V2_3)
+        default_option = (
+            cls.DEFAULT_OPTION_MAP
+            if cls.get_version() < (2, 3)
+            else cls.DEFAULT_OPTION_MAP_V2_3
+        )
         if options:
             # make lower case to avoid issues with cases
             options = {k.lower(): v for k, v in options.items()}
             new_options = {}
             for option, value in default_option.items():
                 if option in options:
-                    if (option == "timeout"
+                    if (
+                        option == "timeout"
                         and cls.get_version() < (2, 3)
                         and isinstance(options[option], int)
                     ):
@@ -161,7 +165,9 @@ def _refresh_opensearch_connection(self):
         )
 
     @_handle_opensearch_exception
     def search(self, index=None, body=None, options=None):
-        return self._opensearch_client.search(body=body, index=index, params=OpensearchRequestOption.get_options(options))
+        return self._opensearch_client.search(
+            body=body, index=index, params=OpensearchRequestOption.get_options(options)
+        )
 
     @retry(
         wait_exponential_multiplier=1000,
diff --git a/python/hsfs/core/storage_connector_api.py b/python/hsfs/core/storage_connector_api.py
index d30201a11..01d1898de 100644
--- a/python/hsfs/core/storage_connector_api.py
+++ b/python/hsfs/core/storage_connector_api.py
@@ -101,9 +101,7 @@ def get_kafka_connector(
             _client._send_request("GET", path_params, query_params=query_params)
         )
 
-    def get_feature_groups_provenance(
-        self, storage_connector_instance
-    ):
+    def get_feature_groups_provenance(self, storage_connector_instance):
         """Get the generated feature groups using this storage connector, based
         on explicit provenance. These feature groups can be accessible or inaccessible.
         Explicit provenance does not track deleted generated feature group links, so deleted
@@ -135,6 +133,7 @@ def get_feature_groups_provenance(
         }
         links_json = _client._send_request("GET", path_params, query_params)
         from hsfs.core import explicit_provenance
+
         return explicit_provenance.Links.from_response_json(
             links_json,
             explicit_provenance.Links.Direction.DOWNSTREAM,
diff --git a/python/hsfs/core/vector_db_client.py b/python/hsfs/core/vector_db_client.py
index b9fdc86ab..71060c983 100644
--- a/python/hsfs/core/vector_db_client.py
+++ b/python/hsfs/core/vector_db_client.py
@@ -96,7 +96,9 @@ def init(self):
             )
             self._embedding_fg_by_join_index[i] = join_fg
             for embedding_feature in join_fg.embedding_index.get_embeddings():
-                self._td_embedding_feature_names.add((join.prefix or "") + embedding_feature.name)
+                self._td_embedding_feature_names.add(
+                    (join.prefix or "") + embedding_feature.name
+                )
             vdb_col_td_col_map = {}
             for feat in join_fg.features:
                 vdb_col_td_col_map[
@@ -191,10 +193,13 @@ def find_neighbors(
         return [
             (
                 1 / item["_score"] - 1,
-                self._convert_to_pandas_type(embedding_feature.feature_group.features, self._rewrite_result_key(
-                    item["_source"],
-                    self._fg_vdb_col_td_col_map[embedding_feature.feature_group.id],
-                )),
+                self._convert_to_pandas_type(
+                    embedding_feature.feature_group.features,
+                    self._rewrite_result_key(
+                        item["_source"],
+                        self._fg_vdb_col_td_col_map[embedding_feature.feature_group.id],
+                    ),
+                ),
             )
             for item in results["hits"]["hits"]
         ]
@@ -207,11 +212,15 @@ def _convert_to_pandas_type(self, schema, result):
             if not feature_value:  # Feature value can be null
                 continue
             elif feature_type == "date":
-                result[feature_name] = datetime.utcfromtimestamp(feature_value // 10**3).date()
+                result[feature_name] = datetime.utcfromtimestamp(
+                    feature_value // 10**3
+                ).date()
             elif feature_type == "timestamp":
                 # convert timestamp in ms to datetime in s
                 result[feature_name] = datetime.utcfromtimestamp(feature_value // 10**3)
-            elif feature_type == "binary" or (feature.is_complex() and feature not in self._embedding_features):
+            elif feature_type == "binary" or (
+                feature.is_complex() and feature not in self._embedding_features
+            ):
                 result[feature_name] = base64.b64decode(feature_value)
         return result
@@ -337,18 +346,20 @@ def read(self, fg_id, schema, keys=None, pk=None, index_name=None, n=10):
         if VectorDbClient._index_result_limit_n.get(index_name) is None:
             try:
                 query["size"] = 2**31 - 1
-                self._opensearch_client.search(body=query,
-                                               index=index_name)
+                self._opensearch_client.search(body=query, index=index_name)
             except VectorDatabaseException as e:
                 if (
-                    e.reason == VectorDatabaseException.REQUESTED_NUM_RESULT_TOO_LARGE
+                    e.reason
+                    == VectorDatabaseException.REQUESTED_NUM_RESULT_TOO_LARGE
                     and e.info.get(
-                        VectorDatabaseException.REQUESTED_NUM_RESULT_TOO_LARGE_INFO_N
-                    )
-                ):
-                    VectorDbClient._index_result_limit_n[index_name] = e.info.get(
                         VectorDatabaseException.REQUESTED_NUM_RESULT_TOO_LARGE_INFO_N
                     )
+                ):
+                    VectorDbClient._index_result_limit_n[index_name] = (
+                        e.info.get(
+                            VectorDatabaseException.REQUESTED_NUM_RESULT_TOO_LARGE_INFO_N
+                        )
+                    )
                 else:
                     raise e
             query["size"] = VectorDbClient._index_result_limit_n.get(index_name)
@@ -356,24 +367,32 @@
         results = self._opensearch_client.search(body=query, index=index_name)
         # https://opensearch.org/docs/latest/search-plugins/knn/approximate-knn/#spaces
         return [
-            self._convert_to_pandas_type(schema, self._rewrite_result_key(
-                item["_source"],
-                self._fg_vdb_col_td_col_map[fg_id]
-            ))
+            self._convert_to_pandas_type(
+                schema,
+                self._rewrite_result_key(
+                    item["_source"], self._fg_vdb_col_td_col_map[fg_id]
+                ),
+            )
             for item in results["hits"]["hits"]
         ]
 
     @staticmethod
-    def read_feature_group(feature_group: "hsfs.feature_group.FeatureGroup", n: int =None) -> list:
+    def read_feature_group(
+        feature_group: "hsfs.feature_group.FeatureGroup", n: int = None
+    ) -> list:
         if feature_group.embedding_index:
             vector_db_client = VectorDbClient(feature_group.select_all())
             results = vector_db_client.read(
                 feature_group.id,
                 feature_group.features,
-                pk=feature_group.embedding_index.col_prefix + feature_group.primary_key[0],
+                pk=feature_group.embedding_index.col_prefix
+                + feature_group.primary_key[0],
                 index_name=feature_group.embedding_index.index_name,
-                n=n
+                n=n,
             )
-            return [[result[f.name] for f in feature_group.features] for result in results]
+            return [
+                [result[f.name] for f in feature_group.features] for result in results
+            ]
         else:
             raise FeatureStoreException("Feature group does not have embedding.")
diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py
index 598a9cfc0..1c467028b 100755
--- a/python/hsfs/core/vector_server.py
+++ b/python/hsfs/core/vector_server.py
@@ -17,7 +17,6 @@
 
 import itertools
 import logging
-import warnings
 from base64 import b64decode
 from datetime import datetime, timezone
 from io import BytesIO
@@ -553,161 +552,11 @@ def assemble_feature_vector(
             for fname in self._untransformed_feature_vector_col_name
         ]
 
-    def _check_feature_vectors_type_and_convert_to_dict(
-        self,
-        feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
-    ) -> Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]:
-        """
-        Function that converts an input feature vector into a list of dictionaries.
-
-        # Arguments
-            feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be converted.
-
-        # Returns
-            `Tuple[Dict[str, Any], Literal["pandas", "polars", "list"]]`: A tuple that contains the feature vector as a dictionary and a string denoting the data type of the input feature vector.
-
-        """
-        if isinstance(feature_vectors, pd.DataFrame):
-            return_type = "pandas"
-            feature_vectors = feature_vectors.to_dict(orient="records")
-
-        elif isinstance(feature_vectors, pl.DataFrame):
-            return_type = "polars"
-            feature_vectors = feature_vectors.to_pandas()
-            feature_vectors = feature_vectors.to_dict(orient="records")
-
-        elif isinstance(feature_vectors, list) and feature_vectors:
-            if all(
-                isinstance(feature_vector, list) for feature_vector in feature_vectors
-            ):
-                return_type = "list"
-                feature_vectors = [
-                    self.get_untransformed_features_map(feature_vector)
-                    for feature_vector in feature_vectors
-                ]
-
-            else:
-                return_type = "list"
-                feature_vectors = [self.get_untransformed_features_map(feature_vectors)]
-
-        else:
-            raise exceptions.FeatureStoreException(
-                "Unsupported input type for feature vector. Supported types are `List`, `pandas.DataFrame`, `polars.DataFrame`"
-            )
-        return feature_vectors, return_type
-
-    def transform(
-        self,
-        feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
-    ) -> Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]:
-        """
-        Applies model dependent transformation on the provided feature vector.
-
-        # Arguments
-            feature_vectors: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vectors to be transformed using attached model-dependent transformations.
-
-        # Returns
-            `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The transformed feature vector.
-        """
-        if not self._model_dependent_transformation_functions:
-            warnings.warn(
-                "Feature view does not have any attached model-dependent transformations. Returning input feature vectors.",
-                stacklevel=0,
-            )
-            return feature_vectors
-
-        feature_vectors, return_type = (
-            self._check_feature_vectors_type_and_convert_to_dict(feature_vectors)
-        )
-        transformed_feature_vectors = []
-        for feature_vector in feature_vectors:
-            transformed_feature_vector = self.apply_model_dependent_transformations(
-                feature_vector
-            )
-            transformed_feature_vectors.append(
-                [
-                    transformed_feature_vector.get(fname, None)
-                    for fname in self.transformed_feature_vector_col_name
-                ]
-            )
-
-        if len(transformed_feature_vectors) == 1:
-            batch = False
-            transformed_feature_vectors = transformed_feature_vectors[0]
-        else:
-            batch = True
-
-        return self.handle_feature_vector_return_type(
-            transformed_feature_vectors,
-            batch=batch,
-            inference_helper=False,
-            return_type=return_type,
-            transformed=True,
-        )
-
-    def compute_on_demand_features(
-        self,
-        feature_vectors: Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame],
-        request_parameters: Union[List[Dict[str, Any]], Dict[str, Any]],
-    ):
-        """
-        Function computes on-demand features present in the feature view.
-
-        # Arguments
-            feature_vector: `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`. The feature vector to be transformed.
-            request_parameters: Request parameters required by on-demand transformation functions to compute on-demand features present in the feature view.
-        # Returns
-            `Union[List[Any], List[List[Any]], pd.DataFrame, pl.DataFrame]`: The feature vector that contains all on-demand features in the feature view.
-        """
-        if not self._on_demand_transformation_functions:
-            warnings.warn(
-                "Feature view does not have any on-demand features. Returning input feature vectors.",
-                stacklevel=1,
-            )
-            return feature_vectors
-
-        request_parameters = {} if not request_parameters else request_parameters
-        # Convert feature vectors to dictionary
-        feature_vectors, return_type = (
-            self._check_feature_vectors_type_and_convert_to_dict(feature_vectors)
-        )
-        # Check if all request parameters are provided
-        # If request parameter is a dictionary then copy it to list with the same length as that of entires
-        request_parameters = (
-            [request_parameters] * len(feature_vectors)
-            if isinstance(request_parameters, dict)
-            else request_parameters
-        )
-        self.check_missing_request_parameters(
-            features=feature_vectors[0], request_parameters=request_parameters[0]
-        )
-        on_demand_feature_vectors = []
-        for feature_vector, request_parameter in zip(
-            feature_vectors, request_parameters
-        ):
-            on_demand_feature_vector = self.apply_on_demand_transformations(
-                feature_vector, request_parameter
-            )
-            on_demand_feature_vectors.append(
-                [
-                    on_demand_feature_vector.get(fname, None)
-                    for fname in self._untransformed_feature_vector_col_name
-                ]
-            )
-
-        if len(on_demand_feature_vectors) == 1:
-            batch = False
-            on_demand_feature_vectors = on_demand_feature_vectors[0]
-        else:
-            batch = True
-
-        return self.handle_feature_vector_return_type(
-            on_demand_feature_vectors,
-            batch=batch,
-            inference_helper=False,
-            return_type=return_type,
-            transformed=False,
-        )
+    def transform_feature_vectors(self, batch_features):
+        return [
+            self.apply_transformation(self.get_untransformed_features_map(features))
+            for features in batch_features
+        ]
 
     def get_untransformed_features_map(self, features) -> Dict[str, Any]:
         return dict(
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 4e1f67c7d..b07a09395 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1403,9 +1403,9 @@ def _convert_spark_type_to_offline_type(spark_type, using_hudi):
     if not using_hudi:
         return spark_type.simpleString()
-    elif type(spark_type) == ByteType:
+    elif type(spark_type) is ByteType:
         return "int"
-    elif type(spark_type) == ShortType:
+    elif type(spark_type) is ShortType:
         return "int"
     elif type(spark_type) in [
         BooleanType,
diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py
index 96596a5b0..8e0c90b0b 100644
--- a/python/hsfs/storage_connector.py
+++ b/python/hsfs/storage_connector.py
@@ -211,7 +211,9 @@ def get_feature_groups(self):
         feature_groups_provenance = self.get_feature_groups_provenance()
 
         if feature_groups_provenance.inaccessible or feature_groups_provenance.deleted:
-            _logger.info("There are deleted or inaccessible feature groups. For more details access `get_feature_groups_provenance`")
+            _logger.info(
+                "There are deleted or inaccessible feature groups. For more details access `get_feature_groups_provenance`"
+            )
 
         if feature_groups_provenance.accessible:
             return feature_groups_provenance.accessible
diff --git a/python/hsfs/usage.py b/python/hsfs/usage.py
index 3428de21f..bd724c293 100644
--- a/python/hsfs/usage.py
+++ b/python/hsfs/usage.py
@@ -85,16 +85,18 @@ def get_timezone(self):
         return self._timezone
 
     def json(self):
-        return json.dumps({
-            "platform": self.get_platform(),
-            "hsml_version": self.get_hsml_version(),
-            "hsfs_version": self.get_hsfs_version(),
-            "hopsworks_version": self.get_hopsworks_version(),
-            "user_id": self.get_user_id(),
-            "backend_version": self.get_backend_version(),
-            "timezone": str(self.get_timezone()),
-            "python_version": self.get_python_version(),
-        })
+        return json.dumps(
+            {
+                "platform": self.get_platform(),
+                "hsml_version": self.get_hsml_version(),
+                "hsfs_version": self.get_hsfs_version(),
+                "hopsworks_version": self.get_hopsworks_version(),
+                "user_id": self.get_user_id(),
+                "backend_version": self.get_backend_version(),
+                "timezone": str(self.get_timezone()),
+                "python_version": self.get_python_version(),
+            }
+        )
 
 
 class MethodCounter:
diff --git a/python/hsml/client/istio/utils/numpy_codec.py b/python/hsml/client/istio/utils/numpy_codec.py
index 3c6ecb606..bf22bcf34 100644
--- a/python/hsml/client/istio/utils/numpy_codec.py
+++ b/python/hsml/client/istio/utils/numpy_codec.py
@@ -38,7 +38,7 @@ def to_np_dtype(dtype):
 
 
 def from_np_dtype(np_dtype):
-    if np_dtype == bool:
+    if np_dtype is bool:
         return "BOOL"
     elif np_dtype == np.int8:
         return "INT8"
diff --git a/python/hsml/connection.py b/python/hsml/connection.py
index 899589a4e..f4ca72512 100644
--- a/python/hsml/connection.py
+++ b/python/hsml/connection.py
@@ -97,6 +97,7 @@ def __init__(
         api_key_value: str = None,
     ):
         from hsml.core import model_api, model_registry_api, model_serving_api
+
         self._host = host
         self._port = port
         self._project = project
@@ -163,6 +164,7 @@ def connect(self):
         """
         from hsml import client
         from hsml.core import model_api
+
         self._connected = True
         try:
             # init client
@@ -196,6 +198,7 @@ def close(self):
         Usage is recommended but optional.
         """
         from hsml import client
+
         client.stop()
         self._model_api = None
         self._connected = False
diff --git a/python/hsml/model_serving.py b/python/hsml/model_serving.py
index 21d04b833..d298e669f 100644
--- a/python/hsml/model_serving.py
+++ b/python/hsml/model_serving.py
@@ -285,7 +285,12 @@ def postprocess(self, outputs):
 
         return Transformer(script_file=script_file, resources=resources)
 
-    def create_deployment(self, predictor: Predictor, name: Optional[str] = None, environment: Optional[str] = None):
+    def create_deployment(
+        self,
+        predictor: Predictor,
+        name: Optional[str] = None,
+        environment: Optional[str] = None,
+    ):
         """Create a Deployment metadata object.
 
         !!! example
diff --git a/python/hsml/util.py b/python/hsml/util.py
index 6ef6d9053..6fffc4033 100644
--- a/python/hsml/util.py
+++ b/python/hsml/util.py
@@ -100,6 +100,7 @@ def set_model_class(model):
     from hsml.sklearn.model import Model as SkLearnModel
     from hsml.tensorflow.model import Model as TFModel
     from hsml.torch.model import Model as TorchModel
+
     if "href" in model:
         _ = model.pop("href")
     if "type" in model:  # backwards compatibility
@@ -241,6 +242,7 @@ def get_predictor_for_model(model, **kwargs):
     from hsml.tensorflow.predictor import Predictor as TFPredictor
     from hsml.torch.model import Model as TorchModel
    from hsml.torch.predictor import Predictor as TorchPredictor
+
     if not isinstance(model, BaseModel):
         raise ValueError(
             "model is of type {}, but an instance of {} class is expected".format(
diff --git a/python/tests/core/test_feature_group_api.py b/python/tests/core/test_feature_group_api.py
index 37459d897..9366f4401 100644
--- a/python/tests/core/test_feature_group_api.py
+++ b/python/tests/core/test_feature_group_api.py
@@ -54,9 +54,7 @@ def test_get_smart_with_infer_type(self, mocker, backend_fixtures):
     def test_check_features(self, mocker, backend_fixtures):
         # Arrange
         fg_api = feature_group_api.FeatureGroupApi()
-        json = backend_fixtures["feature_group"]["get_basic_info"][
-            "response"
-        ]
+        json = backend_fixtures["feature_group"]["get_basic_info"]["response"]
         fg = fg_mod.FeatureGroup.from_response_json(json)
 
         # Act
diff --git a/python/tests/core/test_opensearch.py b/python/tests/core/test_opensearch.py
index 3ae804cdc..5a4bcb681 100644
--- a/python/tests/core/test_opensearch.py
+++ b/python/tests/core/test_opensearch.py
@@ -69,7 +69,6 @@ def test_create_vector_database_exception(
 
 
 class TestOpensearchRequestOption:
-
     def test_version_1_no_options(self):
         OpensearchRequestOption.get_version = lambda: (1, 1)
         options = OpensearchRequestOption.get_options({})
diff --git a/python/tests/core/test_vector_db_client.py b/python/tests/core/test_vector_db_client.py
index 4f17a1dbe..a4261a5dd 100644
--- a/python/tests/core/test_vector_db_client.py
+++ b/python/tests/core/test_vector_db_client.py
@@ -220,7 +220,9 @@ def test_check_filter_when_filter_is_not_logic_or_filter(self):
             self.target._check_filter("f1 > 20", self.fg2)
 
     def test_read_with_keys(self):
-        actual = self.target.read(self.fg.id, self.fg.features, keys={"f1": 10, "f2": 20})
+        actual = self.target.read(
+            self.fg.id, self.fg.features, keys={"f1": 10, "f2": 20}
+        )
 
         expected_query = {
             "query": {"bool": {"must": [{"match": {"f1": 10}}, {"match": {"f2": 20}}]}},
diff --git a/python/tests/test_util.py b/python/tests/test_util.py
index b39501162..330c76b5c 100644
--- a/python/tests/test_util.py
+++ b/python/tests/test_util.py
@@ -736,7 +736,9 @@ def test_get_dataset_type_HIVEDB_with_dfs(self):
         assert db_type == "HIVEDB"
 
     def test_get_dataset_type_DATASET(self):
-        db_type = hsfs.util.get_dataset_type("/Projects/temp/Resources/kafka__tstore.jks")
+        db_type = hsfs.util.get_dataset_type(
+            "/Projects/temp/Resources/kafka__tstore.jks"
+        )
         assert db_type == "DATASET"
 
     def test_get_dataset_type_DATASET_with_dfs(self):