Skip to content

Commit

Permalink
fix typo in persisent cache name; use configs to determine where cach…
Browse files Browse the repository at this point in the history
…es live; remove diskcache
  • Loading branch information
CamDavidsonPilon committed Dec 29, 2024
1 parent fe73608 commit 082c892
Show file tree
Hide file tree
Showing 29 changed files with 170 additions and 172 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
- [ ] logs page
- new table for historical assignments
- plugins page has dropdown to select the unit
- new config entries under storage


#### Breaking changes
- fixed typo `utils.local_persistant_storage` to `utils.local_persistent_storage`.

### 24.12.10
- Hotfix for UI settings bug
Expand Down
5 changes: 4 additions & 1 deletion config.dev.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ use_calibration=1
smoothing_penalizer=700.0

[storage]
database=pioreactor.sqlite
database=.pioreactor/storage/pioreactor.sqlite
temporary_cache=/tmp/local_intermittent_pioreactor_metadata.sqlite
persistent_cache=.pioreactor/storage/local_persistent_pioreactor_metadata.db


[logging]
log_file=./pioreactor.log
Expand Down
6 changes: 3 additions & 3 deletions pioreactor/actions/leader/backup_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pioreactor.exc import RsyncError
from pioreactor.logging import create_logger
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.networking import rsync
Expand Down Expand Up @@ -69,7 +69,7 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
bck.close()
con.close()

with local_persistant_storage("database_backups") as cache:
with local_persistent_storage("database_backups") as cache:
cache["latest_backup_timestamp"] = current_time

logger.info("Completed backup of database.")
Expand Down Expand Up @@ -102,7 +102,7 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
logger.debug(f"Backed up database to {backup_unit}:{output_file}.")
backups_complete += 1

with local_persistant_storage("database_backups") as cache:
with local_persistent_storage("database_backups") as cache:
cache[f"latest_backup_in_{backup_unit}"] = current_time

return
Expand Down
8 changes: 3 additions & 5 deletions pioreactor/actions/od_blank.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections import defaultdict
Expand All @@ -21,7 +20,7 @@
from pioreactor.logging import create_logger
from pioreactor.pubsub import prune_retained_messages
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils import math_helpers
from pioreactor.utils.timing import current_utc_datetime
Expand Down Expand Up @@ -119,7 +118,7 @@ def delete_od_blank(unit=None, experiment=None):
unit = unit or whoami.get_unit_name()
experiment = experiment or whoami.get_assigned_experiment_name(unit)

with local_persistant_storage(action_name) as cache:
with local_persistent_storage(action_name) as cache:
if experiment not in cache:
return

Expand Down Expand Up @@ -150,7 +149,6 @@ def od_blank(
experiment=None,
) -> dict[pt.PdChannel, float]:
from pioreactor.background_jobs.od_reading import start_od_reading
from pioreactor.background_jobs.stirring import start_stirring

