diff --git a/pioreactor/actions/leader/export_experiment_data.py b/pioreactor/actions/leader/export_experiment_data.py index 61b86e21..80b53b55 100644 --- a/pioreactor/actions/leader/export_experiment_data.py +++ b/pioreactor/actions/leader/export_experiment_data.py @@ -139,6 +139,10 @@ def export_experiment_data( click.echo("output should end with .zip") raise click.Abort() + if len(dataset_names) == 0: + click.echo("At least one dataset name must be provided.") + raise click.Abort() + logger = create_logger("export_experiment_data") logger.info( f"Starting export of dataset{'s' if len(dataset_names) > 1 else ''}: {', '.join(dataset_names)}." diff --git a/pioreactor/background_jobs/base.py b/pioreactor/background_jobs/base.py index 670b4e72..3a64c04b 100644 --- a/pioreactor/background_jobs/base.py +++ b/pioreactor/background_jobs/base.py @@ -821,7 +821,6 @@ def _disconnect_from_mqtt_clients(self) -> None: def _clean_up_resources(self) -> None: self._clear_caches() self._remove_from_job_manager() - # Explicitly cleanup MQTT resources... self._disconnect_from_mqtt_clients() self._disconnect_from_loggers() @@ -979,6 +978,7 @@ def __init__(self, unit: str, experiment: str, plugin_name: str) -> None: super().__init__(unit, experiment, source=plugin_name) + class BackgroundJobWithDodging(_BackgroundJob): """ This utility class allows for a change in behaviour when an OD reading is about to taken. Example: shutting @@ -1030,64 +1030,88 @@ def __init__(self, *args, source="app", **kwargs) -> None: f"Required section '{self.job_name}.config' does not exist in the configuration." ) + self.sneak_in_timer = RepeatedTimer(5, self._noop, job_name=self.job_name, logger=self.logger) # placeholder? self.add_to_published_settings("enable_dodging_od", {"datatype": "boolean", "settable": True}) - self.set_enable_dodging_od( - config.getboolean(f"{self.job_name}.config", "enable_dodging_od", fallback="True") - ) + self.add_to_published_settings("currently_dodging_od", {"datatype": "boolean", "settable": False}) - def action_to_do_before_od_reading(self) -> None: - raise NotImplementedError() + def __post__init__(self): + super().__post__init__() + self.set_enable_dodging_od(config.getboolean(f"{self.job_name}.config", "enable_dodging_od", fallback="True")) + self.start_passive_listeners() + + def _noop(self): + pass + + def set_currently_dodging_od(self, value: bool): + if self.set_currently_dodging_od == value: + # noop + return + + self.currently_dodging_od = value + if self.currently_dodging_od: + self.initialize_dodging_operation() # user defined + self._action_to_do_before_od_reading = self.action_to_do_before_od_reading + self._action_to_do_after_od_reading = self.action_to_do_after_od_reading + self._setup_timer() + else: + self.initialize_continuous_operation() # user defined + + self._action_to_do_before_od_reading = self._noop + self._action_to_do_after_od_reading = self._noop + self.sneak_in_timer.cancel() + + + def set_enable_dodging_od(self, value: bool): + self.enable_dodging_od = value + + if self.is_od_job_running() and self.enable_dodging_od: + self.logger.info("Will attempt to dodge OD readings.") + self.set_currently_dodging_od(True) + elif self.is_od_job_running() and not self.enable_dodging_od: + self.logger.info("Running continuously through OD readings.") + self.set_currently_dodging_od(False) + elif not self.is_od_job_running() and not self.enable_dodging_od: + self.logger.info("Running continuously through OD readings.") + self.set_currently_dodging_od(False) + elif not self.is_od_job_running() and self.enable_dodging_od: + self.logger.info("Will attempt to dodge later OD readings.") + self.set_currently_dodging_od(False) + + def is_od_job_running(self) -> bool: + return is_pio_job_running("od_reading") def action_to_do_after_od_reading(self) -> None: raise NotImplementedError() - def _listen_for_od_reading(self) -> None: + def action_to_do_before_od_reading(self) -> None: + raise NotImplementedError() + + def initialize_dodging_operation(self) -> None: + pass + + def initialize_continuous_operation(self) -> None: + pass + + def start_passive_listeners(self) -> None: self.subscribe_and_callback( - self._setup_actions, + self._od_reading_changed_status, f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval", ) - def set_enable_dodging_od(self, value: bool) -> None: - self.enable_dodging_od = value + def _od_reading_changed_status(self, msg): if self.enable_dodging_od: - self.logger.info("Will attempt to stop during OD readings.") - self._listen_for_od_reading() - else: - self.logger.info("Running continuously through OD readings.") - if hasattr(self, "sneak_in_timer"): - self.sneak_in_timer.cancel() - try: - self.action_to_do_after_od_reading() - except Exception: - pass - self.sub_client.unsubscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval") - - def _setup_actions(self, msg: pt.MQTTMessage) -> None: - if not msg.payload: - # OD reading stopped: reset and exit - if hasattr(self, "sneak_in_timer"): - self.sneak_in_timer.cancel() - self.action_to_do_after_od_reading() - self.sub_client.unsubscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval") - return + if msg.payload: + # turned off + self.set_currently_dodging_od(True) + else: + self.set_currently_dodging_od(False) - # OD found - revert to paused state - # we put this in a try for the following reason: - # if od reading is running, and we start Dodging job, the _setup_actions callback is fired - # _after_ this classes __init__ is done, but before the subclasses __init__. If - # action_to_do_before_od_reading references things in the subclasses __init__, it will - # fail. - self.logger.debug("OD reading data is found in MQTT. Dodging!") - try: - self.action_to_do_before_od_reading() - except Exception: - pass + def _setup_timer(self) -> None: - try: - self.sneak_in_timer.cancel() - except AttributeError: - pass + self.logger.debug("OD reading present. Dodging!") + + self.sneak_in_timer.cancel() post_delay = config.getfloat(f"{self.job_name}.config", "post_delay_duration", fallback=1.0) pre_delay = config.getfloat(f"{self.job_name}.config", "pre_delay_duration", fallback=1.5) @@ -1103,23 +1127,23 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None: return self.is_after_period = True - self.action_to_do_after_od_reading() + self._action_to_do_after_od_reading() sleep(ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay)) self.is_after_period = False - self.action_to_do_before_od_reading() + self._action_to_do_before_od_reading() # this could fail in the following way: # in the same experiment, the od_reading fails catastrophically so that the ADC attributes are never # cleared. Later, this job starts, and it will pick up the _old_ ADC attributes. ads_start_time_msg = subscribe( - f"pioreactor/{self.unit}/{self.experiment}/od_reading/first_od_obs_time" + f"pioreactor/{self.unit}/{self.experiment}/od_reading/first_od_obs_time", timeout=5 ) if ads_start_time_msg and ads_start_time_msg.payload: ads_start_time = float(ads_start_time_msg.payload) else: return - ads_interval_msg = subscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval") + ads_interval_msg = subscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval", timeout=5) if ads_interval_msg and ads_interval_msg.payload: ads_interval = float(ads_interval_msg.payload) else: @@ -1147,6 +1171,7 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None: sleep(time_to_next_ads_reading + (post_delay + self.OD_READING_DURATION)) self.sneak_in_timer.start() + def on_sleeping(self) -> None: try: self.sneak_in_timer.pause() @@ -1166,6 +1191,8 @@ def on_sleeping_to_ready(self) -> None: pass + + class BackgroundJobWithDodgingContrib(BackgroundJobWithDodging): """ Plugin jobs should inherit from this class. diff --git a/pioreactor/background_jobs/od_reading.py b/pioreactor/background_jobs/od_reading.py index b16ca451..55a4e098 100644 --- a/pioreactor/background_jobs/od_reading.py +++ b/pioreactor/background_jobs/od_reading.py @@ -1125,9 +1125,6 @@ def on_disconnected(self) -> None: except Exception: pass - # tech debt: clear _pre and _post - self._pre_read.clear() - self._post_read.clear() def _get_ir_led_channel_from_configuration(self) -> pt.LedChannel: try: diff --git a/pioreactor/background_jobs/stirring.py b/pioreactor/background_jobs/stirring.py index 8aec2cb4..8410b9d3 100644 --- a/pioreactor/background_jobs/stirring.py +++ b/pioreactor/background_jobs/stirring.py @@ -17,7 +17,7 @@ from pioreactor import exc from pioreactor import hardware from pioreactor import structs -from pioreactor.background_jobs.base import BackgroundJob +from pioreactor.background_jobs.base import BackgroundJobWithDodging from pioreactor.config import config from pioreactor.pubsub import subscribe from pioreactor.utils import clamp @@ -165,7 +165,7 @@ def estimate(self, seconds_to_observe: float) -> float: return round(self._running_count * 60 / self._running_sum, 1) -class Stirrer(BackgroundJob): +class Stirrer(BackgroundJobWithDodging): """ Parameters ------------ @@ -280,6 +280,36 @@ def __init__( logger=self.logger, ) + def action_to_do_before_od_reading(self): + self.logger.debug("stop stirring") + self.set_duty_cycle(0) + + def action_to_do_after_od_reading(self): + self.logger.debug("starting stirring") + self.set_duty_cycle(self._estimate_duty_cycle) + sleep(1) + self.poll_and_update_dc() + + def initialize_dodging_operation(self): + self.rpm_check_repeated_thread = RepeatedTimer( + 1_000, + lambda *args: None, + job_name=self.job_name, + logger=self.logger, + ) + + def initialize_continuous_operation(self): + # set up thread to periodically check the rpm + self.rpm_check_repeated_thread = RepeatedTimer( + config.getfloat("stirring.config", "duration_between_updates_seconds", fallback=23.0), + self.poll_and_update_dc, + job_name=self.job_name, + run_immediately=True, + run_after=6, + logger=self.logger, + ) + + def initialize_rpm_to_dc_lookup(self) -> Callable: if self.rpm_calculator is None: # if we can't track RPM, no point in adjusting DC, use current value @@ -308,6 +338,7 @@ def initialize_rpm_to_dc_lookup(self) -> Callable: return lambda rpm: self._estimate_duty_cycle def on_disconnected(self) -> None: + super().on_disconnected() with suppress(AttributeError): self.rpm_check_repeated_thread.cancel() with suppress(AttributeError): @@ -417,6 +448,7 @@ def on_ready_to_sleeping(self) -> None: self.set_duty_cycle(0.0) def on_sleeping_to_ready(self) -> None: + super().on_sleeping_to_ready() self.duty_cycle = self._estimate_duty_cycle self.rpm_check_repeated_thread.unpause() self.start_stirring() @@ -459,7 +491,7 @@ def block_until_rpm_is_close_to_target( """ - if self.rpm_calculator is None or self.target_rpm is None: # or is_testing_env(): + if self.rpm_calculator is None or self.target_rpm is None or self.currently_dodging_od: # or is_testing_env(): # can't block if we aren't recording the RPM return False diff --git a/pioreactor/tests/test_background_job.py b/pioreactor/tests/test_background_job.py index 2cb29be6..7c2ff137 100644 --- a/pioreactor/tests/test_background_job.py +++ b/pioreactor/tests/test_background_job.py @@ -369,7 +369,7 @@ def __init__(self, unit, experiment): assert msg is not None -def test_dodging() -> None: +def test_dodging_order() -> None: config["just_pause.config"] = {} config["just_pause.config"]["post_delay_duration"] = "0.75" config["just_pause.config"]["pre_delay_duration"] = "0.25" @@ -396,22 +396,36 @@ def action_to_do_before_od_reading(self) -> None: def action_to_do_after_od_reading(self) -> None: self.logger.notice(f" Unpausing at {time.time()} 🟢") - st = start_od_reading( - "90", - None, - unit=get_unit_name(), - experiment="test_dodging", - fake_data=True, - use_calibration=False, - ) - time.sleep(5) - with collect_all_logs_of_level("NOTICE", unit=get_unit_name(), experiment="test_dodging") as bucket: + with start_od_reading( + "90", + None, + unit=get_unit_name(), + experiment="test_dodging", + fake_data=True, + use_calibration=False, + ): + time.sleep(5) + with JustPause(): + time.sleep(26) + assert len(bucket) > 4, bucket + + with JustPause(): - time.sleep(26) - assert len(bucket) > 4, bucket + time.sleep(6) + with start_od_reading( + "90", + None, + unit=get_unit_name(), + experiment="test_dodging", + fake_data=True, + use_calibration=False, + ): + time.sleep(26) + + + - st.clean_up() ODReader._post_read = [] ODReader._pre_read = [] @@ -455,8 +469,8 @@ def action_to_do_after_od_reading(self) -> None: assert len(bucket) == 0 -def test_disabled_dodging() -> None: - exp = "test_disabled_dodging" +def test_disabling_dodging() -> None: + exp = "test_disabling_dodging" config["just_pause.config"] = {} config["just_pause.config"]["post_delay_duration"] = "0.2" @@ -471,38 +485,46 @@ def __init__(self) -> None: super().__init__(unit=get_unit_name(), experiment=exp) def action_to_do_before_od_reading(self) -> None: + self.test = False self.logger.notice("Pausing") def action_to_do_after_od_reading(self) -> None: + self.test = True self.logger.notice("Unpausing") + def initialize_dodging_operation(self): + self.test = False + + def initialize_continuous_operation(self): + self.test = True + with collect_all_logs_of_level("NOTICE", unit=get_unit_name(), experiment=exp) as bucket: - jp = JustPause() - assert set(jp.published_settings.keys()) == set(["test", "state", "enable_dodging_od"]) + with JustPause() as jp: + time.sleep(2) + with start_od_reading( + "90", + None, + interval=5, # needed + unit=get_unit_name(), + experiment=exp, + fake_data=True, + use_calibration=False, + ) as od: - od = start_od_reading( - "90", - None, - interval=5, # needed - unit=get_unit_name(), - experiment=exp, - fake_data=True, - use_calibration=False, - ) - time.sleep(5) - jp.set_enable_dodging_od(False) - time.sleep(20) - assert len(bucket) == 2 + assert set(jp.published_settings.keys()) == set(["test", "state", "enable_dodging_od", "currently_dodging_od"]) + time.sleep(20) - jp.set_enable_dodging_od(True) - time.sleep(12) - assert len(bucket) == 4 + assert len(bucket) == 4 - od.clean_up() - jp.clean_up() + jp.set_enable_dodging_od(False) + assert jp.test == True + time.sleep(20) + jp.set_enable_dodging_od(True) + time.sleep(12) -def test_disabled_dodging_will_start_action_to_do_after_od_reading() -> None: + +def test_disabled_dodging_will_start_continuous_operation() -> None: exp = "test_disabled_dodging_will_start_action_to_do_after_od_reading" config["just_pause.config"] = {} @@ -516,10 +538,10 @@ class JustPause(BackgroundJobWithDodging): def __init__(self) -> None: super().__init__(unit=get_unit_name(), experiment=exp) - def action_to_do_before_od_reading(self) -> None: + def initialize_dodging_operation(self) -> None: self.logger.notice("NOPE") - def action_to_do_after_od_reading(self) -> None: + def initialize_continuous_operation(self) -> None: self.logger.notice("OK") with collect_all_logs_of_level("NOTICE", unit=get_unit_name(), experiment=exp) as bucket: