Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements simple k8s API retries #102

Merged
merged 13 commits into from
Oct 14, 2019
Merged
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,22 @@ To view open issues related to conformance, see the [conformance](https://github
## Setup

Please see [examples](examples) for installation and setup instructions.

## Environment Variables

Calrissian's behaviors can be customized by setting the following environment variables in the container specification.

### Pod lifecycle

By default, pods for a job step will be deleted after termination

- `CALRISSIAN_DELETE_PODS`: Default `true`. If `false`, job step pods will not be deleted.

### Kubernetes API retries

When encountering a Kubernetes API exception, Calrissian uses a library to retry API calls with an exponential backoff. See the [tenacity documentation](https://tenacity.readthedocs.io/en/latest/index.html#waiting-before-retrying) for details.

- `RETRY_MULTIPLIER`: Default `5`. Unit for multiplying the exponent interval.
- `RETRY_MIN`: Default `5`. Minimum interval between retries.
- `RETRY_MAX`: Default `1200`. Maximum interval between retries.
- `RETRY_ATTEMPTS`: Default `10`. Max number of retries before giving up.
16 changes: 15 additions & 1 deletion calrissian/k8s.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from kubernetes import client, config, watch
from kubernetes.client.api_client import ApiException
from kubernetes.config.config_exception import ConfigException
from calrissian.retry import retry_exponential_if_exception_type
import threading
import logging
import os
from urllib3.exceptions import HTTPError

log = logging.getLogger('calrissian.k8s')

Expand Down Expand Up @@ -70,6 +72,7 @@ def __init__(self):
self.namespace = load_config_get_namespace()
self.core_api_instance = client.CoreV1Api()

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def submit_pod(self, pod_body):
with PodMonitor() as monitor:
pod = self.core_api_instance.create_namespaced_pod(self.namespace, pod_body)
Expand All @@ -89,11 +92,17 @@ def should_delete_pod(self):
else:
return True

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def delete_pod_name(self, pod_name):
try:
self.core_api_instance.delete_namespaced_pod(pod_name, self.namespace)
dleehr marked this conversation as resolved.
Show resolved Hide resolved
except ApiException as e:
raise CalrissianJobException('Error deleting pod named {}'.format(pod_name), e)
if e.status == 404:
# pod was not found - already deleted, so do not retry
pass
else:
# Re-raise
raise

def _handle_completion(self, state, container):
"""
Expand All @@ -117,6 +126,7 @@ def _handle_completion(self, state, container):
)
log.info('handling completion with {}'.format(exit_code))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def follow_logs(self):
pod_name = self.pod.metadata.name
log.info('[{}] follow_logs start'.format(pod_name))
Expand All @@ -131,6 +141,7 @@ def follow_logs(self):
log.debug('[{}] {}'.format(pod_name, line))
log.info('[{}] follow_logs end'.format(pod_name))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def wait_for_completion(self):
w = watch.Watch()
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
Expand Down Expand Up @@ -212,6 +223,7 @@ def _extract_start_finish_times(self, state):
"""
return (state.terminated.started_at, state.terminated.finished_at,)

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def get_pod_for_name(self, pod_name):
"""
Given a pod name return details about this pod
Expand Down Expand Up @@ -274,6 +286,7 @@ def remove(self, pod):

@staticmethod
def cleanup():
log.info('Starting Cleanup')
with PodMonitor() as monitor:
k8s_client = KubernetesClient()
for pod_name in PodMonitor.pod_names:
Expand All @@ -283,6 +296,7 @@ def cleanup():
except Exception:
log.error('Error deleting pod named {}, ignoring'.format(pod_name))
PodMonitor.pod_names = []
log.info('Finishing Cleanup')


def delete_pods():
Expand Down
24 changes: 24 additions & 0 deletions calrissian/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from tenacity import retry, wait_exponential, retry_if_exception_type, stop_after_attempt, before_sleep_log
import logging
import os


class RetryParameters(object):
MULTIPLIER = float(os.getenv('RETRY_MULTIPLIER', 5)) # Unit for multiplying the exponent
MIN = float(os.getenv('RETRY_MIN', 5)) # Min time for retrying
MAX = float(os.getenv('RETRY_MAX', 1200)) # Max interval between retries
ATTEMPTS = int(os.getenv('RETRY_ATTEMPTS', 10)) # Max number of retries before giving up
dleehr marked this conversation as resolved.
Show resolved Hide resolved


def retry_exponential_if_exception_type(exc_type, logger):
"""
Decorator function that returns the tenacity @retry decorator with our commonly-used config
:param exc_type: Type of exception (or tuple of types) to retry if encountered
:param logger: A logger instance to send retry logs to
:return: Result of tenacity.retry decorator function
"""
return retry(retry=retry_if_exception_type(exc_type),
wait=wait_exponential(multiplier=RetryParameters.MULTIPLIER, min=RetryParameters.MIN, max=RetryParameters.MAX),
stop=stop_after_attempt(RetryParameters.ATTEMPTS),
before_sleep=before_sleep_log(logger, logging.DEBUG),
reraise=True)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ scandir==1.10.0
schema-salad==4.5.20190621200723
shellescape==3.4.1
six==1.12.0
tenacity==5.1.1
typing-extensions==3.7.4
urllib3==1.24.3
websocket-client==0.56.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def run(self):
'urllib3<1.25,>=1.24.2',
'kubernetes==10.0.1',
'cwltool==1.0.20190621234233',
'tenacity==5.1.1',
],
test_suite='nose2.collector.collector',
tests_require=['nose2'],
Expand Down
11 changes: 6 additions & 5 deletions tests/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,11 @@ def test_delete_pod_name_calls_api(self, mock_get_namespace, mock_client):
kc.delete_pod_name('pod-123')
self.assertEqual('pod-123', mock_client.CoreV1Api.return_value.delete_namespaced_pod.call_args[0][0])

def test_delete_pod_name_raises(self, mock_get_namespace, mock_client):
mock_client.rest.ApiException = Exception
mock_client.CoreV1Api.return_value.delete_namespaced_pod.side_effect = ApiException
def test_delete_pod_name_ignores_404(self, mock_get_namespace, mock_client):
mock_client.CoreV1Api.return_value.delete_namespaced_pod.side_effect = ApiException(status=404)
kc = KubernetesClient()
with self.assertRaisesRegex(CalrissianJobException, 'Error deleting pod named pod-123'):
kc.delete_pod_name('pod-123')
kc.delete_pod_name('pod-123')
self.assertEqual('pod-123', mock_client.CoreV1Api.return_value.delete_namespaced_pod.call_args[0][0])

@patch('calrissian.k8s.log')
def test_follow_logs_streams_to_logging(self, mock_log, mock_get_namespace, mock_client):
Expand Down Expand Up @@ -370,7 +369,9 @@ def test_remove_after_cleanup(self, mock_log, mock_client):
monitor.remove(pod)
mock_log.info.assert_has_calls([
call('PodMonitor adding pod-123'),
call('Starting Cleanup'),
call('PodMonitor deleting pod pod-123'),
call('Finishing Cleanup'),
])
mock_log.warning.assert_called_with('PodMonitor pod-123 has already been removed')

Expand Down
72 changes: 72 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from calrissian.retry import retry_exponential_if_exception_type
from unittest import TestCase
from unittest.mock import Mock, patch


class RetryTestCase(TestCase):
def setUp(self):
self.logger = Mock()
self.mock = Mock()

def setup_mock_retry_parameters(self, mock_retry_parameters):
mock_retry_parameters.MULTIPLIER = 0.001
mock_retry_parameters.MIN = 0.001
mock_retry_parameters.MAX = 0.010
mock_retry_parameters.ATTEMPTS = 5

def test_retry_calls_wrapped_function(self):
@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
return self.mock()

result = func()
self.assertEqual(result, self.mock.return_value)
self.assertEqual(self.mock.call_count, 1)

@patch('calrissian.retry.RetryParameters')
def test_retry_gives_up_and_raises(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)
self.mock.side_effect = ValueError('value error')

@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
self.mock()

with self.assertRaisesRegex(ValueError, 'value error'):
func()

self.assertEqual(self.mock.call_count, 5)

@patch('calrissian.retry.RetryParameters')
def test_retry_eventually_succeeds_without_exception(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)

@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
r = self.mock()
if self.mock.call_count < 3:
raise ValueError('value error')
return r

result = func()

self.assertEqual(result, self.mock.return_value)
self.assertEqual(self.mock.call_count, 3)

@patch('calrissian.retry.RetryParameters')
def test_retry_raises_other_exceptions_without_second_attempt(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)

class ExceptionA(Exception): pass
class ExceptionB(Exception): pass

self.mock.side_effect = ExceptionA('exception a')

@retry_exponential_if_exception_type(ExceptionB, self.logger)
def func():
self.mock()

with self.assertRaisesRegex(ExceptionA, 'exception a'):
func()

self.assertEqual(self.mock.call_count, 1)