Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue 1041: feat(audit): query improvements #1044

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions tools/ops/cmr_audit/cmr_audit_hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
import os
import re
import sys
import urllib.parse
from collections import defaultdict
from typing import Union, Iterable

import aiohttp
import more_itertools
from dotenv import dotenv_values
from more_itertools import always_iterable


from tools.ops.cmr_audit.cmr_audit_utils import async_get_cmr_granules, get_cmr_audit_granules

Expand All @@ -20,7 +24,7 @@
format="%(levelname)7s: %(relativeCreated)7d %(name)s:%(filename)s:%(funcName)s:%(lineno)s - %(message)s", # alternative format which displays time elapsed.
# format="%(asctime)s %(levelname)7s %(name)4s:%(filename)8s:%(funcName)22s:%(lineno)3s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.DEBUG)
level=logging.INFO)
logger = logging.getLogger(__name__)

config = {
Expand Down Expand Up @@ -88,35 +92,46 @@ async def async_get_cmr_granules_hls_s30(temporal_date_start: str, temporal_date
platform_short_name=["Sentinel-2A", "Sentinel-2B"])


async def async_get_cmr_dswx(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L3_DSWX-HLS_V1",
temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end)


async def async_get_cmr_dswx(dswx_native_id_patterns: set):
logger.debug(f"entry({len(dswx_native_id_patterns)=:,})")
async def async_get_cmr(
native_id_patterns: set,
collection_short_name: Union[str, Iterable[str]],
temporal_date_start: str, temporal_date_end: str,
chunk_size=1000
):
logger.debug(f"entry({len(native_id_patterns)=:,})")

# batch granules-requests due to CMR limitation. 1000 native-id clauses seems to be near the limit.
dswx_native_id_patterns = more_itertools.always_iterable(dswx_native_id_patterns)
dswx_native_id_pattern_batches = list(more_itertools.chunked(dswx_native_id_patterns, 1000)) # 1000 == 55,100 length
native_id_patterns = more_itertools.always_iterable(native_id_patterns)
native_id_pattern_batches = list(more_itertools.chunked(native_id_patterns, chunk_size)) # 1000 == 55,100 length

request_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json"

sem = asyncio.Semaphore(15)
async with aiohttp.ClientSession() as session:
post_cmr_tasks = []
for i, dswx_native_id_pattern_batch in enumerate(dswx_native_id_pattern_batches, start=1):
dswx_native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(dswx_native_id_pattern_batch)
for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
# native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch)

request_body = (
"provider=POCLOUD"
"&short_name[]=OPERA_L3_DSWX-HLS_PROVISIONAL_V1"
"&options[native-id][pattern]=true"
f"{dswx_native_id_patterns_query_params}"
f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}'
# "&options[native-id][pattern]=true"
# f"{native_id_patterns_query_params}"
f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}"
)
logger.debug(f"Creating request task {i} of {len(dswx_native_id_pattern_batches)}")
logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}")
post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem))
break
logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}")

# issue requests in batches
logger.debug("Batching tasks")
dswx_granules = set()
cmr_granules = set()
task_chunks = list(more_itertools.chunked(post_cmr_tasks, len(post_cmr_tasks))) # CMR recommends 2-5 threads.
for i, task_chunk in enumerate(task_chunks, start=1):
logger.info(f"Processing batch {i} of {len(task_chunks)}")
Expand All @@ -125,8 +140,8 @@ async def async_get_cmr_dswx(dswx_native_id_patterns: set):
await asyncio.gather(*task_chunk, return_exceptions=False)
)
for post_cmr_tasks_result in post_cmr_tasks_results:
dswx_granules.update(post_cmr_tasks_result[0])
return dswx_granules
cmr_granules.update(post_cmr_tasks_result[0])
return cmr_granules


def hls_granule_ids_to_dswx_native_id_patterns(cmr_granules: set[str], input_to_outputs_map: defaultdict, output_to_inputs_map: defaultdict):
Expand Down Expand Up @@ -222,7 +237,7 @@ async def run(argv: list[str]):
)

logger.info("Querying CMR for list of expected DSWx granules")
cmr_dswx_products = await async_get_cmr_dswx(dswx_native_id_patterns)
cmr_dswx_products = await async_get_cmr_dswx(dswx_native_id_patterns, temporal_date_start=cmr_start_dt_str, temporal_date_end=cmr_end_dt_str)

cmr_dswx_prefix_expected = {prefix[:-1] for prefix in dswx_native_id_patterns}
cmr_dswx_prefix_actual = dswx_native_ids_to_prefixes(cmr_dswx_products)
Expand All @@ -233,7 +248,8 @@ async def run(argv: list[str]):
#######################################################################
# logger.debug(f"{pstr(missing_cmr_dswx_granules_prefixes)=!s}")

