Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/subsetter_argo' into develop_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
devincowan committed Jan 8, 2024
2 parents 257a8cc + b8db299 commit 01ea059
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 340 deletions.
14 changes: 12 additions & 2 deletions app/api/subsetter/app/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from functools import lru_cache
from typing import List, Optional, Tuple

import httpx
Expand All @@ -9,10 +10,14 @@

from subsetter.config import get_settings

DATABASE_URL = get_settings().mongo_url
client = motor.motor_asyncio.AsyncIOMotorClient(DATABASE_URL, uuidRepresentation="standard")
client = motor.motor_asyncio.AsyncIOMotorClient(get_settings().mongo_url, uuidRepresentation="standard")
db = client[get_settings().mongo_database]

client_hydroshare = motor.motor_asyncio.AsyncIOMotorClient(
get_settings().hydroshare_mongo_url, uuidRepresentation="standard"
)
db_hydroshare = client_hydroshare[get_settings().hydroshare_mongo_database]


class OAuthAccount(BaseOAuthAccount):
pass
Expand Down Expand Up @@ -80,3 +85,8 @@ async def update_submission(self, submission: Submission) -> None:

# Async generator that yields a BeanieUserDatabase built from the User and
# OAuthAccount models.
# NOTE(review): the yield-style shape looks like a FastAPI dependency — confirm
# against the caller.
async def get_user_db():
yield BeanieUserDatabase(User, OAuthAccount)


# Accessor for the module-level `db_hydroshare` database handle.
# The function takes no arguments, so `lru_cache` memoizes a single value and
# every call returns the same object.
@lru_cache
def get_hydroshare_access_db():
return db_hydroshare
137 changes: 74 additions & 63 deletions app/api/subsetter/app/routers/access_control/policy_generation.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,54 @@
'''
def admin_policy_attach(target, name):
arguments = ['mc', '--config-dir', '/hydroshare/', 'admin', 'policy', 'attach', target, name, '--user', name]
logger.info(arguments)
try:
_output = subprocess.check_output(arguments, user='hydro-service')
logger.info(_output)
except subprocess.CalledProcessError as e:
logger.exception(e.output)
import copy
import json
import logging as logger
import os
import subprocess
import tempfile
from typing import Dict


def admin_policy_create(target, name, file):
arguments = ['mc', '--config-dir', '/hydroshare/', '--json', 'admin', 'policy', 'create', target, name, file]
arguments = ['mc', '--json', 'admin', 'policy', 'create', target, name, file]
logger.info(arguments)
try:
_output = subprocess.check_output(arguments, user='hydro-service')
_output = subprocess.check_output(arguments)
logger.info(_output)
except subprocess.CalledProcessError as e:
logger.exception(e.output)
admin_policy_attach(target, name)


def admin_policy_remove(target, name):
arguments = ['mc', '--config-dir', '/hydroshare/', '--json', 'admin', 'policy', 'remove', target, name]
arguments = ['mc', '--json', 'admin', 'policy', 'rm', target, name]
logger.info(arguments)
try:
_output = subprocess.check_output(arguments, user='hydro-service')
_output = subprocess.check_output(arguments)
logger.info(_output)
except subprocess.CalledProcessError as e:
logger.exception(e.output)

def refresh_minio_policy(user):
policy = minio_policy(user)
logging.info(json.dumps(policy, indent=2))
if policy["Statement"]:
with tempfile.TemporaryDirectory(dir='/hs_tmp') as tmpdirname:
filepath = os.path.join(tmpdirname, "metadata.json")
fp = open(filepath, "w")
fp.write(json.dumps(policy))
fp.close()
admin_policy_remove(target='cuahsi', name=user.username)
admin_policy_create(target='cuahsi', name=user.username, file=filepath)
else:
admin_policy_remove(target='cuahsi', name=user.username)
'''

import copy
def refresh_minio_policy(user, policy):
    """Write ``policy`` to a temporary JSON file and register it through ``mc``.

    The policy is logged, serialized to ``metadata.json`` inside a throwaway
    directory, and passed to ``admin_policy_create`` for the ``cuahsi`` target
    with the user's username as the policy name.

    Args:
        user: Object exposing a ``username`` attribute.
        policy: JSON-serializable policy document.

    Returns:
        The ``policy`` argument, unchanged.
    """
    logger.info(json.dumps(policy, indent=2))
    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "metadata.json")
        # The context manager closes (and flushes) the file even when
        # serialization or the write raises, so no handle is leaked.
        with open(filepath, "w") as fp:
            fp.write(json.dumps(policy))
        # Must run inside the outer ``with``: the file disappears together
        # with the temporary directory.
        admin_policy_create(target='cuahsi', name=user.username, file=filepath)
    return policy


