Skip to content

Commit

Permalink
[FSTORE-1484] Make the way project ID is acquired uniform (logicalclo…
Browse files Browse the repository at this point in the history
…cks#256)

* Make the way project ID is acquired uniform

That is, make it so that the project ID is always acquired from the current client.

* Ruff
  • Loading branch information
aversey authored Aug 2, 2024
1 parent 3de1efc commit 23802f8
Show file tree
Hide file tree
Showing 39 changed files with 315 additions and 518 deletions.
25 changes: 24 additions & 1 deletion python/hopsworks/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

import importlib
import os
import re
import sys
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
self._api_key_file = api_key_file
self._api_key_value = api_key_value
self._connected = False
self._engine = None

self.connect()

Expand Down Expand Up @@ -239,12 +241,33 @@ def connect(self):
try:
# init client
if client.base.Client.REST_ENDPOINT not in os.environ:
# determine engine, needed to init client
if (self._engine is not None and self._engine.lower() == "spark") or (
self._engine is None and importlib.util.find_spec("pyspark")
):
self._engine = "spark"
elif (
self._engine is not None and self._engine.lower() == "python"
) or (self._engine is None and not importlib.util.find_spec("pyspark")):
self._engine = "python"
elif self._engine is not None and self._engine.lower() == "training":
self._engine = "training"
elif (
self._engine is not None
and self._engine.lower() == "spark-no-metastore"
):
self._engine = "spark-no-metastore"
else:
raise ConnectionError(
"Engine you are trying to initialize is unknown. "
"Supported engines are `'spark'`, `'python'` and `'training'`."
)
client.init(
"external",
self._host,
self._port,
self._project,
None,
self._engine,
self._hostname_verification,
self._trust_store_path,
self._cert_folder,
Expand Down
23 changes: 7 additions & 16 deletions python/hopsworks/core/execution_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,19 @@


class ExecutionsApi:
def __init__(
self,
project_id,
):
self._project_id = project_id

def _start(self, job, args: str = None):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", job.name, "executions"]
path_params = ["project", _client._project_id, "jobs", job.name, "executions"]

return execution.Execution.from_response_json(
_client._send_request("POST", path_params, data=args), self._project_id, job
_client._send_request("POST", path_params, data=args), job
)

def _get(self, job, id):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
job.name,
"executions",
Expand All @@ -45,14 +39,12 @@ def _get(self, job, id):

headers = {"content-type": "application/json"}
return execution.Execution.from_response_json(
_client._send_request("GET", path_params, headers=headers),
self._project_id,
job,
_client._send_request("GET", path_params, headers=headers), job
)

def _get_all(self, job):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", job.name, "executions"]
path_params = ["project", _client._project_id, "jobs", job.name, "executions"]

query_params = {"sort_by": "submissiontime:desc"}

Expand All @@ -61,15 +53,14 @@ def _get_all(self, job):
_client._send_request(
"GET", path_params, headers=headers, query_params=query_params
),
self._project_id,
job,
)

def _delete(self, job_name, id):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
job_name,
"executions",
Expand All @@ -81,7 +72,7 @@ def _stop(self, job_name: str, id: int) -> None:
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
job_name,
"executions",
Expand Down
32 changes: 9 additions & 23 deletions python/hopsworks/core/flink_cluster_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,8 @@


class FlinkClusterApi:
def __init__(
self,
project_id,
project_name,
):
self._project_id = project_id
self._project_name = project_name
self._job_api = job_api.JobsApi(project_id, project_name)
def __init__(self):
self._job_api = job_api.JobsApi()

def get_configuration(self):
"""Get configuration for the Flink cluster.
Expand Down Expand Up @@ -85,21 +79,17 @@ def setup_cluster(self, name: str, config=None):
def _create_cluster(self, name: str, config: dict):
_client = client.get_instance()

config = util.validate_job_conf(config, self._project_name)
config = util.validate_job_conf(config, _client._project_name)

path_params = ["project", self._project_id, "jobs", name]
path_params = ["project", _client._project_id, "jobs", name]

headers = {"content-type": "application/json"}
flink_job = job.Job.from_response_json(
_client._send_request(
"PUT", path_params, headers=headers, data=json.dumps(config)
),
self._project_id,
self._project_name,
)
flink_cluster_obj = flink_cluster.FlinkCluster(
flink_job, self._project_id, self._project_name
)
)
flink_cluster_obj = flink_cluster.FlinkCluster(flink_job)
print(flink_cluster_obj.get_url())
return flink_cluster_obj

Expand All @@ -126,20 +116,16 @@ def get_cluster(self, name: str):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
name,
]
query_params = {"expand": ["creator"]}
flink_job = job.Job.from_response_json(
_client._send_request("GET", path_params, query_params=query_params),
self._project_id,
self._project_name,
_client._send_request("GET", path_params, query_params=query_params)
)

return flink_cluster.FlinkCluster(
flink_job, self._project_id, self._project_name
)
return flink_cluster.FlinkCluster(flink_job)

def _get_job(self, execution, job_id):
"""Get specific job from the specific execution of the flink cluster.
Expand Down
42 changes: 13 additions & 29 deletions python/hopsworks/core/job_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@


class JobsApi:
def __init__(
self,
project_id,
project_name,
):
self._project_id = project_id
self._project_name = project_name

def create_job(self, name: str, config: dict):
"""Create a new job or update an existing one.
Expand Down Expand Up @@ -57,17 +49,15 @@ def create_job(self, name: str, config: dict):
"""
_client = client.get_instance()

config = util.validate_job_conf(config, self._project_name)
config = util.validate_job_conf(config, _client._project_name)

path_params = ["project", self._project_id, "jobs", name]
path_params = ["project", _client._project_id, "jobs", name]

headers = {"content-type": "application/json"}
created_job = job.Job.from_response_json(
_client._send_request(
"PUT", path_params, headers=headers, data=json.dumps(config)
),
self._project_id,
self._project_name,
)
)
print(created_job.get_url())
return created_job
Expand All @@ -85,15 +75,13 @@ def get_job(self, name: str):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
name,
]
query_params = {"expand": ["creator"]}
return job.Job.from_response_json(
_client._send_request("GET", path_params, query_params=query_params),
self._project_id,
self._project_name,
_client._send_request("GET", path_params, query_params=query_params)
)

