Skip to content

Commit

Permalink
Introduce a "Store" class
Browse files Browse the repository at this point in the history
Move towards backends that will store data
on disk rather than in memory
  • Loading branch information
WardLT committed Oct 20, 2023
1 parent cfb6a10 commit d3d5d7e
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions examol/store/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tools for interfacing with data stores"""
56 changes: 56 additions & 0 deletions examol/store/db/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Base classes for storage utilities"""
import gzip
from typing import Iterable
from pathlib import Path

from examol.store.models import MoleculeRecord


class MoleculeStore:
"""Base class defining how to interface with a dataset of molecule records.
Data stores provide the ability to persist the data collected by ExaMol to disk during a run.
The :meth:`update_record` call need not imemdaitely
Stores do not need support concurrent access from multiple client, which is why this documentation avoids the word "database."
"""

def __getitem__(self, mol_key: str) -> MoleculeRecord:
"""Retrieve a molecule record"""
raise NotImplementedError()

def __len__(self):
raise NotImplementedError()

def iterate_over_records(self) -> Iterable[MoleculeRecord]:
"""Iterate over all records in data
Yields:
A single record
"""
raise NotImplementedError()

def update_record(self, record: MoleculeRecord):
"""Update a single record
Args:
record: Record to be updated
"""
raise NotImplementedError()

def close(self):
"""Block until all changes to the database have been written to disk"""
raise NotImplementedError()

def export_records(self, path: Path):
"""Save a current copy of the database to disk as line-delimited JSON
Args:
path: Path in which to save all data. Use a ".json.gz"
"""

with (gzip.open(path, 'wt') if path.name.endswith('.gz') else open(path, 'w')) as fp:
for record in self.iterate_over_records():
print(record.to_json(), file=fp)
89 changes: 89 additions & 0 deletions examol/store/db/memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Stores that keep the entire dataset in memory"""
import gzip
import logging
from pathlib import Path
from time import monotonic
from threading import Thread, Event
from typing import Iterable

from examol.store.db.base import MoleculeStore
from examol.store.models import MoleculeRecord

logger = logging.getLogger(__name__)


class InMemoryStore(MoleculeStore):
"""Store all molecule records in memory, write to disk as a single file
The class will start checkpointing as soon as any record is updated.
Args:
path: Path from which to read data. Must be a JSON file, can be compressed with GZIP
write_freq: Minimum time between writing checkpoints
"""

def __init__(self, path: Path, write_freq: float = 10.):
self.path = Path(path)
self.write_freq = write_freq
self.db: dict[str, MoleculeRecord] = {}

# Start thread which writes until
self._write_thread = None
self._updates_available: Event = Event()
self._closing = Event()

# Start by loading the molecules
self._load_molecules()

def _load_molecules(self):
"""Load molecules from disk"""
if not self.path.is_file():
return
logger.info(f'Loading data from {self.path}')
with (gzip.open(self.path, 'rt') if self.path.name.endswith('.gz') else self.path.open()) as fp:
for line in fp:
record = MoleculeRecord.from_json(line)
self.db[record.key] = record

def iterate_over_records(self) -> Iterable[MoleculeRecord]:
yield from list(self.db.values()) # Use `list` to copy the current state of the db and avoid errors due to concurrent writes

def __getitem__(self, item):
return self.db[item]

def __len__(self):
return len(self.db)

def _writer(self):
next_write = 0
while not self._closing.is_set():
# Wait until updates are available and the standoff is not met, or if we're closing
while (monotonic() < next_write or not self._updates_available.is_set()) and not self._closing.is_set():
self._updates_available.wait(timeout=1)

# Mark that we've caught up with whatever signaled this thread
self._updates_available.clear()

# Checkpoint and advance the standoff
self.export_records(self.path)
next_write = monotonic() + self.write_freq

def update_record(self, record: MoleculeRecord):
self.db[record.key] = record

# Start the write thread, if needed, and trigger it
if self._write_thread is None:
logger.info('Start the writing thread')
self._write_thread = Thread(target=self._writer)
self._write_thread.start()
self._updates_available.set()

def close(self):
# Trigger a last write
self._closing.set()
if self._write_thread is not None:
self._write_thread.join()

# Mark that we're closed
self._write_thread = None
self._closing.clear()
34 changes: 34 additions & 0 deletions tests/store/test_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Test an in-memory store for molecular data"""

from pytest import fixture

from examol.store.db.memory import InMemoryStore
from examol.store.models import MoleculeRecord


@fixture()
def records() -> list[MoleculeRecord]:
return [MoleculeRecord.from_identifier(smiles) for smiles in ['C', 'O', 'N']]


def test_store(tmpdir, records):
# Open the database
db_path = tmpdir / 'db.json.gz'
store = InMemoryStore(db_path)
try:
assert len(store) == 0

# Add the records
for record in records:
store.update_record(record)
assert len(store) == 3

finally:
store.close()

# Load database back in
store = InMemoryStore(db_path)
try:
assert len(store) == 3
finally:
store.close()

0 comments on commit d3d5d7e

Please sign in to comment.