diff --git a/.github/workflows/orquestra-sdk-integration.yml b/.github/workflows/orquestra-sdk-integration.yml index 6e4ae884a..0110289be 100644 --- a/.github/workflows/orquestra-sdk-integration.yml +++ b/.github/workflows/orquestra-sdk-integration.yml @@ -49,7 +49,7 @@ jobs: source ./venv/bin/activate make github-actions-integration - - name: Run performance test + - name: Run integration test shell: bash run: | source ./venv/bin/activate diff --git a/.github/workflows/publish-cuda-docker-image.yml b/.github/workflows/publish-cuda-docker-image.yml index 81a32687e..066f5caba 100644 --- a/.github/workflows/publish-cuda-docker-image.yml +++ b/.github/workflows/publish-cuda-docker-image.yml @@ -3,14 +3,6 @@ name: Publish orquestra-sdk-base Docker Image with CUDA support on: workflow_dispatch: inputs: - sdk_requirement: - type: string - description: | - Orquestra SDK requirement to build the image with. - Example of using a published version of the SDK: - orquestra-sdk[ray]==0.63.0 - Example of using an unpublished version: - orquestra-sdk[ray] @ git+https://github.com/zapata-engineering/orquestra-sdk.git@v0.63.0#subdirectory=projects/orquestra-sdk docker_tag: type: string description: | @@ -81,6 +73,6 @@ jobs: # as on the command line `--build-arg`, one to a line: # docker_build_args: BUILD_ARG_ONE=value1\nBUILD_ARG_TWO=value2 # DUE TO JSON LIMITATIONS, NEWLINES MUST BE EXPLICITLY ADDED AS \n - docker_build_args: SDK_REQUIREMENT=${{ inputs.sdk_requirement }} + docker_build_args: # leave blank for linux/amd64 (recommended) target_platforms: '' diff --git a/.github/workflows/publish-docker-image.yml b/.github/workflows/publish-docker-image.yml index 3c3c1a755..36d17764f 100644 --- a/.github/workflows/publish-docker-image.yml +++ b/.github/workflows/publish-docker-image.yml @@ -3,14 +3,6 @@ name: Publish orquestra-sdk-base Docker Image on: workflow_dispatch: inputs: - sdk_requirement: - type: string - description: | - Orquestra SDK requirement to build the image with. - Example of using a published version of the SDK: - orquestra-sdk[ray]==0.63.0 - Example of using an unpublished version: - orquestra-sdk[ray] @ git+https://github.com/zapata-engineering/orquestra-sdk.git@v0.63.0#subdirectory=projects/orquestra-sdk docker_tag: type: string description: | @@ -79,6 +71,6 @@ jobs: # as on the command line `--build-arg`, one to a line: # docker_build_args: BUILD_ARG_ONE=value1\nBUILD_ARG_TWO=value2 # DUE TO JSON LIMITATIONS, NEWLINES MUST BE EXPLICITLY ADDED AS \n - docker_build_args: SDK_REQUIREMENT=${{ inputs.sdk_requirement }} + docker_build_args: # leave blank for linux/amd64 (recommended) target_platforms: 'linux/amd64,linux/arm64' diff --git a/projects/orquestra-sdk/docker/Dockerfile b/projects/orquestra-sdk/docker/Dockerfile index 8f4d53f47..c30b263fa 100644 --- a/projects/orquestra-sdk/docker/Dockerfile +++ b/projects/orquestra-sdk/docker/Dockerfile @@ -2,7 +2,6 @@ # Base image for running Orquestra tasks. # Published at hub.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base FROM python:3.11.6-slim-bullseye -ARG SDK_REQUIREMENT # Set by BuildKit ARG TARGETARCH @@ -38,7 +37,7 @@ RUN <' assumes that the is diff --git a/projects/orquestra-sdk/tests/integration/ray/test_integration.py b/projects/orquestra-sdk/tests/integration/ray/test_integration.py index 7007986a9..fd86e0654 100644 --- a/projects/orquestra-sdk/tests/integration/ray/test_integration.py +++ b/projects/orquestra-sdk/tests/integration/ray/test_integration.py @@ -947,8 +947,9 @@ def test_setting_resources( # Then calls = client.add_options.call_args_list - # We should only have two calls: our invocation and the aggregation step - assert len(calls) == 2 + # We should only have two calls: invocation aggregation step and aggregation + # error handling step + assert len(calls) == 3 # Checking our call did not have any resources included assert calls[0] == call( ANY, @@ -975,7 +976,7 @@ def test_setting_resources( None, None, { - "image:hub.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base:mocked": 1 # noqa: E501 + "image:hub.stage.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base:worker-1.0.0a1": 1 # noqa: E501 }, {}, ), @@ -983,7 +984,7 @@ def test_setting_resources( None, 1, { - "image:hub.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base:mocked-cuda": 1 # noqa: E501 + "image:hub.stage.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base:worker-1.0.0a1-cuda": 1 # noqa: E501 }, { "num_gpus": 1, @@ -1021,8 +1022,9 @@ def test_with_env_set( # Then calls = client.add_options.call_args_list - # We should only have two calls: our invocation and the aggregation step - assert len(calls) == 2 + # We should only have two calls: invocation and the aggregation step + # and error-handling aggregation step + assert len(calls) == 3 # Checking our call did not have any resources included assert calls[0] == call( ANY, @@ -1055,8 +1057,10 @@ def test_with_env_not_set( # Then calls = client.add_options.call_args_list - # We should only have two calls: our invocation and the aggregation step - assert len(calls) == 2 + # We should only have three calls: + # We should only have two calls: invocation and the aggregation step + # and error-handling aggregation step + assert len(calls) == 3 # Checking our call did not have any resources included assert calls[0] == call( ANY, diff --git a/projects/orquestra-sdk/tests/sdk/driver/test_client.py b/projects/orquestra-sdk/tests/sdk/driver/test_client.py index 3d25948ce..693ba40ea 100644 --- a/projects/orquestra-sdk/tests/sdk/driver/test_client.py +++ b/projects/orquestra-sdk/tests/sdk/driver/test_client.py @@ -4,6 +4,7 @@ """ Tests for orquestra.sdk._base._driver._client. """ +import warnings from datetime import timedelta from typing import Any, Dict, List, Optional from unittest.mock import Mock, create_autospec @@ -12,11 +13,12 @@ import pytest import responses from orquestra.workflow_shared._spaces._structs import ProjectRef -from orquestra.workflow_shared.schema.ir import WorkflowDef +from orquestra.workflow_shared.schema.ir import Version, WorkflowDef from orquestra.workflow_shared.schema.responses import JSONResult, PickleResult from orquestra.workflow_shared.schema.workflow_run import RunStatus, State, TaskRun import orquestra.sdk as sdk +from orquestra.sdk._client._base import _traversal from orquestra.sdk._client._base._driver import _exceptions from orquestra.sdk._client._base._driver._client import ( DriverClient, @@ -132,13 +134,23 @@ def task_run_id(self): def task_inv_id(self): return "00000000-0000-0000-0000-000000000000" + @pytest.fixture + def mask_sdk_version(self, monkeypatch): + monkeypatch.setattr( + _traversal, + "get_current_sdk_version", + lambda *_: Version( + original="0.66.0", major=0, minor=66, patch=0, is_prerelease=False + ), + ) + @pytest.fixture def status(self): return RunStatus(state=State.SUCCEEDED, start_time=None, end_time=None) @pytest.fixture def workflow_def(self): - @sdk.task + @sdk.task(custom_image="") def task(): return 1 @@ -146,7 +158,11 @@ def task(): def workflow(): return task() - return workflow().model + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", "Attempting to read a workflow definition" + ) + return workflow().model class TestWorkflowDefinitions: # ------ queries ------ @@ -454,6 +470,7 @@ def endpoint_mocker(endpoint_mocker_base, base_uri: str): ], ) def test_params_encoding( + mask_sdk_version, endpoint_mocker, client: DriverClient, workflow_def_id, diff --git a/projects/orquestra-sdk/tests/sdk/test_artifact_future_methods.py b/projects/orquestra-sdk/tests/sdk/test_artifact_future_methods.py index ce7caa015..028e73bf3 100644 --- a/projects/orquestra-sdk/tests/sdk/test_artifact_future_methods.py +++ b/projects/orquestra-sdk/tests/sdk/test_artifact_future_methods.py @@ -11,6 +11,7 @@ import orquestra.sdk as sdk from orquestra.sdk._client._base import _workflow +from orquestra.sdk._client._base._docker_images import DEFAULT_WORKER_IMAGE from orquestra.sdk._client._base._dsl import ArtifactFuture _TaskResourcesArgs = t.TypedDict( @@ -331,7 +332,7 @@ def test_artifact_with_metadata_workflow_model(): assert invocations[0].custom_image == CUSTOM_IMAGE_NOT_DEFAULT["custom_image"] assert invocations[1].resources is None - assert invocations[1].custom_image is None + assert invocations[1].custom_image == DEFAULT_WORKER_IMAGE @staticmethod def test_artifact_with_resources_workflow_model(): @@ -350,7 +351,7 @@ def test_artifact_with_resources_workflow_model(): assert invocations[0].resources.model_dump() == TASK_RESOURCES_EXPECTED assert invocations[1].resources is None - assert invocations[1].custom_image == invocations[0].custom_image + assert f"{invocations[1].custom_image}-cuda" == f"{invocations[0].custom_image}" @staticmethod def test_artifact_with_custom_image_workflow_model(): @@ -368,7 +369,7 @@ def test_artifact_with_custom_image_workflow_model(): assert invocations[0].custom_image == CUSTOM_IMAGE_NOT_DEFAULT["custom_image"] assert invocations[1].resources == invocations[0].resources - assert invocations[1].custom_image is None + assert invocations[1].custom_image == f"{DEFAULT_WORKER_IMAGE}" @staticmethod def test_artifact_with_resources_and_custom_image_workflow_model(): @@ -388,4 +389,4 @@ def test_artifact_with_resources_and_custom_image_workflow_model(): assert invocations[0].custom_image == CUSTOM_IMAGE_NOT_DEFAULT["custom_image"] assert invocations[1].resources is None - assert invocations[1].custom_image is None + assert invocations[1].custom_image == f"{DEFAULT_WORKER_IMAGE}" diff --git a/projects/orquestra-sdk/tests/sdk/test_traversal.py b/projects/orquestra-sdk/tests/sdk/test_traversal.py index 8087484b1..3ce900457 100644 --- a/projects/orquestra-sdk/tests/sdk/test_traversal.py +++ b/projects/orquestra-sdk/tests/sdk/test_traversal.py @@ -43,6 +43,18 @@ wf_pass_obj_with_num_from_task, ) +pytestmark = pytest.mark.filterwarnings( + "ignore::pytest.PytestUnraisableExceptionWarning", + "ignore::orquestra.workflow_shared.exceptions.VersionMismatch", +) + + +@pytest.fixture(autouse=True) +def mask_sdk_version(monkeypatch): + mocked_installed_version = Mock(return_value="0.66.0") + monkeypatch.setattr(_versions, "get_installed_version", mocked_installed_version) + monkeypatch.setattr(git.Repo, "is_dirty", Mock(return_value=False)) + # --- Tasks used in tests --- # @_dsl.task() diff --git a/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_build_workflow.py b/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_build_workflow.py index 20866fa23..e36ea824d 100644 --- a/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_build_workflow.py +++ b/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_build_workflow.py @@ -6,7 +6,6 @@ import re import time import typing as t -import warnings from functools import singledispatch from pathlib import Path @@ -34,15 +33,6 @@ from ._ray_settings import VENV_SETUP_TIMEOUT_SECONDS from ._wf_metadata import InvUserMetadata, pydatic_to_json_dict -DEFAULT_IMAGE_TEMPLATE = "hub.nexus.orquestra.io/zapatacomputing/orquestra-sdk-base:{}" - - -def _get_default_image(template: str, sdk_version: str, num_gpus: t.Optional[int]): - image = template.format(sdk_version) - if num_gpus is not None and num_gpus > 0: - image = f"{image}-cuda" - return image - def _arg_from_graph(argument_id: ir.ArgumentId, workflow_def: ir.WorkflowDef): try: @@ -403,7 +393,13 @@ def _(imp: ir.GitImport): "" if imp.package_name is None else f"{imp.package_name}{extras_string} @ " ) - return [f"{package_name_string}{url_string}"] + # TODO ORQSDK-1064: we should fully support subdirectories as projects + if url == "git+https://github.com/zapata-engineering/orquestra-sdk.git": + return [ + f"{package_name_string}{url_string}#subdirectory=projects/orquestra-sdk" + ] + else: + return [f"{package_name_string}{url_string}"] def _import_pip_env( @@ -420,9 +416,7 @@ def _import_pip_env( specific import Returns: - A list consisting of the python imports declared in the task definition, and the - current Orquestra SDK version. The latter is included to prevent tasks from - executing with different SDK versions to the head node. + A list consisting of the python imports declared in the task definition. """ task_def = wf.tasks[ir_invocation.task_id] imports = [ @@ -433,32 +427,7 @@ def _import_pip_env( ) ] - current_sdk_version: str = get_installed_version("orquestra-sdk") - - sdk_dependency = None - pip_list = [ - chunk - for imp in imports - for chunk in imports_pip_string[imp.id] - if not (sdk_dependency := re.match(r"^orquestra-sdk([<|!|=|>|~].*)?$", chunk)) - ] - - # If the task definition includes the SDK, warn the user that this does nothing. - if sdk_dependency: - warnings.warn( - f"The definition for task `{ir_invocation.task_id}` " - f"declares `{sdk_dependency[0]}` as a dependency. " - "The current SDK version " - + (f"({current_sdk_version}) " if current_sdk_version else "") - + "is automatically installed in task environments. " - "The specified dependency will be ignored.", - exceptions.OrquestraSDKVersionMismatchWarning, - ) - - # Don't add sdk dependency if submitting from a prerelease or dev version. - parsed_sdk_version = version.parse(current_sdk_version) - if not (parsed_sdk_version.is_devrelease or parsed_sdk_version.is_prerelease): - pip_list += [f"orquestra-sdk=={current_sdk_version}"] + pip_list = [chunk for imp in imports for chunk in imports_pip_string[imp.id]] return pip_list @@ -608,19 +577,17 @@ def make_ray_dag( # Set custom image if os.getenv(RAY_SET_CUSTOM_IMAGE_RESOURCES_ENV) is not None: - # This makes an assumption that only "new" IRs will get to this point - assert workflow_def.metadata is not None, "Expected a >=0.45.0 IR" - sdk_version = workflow_def.metadata.sdk_version.original - # Custom "Ray resources" request. The entries need to correspond to the ones # used when starting the Ray cluster. See also: # https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + # Since 0.67 we set custom image on every invocation in the client side, + # so this should happen + assert invocation.custom_image, ( + f"invocation {invocation.id} had empty " + f"custom image field. Please report this as a bug." + ) ray_options["resources"] = _ray_resources_for_custom_image( invocation.custom_image - or user_task.custom_image - or _get_default_image( - DEFAULT_IMAGE_TEMPLATE, sdk_version, ray_options.get("num_gpus") - ) ) ray_result = _make_ray_dag_node( @@ -638,6 +605,15 @@ def make_ray_dag( for output_id in invocation.output_ids: ray_futures[output_id] = ray_result + runtime_version = get_installed_version("orquestra-workflow-runtime") + parsed_runtime_version = version.parse(runtime_version) + if not ( + parsed_runtime_version.is_devrelease or parsed_runtime_version.is_prerelease + ): + runtime_pip_string = f"orquestra-workflow-runtime[all]=={runtime_version}" + else: + runtime_pip_string = "orquestra-workflow-runtime[all]" + # Gather futures for the last, fake task, and decide what args we need to unwrap. pos_args, pos_args_artifact_nodes = _gather_args( workflow_def.output_ids, workflow_def, ray_futures @@ -649,7 +625,7 @@ def make_ray_dag( ray_options={ "name": None, "metadata": None, - "runtime_env": None, + "runtime_env": _client.RuntimeEnv(pip=[runtime_pip_string]), "catch_exceptions": True, # Set to avoid retrying when the worker crashes. # See the comment with the invocation's options for more details. @@ -688,7 +664,15 @@ def handle_data_aggregation_error(result: t.Tuple[t.Any, Exception]): else: return result[0] - return handle_data_aggregation_error.bind(last_future) + error = client.add_options( + handle_data_aggregation_error, + name="data_aggregation_error_handler", + metadata=None, + max_retries=0, + catch_exceptions=False, + runtime_env=_client.RuntimeEnv(pip=[runtime_pip_string]), + ) + return error.bind(last_future) def get_current_ids() -> ( diff --git a/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_client.py b/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_client.py index 00b07ab6a..7153c0f8b 100644 --- a/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_client.py +++ b/projects/orquestra-workflow-runtime/src/orquestra/workflow_runtime/_ray/_client.py @@ -110,7 +110,7 @@ def add_options( ray_remote_fn, *, name: str, - metadata: t.Dict[str, t.Any], + metadata: t.Optional[t.Dict[str, t.Any]], runtime_env: t.Optional[RuntimeEnv], catch_exceptions: t.Optional[bool], max_retries: int, diff --git a/projects/orquestra-workflow-runtime/tests/data/.gitignore b/projects/orquestra-workflow-runtime/tests/data/.gitignore new file mode 100644 index 000000000..e8e51b44a --- /dev/null +++ b/projects/orquestra-workflow-runtime/tests/data/.gitignore @@ -0,0 +1,2 @@ +# When running runtime tests, we generate bunch of IR json files in this directory +*.json diff --git a/projects/orquestra-workflow-runtime/tests/test_build_workflow.py b/projects/orquestra-workflow-runtime/tests/test_build_workflow.py index 6f172c792..f6941302e 100644 --- a/projects/orquestra-workflow-runtime/tests/test_build_workflow.py +++ b/projects/orquestra-workflow-runtime/tests/test_build_workflow.py @@ -6,7 +6,6 @@ import pydantic import pytest from orquestra.workflow_shared import parse_git_url, serde -from orquestra.workflow_shared._graphs import iter_invocations_topologically from orquestra.workflow_shared.schema import ir from orquestra.workflow_shared.schema.ir import GitURL, SecretNode, WorkflowDef from orquestra.workflow_shared.schema.responses import WorkflowResult @@ -472,174 +471,3 @@ def test_import_pip_string_resolved_once_per_import( wf_def = pydantic.TypeAdapter(WorkflowDef).validate_json(json.load(f)) _ = _build_workflow.make_ray_dag(client, wf_def, wf_run_id, False) assert pip_string.mock_calls == [call(imp) for imp in wf_def.imports.values()] - - -@pytest.mark.parametrize( - "installed_sdk_version, expected_sdk_dependency", - [ - ("0.57.1.dev3+g3ef9f57.d20231003", None), - ("1.2.3", "1.2.3"), - ("0.1.dev1+g25df81e", None), - ], -) -class TestHandlingSDKVersions: - """``_import_pip_env`` handles adding the current SDK version as a dependency. - - Note that these tests don't mock serde - we're interested in whether the correct - package list gets constructed so we can't just return 'mocked' for imports. - """ - - @staticmethod - def test_with_no_dependencies( - installed_sdk_version: str, - expected_sdk_dependency: str, - monkeypatch: pytest.MonkeyPatch, - ): - import orquestra.workflow_shared.packaging._versions - - # Given - monkeypatch.setattr( - _build_workflow, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - monkeypatch.setattr( - orquestra.workflow_shared.packaging._versions, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - path_to_json = Path(__file__).parent.joinpath("data/simple_wf.json") - - with open(path_to_json, "r", encoding="utf-8") as f: - wf = pydantic.TypeAdapter(WorkflowDef).validate_json(json.load(f)) - task_inv = [inv for inv in iter_invocations_topologically(wf)][0] - imports = { - id: _build_workflow._pip_string(imp) for id, imp in wf.imports.items() - } - - # When - pip = _build_workflow._import_pip_env(task_inv, wf, imports) - - # Then - if expected_sdk_dependency: - assert pip == [f"orquestra-sdk=={expected_sdk_dependency}"] - else: - assert pip == [] - - @staticmethod - def test_with_multiple_dependencies( - installed_sdk_version: str, - expected_sdk_dependency: str, - monkeypatch: pytest.MonkeyPatch, - ): - # Given - python_import = "MarkupSafe" - import_version = "==1.0.0" - import orquestra.workflow_shared.packaging._versions - - monkeypatch.setattr( - _build_workflow, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - monkeypatch.setattr( - orquestra.workflow_shared.packaging._versions, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - path_to_json = Path(__file__).parent.joinpath("data/wildcard.json") - - with open(path_to_json, "r", encoding="utf-8") as f: - wildcard_workflow_def = json.load(f) - wildcard_workflow_def = wildcard_workflow_def.replace( - "", python_import - ) - wildcard_workflow_def = wildcard_workflow_def.replace( - "", import_version - ) - wf = pydantic.TypeAdapter(WorkflowDef).validate_json(wildcard_workflow_def) - - task_inv = [inv for inv in iter_invocations_topologically(wf)][0] - imports = { - id: _build_workflow._pip_string(imp) for id, imp in wf.imports.items() - } - - # When - pip = _build_workflow._import_pip_env(task_inv, wf, imports) - - # Then - if expected_sdk_dependency: - assert sorted(pip) == sorted( - [f"{python_import}{import_version}"] - + [f"orquestra-sdk=={expected_sdk_dependency}"] - ) - else: - assert sorted(pip) == sorted([f"{python_import}{import_version}"]) - - @pytest.mark.parametrize( - "sdk_import", - [ - "orquestra-sdk", - "orquestra-sdk>=1.2.3", - "orquestra-sdk<=1.2.3", - "orquestra-sdk~=1.2.3", - "orquestra-sdk==1.2.3", - "orquestra-sdk!=1.2.3", - "orquestra-sdk===1.2.3", - "orquestra-sdk >= 1.2.3", - "orquestra-sdk <= 1.2.3", - "orquestra-sdk ~= 1.2.3", - "orquestra-sdk == 1.2.3", - "orquestra-sdk != 1.2.3", - "orquestra-sdk === 1.2.3", - ], - ) - class TestHandlesSDKDependency: - @staticmethod - @pytest.mark.filterwarnings("ignore:The definition for task ") - def test_replaces_declared_sdk_dependency( - sdk_import, - installed_sdk_version: str, - expected_sdk_dependency: str, - monkeypatch: pytest.MonkeyPatch, - ): - # Given - sdk_version = sdk_import.replace("orquestra-sdk", "").strip() - import orquestra.workflow_shared.packaging._versions - - monkeypatch.setattr( - _build_workflow, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - monkeypatch.setattr( - orquestra.workflow_shared.packaging._versions, - "get_installed_version", - Mock(return_value=installed_sdk_version), - ) - path_to_json = Path(__file__).parent.joinpath("data/wildcard.json") - with open(path_to_json, "r", encoding="utf-8") as f: - wildcard_workflow_def = json.load(f) - wildcard_workflow_def = wildcard_workflow_def.replace( - "", "orquestra-sdk" - ) - wildcard_workflow_def = wildcard_workflow_def.replace( - "", sdk_version - ) - wf = pydantic.TypeAdapter(WorkflowDef).validate_json( - wildcard_workflow_def - ) - task_inv = [inv for inv in iter_invocations_topologically(wf)][0] - imports = { - id: _build_workflow._pip_string(imp) for id, imp in wf.imports.items() - } - - # When - pip = _build_workflow._import_pip_env(task_inv, wf, imports) - - # Then - if expected_sdk_dependency: - assert pip == [f"orquestra-sdk=={expected_sdk_dependency}"] - - else: - assert pip == [] diff --git a/projects/orquestra-workflow-runtime/tests/test_integration.py b/projects/orquestra-workflow-runtime/tests/test_integration.py index f21a098de..39c413a51 100644 --- a/projects/orquestra-workflow-runtime/tests/test_integration.py +++ b/projects/orquestra-workflow-runtime/tests/test_integration.py @@ -23,21 +23,8 @@ @pytest.fixture(autouse=True) -def patch_orquestra_version(monkeypatch): - import orquestra.workflow_shared.packaging._versions - - import orquestra.workflow_runtime._ray._build_workflow - - monkeypatch.setattr( - orquestra.workflow_shared.packaging._versions, - "get_installed_version", - lambda _: "0.64.0", - ) - monkeypatch.setattr( - orquestra.workflow_runtime._ray._build_workflow, - "get_installed_version", - lambda _: "0.64.0", - ) +def set_orq_envs(monkeypatch): + monkeypatch.setenv(name="ORQ_RAY_DOWNLOAD_GIT_IMPORTS", value="1") @pytest.fixture(scope="module") diff --git a/projects/orquestra-workflow-shared/src/orquestra/workflow_shared/schema/ir.py b/projects/orquestra-workflow-shared/src/orquestra/workflow_shared/schema/ir.py index 2baa352c4..1cfb808e3 100644 --- a/projects/orquestra-workflow-shared/src/orquestra/workflow_shared/schema/ir.py +++ b/projects/orquestra-workflow-shared/src/orquestra/workflow_shared/schema/ir.py @@ -384,6 +384,8 @@ class Version(BaseModel): class WorkflowMetadata(BaseModel): sdk_version: Version python_version: Version + # new field added in 0.67. Default to None to allow parsing older IRs + head_node_image: t.Optional[str] = None class WorkflowDef(BaseModel): @@ -434,7 +436,11 @@ def sdk_version_up_to_date(cls, v: t.Optional[WorkflowMetadata]): from ..packaging import _versions from ..schema import _compat - current_version = _versions.get_current_sdk_version() + try: + current_version = _versions.get_current_sdk_version() + except exceptions.BaseRuntimeError: + # no SDK installed locally, so nothing to validate. + return v if v is None: warnings.warn( @@ -454,18 +460,19 @@ def sdk_version_up_to_date(cls, v: t.Optional[WorkflowMetadata]): if not _compat.versions_are_compatible( generated_at=v.sdk_version, current=current_version ): - warnings.warn( - exceptions.VersionMismatch( - ( - "Attempting to read a workflow definition generated with a " - "different version of Orquestra Workflow SDK. " - "Please consider re-running your workflow or installing " - f"'orquestra-sdk=={v.sdk_version.original}'. " - "For more information visit: https://docs.orquestra.io/docs/core/sdk/guides/version-compatibility.html" # noqa: E501 - ), - actual=current_version, - needed=v.sdk_version, + if v.sdk_version: + warnings.warn( + exceptions.VersionMismatch( + ( + "Attempting to read a workflow definition generated with a " + "different version of Orquestra Workflow SDK. " + "Please consider re-running your workflow or installing " + f"'orquestra-sdk=={v.sdk_version.original}'. " + "For more information visit: https://docs.orquestra.io/docs/core/sdk/guides/version-compatibility.html" # noqa: E501 + ), + actual=current_version, + needed=v.sdk_version, + ) ) - ) return v