def get_jobs(self):
Expand All @@ -107,14 +95,12 @@ def get_jobs(self):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
]
query_params = {"expand": ["creator"]}
return job.Job.from_response_json(
_client._send_request("GET", path_params, query_params=query_params),
self._project_id,
self._project_name,
_client._send_request("GET", path_params, query_params=query_params)
)

def exists(self, name: str):
Expand Down Expand Up @@ -146,7 +132,7 @@ def get_configuration(self, type: str):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
type.lower(),
"configuration",
Expand All @@ -163,7 +149,7 @@ def _delete(self, job):
_client = client.get_instance()
path_params = [
"project",
self._project_id,
_client._project_id,
"jobs",
str(job.name),
]
Expand All @@ -182,20 +168,18 @@ def _update_job(self, name: str, config: dict):

config = util.validate_job_conf(config, self._project_name)

path_params = ["project", self._project_id, "jobs", name]
path_params = ["project", _client._project_id, "jobs", name]

headers = {"content-type": "application/json"}
return job.Job.from_response_json(
_client._send_request(
"PUT", path_params, headers=headers, data=json.dumps(config)
),
self._project_id,
self._project_name,
)
)

def _schedule_job(self, name, schedule_config):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"]
path_params = ["project", _client._project_id, "jobs", name, "schedule", "v2"]
headers = {"content-type": "application/json"}
method = "PUT" if schedule_config["id"] else "POST"

Expand All @@ -207,7 +191,7 @@ def _schedule_job(self, name, schedule_config):

def _delete_schedule_job(self, name):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"]
path_params = ["project", _client._project_id, "jobs", name, "schedule", "v2"]

return _client._send_request(
"DELETE",
Expand Down
17 changes: 5 additions & 12 deletions python/hopsworks/core/opensearch_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@


class OpenSearchApi:
def __init__(
self,
project_id,
project_name,
):
self._project_id = project_id
self._project_name = project_name
def __init__(self):
self._variable_api = variable_api.VariableApi()

def _get_opensearch_url(self):
Expand Down Expand Up @@ -60,7 +54,8 @@ def get_project_index(self, index):
Returns:
A valid opensearch index name.
"""
return (self._project_name + "_" + index).lower()
_client = client.get_instance()
return (_client._project_name + "_" + index).lower()

def get_default_py_config(self):
"""
Expand Down Expand Up @@ -91,9 +86,7 @@ def get_default_py_config(self):
constants.OPENSEARCH_CONFIG.USE_SSL: True,
constants.OPENSEARCH_CONFIG.VERIFY_CERTS: True,
constants.OPENSEARCH_CONFIG.SSL_ASSERT_HOSTNAME: False,
constants.OPENSEARCH_CONFIG.CA_CERTS: client.get_instance()._get_ca_chain_path(
self._project_name
),
constants.OPENSEARCH_CONFIG.CA_CERTS: client.get_instance()._get_ca_chain_path(),
}

def _get_authorization_token(self):
Expand All @@ -106,7 +99,7 @@ def _get_authorization_token(self):
"""

_client = client.get_instance()
path_params = ["elastic", "jwt", self._project_id]
path_params = ["elastic", "jwt", _client._project_id]

headers = {"content-type": "application/json"}
return _client._send_request("GET", path_params, headers=headers)["token"]
6 changes: 3 additions & 3 deletions python/hopsworks/engine/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@


class ExecutionEngine:
def __init__(self, project_id=None):
self._dataset_api = dataset_api.DatasetApi(project_id)
self._execution_api = execution_api.ExecutionsApi(project_id)
def __init__(self):
self._dataset_api = dataset_api.DatasetApi()
self._execution_api = execution_api.ExecutionsApi()
self._log = logging.getLogger(__name__)

def download_logs(self, execution, path=None):
Expand Down
Loading

0 comments on commit 23802f8

Please sign in to comment.