diff --git a/colmena/models/__init__.py b/colmena/models/__init__.py index e69de29..c462121 100644 --- a/colmena/models/__init__.py +++ b/colmena/models/__init__.py @@ -0,0 +1,3 @@ +from colmena.models.results import Result, ResourceRequirements, FailureInformation, SerializationMethod + +__all__ = ['Result', 'ResourceRequirements', 'FailureInformation', 'ExecutableTask', 'SerializationMethod'] diff --git a/colmena/models/results.py b/colmena/models/results.py index e69de29..d694a96 100644 --- a/colmena/models/results.py +++ b/colmena/models/results.py @@ -0,0 +1,488 @@ +"""Data models for requests for and results from compuations""" +import json +import logging +import sys +from math import nan +import pickle as pkl +from datetime import datetime +from enum import Enum +from functools import partial +from time import perf_counter +from traceback import TracebackException +from typing import Any, Tuple, Dict, Optional, Union, List, Sequence +from uuid import uuid4 + +from pydantic import BaseModel, Field, Extra +from proxystore.proxy import Proxy + +from colmena.proxy import get_store, store_proxy_stats +from colmena.proxy import proxy_json_encoder + +logger = logging.getLogger(__name__) + + +class SerializationMethod(str, Enum): + """Serialization options""" + + JSON = "json" # Serialize using JSONf + PICKLE = "pickle" # Pickle serialization + + @staticmethod + def serialize(method: 'SerializationMethod', data: Any) -> str: + """Serialize an object using a specified method + + Args: + method: Method used to serialize the object + data: Object to be serialized + Returns: + Serialized data + """ + + if method == "json": + return json.dumps(data) + elif method == "pickle": + return pkl.dumps(data).hex() + else: + raise NotImplementedError(f'Method {method} not yet implemented') + + @staticmethod + def deserialize(method: 'SerializationMethod', message: str) -> Any: + """Deserialize an object + + Args: + method: Method used to serialize + message: Message to deserialize + Returns: + Result object + """ + + if method == "json": + return json.loads(message) + elif method == "pickle": + return pkl.loads(bytes.fromhex(message)) + else: + raise NotImplementedError(f'Method {method} not yet implemented') + + +def _serialized_str_to_bytes_shim( + s: str, + method: Union[str, SerializationMethod], +) -> bytes: + """Shim between Colmena serialized objects and bytes. + + Colmena's serialization mechanisms produce strings but ProxyStore + serializes to bytes, so this shim takes an object serialized by Colmena + and converts it to bytes. + + Args: + s: Serialized string object + method: Serialization method used to produce s + + Returns: + bytes representation of s + """ + if method == "json": + return s.encode('utf-8') + elif method == "pickle": + # In this case the conversion goes from obj > bytes > str > bytes + # which results in an unnecessary conversion to a string but this is + # an unavoidable side effect of converting between the Colmena + # and ProxyStore serialization formats. + return bytes.fromhex(s) + else: + raise NotImplementedError(f'Method {method} not yet implemented') + + +def _serialized_bytes_to_obj_wrapper( + b: str, + method: Union[str, SerializationMethod], +) -> Any: + """Wrapper which converts bytes to strings before deserializing. + + Args: + b: Byte string of serialized object + method: Serialization method used to produce b + + Returns: + Deserialized object + """ + if method == "json": + s = b.decode('utf-8') + elif method == "pickle": + # In this case the conversion goes from bytes > str > bytes > obj + # which results in an unecessary conversion to a string but this is + # an unavoidable side effect of converting between the Colmena + # and ProxyStore serialization formats. + s = b.hex() + else: + raise NotImplementedError(f'Method {method} not yet implemented') + + return SerializationMethod.deserialize(method, s) + + +class FailureInformation(BaseModel): + """Stores information about a task failure""" + + exception: str = Field(..., description="The exception returned by the failed task") + traceback: Optional[str] = Field(None, description="Full stack trace for exception, if available") + + @classmethod + def from_exception(cls, exc: BaseException) -> 'FailureInformation': + tb = TracebackException.from_exception(exc) + return cls(exception=repr(exc), traceback="".join(tb.format())) + + +class WorkerInformation(BaseModel, extra=Extra.allow): + """Information about the worker that executed this task""" + + hostname: Optional[str] = Field(None, description='Hostname of the worker who executed this task') + + +class ResourceRequirements(BaseModel): + """Resource requirements for tasks. Used by some Colmena backends to allocate resources to the task + + Follows the naming conventions of + `RADICAL-Pilot `_. + """ + + # Defining how we use CPU resources + node_count: int = Field(1, description='Total number of nodes to use for the task') + cpu_processes: int = Field(1, description='Total number of MPI ranks per node') + cpu_threads: int = Field(1, description='Number of threads per process') + + @property + def total_ranks(self) -> int: + """Total number of MPI ranks""" + return self.node_count * self.cpu_processes + + +class Timestamps(BaseModel): + """A class which records the system times at which key events in a task occurred + + All should be in UTC. + """ + + created: float = Field(description="Time this value object was created", + default_factory=lambda: datetime.now().timestamp()) + input_received: float = Field(nan, description="Time the inputs was received by the task server") + compute_started: float = Field(nan, description="Time workflow process began executing a task") + compute_ended: float = Field(nan, description="Time workflow process finished executing a task") + result_sent: float = Field(nan, description="Time message was sent from the server") + result_received: float = Field(nan, description="Time value was received by client") + start_task_submission: float = Field(nan, description="Time marking the start of the task submission to workflow engine") + task_received: float = Field(nan, description="Time task result received from workflow engine") + + +class TimeSpans(BaseModel): + """Amount of time elapsed between major events + + All are recorded in seconds + """ + + running: float = Field(nan, description="Runtime of the method, if available") + serialize_inputs: float = Field(nan, description="Time required to serialize inputs on client") + deserialize_inputs: float = Field(nan, description="Time required to deserialize inputs on worker") + serialize_results: float = Field(nan, description="Time required to serialize results on worker") + deserialize_results: float = Field(nan, description="Time required to deserialize results on client") + async_resolve_proxies: float = Field(nan, description="Time required to start async resolves of proxies") + proxy: Dict[str, Dict[str, dict]] = Field(default_factory=dict, + description='Timings related to resolving ProxyStore proxies on the compute worker') + + additional: Dict[str, float] = Field(default_factory=dict, + description="Additional timings reported by a task server") + + +class Result(BaseModel): + """A class which describes the inputs and results of the calculations evaluated by the MethodServer + + Each instance of this class stores the inputs and outputs to the function along with some tracking + information allowing for performance analysis (e.g., time submitted to Queue, time received by client). + All times are listed as Unix timestamps. + + The Result class also handles serialization of the data to be transmitted over a RedisQueue + """ + + # Core result information + task_id: str = Field(default_factory=lambda: str(uuid4()), description='Unique identifier for each task') + inputs: Union[Tuple[Tuple[Any, ...], Dict[str, Any]], str] = \ + Field(None, description="Input to a function. Positional and keyword arguments. The `str` data type " + "is for internal use and is used when communicating serialized objects.") + value: Any = Field(None, description="Output of a function") + method: Optional[str] = Field(None, description="Name of the method to run.") + success: Optional[bool] = Field(None, description="Whether the task completed successfully") + + # Store task information + task_info: Optional[Dict[str, Any]] = Field(default_factory=dict, + description="Task tracking information to be transmitted " + "along with inputs and results. User provided") + resources: ResourceRequirements = Field(default_factory=ResourceRequirements, help='List of the resources required for a task, if desired') + failure_info: Optional[FailureInformation] = Field(None, description="Messages about task failure. Provided by Task Server") + worker_info: Optional[WorkerInformation] = Field(None, description="Information about the worker which executed a task. Provided by Task Server") + message_sizes: Dict[str, int] = Field(default_factory=dict, description='Sizes of the inputs and results in bytes') + + # Timings + timestamp: Timestamps = Field(default_factory=Timestamps, help='Times at which major events occurred') + time: TimeSpans = Field(default_factory=TimeSpans, help='Elapsed time between major events') + + # Serialization options + serialization_method: SerializationMethod = Field(SerializationMethod.JSON, + description="Method used to serialize input data") + keep_inputs: bool = Field(True, description="Whether to keep the inputs with the result object or delete " + "them after the method has completed") + proxystore_name: Optional[str] = Field(None, description="Name of ProxyStore backend you use for transferring large objects") + proxystore_config: Optional[Dict] = Field(None, description="ProxyStore backend configuration") + proxystore_threshold: Optional[int] = Field(None, + description="Proxy all input/output objects larger than this threshold in bytes") + + def __init__(self, inputs: Tuple[Tuple[Any], Dict[str, Any]], **kwargs): + """ + Args: + inputs (Any, Dict): Inputs to a function. Separated into positional and keyword arguments + """ + super().__init__(inputs=inputs, **kwargs) + + @classmethod + def from_args_and_kwargs(cls, fn_args: Sequence[Any], fn_kwargs: Dict[str, Any] = None, **kwargs): + """Create a result object form a the arguments and kwargs for the function + + Keyword arguments to this function are passed to the initializer for `Result`. + + Args: + fn_args: Positional arguments to the function + fn_kwargs: Keyword arguments to the function + Returns: + Result object with the results object + """ + return cls(inputs=(tuple(fn_args), fn_kwargs or dict()), **kwargs) + + @property + def args(self) -> Tuple[Any]: + return tuple(self.inputs[0]) + + @property + def kwargs(self) -> Dict[str, Any]: + return self.inputs[1] + + def json(self, **kwargs: Dict[str, Any]) -> str: + """Override json encoder to use a custom encoder with proxy support""" + if 'exclude' in kwargs: + # Make a shallow copy of the user passed excludes + user_exclude = kwargs['exclude'].copy() + if isinstance(kwargs['exclude'], dict): + kwargs['exclude'].update({'inputs': True, 'value': True}) + if isinstance(kwargs['exclude'], set): + kwargs['exclude'].update({'inputs', 'value'}) + else: + raise ValueError( + f'Unsupported type {type(kwargs["exclude"])} for argument "exclude". Expected set or dict') + else: + user_exclude = set() + kwargs['exclude'] = {'inputs', 'value'} + + # Use pydantic's encoding for everything except `inputs` and `values` + data = super().dict(**kwargs) + + # Add inputs/values back to data unless the user excluded them + if isinstance(user_exclude, set): + if 'inputs' not in user_exclude: + data['inputs'] = self.inputs + if 'value' not in user_exclude: + data['value'] = self.value + elif isinstance(user_exclude, dict): + if not user_exclude['inputs']: + data['inputs'] = self.inputs + if not user_exclude['value']: + data['value'] = self.value + + # Jsonify with custom proxy encoder + return json.dumps(data, default=proxy_json_encoder) + + def mark_result_received(self): + """Mark that a completed computation was received by a client""" + self.timestamp.result_received = datetime.now().timestamp() + + def mark_input_received(self): + """Mark that a task server has received a value""" + self.timestamp.input_received = datetime.now().timestamp() + + def mark_compute_started(self): + """Mark that the compute for a method has started""" + self.timestamp.compute_started = datetime.now().timestamp() + + def mark_result_sent(self): + """Mark when a result is sent from the task server""" + self.timestamp.result_sent = datetime.now().timestamp() + + def mark_start_task_submission(self): + """Mark when the Task Server submits a task to the engine""" + self.timestamp.start_task_submission = datetime.now().timestamp() + + def mark_task_received(self): + """Mark when the Task Server receives the task from the engine""" + self.timestamp.task_received = datetime.now().timestamp() + + def mark_compute_ended(self): + """Mark when the task finished executing""" + self.timestamp.compute_ended = datetime.now().timestamp() + + def set_result(self, result: Any, runtime: float = nan): + """Set the value of this computation + + Automatically sets the "time_result_completed" field and, if known, defines the runtime. + + Will delete the inputs to the function if the user specifies ``self.return_inputs == False``. + Removing the inputs once the result is known can save communication time + + Args: + result: Result to be stored + runtime (float): Runtime for the function + """ + self.value = result + if not self.keep_inputs: + self.inputs = ((), {}) + self.time.running = runtime + self.success = True + + def serialize(self) -> Tuple[float, List[Proxy]]: + """Stores the input and value fields as a pickled objects + + Returns: + - (float) Time to serialize + - List of any proxies that were created + """ + start_time = perf_counter() + _value = self.value + _inputs = self.inputs + proxies = [] + + if self.proxystore_name is not None: + store = get_store(name=self.proxystore_name, config=self.proxystore_config) + else: + store = None + + def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]: + """Helper function for serializing and proxying + + Args: + value: Value to be serialized + evict: Whether to evict from proxy store on reading + Returns: + - Serialized representation, which is either the object or a proxy to it + - Size of the serialized object (not the size of the proxy) + """ + # Serialized object before proxying to compare size of serialized + # object to value server threshold. Using sys.getsizeof would be + # faster but sys.getsizeof does not account for the memory + # consumption of objects that value refers to + value_str = SerializationMethod.serialize( + self.serialization_method, value + ) + value_size = sys.getsizeof(value_str) + + if ( + store is not None and + self.proxystore_threshold is not None and + not isinstance(value, Proxy) and + value_size >= self.proxystore_threshold + ): + # Override ProxyStore's default serialization with these shims + # to Colmena's serialization mechanisms. This avoids value + # being serialized twice: once to get the size of the + # serialized object and once by proxy(). + deserializer = partial( + _serialized_bytes_to_obj_wrapper, + method=self.serialization_method, + ) + serializer = partial( + _serialized_str_to_bytes_shim, + method=self.serialization_method, + ) + + value_proxy = store.proxy( + value_str, + evict=evict, + deserializer=deserializer, + serializer=serializer, + ) + logger.debug(f'Proxied object of type {type(value)} with id={id(value)}') + proxies.append(value_proxy) + + # Update the statistics + store_proxy_stats(value_proxy, self.time.proxy) + + # Serialize the proxy with Colmena's utilities. This is + # efficient since the proxy is just a reference and metadata + value_str = SerializationMethod.serialize( + self.serialization_method, value_proxy + ) + + return value_str, value_size + + try: + # Each value in *args and **kwargs is serialized independently + if len(_inputs[0]) > 0: + args, args_sizes = zip(*map(_serialize_and_proxy, _inputs[0])) + else: + args = args_sizes = [] + kwargs = {} + kwarg_sizes = [] + for k, v in _inputs[1].items(): + _kwarg_str, _kwarg_size = _serialize_and_proxy(v) + kwargs[k] = _kwarg_str + kwarg_sizes.append(_kwarg_size) + self.inputs = (args, kwargs) + + # Store the size of the input if not already there + if 'inputs' not in self.message_sizes: + self.message_sizes['inputs'] = sum(args_sizes) + sum(kwarg_sizes) + + # The entire result is serialized as one object. Pass evict=True + # so the value is evicted from the value server once it is resolved + # by the thinker. + if _value is not None: + self.value, size = _serialize_and_proxy(_value, evict=True) + if 'value' not in self.message_sizes: + self.message_sizes['value'] = size + + return perf_counter() - start_time, proxies + except Exception as e: + # Put the original values back + self.inputs = _inputs + self.value = _value + raise e + + def deserialize(self) -> float: + """De-serialize the input and value fields + + Returns: + (float) The time required to deserialize + """ + # Check that the data is actually a string + start_time = perf_counter() + _value = self.value + _inputs = self.inputs + + def _deserialize(value): + if not isinstance(value, str): + return value + return SerializationMethod.deserialize(self.serialization_method, value) + + if isinstance(_inputs, str): + _inputs = SerializationMethod.deserialize(self.serialization_method, _inputs) + + try: + # Deserialize each value in *args and **kwargs + args = tuple(map(_deserialize, _inputs[0])) + kwargs = {k: _deserialize(v) for k, v in _inputs[1].items()} + self.inputs = (args, kwargs) + + # Deserialize result if it exists + if _value is not None: + self.value = _deserialize(_value) + + return perf_counter() - start_time + except Exception as e: + # Put the original values back + self.inputs = _inputs + self.value = _value + raise e diff --git a/colmena/models/tasks.py b/colmena/models/tasks.py index 69467f2..716ac28 100644 --- a/colmena/models/tasks.py +++ b/colmena/models/tasks.py @@ -9,9 +9,9 @@ from tempfile import TemporaryDirectory from time import perf_counter from inspect import signature, isgeneratorfunction -from typing import Any, Dict, List, Tuple, Optional, Callable, Iterable +from typing import Any, Dict, List, Tuple, Optional, Callable, Generator -from .results import ResourceRequirements, Result, FailureInformation +from colmena.models.results import ResourceRequirements, Result, FailureInformation from colmena.proxy import resolve_proxies_async, store_proxy_stats from colmena.queue import ColmenaQueues @@ -33,11 +33,12 @@ def function(self, *args, **kwargs) -> Any: """Function provided by the Colmena user""" raise NotImplementedError() - def __call__(self, result: Result, queues: Optional[ColmenaQueues]) -> Result: + def __call__(self, result: Result, queues: Optional[ColmenaQueues] = None) -> Result: """Invoke a Colmena task request Args: result: Request, which inclues the arguments and will hold the result + queues: Queues used to send intermediate results back [Not Yet Used] Returns: The input result object, populated with the results """ @@ -100,7 +101,12 @@ def __call__(self, result: Result, queues: Optional[ColmenaQueues]) -> Result: class PythonTask(ColmenaTask): - """A Python function to be executed on a single worker of a larger workflow""" + """A Python function to be executed on a single worker + + Args: + function: Generator function to be executed + name: Name of the function. Defaults to `function.__name__` + """ function: Callable @@ -112,21 +118,39 @@ def __init__(self, function: Callable, name: Optional[str] = None) -> None: class PythonGeneratorTask(ColmenaTask): - """Python task which generates intermediate results""" + """Python task which runs on a single worker and generates results iteratively - def __init__(self, function: Callable[..., Iterable], name: Optional[str] = None) -> None: + Args: + function: Generator function to be executed + name: Name of the function. Defaults to `function.__name__` + store_return_value: Whether to capture the `return value `_ + of the generator and store it in the Result object. + """ + + def __init__(self, + function: Callable[..., Generator], + name: Optional[str] = None, + store_return_value: bool = False) -> None: if not isgeneratorfunction(function): raise ValueError('Function is not a generator function. Use `PythonTask` instead.') self._function = function self.name = name or function.__name__ + self.store_return_value = store_return_value def function(self, *args, **kwargs) -> Any: """Run the Colmena task and collect intermediate results to provide as a list""" # TODO (wardlt): Have the function push intemediate results back to a function queue - return [ - result for result in self._function(*args, **kwargs) - ] + gen = self._function(*args, **kwargs) + iter_results = [] + while True: + try: + iter_results.append(next(gen)) + except StopIteration as e: + if self.store_return_value: + return iter_results, e.value + else: + return iter_results class ExecutableTask(ColmenaTask): @@ -160,6 +184,12 @@ class ExecutableTask(ColmenaTask): The attributes of this class (e.g., ``node_count``, ``total_ranks``) will be used as arguments to `format`. For example, a template of ``aprun -N {total_ranks} -n {cpu_process}`` will produce ``aprun -N 6 -n 3`` if you specify ``node_count=2`` and ``cpu_processes=3``. + + Args: + executable: List of executable arguments + name: Name used for the task. Defaults to ``executable[0]`` + mpi: Whether to use MPI to launch the exectuable + mpi_command_string: Template for MPI launcher. See :attr:`mpi_command_string`. """ executable: List[str] @@ -173,9 +203,13 @@ class ExecutableTask(ColmenaTask): Should include placeholders named after the fields in ResourceRequirements marked using {}'s. Example: `mpirun -np {total_ranks}`""" - @property - def __name__(self): - return self.__class__.__name__.lower() + def __init__(self, executable: List[str], name: Optional[str] = None, + mpi: bool = False, mpi_command_string: Optional[str] = None) -> None: + super().__init__() + self.name = name or executable[0] + self.executable = executable + self.mpi = mpi + self.mpi_command_string = mpi_command_string def render_mpi_launch(self, resources: ResourceRequirements) -> str: """Create an MPI launch command given the configuration diff --git a/colmena/queue/__init__.py b/colmena/queue/__init__.py index c3b2a4a..c94f5a4 100644 --- a/colmena/queue/__init__.py +++ b/colmena/queue/__init__.py @@ -1,4 +1,4 @@ """Implementations of the task and result queues from Colmena""" -from .base import ColmenaQueues # noqa: 401 -from .python import PipeQueues # noqa: 401 +from colmena.queue.base import ColmenaQueues # noqa: 401 +from colmena.queue.python import PipeQueues # noqa: 401 diff --git a/colmena/task_server/parsl.py b/colmena/task_server/parsl.py index 69346f4..06d299d 100644 --- a/colmena/task_server/parsl.py +++ b/colmena/task_server/parsl.py @@ -17,9 +17,10 @@ from parsl.app.bash import BashApp from parsl.config import Config from parsl.app.python import PythonApp +from colmena.models.tasks import ExecutableTask from colmena.queue.base import ColmenaQueues -from colmena.models import Result, ExecutableTask, FailureInformation, ResourceRequirements +from colmena.models import Result, FailureInformation, ResourceRequirements from colmena.proxy import resolve_proxies_async from colmena.task_server.base import run_and_record_timing, FutureBasedTaskServer diff --git a/colmena/task_server/tests/test_base.py b/colmena/task_server/tests/test_base.py index 66526ab..16bd900 100644 --- a/colmena/task_server/tests/test_base.py +++ b/colmena/task_server/tests/test_base.py @@ -7,7 +7,8 @@ from proxystore.store import unregister_store from pytest import fixture -from colmena.models import Result, ExecutableTask, SerializationMethod +from colmena.models import Result, SerializationMethod +from colmena.models.tasks import ExecutableTask from colmena.task_server.base import run_and_record_timing diff --git a/colmena/tests/test_model.py b/colmena/tests/test_model.py index bff90d1..5d0c11e 100644 --- a/colmena/tests/test_model.py +++ b/colmena/tests/test_model.py @@ -1,20 +1,7 @@ """Tests for the data models""" import sys -from typing import Any, Tuple, Dict, List, Optional -from pathlib import Path -from colmena.models import ResourceRequirements, ExecutableTask, Result - - -class EchoTask(ExecutableTask): - def __init__(self): - super().__init__(executable=['echo']) - - def preprocess(self, run_dir: Path, args: Tuple[Any], kwargs: Dict[str, Any]) -> Tuple[List[str], Optional[str]]: - return list(map(str, args)), None - - def postprocess(self, run_dir: Path) -> Any: - return (run_dir / 'colmena.stdout').read_text() +from colmena.models import ResourceRequirements, Result def test_resources(): @@ -41,18 +28,3 @@ def test_message_sizes(): result.serialize() assert result.message_sizes['inputs'] >= 2 * sys.getsizeof('0' * 8) assert result.message_sizes['inputs'] >= sys.getsizeof(1) - - -def test_executable_task(): - # Run a basic tak - task = EchoTask() - assert task.executable == ['echo'] - assert task(1) == '1\n' - - # Run an "MPI task" - task.mpi = True - task.mpi_command_string = 'aprun -N {total_ranks} -n {cpu_processes} --cc depth' - assert task.render_mpi_launch(ResourceRequirements(node_count=2, cpu_processes=4)) == 'aprun -N 8 -n 4 --cc depth' - - task.mpi_command_string = 'echo -N {total_ranks} -n {cpu_processes} --cc depth' - assert task(1, _resources=ResourceRequirements(node_count=2, cpu_processes=3)) == '-N 6 -n 3 --cc depth echo 1\n'