missing_cmr_granules_hls = set(functools.reduce(set.union, [output_dswx_to_inputs_hls_map[prefix] for prefix in missing_cmr_dswx_granules_prefixes]))
missing_cmr_granules_hls = [output_dswx_to_inputs_hls_map[prefix] for prefix in missing_cmr_dswx_granules_prefixes]
missing_cmr_granules_hls = set(functools.reduce(set.union, missing_cmr_granules_hls)) if missing_cmr_granules_hls else set()

# logger.debug(f"{pstr(missing_cmr_granules)=!s}")
logger.info(f"Expected input (granules): {len(cmr_granules_hls)=:,}")
Expand Down
27 changes: 10 additions & 17 deletions tools/ops/cmr_audit/cmr_audit_slc.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,24 @@ async def async_get_cmr_granules_slc_s1b(temporal_date_start: str, temporal_date


async def async_get_cmr_cslc(cslc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1", collection_concept_id="C1257337155-ASF",
return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1",
temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100)


async def async_get_cmr_rtc(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str):
return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1", collection_concept_id="C1257337044-ASF",
return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1",
temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100)


async def async_get_cmr(
native_id_patterns: set,
collection_short_name: Union[str, Iterable[str]],
collection_concept_id: str,
temporal_date_start: str, temporal_date_end: str,
chunk_size=1000): # 1000 ~= 55,100 length
"""
Issue CMR query requests.
:param native_id_patterns: the native ID patterns to use in the query. Corresponds to query param `&native-id[]`. Allows use of wildcards "*" and "?", but is descouraged.
:param collection_short_name: CMR collection short name. Typically found in PCM's settings.yaml
:param collection_concept_id: CMR collection concept ID for faster queries.
:param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=<start>,<end>`
:param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=<start>,<end>`
:param chunk_size: split queries across N native-id patterns per request. CMR request bodies have an implicit size limit of 55,100 length. Must be a value in the interval [1,1000].
Expand All @@ -130,21 +128,22 @@ async def async_get_cmr(
sem = asyncio.Semaphore(15)
async with aiohttp.ClientSession() as session:
post_cmr_tasks = []
for i, rtc_native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(rtc_native_id_pattern_batch)
for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1):
# native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch)

request_body = (
"provider=ASF"
f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}'
"&platform[]=Sentinel-1A"
"&platform[]=Sentinel-1B"
"&bounding_box=-180,-60,180,90"
"&options[native-id][pattern]=true"
f"{native_id_patterns_query_params}"
# "&options[native-id][pattern]=true"
# f"{native_id_patterns_query_params}"
f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}"
)
logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}")
post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem))
break
logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}")

# issue requests in batches
Expand Down Expand Up @@ -184,7 +183,7 @@ def slc_granule_ids_to_cslc_native_id_patterns(cmr_granules: set[str], input_to_
cslc_acquisition_dt_str = m.group("start_ts")

# OPERA_L2_CSLC-S1_*_20231124T124529Z_*_S1*
rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*'
rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*v1.1'
rtc_native_id_patterns.add(rtc_native_id_pattern)

# bi-directional mapping of HLS-DSWx inputs and outputs
Expand Down Expand Up @@ -322,16 +321,10 @@ async def run(argv: list[str]):
# logger.debug(f"{pstr(missing_rtc_native_id_patterns)=!s}")

missing_cmr_granules_slc_cslc = [output_cslc_to_inputs_slc_map[native_id_pattern] for native_id_pattern in missing_cslc_native_id_patterns]
if not missing_cmr_granules_slc_cslc:
missing_cmr_granules_slc_cslc = set()
else:
missing_cmr_granules_slc_cslc = set(functools.reduce(set.union, missing_cmr_granules_slc_cslc))
missing_cmr_granules_slc_cslc = set(functools.reduce(set.union, missing_cmr_granules_slc_cslc)) if missing_cmr_granules_slc_cslc else set()

missing_cmr_granules_slc_rtc = [output_rtc_to_inputs_slc_map[native_id_pattern] for native_id_pattern in missing_rtc_native_id_patterns]
if not missing_cmr_granules_slc_rtc:
missing_cmr_granules_slc_rtc = set()
else:
missing_cmr_granules_slc_rtc = set(functools.reduce(set.union, missing_cmr_granules_slc_rtc))
missing_cmr_granules_slc_rtc = set(functools.reduce(set.union, missing_cmr_granules_slc_rtc)) if missing_cmr_granules_slc_rtc else set()

# logger.debug(f"{pstr(missing_slc)=!s}")
logger.info(f"Expected input (granules): {len(cmr_granules_slc)=:,}")
Expand Down