diff --git a/.licenserc.yaml b/.licenserc.yaml
index 6e4149c..9c4de3e 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -2,6 +2,7 @@ header:
   license:
     spdx-id: Apache-2.0
     copyright-owner: Canonical Ltd.
+    copyright-year: 2023
     content: |
       Copyright [year] [owner]
       See LICENSE file for licensing details.
diff --git a/src/charm.py b/src/charm.py
index cc0d20e..c6cde50 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -11,7 +11,7 @@
 from pathlib import Path
 
 from dotenv import dotenv_values
-from ops import main
+from ops import main, pebble
 from ops.charm import CharmBase
 from ops.model import (
     ActiveStatus,
@@ -21,6 +21,7 @@
     ModelError,
     WaitingStatus,
 )
+from ops.pebble import CheckStatus
 
 from literals import (
     REQUIRED_CANDID_CONFIG,
@@ -51,6 +52,7 @@ def __init__(self, *args):
         self.framework.observe(self.on.config_changed, self._on_config_changed)
         self.framework.observe(self.on.temporal_worker_pebble_ready, self._on_temporal_worker_pebble_ready)
         self.framework.observe(self.on.restart_action, self._on_restart)
+        self.framework.observe(self.on.update_status, self._on_update_status)
 
     @log_event_handler(logger)
     def _on_temporal_worker_pebble_ready(self, event):
@@ -95,6 +97,48 @@ def _on_config_changed(self, event):
         self.unit.status = WaitingStatus("configuring temporal worker")
         self._update(event)
 
+    @log_event_handler(logger)
+    def _on_update_status(self, event):
+        """Handle `update-status` events.
+
+        Args:
+            event: The `update-status` event triggered at intervals.
+        """
+        try:
+            self._validate(event)
+        except ValueError:
+            return
+
+        container = self.unit.get_container(self.name)
+        valid_pebble_plan = self._validate_pebble_plan(container)
+        if not valid_pebble_plan:
+            self._update(event)
+            return
+
+        check = container.get_check("up")
+        if check.status != CheckStatus.UP:
+            self.unit.status = MaintenanceStatus("Status check: DOWN")
+            return
+
+        self.unit.status = ActiveStatus(
+            f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
+        )
+
+    def _validate_pebble_plan(self, container):
+        """Validate Temporal worker pebble plan.
+
+        Args:
+            container: application container
+
+        Returns:
+            bool of pebble plan validity
+        """
+        try:
+            plan = container.get_plan().to_dict()
+            return bool(plan and plan["services"].get(self.name, {}).get("on-check-failure"))
+        except pebble.ConnectionError:
+            return False
+
     def _process_env_file(self, event):
         """Process env file attached by user.
@@ -289,6 +333,15 @@ def _update(self, event):
                     "startup": "enabled",
                     "override": "replace",
                     "environment": context,
+                    "on-check-failure": {"up": "ignore"},
+                }
+            },
+            "checks": {
+                "up": {
+                    "override": "replace",
+                    "level": "alive",
+                    "period": "10s",
+                    "exec": {"command": "python check_status.py"},
                 }
             },
         }
@@ -296,9 +349,7 @@
         container.add_layer(self.name, pebble_layer, combine=True)
         container.replan()
 
-        self.unit.status = ActiveStatus(
-            f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
-        )
+        self.unit.status = MaintenanceStatus("replanning application")
 
 
 def convert_env_var(config_var, prefix="TWC_"):
@@ -327,6 +378,7 @@ def _setup_container(container: Container, proxy: str):
     """
     resources_path = Path(__file__).parent / "resources"
     _push_container_file(container, resources_path, "/worker.py", resources_path / "worker.py")
+    _push_container_file(container, resources_path, "/check_status.py", resources_path / "check_status.py")
     _push_container_file(
         container, resources_path, "/worker-dependencies.txt", resources_path / "worker-dependencies.txt"
     )
diff --git a/src/resources/check_status.py b/src/resources/check_status.py
new file mode 100644
index 0000000..10a20b0
--- /dev/null
+++ b/src/resources/check_status.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""Temporal worker status checker."""
+
+
+import logging
+import sys
+
+logger = logging.getLogger(__name__)
+
+
+def check_worker_status():
+    """Check Temporal worker status by reading status file."""
+    try:
+        with open("worker_status.txt", "r") as status_file:
+            status = status_file.read().strip()
+            logger.info(f"Async status: {status}")
+
+            if "Error" in status:
+                exit_code = 1
+            else:
+                exit_code = 0
+    except FileNotFoundError:
+        logger.error("Status file not found. Worker is not running.")
+        exit_code = 1
+
+    sys.exit(exit_code)
+
+
+if __name__ == "__main__":
+    check_worker_status()
diff --git a/src/resources/worker.py b/src/resources/worker.py
index c01eee3..cc8d748 100644
--- a/src/resources/worker.py
+++ b/src/resources/worker.py
@@ -104,50 +104,58 @@ async def run_worker(unpacked_file_name, module_name):
         queue=os.getenv("TWC_QUEUE"),
     )
 
-    workflows = _import_modules(
-        "workflows",
-        unpacked_file_name=unpacked_file_name,
-        module_name=module_name,
-        supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","),
-    )
-    activities = _import_modules(
-        "activities",
-        unpacked_file_name=unpacked_file_name,
-        module_name=module_name,
-        supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","),
-    )
+    try:
+        workflows = _import_modules(
+            "workflows",
+            unpacked_file_name=unpacked_file_name,
+            module_name=module_name,
+            supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","),
+        )
+        activities = _import_modules(
+            "activities",
+            unpacked_file_name=unpacked_file_name,
+            module_name=module_name,
+            supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","),
+        )
 
-    if os.getenv("TWC_TLS_ROOT_CAS").strip() != "":
-        client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS")
+        if os.getenv("TWC_TLS_ROOT_CAS").strip() != "":
+            client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS")
 
-    if os.getenv("TWC_AUTH_PROVIDER").strip() != "":
-        client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header())
+        if os.getenv("TWC_AUTH_PROVIDER").strip() != "":
+            client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header())
 
-    if os.getenv("TWC_ENCRYPTION_KEY").strip() != "":
-        client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True)
+        if os.getenv("TWC_ENCRYPTION_KEY").strip() != "":
+            client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True)
 
-    worker_opt = None
-    dsn = os.getenv("TWC_SENTRY_DSN").strip()
-    if dsn != "":
-        sentry = SentryOptions(
-            dsn=dsn,
-            release=os.getenv("TWC_SENTRY_RELEASE").strip() or None,
-            environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None,
-            redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"),
-        )
+        worker_opt = None
+        dsn = os.getenv("TWC_SENTRY_DSN").strip()
+        if dsn != "":
+            sentry = SentryOptions(
+                dsn=dsn,
+                release=os.getenv("TWC_SENTRY_RELEASE").strip() or None,
+                environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None,
+                redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"),
+            )
 
-        worker_opt = WorkerOptions(sentry=sentry)
+            worker_opt = WorkerOptions(sentry=sentry)
 
-    client = await Client.connect(client_config)
+        client = await Client.connect(client_config)
 
-    worker = Worker(
-        client=client,
-        task_queue=os.getenv("TWC_QUEUE"),
-        workflows=workflows,
-        activities=activities,
-        worker_opt=worker_opt,
-    )
-    await worker.run()
+        worker = Worker(
+            client=client,
+            task_queue=os.getenv("TWC_QUEUE"),
+            workflows=workflows,
+            activities=activities,
+            worker_opt=worker_opt,
+        )
+
+        with open("worker_status.txt", "w") as status_file:
+            status_file.write("Success")
+        await worker.run()
+    except Exception as e:
+        # If an error occurs, write the error message to the status file
+        with open("worker_status.txt", "w") as status_file:
+            status_file.write(f"Error: {e}")
 
 
 if __name__ == "__main__":  # pragma: nocover
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 991f758..8022da8 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -50,4 +50,4 @@ async def deploy(ops_test: OpsTest):
         url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233)
         await ops_test.model.applications[APP_NAME].set_config({"host": url})
 
-        await attach_worker_resource_file(ops_test)
+        await attach_worker_resource_file(ops_test, rsc_type="workflows")
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index a841314..eb4303b 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -38,7 +38,7 @@ async def run_sample_workflow(ops_test: OpsTest):
     url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233)
     logger.info("running workflow on app address: %s", url)
 
-    client = await Client.connect(Options(host=url, queue=WORKER_CONFIG["queue"], namespace="default"))
+    client = await Client.connect(Options(host=url, queue=WORKER_CONFIG["queue"], namespace=WORKER_CONFIG["namespace"]))
 
     # Execute workflow
     name = "Jean-luc"
@@ -113,16 +113,17 @@ async def scale(ops_test: OpsTest, app, units):
     """
     await ops_test.model.applications[app].scale(scale=units)
 
-    # Wait for model to settle
-    await ops_test.model.wait_for_idle(
-        apps=[app],
-        status="active",
-        idle_period=30,
-        raise_on_error=False,
-        raise_on_blocked=True,
-        timeout=300,
-        wait_for_exact_units=units,
-    )
+    async with ops_test.fast_forward():
+        # Wait for model to settle
+        await ops_test.model.wait_for_idle(
+            apps=[app],
+            status="active",
+            idle_period=30,
+            raise_on_error=False,
+            raise_on_blocked=True,
+            timeout=600,
+            wait_for_exact_units=units,
+        )
 
     assert len(ops_test.model.applications[app].units) == units
diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index b4d51f2..e2f2637 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -24,5 +24,5 @@ async def test_basic_client(self, ops_test: OpsTest):
         await run_sample_workflow(ops_test)
 
     async def test_invalid_env_file(self, ops_test: OpsTest):
-        """Connects a client and runs a basic Temporal workflow."""
+        """Attaches an invalid .env file to the worker."""
         await attach_worker_resource_file(ops_test, rsc_type="env-file")
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 5fab610..9be9ed9 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -8,13 +8,47 @@
 import json
 from unittest import TestCase, mock
 
-from ops.model import ActiveStatus, BlockedStatus
+from ops import Container
+from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus
+from ops.pebble import CheckStatus
 from ops.testing import Harness
 
 from charm import TemporalWorkerK8SOperatorCharm
 from state import State
 
 CONTAINER_NAME = "temporal-worker"
+CONFIG = {
+    "log-level": "debug",
+    "host": "test-host",
+    "namespace": "test-namespace",
+    "queue": "test-queue",
+    "supported-workflows": "all",
+    "supported-activities": "all",
+    "sentry-dsn": "",
+    "sentry-release": "",
+    "sentry-environment": "",
+    "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl",
+    "encryption-key": "",
+    "auth-provider": "candid",
+    "tls-root-cas": "",
+    "candid-url": "test-url",
+    "candid-username": "test-username",
+    "candid-public-key": "test-public-key",
+    "candid-private-key": "test-private-key",
+    "oidc-auth-type": "",
+    "oidc-project-id": "",
+    "oidc-private-key-id": "",
+    "oidc-private-key": "",
+    "oidc-client-email": "",
+    "oidc-client-id": "",
+    "oidc-auth-uri": "",
+    "oidc-token-uri": "",
+    "oidc-auth-cert-url": "",
"oidc-client-cert-url": "", + "http-proxy": "proxy", + "https-proxy": "proxy", + "no-proxy": "none", +} class TestCharm(TestCase): @@ -60,39 +94,7 @@ def test_ready(self, _process_wheel_file, _setup_container): """The charm is blocked without a admin:temporal relation with a ready schema.""" harness = self.harness - config = { - "log-level": "debug", - "host": "test-host", - "namespace": "test-namespace", - "queue": "test-queue", - "supported-workflows": "all", - "supported-activities": "all", - "sentry-dsn": "", - "sentry-release": "", - "sentry-environment": "", - "workflows-file-name": "python_samples-1.1.0-py3-none-any.whl", - "encryption-key": "", - "auth-provider": "candid", - "tls-root-cas": "", - "candid-url": "test-url", - "candid-username": "test-username", - "candid-public-key": "test-public-key", - "candid-private-key": "test-private-key", - "oidc-auth-type": "", - "oidc-project-id": "", - "oidc-private-key-id": "", - "oidc-private-key": "", - "oidc-client-email": "", - "oidc-client-id": "", - "oidc-auth-uri": "", - "oidc-token-uri": "", - "oidc-auth-cert-url": "", - "oidc-client-cert-url": "", - "http-proxy": "proxy", - "https-proxy": "proxy", - "no-proxy": "none", - } - state = simulate_lifecycle(harness, config) + state = simulate_lifecycle(harness, CONFIG) harness.charm.on.config_changed.emit() module_name = json.loads(state["module_name"]) @@ -144,6 +146,7 @@ def test_ready(self, _process_wheel_file, _setup_container): "HTTPS_PROXY": "proxy", "NO_PROXY": "none", }, + "on-check-failure": {"up": "ignore"}, } }, } @@ -155,11 +158,47 @@ def test_ready(self, _process_wheel_file, _setup_container): self.assertTrue(service.is_running()) # The ActiveStatus is set. + self.assertEqual(harness.model.unit.status, MaintenanceStatus("replanning application")) + + @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") + @mock.patch("charm._setup_container") + @mock.patch.object(Container, "exec") + def test_update_status_up(self, _process_wheel_file, _setup_container, mock_exec): + """The charm updates the unit status to active based on UP status.""" + harness = self.harness + mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=("", None))) + + simulate_lifecycle(harness, CONFIG) + self.harness.container_pebble_ready(CONTAINER_NAME) + + container = harness.model.unit.get_container(CONTAINER_NAME) + container.get_check = mock.Mock(status="up") + container.get_check.return_value.status = CheckStatus.UP + harness.charm.on.update_status.emit() + self.assertEqual( harness.model.unit.status, - ActiveStatus(f"worker listening to namespace {config['namespace']!r} on queue {config['queue']!r}"), + ActiveStatus(f"worker listening to namespace {CONFIG['namespace']!r} on queue {CONFIG['queue']!r}"), ) + @mock.patch("charm.TemporalWorkerK8SOperatorCharm._process_wheel_file") + @mock.patch("charm._setup_container") + @mock.patch.object(Container, "exec") + def test_update_status_down(self, _process_wheel_file, _setup_container, mock_exec): + """The charm updates the unit status to maintenance based on DOWN status.""" + harness = self.harness + mock_exec.return_value = mock.MagicMock(wait_output=mock.MagicMock(return_value=1)) + + simulate_lifecycle(harness, CONFIG) + self.harness.container_pebble_ready(CONTAINER_NAME) + + container = harness.model.unit.get_container(CONTAINER_NAME) + container.get_check = mock.Mock(status="up") + container.get_check.return_value.status = CheckStatus.DOWN + harness.charm.on.update_status.emit() + + 
+        self.assertEqual(harness.model.unit.status, MaintenanceStatus("Status check: DOWN"))
+
 
 def simulate_lifecycle(harness, config):
     """Simulate a healthy charm life-cycle.
diff --git a/tox.ini b/tox.ini
index 3b2ebd1..8862aaa 100644
--- a/tox.ini
+++ b/tox.ini
@@ -97,11 +97,12 @@ deps =
     ipdb==0.13.9
     juju==3.2.0.1
     pytest==7.1.3
-    pytest-operator==0.22.0
-    temporal-lib-py==1.1.2
+    pytest-operator==0.31.1
+    temporal-lib-py==1.3.1
+    pytest-asyncio==0.21
     -r{toxinidir}/requirements.txt
 commands =
-    pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs}
+    pytest {[vars]tst_path}integration/test_charm.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode
 
 [testenv:integration-scaling]
 description = Run scaling integration tests
@@ -109,11 +110,12 @@ deps =
     ipdb==0.13.9
     juju==3.2.0.1
     pytest==7.1.3
-    pytest-operator==0.22.0
-    temporal-lib-py==1.1.2
+    pytest-operator==0.31.1
+    temporal-lib-py==1.3.1
+    pytest-asyncio==0.21
     -r{toxinidir}/requirements.txt
 commands =
-    pytest {[vars]tst_path}integration/test_scaling.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs}
+    pytest {[vars]tst_path}integration/test_scaling.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode
 
 [testenv:integration-upgrades]
 description = Run upgrades integration tests
@@ -121,8 +123,9 @@ deps =
     ipdb==0.13.9
     juju==3.2.0.1
     pytest==7.1.3
-    pytest-operator==0.22.0
-    temporal-lib-py==1.1.2
+    pytest-operator==0.31.1
+    temporal-lib-py==1.3.1
+    pytest-asyncio==0.21
     -r{toxinidir}/requirements.txt
 commands =
-    pytest {[vars]tst_path}integration/test_upgrades.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs}
+    pytest {[vars]tst_path}integration/test_upgrades.py -v --tb native --ignore={[vars]tst_path}unit --log-cli-level=INFO -s {posargs} --destructive-mode