Skip to content

Commit

Permalink
introduce a native 'cache', eventually we will replace diskcache
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Dec 2, 2024
1 parent 975e27b commit 5ff21f8
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#### Bug fixes
- Fixed "circulate X" actions in the Manage All dialog in the UI.

#### Breaking changes
- moved all the temporary caches, which previously where their own sqlite3 db in /tmp/ to /tmp/local_intermittent_pioreactor_metadata.sqlite. This shouldn't break anything unless you update _during_ an experiment - don't do that!

### 24.10.29

#### Enhancements
Expand Down
8 changes: 4 additions & 4 deletions pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ def _setup_timer(self) -> None:
if pre_delay < 0.25:
self.logger.warning("For optimal OD readings, keep `pre_delay_duration` more than 0.25 seconds.")

def sneak_in(ads_interval, post_delay, pre_delay) -> None:
def sneak_in(ads_interval: float, post_delay: float, pre_delay: float) -> None:
if self.state != self.READY:
return

Expand All @@ -1149,9 +1149,9 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:
# in the same experiment, the od_reading fails catastrophically so that the settings are never
# cleared. Later, this job starts, and it will pick up the _old_ settings.
with JobManager() as jm:
ads_interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5)
ads_start_time = jm.get_setting_from_running_job(
"od_reading", "first_od_obs_time", timeout=5
ads_interval = float(jm.get_setting_from_running_job("od_reading", "interval", timeout=5))
ads_start_time = float(
jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5)
) # this is populated later in the OD job...

# get interval, and confirm that the requirements are possible: post_delay + pre_delay <= ADS interval - (od reading duration)
Expand Down
7 changes: 5 additions & 2 deletions pioreactor/background_jobs/stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class Stirrer(BackgroundJobWithDodging):
"measured_rpm": {"datatype": "MeasuredRPM", "settable": False, "unit": "RPM"},
"duty_cycle": {"datatype": "float", "settable": True, "unit": "%"},
}
# the _estimate_duty_cycle parameter is like the unrealized DC, and the duty_cycle is the realized DC.
_estimate_duty_cycle: float = config.getfloat("stirring.config", "initial_duty_cycle", fallback=30)
duty_cycle: float = 0
_measured_rpm: Optional[float] = None
Expand Down Expand Up @@ -370,8 +371,10 @@ def kick_stirring_but_avoid_od_reading(self) -> None:
wait until it completes before kicking stirring.
"""
with JobManager() as jm:
interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5)
first_od_obs_time = jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5)
interval = float(jm.get_setting_from_running_job("od_reading", "interval", timeout=5))
first_od_obs_time = float(
jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5)
)

seconds_to_next_reading = interval - (time() - first_od_obs_time) % interval
sleep(
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/tests/test_stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def test_block_until_rpm_is_close_to_target_will_timeout() -> None:


def test_block_until_rpm_is_close_will_exit() -> None:
exp = "test_block_until_rpm_isf_close_to_target_will_timeout"
exp = "test_block_until_rpm_is_close_will_exit"
rpm_calculator = MockRpmCalculator()
rpm_calculator.setup()
with Stirrer(
Expand Down
3 changes: 1 addition & 2 deletions pioreactor/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from msgspec.json import encode as dumps

from pioreactor.background_jobs.stirring import start_stirring
from pioreactor.exc import JobNotRunningError
from pioreactor.tests.conftest import capture_requests
from pioreactor.utils import callable_stack
from pioreactor.utils import ClusterJobManager
Expand Down Expand Up @@ -342,5 +341,5 @@ def test_retrieve_setting(job_manager, job_id):

# turn off
job_manager.set_not_running(job_key)
with pytest.raises(JobNotRunningError):
with pytest.raises(NameError):
job_manager.get_setting_from_running_job("test_name", "my_setting_int")
72 changes: 65 additions & 7 deletions pioreactor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from pioreactor.pubsub import patch_into
from pioreactor.pubsub import subscribe_and_callback
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.timing import current_utc_timestamp
from pioreactor.utils.timing import catchtime
from pioreactor.utils.timing import current_utc_timestamp

if TYPE_CHECKING:
from pioreactor.pubsub import Client
Expand Down Expand Up @@ -262,6 +262,68 @@ def publish_setting(self, setting: str, value: Any) -> None:
jm.upsert_setting(self._job_id, setting, value)


class cache:
def __init__(self, table_name):
self.table_name = f"cache_{table_name}"
self.db_path = f"{tempfile.gettempdir()}/local_intermittent_pioreactor_metadata.sqlite"

def __enter__(self):
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
self._initialize_table()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.conn.commit()
self.conn.close()

def _initialize_table(self):
self.cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
key BLOB PRIMARY KEY,
value BLOB
)
"""
)

def __setitem__(self, key, value):
self.cursor.execute(
f"""
INSERT INTO {self.table_name} (key, value)
VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value
""",
(key, value),
)

def get(self, key, default=None):
self.cursor.execute(f"SELECT value FROM {self.table_name} WHERE key = ?", (key,))
result = self.cursor.fetchone()
return result[0] if result else default

def iterkeys(self):
self.cursor.execute(f"SELECT key FROM {self.table_name}")
return (row[0] for row in self.cursor.fetchall())

def __contains__(self, key):
self.cursor.execute(f"SELECT 1 FROM {self.table_name} WHERE key = ?", (key,))
return self.cursor.fetchone() is not None

def __iter__(self):
return self.iterkeys()

def __delitem__(self, key):
self.cursor.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,))

def __getitem__(self, key):
self.cursor.execute(f"SELECT value FROM {self.table_name} WHERE key = ?", (key,))
result = self.cursor.fetchone()
if result is None:
raise KeyError(f"Key '{key}' not found in cache.")
return result[0]


@contextmanager
def local_intermittent_storage(
cache_name: str,
Expand All @@ -282,11 +344,8 @@ def local_intermittent_storage(
Opening the same cache in a context manager is tricky, and should be avoided.
"""
# gettempdir find the directory named by the TMPDIR environment variable.
# TMPDIR is set in the Pioreactor img.
tmp_dir = tempfile.gettempdir()
with Cache(f"{tmp_dir}/{cache_name}", sqlite_journal_mode="wal") as cache:
yield cache # type: ignore
with cache(f"{cache_name}") as c:
yield c # type: ignore


@contextmanager
Expand Down Expand Up @@ -625,7 +684,6 @@ def get_setting_from_running_job(self, job_name: str, setting: str, timeout=None
if (timeout and timer() > timeout) or (timeout is None):
raise NameError(f"Setting {setting} was not found.")


def set_not_running(self, job_id: JobMetadataKey) -> None:
update_query = "UPDATE pio_job_metadata SET is_running=0, ended_at=STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW') WHERE id=(?)"
self.cursor.execute(update_query, (job_id,))
Expand Down
6 changes: 6 additions & 0 deletions pioreactor/whoami.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
import time
import warnings
from functools import cache

from pioreactor import mureq
Expand All @@ -18,6 +19,11 @@
NO_EXPERIMENT = "$no_experiment_present"


def get_latest_experiment_name() -> str:
warnings.warn("Use whoami.get_assigned_experiment_name(unit) instead", DeprecationWarning, stacklevel=2)
return get_assigned_experiment_name(get_unit_name())


def get_testing_experiment_name() -> str:
try:
exp = get_assigned_experiment_name(get_unit_name())
Expand Down

0 comments on commit 5ff21f8

Please sign in to comment.