diff --git a/tools/ops/cmr_audit/cmr_audit_hls.py b/tools/ops/cmr_audit/cmr_audit_hls.py index 71e0a477..1e843860 100644 --- a/tools/ops/cmr_audit/cmr_audit_hls.py +++ b/tools/ops/cmr_audit/cmr_audit_hls.py @@ -7,11 +7,15 @@ import os import re import sys +import urllib.parse from collections import defaultdict +from typing import Union, Iterable import aiohttp import more_itertools from dotenv import dotenv_values +from more_itertools import always_iterable + from tools.ops.cmr_audit.cmr_audit_utils import async_get_cmr_granules, get_cmr_audit_granules @@ -20,7 +24,7 @@ format="%(levelname)7s: %(relativeCreated)7d %(name)s:%(filename)s:%(funcName)s:%(lineno)s - %(message)s", # alternative format which displays time elapsed. # format="%(asctime)s %(levelname)7s %(name)4s:%(filename)8s:%(funcName)22s:%(lineno)3s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - level=logging.DEBUG) + level=logging.INFO) logger = logging.getLogger(__name__) config = { @@ -88,35 +92,46 @@ async def async_get_cmr_granules_hls_s30(temporal_date_start: str, temporal_date platform_short_name=["Sentinel-2A", "Sentinel-2B"]) +async def async_get_cmr_dswx(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): + return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L3_DSWX-HLS_V1", + temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end) + -async def async_get_cmr_dswx(dswx_native_id_patterns: set): - logger.debug(f"entry({len(dswx_native_id_patterns)=:,})") +async def async_get_cmr( + native_id_patterns: set, + collection_short_name: Union[str, Iterable[str]], + temporal_date_start: str, temporal_date_end: str, + chunk_size=1000 +): + logger.debug(f"entry({len(native_id_patterns)=:,})") # batch granules-requests due to CMR limitation. 1000 native-id clauses seems to be near the limit. - dswx_native_id_patterns = more_itertools.always_iterable(dswx_native_id_patterns) - dswx_native_id_pattern_batches = list(more_itertools.chunked(dswx_native_id_patterns, 1000)) # 1000 == 55,100 length + native_id_patterns = more_itertools.always_iterable(native_id_patterns) + native_id_pattern_batches = list(more_itertools.chunked(native_id_patterns, chunk_size)) # 1000 == 55,100 length request_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json" sem = asyncio.Semaphore(15) async with aiohttp.ClientSession() as session: post_cmr_tasks = [] - for i, dswx_native_id_pattern_batch in enumerate(dswx_native_id_pattern_batches, start=1): - dswx_native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(dswx_native_id_pattern_batch) + for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): + # native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch) request_body = ( "provider=POCLOUD" - "&short_name[]=OPERA_L3_DSWX-HLS_PROVISIONAL_V1" - "&options[native-id][pattern]=true" - f"{dswx_native_id_patterns_query_params}" + f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}' + # "&options[native-id][pattern]=true" + # f"{native_id_patterns_query_params}" + f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}" ) - logger.debug(f"Creating request task {i} of {len(dswx_native_id_pattern_batches)}") + logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}") post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem)) + break logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}") # issue requests in batches logger.debug("Batching tasks") - dswx_granules = set() + cmr_granules = set() task_chunks = list(more_itertools.chunked(post_cmr_tasks, len(post_cmr_tasks))) # CMR recommends 2-5 threads. for i, task_chunk in enumerate(task_chunks, start=1): logger.info(f"Processing batch {i} of {len(task_chunks)}") @@ -125,8 +140,8 @@ async def async_get_cmr_dswx(dswx_native_id_patterns: set): await asyncio.gather(*task_chunk, return_exceptions=False) ) for post_cmr_tasks_result in post_cmr_tasks_results: - dswx_granules.update(post_cmr_tasks_result[0]) - return dswx_granules + cmr_granules.update(post_cmr_tasks_result[0]) + return cmr_granules def hls_granule_ids_to_dswx_native_id_patterns(cmr_granules: set[str], input_to_outputs_map: defaultdict, output_to_inputs_map: defaultdict): @@ -222,7 +237,7 @@ async def run(argv: list[str]): ) logger.info("Querying CMR for list of expected DSWx granules") - cmr_dswx_products = await async_get_cmr_dswx(dswx_native_id_patterns) + cmr_dswx_products = await async_get_cmr_dswx(dswx_native_id_patterns, temporal_date_start=cmr_start_dt_str, temporal_date_end=cmr_end_dt_str) cmr_dswx_prefix_expected = {prefix[:-1] for prefix in dswx_native_id_patterns} cmr_dswx_prefix_actual = dswx_native_ids_to_prefixes(cmr_dswx_products) @@ -233,7 +248,8 @@ async def run(argv: list[str]): ####################################################################### # logger.debug(f"{pstr(missing_cmr_dswx_granules_prefixes)=!s}") - missing_cmr_granules_hls = set(functools.reduce(set.union, [output_dswx_to_inputs_hls_map[prefix] for prefix in missing_cmr_dswx_granules_prefixes])) + missing_cmr_granules_hls = [output_dswx_to_inputs_hls_map[prefix] for prefix in missing_cmr_dswx_granules_prefixes] + missing_cmr_granules_hls = set(functools.reduce(set.union, missing_cmr_granules_hls)) if missing_cmr_granules_hls else set() # logger.debug(f"{pstr(missing_cmr_granules)=!s}") logger.info(f"Expected input (granules): {len(cmr_granules_hls)=:,}") diff --git a/tools/ops/cmr_audit/cmr_audit_slc.py b/tools/ops/cmr_audit/cmr_audit_slc.py index f7cf61ec..f1153b85 100644 --- a/tools/ops/cmr_audit/cmr_audit_slc.py +++ b/tools/ops/cmr_audit/cmr_audit_slc.py @@ -95,26 +95,24 @@ async def async_get_cmr_granules_slc_s1b(temporal_date_start: str, temporal_date async def async_get_cmr_cslc(cslc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): - return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1", collection_concept_id="C1257337155-ASF", + return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1", temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100) async def async_get_cmr_rtc(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): - return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1", collection_concept_id="C1257337044-ASF", + return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1", temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100) async def async_get_cmr( native_id_patterns: set, collection_short_name: Union[str, Iterable[str]], - collection_concept_id: str, temporal_date_start: str, temporal_date_end: str, chunk_size=1000): # 1000 ~= 55,100 length """ Issue CMR query requests. :param native_id_patterns: the native ID patterns to use in the query. Corresponds to query param `&native-id[]`. Allows use of wildcards "*" and "?", but is descouraged. :param collection_short_name: CMR collection short name. Typically found in PCM's settings.yaml - :param collection_concept_id: CMR collection concept ID for faster queries. :param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=,` :param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=,` :param chunk_size: split queries across N native-id patterns per request. CMR request bodies have an implicit size limit of 55,100 length. Must be a value in the interval [1,1000]. @@ -130,8 +128,8 @@ async def async_get_cmr( sem = asyncio.Semaphore(15) async with aiohttp.ClientSession() as session: post_cmr_tasks = [] - for i, rtc_native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): - native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(rtc_native_id_pattern_batch) + for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): + # native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch) request_body = ( "provider=ASF" @@ -139,12 +137,13 @@ async def async_get_cmr( "&platform[]=Sentinel-1A" "&platform[]=Sentinel-1B" "&bounding_box=-180,-60,180,90" - "&options[native-id][pattern]=true" - f"{native_id_patterns_query_params}" + # "&options[native-id][pattern]=true" + # f"{native_id_patterns_query_params}" f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}" ) logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}") post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem)) + break logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}") # issue requests in batches @@ -184,7 +183,7 @@ def slc_granule_ids_to_cslc_native_id_patterns(cmr_granules: set[str], input_to_ cslc_acquisition_dt_str = m.group("start_ts") # OPERA_L2_CSLC-S1_*_20231124T124529Z_*_S1* - rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*' + rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*v1.1' rtc_native_id_patterns.add(rtc_native_id_pattern) # bi-directional mapping of HLS-DSWx inputs and outputs @@ -322,16 +321,10 @@ async def run(argv: list[str]): # logger.debug(f"{pstr(missing_rtc_native_id_patterns)=!s}") missing_cmr_granules_slc_cslc = [output_cslc_to_inputs_slc_map[native_id_pattern] for native_id_pattern in missing_cslc_native_id_patterns] - if not missing_cmr_granules_slc_cslc: - missing_cmr_granules_slc_cslc = set() - else: - missing_cmr_granules_slc_cslc = set(functools.reduce(set.union, missing_cmr_granules_slc_cslc)) + missing_cmr_granules_slc_cslc = set(functools.reduce(set.union, missing_cmr_granules_slc_cslc)) if missing_cmr_granules_slc_cslc else set() missing_cmr_granules_slc_rtc = [output_rtc_to_inputs_slc_map[native_id_pattern] for native_id_pattern in missing_rtc_native_id_patterns] - if not missing_cmr_granules_slc_rtc: - missing_cmr_granules_slc_rtc = set() - else: - missing_cmr_granules_slc_rtc = set(functools.reduce(set.union, missing_cmr_granules_slc_rtc)) + missing_cmr_granules_slc_rtc = set(functools.reduce(set.union, missing_cmr_granules_slc_rtc)) if missing_cmr_granules_slc_rtc else set() # logger.debug(f"{pstr(missing_slc)=!s}") logger.info(f"Expected input (granules): {len(cmr_granules_slc)=:,}")