diff --git a/examol/store/db/__init__.py b/examol/store/db/__init__.py new file mode 100644 index 0000000..aa3db99 --- /dev/null +++ b/examol/store/db/__init__.py @@ -0,0 +1 @@ +"""Tools for interfacing with data stores""" diff --git a/examol/store/db/base.py b/examol/store/db/base.py new file mode 100644 index 0000000..5b9e3e9 --- /dev/null +++ b/examol/store/db/base.py @@ -0,0 +1,56 @@ +"""Base classes for storage utilities""" +import gzip +from typing import Iterable +from pathlib import Path + +from examol.store.models import MoleculeRecord + + +class MoleculeStore: + """Base class defining how to interface with a dataset of molecule records. + + Data stores provide the ability to persist the data collected by ExaMol to disk during a run. + The :meth:`update_record` call need not imemdaitely + + Stores do not need support concurrent access from multiple client, which is why this documentation avoids the word "database." + + + """ + + def __getitem__(self, mol_key: str) -> MoleculeRecord: + """Retrieve a molecule record""" + raise NotImplementedError() + + def __len__(self): + raise NotImplementedError() + + def iterate_over_records(self) -> Iterable[MoleculeRecord]: + """Iterate over all records in data + + Yields: + A single record + """ + raise NotImplementedError() + + def update_record(self, record: MoleculeRecord): + """Update a single record + + Args: + record: Record to be updated + """ + raise NotImplementedError() + + def close(self): + """Block until all changes to the database have been written to disk""" + raise NotImplementedError() + + def export_records(self, path: Path): + """Save a current copy of the database to disk as line-delimited JSON + + Args: + path: Path in which to save all data. Use a ".json.gz" + """ + + with (gzip.open(path, 'wt') if path.name.endswith('.gz') else open(path, 'w')) as fp: + for record in self.iterate_over_records(): + print(record.to_json(), file=fp) diff --git a/examol/store/db/memory.py b/examol/store/db/memory.py new file mode 100644 index 0000000..9b33f88 --- /dev/null +++ b/examol/store/db/memory.py @@ -0,0 +1,89 @@ +"""Stores that keep the entire dataset in memory""" +import gzip +import logging +from pathlib import Path +from time import monotonic +from threading import Thread, Event +from typing import Iterable + +from examol.store.db.base import MoleculeStore +from examol.store.models import MoleculeRecord + +logger = logging.getLogger(__name__) + + +class InMemoryStore(MoleculeStore): + """Store all molecule records in memory, write to disk as a single file + + The class will start checkpointing as soon as any record is updated. + + Args: + path: Path from which to read data. Must be a JSON file, can be compressed with GZIP + write_freq: Minimum time between writing checkpoints + """ + + def __init__(self, path: Path, write_freq: float = 10.): + self.path = Path(path) + self.write_freq = write_freq + self.db: dict[str, MoleculeRecord] = {} + + # Start thread which writes until + self._write_thread = None + self._updates_available: Event = Event() + self._closing = Event() + + # Start by loading the molecules + self._load_molecules() + + def _load_molecules(self): + """Load molecules from disk""" + if not self.path.is_file(): + return + logger.info(f'Loading data from {self.path}') + with (gzip.open(self.path, 'rt') if self.path.name.endswith('.gz') else self.path.open()) as fp: + for line in fp: + record = MoleculeRecord.from_json(line) + self.db[record.key] = record + + def iterate_over_records(self) -> Iterable[MoleculeRecord]: + yield from list(self.db.values()) # Use `list` to copy the current state of the db and avoid errors due to concurrent writes + + def __getitem__(self, item): + return self.db[item] + + def __len__(self): + return len(self.db) + + def _writer(self): + next_write = 0 + while not self._closing.is_set(): + # Wait until updates are available and the standoff is not met, or if we're closing + while (monotonic() < next_write or not self._updates_available.is_set()) and not self._closing.is_set(): + self._updates_available.wait(timeout=1) + + # Mark that we've caught up with whatever signaled this thread + self._updates_available.clear() + + # Checkpoint and advance the standoff + self.export_records(self.path) + next_write = monotonic() + self.write_freq + + def update_record(self, record: MoleculeRecord): + self.db[record.key] = record + + # Start the write thread, if needed, and trigger it + if self._write_thread is None: + logger.info('Start the writing thread') + self._write_thread = Thread(target=self._writer) + self._write_thread.start() + self._updates_available.set() + + def close(self): + # Trigger a last write + self._closing.set() + if self._write_thread is not None: + self._write_thread.join() + + # Mark that we're closed + self._write_thread = None + self._closing.clear() diff --git a/tests/store/test_memory.py b/tests/store/test_memory.py new file mode 100644 index 0000000..7f10238 --- /dev/null +++ b/tests/store/test_memory.py @@ -0,0 +1,34 @@ +"""Test an in-memory store for molecular data""" + +from pytest import fixture + +from examol.store.db.memory import InMemoryStore +from examol.store.models import MoleculeRecord + + +@fixture() +def records() -> list[MoleculeRecord]: + return [MoleculeRecord.from_identifier(smiles) for smiles in ['C', 'O', 'N']] + + +def test_store(tmpdir, records): + # Open the database + db_path = tmpdir / 'db.json.gz' + store = InMemoryStore(db_path) + try: + assert len(store) == 0 + + # Add the records + for record in records: + store.update_record(record) + assert len(store) == 3 + + finally: + store.close() + + # Load database back in + store = InMemoryStore(db_path) + try: + assert len(store) == 3 + finally: + store.close()