Skip to content

Commit

Permalink
Improvements to continuous logging of migration activity
Browse files Browse the repository at this point in the history
  • Loading branch information
monotasker committed Jan 12, 2024
1 parent c37266b commit cfaad1d
Showing 1 changed file with 95 additions and 37 deletions.
132 changes: 95 additions & 37 deletions scripts/core-migrate/core_migrate/record_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ def api_request(
method: str = "GET",
endpoint: str = "records",
server: str = "",
protocol: str = "",
args: str = "",
token: str = "",
params: dict[str, str] = {},
json_dict: Optional[Union[dict[str, str], list[dict]]] = {},
file_data: Optional[bytes] = None,
protocol: str = "",
) -> dict:
"""
Make an api request and return the response
Expand Down Expand Up @@ -973,17 +973,20 @@ def create_full_invenio_record(core_data: dict, no_updates: bool) -> dict:
)
else:
logger.error(files_delete)
old_files = metadata_record["json"]["files"]["entries"]
logger.error(
"Existing record with same DOI has different"
f" files.\n{metadata_record['json']['files']['entries']}\n"
f" !=\n {core_data['files']['entries']}Could not"
" delete"
f" files.\n{old_files}\n"
f" !=\n {core_data['files']['entries']}\n"
"Could not delete existing file "
f"{existing_file[0]['key']}."
)
raise RuntimeError(
"Existing record with same DOI has different"
f" files.\n{metadata_record['json']['files']['entries']}\n"
f" !=\n {core_data['files']['entries']}Could not"
" delete"
f" files.\n{old_files}\n"
f" !=\n {core_data['files']['entries']}\n"
"Could not delete existing file "
f"{existing_file[0]['key']}."
)

if same_files:
Expand Down Expand Up @@ -1210,6 +1213,59 @@ def load_records_into_invenio(
else:
range_args.append(start_index)

def log_failed_record(
    index, invenio_id, commons_id, core_record_id
) -> None:
    """
    Register a failed record and rewrite the failed records log file.

    The record's identifiers are appended to ``failed_records`` and the
    file ``logs/core_migrate_failed_records.jsonl`` (located beside this
    module) is then rewritten from scratch. The rewritten file holds
    every entry of ``failed_records`` plus any entry of
    ``residual_failed_records`` that is not already among them, ordered
    by their ``"index"`` value.

    ``failed_records`` and ``residual_failed_records`` are not
    parameters; they are read from the enclosing scope.

    :param index: value stored under the ``"index"`` key (also the sort
        key for the log file)
    :param invenio_id: value stored under the ``"invenio_id"`` key
    :param commons_id: value stored under the ``"commons_id"`` key
    :param core_record_id: value stored under the ``"core_record_id"``
        key
    """
    failure = {
        "index": index,
        "invenio_id": invenio_id,
        "commons_id": commons_id,
        "core_record_id": core_record_id,
    }
    failed_records.append(failure)

    log_path = (
        Path(__file__).parent / "logs" / "core_migrate_failed_records.jsonl"
    )
    # Mode "w": the whole file is replaced on every call so that it
    # always reflects the full, ordered set of failures.
    with jsonlines.open(log_path, "w") as failed_writer:
        # Failures from this run first, then leftover failures from
        # earlier runs that have not been recorded again.
        combined = list(failed_records)
        combined.extend(
            leftover
            for leftover in residual_failed_records
            if leftover not in failed_records
        )
        combined.sort(key=lambda entry: entry["index"])
        for entry in combined:
            failed_writer.write(entry)

def log_touched_record(
index, invenio_id, commons_id, core_record_id
) -> None:
"""
Log a touched record to the touched records log file.

Builds a dict from the four identifiers, adds it to
``touched_records`` when an equal dict is not already present, and
appends it as one line to ``logs/core_migrate_touched_records.jsonl``
(located beside this module) when ``commons_id`` is not listed in
``previously_touched_sourceids``.

``touched_records`` and ``previously_touched_sourceids`` are not
parameters; they are read from the enclosing scope.

NOTE(review): indentation is not visible in this view of the file, so
it cannot be told from here whether the log-file check is nested
inside the ``touched_records`` check or runs independently of it --
confirm against the repository before relying on either reading.

:param index: value stored under the ``"index"`` key
:param invenio_id: value stored under the ``"invenio_id"`` key
:param commons_id: value stored under the ``"commons_id"`` key; also
    compared against ``previously_touched_sourceids``
:param core_record_id: value stored under the ``"core_record_id"`` key
"""
touched = {
"index": index,
"invenio_id": invenio_id,
"commons_id": commons_id,
"core_record_id": core_record_id,
}
# Keep the in-memory list free of exact duplicates.
if touched not in touched_records:
touched_records.append(touched)
# Skip the file write for ids that are already in the list of
# previously touched source ids.
if commons_id not in previously_touched_sourceids:
# Mode "a": append a single line rather than rewriting the file.
with jsonlines.open(
Path(__file__).parent
/ "logs"
/ "core_migrate_touched_records.jsonl",
"a",
) as touched_writer:
touched_writer.write(touched)

# Load list of previously touched records
previously_touched_records = []
touched_log_path = (
Expand All @@ -1223,6 +1279,9 @@ def load_records_into_invenio(
previously_touched_records = [obj for obj in reader]
except FileNotFoundError:
logger.info("**no existing touched records log file found...**")
previously_touched_sourceids = [
r["commons_id"] for r in previously_touched_records
]

# Load list of failed records from prior runs
existing_failed_records = []
Expand Down Expand Up @@ -1327,6 +1386,14 @@ def load_records_into_invenio(
)
spinner.start()
try:
log_touched_record(
**{
"index": current_record,
"invenio_id": rec_doi,
"commons_id": rec_hcid,
"core_record_id": rec_recid,
}
)
result = create_full_invenio_record(rec, no_updates)
print(f" loaded record {current_record}")
successful_records += 1
Expand All @@ -1338,14 +1405,6 @@ def load_records_into_invenio(
updated_published += 1
if result.get("updated_draft"):
updated_drafts += 1
touched_records.append(
{
"index": current_record,
"invenio_id": rec_doi,
"commons_id": rec_hcid,
"core_record_id": rec_recid,
}
)
if rec_hcid in existing_failed_hcids:
logger.info(" repaired previously failed record...")
logger.info(f" {rec_doi} {rec_hcid} {rec_recid}")
Expand All @@ -1369,8 +1428,8 @@ def load_records_into_invenio(
logger.error(
f"ERROR: {format_exception(None, e, e.__traceback__)}"
)
failed_records.append(
{
log_failed_record(
**{
"index": current_record,
"invenio_id": rec_doi,
"commons_id": rec_hcid,
Expand Down Expand Up @@ -1401,8 +1460,8 @@ def load_records_into_invenio(
f" {str(successful_records - new_records)} already existed \n "
f" {str(updated_published)} updated published records \n "
f" {str(updated_drafts)} updated existing draft records \n "
f" {str(unchanged_existing)} unchanged existing records \n) "
f" {str(len(repaired_failed))} previously failed records repaired \n) "
f" {str(unchanged_existing)} unchanged existing records \n "
f" {str(len(repaired_failed))} previously failed records repaired \n "
f" {str(len(failed_records))} failed \n"
)
print(message)
Expand All @@ -1425,33 +1484,32 @@ def load_records_into_invenio(
for r in failed_records:
print(r)
logger.info(r)

with jsonlines.open(failed_log_path, "w") as failed_writer:
total_failed = [*failed_records]
for e in residual_failed_records:
if e not in failed_records:
total_failed.append(e)
ordered_failed_records = sorted(total_failed, key=lambda r: r["index"])
for o in ordered_failed_records:
failed_writer.write(o)
if failed_records:
print(
"Failed records written to logs/core_migrate_failed_records.jsonl"
)
logger.info(
"Failed records written to logs/core_migrate_failed_records.jsonl"
)

# Update log of touched records
with jsonlines.open(touched_log_path, "a") as f:
for t in previously_touched_records:
if t not in touched_records:
touched_records.append(t)
# Order touched records in log file (saved time earlier by not
# doing this on each iteration)
with jsonlines.open(
Path(__file__).parent / "logs" / "core_migrate_touched_records.jsonl",
"w",
) as touched_writer:
total_touched = []
for t in touched_records:
if t not in total_touched:
total_touched.append(t)
for e in previously_touched_records:
if e not in total_touched:
total_touched.append(e)
ordered_touched_records = sorted(
touched_records, key=lambda r: r["index"]
total_touched, key=lambda r: r["index"]
)
for u in ordered_touched_records:
f.write(u)
for o in ordered_touched_records:
touched_writer.write(o)

print("Touched records written to logs/core_migrate_touched_records.jsonl")
logger.info(
"Touched records written to logs/core_migrate_touched_records.jsonl"
Expand Down

0 comments on commit cfaad1d

Please sign in to comment.