diff --git a/dockerplugin.py b/dockerplugin.py index 2a9dc04..ee29486 100755 --- a/dockerplugin.py +++ b/dockerplugin.py @@ -47,10 +47,10 @@ def _c(c): class Stats: @classmethod - def emit(cls, container, type, value, t=None, type_instance=None): + def emit(cls, container, type, value, t=None, type_instance=None, apptaskid=""): val = collectd.Values() val.plugin = 'docker' - val.plugin_instance = container['Name'] + val.plugin_instance = apptaskid if apptaskid else container['Name'] if type: val.type = type @@ -72,13 +72,13 @@ def emit(cls, container, type, value, t=None, type_instance=None): val.dispatch() @classmethod - def read(cls, container, stats, t): + def read(cls, container, stats, t, apptaskid): raise NotImplementedError class BlkioStats(Stats): @classmethod - def read(cls, container, stats, t): + def read(cls, container, stats, t, apptaskid): blkio_stats = stats['blkio_stats'] for key, values in blkio_stats.items(): # Block IO stats are reported by block device (with major/minor @@ -96,12 +96,12 @@ def read(cls, container, stats, t): for type_instance, values in device_stats.items(): if len(values) == 5: cls.emit(container, 'blkio', values, - type_instance=type_instance, t=t) + type_instance=type_instance, t=t, apptaskid=apptaskid) elif len(values) == 1: # For some reason, some fields contains only one value and # the 'op' field is empty. Need to investigate this cls.emit(container, 'blkio.single', values, - type_instance=key, t=t) + type_instance=key, t=t, apptaskid=apptaskid) else: collectd.warn(('Unexpected number of blkio stats for ' 'container {container}!') @@ -110,22 +110,22 @@ def read(cls, container, stats, t): class CpuStats(Stats): @classmethod - def read(cls, container, stats, t): + def read(cls, container, stats, t, apptaskid): cpu_stats = stats['cpu_stats'] cpu_usage = cpu_stats['cpu_usage'] percpu = cpu_usage['percpu_usage'] for cpu, value in enumerate(percpu): cls.emit(container, 'cpu.percpu.usage', [value], - type_instance='cpu%d' % (cpu,), t=t) + type_instance='cpu%d' % (cpu,), t=t, apptaskid=apptaskid) items = sorted(cpu_stats['throttling_data'].items()) - cls.emit(container, 'cpu.throttling_data', [x[1] for x in items], t=t) + cls.emit(container, 'cpu.throttling_data', [x[1] for x in items], t=t, apptaskid=apptaskid) system_cpu_usage = cpu_stats['system_cpu_usage'] values = [cpu_usage['total_usage'], cpu_usage['usage_in_kernelmode'], cpu_usage['usage_in_usermode'], system_cpu_usage] - cls.emit(container, 'cpu.usage', values, t=t) + cls.emit(container, 'cpu.usage', values, t=t, apptaskid=apptaskid) # CPU Percentage based on calculateCPUPercent Docker method # https://github.com/docker/docker/blob/master/api/client/stats.go @@ -137,30 +137,30 @@ def read(cls, container, stats, t): system_delta = system_cpu_usage - precpu_stats['system_cpu_usage'] if system_delta > 0 and cpu_delta > 0: cpu_percent = 100.0 * cpu_delta / system_delta * len(percpu) - cls.emit(container, "cpu.percent", ["%.2f" % (cpu_percent)], t=t) + cls.emit(container, "cpu.percent", ["%.2f" % (cpu_percent)], t=t, apptaskid=apptaskid) class NetworkStats(Stats): @classmethod - def read(cls, container, stats, t): + def read(cls, container, stats, t, apptaskid): items = sorted(stats['network'].items()) - cls.emit(container, 'network.usage', [x[1] for x in items], t=t) + cls.emit(container, 'network.usage', [x[1] for x in items], t=t, apptaskid=apptaskid) class MemoryStats(Stats): @classmethod - def read(cls, container, stats, t): + def read(cls, container, stats, t, apptaskid): mem_stats = stats['memory_stats'] values = [mem_stats['limit'], mem_stats['max_usage'], mem_stats['usage']] - cls.emit(container, 'memory.usage', values, t=t) + cls.emit(container, 'memory.usage', values, t=t, apptaskid=apptaskid) for key, value in mem_stats['stats'].items(): cls.emit(container, 'memory.stats', [value], - type_instance=key, t=t) + type_instance=key, t=t, apptaskid=apptaskid) mem_percent = 100.0 * mem_stats['usage'] / mem_stats['limit'] - cls.emit(container, 'memory.percent', ["%.2f" % mem_percent], t=t) + cls.emit(container, 'memory.percent', ["%.2f" % mem_percent], t=t, apptaskid=apptaskid) class ContainerStats(threading.Thread): @@ -182,7 +182,7 @@ class ContainerStats(threading.Thread): second), and make the most recently read data available in a variable. """ - def __init__(self, container, client, stream): + def __init__(self, container, client): threading.Thread.__init__(self) self.daemon = True self.stop = False @@ -191,7 +191,6 @@ def __init__(self, container, client, stream): self._client = client self._feed = None self._stats = None - self._stream = stream # Automatically start stats reading thread self.start() @@ -203,15 +202,10 @@ def run(self): failures = 0 while not self.stop: try: - - if not self._stream: - if not self._feed: + if not self._feed: self._feed = self._client.stats(self._container, decode=True) - self._stats = self._feed.next() - else: - self._stats = self._client.stats(self._container, - decode=True, stream=False) + self._stats = self._feed.next() # Reset failure count on successfull read from the stats API. failures = 0 except Exception, e: @@ -223,9 +217,9 @@ def run(self): # stop the thread. If the container is still there, we'll spin # up a new stats gathering thread the next time read_callback() # gets called by CollectD. - time.sleep(1) + time.sleep(self._client.sleep_interval) failures += 1 - if failures > 3: + if failures > self._client.failure_count: self.stop = True # Marking the feed as dead so we'll attempt to recreate it and @@ -253,27 +247,30 @@ class DockerPlugin: DEFAULT_BASE_URL = 'unix://var/run/docker.sock' DEFAULT_DOCKER_TIMEOUT = 5 + DEFAULT_FAILURE_COUNT = 3 + DEFAULT_SLEEP_INTERVAL = 1 + # The stats endpoint is only supported by API >= 1.17 MIN_DOCKER_API_VERSION = '1.17' CLASSES = [NetworkStats, BlkioStats, CpuStats, MemoryStats] - def __init__(self, docker_url=None): + def __init__(self, docker_url=None, failure_count=None, sleep_interval=None): self.docker_url = docker_url or DockerPlugin.DEFAULT_BASE_URL self.timeout = DockerPlugin.DEFAULT_DOCKER_TIMEOUT + self.failure_count = failure_count or DockerPlugin.DEFAULT_FAILURE_COUNT + self.sleep_interval = sleep_interval or DockerPlugin.DEFAULT_SLEEP_INTERVAL self.capture = False self.stats = {} - self.stream = False - s_version = re.match('([\d.]+)', docker.__version__) - version = tuple([int(x) for x in s_version.group(1).split('.')]) - if version >= STREAM_DOCKER_PY_VERSION: - self.stream = True - collectd.info('Docker stats use stream') def configure_callback(self, conf): for node in conf.children: if node.key == 'BaseURL': self.docker_url = node.values[0] + elif node.key == 'failure_count': + self.failure_count = int(node.values[0]) + elif node.key == 'sleep_interval': + self.sleep_interval = int(node.values[0]) elif node.key == 'Timeout': self.timeout = int(node.values[0]) @@ -282,6 +279,8 @@ def init_callback(self): base_url=self.docker_url, version=DockerPlugin.MIN_DOCKER_API_VERSION) self.client.timeout = self.timeout + self.client.failure_count = self.failure_count + self.client.sleep_interval = self.sleep_interval # Check API version for stats endpoint support. try: @@ -315,6 +314,21 @@ def read_callback(self): for container in containers: try: + + # Get mesos task id from the Env (if available) + inspect_result = self.client.inspect_container(container) + apptaskid = '' + try: + envVars = inspect_result.get('Config').get('Env') + for var in envVars: + if var.startswith('MESOS_TASK_ID=') : + apptaskid = var[var.index('=')+1:] + break + except KeyError: + # Ignore. The inspect did not return an Env. This is odd. Let's log this and continue + collectd.error("Could not get Config or Env for container " + container[id]) + pass + for name in container['Names']: # Containers can be linked and the container name is not # necessarly the first entry of the list @@ -324,14 +338,13 @@ def read_callback(self): # Start a stats gathering thread if the container is new. if container['Id'] not in self.stats: self.stats[container['Id']] = ContainerStats(container, - self.client, - self.stream) + self.client) # Get and process stats from the container. stats = self.stats[container['Id']].stats t = stats['read'] for klass in self.CLASSES: - klass.read(container, stats, t) + klass.read(container, stats, t, apptaskid) except Exception, e: collectd.warning(('Error getting stats for container ' '{container}: {msg}')