Skip to content

Commit

Permalink
Merge branch 'main' into ray-job-support
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored Oct 2, 2023
2 parents 1d274eb + f9cc8fb commit 5d86d46
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 42 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ jobs:
role-to-assume: arn:aws:iam::959243851260:role/github-action-prefect-integration
role-session-name: s3access
aws-region: us-west-2
- name: Test Prefect <> Anyscale Integration
- name: Test Prefect <> Anyscale Integration (AWS secret manager)
env:
ANYSCALE_HOST: ${{ secrets.ANYSCALE_HOST }}
ANYSCALE_CLI_TOKEN: ${{ secrets.ANYSCALE_CLI_TOKEN }}
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
run: |
pip install -e .
envsubst < ci/prefect-agent-service-awssecrets-ci.yaml > /tmp/prefect-agent-service-awssecrets.out
anyscale service deploy /tmp/prefect-agent-service-awssecrets.out
PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py --queue test-awssecrets
- name: Test Prefect <> Anyscale Integration (PREFECT_API_KEY)
env:
ANYSCALE_HOST: ${{ secrets.ANYSCALE_HOST }}
ANYSCALE_CLI_TOKEN: ${{ secrets.ANYSCALE_CLI_TOKEN }}
Expand All @@ -39,5 +50,5 @@ jobs:
pip install -e .
envsubst < ci/prefect-agent-service-ci.yaml > /tmp/prefect-agent-service.out
anyscale service deploy /tmp/prefect-agent-service.out
PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py
PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py --queue test
PYTHONPATH=$PYTHONPATH:. python ci/test_ray_job_integration.py
128 changes: 121 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,49 @@ do not use the `RayTaskRunner(address="ray://...")` or `RayTaskRunner(address="a
cause various issues (version mismatches between client and cluster, loosing connection, slower data transfer and API
calls between client and server etc).

## Production Setup
## Running Anyscale Jobs as part of a larger Prefect flow

You can run Anyscale Jobs as part of a Prefect flow like this:
```python
import os
import subprocess
import tempfile
import yaml

from prefect import flow, task, get_run_logger

@task
def execute_anyscale_job(args):
job_config = {
"name": "my-anyscale-job",
"description": "An Anyscale Job submitted from Prefect.",
"cluster_env": "default_cluster_env_2.3.1_py39",
"runtime_env": {
"working_dir": ".",
"upload_path": "<path to your S3 bucket where the code should be stored>",
},
"entrypoint": "python my_job_script.py " + " ".join([f"--{key} {val}" for key, val in args.items()]),
}

with tempfile.NamedTemporaryFile(mode="w") as f:
yaml.dump(job_config, f)
f.flush()
# Submit an Anyscale Job from Prefect and record the logs
output = subprocess.check_output(
["anyscale", "job", "submit", f.name, "--follow"]
)
logger = get_run_logger()
logger.info("Anyscale Job output: " + output.decode())

@flow
def flow_with_anyscale_job():
execute_anyscale_job.submit({"arg": "value"})

