From 2ecc2c999de6c5655974453207fbbad0d22cef48 Mon Sep 17 00:00:00 2001 From: Alan Kuurstra Date: Fri, 9 Aug 2024 16:12:21 -0400 Subject: [PATCH] timezone aware --- heudiconv/bids.py | 9 ++- heudiconv/dicoms.py | 22 +++---- heudiconv/tests/test_utils.py | 89 ++++++++++++++++++++++++---- heudiconv/utils.py | 105 +++++++++++++++++++++++++++++----- 4 files changed, 182 insertions(+), 43 deletions(-) diff --git a/heudiconv/bids.py b/heudiconv/bids.py index d9edbe20..283429c1 100644 --- a/heudiconv/bids.py +++ b/heudiconv/bids.py @@ -31,7 +31,7 @@ remove_suffix, save_json, set_readonly, - strptime_micr, + strptime_bids, update_json, ) @@ -952,17 +952,16 @@ def select_fmap_from_compatible_groups( k for k, v in acq_times_fmaps.items() if v == first_acq_time ][0] elif criterion == "Closest": - json_acq_time = strptime_micr( + json_acq_time = strptime_bids( acq_times[ # remove session folder and '.json', add '.nii.gz': remove_suffix(remove_prefix(json_file, sess_folder + op.sep), ".json") + ".nii.gz" - ], - "%Y-%m-%dT%H:%M:%S[.%f]", + ] ) # differences in acquisition time (abs value): diff_fmaps_acq_times = { - k: abs(strptime_micr(v, "%Y-%m-%dT%H:%M:%S[.%f]") - json_acq_time) + k: abs(strptime_bids(v) - json_acq_time) for k, v in acq_times_fmaps.items() } min_diff_acq_times = sorted(diff_fmaps_acq_times.values())[0] diff --git a/heudiconv/dicoms.py b/heudiconv/dicoms.py index 1d276e39..9afb67a4 100644 --- a/heudiconv/dicoms.py +++ b/heudiconv/dicoms.py @@ -32,7 +32,8 @@ get_typed_attr, load_json, set_readonly, - strptime_micr, + strptime_dcm_da_tm, + strptime_dcm_dt ) if TYPE_CHECKING: @@ -531,19 +532,12 @@ def get_datetime_from_dcm(dcm_data: dcm.FileDataset) -> Optional[datetime.dateti 3. SeriesDate & SeriesTime (0008,0021); (0008,0031) """ - acq_date = dcm_data.get("AcquisitionDate", "").strip() - acq_time = dcm_data.get("AcquisitionTime", "").strip() - if acq_date and acq_time: - return strptime_micr(acq_date + acq_time, "%Y%m%d%H%M%S[.%f]") - - acq_dt = dcm_data.get("AcquisitionDateTime", "").strip() - if acq_dt: - return strptime_micr(acq_dt, "%Y%m%d%H%M%S[.%f]") - - series_date = dcm_data.get("SeriesDate", "").strip() - series_time = dcm_data.get("SeriesTime", "").strip() - if series_date and series_time: - return strptime_micr(series_date + series_time, "%Y%m%d%H%M%S[.%f]") + if "AcquisitionDate" in dcm_data and "AcquisitionTime" in dcm_data: + return strptime_dcm_da_tm(dcm_data, "AcquisitionDate", "AcquisitionTime") + if "AcquisitionDateTime" in dcm_data: + return strptime_dcm_dt(dcm_data, "AcquisitionDateTime") + if "SeriesDate" in dcm_data and "SeriesTime" in dcm_data: + return strptime_dcm_da_tm(dcm_data, "SeriesDate", "SeriesTime") return None diff --git a/heudiconv/tests/test_utils.py b/heudiconv/tests/test_utils.py index 064f50ce..3277d9f7 100644 --- a/heudiconv/tests/test_utils.py +++ b/heudiconv/tests/test_utils.py @@ -9,6 +9,7 @@ from typing import IO, Any from unittest.mock import patch +import pydicom as dcm import pytest from heudiconv.utils import ( @@ -22,7 +23,9 @@ remove_prefix, remove_suffix, save_json, - strptime_micr, + strptime_bids, + strptime_dcm_da_tm, + strptime_dcm_dt, update_json, ) @@ -173,19 +176,85 @@ def test_get_datetime() -> None: @pytest.mark.parametrize( "dt, fmt", [ - ("20230310190100", "%Y%m%d%H%M%S"), ("2023-04-02T11:47:09", "%Y-%m-%dT%H:%M:%S"), + ("2023-04-02T11:47:09.0", "%Y-%m-%dT%H:%M:%S.%f"), + ("2023-04-02T11:47:09.000000", "%Y-%m-%dT%H:%M:%S.%f"), + ("2023-04-02T11:47:09.1", "%Y-%m-%dT%H:%M:%S.%f"), + ("2023-04-02T11:47:09-0900", "%Y-%m-%dT%H:%M:%S%z"), + ("2023-04-02T11:47:09.1-0900", "%Y-%m-%dT%H:%M:%S.%f%z"), ], ) -def test_strptime_micr(dt: str, fmt: str) -> None: +def test_strptime_bids(dt: str, fmt: str) -> None: target = datetime.strptime(dt, fmt) - assert strptime_micr(dt, fmt) == target - assert strptime_micr(dt, fmt + "[.%f]") == target - assert strptime_micr(dt + ".0", fmt + "[.%f]") == target - assert strptime_micr(dt + ".000000", fmt + "[.%f]") == target - assert strptime_micr(dt + ".1", fmt + "[.%f]") == datetime.strptime( - dt + ".1", fmt + ".%f" - ) + assert strptime_bids(dt) == target + + +@pytest.mark.parametrize( + "tm, tm_fmt", + [ + ("114709.1", "%H%M%S.%f"), + ("114709", "%H%M%S"), + ("1147", "%H%M"), + ("11", "%H"), + ], +) +@pytest.mark.parametrize( + "offset, offset_fmt", + [ + ("-0900", "%z"), + ('', ''), + ], +) +def test_strptime_dcm_da_tm(tm: str, tm_fmt: str, offset: str, offset_fmt: str) -> None: + da = "20230402" + da_fmt = "%Y%m%d" + target = datetime.strptime(da + tm + offset, da_fmt + tm_fmt + offset_fmt) + ds = dcm.dataset.Dataset() + ds["AcquisitionDate"] = dcm.DataElement("AcquisitionDate","DA",da) + ds["AcquisitionTime"] = dcm.DataElement("AcquisitionTime", "TM", tm) + if offset: + ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset) + assert strptime_dcm_da_tm(ds, "AcquisitionDate", "AcquisitionTime") == target + + +@pytest.mark.parametrize( + "dt, dt_fmt", + [ + ("20230402114709.1-0400", "%Y%m%d%H%M%S.%f%z"), + ("20230402114709-0400", "%Y%m%d%H%M%S%z"), + ("202304021147-0400", "%Y%m%d%H%M%z"), + ("2023040211-0400", "%Y%m%d%H%z"), + ("20230402-0400", "%Y%m%d%z"), + ("202304-0400", "%Y%m%z"), + ("2023-0400", "%Y%z"), + ("20230402114709.1", "%Y%m%d%H%M%S.%f"), + ("20230402114709", "%Y%m%d%H%M%S"), + ("202304021147", "%Y%m%d%H%M"), + ("2023040211", "%Y%m%d%H"), + ("20230402", "%Y%m%d"), + ("202304", "%Y%m"), + ("2023", "%Y"), + ], +) +@pytest.mark.parametrize( + "offset, offset_fmt", + [ + ("-0900", "%z"), + ('', ''), + ], +) +def test_strptime_dcm_dt(dt: str, dt_fmt: str, offset: str, offset_fmt: str) -> None: + target = None + if dt_fmt[-2:] == "%z" and offset: + target = datetime.strptime(dt, dt_fmt) + else: + target = datetime.strptime(dt + offset, dt_fmt + offset_fmt) + ds = dcm.dataset.Dataset() + ds["AcquisitionDateTime"] = dcm.DataElement("AcquisitionDateTime","DT", dt) + if offset: + ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset) + assert strptime_dcm_dt(ds, "AcquisitionDateTime") == target + def test_remove_suffix() -> None: diff --git a/heudiconv/utils.py b/heudiconv/utils.py index f7bf16a7..494a8efb 100644 --- a/heudiconv/utils.py +++ b/heudiconv/utils.py @@ -4,7 +4,7 @@ from collections.abc import Callable from collections.abc import Mapping as MappingABC import copy -from datetime import datetime +import datetime from glob import glob import hashlib import json @@ -13,6 +13,8 @@ import os import os.path as op from pathlib import Path +import pydicom as dcm +from pydicom.tag import TagType import re import shutil import stat @@ -662,32 +664,107 @@ def get_datetime(date: str, time: str, *, microseconds: bool = True) -> str: # add dummy microseconds if not available for strptime to parse time += ".000000" td = time + ":" + date - datetime_str = datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat() + datetime_str = datetime.datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat() if not microseconds: datetime_str = datetime_str.split(".", 1)[0] return datetime_str +def datetime_utc_offset(datetime_obj: datetime, utc_offset: str): + """set the datetime's tzinfo by parsing an utc offset string""" + sign, hours, minutes = re.match(r"([+\-]?)(\d{2})(\d{2})", utc_offset).groups() + sign = -1 if sign == '-' else 1 + hours, minutes = int(hours), int(minutes) + tzinfo = datetime.timezone(sign * datetime.timedelta(hours=hours, minutes=minutes)) + return datetime_obj.replace(tzinfo=tzinfo) -def strptime_micr(date_string: str, fmt: str) -> datetime: +def strptime(datetime_string: str, fmts: list[str]) -> datetime: r""" - Decorate strptime while supporting optional [.%f] in the format at the end + Try datetime.strptime on a list of formats returning the first successful attempt. + + Parameters + ---------- + datetime_string: str + Datetime string to parse + fmts: list[str] + List of format strings + """ + datetime_str = datetime_string.strip() + for fmt in fmts: + try: + #return datetime.datetime.strptime(datetime_str, fmt) + retval = datetime.datetime.strptime(datetime_str, fmt) + print(retval) + return retval + except ValueError: + pass + raise ValueError(f"Unable to parse datetime string: {datetime_str}") + +def strptime_bids(datetime_string: str) -> datetime: + r""" + Create a datetime object from a bids datetime string. Parameters ---------- date_string: str - Date string to parse - fmt: str - Format string. If it ends with [.%f], we keep it if date_string ends with - '.\d+' regex and not if it does not. + Datetime string to parse """ + # https://bids-specification.readthedocs.io/en/stable/common-principles.html#units + fmts = ["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"] + datetime_obj = strptime(datetime_string, fmts) + return datetime_obj - optional_micr = "[.%f]" - if fmt.endswith(optional_micr): - fmt = fmt[: -len(optional_micr)] - if re.search(r"\.\d+$", date_string): - fmt += ".%f" - return datetime.strptime(date_string, fmt) +def strptime_dcm_da_tm(dcm_data: dcm.Dataset, da_tag: TagType, tm_tag: TagType) -> datetime: + r""" + Create a datetime object from a dicom DA tag and TM tag. + Parameters + ---------- + dcm_data : dcm.FileDataset + DICOM with header, e.g., as read by pydicom.dcmread. + Objects with __getitem__ and have those keys with values properly formatted may also work + da_tag: str + Dicom tag with DA value representation + tm_tag: str + Dicom tag with TM value representation + """ + # https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html + date_str = dcm_data[da_tag].value + fmts = ["%Y%m%d",] + date = strptime(date_str, fmts) + + time_str = dcm_data[tm_tag].value + fmts = ["%H", "%H%M", "%H%M%S", "%H%M%S.%f"] + time = strptime(time_str, fmts) + + datetime_obj = datetime.datetime.combine(date.date(), time.time()) + + if (0x0008, 0x0201) in dcm_data: + utc_offset = dcm_data[0x0008, 0x0201].value + datetime_obj = datetime_utc_offset(datetime_obj, utc_offset) if utc_offset else datetime_obj + return datetime_obj + +def strptime_dcm_dt(dcm_data: dcm.Dataset, dt_tag: TagType) -> datetime: + r""" + Create a datetime object from a dicom DT tag. + + Parameters + ---------- + dcm_data : dcm.FileDataset + DICOM with header, e.g., as read by pydicom.dcmread. + Objects with __getitem__ and have those keys with values properly formatted may also work + da_tag: str + Dicom tag with DT value representation + """ + # https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html + datetime_str = dcm_data.get(dt_tag) + fmts = ["%Y%z", "%Y%m%z", "%Y%m%d%z", "%Y%m%d%H%z", "%Y%m%d%H%M%z", "%Y%m%d%H%M%S%z", "%Y%m%d%H%M%S.%f%z", + "%Y", "%Y%m", "%Y%m%d", "%Y%m%d%H", "%Y%m%d%H%M", "%Y%m%d%H%M%S", "%Y%m%d%H%M%S.%f"] + datetime_obj = strptime(datetime_str, fmts) + + if not datetime_obj.tzinfo and (0x0008, 0x0201) in dcm_data: + utc_offset = dcm_data[0x0008, 0x0201].value + datetime_obj = datetime_utc_offset(datetime_obj, utc_offset) if utc_offset else datetime_obj + return datetime_obj def remove_suffix(s: str, suf: str) -> str: """