From 7e255699a17653d6cdad7648284a83cbc08a7ab5 Mon Sep 17 00:00:00 2001 From: Pradip Thapa Date: Tue, 17 Dec 2024 09:59:52 +0545 Subject: [PATCH] feat: update drone image processor to handle single and multiple tasks (#399) * feat: add endpoints to process all task images of AOI * reac: refactor process_assets_from_odm function to reduce redundancy * fix: remove double s3_path from process_assets_from_odm * fix: added all tasks images in odm processing * fix: remove local setup of config & docker-compose * fix: remove multple class for drone images processing & now handle from single class * fix: refine drone image processing * added tags in the odm webhook router endpoint --------- Co-authored-by: Niraj Adhikari --- src/backend/app/projects/image_processing.py | 387 ++++++++++++------- src/backend/app/projects/project_logic.py | 56 ++- src/backend/app/projects/project_routes.py | 223 +++++------ 3 files changed, 404 insertions(+), 262 deletions(-) diff --git a/src/backend/app/projects/image_processing.py b/src/backend/app/projects/image_processing.py index d86600e0..ec0a2de7 100644 --- a/src/backend/app/projects/image_processing.py +++ b/src/backend/app/projects/image_processing.py @@ -1,4 +1,5 @@ import os +from typing import Any, Dict, List, Optional import uuid import tempfile import shutil @@ -23,24 +24,36 @@ def __init__( self, node_odm_url: str, project_id: uuid.UUID, - task_id: uuid.UUID, user_id: str, db: Connection, + task_id: Optional[uuid.UUID] = None, + task_ids: Optional[List[uuid.UUID]] = None, ): """ - Initializes the connection to the ODM node. + Base initialization for drone image processing. + + :param node_odm_url: URL of the ODM node + :param project_id: Project UUID + :param user_id: User ID + :param db: Database connection + :param task_id: Optional single task ID + :param task_ids: Optional list of task IDs """ - # self.node = Node(node_odm_host, node_odm_port) self.node = Node.from_url(node_odm_url) self.project_id = project_id - self.task_id = task_id self.user_id = user_id self.db = db + self.task_id = task_id + self.task_ids = task_ids or ([] if task_id is None else [task_id]) - def options_list_to_dict(self, options=[]): + def options_list_to_dict( + self, options: List[Dict[str, Any]] = None + ) -> Dict[str, Any]: """ - Converts options formatted as a list ([{'name': optionName, 'value': optionValue}, ...]) - to a dictionary {optionName: optionValue, ...} + Convert options list to a dictionary. + + :param options: List of option dictionaries + :return: Dictionary of options """ opts = {} if options is not None: @@ -48,73 +61,171 @@ def options_list_to_dict(self, options=[]): opts[o["name"]] = o["value"] return opts + def list_images(self, directory: str) -> List[str]: + """ + List all image files in a directory. + + :param directory: Directory path + :return: List of image file paths + """ + images = [] + path = Path(directory) + + for file in path.rglob("*"): + if file.suffix.lower() in {".jpg", ".jpeg", ".png", ".txt", ".laz"}: + images.append(str(file)) + return images + def download_object(self, bucket_name: str, obj, images_folder: str): + """ + Download an object from the bucket if it's an image or related file. + + :param bucket_name: Bucket name + :param obj: Object to download + :param images_folder: Destination folder + """ if obj.object_name.endswith( (".jpg", ".jpeg", ".JPG", ".png", ".PNG", ".txt", ".laz") - ): # Images and GCP File + ): local_path = Path(images_folder) / Path(obj.object_name).name local_path.parent.mkdir(parents=True, exist_ok=True) get_file_from_bucket(bucket_name, obj.object_name, local_path) - def download_images_from_s3(self, bucket_name, local_dir): + def download_images_from_s3( + self, bucket_name: str, local_dir: str, task_id: Optional[uuid.UUID] = None + ): """ - Downloads images from MinIO under the specified path. + Download images from S3 for a specific task or project. - :param bucket_name: Name of the MinIO bucket. - :param project_id: The project UUID. - :param task_id: The task UUID. - :param local_dir: Local directory to save the images. - :return: List of local image file paths. + :param bucket_name: Bucket name + :param local_dir: Local directory to save images + :param task_id: Optional specific task ID """ - prefix = f"dtm-data/projects/{self.project_id}/{self.task_id}" - + prefix = ( + f"dtm-data/projects/{self.project_id}/{task_id}" + if task_id + else f"dtm-data/projects/{self.project_id}" + ) objects = list_objects_from_bucket(bucket_name, prefix) - # Process images concurrently with ThreadPoolExecutor() as executor: executor.map( lambda obj: self.download_object(bucket_name, obj, local_dir), objects, ) - def list_images(self, directory): + def process_new_task( + self, + images: List[str], + name: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + progress_callback: Optional[Any] = None, + webhook: Optional[str] = None, + ): """ - Lists all images in the specified directory. + Create a new processing task. + + :param images: List of image file paths + :param name: Task name + :param options: Processing options + :param progress_callback: Progress tracking callback + :param webhook: Webhook URL + :return: Created task object + """ + opts = self.options_list_to_dict(options) + task = self.node.create_task( + images, opts, name, progress_callback, webhook=webhook + ) + return task - :param directory: The directory containing the images. - :return: List of image file paths. + async def _process_images( + self, + bucket_name: str, + name: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + webhook: Optional[str] = None, + single_task: bool = True, + ): """ - images = [] - path = Path(directory) + Internal method to process images for a single task or multiple tasks. + + :param bucket_name: Bucket name + :param name: Task name + :param options: Processing options + :param webhook: Webhook URL + :param single_task: Whether processing a single or multiple tasks + :return: Created task object + """ + # Create a temporary directory to store downloaded images + temp_dir = tempfile.mkdtemp() + try: + images_list = [] + # Download images based on single or multiple task processing + if single_task: # and self.task_id: + self.download_images_from_s3(bucket_name, temp_dir, self.task_id) + images_list = self.list_images(temp_dir) + else: + for task_id in self.task_ids: + self.download_images_from_s3(bucket_name, temp_dir, task_id) + images_list.extend(self.list_images(temp_dir)) - for file in path.rglob("*"): - if file.suffix.lower() in { - ".jpg", - ".jpeg", - ".png", - ".txt", - ".laz", - }: # Images, GCP File, and align.laz - images.append(str(file)) - return images + # Start a new processing task + task = self.process_new_task( + images_list, + name=name + or ( + f"DTM-Task-{self.task_id}" + if single_task + else f"DTM-Project-{self.project_id}" + ), + options=options, + webhook=webhook, + ) - def process_new_task( - self, images, name=None, options=[], progress_callback=None, webhook=None + return task + finally: + # Clean up temporary directory + shutil.rmtree(temp_dir) + + async def process_single_task( + self, + bucket_name: str, + name: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + webhook: Optional[str] = None, ): """ - Sends a set of images via the API to start processing. + Process images for a single task. - :param images: List of image file paths. - :param name: Name of the task. - :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]). - :param progress_callback: Callback function to report upload progress. - :return: The created task object. + :param bucket_name: Bucket name + :param name: Task name + :param options: Processing options + :param webhook: Webhook URL + :return: Created task object """ - opts = self.options_list_to_dict(options) - task = self.node.create_task( - images, opts, name, progress_callback, webhook=webhook + return await self._process_images( + bucket_name, name=name, options=options, webhook=webhook, single_task=True + ) + + async def process_multiple_tasks( + self, + bucket_name: str, + name: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + webhook: Optional[str] = None, + ): + """ + Process images for multiple tasks. + + :param bucket_name: Bucket name + :param name: Task name + :param options: Processing options + :param webhook: Webhook URL + :return: Created task object + """ + return await self._process_images( + bucket_name, name=name, options=options, webhook=webhook, single_task=False ) - return task def monitor_task(self, task): """ @@ -127,42 +238,22 @@ def monitor_task(self, task): log.info("Task completed.") return task - def download_results(self, task, output_path): - """ - Downloads all results of the task to the specified output path. - - :param task: The task object. - :param output_path: The directory where results will be saved. - """ - log.info(f"Downloading results to {output_path}...") - path = task.download_zip(output_path) - log.info("Download completed.") - return path - - def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=None): + async def process_images_from_s3( + self, + bucket_name: str, + name: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + webhook: Optional[str] = None, + ): """ - Processes images from MinIO storage. - - :param bucket_name: Name of the MinIO bucket. - :param project_id: The project UUID. - :param task_id: The task UUID. - :param name: Name of the task. - :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]). - :return: The task object. + Process images from S3 for a single task. """ - # Create a temporary directory to store downloaded images - temp_dir = tempfile.mkdtemp() - try: - self.download_images_from_s3(bucket_name, temp_dir) - - images_list = self.list_images(temp_dir) - - # Start a new processing task - task = self.process_new_task( - images_list, name=name, options=options, webhook=webhook - ) + task = await self.process_single_task( + bucket_name, name=name, options=options, webhook=webhook + ) - # If webhook is passed, webhook does this job. + if not webhook: + # If webhook is passed, webhook does this job. if not webhook: # Monitor task progress self.monitor_task(task) @@ -195,10 +286,23 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non ) return task - finally: - # Clean up temporary directory - shutil.rmtree(temp_dir) - pass + return task + + async def process_images_for_all_tasks( + self, + bucket_name: str, + name_prefix: Optional[str] = None, + options: Optional[List[Dict[str, Any]]] = None, + webhook: Optional[str] = None, + ): + """ + Process images for all tasks in a project. + """ + task = await self.process_multiple_tasks( + bucket_name, name=name_prefix, options=options, webhook=webhook + ) + + return task def reproject_to_web_mercator(input_file, output_file): @@ -228,47 +332,42 @@ def reproject_to_web_mercator(input_file, output_file): raise -async def download_and_upload_assets_from_odm_to_s3( +async def process_assets_from_odm( node_odm_url: str, - task_id: str, dtm_project_id: uuid.UUID, - dtm_task_id: uuid.UUID, - user_id: str, - current_state: State, - comment: str, + odm_task_id: str, + state=None, + message=None, + dtm_task_id=None, + dtm_user_id=None, ): """ - Downloads results from ODM, reprojects the orthophoto to EPSG:3857, and uploads it to S3. + Downloads results from ODM, reprojects the orthophoto, and uploads assets to S3. + Updates task state if required. - :param task_id: UUID of the ODM task. + :param node_odm_url: URL of the ODM node. :param dtm_project_id: UUID of the project. - :param dtm_task_id: UUID of the task. - :param current_state: Current state of the task (IMAGE_UPLOADED or IMAGE_PROCESSING_FAILED). + :param odm_task_id: UUID of the ODM task. + :param state: Current state of the task. + :param message: Message to log upon completion. + :param dtm_task_id: Task ID for state updates. + :param dtm_user_id: User ID for state updates. """ - log.info(f"Starting download for task {task_id}") - # Replace with actual ODM node details and URL + log.info(f"Starting processing for project {dtm_project_id}") node = Node.from_url(node_odm_url) + output_file_path = f"/tmp/{dtm_project_id}" try: - # Get the task object using the task_id - task = node.get_task(task_id) + task = node.get_task(odm_task_id) + log.info(f"Downloading results for task {dtm_project_id} to {output_file_path}") - # Create a temporary directory to store the results - output_file_path = f"/tmp/{dtm_project_id}" - - log.info(f"Downloading results for task {task_id} to {output_file_path}") - - # Download results as a zip file assets_path = task.download_zip(output_file_path) - - # Upload the results into S3 (Minio) - s3_path = f"dtm-data/projects/{dtm_project_id}/{dtm_task_id}/assets.zip" - log.info(f"Uploading {output_file_path} to S3 path: {s3_path}") + s3_path = f"dtm-data/projects/{dtm_project_id}/{dtm_task_id if dtm_task_id else ''}/assets.zip".strip( + "/" + ) + log.info(f"Uploading {assets_path} to S3 path: {s3_path}") add_file_to_bucket(settings.S3_BUCKET_NAME, assets_path, s3_path) - log.info(f"Assets for task {task_id} successfully uploaded to S3.") - - # Extract the zip file to find the orthophoto with zipfile.ZipFile(assets_path, "r") as zip_ref: zip_ref.extractall(output_file_path) @@ -276,64 +375,58 @@ async def download_and_upload_assets_from_odm_to_s3( output_file_path, "odm_orthophoto", "odm_orthophoto.tif" ) if not os.path.exists(orthophoto_path): - log.error(f"Orthophoto file not found at {orthophoto_path}") - raise FileNotFoundError(f"Orthophoto not found in {output_file_path}") - - log.info(f"Orthophoto found at {orthophoto_path}") + log.error(f"Orthophoto not found at {orthophoto_path}") + raise FileNotFoundError("Orthophoto file is missing") - # NOTE: Reproject the orthophoto to EPSG:3857, overwriting the original file reproject_to_web_mercator(orthophoto_path, orthophoto_path) + s3_ortho_path = f"dtm-data/projects/{dtm_project_id}/{dtm_task_id if dtm_task_id else ''}/orthophoto/odm_orthophoto.tif".strip( + "/" + ) - # Upload the reprojected orthophoto to S3 - s3_ortho_path = f"dtm-data/projects/{dtm_project_id}/{dtm_task_id}/orthophoto/odm_orthophoto.tif" log.info(f"Uploading reprojected orthophoto to S3 path: {s3_ortho_path}") add_file_to_bucket(settings.S3_BUCKET_NAME, orthophoto_path, s3_ortho_path) - # Upload the images.json file from extracted zip file into the path 'images.json' images_json_path = os.path.join(output_file_path, "images.json") - s3_images_json_path = ( - f"dtm-data/projects/{dtm_project_id}/{dtm_task_id}/images.json" + s3_images_json_path = f"dtm-data/projects/{dtm_project_id}/{dtm_task_id if dtm_task_id else ''}/images.json".strip( + "/" ) + log.info(f"Uploading images.json to S3 path: {s3_images_json_path}") add_file_to_bucket( settings.S3_BUCKET_NAME, images_json_path, s3_images_json_path ) - log.info( - f"Reprojected orthophoto for task {task_id} successfully uploaded to S3 at {s3_ortho_path}" - ) - # NOTE: This function uses a separate database connection pool because it is called by an internal server - # and doesn't rely on FastAPI's request context. This allows independent database access outside FastAPI's lifecycle. - - pool = await database.get_db_connection_pool() - async with pool as pool_instance: - async with pool_instance.connection() as conn: - await task_logic.update_task_state( - db=conn, - project_id=dtm_project_id, - task_id=dtm_task_id, - user_id=user_id, - comment=comment, - initial_state=current_state, - final_state=State.IMAGE_PROCESSING_FINISHED, - updated_at=timestamp(), - ) - log.info( - f"Task {dtm_task_id} state updated to IMAGE_PROCESSING_FINISHED in the database." - ) + log.info(f"Processing complete for project {dtm_project_id}") + + if state and dtm_task_id and dtm_user_id: + # NOTE: This function uses a separate database connection pool because it is called by an internal server + # and doesn't rely on FastAPI's request context. This allows independent database access outside FastAPI's lifecycle. + pool = await database.get_db_connection_pool() + async with pool as pool_instance: + async with pool_instance.connection() as conn: + await task_logic.update_task_state( + db=conn, + project_id=dtm_project_id, + task_id=dtm_task_id, + user_id=dtm_user_id, + comment=message, + initial_state=state, + final_state=State.IMAGE_PROCESSING_FINISHED, + updated_at=timestamp(), + ) + log.info( + f"Task {dtm_task_id} state updated to IMAGE_PROCESSING_FINISHED in the database." + ) except Exception as e: - log.error( - f"An error occurred during processing for task {task_id}. Details: {e}" - ) + log.error(f"Error during processing for project {dtm_project_id}: {e}") finally: - # Clean up the temporary directory if os.path.exists(output_file_path): try: shutil.rmtree(output_file_path) log.info(f"Temporary directory {output_file_path} cleaned up.") except Exception as cleanup_error: log.error( - f"Error cleaning up temporary directory {output_file_path}: {cleanup_error}" + f"Error cleaning up directory {output_file_path}: {cleanup_error}" ) diff --git a/src/backend/app/projects/project_logic.py b/src/backend/app/projects/project_logic.py index d08f06e2..af1769ad 100644 --- a/src/backend/app/projects/project_logic.py +++ b/src/backend/app/projects/project_logic.py @@ -200,12 +200,17 @@ async def preview_split_by_square(boundary: str, meters: int): ) -def process_drone_images( +async def process_drone_images( project_id: uuid.UUID, task_id: uuid.UUID, user_id: str, db: Connection ): # Initialize the processor processor = DroneImageProcessor( - settings.NODE_ODM_URL, project_id, task_id, user_id, db + node_odm_url=settings.NODE_ODM_URL, + project_id=project_id, + task_id=task_id, + user_id=user_id, + task_ids=None, + db=db, ) # Define processing options @@ -213,9 +218,8 @@ def process_drone_images( {"name": "dsm", "value": True}, {"name": "orthophoto-resolution", "value": 5}, ] - webhook_url = f"{settings.BACKEND_URL}/api/projects/odm/webhook/{user_id}/{project_id}/{task_id}/" - processor.process_images_from_s3( + await processor.process_images_from_s3( settings.S3_BUCKET_NAME, name=f"DTM-Task-{task_id}", options=options, @@ -223,6 +227,35 @@ def process_drone_images( ) +async def process_all_drone_images( + project_id: uuid.UUID, tasks: list, user_id: str, db: Connection +): + # Initialize the processor + processor = DroneImageProcessor( + node_odm_url=settings.NODE_ODM_URL, + project_id=project_id, + task_id=None, + user_id=user_id, + task_ids=tasks, + db=db, + ) + + # Define processing options + options = [ + {"name": "dsm", "value": True}, + {"name": "orthophoto-resolution", "value": 5}, + ] + webhook_url = ( + f"{settings.BACKEND_URL}/api/projects/odm/webhook/{user_id}/{project_id}/" + ) + await processor.process_images_for_all_tasks( + settings.S3_BUCKET_NAME, + name_prefix=f"DTM-Task-{project_id}", + options=options, + webhook=webhook_url, + ) + + def get_project_info_from_s3(project_id: uuid.UUID, task_id: uuid.UUID): """ Helper function to get the number of images and the URL to download the assets. @@ -314,3 +347,18 @@ def generate_square_geojson(center_lat, center_lon, side_length_meters): ], } return geojson + + +async def get_all_tasks_for_project(project_id, db): + "Get all tasks associated with the project ID that are in state IMAGE_UPLOADED." + async with db.cursor() as cur: + query = """ + SELECT t.id + FROM tasks t + JOIN task_events te ON t.id = te.task_id + WHERE t.project_id = %s AND te.state = 'IMAGE_UPLOADED'; + """ + await cur.execute(query, (project_id,)) + results = await cur.fetchall() + # Convert UUIDs to string + return [str(result[0]) for result in results] diff --git a/src/backend/app/projects/project_routes.py b/src/backend/app/projects/project_routes.py index 5c1d74d4..e2ff6813 100644 --- a/src/backend/app/projects/project_routes.py +++ b/src/backend/app/projects/project_routes.py @@ -455,48 +455,59 @@ async def process_imagery( return {"message": "Processing started"} -@router.get( - "/assets/{project_id}/", - tags=["Image Processing"], -) -async def get_assets_info( - user_data: Annotated[AuthUser, Depends(login_required)], - db: Annotated[Connection, Depends(database.get_db)], +@router.post("/process_all_imagery/{project_id}/", tags=["Image Processing"]) +async def process_all_imagery( + project_id: uuid.UUID, project: Annotated[ project_schemas.DbProject, Depends(project_deps.get_project_by_id) ], - task_id: Optional[uuid.UUID] = None, + user_data: Annotated[AuthUser, Depends(login_required)], + background_tasks: BackgroundTasks, + db: Annotated[Connection, Depends(database.get_db)], ): """ - Endpoint to get the number of images and the URL to download the assets - for a given project and task. If no task_id is provided, returns info - for all tasks associated with the project. + API endpoint to process all tasks associated with a project. """ - if task_id is None: - # Fetch all tasks associated with the project - tasks = await project_deps.get_tasks_by_project_id(project.id, db) + user_id = user_data.id - results = [] + tasks = await project_logic.get_all_tasks_for_project(project.id, db) + background_tasks.add_task( + project_logic.process_all_drone_images, project_id, tasks, user_id, db + ) + return {"message": f"Processing started for {len(tasks)} tasks."} - for task in tasks: - task_info = project_logic.get_project_info_from_s3( - project.id, task.get("id") - ) - results.append(task_info) - return results - else: - current_state = await task_logic.get_task_state(db, project.id, task_id) - project_info = project_logic.get_project_info_from_s3(project.id, task_id) - project_info.state = current_state.get("state") - return project_info +@router.post("/odm/webhook/{dtm_user_id}/{dtm_project_id}/", tags=["Image Processing"]) +async def odm_webhook_for_processing_whole_project( + request: Request, + dtm_project_id: uuid.UUID, + dtm_user_id: str, + background_tasks: BackgroundTasks, +): + payload = await request.json() + odm_task_id = payload.get("uuid") + status = payload.get("status") + + if not odm_task_id or not status: + raise HTTPException(status_code=400, detail="Invalid webhook payload") + + if status["code"] in {30, 40}: + log.info(f"Project {dtm_project_id}: Processing status {status['code']}") + background_tasks.add_task( + image_processing.process_assets_from_odm, + node_odm_url=settings.NODE_ODM_URL, + dtm_project_id=dtm_project_id, + odm_task_id=odm_task_id, + ) + + return {"message": "Webhook received", "task_id": dtm_project_id} @router.post( "/odm/webhook/{dtm_user_id}/{dtm_project_id}/{dtm_task_id}/", tags=["Image Processing"], ) -async def odm_webhook( +async def odm_webhook_for_processing_a_single_task( request: Request, db: Annotated[Connection, Depends(database.get_db)], dtm_project_id: uuid.UUID, @@ -504,98 +515,51 @@ async def odm_webhook( dtm_user_id: str, background_tasks: BackgroundTasks, ): - """ - Webhook to receive notifications from ODM processing tasks. - """ - # Try to parse the JSON body - try: - payload = await request.json() - except Exception as e: - log.error(f"Error parsing JSON: {e}") - raise HTTPException(status_code=400, detail="Invalid JSON body") - - task_id = payload.get("uuid") + payload = await request.json() + odm_task_id = payload.get("uuid") status = payload.get("status") - if not task_id or not status: + if not odm_task_id or not status: raise HTTPException(status_code=400, detail="Invalid webhook payload") - log.info(f"Task ID: {task_id}, Status: {status}") + current_state = await task_logic.get_task_state(db, dtm_project_id, dtm_task_id) + state_value = State[current_state.get("state")] - # If status is 'success', download and upload assets to S3. - # 40 is the status code for success in odm if status["code"] == 40: - log.info(f"Task ID: {task_id}, Status: going for download......") - - current_state = await task_logic.get_task_state(db, dtm_project_id, dtm_task_id) - current_state_value = State[current_state.get("state")] - match current_state_value: - case State.IMAGE_PROCESSING_STARTED: - log.info( - f"Task ID: {task_id}, Status: already IMAGE_UPLOADED - no update needed." - ) - # Call function to download assets from ODM and upload to S3 - background_tasks.add_task( - image_processing.download_and_upload_assets_from_odm_to_s3, - settings.NODE_ODM_URL, - task_id, - dtm_project_id, - dtm_task_id, - dtm_user_id, - State.IMAGE_PROCESSING_STARTED, - "Task completed.", - ) - - case State.IMAGE_PROCESSING_FAILED: - log.warning( - f"Task ID: {task_id}, Status: previously failed, updating to IMAGE_UPLOADED" - ) - # Call function to download assets from ODM and upload to S3 - background_tasks.add_task( - image_processing.download_and_upload_assets_from_odm_to_s3, - settings.NODE_ODM_URL, - task_id, - dtm_project_id, - dtm_task_id, - dtm_user_id, - State.IMAGE_PROCESSING_FAILED, - "Task completed.", - ) - - case _: - log.info( - f"Task ID: {task_id}, Status: updating to IMAGE_UPLOADED from {current_state}" - ) - - elif status["code"] == 30: - current_state = await task_logic.get_task_state(db, dtm_project_id, dtm_task_id) - # If the current state is not already IMAGE_PROCESSING_FAILED, update it - if current_state != State.IMAGE_PROCESSING_FAILED: - await task_logic.update_task_state( - db, - dtm_project_id, - dtm_task_id, - dtm_user_id, - "Image processing failed.", - State.IMAGE_PROCESSING_STARTED, - State.IMAGE_PROCESSING_FAILED, - timestamp(), - ) - - background_tasks.add_task( - image_processing.download_and_upload_assets_from_odm_to_s3, - settings.NODE_ODM_URL, - task_id, - dtm_project_id, - dtm_task_id, - dtm_user_id, - State.IMAGE_PROCESSING_FAILED, - "Image processing failed.", - ) + background_tasks.add_task( + image_processing.process_assets_from_odm, + node_odm_url=settings.NODE_ODM_URL, + dtm_project_id=dtm_project_id, + odm_task_id=odm_task_id, + state=state_value, + message="Task completed.", + dtm_task_id=dtm_task_id, + dtm_user_id=dtm_user_id, + ) - log.info(f"Task ID: {task_id}, Status: Webhook received") + elif status["code"] == 30 and state_value != State.IMAGE_PROCESSING_FAILED: + await task_logic.update_task_state( + db, + dtm_project_id, + dtm_task_id, + dtm_user_id, + "Image processing failed.", + state_value, + State.IMAGE_PROCESSING_FAILED, + timestamp(), + ) + background_tasks.add_task( + image_processing.process_assets_from_odm, + node_odm_url=settings.NODE_ODM_URL, + dtm_project_id=dtm_project_id, + odm_task_id=odm_task_id, + state=state_value, + message="Image processing failed.", + dtm_task_id=dtm_task_id, + dtm_user_id=dtm_user_id, + ) - return {"message": "Webhook received", "task_id": task_id} + return {"message": "Webhook received", "task_id": odm_task_id} @router.post("/regulator/comment/{project_id}/", tags=["regulator"]) @@ -744,3 +708,40 @@ async def get_project_waypoints_counts( return { "avg_no_of_waypoints": len(json.loads(points_with_elevation)["features"]), } + + +@router.get( + "/assets/{project_id}/", + tags=["Image Processing"], +) +async def get_assets_info( + user_data: Annotated[AuthUser, Depends(login_required)], + db: Annotated[Connection, Depends(database.get_db)], + project: Annotated[ + project_schemas.DbProject, Depends(project_deps.get_project_by_id) + ], + task_id: Optional[uuid.UUID] = None, +): + """ + Endpoint to get the number of images and the URL to download the assets + for a given project and task. If no task_id is provided, returns info + for all tasks associated with the project. + """ + if task_id is None: + # Fetch all tasks associated with the project + tasks = await project_deps.get_tasks_by_project_id(project.id, db) + + results = [] + + for task in tasks: + task_info = project_logic.get_project_info_from_s3( + project.id, task.get("id") + ) + results.append(task_info) + + return results + else: + current_state = await task_logic.get_task_state(db, project.id, task_id) + project_info = project_logic.get_project_info_from_s3(project.id, task_id) + project_info.state = current_state.get("state") + return project_info