diff --git a/cads_processing_api_service/config.py b/cads_processing_api_service/config.py index 7ed3a69..67b01a8 100644 --- a/cads_processing_api_service/config.py +++ b/cads_processing_api_service/config.py @@ -17,6 +17,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import random + import pydantic_settings API_REQUEST_TEMPLATE = """import cdsapi @@ -58,10 +61,20 @@ class Settings(pydantic_settings.BaseSettings): missing_dataset_title: str = "Dataset not available" anonymous_licences_message: str = ANONYMOUS_LICENCES_MESSAGE + download_nodes_config: str = "/etc/retrieve-api/download-nodes.config" + @property def profiles_api_url(self) -> str: return f"http://{self.profiles_service}:{self.profiles_api_service_port}" + @property + def data_volume(self) -> str: + data_volumes_config_path = self.download_nodes_config + with open(data_volumes_config_path) as fp: + data_volumes = [os.path.expandvars(line.rstrip("\n")) for line in fp] + data_volume = random.choice(data_volumes) + return data_volume + def ensure_settings( settings: Settings | None = None, diff --git a/cads_processing_api_service/utils.py b/cads_processing_api_service/utils.py index 4ee6b72..63a3999 100644 --- a/cads_processing_api_service/utils.py +++ b/cads_processing_api_service/utils.py @@ -18,6 +18,7 @@ import datetime import enum import threading +import urllib.parse from typing import Any, Callable, Mapping import cachetools @@ -470,6 +471,14 @@ def get_job_from_broker_db( return job +def update_results_href(href: str, data_volume: str | None = None) -> str: + if data_volume is None: + data_volume = config.ensure_settings().data_volume + file_path = urllib.parse.urlparse(href).path + results_href = urllib.parse.urljoin(data_volume, file_path) + return results_href + + def get_results_from_job( job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session ) -> dict[str, Any]: @@ -497,6 +506,8 @@ def get_results_from_job( if job_status == "successful": try: asset_value = job.cache_entry.result["args"][0] # type: ignore + if "href" in asset_value: + asset_value["href"] = update_results_href(asset_value["href"]) results = {"asset": {"value": asset_value}} except Exception: raise exceptions.JobResultsExpired( diff --git a/tests/test_30_utils.py b/tests/test_30_utils.py index 8da0b5a..e3cbbcb 100644 --- a/tests/test_30_utils.py +++ b/tests/test_30_utils.py @@ -244,6 +244,14 @@ def test_get_job_from_broker_db() -> None: job = utils.get_job_from_broker_db("1234", session=mock_session) +def test_update_results_href() -> None: + href = "http://base_path/results/1234" + data_volume = "http://data_volume/" + updated_href = utils.update_results_href(href, data_volume) + exp_updated_href = "http://data_volume/results/1234" + assert updated_href == exp_updated_href + + def test_get_results_from_job() -> None: mock_session = unittest.mock.Mock(spec=sqlalchemy.orm.Session) job = cads_broker.SystemRequest(