diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 4be2201..7b9dafb 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -2,30 +2,14 @@ name: Integration tests on: pull_request: - workflow_call: jobs: - integration-test-microk8s: - name: Integration tests (microk8s) - strategy: - fail-fast: false - matrix: - tox-environments: - - integration-charm - - integration-scaling - - integration-upgrades - - integration-vault - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Setup operator environment - uses: charmed-kubernetes/actions-operator@main - with: - juju-channel: 3.1/stable - provider: microk8s - microk8s-addons: "ingress storage dns rbac registry" - channel: 1.25-strict/stable - - name: Run integration tests - # set a predictable model name so it can be consumed by charm-logdump-action - run: tox -e ${{ matrix.tox-environments }} + integration-tests: + uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main + secrets: inherit + with: + channel: 1.25-strict/stable + modules: '["test_charm.py", "test_scaling.py", "test_vault.py"]' + juju-channel: 3.1/stable + self-hosted-runner: false + microk8s-addons: "dns ingress rbac storage metallb:10.15.119.2-10.15.119.4 registry" diff --git a/.gitignore b/.gitignore index d1cb8f6..7f47c5a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ __pycache__/ .vscode/ *.whl *.tar.gz +*.rock +*.lock diff --git a/.trivyignore b/.trivyignore new file mode 100644 index 0000000..bd5cfa1 --- /dev/null +++ b/.trivyignore @@ -0,0 +1,6 @@ +# TODO (kelkawi-a): remove these once pebble CVE resolution propagates down the pipeline +CVE-2023-45288 +CVE-2024-27308 +# TODO (kelkawi-a): remove these once Temporal pushes the latest SDK build to pip +CVE-2024-32650 +CVE-2024-24790 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 165d6f5..4795e1a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,6 +35,9 @@ deployment, follow the following steps: # Install charmcraft from snap: sudo snap install charmcraft --classic + # Install Rockcraft from snap: + sudo snap install rockcraft --classic + # Add the 'ubuntu' user to the Microk8s group: sudo usermod -a -G microk8s ubuntu @@ -45,7 +48,7 @@ deployment, follow the following steps: newgrp microk8s # Enable the necessary Microk8s addons: - microk8s enable hostpath-storage dns + microk8s enable hostpath-storage dns registry # Install the Juju CLI client, juju: sudo snap install juju --classic @@ -60,20 +63,17 @@ deployment, follow the following steps: juju model-config logging-config="=INFO;unit=DEBUG" # Pack the charm: - charmcraft pack [--destructive-mode] + charmcraft pack - # Build wheel file: - cd resource_sample && poetry build -f wheel + # Build ROCK file and push it to local registry: + cd resource_sample_py && make build_rock # Deploy the charm: - juju deploy ./temporal-worker-k8s_ubuntu-22.04-amd64.charm --resource temporal-worker-image=python:3.8.2-slim-buster + juju deploy ./temporal-worker-k8s_ubuntu-22.04-amd64.charm --resource temporal-worker-image=localhost:32000/temporal-worker-rock juju config temporal-worker-k8s --file=path/to/config.yaml - # Attach wheel file resource: - juju attach-resource temporal-worker-k8s workflows-file=./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl - # Check progress: - juju status --relations --watch 1s + juju status --relations --watch 2s juju debug-log # Clean-up before retrying: diff --git a/README.md b/README.md index 578066d..ce0e9be 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # Temporal Worker K8s Operator -This is the Kubernetes Python Operator for the Temporal worker. +This is the Kubernetes Python Operator for the Temporal Worker. ## Description @@ -13,18 +13,29 @@ execution of services and applications (using workflows). Use Workflow as Code (TM) to build and operate resilient applications. Leverage developer friendly primitives and avoid fighting your infrastructure -This operator provides a Temporal worker, and consists of Python scripts which +This operator provides a Temporal Worker, and consists of Python scripts which connect to a deployed Temporal server. ## Usage ### Deploying -The Temporal worker operator can be deployed and connected to a deployed +To deploy the Temporal Worker operator, you can start by creating a Temporal +workflow, or use the one provided in +[`resource_sample_py`](./resource_sample_py/). Once done, the project can be +built as a [ROCK](https://documentation.ubuntu.com/rockcraft/en/stable/) and +pushed to the [local registry](https://microk8s.io/docs/registry-built-in) by +running the following command inside the `resource_sample_py` directory: + +```bash +make build_rock +``` + +The Temporal Worker operator can then be deployed and connected to a deployed Temporal server using the Juju command line as follows: ```bash -juju deploy temporal-worker-k8s +juju deploy temporal-worker-k8s --resource temporal-worker-image=localhost:32000/temporal-worker-rock juju config temporal-worker-k8s --file=path/to/config.yaml ``` @@ -35,63 +46,44 @@ temporal-worker-k8s: host: "localhost:7233" # Replace with Temporal server hostname queue: "test-queue" namespace: "test" - workflows-file-name: "python_samples-1.1.0-py3-none-any.whl" - # To support all defined workflows and activities, use the 'all' keyword - supported-workflows: "all" - supported-activities: "all" ``` -### Attaching "workflows-file" resource - -The Temporal worker operator expects a "workflows-file" resource to be attached -after deployment, which contains a set of defined Temporal workflows and -activities as defined in the [resource_sample](./resource_sample/) directory. -The structure of the built wheel file must follow the same structure: +Once done, the charm should enter an active state, indicating that the worker is +running successfully. To verify this, you can check the logs of the juju unit to +ensure there are no errors with the workload container: +```bash +juju ssh --container temporal-worker temporal-worker-k8s/0 /charm/bin/pebble logs temporal-worker -f ``` -- workflows/ - - workflow-a.py - - workflow-b.py -- activities/ - - activity-a.py - - activity-b.py -- some_other_directory/ -- some_helper_file.py -``` -The sample wheel file can be built by running `poetry build -f wheel` in the -[resource_sample](./resource_sample/) directory. +Note: The only requirement for the ROCK is to have a `scripts/start-worker.sh` +file, which will be used as the entry point for the charm to start the workload +container. + +### Adding Environment Variables -Once ready, the resource can be attached as follows: +The Temporal Worker operator can be used to inject environment variables that +can be ingested by your workflows. This can be done using the Juju command line as follows: ```bash -make -C resource_sample/ build -juju attach-resource temporal-worker-k8s workflows-file=./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl +juju attach-resource temporal-worker-k8s env-file=path/to/.env ``` -Once done, the charm should enter an active state, indicating that the worker is -running successfully. To verify this, you can check the logs of the kubernetes -pod to ensure there are no errors with the workload container: +#### **`.env`** -```bash -kubectl -n logs temporal-worker-k8s-0 -c temporal-worker -f ``` - -Note: Files defined under the "workflows" directory must only contain classes -decorated using the `@workflow.defn` decorator. Files defined under the -"activities" directory must only contain methods decorated using the -`@activity.defn` decorator. Any additional methods or classes needed should be -defined in other files. +VALUE=123 +``` ## Verifying -To verify that the setup is running correctly, run `juju status --watch 1s` and +To verify that the setup is running correctly, run `juju status --watch 2s` and ensure the pod is active. To run a basic workflow, you may use a simple client (e.g. [sdk-python sample](https://github.com/temporalio/sdk-python#quick-start)) and connect to the same Temporal server. If run on the same namespace and task queue -as the Temporal worker, it should be executed successfully. +as the Temporal Worker, it should be executed successfully. ## Scaling @@ -103,7 +95,7 @@ juju scale-application temporal-worker-k8s ## Error Monitoring -The Temporal worker operator has a built-in Sentry interceptor which can be used +The Temporal Worker operator has a built-in Sentry interceptor which can be used to intercept and capture errors from the Temporal SDK. To enable it, run the following commands: @@ -115,7 +107,7 @@ juju config temporal-worker-k8s sentry-environment="staging" ## Observability -The Temporal worker operator charm can be related to the +The Temporal Worker operator charm can be related to the [Canonical Observability Stack](https://charmhub.io/topics/canonical-observability-stack) in order to collect logs and telemetry. To deploy cos-lite and expose its endpoints as offers, follow these steps: @@ -147,7 +139,7 @@ juju run grafana/0 -m cos get-admin-password --wait 1m ## Vault -The Temporal worker operator charm can be related to the +The Temporal Worker operator charm can be related to the [Vault operator charm](https://charmhub.io/vault-k8s) to securely store credentials that can be accessed by workflows. This is the recommended way of storing workflow-related credentials in production environments. To enable this, @@ -165,7 +157,7 @@ instructions found [here](https://charmhub.io/vault-k8s/docs/h-getting-started). For a reference on how to access credentials from Vault through the workflow code, -[`activity2.py`](./resource_sample/resource_sample/activities/activity2.py) +[`activity2.py`](./resource_sample_py/resource_sample/activities/activity2.py) under the `resource_sample` directory shows a sample for writing and reading secrets in Vault. diff --git a/config.yaml b/config.yaml index e5e82ea..5e1663f 100644 --- a/config.yaml +++ b/config.yaml @@ -31,16 +31,6 @@ options: default: "" type: string - supported-workflows: - description: Comma-separated list of workflow names to extract from attached wheel file. - default: "" - type: string - - supported-activities: - description: Comma-separated list of workflow activities to extract from attached wheel file. - default: "" - type: string - sentry-dsn: description: Sentry Data Source Name to send events to. default: "" @@ -69,11 +59,6 @@ options: default: 1.0 type: float - workflows-file-name: - description: Name of the wheel file resource attached to the charm. - default: "" - type: string - encryption-key: description: Base64-encoded key used for data encryption. default: "" @@ -161,18 +146,3 @@ options: description: Client certificate URL for OIDC authentication. default: "" type: string - - http-proxy: - description: Used to set HTTP_PROXY environment variable. - default: "" - type: string - - https-proxy: - description: Used to set HTTPS_PROXY environment variable. - default: "" - type: string - - no-proxy: - description: Used to set NO_PROXY environment variable. - default: "" - type: string diff --git a/metadata.yaml b/metadata.yaml index ee950bd..3dab5f9 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -35,17 +35,11 @@ peers: containers: temporal-worker: resource: temporal-worker-image - # Included for simplicity in integration tests. - upstream-source: python:3.8.2-slim-buster resources: temporal-worker-image: type: oci-image description: OCI image containing Python package. - workflows-file: - type: file - description: Wheel file containing Temporal workflows and activities. - filename: 'workflows-file.whl' env-file: type: file description: .env file containing environment variables to be sourced to the workload container. diff --git a/resource_sample/resource_sample/workflows/workflow1.py b/resource_sample/resource_sample/workflows/workflow1.py deleted file mode 100644 index 901f524..0000000 --- a/resource_sample/resource_sample/workflows/workflow1.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -from datetime import timedelta - -from temporalio import workflow -from temporalio.common import RetryPolicy -from temporalio.exceptions import FailureError - -from pathlib import Path -import sys -path_root = Path(__file__).parents[2] -sys.path.append(str(path_root)) - -with workflow.unsafe.imports_passed_through(): - import resource_sample.activities.activity1 as all_activities -from datetime import timedelta - -# Basic workflow that logs and invokes an activity -@workflow.defn(name="GreetingWorkflow") -class GreetingWorkflow: - @workflow.run - async def run(self, name: str) -> str: - workflow.logger.info("Running workflow with parameter %s" % name) - return await workflow.execute_activity( - all_activities.compose_greeting, - all_activities.ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), - ) diff --git a/resource_sample/resource_sample/workflows/workflow2.py b/resource_sample/resource_sample/workflows/workflow2.py deleted file mode 100644 index 616275d..0000000 --- a/resource_sample/resource_sample/workflows/workflow2.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -from datetime import timedelta - -from temporalio import workflow -from temporalio.common import RetryPolicy -from temporalio.exceptions import FailureError - -from pathlib import Path -import sys -path_root = Path(__file__).parents[2] -sys.path.append(str(path_root)) - -with workflow.unsafe.imports_passed_through(): - import resource_sample.activities.activity2 as all_activities -from datetime import timedelta - -# Basic workflow that logs and invokes an activity -@workflow.defn(name="VaultWorkflow") -class VaultWorkflow: - @workflow.run - async def run(self, name: str) -> str: - workflow.logger.info("Running workflow with parameter %s" % name) - return await workflow.execute_activity( - all_activities.vault_test, - all_activities.ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), - ) diff --git a/resource_sample/Makefile b/resource_sample_py/Makefile similarity index 84% rename from resource_sample/Makefile rename to resource_sample_py/Makefile index e42b5d4..4619009 100644 --- a/resource_sample/Makefile +++ b/resource_sample_py/Makefile @@ -3,13 +3,17 @@ # Makefile to help automate tasks +# The name of the python package/project +PY_PACKAGE := resource_sample + +# ROCK build parameters +ROCK_NAME := temporal-worker_1.0_amd64.rock +IMAGE_NAME := temporal-worker-rock:latest + # build and dist folders BUILD := build DIST := dist -# The name of the python package/project -PY_PACKAGE := resource_sample - # Paths to venv executables POETRY := poetry PY := python3 @@ -42,11 +46,9 @@ install-dev: .PHONY: lint lint: ## Run linter $(POETRY) run $(FLAKE8) $(PY_PACKAGE) tests - $(POETRY) run $(PYLINT) $(PY_PACKAGE) tests --disable=E0401,W1203,W0613,W0718,R0903,W1514,C0301,C0103,C0104,R0914,R0913,W0719,R0902 $(POETRY) run $(ISORT) --check $(PY_PACKAGE) tests $(POETRY) run $(BLACK) --check $(PY_PACKAGE) tests $(POETRY) run $(BANDIT) --configfile pyproject.toml --quiet --recursive $(PY_PACKAGE) tests - $(POETRY) run $(PYDOCSTYLE) $(PY_PACKAGE) .PHONY: fmt fmt: ## Reformat code for linter @@ -71,6 +73,14 @@ changelog: ## Add a new entry to the Changelog and bump the package version build: ## Create a Python source distribution and a wheel in dist $(POETRY) build +.PHONY: build_rock +build_rock: + rockcraft pack + rockcraft.skopeo --insecure-policy copy oci-archive:$(ROCK_NAME) docker-daemon:$(IMAGE_NAME) + IMAGE_ID=$$(docker inspect --format='{{.Id}}' $(IMAGE_NAME)); \ + docker tag $$IMAGE_ID localhost:32000/$(IMAGE_NAME) + docker push localhost:32000/$(IMAGE_NAME) + .PHONY: publish publish: ## Publish the package to PYPI $(POETRY) publish diff --git a/resource_sample_py/README.md b/resource_sample_py/README.md new file mode 100644 index 0000000..672bea5 --- /dev/null +++ b/resource_sample_py/README.md @@ -0,0 +1,23 @@ +# Sample ROCK project + +This is a sample +[ROCK image](https://documentation.ubuntu.com/rockcraft/en/stable/explanation/rocks/#rocks-explanation) +that can be used to build Python-based Temporal workflows. + +To work with the charm, the root directory must include a +`scripts/start-worker.sh` file, with a command that would start your +asynchronous Temporal worker. + +To test the worker locally, export the relevant environment variables found in +[`rockcraft.yaml`](./rockcraft.yaml) and start the worker by running +`poetry run python resource_sample/worker.py`. + +To build the ROCK image, you must enable a local registry as outlined in +[`CONTRIBUTING.md`](../CONTRIBUTING.md). You can then run `make build_rock` to +build the ROCK and push it to a local registry. + +To start the image, you can run the following command: + +```bash +docker run -d --name temporal-worker -p 8088:8088 localhost:32000/temporal-worker-rock start temporal-worker +``` diff --git a/resource_sample/pyproject.toml b/resource_sample_py/pyproject.toml similarity index 100% rename from resource_sample/pyproject.toml rename to resource_sample_py/pyproject.toml diff --git a/src/resources/temporal_client/__init__.py b/resource_sample_py/resource_sample/activities/__init__.py similarity index 58% rename from src/resources/temporal_client/__init__.py rename to resource_sample_py/resource_sample/activities/__init__.py index 3b8510e..58ee7de 100644 --- a/src/resources/temporal_client/__init__.py +++ b/resource_sample_py/resource_sample/activities/__init__.py @@ -2,4 +2,4 @@ # See LICENSE file for licensing details. -"""Temporal client sample workflows and activities.""" +"""Temporal activities.""" diff --git a/resource_sample/resource_sample/activities/activity1.py b/resource_sample_py/resource_sample/activities/activity1.py similarity index 79% rename from resource_sample/resource_sample/activities/activity1.py rename to resource_sample_py/resource_sample/activities/activity1.py index b62d05d..2bdb3c9 100644 --- a/resource_sample/resource_sample/activities/activity1.py +++ b/resource_sample_py/resource_sample/activities/activity1.py @@ -1,9 +1,9 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +from common.messages import ComposeGreetingInput from temporalio import activity -from dataclasses import dataclass -from resource_sample.common.messages import ComposeGreetingInput + # Basic activity that logs and does string concatenation @activity.defn(name="compose_greeting") diff --git a/resource_sample/resource_sample/activities/activity2.py b/resource_sample_py/resource_sample/activities/activity2.py similarity index 71% rename from resource_sample/resource_sample/activities/activity2.py rename to resource_sample_py/resource_sample/activities/activity2.py index 491a94f..26ddb10 100644 --- a/resource_sample/resource_sample/activities/activity2.py +++ b/resource_sample_py/resource_sample/activities/activity2.py @@ -1,11 +1,11 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -from temporalio import activity -from dataclasses import dataclass -from resource_sample.common.messages import ComposeGreetingInput import os + import hvac +from common.messages import ComposeGreetingInput +from temporalio import activity vault_client = None if os.getenv("TWC_VAULT_ADDR"): @@ -19,25 +19,26 @@ secret_id=os.getenv("TWC_VAULT_ROLE_SECRET_ID"), ) + # Basic activity that logs and does string concatenation @activity.defn(name="vault_test") async def vault_test(arg: ComposeGreetingInput) -> str: activity.logger.info("Running activity with parameter %s" % arg) hvac_secret = { - 'greeting': arg.greeting, + "greeting": arg.greeting, } vault_client.secrets.kv.v2.create_or_update_secret( - path='credentials', - mount_point=os.getenv('TWC_VAULT_MOUNT'), + path="credentials", + mount_point=os.getenv("TWC_VAULT_MOUNT"), secret=hvac_secret, ) read_secret_result = vault_client.secrets.kv.v2.read_secret( - path='credentials', - mount_point=os.getenv('TWC_VAULT_MOUNT'), + path="credentials", + mount_point=os.getenv("TWC_VAULT_MOUNT"), ) - - greeting = read_secret_result['data']['data']['greeting'] + + greeting = read_secret_result["data"]["data"]["greeting"] return f"{greeting}, {arg.name}!" diff --git a/resource_sample_py/resource_sample/common/__init__.py b/resource_sample_py/resource_sample/common/__init__.py new file mode 100644 index 0000000..298499f --- /dev/null +++ b/resource_sample_py/resource_sample/common/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Common.""" diff --git a/resource_sample/resource_sample/common/messages.py b/resource_sample_py/resource_sample/common/messages.py similarity index 99% rename from resource_sample/resource_sample/common/messages.py rename to resource_sample_py/resource_sample/common/messages.py index e7c0797..8c1c350 100644 --- a/resource_sample/resource_sample/common/messages.py +++ b/resource_sample_py/resource_sample/common/messages.py @@ -3,6 +3,7 @@ from dataclasses import dataclass + @dataclass class ComposeGreetingInput: greeting: str diff --git a/resource_sample_py/resource_sample/worker.py b/resource_sample_py/resource_sample/worker.py new file mode 100644 index 0000000..7725536 --- /dev/null +++ b/resource_sample_py/resource_sample/worker.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Temporal client worker.""" + +import asyncio +import logging +import os + +from activities.activity1 import compose_greeting +from activities.activity2 import vault_test +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporallib.auth import ( + AuthOptions, + GoogleAuthOptions, + KeyPair, + MacaroonAuthOptions, +) +from temporallib.client import Client, Options +from temporallib.encryption import EncryptionOptions +from temporallib.worker import SentryOptions, Worker, WorkerOptions +from workflows.workflow1 import GreetingWorkflow, VaultWorkflow + +logger = logging.getLogger(__name__) + + +def _get_auth_header(): + """Get auth options based on provider. + + Returns: + AuthOptions object. + """ + if os.getenv("TWC_AUTH_PROVIDER") == "candid": + return MacaroonAuthOptions( + keys=KeyPair( + private=os.getenv("TWC_CANDID_PRIVATE_KEY"), + public=os.getenv("TWC_CANDID_PUBLIC_KEY"), + ), + macaroon_url=os.getenv("TWC_CANDID_URL"), + username=os.getenv("TWC_CANDID_USERNAME"), + ) + + if os.getenv("TWC_AUTH_PROVIDER") == "google": + return GoogleAuthOptions( + type="service_account", + project_id=os.getenv("TWC_OIDC_PROJECT_ID"), + private_key_id=os.getenv("TWC_OIDC_PRIVATE_KEY_ID"), + private_key=os.getenv("TWC_OIDC_PRIVATE_KEY"), + client_email=os.getenv("TWC_OIDC_CLIENT_EMAIL"), + client_id=os.getenv("TWC_OIDC_CLIENT_ID"), + auth_uri=os.getenv("TWC_OIDC_AUTH_URI"), + token_uri=os.getenv("TWC_OIDC_TOKEN_URI"), + auth_provider_x509_cert_url=os.getenv("TWC_OIDC_AUTH_CERT_URL"), + client_x509_cert_url=os.getenv("TWC_OIDC_CLIENT_CERT_URL"), + ) + + return None + + +def _init_runtime_with_prometheus(port: int) -> Runtime: + """Create runtime for use with Prometheus metrics. + + Args: + port: Port of prometheus. + + Returns: + Runtime for temporalio with prometheus. + """ + return Runtime( + telemetry=TelemetryConfig( + metrics=PrometheusConfig(bind_address=f"0.0.0.0:{port}") + ) + ) + + +async def run_worker(): + """Connect Temporal worker to Temporal server.""" + client_config = Options( + host=os.getenv("TWC_HOST"), + namespace=os.getenv("TWC_NAMESPACE"), + queue=os.getenv("TWC_QUEUE"), + ) + + if os.getenv("TWC_TLS_ROOT_CAS", "").strip() != "": + client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS") + + if os.getenv("TWC_AUTH_PROVIDER", "").strip() != "": + client_config.auth = AuthOptions( + provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header() + ) + + if os.getenv("TWC_ENCRYPTION_KEY", "").strip() != "": + client_config.encryption = EncryptionOptions( + key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True + ) + + worker_opt = None + dsn = os.getenv("TWC_SENTRY_DSN", "").strip() + if dsn != "": + sentry = SentryOptions( + dsn=dsn, + release=os.getenv("TWC_SENTRY_RELEASE", "").strip() or None, + environment=os.getenv("TWC_SENTRY_ENVIRONMENT", "").strip() or None, + redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS", False), + sample_rate=os.getenv("TWC_SENTRY_SAMPLE_RATE", 1.0), + ) + + worker_opt = WorkerOptions(sentry=sentry) + + runtime = None + if os.getenv("TWC_PROMETHEUS_PORT"): + runtime = _init_runtime_with_prometheus(int(os.getenv("TWC_PROMETHEUS_PORT"))) + + client = await Client.connect(client_config, runtime=runtime) + + worker = Worker( + client=client, + task_queue=os.getenv("TWC_QUEUE"), + workflows=[GreetingWorkflow, VaultWorkflow], + activities=[compose_greeting, vault_test], + worker_opt=worker_opt, + ) + + await worker.run() + + +if __name__ == "__main__": # pragma: nocover + asyncio.run(run_worker()) diff --git a/resource_sample_py/resource_sample/workflows/__init__.py b/resource_sample_py/resource_sample/workflows/__init__.py new file mode 100644 index 0000000..cdf6752 --- /dev/null +++ b/resource_sample_py/resource_sample/workflows/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +"""Temporal workflows.""" diff --git a/resource_sample_py/resource_sample/workflows/workflow1.py b/resource_sample_py/resource_sample/workflows/workflow1.py new file mode 100644 index 0000000..e4e1239 --- /dev/null +++ b/resource_sample_py/resource_sample/workflows/workflow1.py @@ -0,0 +1,35 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + import activities.activity1 as all_activities1 + import activities.activity2 as all_activities2 + + +# Basic workflow that logs and invokes an activity +@workflow.defn(name="GreetingWorkflow") +class GreetingWorkflow: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running workflow with parameter %s" % name) + return await workflow.execute_activity( + all_activities1.compose_greeting, + all_activities1.ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) + + +@workflow.defn(name="VaultWorkflow") +class VaultWorkflow: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running workflow with parameter %s" % name) + return await workflow.execute_activity( + all_activities2.vault_test, + all_activities1.ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) diff --git a/resource_sample_py/rockcraft.yaml b/resource_sample_py/rockcraft.yaml new file mode 100644 index 0000000..cc4aba3 --- /dev/null +++ b/resource_sample_py/rockcraft.yaml @@ -0,0 +1,75 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +name: temporal-worker +summary: Temporal worker app +description: OCI image for Temporal worker app +version: "1.0" +base: ubuntu@22.04 +build-base: ubuntu@22.04 +license: Apache-2.0 + +services: + temporal-worker: + override: replace + summary: "temporal worker" + startup: disabled + command: "./app/scripts/start-worker.sh" + environment: + TWC_HOST: localhost:7233 + TWC_NAMESPACE: default + TWC_QUEUE: test-queue + TWC_PROMETHEUS_PORT: "9000" + TWC_TLS_ROOT_CAS: "" + TWC_AUTH_PROVIDER: "" # "google" or "candid" + TWC_ENCRYPTION_KEY: "" + TWC_SENTRY_DSN: "" + TWC_SENTRY_RELEASE: "" + TWC_SENTRY_ENVIRONMENT: "" + TWC_SENTRY_REDACT_PARAMS: "False" + TWC_SENTRY_SAMPLE_RATE: "1.0" + TWC_CANDID_URL: "" + TWC_CANDID_USERNAME: "" + TWC_CANDID_PRIVATE_KEY: "" + TWC_CANDID_PUBLIC_KEY: "" + TWC_OIDC_PROJECT_ID: "" + TWC_OIDC_PRIVATE_KEY_ID: "" + TWC_OIDC_PRIVATE_KEY: "" + TWC_OIDC_CLIENT_EMAIL: "" + TWC_OIDC_CLIENT_ID: "" + TWC_OIDC_AUTH_URI: "" + TWC_OIDC_TOKEN_URI: "" + TWC_OIDC_AUTH_CERT_URL: "" + TWC_OIDC_CLIENT_CERT_URL: "" + + # Vault variables + TWC_VAULT_ADDR: "" + TWC_VAULT_CACERT_BYTES: "" + TWC_VAULT_ROLE_ID: "" + TWC_VAULT_ROLE_SECRET_ID: "" + TWC_VAULT_MOUNT: "" + TWC_VAULT_CERT_PATH: "" + +platforms: + amd64: + +parts: + worker-dependencies: + plugin: python + source: . + build-packages: + - build-essential + # Uncomment if using a 'requirements.txt' file + # python-requirements: + # - requirements.txt + stage-packages: + - python3.10-venv + - coreutils + - bash + + worker-app: + plugin: dump + source: . + organize: + "*": app/ + stage: + - app diff --git a/resource_sample_py/scripts/start-worker.sh b/resource_sample_py/scripts/start-worker.sh new file mode 100755 index 0000000..7b79f9f --- /dev/null +++ b/resource_sample_py/scripts/start-worker.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +# update 'resource_sample' accordingly +python3 app/resource_sample/worker.py diff --git a/resource_sample_py/tests/__init__.py b/resource_sample_py/tests/__init__.py new file mode 100644 index 0000000..db3bfe1 --- /dev/null +++ b/resource_sample_py/tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. diff --git a/resource_sample/config.yaml b/sample_files/config.yaml similarity index 58% rename from resource_sample/config.yaml rename to sample_files/config.yaml index 6ec72e3..715d1ea 100644 --- a/resource_sample/config.yaml +++ b/sample_files/config.yaml @@ -5,6 +5,3 @@ temporal-worker-k8s: host: "temporal-k8s:7233" queue: "test-queue" namespace: "default" - workflows-file-name: "python_samples-1.1.0-py3-none-any.whl" - supported-workflows: "all" - supported-activities: "all" diff --git a/resource_sample/invalid.env b/sample_files/invalid.env similarity index 100% rename from resource_sample/invalid.env rename to sample_files/invalid.env diff --git a/resource_sample/sample.env b/sample_files/sample.env similarity index 100% rename from resource_sample/sample.env rename to sample_files/sample.env diff --git a/src/charm.py b/src/charm.py index a297ff9..7dd0d0a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -7,9 +7,8 @@ """Charm definition and helpers.""" import logging -import re +import os import secrets -from pathlib import Path from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer @@ -21,12 +20,10 @@ from ops.model import ( ActiveStatus, BlockedStatus, - Container, MaintenanceStatus, ModelError, WaitingStatus, ) -from ops.pebble import CheckStatus from literals import ( LOG_FILE, @@ -125,9 +122,6 @@ def _on_restart(self, event): self.unit.status = MaintenanceStatus("restarting worker") container.restart(self.name) - self.unit.status = ActiveStatus( - f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}" - ) event.set_results({"result": "worker successfully restarted"}) @@ -159,11 +153,6 @@ def _on_update_status(self, event): self._update(event) return - check = container.get_check("up") - if check.status != CheckStatus.UP: - self.unit.status = MaintenanceStatus("Status check: DOWN") - return - self.unit.status = ActiveStatus( f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}" ) @@ -179,7 +168,7 @@ def _validate_pebble_plan(self, container): """ try: plan = container.get_plan().to_dict() - return bool(plan and plan["services"].get(self.name, {}).get("on-check-failure")) + return bool(plan and plan["services"].get(self.name, {})) except pebble.ConnectionError: return False @@ -215,72 +204,6 @@ def _process_env_file(self, event): except ModelError as err: logger.error(err) - def _process_wheel_file(self, event): # noqa: C901 - """Process wheel file attached by user. - - This method extracts the wheel file provided by the user and places the contents - into the workload container, which can then be read by the Temporal worker. - - Args: - event: The event triggered when the relation changed. - - Raises: - ValueError: if file is not found or an operation failed while extracting. - """ - if not self._state.is_ready(): - event.defer() - return - - if self.config["workflows-file-name"].strip() == "": - raise ValueError("Invalid config: wheel-file-name missing") - - if not _validate_wheel_name(self.config["workflows-file-name"]): - raise ValueError("Invalid config: invalid wheel-file-name") - - try: - resource_path = self.model.resources.fetch("workflows-file") - filename = Path(resource_path).name - - container = self.unit.get_container(self.name) - if not container.can_connect(): - event.defer() - self.unit.status = WaitingStatus("waiting for pebble api") - return - - container.exec(["rm", "-rf", "/user_provided"]).wait_output() - - wheel_file = f"/user_provided/{filename}" - original_wheel_file = f"/user_provided/{self.config['workflows-file-name']}" - wheel_arr = self.config["workflows-file-name"].split("-") - unpacked_file_name = f"/user_provided/{'-'.join(wheel_arr[0:2])}" - - with open(resource_path, "rb") as file: - wheel_data = file.read() - - # Push wheel file to the container and extract it. - container.push(wheel_file, wheel_data, make_dirs=True) - - # Rename wheel file to its original name and install it - if wheel_file != original_wheel_file: - container.exec(["mv", wheel_file, original_wheel_file]).wait() - - _install_wheel_file( - container=container, wheel_file_name=original_wheel_file, proxy=self.config["http-proxy"] - ) - _unpack_wheel_file(container=container, wheel_file_name=original_wheel_file) - module_name = _get_module_name(container=container, unpacked_file_name=unpacked_file_name) - _check_required_directories( - container=container, unpacked_file_name=unpacked_file_name, module_name=module_name - ) - - if self.unit.is_leader(): - self._state.module_name = module_name - self._state.unpacked_file_name = unpacked_file_name - - except ModelError as err: - logger.error(err) - raise ValueError("Invalid state: workflows-file resource not found") from err - def _check_required_config(self, config_list): """Check if required config has been set by user. @@ -310,14 +233,10 @@ def _validate(self, event): # noqa: C901 if not self._state.is_ready(): raise ValueError("peer relation not ready") - self._process_wheel_file(event) self._process_env_file(event) self._check_required_config(REQUIRED_CHARM_CONFIG) - if self._state.module_name is None: - raise ValueError("Invalid state: error extracting folder name from wheel file") - if self.config["auth-provider"]: if not self.config["auth-provider"] in SUPPORTED_AUTH_PROVIDERS: raise ValueError("Invalid config: auth-provider not supported") @@ -349,28 +268,25 @@ def _update(self, event): # noqa: C901 self.unit.status = WaitingStatus("waiting for pebble api") return - # ensure the container is set up - _setup_container(container, self.config["http-proxy"]) - logger.info("Configuring Temporal worker") - module_name = self._state.module_name - unpacked_file_name = self._state.unpacked_file_name - context = {} if self._state.env: context.update(self._state.env) + proxy_vars = { + "HTTP_PROXY": "JUJU_CHARM_HTTP_PROXY", + "HTTPS_PROXY": "JUJU_CHARM_HTTPS_PROXY", + "NO_PROXY": "JUJU_CHARM_NO_PROXY", + } + + for key, env_var in proxy_vars.items(): + value = os.environ.get(env_var) + if value: + context.update({key: value}) + context.update({convert_env_var(key): value for key, value in self.config.items()}) - command = f"python worker.py {unpacked_file_name} {module_name}" - - context.update( - { - "HTTP_PROXY": self.config["http-proxy"], - "HTTPS_PROXY": self.config["https-proxy"], - "NO_PROXY": self.config["no-proxy"], - } - ) + context.update({"TWC_PROMETHEUS_PORT": PROMETHEUS_PORT}) try: vault_config = self.vault_relation._get_vault_config() @@ -396,19 +312,10 @@ def _update(self, event): # noqa: C901 "services": { self.name: { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": context, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -433,148 +340,5 @@ def convert_env_var(config_var, prefix="TWC_"): return prefix + converted_env_var -def _setup_container(container: Container, proxy: str): - """Copy worker file to the container and install dependencies. - - Args: - container: Container unit on which to perform action. - proxy: optional proxy value used in running pip command. - - Raises: - ValueError: if worker dependencies fail to install. - """ - resources_path = Path(__file__).parent / "resources" - _push_container_file(container, resources_path, "/worker.py", resources_path / "worker.py") - _push_container_file(container, resources_path, "/../literals.py", resources_path / "../literals.py") - _push_container_file(container, resources_path, "/check_status.py", resources_path / "check_status.py") - _push_container_file( - container, resources_path, "/worker-dependencies.txt", resources_path / "worker-dependencies.txt" - ) - - # Install worker dependencies - worker_dependencies_path = "/worker-dependencies.txt" - logger.info("installing worker dependencies...") - - command = ["pip", "install", "-r", str(worker_dependencies_path)] - if proxy.strip() != "": - command.insert(2, f"--proxy={proxy}") - - _, error = container.exec(command).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to install worker dependencies: {error}") - raise ValueError("Invalid state: failed to install worker dependencies") - - -def _validate_wheel_name(filename): - """Validate wheel file name. - - Args: - filename: Name of the wheel file. - - Returns: - True if the file name is valid, False otherwise. - """ - # Define an allowed list of allowed characters and patterns - allowed_pattern = r"^[a-zA-Z0-9-._]+-[a-zA-Z0-9_.]+-([a-zA-Z0-9_.]+|any|py2.py3)-(none|linux|macosx|win)-(any|any|intel|amd64)\.whl$" - return bool(re.match(allowed_pattern, filename)) - - -def _push_container_file(container: Container, src_path, dest_path, resource): - """Copy worker file to the container and install dependencies. - - Args: - container: Container unit on which to perform action. - src_path: resource path. - dest_path: destination path on container. - resource: resource to push to container. - """ - source_path = src_path / resource - with open(source_path, "r") as file_source: - logger.info(f"pushing {resource} source...") - container.push(dest_path, file_source, make_dirs=True) - - -def _install_wheel_file(container: Container, wheel_file_name: str, proxy: str): - """Install named wheel file on container. - - Args: - container: Container unit on which to perform action. - wheel_file_name: name of wheel file to install. - proxy: optional proxy used when installing wheel file. - - Raises: - ValueError: if wheel file fails to install. - """ - command = ["pip", "install", wheel_file_name] - if proxy.strip() != "": - command.insert(2, f"--proxy={proxy}") - - _, error = container.exec(command).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to install wheel file: {error}") - raise ValueError("Invalid state: failed to install wheel file") - - -def _unpack_wheel_file(container: Container, wheel_file_name: str): - """Unpack named wheel file on container. - - Args: - container: Container unit on which to perform action. - wheel_file_name: name of wheel file to unpack. - - Raises: - ValueError: if wheel unpacking fails. - """ - _, error = container.exec(["wheel", "unpack", wheel_file_name, "-d", "/user_provided"]).wait_output() - if error is not None and error.strip() != "" and not error.strip().startswith("WARNING"): - logger.error(f"failed to unpack wheel file: {error}") - raise ValueError("Invalid state: failed to unpack wheel file") - - -def _get_module_name(container: Container, unpacked_file_name: str): - """Get module name from unpacked wheel file. - - Args: - container: Container unit on which to perform action. - unpacked_file_name: Name of wheel file after unpacking. - - Returns: - Name of module provided by the user. - - Raises: - ValueError: if module name extraction fails. - """ - # Find the name of the module provided by the user and set it in state. - command = f"find {unpacked_file_name} -mindepth 1 -maxdepth 1 -type d ! -name *.dist-info ! -name *.whl" - out, error = container.exec(command.split(" ")).wait_output() - - if error is not None and error.strip() != "": - logger.error(f"failed to extract module name from wheel file: {error}") - raise ValueError("Invalid state: failed to extract module name from wheel file") - - directories = out.split("\n") - return Path(directories[0]).name - - -def _check_required_directories(container: Container, unpacked_file_name: str, module_name: str): - """Verify that the required workflows and activities directories are present. - - Args: - container: Container unit on which to perform action. - unpacked_file_name: Name of wheel file after unpacking. - module_name: Name of module provided by user. - - Raises: - ValueError: if required directories are not found in attached resource. - """ - command = f"find {unpacked_file_name}/{module_name} -mindepth 1 -maxdepth 1 -type d" - out, _ = container.exec(command.split(" ")).wait_output() - provided_directories = out.split("\n") - required_directories = ["workflows", "activities"] - for d in required_directories: - if f"{unpacked_file_name}/{module_name}/{d}" not in provided_directories: - raise ValueError(f"Invalid state: {d} directory not found in attached resource") - - if __name__ == "__main__": # pragma: nocover main.main(TemporalWorkerK8SOperatorCharm) diff --git a/src/literals.py b/src/literals.py index 7f03171..9ffe1e9 100644 --- a/src/literals.py +++ b/src/literals.py @@ -8,7 +8,7 @@ VALID_LOG_LEVELS = ["info", "debug", "warning", "error", "critical"] LOG_FILE = "/var/log/temporal" -REQUIRED_CHARM_CONFIG = ["host", "namespace", "queue", "supported-workflows", "supported-activities"] +REQUIRED_CHARM_CONFIG = ["host", "namespace", "queue"] REQUIRED_CANDID_CONFIG = ["candid-url", "candid-username", "candid-public-key", "candid-private-key"] REQUIRED_OIDC_CONFIG = [ "oidc-auth-type", diff --git a/src/resources/check_status.py b/src/resources/check_status.py deleted file mode 100644 index 2491f24..0000000 --- a/src/resources/check_status.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Temporal worker status checker.""" - - -import logging -import sys - -logger = logging.getLogger(__name__) - - -def check_worker_status(): - """Check Temporal worker status by reading status file.""" - try: - with open("worker_status.txt", "r") as status_file: - status = status_file.read().strip() - logger.info(f"Async status: {status}") - - if status.startswith("Success"): - exit_code = 0 - else: - exit_code = 1 - except FileNotFoundError: - logger.error("Status file not found. Worker is not running.") - exit_code = 1 - - sys.exit(exit_code) - - -if __name__ == "__main__": - check_worker_status() diff --git a/src/resources/temporal_client/activities.py b/src/resources/temporal_client/activities.py deleted file mode 100644 index 3306412..0000000 --- a/src/resources/temporal_client/activities.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal sample activity.""" - -from dataclasses import dataclass - -from temporalio import activity - - -@dataclass -class ComposeGreetingInput: - """Greeting class. - - Attrs: - greeting: greeting string. - name: name string. - """ - - greeting: str - name: str - - -@activity.defn(name="compose_greeting") -async def compose_greeting(arg: ComposeGreetingInput) -> str: - """Log and do string concatenation activity. - - Args: - arg: greeting to log. - - Returns: - Greeting string. - """ - activity.logger.info(f"Running activity with parameter {arg}") - return f"{arg.greeting}, {arg.name}!" diff --git a/src/resources/temporal_client/workflows.py b/src/resources/temporal_client/workflows.py deleted file mode 100644 index fe1db1c..0000000 --- a/src/resources/temporal_client/workflows.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal sample workflow.""" - -from datetime import timedelta - -from temporalio import workflow - -with workflow.unsafe.imports_passed_through(): - from .activities import ComposeGreetingInput, compose_greeting - - -@workflow.defn(name="GreetingWorkflow") -class GreetingWorkflow: - """Basic workflow that logs and invokes an activity.""" - - @workflow.run - async def run(self, name: str) -> str: - """Workflow runner. - - Args: - name: value to be logged. - - Returns: - Workflow execution. - """ - workflow.logger.info(f"Running workflow with parameter {name}") - return await workflow.execute_activity( - compose_greeting, - ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), - ) diff --git a/src/resources/worker-dependencies.txt b/src/resources/worker-dependencies.txt deleted file mode 100644 index aed7c75..0000000 --- a/src/resources/worker-dependencies.txt +++ /dev/null @@ -1,2 +0,0 @@ -temporal-lib-py==1.3.1 -wheel==0.41.2 diff --git a/src/resources/worker.py b/src/resources/worker.py deleted file mode 100644 index 4449e93..0000000 --- a/src/resources/worker.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - - -"""Temporal client worker.""" - -import asyncio -import glob -import inspect -import logging -import os -import sys -import traceback -from importlib import import_module - -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig -from temporallib.auth import ( - AuthOptions, - GoogleAuthOptions, - KeyPair, - MacaroonAuthOptions, -) -from temporallib.client import Client, Options -from temporallib.encryption import EncryptionOptions -from temporallib.worker import SentryOptions, Worker, WorkerOptions - -from literals import PROMETHEUS_PORT - -logger = logging.getLogger(__name__) - - -def _get_auth_header(): - """Get auth options based on provider. - - Returns: - AuthOptions object. - """ - if os.getenv("TWC_AUTH_PROVIDER") == "candid": - return MacaroonAuthOptions( - keys=KeyPair(private=os.getenv("TWC_CANDID_PRIVATE_KEY"), public=os.getenv("TWC_CANDID_PUBLIC_KEY")), - macaroon_url=os.getenv("TWC_CANDID_URL"), - username=os.getenv("TWC_CANDID_USERNAME"), - ) - - if os.getenv("TWC_AUTH_PROVIDER") == "google": - return GoogleAuthOptions( - type="service_account", - project_id=os.getenv("TWC_OIDC_PROJECT_ID"), - private_key_id=os.getenv("TWC_OIDC_PRIVATE_KEY_ID"), - private_key=os.getenv("TWC_OIDC_PRIVATE_KEY"), - client_email=os.getenv("TWC_OIDC_CLIENT_EMAIL"), - client_id=os.getenv("TWC_OIDC_CLIENT_ID"), - auth_uri=os.getenv("TWC_OIDC_AUTH_URI"), - token_uri=os.getenv("TWC_OIDC_TOKEN_URI"), - auth_provider_x509_cert_url=os.getenv("TWC_OIDC_AUTH_CERT_URL"), - client_x509_cert_url=os.getenv("TWC_OIDC_CLIENT_CERT_URL"), - ) - - return None - - -def _import_modules(module_type, unpacked_file_name, module_name, supported_modules): - """Extract supported workflows and activities . - - Args: - module_type: "workflows" or "activities". - unpacked_file_name: Name of unpacked wheel file. - module_name: Parent module name extracted from wheel file. - supported_modules: list of supported modules to be extracted from module file. - - Returns: - List of supported module references extracted from .py file. - """ - folder_path = os.path.join(os.getcwd(), unpacked_file_name, module_name, module_type) - sys.path.append(folder_path) - file_names = glob.glob(f"{folder_path}/*.py") - file_names = [os.path.basename(file) for file in file_names] - - module_list = [] - for file_name in file_names: - module_name = file_name[:-3] - module = import_module(module_name) - - if "all" in supported_modules: - for _, obj in inspect.getmembers(module): - if module_type == "workflows": - if inspect.isclass(obj) and inspect.getmodule(obj) is module: - module_list.append(obj) - else: - if inspect.isfunction(obj) and inspect.getmodule(obj) is module: - module_list.append(obj) - else: - for sm in supported_modules: - if hasattr(module, sm.strip()): - module_list.append(getattr(module, sm.strip())) - - return module_list - - -def _init_runtime_with_prometheus(port: int) -> Runtime: - """Create runtime for use with Prometheus metrics. - - Args: - port: Port of prometheus. - - Returns: - Runtime for temporalio with prometheus. - """ - return Runtime(telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address=f"0.0.0.0:{port}"))) - - -async def run_worker(unpacked_file_name, module_name): - """Connect Temporal worker to Temporal server. - - Args: - unpacked_file_name: Name of unpacked wheel file. - module_name: Parent module name extracted from wheel file. - """ - client_config = Options( - host=os.getenv("TWC_HOST"), - namespace=os.getenv("TWC_NAMESPACE"), - queue=os.getenv("TWC_QUEUE"), - ) - - try: - workflows = _import_modules( - "workflows", - unpacked_file_name=unpacked_file_name, - module_name=module_name, - supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","), - ) - activities = _import_modules( - "activities", - unpacked_file_name=unpacked_file_name, - module_name=module_name, - supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","), - ) - - if os.getenv("TWC_TLS_ROOT_CAS").strip() != "": - client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS") - - if os.getenv("TWC_AUTH_PROVIDER").strip() != "": - client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header()) - - if os.getenv("TWC_ENCRYPTION_KEY").strip() != "": - client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True) - - worker_opt = None - dsn = os.getenv("TWC_SENTRY_DSN").strip() - if dsn != "": - sentry = SentryOptions( - dsn=dsn, - release=os.getenv("TWC_SENTRY_RELEASE").strip() or None, - environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None, - redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"), - sample_rate=os.getenv("TWC_SENTRY_SAMPLE_RATE"), - ) - - worker_opt = WorkerOptions(sentry=sentry) - - runtime = _init_runtime_with_prometheus(PROMETHEUS_PORT) - - client = await Client.connect(client_config, runtime=runtime) - - worker = Worker( - client=client, - task_queue=os.getenv("TWC_QUEUE"), - workflows=workflows, - activities=activities, - worker_opt=worker_opt, - ) - - with open("worker_status.txt", "w") as status_file: - status_file.write("Success") - await worker.run() - except Exception as e: - # If an error occurs, write the error message to the status file - with open("worker_status.txt", "w") as status_file: - logger.exception("Error in the workflow:") - traceback.print_exception(type(e), e, e.__traceback__, file=status_file) - - -if __name__ == "__main__": # pragma: nocover - global_unpacked_file_name = sys.argv[1] - global_module_name = sys.argv[2] - - asyncio.run(run_worker(global_unpacked_file_name, global_module_name)) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..2fb9958 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Tests module.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1d015b2 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,18 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Fixtures for jenkins-k8s charm tests.""" + +import pytest + + +def pytest_addoption(parser: pytest.Parser): + """Parse additional pytest options. + + Args: + parser: pytest command line parser. + """ + # The prebuilt charm file. + parser.addoption("--charm-file", action="append", default=[]) + # The image name:tag. + parser.addoption("--temporal-worker-image", action="store", default="") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9be93e5..576b8d3 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -10,35 +10,42 @@ from helpers import ( APP_NAME, APP_NAME_SERVER, - METADATA, WORKER_CONFIG, - attach_worker_resource_file, get_application_url, setup_temporal_ecosystem, ) +from pytest import FixtureRequest from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) -rsc_path = "./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl" env_rsc_path = "./resource_sample/sample.env" +@pytest.fixture(scope="module", name="temporal_worker_image") +def temporal_worker_image_fixture(request: FixtureRequest) -> str: + """Fetch the OCI image for Temporal Worker charm.""" + temporal_worker_image = request.config.getoption("--temporal-worker-image") + assert ( + temporal_worker_image + ), "--temporal-worker-image argument is required which should contain the name of the OCI image." + return temporal_worker_image + + @pytest.mark.skip_if_deployed @pytest_asyncio.fixture(name="deploy", scope="module") -async def deploy(ops_test: OpsTest): +async def deploy(ops_test: OpsTest, temporal_worker_image: str): """Verify the app is up and running.""" await ops_test.model.set_config({"update-status-hook-interval": "1m"}) - await setup_temporal_ecosystem(ops_test) - charm = await ops_test.build_charm(".") resources = { - "temporal-worker-image": METADATA["containers"]["temporal-worker"]["upstream-source"], - "workflows-file": rsc_path, + "temporal-worker-image": temporal_worker_image, "env-file": env_rsc_path, } + charm = await ops_test.build_charm(".") await ops_test.model.deploy(charm, resources=resources, config=WORKER_CONFIG, application_name=APP_NAME) + await setup_temporal_ecosystem(ops_test) async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( @@ -50,5 +57,11 @@ async def deploy(ops_test: OpsTest): url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233) await ops_test.model.applications[APP_NAME].set_config({"host": url}) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + raise_on_blocked=False, + timeout=600, + ) - await attach_worker_resource_file(ops_test, rsc_type="workflows") + assert ops_test.model.applications[APP_NAME].units[0].workload_status == "active" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index ff299e9..22a1df6 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -7,6 +7,7 @@ import asyncio import logging import time +from datetime import timedelta from pathlib import Path import yaml @@ -23,9 +24,6 @@ WORKER_CONFIG = { "namespace": "default", "queue": "test-queue", - "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl", - "supported-workflows": "all", - "supported-activities": "all", } @@ -63,10 +61,7 @@ async def run_sample_workflow(ops_test: OpsTest, workflow_type=None): # Execute workflow name = "Jean-luc" result = await client.execute_workflow( - workflow_name, - name, - id="my-workflow-id", - task_queue=WORKER_CONFIG["queue"], + workflow_name, name, id="my-workflow-id", task_queue=WORKER_CONFIG["queue"], run_timeout=timedelta(seconds=20) ) logger.info(f"result: {result}") assert result == f"Hello, {name}!" @@ -194,34 +189,23 @@ async def setup_temporal_ecosystem(ops_test: OpsTest): assert ops_test.model.applications[APP_NAME_SERVER].units[0].workload_status == "active" -async def attach_worker_resource_file(ops_test: OpsTest, rsc_type="workflows"): +async def attach_worker_invalid_env_file(ops_test: OpsTest): """Scale the application to the provided number and wait for idle. Args: ops_test: PyTest object. - rsc_type: Resource type. """ - if rsc_type == "workflows": - rsc_name = "workflows-file" - rsc_path = "./resource_sample/dist/python_samples-1.1.0-py3-none-any.whl" - else: - rsc_name = "env-file" - rsc_path = "./resource_sample/invalid.env" + rsc_name = "env-file" + rsc_path = "./sample_files/invalid.env" logger.info(f"Attaching resource: {APP_NAME} {rsc_name}={rsc_path}") with open(rsc_path, "rb") as file: ops_test.model.applications[APP_NAME].attach_resource(rsc_name, rsc_path, file) - if rsc_type == "workflows": - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", raise_on_error=False, raise_on_blocked=False, timeout=600 - ) - assert ops_test.model.applications[APP_NAME].units[0].workload_status == "active" - else: - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="blocked", raise_on_error=False, raise_on_blocked=False, timeout=600 - ) - assert ops_test.model.applications[APP_NAME].units[0].workload_status == "blocked" + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="blocked", raise_on_error=False, raise_on_blocked=False, timeout=600 + ) + assert ops_test.model.applications[APP_NAME].units[0].workload_status == "blocked" async def read_vault_unit_statuses(ops_test: OpsTest): diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index e2f2637..ffa2aa2 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -8,7 +8,7 @@ import pytest from conftest import deploy # noqa: F401, pylint: disable=W0611 -from helpers import attach_worker_resource_file, run_sample_workflow +from helpers import attach_worker_invalid_env_file, run_sample_workflow from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) @@ -25,4 +25,4 @@ async def test_basic_client(self, ops_test: OpsTest): async def test_invalid_env_file(self, ops_test: OpsTest): """Attaches an invalid .env file to the worker.""" - await attach_worker_resource_file(ops_test, rsc_type="env-file") + await attach_worker_invalid_env_file(ops_test) diff --git a/tests/integration/test_upgrades.py b/tests/integration/test_upgrades.py index 987b1f6..a8c29e6 100644 --- a/tests/integration/test_upgrades.py +++ b/tests/integration/test_upgrades.py @@ -12,7 +12,6 @@ APP_NAME, APP_NAME_SERVER, WORKER_CONFIG, - attach_worker_resource_file, get_application_url, run_sample_workflow, setup_temporal_ecosystem, @@ -41,8 +40,12 @@ async def deploy(ops_test: OpsTest): url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233) await ops_test.model.applications[APP_NAME].set_config({"host": url}) - - await attach_worker_resource_file(ops_test) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + raise_on_blocked=False, + timeout=600, + ) @pytest.mark.abort_on_fail diff --git a/tests/unit/literals.py b/tests/unit/literals.py index 8c6f0a0..57aad1a 100644 --- a/tests/unit/literals.py +++ b/tests/unit/literals.py @@ -11,12 +11,9 @@ "host": "test-host", "namespace": "test-namespace", "queue": "test-queue", - "supported-workflows": "all", - "supported-activities": "all", "sentry-dsn": "", "sentry-release": "", "sentry-environment": "", - "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl", "encryption-key": "", "auth-provider": "candid", "tls-root-cas": "", @@ -34,9 +31,6 @@ "oidc-token-uri": "", "oidc-auth-cert-url": "", "oidc-client-cert-url": "", - "http-proxy": "proxy", - "https-proxy": "proxy", - "no-proxy": "none", } EXPECTED_VAULT_ENV = { @@ -56,11 +50,9 @@ "TWC_CANDID_USERNAME": "test-username", "TWC_ENCRYPTION_KEY": "", "TWC_HOST": "test-host", - "TWC_HTTPS_PROXY": "proxy", - "TWC_HTTP_PROXY": "proxy", "TWC_LOG_LEVEL": "debug", "TWC_NAMESPACE": "test-namespace", - "TWC_NO_PROXY": "none", + "TWC_PROMETHEUS_PORT": 9000, "TWC_OIDC_AUTH_CERT_URL": "", "TWC_OIDC_AUTH_TYPE": "", "TWC_OIDC_AUTH_URI": "", @@ -77,11 +69,5 @@ "TWC_SENTRY_RELEASE": "", "TWC_SENTRY_SAMPLE_RATE": 1.0, "TWC_SENTRY_REDACT_PARAMS": False, - "TWC_SUPPORTED_ACTIVITIES": "all", - "TWC_SUPPORTED_WORKFLOWS": "all", "TWC_TLS_ROOT_CAS": "", - "TWC_WORKFLOWS_FILE_NAME": "python_samples-1.1.0-py3-none-any.whl", - "HTTP_PROXY": "proxy", - "HTTPS_PROXY": "proxy", - "NO_PROXY": "none", } diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 5075a79..202d522 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -6,11 +6,9 @@ """Temporal worker charm unit tests.""" import json -from unittest import TestCase, mock +from unittest import TestCase -from ops import Container from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus -from ops.pebble import CheckStatus from ops.testing import Harness from charm import TemporalWorkerK8SOperatorCharm @@ -40,52 +38,32 @@ def test_initial_plan(self): initial_plan = harness.get_container_pebble_plan(CONTAINER_NAME).to_dict() self.assertEqual(initial_plan, {}) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - def test_attach_resource(self, _process_wheel_file): - """The workflows file resource can be attached.""" + def test_blocked_on_missing_host(self): + """The charm is blocked on missing host config.""" harness = self.harness # Simulate peer relation readiness. harness.add_relation("peer", "temporal") - harness.add_resource("workflows-file", "") - harness.charm.on.config_changed.emit() - _process_wheel_file.assert_called() self.assertEqual(harness.model.unit.status, BlockedStatus("Invalid config: host value missing")) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - def test_ready(self, _process_wheel_file, _setup_container): + def test_ready(self): """The charm is ready.""" harness = self.harness - state = simulate_lifecycle(harness, CONFIG) + simulate_lifecycle(harness, CONFIG) harness.charm.on.config_changed.emit() - module_name = json.loads(state["module_name"]) - unpacked_file_name = json.loads(state["unpacked_file_name"]) - - command = f"python worker.py {unpacked_file_name} {module_name}" - # The plan is generated after pebble is ready. want_plan = { "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": WANT_ENV, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -97,21 +75,6 @@ def test_ready(self, _process_wheel_file, _setup_container): self.assertTrue(service.is_running()) self.assertEqual(harness.model.unit.status, MaintenanceStatus("replanning application")) - - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - @mock.patch.object(Container, "exec") - def test_update_status_up(self, _process_wheel_file, _setup_container, mock_exec): - """The charm updates the unit status to active based on UP status.""" - harness = self.harness - mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=("", None))) - - simulate_lifecycle(harness, CONFIG) - self.harness.container_pebble_ready(CONTAINER_NAME) - - container = harness.model.unit.get_container(CONTAINER_NAME) - container.get_check = mock.Mock(status="up") - container.get_check.return_value.status = CheckStatus.UP harness.charm.on.update_status.emit() self.assertEqual( @@ -119,37 +82,13 @@ def test_update_status_up(self, _process_wheel_file, _setup_container, mock_exec ActiveStatus(f"worker listening to namespace {CONFIG['namespace']!r} on queue {CONFIG['queue']!r}"), ) - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - @mock.patch.object(Container, "exec") - def test_update_status_down(self, _process_wheel_file, _setup_container, mock_exec): - """The charm updates the unit status to maintenance based on DOWN status.""" - harness = self.harness - mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=1)) - - simulate_lifecycle(harness, CONFIG) - self.harness.container_pebble_ready(CONTAINER_NAME) - - container = harness.model.unit.get_container(CONTAINER_NAME) - container.get_check = mock.Mock(status="up") - container.get_check.return_value.status = CheckStatus.DOWN - harness.charm.on.update_status.emit() - - self.assertEqual(harness.model.unit.status, MaintenanceStatus("Status check: DOWN")) - - @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") - @mock.patch("charm._setup_container") - def test_vault_relation(self, _process_wheel_file, _setup_container): + def test_vault_relation(self): """The charm is ready with vault relation.""" harness = self.harness - state = simulate_lifecycle(harness, CONFIG) + simulate_lifecycle(harness, CONFIG) harness.charm.on.config_changed.emit() - module_name = json.loads(state["module_name"]) - unpacked_file_name = json.loads(state["unpacked_file_name"]) - command = f"python worker.py {unpacked_file_name} {module_name}" - relation_id = add_vault_relation(self, harness) self.harness.update_config({}) @@ -158,19 +97,10 @@ def test_vault_relation(self, _process_wheel_file, _setup_container): "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": {**WANT_ENV, **EXPECTED_VAULT_ENV}, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -187,19 +117,10 @@ def test_vault_relation(self, _process_wheel_file, _setup_container): "services": { "temporal-worker": { "summary": "temporal worker", - "command": command, + "command": "./app/scripts/start-worker.sh", "startup": "enabled", "override": "replace", "environment": {**WANT_ENV}, - "on-check-failure": {"up": "ignore"}, - } - }, - "checks": { - "up": { - "override": "replace", - "level": "alive", - "period": "10s", - "exec": {"command": "python check_status.py"}, } }, } @@ -254,29 +175,12 @@ def simulate_lifecycle(harness, config): Args: harness: ops.testing.Harness object used to simulate charm lifecycle. config: object to update the charm's config. - - Returns: - Peer relation data. """ # Simulate peer relation readiness. - rel = harness.add_relation("peer", "temporal") + harness.add_relation("peer", "temporal") # Simulate pebble readiness. container = harness.model.unit.get_container(CONTAINER_NAME) harness.charm.on.temporal_worker_pebble_ready.emit(container) harness.update_config(config) - harness.add_resource("workflows-file", "bytes_content") - - harness.update_relation_data( - rel, - app_or_unit="temporal-worker-k8s", - key_values={ - "supported_workflows": json.dumps(["TestWorkflow"]), - "supported_activities": json.dumps(["test_activity"]), - "module_name": json.dumps("python_samples"), - "unpacked_file_name": json.dumps("python_sample-0.1.0"), - }, - ) - - return harness.get_relation_data(rel, "temporal-worker-k8s") diff --git a/tox.ini b/tox.ini index 4f288b3..2d042b2 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ all_path = {[vars]src_path} {[vars]tst_path} [testenv] basepython = python3 -allowlist_externals = make +allowlist_externals = make, rockcraft setenv = PYTHONPATH = {toxinidir}:{toxinidir}/lib:{[vars]src_path} PYTHONBREAKPOINT=ipdb.set_trace @@ -56,7 +56,8 @@ commands = codespell {toxinidir} --skip {toxinidir}/.git --skip {toxinidir}/.tox \ --skip {toxinidir}/build --skip {toxinidir}/lib --skip {toxinidir}/venv \ --skip {toxinidir}/.mypy_cache --skip {toxinidir}/icon.svg \ - --skip {toxinidir}/src/resources/worker.py + --skip {toxinidir}/src/resources/worker.py \ + --skip {toxinidir}/resource_sample_py/resource_sample/worker.py pflake8 {[vars]all_path} isort --check-only --diff {[vars]all_path} black --check --diff {[vars]all_path} @@ -104,26 +105,12 @@ deps = poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + # For local testing + make -C resource_sample_py/ build_rock + pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} -[testenv:integration-scaling] -description = Run scaling integration tests -deps = - ipdb==0.13.9 - juju==3.2.0.1 - pytest==7.1.3 - pytest-operator==0.31.1 - temporal-lib-py==1.3.1 - pytest-asyncio==0.21 - poetry==1.8.3 - -r{toxinidir}/requirements.txt -commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_scaling.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode - -[testenv:integration-upgrades] -description = Run upgrades integration tests +[testenv:integration] +description = Run integration tests deps = ipdb==0.13.9 juju==3.2.0.1 @@ -132,23 +119,7 @@ deps = temporal-lib-py==1.3.1 pytest-asyncio==0.21 poetry==1.8.3 - -r{toxinidir}/requirements.txt -commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_upgrades.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode - -[testenv:integration-vault] -description = Run vault integration tests -deps = - ipdb==0.13.9 - juju==3.2.0.1 - pytest==7.1.3 - pytest-operator==0.31.1 - temporal-lib-py==1.3.1 - pytest-asyncio==0.21 hvac==2.2.0 - poetry==1.8.3 -r{toxinidir}/requirements.txt commands = - make -C resource_sample/ build - pytest {[vars]tst_path}integration/test_vault.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode + pytest --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs}