Reviewer notes (free text ahead of the first "diff --git" header):
  * MoleculeStore becomes an abstract context manager; InMemoryStore starts its
    writer thread in __enter__ and performs the final write in __exit__.
  * The added _writer loop condition was corrected to agree with its own
    comment: keep looping while the store is open OR updates are still
    pending, so an update that arrives during a write is never dropped.
  * Only "+" lines were altered; the post-image blob id on the memory.py
    "index" line therefore no longer corresponds to the patched content.

diff --git a/examol/store/db/base.py b/examol/store/db/base.py
index 5b9e3e9..bf8f2ff 100644
--- a/examol/store/db/base.py
+++ b/examol/store/db/base.py
@@ -1,20 +1,23 @@
 """Base classes for storage utilities"""
 import gzip
-from typing import Iterable
+from abc import ABC
 from pathlib import Path
+from typing import Iterable
+from contextlib import AbstractContextManager
+
 
 from examol.store.models import MoleculeRecord
 
 
-class MoleculeStore:
+class MoleculeStore(AbstractContextManager, ABC):
     """Base class defining how to interface with a dataset of molecule records.
 
     Data stores provide the ability to persist the data collected by ExaMol to disk during a run.
-    The :meth:`update_record` call need not imemdaitely
+    The :meth:`update_record` call need not immediately persist the data but should ensure that the data
+    is stored on disk eventually.
+    In fact, it is actually better for the update operation to not block until the resulting write has completed.
 
     Stores do not need support concurrent access from multiple client, which is why this documentation avoids the word "database."
-
-
     """
 
     def __getitem__(self, mol_key: str) -> MoleculeRecord:
diff --git a/examol/store/db/memory.py b/examol/store/db/memory.py
index 9b33f88..57f4155 100644
--- a/examol/store/db/memory.py
+++ b/examol/store/db/memory.py
@@ -35,6 +35,23 @@ def __init__(self, path: Path, write_freq: float = 10.):
         # Start by loading the molecules
         self._load_molecules()
 
+    def __enter__(self):
+        logger.info('Start the writing thread')
+        self._write_thread = Thread(target=self._writer)
+        self._write_thread.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # Trigger a last write
+        logger.info('Triggering a last write to the database')
+        self._closing.set()
+        if self._write_thread is not None:
+            self._write_thread.join()
+
+        # Mark that we're closed
+        self._write_thread = None
+        self._closing.clear()
+
     def _load_molecules(self):
         """Load molecules from disk"""
         if not self.path.is_file():
@@ -44,6 +61,7 @@ def _load_molecules(self):
             for line in fp:
                 record = MoleculeRecord.from_json(line)
                 self.db[record.key] = record
+        logger.info(f'Loaded {len(self.db)} molecule records')
 
     def iterate_over_records(self) -> Iterable[MoleculeRecord]:
         yield from list(self.db.values())  # Use `list` to copy the current state of the db and avoid errors due to concurrent writes
@@ -56,7 +74,7 @@ def __len__(self):
 
     def _writer(self):
         next_write = 0
-        while not self._closing.is_set():
+        while not self._closing.is_set() or self._updates_available.is_set():  # Loop until closing and no updates are available
             # Wait until updates are available and the standoff is not met, or if we're closing
             while (monotonic() < next_write or not self._updates_available.is_set()) and not self._closing.is_set():
                 self._updates_available.wait(timeout=1)
@@ -70,20 +88,4 @@ def _writer(self):
 
     def update_record(self, record: MoleculeRecord):
         self.db[record.key] = record
-
-        # Start the write thread, if needed, and trigger it
-        if self._write_thread is None:
-            logger.info('Start the writing thread')
-            self._write_thread = Thread(target=self._writer)
-            self._write_thread.start()
         self._updates_available.set()
-
-    def close(self):
-        # Trigger a last write
-        self._closing.set()
-        if self._write_thread is not None:
-            self._write_thread.join()
-
-        # Mark that we're closed
-        self._write_thread = None
-        self._closing.clear()
diff --git a/tests/store/test_memory.py b/tests/store/test_memory.py
index 7f10238..33ee168 100644
--- a/tests/store/test_memory.py
+++ b/tests/store/test_memory.py
@@ -14,8 +14,7 @@ def records() -> list[MoleculeRecord]:
 def test_store(tmpdir, records):
     # Open the database
     db_path = tmpdir / 'db.json.gz'
-    store = InMemoryStore(db_path)
-    try:
+    with InMemoryStore(db_path) as store:
         assert len(store) == 0
 
         # Add the records
@@ -23,12 +22,6 @@ def test_store(tmpdir, records):
             store.update_record(record)
         assert len(store) == 3
 
-    finally:
-        store.close()
-
     # Load database back in
-    store = InMemoryStore(db_path)
-    try:
+    with InMemoryStore(db_path) as store:
         assert len(store) == 3
-    finally:
-        store.close()