Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROGER-649: Fix docker metrics plugin for collectd #21

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 51 additions & 38 deletions dockerplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ def _c(c):

class Stats:
@classmethod
def emit(cls, container, type, value, t=None, type_instance=None):
def emit(cls, container, type, value, t=None, type_instance=None, apptaskid=""):
val = collectd.Values()
val.plugin = 'docker'
val.plugin_instance = container['Name']
val.plugin_instance = apptaskid if apptaskid else container['Name']

if type:
val.type = type
Expand All @@ -72,13 +72,13 @@ def emit(cls, container, type, value, t=None, type_instance=None):
val.dispatch()

@classmethod
def read(cls, container, stats, t):
def read(cls, container, stats, t, apptaskid):
raise NotImplementedError


class BlkioStats(Stats):
@classmethod
def read(cls, container, stats, t):
def read(cls, container, stats, t, apptaskid):
blkio_stats = stats['blkio_stats']
for key, values in blkio_stats.items():
# Block IO stats are reported by block device (with major/minor
Expand All @@ -96,12 +96,12 @@ def read(cls, container, stats, t):
for type_instance, values in device_stats.items():
if len(values) == 5:
cls.emit(container, 'blkio', values,
type_instance=type_instance, t=t)
type_instance=type_instance, t=t, apptaskid=apptaskid)
elif len(values) == 1:
# For some reason, some fields contain only one value and
# the 'op' field is empty. Need to investigate this.
cls.emit(container, 'blkio.single', values,
type_instance=key, t=t)
type_instance=key, t=t, apptaskid=apptaskid)
else:
collectd.warn(('Unexpected number of blkio stats for '
'container {container}!')
Expand All @@ -110,22 +110,22 @@ def read(cls, container, stats, t):

class CpuStats(Stats):
@classmethod
def read(cls, container, stats, t):
def read(cls, container, stats, t, apptaskid):
cpu_stats = stats['cpu_stats']
cpu_usage = cpu_stats['cpu_usage']

percpu = cpu_usage['percpu_usage']
for cpu, value in enumerate(percpu):
cls.emit(container, 'cpu.percpu.usage', [value],
type_instance='cpu%d' % (cpu,), t=t)
type_instance='cpu%d' % (cpu,), t=t, apptaskid=apptaskid)

items = sorted(cpu_stats['throttling_data'].items())
cls.emit(container, 'cpu.throttling_data', [x[1] for x in items], t=t)
cls.emit(container, 'cpu.throttling_data', [x[1] for x in items], t=t, apptaskid=apptaskid)

system_cpu_usage = cpu_stats['system_cpu_usage']
values = [cpu_usage['total_usage'], cpu_usage['usage_in_kernelmode'],
cpu_usage['usage_in_usermode'], system_cpu_usage]
cls.emit(container, 'cpu.usage', values, t=t)
cls.emit(container, 'cpu.usage', values, t=t, apptaskid=apptaskid)

# CPU Percentage based on calculateCPUPercent Docker method
# https://github.com/docker/docker/blob/master/api/client/stats.go
Expand All @@ -137,30 +137,30 @@ def read(cls, container, stats, t):
system_delta = system_cpu_usage - precpu_stats['system_cpu_usage']
if system_delta > 0 and cpu_delta > 0:
cpu_percent = 100.0 * cpu_delta / system_delta * len(percpu)
cls.emit(container, "cpu.percent", ["%.2f" % (cpu_percent)], t=t)
cls.emit(container, "cpu.percent", ["%.2f" % (cpu_percent)], t=t, apptaskid=apptaskid)


class NetworkStats(Stats):
@classmethod
def read(cls, container, stats, t):
def read(cls, container, stats, t, apptaskid):
items = sorted(stats['network'].items())
cls.emit(container, 'network.usage', [x[1] for x in items], t=t)
cls.emit(container, 'network.usage', [x[1] for x in items], t=t, apptaskid=apptaskid)


class MemoryStats(Stats):
@classmethod
def read(cls, container, stats, t):
def read(cls, container, stats, t, apptaskid):
mem_stats = stats['memory_stats']
values = [mem_stats['limit'], mem_stats['max_usage'],
mem_stats['usage']]
cls.emit(container, 'memory.usage', values, t=t)
cls.emit(container, 'memory.usage', values, t=t, apptaskid=apptaskid)

for key, value in mem_stats['stats'].items():
cls.emit(container, 'memory.stats', [value],
type_instance=key, t=t)
type_instance=key, t=t, apptaskid=apptaskid)

mem_percent = 100.0 * mem_stats['usage'] / mem_stats['limit']
cls.emit(container, 'memory.percent', ["%.2f" % mem_percent], t=t)
cls.emit(container, 'memory.percent', ["%.2f" % mem_percent], t=t, apptaskid=apptaskid)


