Skip to content

Commit

Permalink
More core_migrate improvements in argument handling and JSON error handling
Browse files: browse the repository at this point in the history
  • Loading branch information
monotasker committed Jan 12, 2024
1 parent 9e2cd3b commit 2a71581
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,4 @@ scripts/core-migrate/core_migrate/logs/core_migrate.log.2
scripts/core-migrate/core_migrate/logs/core_migrate.log.3
scripts/core-migrate/core_migrate/logs/core_migrate.log.4
scripts/core-migrate/core_migrate/logs/core_migrate.log.5
scripts/core-migrate/core_migrate/logs/core_migrate_failed_records.jsonl
2 changes: 1 addition & 1 deletion scripts/core-migrate/core_migrate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def load_records(
"retry_failed": retry_failed,
"use_sourceids": use_sourceids,
}
if "-" in records[0]:
if len(records) > 0 and "-" in records[0]:
if use_sourceids:
print("Error: Cannot use source ids with ranges.")
logger.error(
Expand Down
56 changes: 37 additions & 19 deletions scripts/core-migrate/core_migrate/record_loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from halo import Halo
import itertools
import json
from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
import jsonlines
from pathlib import Path
import requests
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
import subprocess
from traceback import format_exception, print_exc
from typing import Optional, Union
Expand Down Expand Up @@ -79,7 +81,11 @@ def api_request(

try:
json_response = response.json() if method != "DELETE" else None
except json.decoder.JSONDecodeError:
except (
SimpleJSONDecodeError,
RequestsJSONDecodeError,
json.decoder.JSONDecodeError,
):
logger.error("url for API request:")
logger.error(api_url)
logger.error("response status code:")
Expand Down Expand Up @@ -1161,11 +1167,11 @@ def load_records_into_invenio(
unchanged_existing = 0
new_records = 0
repaired_failed = []
range_args = [start_index]
range_args = [start_index - 1]
if stop_index > -1 and stop_index >= start_index:
range_args.append(stop_index + 1)
range_args.append(stop_index)
else:
range_args.append(start_index + 1)
range_args.append(start_index)

# Load list of previously touched records
previously_touched_records = []
Expand Down Expand Up @@ -1194,7 +1200,9 @@ def load_records_into_invenio(
existing_failed_records = [obj for obj in reader]
except FileNotFoundError:
logger.info("**no existing failed records log file found...**")
existing_failed_indices = [r["index"] for r in existing_failed_records]
existing_failed_hcids = [r["commons_id"] for r in existing_failed_records]
residual_failed_records = [*existing_failed_records]

logger.info("Starting to load records into Invenio...")
if no_updates:
Expand Down Expand Up @@ -1232,10 +1240,9 @@ def load_records_into_invenio(
logger.info("No previously failed records to retry.")
return
line_num = 1
failed_indices = [r["index"] for r in existing_failed_records]
record_set = []
for j in json_source:
if line_num in failed_indices:
if line_num in existing_failed_indices:
j["jsonl_index"] = line_num
record_set.append(j)
line_num += 1
Expand Down Expand Up @@ -1263,7 +1270,6 @@ def load_records_into_invenio(
for rec in record_set:
if "jsonl_index" in rec.keys():
current_record = rec["jsonl_index"]
logger.info(f"current_record is jsonl_index {current_record}")
else:
current_record = start_index + record_counter
rec_doi = rec["pids"]["doi"]["identifier"]
Expand Down Expand Up @@ -1304,11 +1310,21 @@ def load_records_into_invenio(
}
)
if rec_hcid in existing_failed_hcids:
existing_failed_records = [
logger.info(" repaired previously failed record...")
logger.info(f" {rec_doi} {rec_hcid} {rec_recid}")
residual_failed_records = [
d
for d in existing_failed_records
for d in residual_failed_records
if d["commons_id"] != rec_hcid
]
repaired_failed.append(
{
"index": current_record,
"invenio_id": rec_doi,
"commons_id": rec_hcid,
"core_record_id": rec_recid,
}
)
except Exception as e:
print("ERROR:", e)
print_exc()
Expand Down Expand Up @@ -1356,7 +1372,9 @@ def load_records_into_invenio(
logger.info(message)

# Report
if repaired_failed:
if repaired_failed or (
existing_failed_records and not residual_failed_records
):
print("Previously failed records repaired:")
logger.info("Previously failed records repaired:")
for r in repaired_failed:
Expand All @@ -1371,15 +1389,15 @@ def load_records_into_invenio(
print(r)
logger.info(r)

with jsonlines.open(failed_log_path, "w") as failed_writer:
for e in existing_failed_records:
if e not in failed_records:
failed_records.append(e)
ordered_failed_records = sorted(
failed_records, key=lambda r: r["index"]
)
for o in ordered_failed_records:
failed_writer.write(o)
with jsonlines.open(failed_log_path, "w") as failed_writer:
total_failed = [*failed_records]
for e in residual_failed_records:
if e not in failed_records:
total_failed.append(e)
ordered_failed_records = sorted(total_failed, key=lambda r: r["index"])
for o in ordered_failed_records:
failed_writer.write(o)
if failed_records:
print(
"Failed records written to logs/core_migrate_failed_records.jsonl"
)
Expand Down

0 comments on commit 2a71581

Please sign in to comment.