From 8965171b09b727f239a2a421c94ada03fc8df977 Mon Sep 17 00:00:00 2001
From: Philip Yoon
Date: Fri, 19 Jul 2024 17:44:44 -0700
Subject: [PATCH] #925: The first half of this task. The CSLC query now checks
 for compressed CSLC satisfaction. If not satisfied, the download job
 parameters are stored in GRQ ES for future evaluation and execution
---
 data_subscriber/asf_cslc_download.py          | 36 +++--------
 data_subscriber/cslc_utils.py                 | 61 +++++++++++++++++++
 data_subscriber/query.py                      | 18 +++++-
 .../test_run_disp_s1_historical_processing.py | 14 ++---
 tools/run_disp_s1_historical_processing.py    | 41 +++----------
 5 files changed, 97 insertions(+), 73 deletions(-)

diff --git a/data_subscriber/asf_cslc_download.py b/data_subscriber/asf_cslc_download.py
index 5274d661..111037b9 100644
--- a/data_subscriber/asf_cslc_download.py
+++ b/data_subscriber/asf_cslc_download.py
@@ -12,7 +12,7 @@
 from data_subscriber.cmr import Collection
 from data_subscriber.cslc.cslc_static_catalog import CSLCStaticProductCatalog
 from data_subscriber.download import SessionWithHeaderRedirection
-from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle
+from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle, get_dependent_compressed_cslcs
 from data_subscriber.asf_rtc_download import AsfDaacRtcDownload
 from data_subscriber.cslc.cslc_static_query import CslcStaticCmrQuery
 from data_subscriber.url import cslc_unique_id
@@ -181,37 +181,15 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
             os.remove(iono_file)
 
         # Determine M Compressed CSLCs by querying compressed cslc GRQ ES -------------->
-        # Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
         k, m = es_conn.get_k_and_m(args.batch_ids[0])
         logger.info(f"{k=}, {m=}")
-        ''' Search for all previous M compressed CSLCs
-            prev_day_indices: The acquisition cycle indices of all collects that show up in disp_burst_map previous of
-            the latest acq cycle index
-        '''
-        prev_day_indices = get_prev_day_indices(latest_acq_cycle_index, frame_id, self.disp_burst_map, args, token, cmr, settings)
-
-        # special case for early sensing time series. Reduce m if there aren't enough sensing times in the database in the first place
-        # For example, if k was 4 and m was 3, but there are only 4 previous sensing times in the database, then m should be 2
-        if len(prev_day_indices) < k * (m - 1):
-            m = (len(prev_day_indices) // k) + 1
-
-        for mm in range(0, m-1): # m parameter is inclusive of the current frame at hand
-            for burst_id in burst_id_set:
-                ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id) #looks like t034_071112_iw3_461
-                logger.info("Retrieving Compressed CSLCs for ccslc_m_index: %s", ccslc_m_index)
-                ccslcs = es_conn.es.query(
-                    index=_C_CSLC_ES_INDEX_PATTERNS,
-                    body={"query": { "bool": { "must": [
-                        {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
-
-                # Should have exactly one compressed cslc per acq cycle per burst
-                if len(ccslcs) != 1:
-                    raise Exception(f"Expected 1 Compressed CSLC for {ccslc_m_index}, got {len(ccslcs)}")
-
-                for ccslc in ccslcs:
-                    c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])
-        # <------------------------- Compressed CSLC look up
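+        # get_dependent_compressed_cslcs returns every required compressed CSLC record,
+        # or False if any of them is missing from GRQ ES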
+        ccslcs = get_dependent_compressed_cslcs(frame_id, latest_acq_cycle_index, k, m, args, self.disp_burst_map, es_conn.es)
+        if ccslcs is False:
+            raise Exception(f"Failed to get compressed cslc for frame {frame_id} and day index {latest_acq_cycle_index}")
+
+        for ccslc in ccslcs:
+            c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])
 
         # Look up bounding box for frame
         bounding_box = get_bounding_box_for_frame(int(frame_id), self.frame_geo_map)

diff --git a/data_subscriber/cslc_utils.py b/data_subscriber/cslc_utils.py
index 5cfcce84..eaf256b9 100644
--- a/data_subscriber/cslc_utils.py
+++ b/data_subscriber/cslc_utils.py
@@ -15,6 +15,8 @@
 
 DISP_FRAME_BURST_MAP_HIST = 'opera-disp-s1-consistent-burst-ids-with-datetimes.json'
 FRAME_GEO_SIMPLE_JSON = 'frame-geometries-simple.geojson'
+_C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
+_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME = "grq_1_l2_cslc_s1_blocked_downloads"
 
 logger = logging.getLogger(__name__)
 
@@ -270,6 +272,65 @@ def parse_cslc_native_id(native_id, burst_to_frames, frame_to_bursts):
 
     return burst_id, acquisition_dts, acquisition_cycles, frame_ids
 
+def compressed_cslc_satisfied(frame_id, day_index, k, m, args, frame_to_bursts, eu):
+
+    if get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, frame_to_bursts, eu) is False:
+        return False
+    return True
+
+def get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, disp_burst_map, eu):
+    ''' Search for all previous M compressed CSLCs
+        prev_day_indices: The acquisition cycle indices of all collects that show up in disp_burst_map previous of
+        the latest acq cycle index
+    '''
+
+    prev_day_indices = get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
+                                            None)
+
+    ccslcs = []
+
+    # special case for early sensing time series
+    if len(prev_day_indices) < k * (m-1):
+        m = (len(prev_day_indices) // k) + 1
+
+    # Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
+    for mm in range(0, m - 1):  # m parameter is inclusive of the current frame at hand
+        for burst_id in disp_burst_map[frame_id].burst_ids:
+            ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
+            hits = eu.query(
+                index=_C_CSLC_ES_INDEX_PATTERNS,
+                body={"query": {"bool": {"must": [
+                    {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
+
+            # Should have exactly one compressed cslc per acq cycle per burst
+            if len(hits) != 1:
+                logger.info("Compressed CSLC for ccslc_m_index: %s was not found in GRQ ES", ccslc_m_index)
+                return False
+
+            ccslcs.extend(hits)  # accumulate every record; a bare assignment here would keep only the last burst's hit
+
+    logger.info("All Compressed CSLCs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
+    return ccslcs
+
+def save_blocked_download_job(eu, product_type, params, job_queue, job_name, frame_id, acq_index, k, m):
+    """Save the blocked download job in the ES index"""
+
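+    # job_name doubles as the document id, so re-blocking the same job should overwrite
+    # the existing record rather than create a duplicate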
+    eu.index_document(
+        index=_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME,
+        id=job_name,
+        body={
+            "doc": {
+                "job_name": job_name,
+                "job_queue": job_queue,
+                "job_params": params,
+                "job_ts": datetime.now().isoformat(timespec="seconds").replace("+00:00", "Z"),
+                "product_type": product_type,
+                "frame_id": frame_id,
+                "acq_index": acq_index,
+                "k": k,
+                "m": m
+            }
+        }
+    )
+
 def parse_cslc_burst_id(native_id):
 
     burst_id, _ = parse_cslc_file_name(native_id)

diff --git a/data_subscriber/query.py b/data_subscriber/query.py
index 0258603e..92947462 100644
--- a/data_subscriber/query.py
+++ b/data_subscriber/query.py
@@ -19,7 +19,7 @@
                               download_from_s3)
 from data_subscriber.hls.hls_catalog import HLSProductCatalog
 from data_subscriber.rtc.rtc_download_job_submitter import submit_rtc_download_job_submissions_tasks
-from data_subscriber.cslc_utils import split_download_batch_id
+from data_subscriber.cslc_utils import split_download_batch_id, compressed_cslc_satisfied, save_blocked_download_job
 from data_subscriber.url import form_batch_id, _slc_url_to_chunk_id
 from hysds_commons.job_utils import submit_mozart_job
 from util.conf_util import SettingsConf
@@ -254,17 +254,30 @@ def submit_download_job_submissions_tasks(self, batch_id_to_urls_map, query_time
             logger.info(f"{payload_hash=}")
             logger.debug(f"{chunk_urls=}")
 
+            params = self.create_download_job_params(query_timerange, chunk_batch_ids)
+
             product_type = COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection].lower()
             if COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection] == ProductType.CSLC:
                 frame_id = split_download_batch_id(chunk_batch_ids[0])[0]
                 acq_indices = [split_download_batch_id(chunk_batch_id)[1] for chunk_batch_id in chunk_batch_ids]
                 job_name = f"job-WF-{product_type}_download-frame-{frame_id}-acq_indices-{min(acq_indices)}-to-{max(acq_indices)}"
+
+                # See if all the compressed cslcs are satisfied. If not, do not submit the job. Instead, save all
+                # the job info in ES and wait for the next query to come in. Any acquisition index will work
+                # because all batches require the same compressed cslcs
+                if not compressed_cslc_satisfied(frame_id, acq_indices[0], self.args.k, self.args.m, self.args,
+                                                 self.disp_burst_map_hist, self.es_conn.es):
+                    logger.info("Not all compressed CSLCs are satisfied so this download job is blocked until they are satisfied")
+                    save_blocked_download_job(self.es_conn.es, product_type, params, self.args.job_queue, job_name,
+                                              frame_id, acq_indices[0], self.args.k, self.args.m)
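+                    # leave the blocked record for a future evaluation pass (the second half of #925) to submit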
+                    continue
+
             else:
                 job_name = f"job-WF-{product_type}_download-{chunk_batch_ids[0]}"
 
             download_job_id = submit_download_job(release_version=self.settings["RELEASE_VERSION"],
                                                   product_type=product_type,
-                                                  params=self.create_download_job_params(query_timerange, chunk_batch_ids),
+                                                  params=params,
                                                   job_queue=self.args.job_queue,
                                                   job_name = job_name,
                                                   payload_hash = payload_hash
@@ -278,7 +291,6 @@ def submit_download_job_submissions_tasks(self, batch_id_to_urls_map, query_time
 
         return job_submission_tasks
 
-
     def create_download_job_params(self, query_timerange, chunk_batch_ids):
         args = self.args
         download_job_params = [

diff --git a/tests/tools/test_run_disp_s1_historical_processing.py b/tests/tools/test_run_disp_s1_historical_processing.py
index 976c8c49..92b7e56e 100644
--- a/tests/tools/test_run_disp_s1_historical_processing.py
+++ b/tests/tools/test_run_disp_s1_historical_processing.py
@@ -58,7 +58,7 @@ def test_form_job_params_basic():
     p = generate_p()
     p.frame_states = generate_initial_frame_states(p.frames)
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)
 
     assert do_submit == True
     assert job_name == "data-subscriber-query-timer-historical1_f831-2017-02-15T22:35:24-2017-03-23T23:35:24"
@@ -72,7 +72,7 @@ def test_form_job_params_basic():
     assert job_params["exclude_regions"] == f'--exclude-regions={EXCLUDE_REGIONS}'
     assert job_params["frame_id"] == f'--frame-id=831'
     assert job_params["k"] == f'--k=4'
-    assert job_params["m"] == f'--m=2'
+    assert job_params["m"] == f'--m=1'
     assert next_frame_sensing_position == 4
     assert finished == False
 
@@ -84,7 +84,7 @@ def test_form_job_params_early():
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_start_date = '2018-07-01T00:00:00'
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)
 
     assert next_frame_sensing_position == 4
     assert do_submit == False
@@ -97,7 +97,7 @@ def test_form_job_params_late():
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_end_date = '2015-07-01T00:00:00'
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)
 
     assert do_submit == False
     assert finished == True
@@ -106,14 +106,14 @@ def test_form_job_params_no_ccslc(monkeypatch):
     '''If compressed cslcs are not found, don't process this round and don't increment the position'''
 
     mock_ccslc = MagicMock(return_value=False)
-    monkeypatch.setattr(tools.run_disp_s1_historical_processing,
-                        tools.run_disp_s1_historical_processing.compressed_cslc_satisfied.__name__, mock_ccslc)
+    monkeypatch.setattr(cslc_utils,
+                        cslc_utils.compressed_cslc_satisfied.__name__, mock_ccslc)
 
     p = generate_p()
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_end_date = '2015-07-01T00:00:00'
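+    # The trailing Nones stand in for the new args and eu parameters; they are never
+    # touched here because compressed_cslc_satisfied is mocked out above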
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)
 
     assert do_submit == False
     assert next_frame_sensing_position == 0
\ No newline at end of file

diff --git a/tools/run_disp_s1_historical_processing.py b/tools/run_disp_s1_historical_processing.py
index 33447a1a..c853a07a 100755
--- a/tools/run_disp_s1_historical_processing.py
+++ b/tools/run_disp_s1_historical_processing.py
@@ -31,7 +31,8 @@
 
 disp_burst_map, burst_to_frames, day_indices_to_frames = cslc_utils.localize_disp_frame_burst_hist(cslc_utils.DISP_FRAME_BURST_MAP_HIST)
 
-def proc_once(eu, procs, dryrun = False):
+def proc_once(eu, procs, args):
+    dryrun = args.dry_run
     job_success = True
 
     for proc in procs:
@@ -73,7 +74,7 @@ def proc_once(eu, procs, dryrun = False):
 
             # Compute job parameters, whether to process or not, and if we're finished
             do_submit, job_name, job_spec, job_params, job_tags, next_frame_pos, finished = \
-                form_job_params(p, int(frame_id), last_frame_processed)
+                form_job_params(p, int(frame_id), last_frame_processed, args, eu)
 
             proc_finished = proc_finished & finished # All frames must be finished for this batch proc to be finished
@@ -121,7 +122,7 @@ def proc_once(eu, procs, dryrun = False):
 
     return job_success
 
-def form_job_params(p, frame_id, sensing_time_position_zero_based):
+def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
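+    # args and eu pass through to cslc_utils.compressed_cslc_satisfied for the GRQ ES availability check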
     data_start_date = datetime.strptime(p.data_start_date, ES_DATETIME_FORMAT)
     data_end_date = datetime.strptime(p.data_end_date, ES_DATETIME_FORMAT)
@@ -181,9 +182,9 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based):
 
     #Query GRQ ES for the previous sensing time day index compressed cslc. If this doesn't exist, we can't process
    # this frame sensing time yet. So we will not submit job and increment next_sensing_time_position
-    if compressed_cslc_satisfied(frame_id,
+    if cslc_utils.compressed_cslc_satisfied(frame_id,
                         disp_burst_map[frame_id].sensing_datetime_days_index[sensing_time_position_zero_based],
-                        p.k, p.m):
+                        p.k, p.m, args, disp_burst_map, eu):
         next_sensing_time_position = sensing_time_position_zero_based + p.k
     else:
         do_submit = False
@@ -310,34 +311,6 @@ def generate_initial_frame_states(frames):
 
     return frame_states
 
-def compressed_cslc_satisfied(frame_id, day_index, k, m):
-    '''Look for the compressed cslc records needed to process this frame at day index in GRQ ES'''
-
-    #TODO: This code is mostly identical to asf_cslc_download.py lines circa 200. Refactor into one place
-    prev_day_indices = cslc_utils.get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
-                                                       None)
-
-    #special case for early sensing time series
-    if len(prev_day_indices) < k * (m-1):
-        m = (len(prev_day_indices) // k ) + 1
-
-    _C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
-    for mm in range(0, m - 1): # m parameter is inclusive of the current frame at hand
-        for burst_id in disp_burst_map[frame_id].burst_ids:
-            ccslc_m_index = cslc_utils.get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
-            ccslcs = eu.query(
-                index=_C_CSLC_ES_INDEX_PATTERNS,
-                body={"query": {"bool": {"must": [
-                    {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
-
-            # Should have exactly one compressed cslc per acq cycle per burst
-            if len(ccslcs) != 1:
-                logger.info("Compressed CSLCs for ccslc_m_index: %s was not found in GRQ ES", ccslc_m_index)
-                return False
-
-    logger.info("All Compresseed CSLSs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
-    return True
-
 def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):
     """
     Converts from a datetime string to a datetime object or vice versa
     """
@@ -377,5 +350,5 @@ def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):
 
     while (True):
         batch_procs = eu.query(index=ES_INDEX) # TODO: query for only enabled docs
-        proc_once(eu, batch_procs, args.dry_run)
+        proc_once(eu, batch_procs, args)
         time.sleep(int(args.sleep_secs))
\ No newline at end of file
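
Note: a minimal sketch (not part of this patch) of how the deferred "future evaluation and
execution" half of #925 might drain the blocked-downloads index. The helper name
resubmit_blocked_downloads, its import paths, and the final delete step are hypothetical;
eu.query, compressed_cslc_satisfied, submit_download_job, and the stored document fields
are used as defined above:

    from data_subscriber import cslc_utils
    # submit_download_job as used in data_subscriber/query.py (import path assumed)

    def resubmit_blocked_downloads(eu, args, settings, disp_burst_map):
        """Hypothetical evaluator: submit blocked CSLC download jobs whose compressed CSLCs now exist."""
        for hit in eu.query(index=cslc_utils._BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME):
            doc = hit["_source"]["doc"]

            # Re-run the same satisfaction check the query step performed when it blocked the job
            if not cslc_utils.compressed_cslc_satisfied(doc["frame_id"], doc["acq_index"], doc["k"],
                                                        doc["m"], args, disp_burst_map, eu):
                continue  # still blocked; leave the record for the next pass

            # Note: payload_hash is not persisted by save_blocked_download_job, so it is omitted here
            submit_download_job(release_version=settings["RELEASE_VERSION"],
                                product_type=doc["product_type"],
                                params=doc["job_params"],
                                job_queue=doc["job_queue"],
                                job_name=doc["job_name"])
            # Delete or flag the record here so the job is not submitted twice
            # (the exact call depends on the ES utility wrapper).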