Merge branch 'subsetter_argo' into develop_v2
devincowan committed Feb 15, 2024
2 parents 2cb8a2d + 61fd8f0 commit 49a33e4
Showing 24 changed files with 355 additions and 452 deletions.
28 changes: 28 additions & 0 deletions app/api/README.md
@@ -0,0 +1,28 @@
# Subsetter FastAPI

A Python FastAPI application that submits subsetter workflows to an Argo instance configured with the workflow templates in `./argo/`.

The Dockerfile declares a Python base image, installs the dependencies declared in `requirements.txt` and `requirements-dev.txt`, and starts the FastAPI application on port 8000.

`subsetter/main.py` is the entrypoint to the FastAPI application and configures the routers. It also contains a startup event hook that initializes the MongoDB database with [beanie ODM](https://beanie-odm.dev/) and sets up a client for the [CUAHSI MinIO instance](https://console.minio.cuahsi.io). The MinIO client is used to synchronize user-specific access policies and keys/secrets.
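
A minimal sketch of that startup wiring, assuming a local MongoDB connection string and placeholder MinIO credentials (the real application reads these from its settings):

```python
from beanie import init_beanie
from fastapi import FastAPI
from minio import Minio
from motor.motor_asyncio import AsyncIOMotorClient

from subsetter.app.db import User

app = FastAPI()


@app.on_event("startup")
async def on_startup():
    # Initialize the beanie ODM with the MongoDB-backed document models.
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # assumed connection string
    await init_beanie(database=client["subsetter"], document_models=[User])
    # Client for the CUAHSI MinIO instance, used to sync user policies and keys/secrets.
    app.state.minio = Minio("api.minio.cuahsi.io", access_key="<key>", secret_key="<secret>")
```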

API documentation is rendered at https://subsetter-api-jbzfw6l52q-uc.a.run.app/redoc (this will be updated to https://api.subsetter.cuahsi.io/redocs pending certificate creation). The OpenAPI spec is generated from the code defining the API endpoints (FastAPI) and the input/output models (Pydantic).

User authentication is achieved by configuring the [fastapi_users](https://github.com/fastapi-users/fastapi-users) module with [CUAHSI SSO](https://auth.cuahsi.org/) using the `OpenID Connect` protocol. On registration, an S3 bucket is created for the user on [CUAHSI MinIO](https://console.minio.cuahsi.io) (TODO: create a default quota of 5 GB). An admin may increase the quota on a case-by-case basis.
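
The registration flow could look roughly like the sketch below; `on_after_register` is the standard fastapi_users hook, while the manager generics and the quota comment are assumptions:

```python
from typing import Optional

from fastapi import Request
from fastapi_users import BaseUserManager

from subsetter.app.db import User
from subsetter.config import get_minio_client


class UserManager(BaseUserManager[User, str]):
    async def on_after_register(self, user: User, request: Optional[Request] = None):
        # Create the user's bucket on CUAHSI MinIO if it does not exist yet.
        client = get_minio_client()
        if not client.bucket_exists(user.bucket_name):
            client.make_bucket(user.bucket_name)
        # TODO (per above): apply a default 5 GB quota via the MinIO admin API.
```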

The Subsetter API is divided into four routers defined in `subsetter/app/routers/`.

## Routers
### Access Control Router
The `access_control` router contains prototyped synchronization of view/edit access for paths on the CUAHSI MinIO instance that are referenced by a HydroShare resource. In the [mongo_discovery-access-control](https://github.com/hydroshare/hydroshare/compare/develop...mongo-discovery-access-control) HydroShare branch, event hooks export Resource and User access to a Mongo database. This database is queried to look up the resources for which a user has view/edit privileges, and the corresponding view/edit policies are generated and assigned to the user on CUAHSI MinIO storage. This means a path in a user's bucket may be registered on HydroShare and enjoy the same access control capabilities as a HydroShare Composite Resource.
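
A generated view policy might look like the following sketch; the exact statement shape the router produces is an assumption:

```python
import json


def view_policy(bucket: str, resource_path: str) -> str:
    """Build an IAM-style read-only policy granting view access to one resource path."""
    return json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket}/{resource_path}/*"],
                }
            ],
        }
    )
```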

### Argo Router
Contains the API endpoints for submitting a subsetter workflow, tracking submissions, and generating a presigned download URL for the resulting datasets.
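
For example, a client could submit a parflow subset and read back the submission id; the endpoint path and response key here are assumptions inferred from the router code in this diff:

```python
import requests

API = "https://subsetter-api-jbzfw6l52q-uc.a.run.app"

# Submit a parflow subset for one or more HUCs (assumed path; requires a CUAHSI SSO token).
resp = requests.post(
    f"{API}/submit/parflow",
    params={"hucs": ["<huc-id>"]},
    headers={"Authorization": "Bearer <token>"},
)
workflow_id = resp.json()["workflow_id"]  # assumed SubmissionResponseModel field
```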

### Discovery Router
A copy of the IGUIDE discovery router that includes endpoints for searching resource metadata. The subsetter workflows run the HydroShare metadata extraction tool to extract the same metadata that a HydroShare composite resource extracts from recognized file formats. The resulting metadata can then be written to the Discovery database on Atlas. TODO: collect the metadata extracted from subsetter outputs into a discovery database.

### Storage Router
Contains the endpoints to generate presigned URLs for PUT and GET of objects on S3. This is not currently used, but could be used to create a landing page for resources stored on S3, equivalent to a resource landing page on HydroShare.
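
A minimal sketch of generating those presigned URLs with the MinIO client (bucket, object name, and the one-day expiry are placeholders):

```python
from datetime import timedelta

from subsetter.config import get_minio_client

client = get_minio_client()

# Presigned upload (PUT) and download (GET) URLs for a single object.
put_url = client.presigned_put_object("<bucket>", "path/to/object", expires=timedelta(days=1))
get_url = client.presigned_get_object("<bucket>", "path/to/object", expires=timedelta(days=1))
```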


3 changes: 3 additions & 0 deletions app/api/subsetter/app/db.py
@@ -40,6 +40,9 @@ class Submission(BaseModel):
finishedAt: Optional[str] = None
estimatedDuration: Optional[int] = None

def output_path(self, base_path):
return f"{base_path}/{self.workflow_name}/{self.workflow_id}"


class User(BeanieBaseUser, Document):
oauth_accounts: List[OAuthAccount] = Field(default_factory=list)
2 changes: 1 addition & 1 deletion app/api/subsetter/app/routers/argo/__init__.py
@@ -1 +1 @@
from subsetter.app.routers.argo.router import router
from .router import router
85 changes: 54 additions & 31 deletions app/api/subsetter/app/routers/argo/router.py
@@ -1,12 +1,14 @@
import json
import logging as log
import tempfile
import uuid
from typing import Annotated
from typing import Annotated, Any

import argo_workflows
import google.cloud.logging as logging
from argo_workflows.api import workflow_service_api
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel

from subsetter.app.db import Submission, User
from subsetter.app.models import (
@@ -28,6 +30,7 @@
router = APIRouter()

NAMESPACE = 'workflows'
OUTPUT_BASE_PATH = "argo_workflows"

configuration = argo_workflows.Configuration(host=get_settings().argo_host)
configuration.api_key['BearerToken'] = get_settings().argo_bearer_token
@@ -36,14 +39,14 @@
api_instance = workflow_service_api.WorkflowServiceApi(api_client)


def parflow_submission_body(hucs: list, bucket_name: str, workflow_name: str):
def parflow_submission_body(hucs: list, bucket_name: str, workflow_name: str, output_path: str):
return {
"resourceKind": "WorkflowTemplate",
"resourceName": "parflow-subset-v1-by-huc-minio",
"submitOptions": {
"name": workflow_name,
"parameters": [
f"output-path=argo_workflows/parflow/{workflow_name}",
f"output-path={output_path}",
f"output-bucket={bucket_name}",
"hucs=" + ",".join(hucs),
],
@@ -52,7 +55,7 @@ def parflow_submission_body(hucs: list, bucket_name: str, workflow_name: str):


def nwm1_submission_body(
y_south: float, x_west: float, y_north: float, x_east: float, bucket_name: str, workflow_name: str
y_south: float, x_west: float, y_north: float, x_east: float, bucket_name: str, workflow_name: str, output_path: str
):
return {
"resourceKind": "WorkflowTemplate",
@@ -61,7 +64,7 @@ def nwm1_submission_body(
"name": workflow_name,
"parameters": [
f"output-bucket={bucket_name}",
f"output-path=argo_workflows/nwm1/{workflow_name}",
f"output-path={output_path}",
f"y_south={y_south}",
f"x_west={x_west}",
f"y_north={y_north}",
@@ -72,7 +75,7 @@


def nwm2_submission_body(
y_south: float, x_west: float, y_north: float, x_east: float, bucket_name: str, workflow_name: str
y_south: float, x_west: float, y_north: float, x_east: float, bucket_name: str, workflow_name: str, output_path: str
):
return {
"resourceKind": "WorkflowTemplate",
@@ -81,7 +84,7 @@ def nwm2_submission_body(
"name": workflow_name,
"parameters": [
f"output-bucket={bucket_name}",
f"output-path=argo_workflows/nwm2/{workflow_name}",
f"output-path={output_path}",
f"y_south={y_south}",
f"x_west={x_west}",
f"y_north={y_north}",
@@ -91,17 +94,12 @@
}


def metadata_extraction_submission_body(bucket_key: str, path_key: str, workflow_name: str):
def metadata_extraction_submission_body(bucket: str, input_path: str, output_path: str):
return {
"resourceKind": "WorkflowTemplate",
"resourceName": "metadata-extractor",
"submitOptions": {
"name": workflow_name,
"parameters": [
f"job-id={workflow_name}",
f"bucket={bucket_key}",
f"path={path_key}",
],
"parameters": [f"bucket={bucket}", f"input-path={input_path}", f"output-path={output_path}"],
},
}

@@ -111,13 +109,13 @@ async def submit_parflow(
hucs: Annotated[list[str] | None, Query()], user: User = Depends(current_active_user)
) -> SubmissionResponseModel:
workflow_id = str(uuid.uuid4())
submission = Submission(workflow_id=workflow_id, workflow_name="parflow")
api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
body=parflow_submission_body(hucs, user.bucket_name, workflow_id),
body=parflow_submission_body(hucs, user.bucket_name, workflow_id, submission.output_path(OUTPUT_BASE_PATH)),
_preload_content=False,
)
log.info(api_response.json())
submission = Submission(workflow_id=workflow_id, workflow_name="parflow")
return await upsert_submission(user, submission)


@@ -127,13 +125,15 @@ async def submit_nwm1(
) -> SubmissionResponseModel:
# y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
workflow_id = str(uuid.uuid4())
submission = Submission(workflow_id=workflow_id, workflow_name="nwm1")
api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
body=nwm1_submission_body(y_south, x_west, y_north, x_east, user.bucket_name, workflow_id),
body=nwm1_submission_body(
y_south, x_west, y_north, x_east, user.bucket_name, workflow_id, submission.output_path(OUTPUT_BASE_PATH)
),
_preload_content=False,
)
log.info(api_response.json())
submission = Submission(workflow_id=workflow_id, workflow_name="nwm1")
return await upsert_submission(user, submission)


@@ -143,16 +143,50 @@ async def submit_nwm2(
) -> SubmissionResponseModel:
# y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
workflow_id = str(uuid.uuid4())
submission = Submission(workflow_id=workflow_id, workflow_name="nwm2")
api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
body=nwm2_submission_body(y_south, x_west, y_north, x_east, user.bucket_name, workflow_id),
body=nwm2_submission_body(
y_south, x_west, y_north, x_east, user.bucket_name, workflow_id, submission.output_path(OUTPUT_BASE_PATH)
),
_preload_content=False,
)
log.info(api_response.json())
submission = Submission(workflow_id=workflow_id, workflow_name="nwm2")
return await upsert_submission(user, submission)


class ExtractMetadataRequestBody(BaseModel):
workflow_id: str
metadata: Any = None


@router.post('/extract/metadata')
async def extract_metadata(metadata_request: ExtractMetadataRequestBody, user: User = Depends(current_active_user)):
submission = next(
(submission for submission in user.submissions if submission.workflow_id == metadata_request.workflow_id),
None,  # default avoids StopIteration when no matching submission exists
)
if not submission:
raise Exception(f"No Submission found for id {metadata_request.workflow_id}")
if metadata_request.metadata:
with tempfile.NamedTemporaryFile(delete=False) as fp:
metadata_json_str = json.dumps(metadata_request.metadata)
fp.write(str.encode(metadata_json_str))
fp.close()
get_minio_client().fput_object(
user.bucket_name, f"{submission.output_path(OUTPUT_BASE_PATH)}/hs_user_meta.json", fp.name
)

api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
body=metadata_extraction_submission_body(
user.bucket_name,
submission.output_path(OUTPUT_BASE_PATH),
f"{submission.output_path(OUTPUT_BASE_PATH)}_hs_metadata.tgz",
),
_preload_content=False,
)


async def upsert_submission(user: User, submission: Submission) -> Submission:
api_response = api_instance.get_workflow(
namespace=get_settings().argo_namespace, name=submission.workflow_id, _preload_content=False
@@ -214,20 +248,9 @@ async def logs(workflow_params: WorkflowDep) -> LogsResponseModel:
log_options_container="main",
_preload_content=False,
)
log.info(api_response.json())
return {"logs": parse_logs(api_response)}


@router.get('/url/{workflow_id}', description="Create a download url")
async def signed_url_minio(workflow_params: WorkflowDep) -> UrlResponseModel:
submission = workflow_params.user.get_submission(workflow_params.workflow_id)
url = get_minio_client().presigned_get_object(
"subsetter-outputs",
f"{workflow_params.user.bucket_name}/{submission.workflow_name}/{submission.workflow_id}/all.gz",
)
return {'url': url}


@router.get('/argo/{workflow_id}')
async def argo_metadata(workflow_params: WorkflowDep):
api_response = api_instance.get_workflow(
1 change: 1 addition & 0 deletions app/api/subsetter/app/routers/discovery/__init__.py
@@ -0,0 +1 @@
from .router import router
1 change: 1 addition & 0 deletions app/api/subsetter/app/routers/hydroshare/__init__.py
@@ -0,0 +1 @@
from .router import router
50 changes: 50 additions & 0 deletions app/api/subsetter/app/routers/hydroshare/router.py
@@ -0,0 +1,50 @@
import json
import tempfile
from typing import Any, Union

import google.cloud.logging as logging
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from subsetter.app.db import User
from subsetter.app.users import current_active_user
from subsetter.config import get_minio_client, get_settings

if get_settings().cloud_run:
logging_client = logging.Client()
logging_client.setup_logging()

router = APIRouter()


class HydroShareMetadata(BaseModel):
title: str
description: str


class DatasetMetadataRequestModel(BaseModel):
file_path: str
# bucket_name: str
metadata: Union[HydroShareMetadata, Any]


@router.post('/dataset/metadata')
async def create_metadata(metadata_request: DatasetMetadataRequestModel, user: User = Depends(current_active_user)):
with tempfile.NamedTemporaryFile(delete=False) as fp:
metadata_json_str = json.dumps(metadata_request.metadata)
print(metadata_json_str)
fp.write(str.encode(metadata_json_str))
fp.close()
get_minio_client().fput_object(user.bucket_name, metadata_request.file_path, fp.name)


@router.put('/dataset/metadata')
async def update_metadata(metadata_request: DatasetMetadataRequestModel, user: User = Depends(current_active_user)):
get_minio_client().remove_object(user.bucket_name, metadata_request.file_path)
return await create_metadata(metadata_request, user)


class DatasetExtractRequestModel(BaseModel):
file_path: str = None
# bucket_name: str
metadata: Union[HydroShareMetadata, Any] = None
2 changes: 1 addition & 1 deletion app/api/subsetter/app/routers/storage/__init__.py
@@ -1 +1 @@
from subsetter.app.routers.storage.router import router
from .router import router
1 change: 1 addition & 0 deletions app/api/subsetter/app/users.py
@@ -66,6 +66,7 @@ async def on_after_request_verify(self, user: User, token: str, request: Optional[Request] = None):
async def get_user_manager(user_db: BeanieUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)


bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")


8 changes: 7 additions & 1 deletion app/api/subsetter/main.py
@@ -1,4 +1,3 @@
import os
import subprocess

from beanie import init_beanie
@@ -8,6 +7,7 @@
from subsetter.app.db import User, db
from subsetter.app.routers.access_control import router as access_control_router
from subsetter.app.routers.argo import router as argo_router
from subsetter.app.routers.hydroshare import router as hydroshare_router
from subsetter.app.routers.storage import router as storage_router
from subsetter.app.schemas import UserRead, UserUpdate
from subsetter.app.users import SECRET, auth_backend, cuahsi_oauth_client, fastapi_users
@@ -55,6 +55,12 @@
tags=["minio"],
)

app.include_router(
hydroshare_router,
# prefix="/auth/cuahsi",
tags=["hydroshare"],
)

app.include_router(
fastapi_users.get_oauth_router(
cuahsi_oauth_client,
Binary file added app/argo/.images/minio-view-subsetter-ouput.png
Binary file added app/argo/.images/workflow-graph.png
44 changes: 41 additions & 3 deletions app/argo/README.md
@@ -1,4 +1,42 @@
Minio Artifact setup;
# Argo Workflows for Subsetting


Argo Workflows are containerized DAG workflows, declared in YAML, that run on Kubernetes. Each node in the workflow is a Docker container with access to S3 storage.

*An example graph of the parflow subsetter workflow*
![Parflow subsetter workflow graph](.images/workflow-graph.png)

The `templates/` directory contains workflows that declare composable [templates](https://argo-workflows.readthedocs.io/en/latest/workflow-templates/). They are referenced by the workflows in the `workflows/` directory with a [templateRef](https://argo-workflows.readthedocs.io/en/latest/workflow-templates/#referencing-other-workflowtemplates).

Artifact storage is S3 via MinIO. We use artifacts to store input and output files for our workflows. Each user is given a bucket (TODO: configurable version control and quotas) for storing the output data of their workflows. The output of one workflow may be used as input to subsequent workflow runs.

The three supported subsetter workflows (nwm1, nwm2, parflow) write their results to S3 storage in the user's own bucket at `/argo_workflows/{workflow_template}/{GUID}`. A GUID is generated for each run of a subsetter workflow and is used as the workflow run name. An example parflow subsetter output, viewed in the MinIO console, is shown below.
![Example user bucket with parflow subsetter output](.images/minio-view-subsetter-ouput.png)
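
The output location mirrors `Submission.output_path` in the API code; roughly, for a parflow run:

```python
import uuid

workflow_id = str(uuid.uuid4())  # GUID generated per run; also used as the workflow run name
output_path = f"argo_workflows/parflow/{workflow_id}"
# e.g. argo_workflows/parflow/<GUID> in the user's bucket
```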

The subsetter input datasets are stored on the [CUAHSI MinIO instance](https://console.minio.cuahsi.io) in a bucket with public read access. The workflows use these datasets as input artifacts within a subsetter workflow. A workflow conveniently maps an artifact to a path within a container, which can then be used as an input or output location by a program running in the container.

*Example output declaration with configurable output locations. [ArtifactRepositoryRef](https://argo-workflows.readthedocs.io/en/latest/artifact-repository-ref/) could be used to simplify artifact use.*

```yaml
outputs:
artifacts:
- name: subsetter-result
path: /output
s3:
endpoint: api.minio.cuahsi.io
bucket: '{{inputs.parameters.output-bucket}}'
accessKeySecret:
name: minio-credentials
key: accessKey
secretKeySecret:
name: minio-credentials
key: secretKey
key: '{{inputs.parameters.output-path}}'
```
## `minio-credentials` access key/secret setup
1. Create an access key/secret in the MinIO UI at https://console.minio.cuahsi.io
2. Save the key/secret as a Kubernetes secret named `minio-credentials` in the `workflows` namespace:
   `kubectl create secret generic minio-credentials --namespace workflows --from-literal=accessKey='<key>' --from-literal=secretKey='<secret>'`

These workflows should eventually be set up to sync automatically to https://workflows.argo.cuahsi.io
