diff --git a/dsst_etl/upload_pdfs.py b/dsst_etl/upload_pdfs.py
index 53bb308..6322357 100644
--- a/dsst_etl/upload_pdfs.py
+++ b/dsst_etl/upload_pdfs.py
@@ -1,4 +1,5 @@
 import hashlib
+import json
 import os
 from pathlib import Path
 from typing import List, Optional, Tuple
@@ -10,7 +11,7 @@
 from dsst_etl import __version__, get_db_engine, logger
 from dsst_etl._utils import get_bucket_name, get_compute_context_id
 from dsst_etl.db import get_db_session
-from dsst_etl.models import Documents, Provenance, Works
+from dsst_etl.models import Documents, Identifier, Provenance, Works
 
 from .config import config
 
@@ -169,6 +170,69 @@ def link_documents_to_work(self, document_ids: List[int], work_id: int) -> None:
         self.db_session.commit()
         logger.info(f"Linked {len(document_ids)} documents to work_id {work_id}")
 
+    def create_identifier_record(
+        self, document: Documents, metadata: dict
+    ) -> Optional[Identifier]:
+        """
+        Create an identifier record for a document if metadata is provided.
+
+        Args:
+            document (Documents): The document record
+            metadata (dict): Metadata containing identifiers like PMID, PMCID, DOI
+
+        Returns:
+            Optional[Identifier]: Created identifier record or None
+        """
+        if not metadata:
+            return None
+
+        identifier = Identifier(
+            document_id=document.id,
+            pmid=metadata.get("PMID"),
+            pmcid=metadata.get("PMCID"),
+            doi=metadata.get("DOI"),
+            provenance_id=document.provenance_id,
+        )
+
+        try:
+            self.db_session.add(identifier)
+            self.db_session.commit()
+            logger.info(f"Created identifier record for document ID {document.id}")
+            return identifier
+        except Exception as e:
+            self.db_session.rollback()
+            logger.error(f"Failed to create identifier record: {str(e)}")
+            return None
+
+    def upload_pdfs_with_metadata(
+        self, pdf_paths: List[str], metadata: dict
+    ) -> Tuple[List[str], List[str]]:
+        """
+        Upload PDFs to S3 and create identifier records based on metadata.
+
+        Args:
+            pdf_paths (List[str]): List of paths to PDF files
+            metadata (dict): Maps each PDF path (as a str) to its identifier metadata
+
+        Returns:
+            Tuple[List[str], List[str]]: Lists of successful and failed uploads
+        """
+        successful_uploads, failed_uploads = self.upload_pdfs(pdf_paths)
+
+        for pdf_path in successful_uploads:
+            document = (
+                self.db_session.query(Documents)
+                .filter_by(
+                    s3uri=f"s3://{self.bucket_name}/pdfs/{os.path.basename(pdf_path)}"
+                )
+                .first()
+            )
+            if document:
+                file_metadata = metadata.get(str(pdf_path), {})  # JSON keys are str; pdf_path may be a Path
+                self.create_identifier_record(document, file_metadata)
+
+        return successful_uploads, failed_uploads
+
 
 def upload_directory(pdf_directory_path: str, comment: Optional[str] = None) -> None:
     """
@@ -209,3 +273,53 @@ def upload_directory(pdf_directory_path: str, comment: Optional[str] = None) ->
 
     for document in documents:
         uploader.initial_work_for_document(document, provenance)
+
+
+def upload_directory_with_metadata(
+    pdf_directory_path: str,
+    metadata_source: Optional[str] = None,
+    comment: Optional[str] = None,
+) -> None:
+    """
+    Upload all PDFs from a directory to S3 and create necessary database records with metadata.
+
+    Args:
+        pdf_directory_path (str): Path to directory containing PDFs
+        metadata_source (Optional[str]): Path to JSON file or dictionary with metadata
+        comment (Optional[str]): Comment for provenance record
+    """
+    pdf_directory = Path(pdf_directory_path)
+    pdf_files = list(pdf_directory.glob("*.pdf"))
+
+    if not pdf_files:
+        logger.warning(f"No PDF files found in {pdf_directory_path}")
+        return
+
+    metadata = {}
+    if metadata_source:
+        if isinstance(metadata_source, str) and os.path.isfile(metadata_source):
+            with open(metadata_source, "r") as f:
+                metadata = json.load(f)
+        elif isinstance(metadata_source, dict):
+            metadata = metadata_source
+
+    uploader = PDFUploader(get_db_session(get_db_engine()))
+
+    successful_uploads, failed_uploads = uploader.upload_pdfs_with_metadata(
+        pdf_files, metadata
+    )
+
+    if failed_uploads:
+        logger.warning(f"Failed to upload {len(failed_uploads)} files")
+
+    if successful_uploads:
+        documents = uploader.create_document_records(successful_uploads)
+
+        if not documents:
+            logger.warning("No documents created")
+            return
+
+        provenance = uploader.create_provenance_record(documents, comment)
+
+        for document in documents:
+            uploader.initial_work_for_document(document, provenance)