Skip to content

Commit

Permalink
try this new dodging system
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Nov 29, 2024
1 parent d7971f4 commit eb7b265
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 94 deletions.
4 changes: 4 additions & 0 deletions pioreactor/actions/leader/export_experiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def export_experiment_data(
click.echo("output should end with .zip")
raise click.Abort()

if len(dataset_names) == 0:
click.echo("At least one dataset name must be provided.")
raise click.Abort()

logger = create_logger("export_experiment_data")
logger.info(
f"Starting export of dataset{'s' if len(dataset_names) > 1 else ''}: {', '.join(dataset_names)}."
Expand Down
125 changes: 76 additions & 49 deletions pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,6 @@ def _disconnect_from_mqtt_clients(self) -> None:
def _clean_up_resources(self) -> None:
self._clear_caches()
self._remove_from_job_manager()
# Explicitly cleanup MQTT resources...
self._disconnect_from_mqtt_clients()
self._disconnect_from_loggers()

Expand Down Expand Up @@ -979,6 +978,7 @@ def __init__(self, unit: str, experiment: str, plugin_name: str) -> None:
super().__init__(unit, experiment, source=plugin_name)



class BackgroundJobWithDodging(_BackgroundJob):
"""
This utility class allows for a change in behaviour when an OD reading is about to taken. Example: shutting
Expand Down Expand Up @@ -1030,64 +1030,88 @@ def __init__(self, *args, source="app", **kwargs) -> None:
f"Required section '{self.job_name}.config' does not exist in the configuration."
)

self.sneak_in_timer = RepeatedTimer(5, self._noop, job_name=self.job_name, logger=self.logger) # placeholder?
self.add_to_published_settings("enable_dodging_od", {"datatype": "boolean", "settable": True})
self.set_enable_dodging_od(
config.getboolean(f"{self.job_name}.config", "enable_dodging_od", fallback="True")
)
self.add_to_published_settings("currently_dodging_od", {"datatype": "boolean", "settable": False})

def action_to_do_before_od_reading(self) -> None:
raise NotImplementedError()
def __post__init__(self):
super().__post__init__()
self.set_enable_dodging_od(config.getboolean(f"{self.job_name}.config", "enable_dodging_od", fallback="True"))
self.start_passive_listeners()

def _noop(self):
pass

def set_currently_dodging_od(self, value: bool):
if self.set_currently_dodging_od == value:
# noop
return

self.currently_dodging_od = value
if self.currently_dodging_od:
self.initialize_dodging_operation() # user defined
self._action_to_do_before_od_reading = self.action_to_do_before_od_reading
self._action_to_do_after_od_reading = self.action_to_do_after_od_reading
self._setup_timer()
else:
self.initialize_continuous_operation() # user defined

self._action_to_do_before_od_reading = self._noop
self._action_to_do_after_od_reading = self._noop
self.sneak_in_timer.cancel()


def set_enable_dodging_od(self, value: bool):
self.enable_dodging_od = value

if self.is_od_job_running() and self.enable_dodging_od:
self.logger.info("Will attempt to dodge OD readings.")
self.set_currently_dodging_od(True)
elif self.is_od_job_running() and not self.enable_dodging_od:
self.logger.info("Running continuously through OD readings.")
self.set_currently_dodging_od(False)
elif not self.is_od_job_running() and not self.enable_dodging_od:
self.logger.info("Running continuously through OD readings.")
self.set_currently_dodging_od(False)
elif not self.is_od_job_running() and self.enable_dodging_od:
self.logger.info("Will attempt to dodge later OD readings.")
self.set_currently_dodging_od(False)

def is_od_job_running(self) -> bool:
return is_pio_job_running("od_reading")

def action_to_do_after_od_reading(self) -> None:
raise NotImplementedError()

def _listen_for_od_reading(self) -> None:
def action_to_do_before_od_reading(self) -> None:
raise NotImplementedError()

def initialize_dodging_operation(self) -> None:
pass

def initialize_continuous_operation(self) -> None:
pass

def start_passive_listeners(self) -> None:
self.subscribe_and_callback(
self._setup_actions,
self._od_reading_changed_status,
f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval",
)

def set_enable_dodging_od(self, value: bool) -> None:
self.enable_dodging_od = value
def _od_reading_changed_status(self, msg):
if self.enable_dodging_od:
self.logger.info("Will attempt to stop during OD readings.")
self._listen_for_od_reading()
else:
self.logger.info("Running continuously through OD readings.")
if hasattr(self, "sneak_in_timer"):
self.sneak_in_timer.cancel()
try:
self.action_to_do_after_od_reading()
except Exception:
pass
self.sub_client.unsubscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval")

def _setup_actions(self, msg: pt.MQTTMessage) -> None:
if not msg.payload:
# OD reading stopped: reset and exit
if hasattr(self, "sneak_in_timer"):
self.sneak_in_timer.cancel()
self.action_to_do_after_od_reading()
self.sub_client.unsubscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval")
return
if msg.payload:
# turned off
self.set_currently_dodging_od(True)
else:
self.set_currently_dodging_od(False)

# OD found - revert to paused state
# we put this in a try for the following reason:
# if od reading is running, and we start Dodging job, the _setup_actions callback is fired
# _after_ this classes __init__ is done, but before the subclasses __init__. If
# action_to_do_before_od_reading references things in the subclasses __init__, it will
# fail.
self.logger.debug("OD reading data is found in MQTT. Dodging!")

try:
self.action_to_do_before_od_reading()
except Exception:
pass
def _setup_timer(self) -> None:

try:
self.sneak_in_timer.cancel()
except AttributeError:
pass
self.logger.debug("OD reading present. Dodging!")

self.sneak_in_timer.cancel()

post_delay = config.getfloat(f"{self.job_name}.config", "post_delay_duration", fallback=1.0)
pre_delay = config.getfloat(f"{self.job_name}.config", "pre_delay_duration", fallback=1.5)
Expand All @@ -1103,23 +1127,23 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:
return

self.is_after_period = True
self.action_to_do_after_od_reading()
self._action_to_do_after_od_reading()
sleep(ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay))
self.is_after_period = False
self.action_to_do_before_od_reading()
self._action_to_do_before_od_reading()

# this could fail in the following way:
# in the same experiment, the od_reading fails catastrophically so that the ADC attributes are never
# cleared. Later, this job starts, and it will pick up the _old_ ADC attributes.
ads_start_time_msg = subscribe(
f"pioreactor/{self.unit}/{self.experiment}/od_reading/first_od_obs_time"
f"pioreactor/{self.unit}/{self.experiment}/od_reading/first_od_obs_time", timeout=5
)
if ads_start_time_msg and ads_start_time_msg.payload:
ads_start_time = float(ads_start_time_msg.payload)
else:
return

ads_interval_msg = subscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval")
ads_interval_msg = subscribe(f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval", timeout=5)
if ads_interval_msg and ads_interval_msg.payload:
ads_interval = float(ads_interval_msg.payload)
else:
Expand Down Expand Up @@ -1147,6 +1171,7 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:
sleep(time_to_next_ads_reading + (post_delay + self.OD_READING_DURATION))
self.sneak_in_timer.start()


def on_sleeping(self) -> None:
try:
self.sneak_in_timer.pause()
Expand All @@ -1166,6 +1191,8 @@ def on_sleeping_to_ready(self) -> None:
pass




class BackgroundJobWithDodgingContrib(BackgroundJobWithDodging):
"""
Plugin jobs should inherit from this class.
Expand Down
3 changes: 0 additions & 3 deletions pioreactor/background_jobs/od_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,9 +1125,6 @@ def on_disconnected(self) -> None:
except Exception:
pass

# tech debt: clear _pre and _post
self._pre_read.clear()
self._post_read.clear()

def _get_ir_led_channel_from_configuration(self) -> pt.LedChannel:
try:
Expand Down
38 changes: 35 additions & 3 deletions pioreactor/background_jobs/stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pioreactor import exc
from pioreactor import hardware
from pioreactor import structs
from pioreactor.background_jobs.base import BackgroundJob
from pioreactor.background_jobs.base import BackgroundJobWithDodging
from pioreactor.config import config
from pioreactor.pubsub import subscribe
from pioreactor.utils import clamp
Expand Down Expand Up @@ -165,7 +165,7 @@ def estimate(self, seconds_to_observe: float) -> float:
return round(self._running_count * 60 / self._running_sum, 1)


class Stirrer(BackgroundJob):
class Stirrer(BackgroundJobWithDodging):
"""
Parameters
------------
Expand Down Expand Up @@ -280,6 +280,36 @@ def __init__(
logger=self.logger,
)

def action_to_do_before_od_reading(self):
self.logger.debug("stop stirring")
self.set_duty_cycle(0)

def action_to_do_after_od_reading(self):
self.logger.debug("starting stirring")
self.set_duty_cycle(self._estimate_duty_cycle)
sleep(1)
self.poll_and_update_dc()

def initialize_dodging_operation(self):
self.rpm_check_repeated_thread = RepeatedTimer(
1_000,
lambda *args: None,
job_name=self.job_name,
logger=self.logger,
)

def initialize_continuous_operation(self):
# set up thread to periodically check the rpm
self.rpm_check_repeated_thread = RepeatedTimer(
config.getfloat("stirring.config", "duration_between_updates_seconds", fallback=23.0),
self.poll_and_update_dc,
job_name=self.job_name,
run_immediately=True,
run_after=6,
logger=self.logger,
)


def initialize_rpm_to_dc_lookup(self) -> Callable:
if self.rpm_calculator is None:
# if we can't track RPM, no point in adjusting DC, use current value
Expand Down Expand Up @@ -308,6 +338,7 @@ def initialize_rpm_to_dc_lookup(self) -> Callable:
return lambda rpm: self._estimate_duty_cycle

def on_disconnected(self) -> None:
super().on_disconnected()
with suppress(AttributeError):
self.rpm_check_repeated_thread.cancel()
with suppress(AttributeError):
Expand Down Expand Up @@ -417,6 +448,7 @@ def on_ready_to_sleeping(self) -> None:
self.set_duty_cycle(0.0)

def on_sleeping_to_ready(self) -> None:
super().on_sleeping_to_ready()
self.duty_cycle = self._estimate_duty_cycle
self.rpm_check_repeated_thread.unpause()
self.start_stirring()
Expand Down Expand Up @@ -459,7 +491,7 @@ def block_until_rpm_is_close_to_target(
"""

if self.rpm_calculator is None or self.target_rpm is None: # or is_testing_env():
if self.rpm_calculator is None or self.target_rpm is None or self.currently_dodging_od: # or is_testing_env():
# can't block if we aren't recording the RPM
return False

Expand Down
Loading

0 comments on commit eb7b265

Please sign in to comment.