action_name = "od_blank"
unit = unit or whoami.get_unit_name()
Expand Down Expand Up @@ -193,7 +191,7 @@ def od_blank(
logger.error(e)
raise e

with local_persistant_storage(action_name) as cache:
with local_persistent_storage(action_name) as cache:
cache[experiment] = dumps(means)

for channel, mean in means.items():
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/actions/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _get_pin(pump_type: str, config) -> pt.GpioPin:

def _get_calibration(pump_type: str) -> structs.AnyPumpCalibration:
# TODO: make sure current voltage is the same as calibrated. Actually where should that check occur? in Pump?
with utils.local_persistant_storage("current_pump_calibration") as cache:
with utils.local_persistent_storage("current_pump_calibration") as cache:
try:
return decode(cache[pump_type], type=structs.AnyPumpCalibration) # type: ignore
except KeyError:
Expand Down
7 changes: 3 additions & 4 deletions pioreactor/actions/self_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import sys
from json import dumps
from json import loads
from threading import Thread
from time import sleep
from typing import Callable
Expand Down Expand Up @@ -44,7 +43,7 @@
from pioreactor.types import LedChannel
from pioreactor.types import PdChannel
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils import SummableDict
from pioreactor.utils.math_helpers import correlation
Expand Down Expand Up @@ -462,12 +461,12 @@ def _run(self, managed_state, logger: CustomLogger, unit: str, testing_experimen

managed_state.publish_setting(test_name, int(res))

with local_persistant_storage("self_test_results") as c:
with local_persistent_storage("self_test_results") as c:
c[(self.experiment, test_name)] = int(res)


def get_failed_test_names(experiment: str) -> Iterator[str]:
with local_persistant_storage("self_test_results") as c:
with local_persistent_storage("self_test_results") as c:
for name in get_all_test_names():
if c.get((experiment, name)) == 0:
yield name
Expand Down
4 changes: 2 additions & 2 deletions pioreactor/automations/dosing/chemostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage


class Chemostat(DosingAutomationJob):
Expand All @@ -20,7 +20,7 @@ class Chemostat(DosingAutomationJob):
def __init__(self, volume: float | str, **kwargs) -> None:
super().__init__(**kwargs)

with local_persistant_storage("current_pump_calibration") as cache:
with local_persistent_storage("current_pump_calibration") as cache:
if "media" not in cache:
raise CalibrationError("Media and waste pump calibration must be performed first.")
elif "waste" not in cache:
Expand Down
4 changes: 2 additions & 2 deletions pioreactor/automations/dosing/fed_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage


class FedBatch(DosingAutomationJob):
Expand All @@ -21,7 +21,7 @@ class FedBatch(DosingAutomationJob):
def __init__(self, volume, **kwargs):
super().__init__(**kwargs)

with local_persistant_storage("current_pump_calibration") as cache:
with local_persistent_storage("current_pump_calibration") as cache:
if "media" not in cache:
raise CalibrationError("Media pump calibration must be performed first.")

Expand Down
4 changes: 2 additions & 2 deletions pioreactor/automations/dosing/pid_morbidostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.config import config
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils.streaming_calculations import PID


Expand All @@ -29,7 +29,7 @@ def __init__(self, target_growth_rate: float | str, target_normalized_od: float
assert target_normalized_od is not None, "`target_normalized_od` must be set"
assert target_growth_rate is not None, "`target_growth_rate` must be set"

with local_persistant_storage("current_pump_calibration") as cache:
with local_persistent_storage("current_pump_calibration") as cache:
if "media" not in cache:
raise CalibrationError("Media pump calibration must be performed first.")
elif "waste" not in cache:
Expand Down
4 changes: 2 additions & 2 deletions pioreactor/automations/dosing/turbidostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.config import config
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils.streaming_calculations import ExponentialMovingAverage


Expand All @@ -34,7 +34,7 @@ def __init__(
) -> None:
super().__init__(**kwargs)

with local_persistant_storage("current_pump_calibration") as cache:
with local_persistent_storage("current_pump_calibration") as cache:
if "media" not in cache:
raise CalibrationError("Media pump calibration must be performed first.")
elif "waste" not in cache:
Expand Down
18 changes: 9 additions & 9 deletions pioreactor/background_jobs/dosing_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pioreactor.config import config
from pioreactor.logging import create_logger
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import SummableDict
from pioreactor.utils import whoami
from pioreactor.utils.timing import current_utc_datetime
Expand Down Expand Up @@ -584,14 +584,14 @@ def _update_alt_media_fraction(self, dosing_event: structs.DosingEvent) -> None:
)

# add to cache
with local_persistant_storage("alt_media_fraction") as cache:
with local_persistent_storage("alt_media_fraction") as cache:
cache[self.experiment] = self.alt_media_fraction

def _update_liquid_volume(self, dosing_event: structs.DosingEvent) -> None:
self.liquid_volume = LiquidVolumeCalculator.update(dosing_event, self.liquid_volume)

# add to cache
with local_persistant_storage("liquid_volume") as cache:
with local_persistent_storage("liquid_volume") as cache:
cache[self.experiment] = self.liquid_volume

if self.liquid_volume >= self.MAX_VIAL_VOLUME_TO_WARN:
Expand All @@ -610,10 +610,10 @@ def _update_throughput(self, dosing_event: structs.DosingEvent) -> None:
) = ThroughputCalculator.update(dosing_event, self.media_throughput, self.alt_media_throughput)

# add to cache
with local_persistant_storage("alt_media_throughput") as cache:
with local_persistent_storage("alt_media_throughput") as cache:
cache[self.experiment] = self.alt_media_throughput

with local_persistant_storage("media_throughput") as cache:
with local_persistent_storage("media_throughput") as cache:
cache[self.experiment] = self.media_throughput

def _init_alt_media_fraction(self, initial_alt_media_fraction: float) -> None:
Expand All @@ -626,7 +626,7 @@ def _init_alt_media_fraction(self, initial_alt_media_fraction: float) -> None:
},
)

with local_persistant_storage("alt_media_fraction") as cache:
with local_persistent_storage("alt_media_fraction") as cache:
self.alt_media_fraction = cache.get(self.experiment, initial_alt_media_fraction)

return
Expand All @@ -643,7 +643,7 @@ def _init_liquid_volume(self, initial_liquid_volume: float) -> None:
},
)

with local_persistant_storage("liquid_volume") as cache:
with local_persistent_storage("liquid_volume") as cache:
self.liquid_volume = cache.get(self.experiment, initial_liquid_volume)

return
Expand All @@ -666,10 +666,10 @@ def _init_volume_throughput(self) -> None:
},
)

with local_persistant_storage("alt_media_throughput") as cache:
with local_persistent_storage("alt_media_throughput") as cache:
self.alt_media_throughput = cache.get(self.experiment, 0.0)

with local_persistant_storage("media_throughput") as cache:
with local_persistent_storage("media_throughput") as cache:
self.media_throughput = cache.get(self.experiment, 0.0)

return
Expand Down
32 changes: 16 additions & 16 deletions pioreactor/background_jobs/growth_rate_calculating.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from pioreactor.config import config
from pioreactor.pubsub import QOS
from pioreactor.pubsub import subscribe
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils.streaming_calculations import CultureGrowthEKF


Expand Down Expand Up @@ -230,10 +230,10 @@ def create_obs_noise_covariance(self, obs_std): # type: ignore
self.logger.debug(exc_info=True)
# we should clear the cache here...

with local_persistant_storage("od_normalization_mean") as cache:
with local_persistent_storage("od_normalization_mean") as cache:
del cache[self.experiment]

with local_persistant_storage("od_normalization_variance") as cache:
with local_persistent_storage("od_normalization_variance") as cache:
del cache[self.experiment]

raise ZeroDivisionError(
Expand All @@ -257,11 +257,11 @@ def _compute_and_cache_od_statistics(
)
self.logger.info("Completed OD normalization metrics.")

with local_persistant_storage("od_normalization_mean") as cache:
with local_persistent_storage("od_normalization_mean") as cache:
if self.experiment not in cache:
cache[self.experiment] = dumps(means)

with local_persistant_storage("od_normalization_variance") as cache:
with local_persistent_storage("od_normalization_variance") as cache:
if self.experiment not in cache:
cache[self.experiment] = dumps(variances)

Expand Down Expand Up @@ -303,7 +303,7 @@ def get_precomputed_values(
)

def get_od_blank_from_cache(self) -> dict[pt.PdChannel, float]:
with local_persistant_storage("od_blank") as cache:
with local_persistent_storage("od_blank") as cache:
result = cache.get(self.experiment)

if result is not None:
Expand All @@ -313,13 +313,13 @@ def get_od_blank_from_cache(self) -> dict[pt.PdChannel, float]:
return defaultdict(lambda: 0.0)

def get_growth_rate_from_cache(self) -> float:
with local_persistant_storage("growth_rate") as cache:
with local_persistent_storage("growth_rate") as cache:
return cache.get(self.experiment, 0.0)

def get_filtered_od_from_cache_or_computed(self) -> float:
from statistics import mean

with local_persistant_storage("od_filtered") as cache:
with local_persistent_storage("od_filtered") as cache:
if self.experiment in cache:
return cache[self.experiment]

Expand All @@ -341,7 +341,7 @@ def get_filtered_od_from_cache_or_computed(self) -> float:

def get_od_normalization_from_cache(self) -> dict[pt.PdChannel, float]:
# we check if we've computed mean stats
with local_persistant_storage("od_normalization_mean") as cache:
with local_persistent_storage("od_normalization_mean") as cache:
result = cache.get(self.experiment, None)
if result is not None:
return loads(result)
Expand All @@ -353,7 +353,7 @@ def get_od_normalization_from_cache(self) -> dict[pt.PdChannel, float]:

def get_od_variances_from_cache(self) -> dict[pt.PdChannel, float]:
# we check if we've computed variance stats
with local_persistant_storage("od_normalization_variance") as cache:
with local_persistent_storage("od_normalization_variance") as cache:
result = cache.get(self.experiment, None)
if result:
return loads(result)
Expand Down Expand Up @@ -423,10 +423,10 @@ def update_state_from_observation(
return self.growth_rate, self.od_filtered, self.kalman_filter_outputs

# save to cache
with local_persistant_storage("growth_rate") as cache:
with local_persistent_storage("growth_rate") as cache:
cache[self.experiment] = self.growth_rate.growth_rate

with local_persistant_storage("od_filtered") as cache:
with local_persistent_storage("od_filtered") as cache:
cache[self.experiment] = self.od_filtered.od_filtered

return self.growth_rate, self.od_filtered, self.kalman_filter_outputs
Expand Down Expand Up @@ -573,11 +573,11 @@ def click_clear_cache() -> None:
unit = whoami.get_unit_name()
experiment = whoami.get_assigned_experiment_name(unit)

with local_persistant_storage("od_filtered") as cache:
with local_persistent_storage("od_filtered") as cache:
cache.pop(experiment)
with local_persistant_storage("growth_rate") as cache:
with local_persistent_storage("growth_rate") as cache:
cache.pop(experiment)
with local_persistant_storage("od_normalization_mean") as cache:
with local_persistent_storage("od_normalization_mean") as cache:
cache.pop(experiment)
with local_persistant_storage("od_normalization_variance") as cache:
with local_persistent_storage("od_normalization_variance") as cache:
cache.pop(experiment)
6 changes: 5 additions & 1 deletion pioreactor/background_jobs/leader/mqtt_to_db_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ def __init__(
for topic_to_table in topics_to_tables
]

self.timer = RepeatedTimer(60, self.write_stats).start()
self.timer = RepeatedTimer(
60,
self.write_stats,
job_name=self.job_name,
).start()

self.initialize_callbacks(topics_and_callbacks)

Expand Down
Loading

0 comments on commit 082c892

Please sign in to comment.