diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml index cc9a4f5..2a58a6b 100644 --- a/.github/workflows/workflow.yaml +++ b/.github/workflows/workflow.yaml @@ -29,7 +29,18 @@ jobs: role-to-assume: arn:aws:iam::959243851260:role/github-action-prefect-integration role-session-name: s3access aws-region: us-west-2 - - name: Test Prefect <> Anyscale Integration + - name: Test Prefect <> Anyscale Integration (AWS secret manager) + env: + ANYSCALE_HOST: ${{ secrets.ANYSCALE_HOST }} + ANYSCALE_CLI_TOKEN: ${{ secrets.ANYSCALE_CLI_TOKEN }} + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }} + run: | + pip install -e . + envsubst < ci/prefect-agent-service-awssecrets-ci.yaml > /tmp/prefect-agent-service-awssecrets.out + anyscale service deploy /tmp/prefect-agent-service-awssecrets.out + PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py --queue test-awssecrets + - name: Test Prefect <> Anyscale Integration (PREFECT_API_KEY) env: ANYSCALE_HOST: ${{ secrets.ANYSCALE_HOST }} ANYSCALE_CLI_TOKEN: ${{ secrets.ANYSCALE_CLI_TOKEN }} @@ -39,5 +50,5 @@ jobs: pip install -e . envsubst < ci/prefect-agent-service-ci.yaml > /tmp/prefect-agent-service.out anyscale service deploy /tmp/prefect-agent-service.out - PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py + PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py --queue test PYTHONPATH=$PYTHONPATH:. python ci/test_ray_job_integration.py diff --git a/README.md b/README.md index c7bda1a..47fc165 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,49 @@ do not use the `RayTaskRunner(address="ray://...")` or `RayTaskRunner(address="a cause various issues (version mismatches between client and cluster, loosing connection, slower data transfer and API calls between client and server etc). -## Production Setup +## Running Anyscale Jobs as part of a larger Prefect flow + +You can run Anyscale Jobs as part of a Prefect flow like this: +```python +import os +import subprocess +import tempfile +import yaml + +from prefect import flow, task, get_run_logger + +@task +def execute_anyscale_job(args): + job_config = { + "name": "my-anyscale-job", + "description": "An Anyscale Job submitted from Prefect.", + "cluster_env": "default_cluster_env_2.3.1_py39", + "runtime_env": { + "working_dir": ".", + "upload_path": "", + }, + "entrypoint": "python my_job_script.py " + " ".join([f"--{key} {val}" for key, val in args.items()]), + } + + with tempfile.NamedTemporaryFile(mode="w") as f: + yaml.dump(job_config, f) + f.flush() + # Submit an Anyscale Job from Prefect and record the logs + output = subprocess.check_output( + ["anyscale", "job", "submit", f.name, "--follow"] + ) + logger = get_run_logger() + logger.info("Anyscale Job output: " + output.decode()) + +@flow +def flow_with_anyscale_job(): + execute_anyscale_job.submit({"arg": "value"}) + +if __name__ == "__main__": + flow_with_anyscale_job() +``` + +## Using Anyscale as the compute infrastructure for Prefect workloads This repository is providing an integration between Anyscale and Prefect for production scenarios, where you want to submit your experiments from the Prefect UI and have them run in Anyscale. It uses @@ -63,13 +105,18 @@ We now need to create an Anyscale Service file for deploying the Anyscale Prefec ```bash prefect config view --hide-sources ``` -and create a `prefect-agent-service.yaml` file where you fill in the information just displayed in place of the `...`: +and create a `prefect-agent-service.yaml` file where you **fill in the information** displayed above in place of the `...`: ```yaml name: prefect-agent -entrypoint: pip install prefect-anyscale && PREFECT_API_URL="https://api.prefect.cloud/api/accounts/..." PREFECT_API_KEY="..." python start_anyscale_service.py --queue test -runtime_env: - working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip -healthcheck_url: "/healthcheck" +ray_serve_config: + import_path: start_anyscale_service:entrypoint + runtime_env: + env_vars: + PREFECT_API_URL: "https://api.prefect.cloud/api/accounts/..." + PREFECT_API_KEY: "..." + ANYSCALE_PREFECT_QUEUE: test + pip: ["prefect-anyscale"] + working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.2.1.zip ``` **NOTE**: This will store your Prefect API token in the service @@ -116,4 +163,71 @@ You can now schedule new runs with this deployment from the Prefect UI ![submit prefect run](./doc/prefect_submit_run.png) -and it will be executed as an Anyscale Job on an autoscaling Ray Cluster which has the same setup as the development setup described above. \ No newline at end of file +and it will be executed as an Anyscale Job on an autoscaling Ray Cluster which has the same setup as the development setup described above. + +#### Overriding properties of the infra block + +You can override properties of the Anyscale infra block in a deployment like this + +```python +import prefect +from prefect.filesystems import S3 +from prefect_anyscale import AnyscaleJob + +from prefect_test import count_to + +deployment = prefect.deployments.Deployment.build_from_flow( + flow=count_to, + name="prefect_test_custom", + work_queue_name="test", + storage=S3.load("test-storage"), + infrastructure=AnyscaleJob.load("test-infra"), + infra_overrides={"compute_config": "test-compute-config"} +) +deployment.apply() +``` +#### Using the AWS Secrets Manager for storing the PREFECT_API_KEY + +We recommend using the AWS Secrets Manager for storing your PREFECT_API_KEY token. Store your +`PREFECT_API_KEY` secret as a Plaintext secret (not Key/value) like the following + +![create prefect secret](./doc/prefect_api_key_secret.png) + +and add the following policy to your `-cluster_node_role` role: + +``` +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "secretsmanager:GetSecretValue", + "Resource": "" + } + ] +} +``` + +You can then run the agent by specifying a `ANYSCALE_PREFECT_AWS_SECRET_ID` and +`ANYSCALE_PREFECT_AWS_REGION` in your configuration yaml instead of the `PREFECT_API_KEY`, +see the `ci/prefect-agent-service-awssecrets-ci.yaml` file in this repository for an example. + +### Using your own Prefect Agent + +If you already have a setup with an existing Prefect agent working, you can use that agent +to run the Prefect Anyscale integration. + +First make sure you +- Have the `prefect_anyscale` package installed in the Prefect Agent's environment and +- Are logged into Prefect or have set the `PREFECT_API_URL` and `PREFECT_API_KEY` environment +variables and +- Are logged into Anyscale or have set the `ANYSCALE_HOST` and `ANYSCALE_CLI_TOKEN` environment +variables + +Then start the agent with +``` +PREFECT_EXTRA_ENTRYPOINTS=prefect_anyscale prefect agent start -q +``` + +The agent will listen to new work on the specified queue and will execute flows that run with +the `AnyscaleJob` infra as Anyscale Jobs. diff --git a/ci/anyscale_job.py b/ci/anyscale_job.py new file mode 100644 index 0000000..ced9555 --- /dev/null +++ b/ci/anyscale_job.py @@ -0,0 +1,15 @@ +import argparse +import ray + +parser = argparse.ArgumentParser() +parser.add_argument('--arg', type=str) +args = parser.parse_args() + +@ray.remote +def f(arg): + return "Argument is '" + arg + "'" + +args = ["This", "is", "a", "test"] + [args.arg] +results = ray.get([f.remote(arg) for arg in args]) + +print("\n".join(results)) \ No newline at end of file diff --git a/ci/complex_flow.py b/ci/complex_flow.py new file mode 100644 index 0000000..2b660a5 --- /dev/null +++ b/ci/complex_flow.py @@ -0,0 +1,55 @@ +import os +import subprocess +import tempfile +import yaml + +import ray + +from prefect import flow, task, get_run_logger +from prefect_ray import RayTaskRunner + +# This custom resource will cause another node to be added +# to the cluster so we can test that the working directory +# is indeed synced to all nodes in the cluster. +@ray.remote(resources={"custom_resource": 1}) +def remote_task(): + from ci.test_python_file import test + return test() + +@task +def test_task(): + return ray.get(remote_task.remote()) + +@task +def anyscale_job(args): + job_config = { + "name": "my-anyscale-job", + "cloud": "anyscale_v2_default_cloud", + "description": "An Anyscale Job submitted from Prefect.", + "cluster_env": "default_cluster_env_2.3.1_py39", + "runtime_env": { + "working_dir": "ci/", + "upload_path": "s3://anyscale-prefect-integration-test/working-dir/", + }, + "entrypoint": "python anyscale_job.py " + " ".join([f"--{key} {val}" for key, val in args.items()]), + } + + with tempfile.NamedTemporaryFile(mode="w") as f: + yaml.dump(job_config, f) + f.flush() + # Submit an Anyscale Job from Prefect and record the logs + output = subprocess.check_output( + ["anyscale", "job", "submit", f.name, "--follow"] + ) + logger = get_run_logger() + logger.info("Anyscale Job output: " + output.decode()) + +@flow(task_runner=RayTaskRunner) +def complex_flow(): + result = test_task.submit() + assert result.result() == 42 + result = anyscale_job.submit({"arg": "value"}) + assert result.result() == None + +if __name__ == "__main__": + complex_flow() diff --git a/ci/prefect-agent-service-awssecrets-ci.yaml b/ci/prefect-agent-service-awssecrets-ci.yaml new file mode 100644 index 0000000..ff4476e --- /dev/null +++ b/ci/prefect-agent-service-awssecrets-ci.yaml @@ -0,0 +1,13 @@ +name: prefect-agent-awssecrets-28 +cloud: "anyscale_v2_default_cloud" +ray_serve_config: + import_path: start_anyscale_service:entrypoint + runtime_env: + env_vars: + PREFECT_API_URL: $PREFECT_API_URL + ANYSCALE_PREFECT_AWS_SECRET_ID: prefect-api-key + ANYSCALE_PREFECT_AWS_REGION: us-west-2 + ANYSCALE_PREFECT_QUEUE: test-awssecrets + ANYSCALE_PREFECT_DEVELOPMENT: "1" + working_dir: . + upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/" \ No newline at end of file diff --git a/ci/prefect-agent-service-ci.yaml b/ci/prefect-agent-service-ci.yaml index 97e2390..5797c14 100644 --- a/ci/prefect-agent-service-ci.yaml +++ b/ci/prefect-agent-service-ci.yaml @@ -1,6 +1,12 @@ -name: prefect-agent-18 -entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test -runtime_env: - working_dir: . - upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/" -healthcheck_url: "/healthcheck" \ No newline at end of file +name: prefect-agent-28 +cloud: "anyscale_v2_default_cloud" +ray_serve_config: + import_path: start_anyscale_service:entrypoint + runtime_env: + env_vars: + PREFECT_API_URL: $PREFECT_API_URL + PREFECT_API_KEY: $PREFECT_API_KEY + ANYSCALE_PREFECT_QUEUE: test + ANYSCALE_PREFECT_DEVELOPMENT: "1" + working_dir: . + upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/" diff --git a/ci/submit_prefect_run_and_check.py b/ci/submit_prefect_run_and_check.py index 619aad1..0422785 100644 --- a/ci/submit_prefect_run_and_check.py +++ b/ci/submit_prefect_run_and_check.py @@ -1,3 +1,4 @@ +import argparse import asyncio import prefect.deployments @@ -7,18 +8,36 @@ from prefect_anyscale import AnyscaleJob from prefect_test import count_to +from ci.complex_flow import complex_flow + +parser = argparse.ArgumentParser() +parser.add_argument("--queue", help="prefect queue to submit to") +args = parser.parse_args() deployment = prefect.deployments.Deployment.build_from_flow( flow=count_to, name="prefect_test", - work_queue_name="test", + work_queue_name=args.queue, storage=S3.load("test-storage-github"), - infrastructure=AnyscaleJob.load("anyscale-job-infra") + infrastructure=AnyscaleJob.load("anyscale-job-infra"), + infra_overrides={"compute_config": "complex-test-compute-config"}, ) deployment.apply() flow_run = prefect.deployments.run_deployment("count-to/prefect_test", parameters={"highest_number": 5}) +complex_deployment = prefect.deployments.Deployment.build_from_flow( + flow=complex_flow, + name="prefect_complex_test", + work_queue_name=args.queue, + storage=S3.load("test-storage-github"), + infrastructure=AnyscaleJob.load("anyscale-job-infra"), + infra_overrides={"compute_config": "complex-test-compute-config"}, +) +complex_deployment.apply() + +complex_flow_run = prefect.deployments.run_deployment("complex-flow/prefect_complex_test", parameters={}) + async def wait_for_run_complete(flow_id): async with get_client() as client: while True: @@ -31,3 +50,4 @@ async def wait_for_run_complete(flow_id): await asyncio.sleep(5.0) asyncio.run(wait_for_run_complete(flow_run.id)) +asyncio.run(wait_for_run_complete(complex_flow_run.id)) diff --git a/ci/test_python_file.py b/ci/test_python_file.py new file mode 100644 index 0000000..4009b47 --- /dev/null +++ b/ci/test_python_file.py @@ -0,0 +1,2 @@ +def test(): + return 42 diff --git a/doc/prefect_api_key_secret.png b/doc/prefect_api_key_secret.png new file mode 100644 index 0000000..317fd78 Binary files /dev/null and b/doc/prefect_api_key_secret.png differ diff --git a/prefect-agent-service.yaml b/prefect-agent-service.yaml index 76b0b62..e6f5bd8 100644 --- a/prefect-agent-service.yaml +++ b/prefect-agent-service.yaml @@ -1,5 +1,12 @@ -name: prefect-agent-10 -entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test -runtime_env: - working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip -healthcheck_url: "/healthcheck" +name: prefect-agent +ray_serve_config: + import_path: start_anyscale_service:entrypoint + runtime_env: + env_vars: + PREFECT_API_URL: $PREFECT_API_URL + # Consider using a secret manager to store the PREFECT_API_KEY token + # (see instructions in README.md) + PREFECT_API_KEY: $PREFECT_API_KEY + ANYSCALE_PREFECT_QUEUE: test + pip: ["prefect-anyscale"] + working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.2.0.zip diff --git a/prefect_anyscale/__init__.py b/prefect_anyscale/__init__.py index 645f6f5..3a2272f 100644 --- a/prefect_anyscale/__init__.py +++ b/prefect_anyscale/__init__.py @@ -1,6 +1,23 @@ from prefect_anyscale.infrastructure import AnyscaleJob, RayJob + +def prefect_runtime_environment_hook(runtime_env): + + if not runtime_env: + runtime_env = {} + + # If no working_dir is specified, we use the current + # directory as the working directory -- this will be + # the directory containing the source code which is + # downloaded by the Prefect engine. + if not runtime_env.get("working_dir"): + runtime_env["working_dir"] = "." + + return runtime_env + + __all__ = [ "AnyscaleJob", "RayJob", + "prefect_runtime_environment_hook", ] diff --git a/prefect_anyscale/infrastructure.py b/prefect_anyscale/infrastructure.py index 063dd74..cf89166 100644 --- a/prefect_anyscale/infrastructure.py +++ b/prefect_anyscale/infrastructure.py @@ -56,23 +56,34 @@ async def run( api_key = env.get("PREFECT_API_KEY") flow_run_id = env.get("PREFECT__FLOW_RUN_ID") + aws_secret_id = env.get("ANYSCALE_PREFECT_AWS_SECRET_ID") + cmd = "" if api_url: cmd += "PREFECT_API_URL={}".format(api_url) - if api_key: + if aws_secret_id: + # If we use the AWS secret manager to pass the PREFECT_API_KEY + # through, we will retrieve the API key when the job gets executed. + aws_region = env.get("ANYSCALE_PREFECT_AWS_REGION") + cmd += " PREFECT_API_KEY=`aws secretsmanager get-secret-value --secret-id {} --region {} --output=text --query=SecretString`".format(aws_secret_id, aws_region) + elif api_key: + logging.warn("Your PREFECT_API_KEY is currently stored in plain text. Consider using a secret manager to store your secrets.") cmd += " PREFECT_API_KEY={}".format(api_key) if flow_run_id: cmd += " PREFECT__FLOW_RUN_ID={}".format(flow_run_id) - cmd += " /home/ray/anaconda3/bin/python -m prefect.engine" + # Install runtime environment + cmd += " RAY_RUNTIME_ENV_HOOK=prefect_anyscale.prefect_runtime_environment_hook" + + cmd += " python -m prefect.engine" # Link the Job on the Anyscale UI with the prefect flow run job_name = "prefect-job-" + flow_run_id content = """ - name: "{}" - entrypoint: "{}" - """.format(job_name, cmd) +name: "{}" +entrypoint: "{}" +""".format(job_name, cmd) if self.compute_config: content += 'compute_config: "{}"\n'.format(self.compute_config) @@ -80,15 +91,15 @@ async def run( if self.cluster_env: content += 'cluster_env: "{}"\n'.format(self.cluster_env) + if task_status: + task_status.started(job_name) + with tempfile.NamedTemporaryFile(mode="w") as f: f.write(content) f.flush() logging.info(f"Submitting Anyscale Job with configuration '{content}'") returncode = subprocess.check_call(["anyscale", "job", "submit", f.name]) - if task_status: - task_status.started(job_name) - return AnyscaleJobResult( status_code=returncode, identifier="" ) diff --git a/setup.py b/setup.py index 1d5f359..90c0e57 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="prefect-anyscale", - version="0.1.0", + version="0.2.1", description="Prefect integrations with Anyscale.", license="Apache License 2.0", author="Anyscale, Inc.", diff --git a/start_anyscale_service.py b/start_anyscale_service.py index 7c22d86..6a25e94 100644 --- a/start_anyscale_service.py +++ b/start_anyscale_service.py @@ -1,37 +1,58 @@ -import argparse +import logging import os import subprocess from fastapi import FastAPI from ray import serve -parser = argparse.ArgumentParser() -parser.add_argument("--queue", type=str) -args = parser.parse_args() +def get_prefect_secret_environment(): + # We retrieve the PREFECT_API_KEY from the secret store so + # we can pass it to the prefect agent since the agent will + # need it to connect to the prefect control plane. + aws_secret_id = os.environ.get("ANYSCALE_PREFECT_AWS_SECRET_ID") + if aws_secret_id: + import boto3 + client = boto3.client( + "secretsmanager", region_name=os.environ["ANYSCALE_PREFECT_AWS_REGION"] + ) + response = client.get_secret_value(SecretId=aws_secret_id) + return { + "PREFECT_API_KEY": response["SecretString"], + "ANYSCALE_PREFECT_AWS_SECRET_ID": aws_secret_id, + "ANYSCALE_PREFECT_AWS_REGION": os.environ["ANYSCALE_PREFECT_AWS_REGION"], + } + else: + logging.warn("Your PREFECT_API_KEY is currently stored in plain text. Consider using a secret manager to store your secrets.") + return { + "PREFECT_API_KEY": os.environ["PREFECT_API_KEY"] + } serve.start(detached=True) app = FastAPI() -@serve.deployment(route_prefix="/", num_replicas=1) +@serve.deployment(route_prefix="/", num_replicas=1, health_check_period_s=10, health_check_timeout_s=30) @serve.ingress(app) class PrefectAgentDeployment: def __init__(self, prefect_env): self.agent = subprocess.Popen( - ["prefect", "agent", "start", "-q", args.queue], + ["prefect", "agent", "start", "-q", prefect_env["ANYSCALE_PREFECT_QUEUE"]], env=dict(os.environ, **prefect_env), ) - @app.get("/healthcheck") - def healthcheck(self): + def check_health(self): poll = self.agent.poll() if poll is None: return else: raise RuntimeError("Prefect agent died") -serve.run(PrefectAgentDeployment.bind({ +if os.environ.get("ANYSCALE_PREFECT_DEVELOPMENT", "0") == "1": + subprocess.check_call(["pip", "install", "-e", "."]) + +entrypoint = PrefectAgentDeployment.bind({ "PREFECT_API_URL": os.environ["PREFECT_API_URL"], - "PREFECT_API_KEY": os.environ["PREFECT_API_KEY"], "PREFECT_EXTRA_ENTRYPOINTS": "prefect_anyscale", -})) + "ANYSCALE_PREFECT_QUEUE": os.environ["ANYSCALE_PREFECT_QUEUE"], + **get_prefect_secret_environment() +})