class ContainerStats(threading.Thread):
Expand All @@ -182,7 +182,7 @@ class ContainerStats(threading.Thread):
second), and make the most recently read data available in a variable.
"""

def __init__(self, container, client, stream):
def __init__(self, container, client):
threading.Thread.__init__(self)
self.daemon = True
self.stop = False
Expand All @@ -191,7 +191,6 @@ def __init__(self, container, client, stream):
self._client = client
self._feed = None
self._stats = None
self._stream = stream

# Automatically start stats reading thread
self.start()
Expand All @@ -203,15 +202,10 @@ def run(self):
failures = 0
while not self.stop:
try:

if not self._stream:
if not self._feed:
if not self._feed:
self._feed = self._client.stats(self._container,
decode=True)
self._stats = self._feed.next()
else:
self._stats = self._client.stats(self._container,
decode=True, stream=False)
self._stats = self._feed.next()
# Reset failure count on successful read from the stats API.
failures = 0
except Exception, e:
Expand All @@ -223,9 +217,9 @@ def run(self):
# stop the thread. If the container is still there, we'll spin
# up a new stats gathering thread the next time read_callback()
# gets called by CollectD.
time.sleep(1)
time.sleep(self._client.sleep_interval)
failures += 1
if failures > 3:
if failures > self._client.failure_count:
self.stop = True

# Marking the feed as dead so we'll attempt to recreate it and
Expand Down Expand Up @@ -253,27 +247,30 @@ class DockerPlugin:
DEFAULT_BASE_URL = 'unix://var/run/docker.sock'
DEFAULT_DOCKER_TIMEOUT = 5

DEFAULT_FAILURE_COUNT = 3
DEFAULT_SLEEP_INTERVAL = 1

# The stats endpoint is only supported by API >= 1.17
MIN_DOCKER_API_VERSION = '1.17'

CLASSES = [NetworkStats, BlkioStats, CpuStats, MemoryStats]

def __init__(self, docker_url=None):
def __init__(self, docker_url=None, failure_count=None, sleep_interval=None):
self.docker_url = docker_url or DockerPlugin.DEFAULT_BASE_URL
self.timeout = DockerPlugin.DEFAULT_DOCKER_TIMEOUT
self.failure_count = failure_count or DockerPlugin.DEFAULT_FAILURE_COUNT
self.sleep_interval = sleep_interval or DockerPlugin.DEFAULT_SLEEP_INTERVAL
self.capture = False
self.stats = {}
self.stream = False
s_version = re.match('([\d.]+)', docker.__version__)
version = tuple([int(x) for x in s_version.group(1).split('.')])
if version >= STREAM_DOCKER_PY_VERSION:
self.stream = True
collectd.info('Docker stats use stream')

def configure_callback(self, conf):
for node in conf.children:
if node.key == 'BaseURL':
self.docker_url = node.values[0]
elif node.key == 'failure_count':
self.failure_count = int(node.values[0])
elif node.key == 'sleep_interval':
self.sleep_interval = int(node.values[0])
elif node.key == 'Timeout':
self.timeout = int(node.values[0])

Expand All @@ -282,6 +279,8 @@ def init_callback(self):
base_url=self.docker_url,
version=DockerPlugin.MIN_DOCKER_API_VERSION)
self.client.timeout = self.timeout
self.client.failure_count = self.failure_count
self.client.sleep_interval = self.sleep_interval

# Check API version for stats endpoint support.
try:
Expand Down Expand Up @@ -315,6 +314,21 @@ def read_callback(self):

for container in containers:
try:

# Get mesos task id from the Env (if available)
inspect_result = self.client.inspect_container(container)
apptaskid = ''
try:
envVars = inspect_result.get('Config').get('Env')
for var in envVars:
if var.startswith('MESOS_TASK_ID=') :
apptaskid = var[var.index('=')+1:]
break
except KeyError:
# Ignore. The inspect did not return an Env. This is odd. Let's log this and continue
collectd.error("Could not get Config or Env for container " + container[id])
pass

for name in container['Names']:
# Containers can be linked and the container name is not
# necessarily the first entry of the list
Expand All @@ -324,14 +338,13 @@ def read_callback(self):
# Start a stats gathering thread if the container is new.
if container['Id'] not in self.stats:
self.stats[container['Id']] = ContainerStats(container,
self.client,
self.stream)
self.client)

# Get and process stats from the container.
stats = self.stats[container['Id']].stats
t = stats['read']
for klass in self.CLASSES:
klass.read(container, stats, t)
klass.read(container, stats, t, apptaskid)
except Exception, e:
collectd.warning(('Error getting stats for container '
'{container}: {msg}')
Expand Down