Skip to content

Commit

Permalink
Add PDF metadata handling in PDFUploader
Browse files Browse the repository at this point in the history
  • Loading branch information
quang-ng committed Dec 24, 2024
1 parent 197c5f7 commit 77fab4f
Showing 1 changed file with 115 additions and 1 deletion.
116 changes: 115 additions & 1 deletion dsst_etl/upload_pdfs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import json
import os
from pathlib import Path
from typing import List, Optional, Tuple
Expand All @@ -10,7 +11,7 @@
from dsst_etl import __version__, get_db_engine, logger
from dsst_etl._utils import get_bucket_name, get_compute_context_id
from dsst_etl.db import get_db_session
from dsst_etl.models import Documents, Provenance, Works
from dsst_etl.models import Documents, Identifier, Provenance, Works

from .config import config

Expand Down Expand Up @@ -169,6 +170,69 @@ def link_documents_to_work(self, document_ids: List[int], work_id: int) -> None:
self.db_session.commit()
logger.info(f"Linked {len(document_ids)} documents to work_id {work_id}")

def create_identifier_record(
self, document: Documents, metadata: dict
) -> Optional[Identifier]:
"""
Create an identifier record for a document if metadata is provided.
Args:
document (Documents): The document record
metadata (dict): Metadata containing identifiers like PMID, PMCID, DOI
Returns:
Optional[Identifier]: Created identifier record or None
"""
if not metadata:
return None

identifier = Identifier(
document_id=document.id,
pmid=metadata.get("PMID"),
pmcid=metadata.get("PMCID"),
doi=metadata.get("DOI"),
provenance_id=document.provenance_id,
)

try:
self.db_session.add(identifier)
self.db_session.commit()
logger.info(f"Created identifier record for document ID {document.id}")
return identifier
except Exception as e:
self.db_session.rollback()
logger.error(f"Failed to create identifier record: {str(e)}")
return None

def upload_pdfs_with_metadata(
self, pdf_paths: List[str], metadata: dict
) -> Tuple[List[str], List[str]]:
"""
Upload PDFs to S3 and create identifier records based on metadata.
Args:
pdf_paths (List[str]): List of paths to PDF files
metadata (dict): Metadata for each PDF file
Returns:
Tuple[List[str], List[str]]: Lists of successful and failed uploads
"""
successful_uploads, failed_uploads = self.upload_pdfs(pdf_paths)

for pdf_path in successful_uploads:
document = (
self.db_session.query(Documents)
.filter_by(
s3uri=f"s3://{self.bucket_name}/pdfs/{os.path.basename(pdf_path)}"
)
.first()
)
if document:
file_metadata = metadata.get(pdf_path, {})
self.create_identifier_record(document, file_metadata)

return successful_uploads, failed_uploads


def upload_directory(pdf_directory_path: str, comment: Optional[str] = None) -> None:
"""
Expand Down Expand Up @@ -209,3 +273,53 @@ def upload_directory(pdf_directory_path: str, comment: Optional[str] = None) ->

for document in documents:
uploader.initial_work_for_document(document, provenance)


def upload_directory_with_metadata(
pdf_directory_path: str,
metadata_source: Optional[str] = None,
comment: Optional[str] = None,
) -> None:
"""
Upload all PDFs from a directory to S3 and create necessary database records with metadata.
Args:
pdf_directory_path (str): Path to directory containing PDFs
metadata_source (Optional[str]): Path to JSON file or dictionary with metadata
comment (Optional[str]): Comment for provenance record
"""
pdf_directory = Path(pdf_directory_path)
pdf_files = list(pdf_directory.glob("*.pdf"))

if not pdf_files:
logger.warning(f"No PDF files found in {pdf_directory_path}")
return

metadata = {}
if metadata_source:
if isinstance(metadata_source, str) and os.path.isfile(metadata_source):
with open(metadata_source, "r") as f:
metadata = json.load(f)
elif isinstance(metadata_source, dict):
metadata = metadata_source

uploader = PDFUploader(get_db_session(get_db_engine()))

successful_uploads, failed_uploads = uploader.upload_pdfs_with_metadata(
pdf_files, metadata
)

if failed_uploads:
logger.warning(f"Failed to upload {len(failed_uploads)} files")

if successful_uploads:
documents = uploader.create_document_records(successful_uploads)

if not documents:
logger.warning("No documents created")
return

provenance = uploader.create_provenance_record(documents, comment)

for document in documents:
uploader.initial_work_for_document(document, provenance)

0 comments on commit 77fab4f

Please sign in to comment.