Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable multi-node support #20

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ccwatch.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ ccwatch:
broker: null
camera: celery_cloudwatch.CloudWatchCamera
verbose: no
task-prefix: myprefix
camera:
frequency: 60.0
verbose: no
cloudwatch-camera:
dryrun: no
namespace: celery
metrics:
- CeleryEventSent
- CeleryEventStarted
- CeleryEventSucceeded
- CeleryEventFailed
- CeleryNumWaiting
- CeleryNumRunning
- CeleryWaitingTime
- CeleryProcessingTime
tasks:
- myapp.mytasks.taskname
- myapp.mytasks.anothertask
Expand Down
4 changes: 3 additions & 1 deletion celery_cloudwatch/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
v.Optional('ccwatch', default={}): v.Schema({
v.Optional('broker', default=None): v.Any(None, v_str),
v.Optional('camera', default="celery_cloudwatch.CloudWatchCamera"): v_str,
v.Optional('verbose', default=False): bool
v.Optional('verbose', default=False): bool,
v.Optional('task-prefix', default=None): v.Any(None, v_str),
}, extra=False),
v.Optional('camera', default={}): v.Schema({
v.Optional('frequency', default=60.0): v.Any(int, float),
Expand All @@ -23,6 +24,7 @@
v.Optional('cloudwatch-camera', default={}): v.Schema({
v.Optional('dryrun', default=False): bool,
v.Optional('namespace', default='celery'): v_str,
v.Optional('metrics', default=[]): [v_str],
v.Optional('tasks', default=[]): v.Schema([
v_str, v.Schema({
'name': v_str,
Expand Down
2 changes: 1 addition & 1 deletion celery_cloudwatch/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.0.0'
__version__ = '2.0.1'
66 changes: 47 additions & 19 deletions celery_cloudwatch/cloudwatch_camera.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import fnmatch
import re
import itertools
import json
import logging
Expand All @@ -23,6 +23,10 @@ def __init__(self, state, config, cloudwatch_client=None):
self.verbose = config['camera']['verbose']
if not config['cloudwatch-camera']['dryrun'] and cloudwatch_client is None:
cloudwatch_client = boto3.client('cloudwatch')
self.reported_metrics = []
if 'metrics' in config['cloudwatch-camera']:
self.reported_metrics = config['cloudwatch-camera']['metrics']

self.cloudwatch_client = cloudwatch_client
self.cloud_watch_namespace = config['cloudwatch-camera']['namespace']
self.task_mapping = {}
Expand Down Expand Up @@ -90,24 +94,32 @@ def _build_metrics(self, state):
def _add_task_events(self, metrics, task_event_sent, task_event_started, task_event_succeeded, task_event_failed,
num_waiting_by_task, num_running_by_task, time_to_start, time_to_process):
for task_name, dimensions in self.task_mapping.items():
metrics.add('CeleryEventSent', unit='Count', value=task_event_sent.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryEventStarted', unit='Count', value=task_event_started.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryEventSucceeded', unit='Count', value=task_event_succeeded.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryEventFailed', unit='Count', value=task_event_failed.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryNumWaiting', unit='Count', value=num_waiting_by_task.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryNumRunning', unit='Count', value=num_running_by_task.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSent', unit='Count',
value=task_event_sent.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventStarted', unit='Count',
value=task_event_started.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSucceeded', unit='Count',
value=task_event_succeeded.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventFailed', unit='Count',
value=task_event_failed.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumWaiting', unit='Count',
value=num_waiting_by_task.get(task_name, 0), dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumRunning', unit='Count',
value=num_running_by_task.get(task_name, 0), dimensions=dimensions)
waiting_time = time_to_start.get(task_name)
if waiting_time:
metrics.add('CeleryWaitingTime', unit='Seconds', dimensions=dimensions, stats=waiting_time.__dict__.copy())
metrics = self.add_metrics(metrics=metrics, task_name='CeleryWaitingTime', unit='Seconds',
stats=waiting_time.__dict__.copy(), dimensions=dimensions)

running_time = time_to_process.get(task_name)
if running_time:
metrics.add('CeleryProcessingTime', unit='Seconds', dimensions=dimensions, stats=running_time.__dict__.copy())
metrics = self.add_metrics(metrics=metrics, task_name='CeleryProcessingTime', unit='Seconds',
stats=running_time.__dict__.copy(), dimensions=dimensions)

def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_event_succeeded,
task_event_failed, num_waiting_by_task, num_running_by_task, time_to_start, time_to_process):

all_task_names = set(itertools.chain(task_event_sent, task_event_started, task_event_succeeded, task_event_failed))

for task_group in self.task_groups:
dimensions = task_group['dimensions']
waiting = 0
Expand All @@ -124,7 +136,7 @@ def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_ev
task_names = []
for task_name in all_task_names:
for pattern in patterns:
if fnmatch.fnmatchcase(pattern, task_name):
if re.fullmatch(pattern, task_name):
task_names.append(task_name)
break
else:
Expand All @@ -148,18 +160,34 @@ def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_ev
running_time = Stats()
running_time += task_run_time

metrics.add('CeleryEventSent', unit='Count', value=waiting, dimensions=dimensions)
metrics.add('CeleryEventStarted', unit='Count', value=running, dimensions=dimensions)
metrics.add('CeleryEventSucceeded', unit='Count', value=completed, dimensions=dimensions)
metrics.add('CeleryEventFailed', unit='Count', value=failed, dimensions=dimensions)
metrics.add('CeleryNumWaiting', unit='Count', value=num_waiting, dimensions=dimensions)
metrics.add('CeleryNumRunning', unit='Count', value=num_running, dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSent', unit='Count', value=waiting,
dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventStarted', unit='Count', value=running,
dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSucceeded', unit='Count', value=completed,
dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventFailed', unit='Count', value=failed,
dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumWaiting', unit='Count', value=num_waiting,
dimensions=dimensions)
metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumRunning', unit='Count', value=num_running,
dimensions=dimensions)
if waiting_time:
metrics.add('CeleryWaitingTime', unit='Seconds', dimensions=dimensions, stats=waiting_time.__dict__.copy())
metrics = self.add_metrics(metrics=metrics, task_name='CeleryWaitingTime', unit='Seconds',
stats=waiting_time.__dict__.copy(), dimensions=dimensions)
if running_time:
metrics.add('CeleryProcessingTime', unit='Seconds', dimensions=dimensions, stats=running_time.__dict__.copy())
metrics = self.add_metrics(metrics=metrics, task_name='CeleryProcessingTime', unit='Seconds',
stats=running_time.__dict__.copy(), dimensions=dimensions)


def add_metrics(self, metrics, task_name, unit, dimensions, value=None, stats=None):
if len(self.reported_metrics) == 0 or task_name in self.reported_metrics:
if stats is not None:
metrics.add(task_name, unit=unit, dimensions=dimensions, stats=stats)
else:
metrics.add(task_name, unit=unit, dimensions=dimensions, value=value)
return metrics

def xchunk(arr, size):
for x in range(0, len(arr), size):
yield arr[x:x+size]
Expand Down
70 changes: 42 additions & 28 deletions celery_cloudwatch/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class State(object):

# http://docs.celeryproject.org/en/latest/userguide/monitoring.html

def __init__(self):
def __init__(self, config):
self._mutex = threading.Lock()

# track the number of events in the current window
Expand All @@ -24,6 +24,7 @@ def __init__(self):
self.time_to_start = defaultdict(Stats)

self.time_to_process = defaultdict(Stats)
self.config = config

self.registry = {}

Expand Down Expand Up @@ -67,41 +68,51 @@ def num_waiting_running_by_task(self):

def task_sent(self, event):
with self._mutex:
uuid = event['uuid']
if uuid not in self.registry:
task_name = event['name']
self.registry[uuid] = TaskRecord(task_name, event['timestamp'], None, None, None)
self.task_event_sent[task_name] += 1
return

task_record = self.registry[uuid]._replace(
name=event['name'],
sent_at=event['timestamp']
)
self.registry[uuid] = task_record
self.task_event_sent[task_record.name] += 1

if task_record.started_at is None:
return
self._initiate_task(event)

self.task_event_started[task_record.name] += 1
self.time_to_start[task_record.name] += task_record.wait_duration
if not task_record.finished:
return
del self.registry[uuid]
if task_record.successful:
self.task_event_succeeded[task_record.name] += 1
self.time_to_process[task_record.name] += task_record.processing_duration
def _initiate_task(self, event):
uuid = event['uuid']
if uuid not in self.registry:
if 'name' in event:
task_name = event['name']
else:
self.task_event_failed[task_record.name] += 1
if 'task-prefix' in self.config['ccwatch'] and self.config['ccwatch']['task-prefix']:
task_name = "{}-{}".format(self.config['ccwatch']['task-prefix'], uuid)
else:
task_name = uuid

self.registry[uuid] = TaskRecord(task_name, event['timestamp'], None, None, None)
self.task_event_sent[task_name] += 1
return

task_record = self.registry[uuid]._replace(
name=event['name'],
sent_at=event['timestamp']
)
self.registry[uuid] = task_record
self.task_event_sent[task_record.name] += 1

if task_record.started_at is None:
return

self.task_event_started[task_record.name] += 1
self.time_to_start[task_record.name] += task_record.wait_duration
if not task_record.finished:
return
del self.registry[uuid]
if task_record.successful:
self.task_event_succeeded[task_record.name] += 1
self.time_to_process[task_record.name] += task_record.processing_duration
else:
self.task_event_failed[task_record.name] += 1

def task_started(self, event):
with self._mutex:
uuid = event['uuid']
task_record = self.registry.get(uuid, None)
if task_record is None:
self.registry[uuid] = TaskRecord(None, None, event['timestamp'], None, None)
return
self._initiate_task(event)
task_record = self.registry.get(uuid, None)

task_record = task_record._replace(started_at=event['timestamp'])
self.registry[uuid] = task_record
Expand Down Expand Up @@ -192,6 +203,9 @@ def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return tuple(self)

def print(self):
print("TaskRecord: {} - {} - {} - {} - {}".format(self.name, self.sent_at, self.started_at, self.succeeded_at, self.failed_at))

name = _property(_itemgetter(0), doc='Alias for field number 0')

sent_at = _property(_itemgetter(1), doc='Alias for field number 1')
Expand Down
2 changes: 1 addition & 1 deletion celery_cloudwatch/task_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, broker=None, camera='celery_cloudwatch.PrintCamera',

def run(self):
app = Celery(broker=self.broker)
state = State()
state = State(self.config)

factory = CameraFactory(self.camera)
camera = factory.camera(state, self.config)
Expand Down