def bucket_name(resource_id: str):
# raccess = ResourceAccess.objects.filter(resource__short_id=resource_id).first()
# return raccess.owners.first().username
return "subsetter-outputs"


def create_view_statements(owner, submissions) -> list:
view_statement_template_get = {
def create_view_statements(user, views: Dict[str, list[str]]) -> list:
if not views:
return []
view_statement_template_get_object = {
"Effect": "Allow",
"Action": ["s3:GetBucketLocation", "s3:GetObject"],
"Action": ["s3:GetObject"],
"Resource": [],
}
view_statement_template_get_bucket = {
"Effect": "Allow",
"Action": ["s3:GetBucketLocation"],
"Resource": [],
}
view_statement_template_listing = {
Expand All @@ -64,35 +57,53 @@ def create_view_statements(owner, submissions) -> list:
"Resource": [],
"Condition": {"StringLike": {"s3:prefix": []}},
}
bucketname = bucket_name("blah")
get_resources = [f"arn:aws:s3:::{bucketname}/{owner.username}/*"]
view_statement = copy.deepcopy(view_statement_template_listing)
view_statement["Resource"] = [f"arn:aws:s3:::{bucketname}"]
view_statement["Condition"]["StringLike"]["s3:prefix"] = [f"{owner.username}/*"]
list_statements = [view_statement]
for user, submission in submissions:
bucketname = bucket_name(submission.workflow_id)
get_resources.append(
f"arn:aws:s3:::{bucketname}/{user.username}/{submission.workflow_name}/{submission.workflow_id}/*"
)

get_objectt_resources = []
get_bucket_resources = []
list_statements = []
for bucket_owner, resource_paths in views.items():
get_objectt_resources = get_objectt_resources + [
f"arn:aws:s3:::{bucket_owner}/{resource_path}/*" for resource_path in resource_paths
]
get_bucket_resources.append(f"arn:aws:s3:::{bucket_owner}")
view_statement = copy.deepcopy(view_statement_template_listing)
view_statement["Resource"] = [f"arn:aws:s3:::{bucketname}"]
view_statement["Resource"] = [f"arn:aws:s3:::{bucket_owner}"]
view_statement["Condition"]["StringLike"]["s3:prefix"] = [
f"{user.username}/{submission.workflow_name}/{submission.workflow_id}/*"
f"{resource_path}/*" for resource_path in resource_paths
]
list_statements.append(view_statement)
view_statement_template_get["Resource"] = get_resources
return list_statements + [view_statement_template_get]
view_statement_template_get_object["Resource"] = get_objectt_resources
view_statement_template_get_bucket["Resource"] = get_bucket_resources
return list_statements + [view_statement_template_get_object]


# def create_edit_owner_statements(resource_ids: list[str]) -> list:
# edit_statement_template = {"Effect": "Allow", "Action": ["s3:*"], "Resource": []}
# edit_statement_template["Resource"] = [
# f"arn:aws:s3:::{bucket_name(resource_id)}/hydroshare/{resource_id}/*" for resource_id in resource_ids
# ]
# return [edit_statement_template]
def create_edit_statements(user, edits: Dict[str, list[str]]) -> list:
    """Build the policy statements that grant ``user`` write access.

    Three statements are considered, in this order:

    1. ``s3:*`` on the bucket named after ``user.username``.
    2. ``s3:*Object`` on every ``<bucket>/<resource_path>/*`` listed in ``edits``.
    3. ``s3:ListBucket`` on every bucket that appears as a key of ``edits``.

    Args:
        user: Object exposing a ``username`` attribute.
        edits: Maps a bucket name to the resource paths inside that bucket
            the user may edit.

    Returns:
        The statements whose ``Resource`` list is non-empty.
    """
    edit_all_statement = {
        "Effect": "Allow",
        "Action": ["s3:*"],
        "Resource": [f"arn:aws:s3:::{user.username}"],
    }

    edit_paths_resources = []
    for bucket_owner, resource_paths in edits.items():
        edit_paths_resources.extend(
            f"arn:aws:s3:::{bucket_owner}/{resource_path}/*" for resource_path in resource_paths
        )
    edit_paths_statement = {
        "Effect": "Allow",
        "Action": ["s3:*Object"],
        "Resource": edit_paths_resources,
    }

    list_bucket_statement = {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{bucket_owner}" for bucket_owner in edits],
    }

    statements = [edit_all_statement, edit_paths_statement, list_bucket_statement]
    # Filter on the Resource list, not on the dict itself: the dicts are never
    # empty, so a plain truthiness test would let statements with
    # "Resource": [] through, and such a statement is not a valid policy entry.
    return [statement for statement in statements if statement["Resource"]]


def minio_policy(user, view_submissions):
view_statements = create_view_statements(user, view_submissions)
return {"Version": "2012-10-17", "Statement": view_statements}
def minio_policy(user, owners: Dict[str, list[str]], edits: Dict[str, list[str]], views: Dict[str, list[str]]):
    """Assemble the policy document for ``user``.

    View statements come first, followed by the edit statements. ``owners``
    is accepted but not consulted when the statements are built.

    Returns:
        A dict with the fixed ``Version`` string and the combined
        ``Statement`` list.
    """
    view_part = create_view_statements(user, views)
    edit_part = create_edit_statements(user, edits)
    return {"Version": "2012-10-17", "Statement": view_part + edit_part}
97 changes: 51 additions & 46 deletions app/api/subsetter/app/routers/access_control/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,74 @@
import os
import subprocess
import tempfile
from typing import Dict

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from subsetter.app.db import Submission, User
from subsetter.app.db import User, get_hydroshare_access_db
from subsetter.app.routers.access_control.policy_generation import minio_policy
from subsetter.app.users import current_active_user

from .policy_generation import refresh_minio_policy

router = APIRouter()


class ShareWorkflowBody(BaseModel):
# Three lists of strings, one per access level (owner / edit / view).
# NOTE(review): presumably resource identifiers — confirm against the stored
# userprivileges documents.
class UserAccess(BaseModel):
owner: list[str]
edit: list[str]
view: list[str]


# One MinIO-backed resource a user has some level of access to.
class MinioUserResourceAccess(BaseModel):
# Usernames tested against the "/browser/<owner>/" segment of minio_resource_url
owners: list[str]
resource_id: str
# URL from which the bucket owner and the resource path are derived
minio_resource_url: str


# MinIO resources grouped by access level; each list is passed to
# sort_privileges when the user's policy is generated.
class MinioUserAccess(BaseModel):
owner: list[MinioUserResourceAccess]
edit: list[MinioUserResourceAccess]
view: list[MinioUserResourceAccess]


class UserPrivilege(BaseModel):
username: str
workflow_id: str
all: UserAccess
minio: MinioUserAccess


@router.post('/policy/add')
async def share_workflow_with_user(share_params: ShareWorkflowBody, user: User = Depends(current_active_user)):
submission: Submission = user.get_submission(share_params.workflow_id)
if submission:
submission.add_user(share_params.username)
await user.update_submission(submission)
return user
else:
return HTTPException(status_code=400)
def check_owners_in_bucket_path(resource_access: MinioUserResourceAccess):
    """Return the first owner whose ``/browser/<owner>/`` segment occurs in the resource URL.

    Owners are tried in list order; ``None`` is returned when none matches.
    """
    url = resource_access.minio_resource_url
    matches = (candidate for candidate in resource_access.owners if f"/browser/{candidate}/" in url)
    return next(matches, None)


@router.delete('/policy/remove')
async def unshare_workflow_with_user(share_params: ShareWorkflowBody, user: User = Depends(current_active_user)):
submission: Submission = user.get_submission(share_params.workflow_id)
if submission:
submission.remove_user(share_params.username)
await user.update_submission(submission)
return user
else:
return HTTPException(status_code=400)
def sort_privileges(user_accesses: list[MinioUserResourceAccess]):
    """Group resource paths by the bucket owner found in each resource URL.

    Entries for which ``check_owners_in_bucket_path`` finds no owner are
    skipped.

    Args:
        user_accesses: Resource access entries to sort.

    Returns:
        A dict mapping each bucket owner to the list of resource paths (the
        part of the URL after ``/browser/<owner>/``), in input order.
    """
    authorized_users = {}
    for user_access in user_accesses:
        bucket_owner = check_owners_in_bucket_path(user_access)
        if not bucket_owner:
            continue
        # Split on the same "/browser/<owner>/" marker the owner check matched.
        # Splitting on "<owner>/" alone cuts at the wrong place whenever the
        # owner string also occurs earlier in the URL (e.g. inside the host).
        marker = f"/browser/{bucket_owner}/"
        resource_path = user_access.minio_resource_url.split(marker, 1)[-1]
        authorized_users.setdefault(bucket_owner, []).append(resource_path)
    return authorized_users


@router.get('/policy')
async def generate_user_policy(user: User = Depends(current_active_user)):
users = await User.find({"submissions.view_users": user.username}).to_list()
# this should be rewritten to query all on the db, but I don't have time to figure that out now
matching_submissions = []
for u in users:
for submission in u.submissions:
if user.username in submission.view_users:
matching_submissions.append((u, submission))
return minio_policy(user, matching_submissions)
hydroshare_access_db = get_hydroshare_access_db()
user_privilege = await hydroshare_access_db.userprivileges.find_one({"username": user.username})
user_privilege: UserPrivilege = UserPrivilege(**user_privilege)

# Check Authorization
minio_user_access: MinioUserAccess = user_privilege.minio
authorized_owners: Dict[str, list[str]] = sort_privileges(minio_user_access.owner)
authorized_edits: Dict[str, list[str]] = sort_privileges(minio_user_access.edit)
authorized_views: Dict[str, list[str]] = sort_privileges(minio_user_access.view)

return minio_policy(user, authorized_owners, authorized_edits, authorized_views)


@router.get('/profile')
Expand All @@ -58,22 +78,7 @@ async def refresh_profile(user: User = Depends(current_active_user)):
return user


def admin_policy_create(name, policy, target="cuahsi"):
with tempfile.TemporaryDirectory() as tmpdirname:
print(policy)
filepath = os.path.join(tmpdirname, "metadata.json")
fp = open(filepath, "w")
fp.write(json.dumps(policy))
fp.close()
arguments = ['mc', '--json', 'admin', 'policy', 'create', target, name, filepath]
try:
_output = subprocess.check_output(arguments)
except subprocess.CalledProcessError as e:
raise


@router.get('/policy/minio/cuahsi')
async def generate_and_save_user_policy(user: User = Depends(current_active_user)):
user_policy = await generate_user_policy(user)
admin_policy_create(user.username, user_policy)
return user_policy
return refresh_minio_policy(user, user_policy)
10 changes: 6 additions & 4 deletions app/api/subsetter/app/routers/argo/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from subsetter.app.users import current_active_user
from subsetter.config import get_minio_client, get_settings

from .transformer import transform_latlon

if get_settings().cloud_run:
Expand All @@ -43,8 +44,9 @@ def parflow_submission_body(hucs: list, username: str, workflow_name: str):
"name": workflow_name,
"parameters": [
f"output-path=argo_workflows/parflow/{workflow_name}",
f"output-bucket={username}",
"hucs=" + ",".join(hucs)],
f"output-bucket={username}",
"hucs=" + ",".join(hucs),
],
},
}

Expand Down Expand Up @@ -123,7 +125,7 @@ async def submit_parflow(
async def submit_nwm1(
y_south: float, x_west: float, y_north: float, x_east: float, user: User = Depends(current_active_user)
) -> SubmissionResponseModel:
#y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
# y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
workflow_id = str(uuid.uuid4())
api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
Expand All @@ -139,7 +141,7 @@ async def submit_nwm1(
async def submit_nwm2(
y_south: float, x_west: float, y_north: float, x_east: float, user: User = Depends(current_active_user)
) -> SubmissionResponseModel:
#y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
# y_south, x_west, y_north, x_east = transform_latlon(y_south, x_west, y_north, x_east)
workflow_id = str(uuid.uuid4())
api_response = api_instance.submit_workflow(
namespace=get_settings().argo_namespace,
Expand Down
Loading

0 comments on commit 01ea059

Please sign in to comment.