From 201349ff69ebd471d193f81d60b320562d2e5e00 Mon Sep 17 00:00:00 2001 From: Scott Black Date: Fri, 5 Jan 2024 11:19:04 -0700 Subject: [PATCH] use user buckets for outputs --- app/api/subsetter/app/db.py | 8 - .../subsetter/app/routers/argo/transformer.py | 21 ++ .../app/routers/discovery/__init__.py | 0 .../app/routers/discovery/discovery.py | 192 ++++++++++++++++++ .../subsetter/app/routers/storage/router.py | 14 +- app/api/subsetter/app/users.py | 4 +- 6 files changed, 222 insertions(+), 17 deletions(-) create mode 100644 app/api/subsetter/app/routers/argo/transformer.py create mode 100644 app/api/subsetter/app/routers/discovery/__init__.py create mode 100644 app/api/subsetter/app/routers/discovery/discovery.py diff --git a/app/api/subsetter/app/db.py b/app/api/subsetter/app/db.py index 88190433..e90f2ef2 100644 --- a/app/api/subsetter/app/db.py +++ b/app/api/subsetter/app/db.py @@ -33,14 +33,6 @@ class Submission(BaseModel): startedAt: Optional[str] = None finishedAt: Optional[str] = None estimatedDuration: Optional[int] = None - view_users: Optional[List[str]] = [] - - def add_user(self, username: str): - self.view_users.append(username) - self.view_users = list(set(self.view_users)) - - def remove_user(self, username: str): - self.view_users.remove(username) class User(BeanieBaseUser, Document): diff --git a/app/api/subsetter/app/routers/argo/transformer.py b/app/api/subsetter/app/routers/argo/transformer.py new file mode 100644 index 00000000..cae96d24 --- /dev/null +++ b/app/api/subsetter/app/routers/argo/transformer.py @@ -0,0 +1,21 @@ +import pyproj + + +def transform_latlon(y_south: float, x_west: float, y_north: float, x_east: float): + print(f"y_south={y_south}") + print(f"y_north={y_north}") + print(f"x_west={x_west}") + print(f"x_east={x_east}") + # define a target coordinate system and convert the geometry data into the projection of our forcing data + target_crs = pyproj.Proj( + proj='lcc', lat_1=30.0, lat_2=60.0, lat_0=40.0000076293945, lon_0=-97.0, a=6370000, b=6370000 # Center point + ) + + transformer = pyproj.Transformer.from_crs("EPSG:4326", target_crs.crs) + x_west, y_south, x_east, y_north = transformer.transform_bounds(x_west, y_south, x_east, y_north, always_xy=True) + # x_west, y_south, x_east, y_north = transformer.itransform([(x_west, south, y_south, x_east, y_north]) + print(f"y_south={y_south}") + print(f"y_north={y_north}") + print(f"x_west={x_west}") + print(f"x_east={x_east}") + return y_south, x_west, y_north, x_east diff --git a/app/api/subsetter/app/routers/discovery/__init__.py b/app/api/subsetter/app/routers/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/app/api/subsetter/app/routers/discovery/discovery.py b/app/api/subsetter/app/routers/discovery/discovery.py new file mode 100644 index 00000000..5cda8d69 --- /dev/null +++ b/app/api/subsetter/app/routers/discovery/discovery.py @@ -0,0 +1,192 @@ +from datetime import datetime + +from fastapi import APIRouter, Depends, Request +from pydantic import BaseModel, validator + +router = APIRouter() + + +class SearchQuery(BaseModel): + term: str = None + sortBy: str = None + reverseSort: bool = True + contentType: str = None + providerName: str = None + creatorName: str = None + dataCoverageStart: int = None + dataCoverageEnd: int = None + publishedStart: int = None + publishedEnd: int = None + hasPartName: str = None + isPartOfName: str = None + associatedMediaName: str = None + fundingGrantName: str = None + fundingFunderName: str = None + creativeWorkStatus: str = None + pageNumber: int = 1 + pageSize: int = 30 + + @validator('*') + def empty_str_to_none(cls, v, field, **kwargs): + if field.name == 'term' and v: + return v.strip() + + if isinstance(v, str) and v.strip() == '': + return None + return v + + @validator('dataCoverageStart', 'dataCoverageEnd', 'publishedStart', 'publishedEnd') + def validate_year(cls, v, values, field, **kwargs): + if v is None: + return v + try: + datetime(v, 1, 1) + except ValueError: + raise ValueError(f'{field.name} is not a valid year') + if field.name == 'dataCoverageEnd': + if 'dataCoverageStart' in values and v < values['dataCoverageStart']: + raise ValueError(f'{field.name} must be greater or equal to dataCoverageStart') + if field.name == 'publishedEnd': + if 'publishedStart' in values and v < values['publishedStart']: + raise ValueError(f'{field.name} must be greater or equal to publishedStart') + return v + + @validator('pageNumber', 'pageSize') + def validate_page(cls, v, field, **kwargs): + if v <= 0: + raise ValueError(f'{field.name} must be greater than 0') + return v + + @property + def _filters(self): + filters = [] + if self.publishedStart: + filters.append( + { + 'range': { + 'path': 'datePublished', + 'gte': datetime(self.publishedStart, 1, 1), + }, + } + ) + if self.publishedEnd: + filters.append( + { + 'range': { + 'path': 'datePublished', + 'lt': datetime(self.publishedEnd + 1, 1, 1), # +1 to include all of the publishedEnd year + }, + } + ) + + if self.dataCoverageStart: + filters.append( + {'range': {'path': 'temporalCoverage.startDate', 'gte': datetime(self.dataCoverageStart, 1, 1)}} + ) + if self.dataCoverageEnd: + filters.append( + {'range': {'path': 'temporalCoverage.endDate', 'lt': datetime(self.dataCoverageEnd + 1, 1, 1)}} + ) + return filters + + @property + def _should(self): + search_paths = ['name', 'description', 'keywords', 'keywords.name'] + should = [{'autocomplete': {'query': self.term, 'path': key, 'fuzzy': {'maxEdits': 1}}} for key in search_paths] + return should + + @property + def _must(self): + must = [] + must.append({'term': {'path': '@type', 'query': "Dataset"}}) + if self.contentType: + must.append({'term': {'path': '@type', 'query': self.contentType}}) + if self.creatorName: + must.append({'text': {'path': 'creator.name', 'query': self.creatorName}}) + if self.providerName: + must.append({'text': {'path': 'provider.name', 'query': self.providerName}}) + if self.hasPartName: + must.append({'text': {'path': 'hasPart.name', 'query': self.hasPartName}}) + if self.isPartOfName: + must.append({'text': {'path': 'isPartOf.name', 'query': self.isPartOfName}}) + if self.associatedMediaName: + must.append({'text': {'path': 'associatedMedia.name', 'query': self.associatedMediaName}}) + if self.fundingGrantName: + must.append({'text': {'path': 'funding.name', 'query': self.fundingGrantName}}) + if self.fundingFunderName: + must.append({'text': {'path': 'funding.funder.name', 'query': self.fundingFunderName}}) + if self.creativeWorkStatus: + must.append( + {'text': {'path': ['creativeWorkStatus', 'creativeWorkStatus.name'], 'query': self.creativeWorkStatus}} + ) + + return must + + @property + def stages(self): + highlightPaths = ['name', 'description', 'keywords', 'keywords.name', 'creator.name'] + stages = [] + compound = {'filter': self._filters, 'must': self._must} + if self.term: + compound['should'] = self._should + search_stage = { + '$search': { + 'index': 'fuzzy_search', + 'compound': compound, + } + } + if self.term: + search_stage["$search"]['highlight'] = {'path': highlightPaths} + + stages.append(search_stage) + + # sorting needs to happen before pagination + if self.sortBy: + if self.sortBy == "name": + self.sortBy = "name_for_sorting" + self.reverseSort = not self.reverseSort + stages.append({'$sort': {self.sortBy: -1 if self.reverseSort else 1}}) + stages.append({'$skip': (self.pageNumber - 1) * self.pageSize}) + stages.append({'$limit': self.pageSize}) + # stages.append({'$unset': ['_id', '_class_id']}) + stages.append( + {'$set': {'score': {'$meta': 'searchScore'}, 'highlights': {'$meta': 'searchHighlights'}}}, + ) + return stages + + +@router.get("/search") +async def search(request: Request, search_query: SearchQuery = Depends()): + stages = search_query.stages + result = await request.app.mongodb["discovery"].aggregate(stages).to_list(search_query.pageSize) + import json + + json_str = json.dumps(result, default=str) + return json.loads(json_str) + + +@router.get("/typeahead") +async def typeahead(request: Request, term: str, pageSize: int = 30): + search_paths = ['name', 'description', 'keywords', 'keywords.name'] + should = [{'autocomplete': {'query': term, 'path': key, 'fuzzy': {'maxEdits': 1}}} for key in search_paths] + + stages = [ + { + '$search': { + 'index': 'fuzzy_search', + 'compound': {'should': should}, + 'highlight': {'path': ['description', 'name', 'keywords', 'keywords.name']}, + } + }, + { + '$project': { + 'name': 1, + 'description': 1, + 'keywords': 1, + 'highlights': {'$meta': 'searchHighlights'}, + '_id': 0, + } + }, + ] + result = await request.app.mongodb["discovery"].aggregate(stages).to_list(pageSize) + return result diff --git a/app/api/subsetter/app/routers/storage/router.py b/app/api/subsetter/app/routers/storage/router.py index 7da0b8e7..c8ef717f 100644 --- a/app/api/subsetter/app/routers/storage/router.py +++ b/app/api/subsetter/app/routers/storage/router.py @@ -1,22 +1,21 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter -from subsetter.app.db import Submission, User from subsetter.app.models import WorkflowDep -from subsetter.app.users import current_active_user from subsetter.config import get_minio_client router = APIRouter() -@router.get('/presigned/put', description="Create a download url") +@router.get('/presigned/get/{workflow_id}', description="Create a download url") async def presigned_put_minio(workflow_params: WorkflowDep): submission = workflow_params.user.get_submission(workflow_params.workflow_id) - url = get_minio_client().presigned_put_object( - "subsetter-outputs", f"{submission.workflow_name}/{submission.workflow_id}/all.gz" + url = get_minio_client().presigned_get_object( + workflow_params.user.username, f"argo_workflows/{submission.workflow_name}/{submission.workflow_id}/all.gz" ) return {'url': url} +''' @router.post('/bucket/create') async def create_user_bucket( user: User = Depends(current_active_user), description="Creates a bucket named with the username" @@ -24,7 +23,6 @@ async def create_user_bucket( if not get_minio_client().bucket_exists(user.username): get_minio_client().make_bucket(user.username) - @router.get('/upload/{user_name}/workflow/{workflow_id}') async def share_workflow_with_user( user_name: str, workflow_params: WorkflowDep, user: User = Depends(current_active_user) @@ -43,7 +41,7 @@ async def share_workflow_with_user( submission.remove_user(user_name) await user.update_submission(submission) return User.get(document_id=user.document_id) - +''' # @router.post('/extract/{workflow_id}') # async def extract_workflow_artifact(workflow_params: WorkflowDep) -> SubmissionResponseModel: diff --git a/app/api/subsetter/app/users.py b/app/api/subsetter/app/users.py index 6b69fa54..7714b6c9 100644 --- a/app/api/subsetter/app/users.py +++ b/app/api/subsetter/app/users.py @@ -11,7 +11,7 @@ from httpx_oauth.oauth2 import OAuth2 from subsetter.app.db import User, get_user_db -from subsetter.config import get_settings +from subsetter.config import get_settings, get_minio_client SECRET = "SECRET" @@ -51,6 +51,8 @@ class UserManager(ObjectIDIDMixin, BaseUserManager[User, PydanticObjectId]): async def on_after_register(self, user: User, request: Optional[Request] = None): await user.update_profile() + if not get_minio_client().bucket_exists(user.username): + get_minio_client().make_bucket(user.username) print(f"User {user.id} has registered.") async def on_after_forgot_password(self, user: User, token: str, request: Optional[Request] = None):