if __name__ == "__main__":
flow_with_anyscale_job()
```

## Using Anyscale as the compute infrastructure for Prefect workloads

This repository is providing an integration between Anyscale and Prefect for production scenarios, where you
want to submit your experiments from the Prefect UI and have them run in Anyscale. It uses
Expand Down Expand Up @@ -63,13 +105,18 @@ We now need to create an Anyscale Service file for deploying the Anyscale Prefec
```bash
prefect config view --hide-sources
```
and create a `prefect-agent-service.yaml` file where you fill in the information just displayed in place of the `...`:
and create a `prefect-agent-service.yaml` file where you **fill in the information** displayed above in place of the `...`:
```yaml
name: prefect-agent
entrypoint: pip install prefect-anyscale && PREFECT_API_URL="https://api.prefect.cloud/api/accounts/..." PREFECT_API_KEY="..." python start_anyscale_service.py --queue test
runtime_env:
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip
healthcheck_url: "/healthcheck"
ray_serve_config:
import_path: start_anyscale_service:entrypoint
runtime_env:
env_vars:
PREFECT_API_URL: "https://api.prefect.cloud/api/accounts/..."
PREFECT_API_KEY: "..."
ANYSCALE_PREFECT_QUEUE: test
pip: ["prefect-anyscale"]
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.2.1.zip
```
**NOTE**: This will store your Prefect API token in the service
Expand Down Expand Up @@ -116,4 +163,71 @@ You can now schedule new runs with this deployment from the Prefect UI

![submit prefect run](./doc/prefect_submit_run.png)

and it will be executed as an Anyscale Job on an autoscaling Ray Cluster which has the same setup as the development setup described above.
and it will be executed as an Anyscale Job on an autoscaling Ray Cluster which has the same setup as the development setup described above.

#### Overriding properties of the infra block

You can override properties of the Anyscale infra block in a deployment like this

```python
import prefect
from prefect.filesystems import S3
from prefect_anyscale import AnyscaleJob
from prefect_test import count_to
deployment = prefect.deployments.Deployment.build_from_flow(
flow=count_to,
name="prefect_test_custom",
work_queue_name="test",
storage=S3.load("test-storage"),
infrastructure=AnyscaleJob.load("test-infra"),
infra_overrides={"compute_config": "test-compute-config"}
)
deployment.apply()
```
#### Using the AWS Secrets Manager for storing the PREFECT_API_KEY

We recommend using the AWS Secrets Manager for storing your PREFECT_API_KEY token. Store your
`PREFECT_API_KEY` secret as a Plaintext secret (not Key/value) like the following

![create prefect secret](./doc/prefect_api_key_secret.png)

and add the following policy to your `<cloud-id>-cluster_node_role` role:

```
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": "<fill this out with the Secret ARN>"
}
]
}
```

You can then run the agent by specifying a `ANYSCALE_PREFECT_AWS_SECRET_ID` and
`ANYSCALE_PREFECT_AWS_REGION` in your configuration yaml instead of the `PREFECT_API_KEY`,
see the `ci/prefect-agent-service-awssecrets-ci.yaml` file in this repository for an example.

### Using your own Prefect Agent

If you already have a setup with an existing Prefect agent working, you can use that agent
to run the Prefect Anyscale integration.

First make sure you
- Have the `prefect_anyscale` package installed in the Prefect Agent's environment and
- Are logged into Prefect or have set the `PREFECT_API_URL` and `PREFECT_API_KEY` environment
variables and
- Are logged into Anyscale or have set the `ANYSCALE_HOST` and `ANYSCALE_CLI_TOKEN` environment
variables

Then start the agent with
```
PREFECT_EXTRA_ENTRYPOINTS=prefect_anyscale prefect agent start -q <your prefect queue>
```
The agent will listen to new work on the specified queue and will execute flows that run with
the `AnyscaleJob` infra as Anyscale Jobs.
15 changes: 15 additions & 0 deletions ci/anyscale_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import argparse
import ray

parser = argparse.ArgumentParser()
parser.add_argument('--arg', type=str)
args = parser.parse_args()

@ray.remote
def f(arg):
return "Argument is '" + arg + "'"

args = ["This", "is", "a", "test"] + [args.arg]
results = ray.get([f.remote(arg) for arg in args])

print("\n".join(results))
55 changes: 55 additions & 0 deletions ci/complex_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
import subprocess
import tempfile
import yaml

import ray

from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner

# This custom resource will cause another node to be added
# to the cluster so we can test that the working directory
# is indeed synced to all nodes in the cluster.
@ray.remote(resources={"custom_resource": 1})
def remote_task():
from ci.test_python_file import test
return test()

@task
def test_task():
return ray.get(remote_task.remote())

@task
def anyscale_job(args):
job_config = {
"name": "my-anyscale-job",
"cloud": "anyscale_v2_default_cloud",
"description": "An Anyscale Job submitted from Prefect.",
"cluster_env": "default_cluster_env_2.3.1_py39",
"runtime_env": {
"working_dir": "ci/",
"upload_path": "s3://anyscale-prefect-integration-test/working-dir/",
},
"entrypoint": "python anyscale_job.py " + " ".join([f"--{key} {val}" for key, val in args.items()]),
}

with tempfile.NamedTemporaryFile(mode="w") as f:
yaml.dump(job_config, f)
f.flush()
# Submit an Anyscale Job from Prefect and record the logs
output = subprocess.check_output(
["anyscale", "job", "submit", f.name, "--follow"]
)
logger = get_run_logger()
logger.info("Anyscale Job output: " + output.decode())

@flow(task_runner=RayTaskRunner)
def complex_flow():
result = test_task.submit()
assert result.result() == 42
result = anyscale_job.submit({"arg": "value"})
assert result.result() == None

if __name__ == "__main__":
complex_flow()
13 changes: 13 additions & 0 deletions ci/prefect-agent-service-awssecrets-ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: prefect-agent-awssecrets-28
cloud: "anyscale_v2_default_cloud"
ray_serve_config:
import_path: start_anyscale_service:entrypoint
runtime_env:
env_vars:
PREFECT_API_URL: $PREFECT_API_URL
ANYSCALE_PREFECT_AWS_SECRET_ID: prefect-api-key
ANYSCALE_PREFECT_AWS_REGION: us-west-2
ANYSCALE_PREFECT_QUEUE: test-awssecrets
ANYSCALE_PREFECT_DEVELOPMENT: "1"
working_dir: .
upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/"
18 changes: 12 additions & 6 deletions ci/prefect-agent-service-ci.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
name: prefect-agent-18
entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
runtime_env:
working_dir: .
upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/"
healthcheck_url: "/healthcheck"
name: prefect-agent-28
cloud: "anyscale_v2_default_cloud"
ray_serve_config:
import_path: start_anyscale_service:entrypoint
runtime_env:
env_vars:
PREFECT_API_URL: $PREFECT_API_URL
PREFECT_API_KEY: $PREFECT_API_KEY
ANYSCALE_PREFECT_QUEUE: test
ANYSCALE_PREFECT_DEVELOPMENT: "1"
working_dir: .
upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/"
24 changes: 22 additions & 2 deletions ci/submit_prefect_run_and_check.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import asyncio

import prefect.deployments
Expand All @@ -7,18 +8,36 @@
from prefect_anyscale import AnyscaleJob

from prefect_test import count_to
from ci.complex_flow import complex_flow

parser = argparse.ArgumentParser()
parser.add_argument("--queue", help="prefect queue to submit to")
args = parser.parse_args()

deployment = prefect.deployments.Deployment.build_from_flow(
flow=count_to,
name="prefect_test",
work_queue_name="test",
work_queue_name=args.queue,
storage=S3.load("test-storage-github"),
infrastructure=AnyscaleJob.load("anyscale-job-infra")
infrastructure=AnyscaleJob.load("anyscale-job-infra"),
infra_overrides={"compute_config": "complex-test-compute-config"},
)
deployment.apply()

flow_run = prefect.deployments.run_deployment("count-to/prefect_test", parameters={"highest_number": 5})

complex_deployment = prefect.deployments.Deployment.build_from_flow(
flow=complex_flow,
name="prefect_complex_test",
work_queue_name=args.queue,
storage=S3.load("test-storage-github"),
infrastructure=AnyscaleJob.load("anyscale-job-infra"),
infra_overrides={"compute_config": "complex-test-compute-config"},
)
complex_deployment.apply()

complex_flow_run = prefect.deployments.run_deployment("complex-flow/prefect_complex_test", parameters={})

async def wait_for_run_complete(flow_id):
async with get_client() as client:
while True:
Expand All @@ -31,3 +50,4 @@ async def wait_for_run_complete(flow_id):
await asyncio.sleep(5.0)

asyncio.run(wait_for_run_complete(flow_run.id))
asyncio.run(wait_for_run_complete(complex_flow_run.id))
2 changes: 2 additions & 0 deletions ci/test_python_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def test():
return 42
Binary file added doc/prefect_api_key_secret.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 12 additions & 5 deletions prefect-agent-service.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
name: prefect-agent-10
entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
runtime_env:
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip
healthcheck_url: "/healthcheck"
name: prefect-agent
ray_serve_config:
import_path: start_anyscale_service:entrypoint
runtime_env:
env_vars:
PREFECT_API_URL: $PREFECT_API_URL
# Consider using a secret manager to store the PREFECT_API_KEY token
# (see instructions in README.md)
PREFECT_API_KEY: $PREFECT_API_KEY
ANYSCALE_PREFECT_QUEUE: test
pip: ["prefect-anyscale"]
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.2.0.zip
17 changes: 17 additions & 0 deletions prefect_anyscale/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
from prefect_anyscale.infrastructure import AnyscaleJob, RayJob


def prefect_runtime_environment_hook(runtime_env):

if not runtime_env:
runtime_env = {}

# If no working_dir is specified, we use the current
# directory as the working directory -- this will be
# the directory containing the source code which is
# downloaded by the Prefect engine.
if not runtime_env.get("working_dir"):
runtime_env["working_dir"] = "."

return runtime_env


__all__ = [
"AnyscaleJob",
"RayJob",
"prefect_runtime_environment_hook",
]
Loading

0 comments on commit 5d86d46

Please sign in to comment.