Skip to content

Commit

Permalink
Merge branch '4.x' into 5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
fvennetier committed Sep 16, 2019
2 parents 6bf74ce + fb5d1e1 commit 3c3e268
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 121 deletions.
6 changes: 6 additions & 0 deletions etc/event-handlers.conf-sample
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ queue_url = ${QUEUE_URL}
use = egg:oio#notify
tube = oio-repli
queue_url = ${QUEUE_URL}
# Exclude accounts and/or specific containers.
# Account and container names must be urlencoded.
# Values are separated with commas.
#exclude = account/container
#exclude = account1,account2,account3
#exclude = account1/container1,account2,account3/container1

[filter:noop]
use = egg:oio#noop
Expand Down
34 changes: 32 additions & 2 deletions oio/cli/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,38 @@ def take_action(self, parsed_args):
self.app.client_manager.conscience.flush(srv_type)
self.log.warn('%s services flushed', srv_type)
except Exception as err:
raise Exception('Error while flushing service %s: %s' %
(srv_type, str(err)))
raise Exception('Error while flushing %s service: %s' %
(srv_type, err))


class ClusterDeregister(command.Command):
"""Deregister specific services of the cluster."""

log = getLogger(__name__ + '.ClusterDeregister')

def get_parser(self, prog_name):
parser = super(ClusterDeregister, self).get_parser(prog_name)
parser.add_argument(
'srv_type',
help='Service type')
parser.add_argument(
'srv_ids',
metavar='<srv_ids>',
nargs='+',
help='IDs of the services.')
return parser

def take_action(self, parsed_args):
service_definitions = list()
for srv_id in parsed_args.srv_ids:
service_definitions.append(
self.app.client_manager.cluster.get_service_definition(
parsed_args.srv_type, srv_id))
try:
self.app.client_manager.cluster.deregister(service_definitions)
except Exception as err:
raise Exception('Error while deregistering %s services: %s' %
(parsed_args.srv_type, err))


class ClusterResolve(show.ShowOne):
Expand Down
5 changes: 2 additions & 3 deletions oio/conscience/agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -147,8 +147,7 @@ def register(self):
# Use a boolean so we can easily convert it to a number in conscience
self.service_definition['tags']['tag.up'] = self.status
try:
self.cs.register(self.service['type'], self.service_definition,
retries=False)
self.cs.register(self.service_definition, retries=False)
except OioException as rqe:
self.logger.warn("Failed to register service %s: %s",
self.service_definition["addr"], rqe)
Expand Down
20 changes: 18 additions & 2 deletions oio/conscience/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,26 @@ def service_types(self):
raise OioException("ERROR while getting services types: %s" %
resp.text)

def register(self, pool, service_definition, **kwargs):
data = json.dumps(service_definition)
def get_service_definition(self, srv_type, srv_id,
score=None, tags=None):
service_definition = dict()
service_definition['ns'] = self.ns
service_definition['type'] = srv_type
service_definition['addr'] = srv_id
if score is not None:
service_definition['score'] = score
if tags is not None:
service_definition['tags'] = tags
return service_definition

def register(self, service_definitions, **kwargs):
data = json.dumps(service_definitions)
resp, body = self._request('POST', '/register', data=data, **kwargs)

def deregister(self, service_definitions, **kwargs):
data = json.dumps(service_definitions)
resp, body = self._request('POST', '/deregister', data=data, **kwargs)

def info(self):
resp, body = self._request("GET", '/info')
return body
Expand Down
8 changes: 6 additions & 2 deletions oio/ecd/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2016-2018 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2016-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -88,7 +88,11 @@ def load_meta_chunk(request, nb_chunks, pos=None):
h = request.headers
meta_chunk = []
for i in xrange(nb_chunks):
chunk_url = h['%schunk-%s' % (SYS_PREFIX, i)]
try:
chunk_url = h['%schunk-%s' % (SYS_PREFIX, i)]
except KeyError:
# Missing chunk
continue
chunk_pos = '%s.%d' % (pos, i) if pos else str(i)
chunk = {
'url': chunk_url,
Expand Down
40 changes: 29 additions & 11 deletions oio/event/beanstalk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -537,17 +537,25 @@ def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0):
def delete(self, job_id):
self.execute_command('delete', job_id)

def drain_tube(self, tube):
"""Delete all jobs from the specified tube."""
self.watch(tube)
def _drain(self, fetch_func):
try:
job_id = True
while job_id is not None:
job_id, _ = self.reserve(timeout=0)
job_id, _ = fetch_func()
self.delete(job_id)
except ResponseError:
pass

def drain_buried(self, tube):
self.use(tube)
return self._drain(self.peek_buried)

def drain_tube(self, tube):
"""Delete all jobs from the specified tube."""
self.watch(tube)
from functools import partial
return self._drain(partial(self.reserve, timeout=0))

def kick_job(self, job_id):
"""
Variant of` kick` that operates with a single job.
Expand All @@ -569,18 +577,28 @@ def kick(self, bound=1000):
kicked = int(self.execute_command('kick', str(bound))[1][0])
return kicked

def peek_ready(self):
"""
Read the next ready job without reserving it.
"""
def _peek_generic(self, command_suffix=''):
command = 'peek' + command_suffix
try:
return self.execute_command('peek-ready')
return self.execute_command(command)
except ResponseError as err:
if err.args[0] == 'peek-ready' and err.args[1] == 'NOT_FOUND':
if err.args[0] == command and err.args[1] == 'NOT_FOUND':
return None, None
else:
raise

def peek_buried(self):
"""
Read the next buried job without kicking it.
"""
return self._peek_generic('-buried')

def peek_ready(self):
"""
read the next ready job without reserving it.
"""
return self._peek_generic('-ready')

def wait_until_empty(self, tube, timeout=float('inf'), poll_interval=0.2,
initial_delay=0.0):
"""
Expand Down
Loading

0 comments on commit 3c3e268

Please sign in to comment.