
Support multi-fidelity active learning (#122)
* Start a base class for thinkers that use ML

* Move training logic into base class

* Start specification for multi-fidelity

* Move startup to the base class

* Make a "get or make" DB operation

* Include the InChI key method

* Add the task selection logic for multifi

* We are printing the InChI key now

* Keep track of which molecules are in the database

* Make PipelineThinker a subclass of Single

Was about to refactor in a way that would have left the single-objective
thinker without any methods

* Fix the call to the super __init__

* Combined submit and store for inference

Will make it easier to subclass

* Refactor inference submission, implement multifi

* Fix model eviction logic

* Pass recipe along with task information

* Find levels given solution

* Filter out scores from search space molecules in database

Also start on multi-fidelity training

* Refactor to use new multifi scorer interface

E.g., pass lower fidelities as a kwarg

* Extend test for better coverage

* Made code more robust

1. Race condition where a completed molecule could be put back in the queue
2. Molecules failing to parse during loading
3. One of two recipes not having the desired fidelity

* Updated tests with new logging messages

* Only compute features once

* Document the new thinker
WardLT authored Nov 20, 2023
1 parent 8c62f37 commit 31df55c
Showing 17 changed files with 753 additions and 190 deletions.
8 changes: 8 additions & 0 deletions docs/api/examol.steer.rst
@@ -28,3 +28,11 @@ examol.steer.single
:members:
:undoc-members:
:show-inheritance:

examol.steer.multifi
--------------------

.. automodule:: examol.steer.multifi
:members:
:undoc-members:
:show-inheritance:
5 changes: 4 additions & 1 deletion docs/components/steer.rst
@@ -22,8 +22,11 @@ Each steering strategy is associated with a specific `Solution strategy <solutio
- :class:`~examol.specify.base.SolutionSpecification`
- Evaluate all molecules in an initial population
* - :class:`~examol.steer.single.SingleStepThinker`
- :class:`~examol.specify.solution.SingleFidelityActiveLearning`
- :class:`~examol.solution.SingleFidelityActiveLearning`
- Run all recipes for each selected molecule
* - :class:`~examol.steer.multifi.PipelineThinker`
- :class:`~examol.solution.MultiFidelityActiveLearning`
- Run the next step in a pipeline each time a molecule is selected


Single Objective Thinker as an Example
31 changes: 16 additions & 15 deletions examol/score/rdkit/__init__.py
@@ -76,7 +76,7 @@ def score(self, model_msg: ModelType, inputs: InputType, lower_fidelities: np.nd
return np.sum(deltas, axis=1)

def retrain(self, model_msg: Pipeline, inputs: InputType, outputs: np.ndarray,
bootstrap: bool = True,
bootstrap: bool = False,
lower_fidelities: np.ndarray | None = None) -> ModelType:
if bootstrap:
samples = np.random.random_integers(0, len(inputs) - 1, size=(len(inputs),))
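
The multi-fidelity scorer treats each fidelity as a delta on top of the level below it, so `score` sums the per-level deltas to estimate the top-level value, and known lower-fidelity results arrive through the new `lower_fidelities` keyword. A generic numpy sketch of one way that bookkeeping can work (made-up numbers; not necessarily the scorer's exact logic):

```python
import numpy as np

# Hypothetical predictions for 3 molecules at 3 fidelity levels:
# column 0 is the lowest level, columns 1-2 are deltas between successive levels
predicted = np.array([[1.0, 0.2, 0.1],
                      [0.8, 0.3, 0.0],
                      [1.2, 0.1, 0.2]])

# Known lower-fidelity results, NaN where a level has not been computed yet
lower_fidelities = np.array([[1.1, np.nan],
                             [np.nan, np.nan],
                             [1.3, 0.15]])

# Replace predicted columns with known values where available ...
known = ~np.isnan(lower_fidelities)
deltas = predicted.copy()
deltas[:, :lower_fidelities.shape[1]][known] = lower_fidelities[known]

# ... and the top-level estimate is the sum of the lowest level plus the deltas
estimate = np.sum(deltas, axis=1)
print(estimate)  # [1.4  1.1  1.65]
```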
@@ -149,20 +149,21 @@ def make_gpr_model(num_pcs: int | None = None, max_pcs: int = 10, k: int = 3) ->
gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=40, normalize_y=False)

# Make the pipeline
pipeline = Pipeline([
('fingerprints', FingerprintTransformer(compute_doan_2020_fingerprints)),
('pca', PCA(n_components=num_pcs)),
('gpr', gp)
])

if num_pcs is None:
pipeline = GridSearchCV(
pipeline,
param_grid={'pca__n_components': range(1, max_pcs + 1)},
cv=k,
n_jobs=1 # Parallelism is a level below
)
return pipeline
if num_pcs is not None:
return Pipeline([
('fingerprints', FingerprintTransformer(compute_doan_2020_fingerprints)),
('pca', PCA(n_components=num_pcs)),
('gpr', gp)
])
else:
return Pipeline([
('fingerprints', FingerprintTransformer(compute_doan_2020_fingerprints)),
('model', GridSearchCV(
Pipeline([('pca', PCA(n_components=num_pcs)), ('gpr', gp)]),
param_grid={'pca__n_components': range(1, max_pcs + 1)},
cv=k,
)),
])
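
Keeping `FingerprintTransformer` in the outer pipeline and wrapping only the PCA + GPR stage in `GridSearchCV` means the fingerprints are computed once per call to `fit` rather than once per parameter candidate and fold, which appears to be the point of the "Only compute features once" change. A generic scikit-learn sketch of the same pattern, with `StandardScaler` standing in for the expensive featurizer (illustration only, not examol's API):

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler  # stand-in for the costly fingerprint step
from sklearn.decomposition import PCA
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.model_selection import GridSearchCV

model = Pipeline([
    ('features', StandardScaler()),  # runs once per fit ...
    ('model', GridSearchCV(          # ... while only PCA + GPR are refit during the search
        Pipeline([('pca', PCA()), ('gpr', GaussianProcessRegressor())]),
        param_grid={'pca__n_components': list(range(1, 4))},
        cv=3,
    )),
])

X, y = np.random.rand(20, 8), np.random.rand(20)
model.fit(X, y)
print(model.predict(X[:3]))
```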


class FingerprintTransformer(BaseEstimator, TransformerMixin):
43 changes: 41 additions & 2 deletions examol/solution/__init__.py
@@ -1,12 +1,13 @@
"""Specifications for different solution methods"""
from functools import partial, update_wrapper
from dataclasses import field, dataclass
from typing import Callable
from typing import Callable, Sequence, Collection

from examol.score.base import Scorer
from examol.select.base import Selector
from examol.start.base import Starter
from examol.start.fast import RandomStarter
from examol.store.recipes import PropertyRecipe


@dataclass
@@ -37,7 +38,9 @@ class SingleFidelityActiveLearning(SolutionSpecification):
scorer: Scorer = ... # TODO (wardlt): Support a different type of model for each recipe
"""Defines algorithms used to retrain and run :attr:`models`"""
models: list[list[object]] = ...
"""List of machine learning models used to predict outcome of :attr:`recipes`"""
"""List of machine learning models used to predict outcome of :attr:`recipes`.
``models[i][j]`` is model ``j`` for property ``i``"""
minimum_training_size: int = 10
"""Minimum database size before starting training. Thinkers will run selections from :attr:`starter` before them"""

@@ -57,3 +60,39 @@ def _wrap_function(fun, options: dict):
_wrap_function(self.scorer.retrain, self.train_options),
_wrap_function(self.scorer.score, self.score_options)
]


@dataclass
class MultiFidelityActiveLearning(SingleFidelityActiveLearning):
"""Tools needed for solving a multi-fidelity active learning problem
Users must define a series of recipes to be run at each step in the workflow,
:attr:`steps`.
The next step is run each time a molecule is selected for execution.
For example, a Thinker would run all recipes in the first step for a molecule for which no data is available
and then the second step of recipes after all from the first have completed.
The user also specifies a set fraction of entries to progress through each stage,
which sets the probability of selecting a certain step in the calculation.
For example, a :attr:`pipeline_target` of 0.1 means that 10% of entries will
pass through each stage of the pipeline.
We can achieve this target by selecting to run the first stage of the pipeline
10 times more often than the next stage.
"""

steps: Sequence[Collection[PropertyRecipe]] = ()
"""Incremental steps to perform along the way to a maximum level of fidelity"""

pipeline_target: float = 0.1
"""Fraction of entries to progress through each stage of the pipeline"""

def get_levels_for_property(self, recipe: PropertyRecipe) -> list[PropertyRecipe]:
"""Get the list of levels at which we compute a certain property"""

levels = []
for recipes in self.steps:
for step in recipes:
if recipe.name == step.name:
levels.append(step)
levels.append(recipe)
return levels
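
The probability statement in the docstring can be made concrete: if a fraction `pipeline_target` of entries should advance from one stage to the next, the selection weights for successive steps fall off geometrically. A small illustration of those numbers (plain Python; not the thinker's actual sampling code):

```python
# Weights implied by the docstring: each later step is chosen
# pipeline_target times as often as the step before it
pipeline_target = 0.1
n_steps = 3  # e.g., two intermediate steps plus the final, full-fidelity recipes

weights = [pipeline_target ** i for i in range(n_steps)]  # [1.0, 0.1, 0.01]
total = sum(weights)
for step, weight in enumerate(weights):
    print(f'step {step}: selected with probability {weight / total:.3f}')
# step 0: selected with probability 0.901
# step 1: selected with probability 0.090
# step 2: selected with probability 0.009
```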
80 changes: 53 additions & 27 deletions examol/steer/base.py
@@ -6,12 +6,13 @@
from dataclasses import asdict
from threading import Condition
from collections import defaultdict
from typing import Iterator, Sequence
from typing import Iterator, Sequence, Iterable

import numpy as np
from colmena.models import Result
from colmena.queue import ColmenaQueues
from colmena.thinker import BaseThinker, ResourceCounter, result_processor, task_submitter
from pydantic import ValidationError

from examol.simulate.base import SimResult
from examol.solution import SolutionSpecification
@@ -73,6 +74,7 @@ def __init__(self,
self.task_queue_lock = Condition()
self.task_queue = [] # List of tasks to run, SMILES string and score
self.task_iterator = self.task_iterator() # Tool for pulling from the task queue
self.recipe_types = dict((r.name, r) for r in recipes)

def iterate_over_search_space(self, only_smiles: bool = False) -> Iterator[MoleculeRecord | str]:
"""Function to produce a stream of molecules from the input files
@@ -100,51 +102,66 @@ def iterate_over_search_space(self, only_smiles: bool = False) -> Iterator[Molec
elif is_json:
yield MoleculeRecord.parse_raw(line)
else:
yield MoleculeRecord.from_identifier(line.strip())
try:
yield MoleculeRecord.from_identifier(line.strip())
except ValidationError:
self.logger.warning(f'Parsing failed for molecule: {line}')
else:
raise ValueError(f'File type is unrecognized for {path}')

def _write_result(self, result: Result, result_type: str):
with (self.run_dir / f'{result_type}-results.json').open('a') as fp:
print(result.json(exclude={'value', 'inputs'}), file=fp)

def task_iterator(self) -> Iterator[tuple[MoleculeRecord, SimulationRequest]]:
"""Iterate over the next tasks in the task queue"""
def _get_next_tasks(self) -> tuple[MoleculeRecord, float, Iterable[PropertyRecipe]]:
"""Get the next task from the task queue
Assumes that the task queue is locked and there are tasks in the queue
"""
# Return the next one off the list
smiles, score = self.task_queue.pop(0) # Get the next task
return self.database.get_or_make_record(smiles), score, self.recipes

def task_iterator(self) -> Iterator[tuple[MoleculeRecord, Iterable[PropertyRecipe], SimulationRequest]]:
"""Iterate over the next tasks in the task queue
Yields:
- Molecule being processed
- Recipes being computed
- Simulation to execute
"""

while True:
# Get the next molecule to run
# Get the next task to run
with self.task_queue_lock:
if len(self.task_queue) == 0:
self.logger.info('No tasks available to run. Waiting')
while not self.task_queue_lock.wait(timeout=2):
if self.done.is_set():
yield None, None
smiles, score = self.task_queue.pop(0) # Get the next task
self.logger.info(f'Selected {smiles} to run next. Score={score:.2f}, queue length={len(self.task_queue)}')

# Get the molecule record
record = MoleculeRecord.from_identifier(smiles)
if record.key in self.database:
record = self.database[record.key]
else:
self.database.update_record(record)
yield None, None, None
record, score, recipes = self._get_next_tasks()
self.logger.info(f'Selected {record.key} to run next. Score={score:.2f}, queue length={len(self.task_queue)}')

# Determine which computations to run next
try:
suggestions = set()
for recipe in self.recipes:
for recipe in recipes:
suggestions.update(recipe.suggest_computations(record))
except ValueError as exc:
self.logger.warning(f'Generating computations for {smiles} failed. Skipping. Reason: {exc}')
self.logger.warning(f'Generating computations for {record.key} failed. Skipping. Reason: {exc}')
continue
self.logger.info(f'Found {len(suggestions)} more computations to do for {smiles}')
self.logger.info(f'Found {len(suggestions)} more computations to do for {record.key}')
self.molecules_in_progress[record.key] += len(suggestions) # Update the number of computations in progress for this molecule

for suggestion in suggestions:
yield record, suggestion
yield record, recipes, suggestion
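
Factoring the queue pop into `_get_next_tasks` is what lets a subclass change which recipes are run for the selected molecule without reimplementing the iterator. A rough sketch of the kind of override this enables, assuming the thinker exposes its solution specification as `self.solution` and using a hypothetical helper; the real `PipelineThinker` selection logic lives in `examol/steer/multifi.py` and is not reproduced here:

```python
from examol.steer.single import SingleStepThinker


class SketchPipelineThinker(SingleStepThinker):
    """Illustration only: run the next incomplete pipeline step for each selected molecule"""

    def _get_next_tasks(self):
        smiles, score = self.task_queue.pop(0)
        record = self.database.get_or_make_record(smiles)

        # Pick the next incomplete step, or the full-fidelity recipes once
        # every intermediate step already has data
        step = self._count_completed_steps(record)
        if step < len(self.solution.steps):
            recipes = self.solution.steps[step]
        else:
            recipes = self.recipes
        return record, score, recipes

    def _count_completed_steps(self, record) -> int:
        """Hypothetical helper: number of leading pipeline steps with data for this molecule"""
        done = 0
        for recipes in self.solution.steps:
            if all(recipe.lookup(record) is not None for recipe in recipes):
                done += 1
            else:
                break
        return done
```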

def _simulations_complete(self):
"""This function is called when all ongoing computations for a molecule have finished"""
def _simulations_complete(self, record: MoleculeRecord):
"""This function is called when all ongoing computations for a molecule have finished

Args:
record: Record for the molecule which had completed
"""
pass

@result_processor(topic='simulation')
@@ -175,8 +192,14 @@ def store_simulation(self, result: Result):
else:
raise NotImplementedError()

# Assemble the recipes being completed
recipes = [
self.recipe_types[r['name']].from_name(**r) for r in result.task_info['recipes']
]
self.logger.info(f'Checking if we have completed recipes: {", ".join([r.name + "//" + r.level for r in recipes])}')

# If we can compute the property, then we are done
not_done = sum(recipe.lookup(record, recompute=True) is None for recipe in self.recipes) # TODO (wardlt): Keep track of recipe being computed
not_done = sum(recipe.lookup(record, recompute=True) is None for recipe in recipes)
if not_done == 0:
# If so, mark that we have finished computing the property
self.completed += 1
@@ -189,7 +212,7 @@ def store_simulation(self, result: Result):
# Mark that we've finished with all recipes
result.task_info['status'] = 'finished'
result.task_info['result'] = [recipe.lookup(record) for recipe in self.recipes]
self._simulations_complete()
self._simulations_complete(record)
else:
# If not, see if we need to resubmit to finish the computation
self.logger.info(f'Finished {len(self.recipes) - not_done}/{len(self.recipes)} recipes for {mol_key}')
@@ -211,24 +234,27 @@ def store_simulation(self, result: Result):
# Remove molecule from the list of those in progress if no other computations remain
if self.molecules_in_progress[mol_key] == 0:
self.molecules_in_progress.pop(mol_key)
self._simulations_complete()
self._simulations_complete(record)

self._write_result(result, 'simulation')

@task_submitter()
def submit_simulation(self):
"""Submit a simulation task when resources are available"""
record, suggestion = next(self.task_iterator)
record, recipes, suggestion = next(self.task_iterator)
if record is None:
return # The thinker is done

task_info = {'key': record.key,
'recipes': [{'name': r.name, 'level': r.level} for r in recipes],
'computation': asdict(suggestion)}
if suggestion.optimize:
self.logger.info(f'Optimizing structure for {record.key} with a charge of {suggestion.charge}')
self.queues.send_inputs(
record.key, suggestion.xyz, suggestion.config_name, suggestion.charge, suggestion.solvent,
method='optimize_structure',
topic='simulation',
task_info={'key': record.key, **asdict(suggestion)}
task_info=task_info
)
else:
self.logger.info(f'Getting single-point energy for {record.key} with a charge of {suggestion.charge} ' +
@@ -237,5 +263,5 @@ def submit_simulation(self):
record.key, suggestion.xyz, suggestion.config_name, suggestion.charge, suggestion.solvent,
method='compute_energy',
topic='simulation',
task_info={'key': record.key, **asdict(suggestion)}
task_info=task_info
)
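
Because `Result.task_info` must be JSON-serializable, the recipes travel with each task as plain name/level dicts and are rebuilt in `store_simulation` from the `recipe_types` lookup populated in `__init__`. A tiny round-trip sketch with a made-up recipe class, assuming a `from_name`-style constructor that accepts the same keys:

```python
from dataclasses import dataclass


@dataclass
class FakeRecipe:
    """Stand-in for PropertyRecipe; only the name/level round trip is shown"""
    name: str
    level: str

    @classmethod
    def from_name(cls, name: str, level: str) -> 'FakeRecipe':
        return cls(name=name, level=level)


recipes = [FakeRecipe('redox-potential', 'low'), FakeRecipe('redox-potential', 'high')]
recipe_types = {r.name: type(r) for r in recipes}

# Serialize into task_info ...
task_info = {'recipes': [{'name': r.name, 'level': r.level} for r in recipes]}

# ... and rebuild the recipe objects when the result comes back
rebuilt = [recipe_types[r['name']].from_name(**r) for r in task_info['recipes']]
assert rebuilt == recipes
```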
2 changes: 1 addition & 1 deletion examol/steer/baseline.py
@@ -55,7 +55,7 @@ def startup(self):
self.task_queue.append((key, np.nan)) # All get the same score
self.task_queue_lock.notify_all()

def _simulations_complete(self):
def _simulations_complete(self, record):
if len(self.task_queue) == 0:
self.logger.info('Run out of molecules to run. Exiting')
self.done.set()
