Skip to content

Commit

Permalink
Merge branch 'master' into conformance-1.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
fabricebrito committed Nov 21, 2023
2 parents a92cdc8 + 4ec616d commit ad83931
Show file tree
Hide file tree
Showing 21 changed files with 424 additions and 145 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ jobs:
command: |
. venv/bin/activate
nose2
environment:
RETRY_ATTEMPTS: 1
deploy:
docker:
- image: circleci/python:3.7
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ tests/__pycache__/
build
*.egg
*.whl
*.zip
*.zip
36 changes: 35 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,45 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [v0.14.0] - 2023-06-19

### Added

- adds support for CWL GPU requirements (PR #153)

## [v0.13.0] - 2023-06-06

### Added

- adds an argument `--conf <conf_file_path>` that enables CLI arguments from a json file (PR #150)
- default configuration can be defined in `$HOME/.calrissian/default.json`

### Changed

- Updated cwltool to `3.1.20230201224320` and some other dependencies. (PR #142)

## [v0.12.0] - 2023-02-13

### Added

- adds an argument `--tool-logs-basepath <local_folder_path>` that enables the tool to fetch the pod logs by tool specified in the workflow (PR #139)
- returns proper exit code when the pod fails (PR #139)

### Changed

- constrains the pod to complete with a proper termination status or raise an exception. (PR #139)

## [v0.11.0] - 2022-11-10

### Added

- adds an argument `--pod-nodeselectors <yaml_file>` to add a node selector for computing pods
- adds `--pod_serviceaccount` arg to set the pods' service account

### Changed

- cwltool upgraded to current version 3.1 + all requirements
- Fixed faulty bytes in log stream (PR #137)

## [v0.10.0] - 2021-03-31

Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ Calrissian is designed to issue tasks in parallel if they are independent, and t

When running `calrissian`, you must provide a limit on the number of CPU cores (`--max-cores`) and RAM megabytes (`--max-ram`) to use concurrently. Calrissian will use CWL [ResourceRequirements](https://www.commonwl.org/v1.0/CommandLineTool.html#ResourceRequirement) to track usage and stay within the limits provided. We highly recommend using accurate ResourceRequirements in your workloads, so that they can be scheduled efficiently and are less likely to be terminated or refused by the cluster.

`calrissian` parameters can be provided via a JSON configuration file either stored under `~/.calrissian/default.json` or provided via the `--conf` option.

Below is an example of such a file:

```json
{
"max_ram": "16G",
"max_cores": "10",
"outdir": "/calrissian",
"tmpdir_prefix": "/calrissian/tmp"
}
```

## CWL Conformance

Calrissian leverages [cwltool](https://github.com/common-workflow-language/cwltool) heavily and passes most conformance tests for CWL v1.0. Please see [conformance](conformance) for further details and processes.
Expand Down
4 changes: 4 additions & 0 deletions calrissian/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ def __init__(self, kwargs=None):
# None and let super() handle the rest.
self.pod_labels = None
self.pod_env_vars = None
self.pod_nodeselectors = None
self.pod_serviceaccount = None
self.tool_logs_basepath = None
self.max_gpus = None
return super(CalrissianRuntimeContext, self).__init__(kwargs)
51 changes: 30 additions & 21 deletions calrissian/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,61 +20,70 @@ class InconsistentResourcesException(Exception):
pass


class IncompleteStatusException(Exception):
pass


class Resources(object):
"""
Class to encapsulate compute resources and provide arithmetic operations and comparisons
"""
RAM = 'ram'
CORES = 'cores'
GPUS = 'gpus'

def __init__(self, ram=0, cores=0):
def __init__(self, ram=0, cores=0, gpus=0):
self.ram = ram
self.cores = cores
self.gpus = gpus

def __sub__(self, other):
ram = self.ram - other.ram
cores = self.cores - other.cores
return Resources(ram, cores)
gpus = self.gpus - other.gpus
return Resources(ram, cores, gpus)

def __add__(self, other):
ram = self.ram + other.ram
cores = self.cores + other.cores
return Resources(ram, cores)
gpus = self.gpus + other.gpus
return Resources(ram, cores, gpus)

def __neg__(self):
return Resources(-self.ram, -self.cores)
return Resources(-self.ram, -self.cores, -self.gpus)

def __lt__(self, other):
return self.ram < other.ram and self.cores < other.cores
return self.ram < other.ram and self.cores < other.cores and self.gpus < other.gpus

def __gt__(self, other):
return self.ram > other.ram and self.cores > other.cores
return self.ram > other.ram and self.cores > other.cores and self.gpus > other.gpus

def __eq__(self, other):
return self.ram == other.ram and self.cores == other.cores
return self.ram == other.ram and self.cores == other.cores and self.gpus == other.gpus

def __ge__(self, other):
return self.ram >= other.ram and self.cores >= other.cores
return self.ram >= other.ram and self.cores >= other.cores and self.gpus >= other.gpus

def __le__(self, other):
return self.ram <= other.ram and self.cores <= other.cores
return self.ram <= other.ram and self.cores <= other.cores and self.gpus <= other.gpus

def __str__(self):
return '[ram: {}, cores: {}]'.format(self.ram, self.cores)
return '[ram: {}, cores: {}, gpus {}]'.format(self.ram, self.cores, self.gpus)

def is_negative(self):
return self.ram < 0 or self.cores < 0
return self.ram < 0 or self.cores < 0 or self.gpus < 0

def exceeds(self, other):
return self.ram > other.ram or self.cores > other.cores
return self.ram > other.ram or self.cores > other.cores or self.gpus > other.gpus

def to_dict(self):
return { Resources.CORES: self.cores,
Resources.RAM: self.ram }
Resources.RAM: self.ram,
Resources.GPUS: self.gpus }

@classmethod
def from_dict(cls, d):
return cls(d.get(cls.RAM, 0), d.get(cls.CORES, 0))
return cls(d.get(cls.RAM, 0), d.get(cls.CORES, 0), d.get(cls.GPUS, 0))

@classmethod
def from_job(cls, job):
Expand All @@ -85,10 +94,10 @@ def from_job(cls, job):

@classmethod
def min(cls, rsc1, rsc2):
return Resources(min(rsc1.ram, rsc2.ram), min(rsc1.cores, rsc2.cores))
return Resources(min(rsc1.ram, rsc2.ram), min(rsc1.cores, rsc2.cores), min(rsc1.gpus, rsc2.gpus))


Resources.EMPTY = Resources(0, 0)
Resources.EMPTY = Resources(0, 0, 0)


class JobResourceQueue(object):
Expand Down Expand Up @@ -160,7 +169,7 @@ class ThreadPoolJobExecutor(JobExecutor):
Relevant: https://github.com/common-workflow-language/cwltool/issues/888
"""

def __init__(self, total_ram, total_cores, max_workers=None):
def __init__(self, total_ram, total_cores, total_gpus=0, max_workers=None):
"""
Initialize a ThreadPoolJobExecutor
:param total_ram: RAM limit in megabytes for concurrent jobs
Expand All @@ -174,8 +183,8 @@ def __init__(self, total_ram, total_cores, max_workers=None):
self.max_workers = max_workers
self.jrq = JobResourceQueue()
self.exceptions = Queue()
self.total_resources = Resources(total_ram, total_cores)
self.available_resources = Resources(total_ram, total_cores) # start with entire pool available
self.total_resources = Resources(total_ram, total_cores, total_gpus)
self.available_resources = Resources(total_ram, total_cores, total_gpus) # start with entire pool available
self.resources_lock = threading.Lock()

def select_resources(self, request, runtime_context):
Expand All @@ -188,8 +197,8 @@ def select_resources(self, request, runtime_context):
:param runtime_context: RuntimeContext, unused
:return: dict of selected resources
"""
requested_min = Resources(request.get('ramMin'), request.get('coresMin'))
requested_max = Resources(request.get('ramMax'), request.get('coresMax'))
requested_min = Resources(request.get('ramMin'), request.get('coresMin'), request.get('cudaDeviceCountMin', 0))
requested_max = Resources(request.get('ramMax'), request.get('coresMax'), request.get('cudaDeviceCountMax', 0))

if requested_min.exceeds(self.total_resources):
raise WorkflowException('Requested minimum resources {} exceed total available {}'.format(
Expand Down
Loading

0 comments on commit ad83931

Please sign in to comment.