Skip to content

Commit

Permalink
Merge pull request #1779 from dashpole/on_demand_metrics
Browse files Browse the repository at this point in the history
On-Demand container metrics
  • Loading branch information
dashpole authored Nov 20, 2017
2 parents a27bed7 + 3d6ad6d commit 17dcf1c
Show file tree
Hide file tree
Showing 43 changed files with 7,615 additions and 1,121 deletions.
142 changes: 80 additions & 62 deletions Godeps/Godeps.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions collector/generic_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,17 @@ func TestMetricCollection(t *testing.T) {
assert.NoError(errMetric)
metricNames := []string{"activeConnections", "reading", "writing", "waiting"}
// activeConnections = 3
assert.Equal(metrics[metricNames[0]][0].IntValue, 3)
assert.Equal(metrics[metricNames[0]][0].FloatValue, 0)
assert.Equal(metrics[metricNames[0]][0].IntValue, int64(3))
assert.Equal(metrics[metricNames[0]][0].FloatValue, float64(0))
// reading = 0
assert.Equal(metrics[metricNames[1]][0].IntValue, 0)
assert.Equal(metrics[metricNames[1]][0].FloatValue, 0)
assert.Equal(metrics[metricNames[1]][0].IntValue, int64(0))
assert.Equal(metrics[metricNames[1]][0].FloatValue, float64(0))
// writing = 1
assert.Equal(metrics[metricNames[2]][0].IntValue, 1)
assert.Equal(metrics[metricNames[2]][0].FloatValue, 0)
assert.Equal(metrics[metricNames[2]][0].IntValue, int64(1))
assert.Equal(metrics[metricNames[2]][0].FloatValue, float64(0))
// waiting = 2
assert.Equal(metrics[metricNames[3]][0].IntValue, 2)
assert.Equal(metrics[metricNames[3]][0].FloatValue, 0)
assert.Equal(metrics[metricNames[3]][0].IntValue, int64(2))
assert.Equal(metrics[metricNames[3]][0].FloatValue, float64(0))
}

func TestMetricCollectionLimit(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions collector/prometheus_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,19 @@ metric_with_multiple_labels{label1="One", label2="Two", label3="Three"} 81
assert.Equal(1.7560473e+07, go_gc_duration_sum[0].FloatValue)
assert.Equal("__name__=go_gc_duration_seconds_sum", go_gc_duration_sum[0].Label)
go_gc_duration_count := metrics["go_gc_duration_seconds_count"]
assert.Equal(2693, go_gc_duration_count[0].FloatValue)
assert.Equal(float64(2693), go_gc_duration_count[0].FloatValue)
assert.Equal("__name__=go_gc_duration_seconds_count", go_gc_duration_count[0].Label)

goRoutines := metrics["go_goroutines"]
assert.Equal(16, goRoutines[0].FloatValue)
assert.Equal(float64(16), goRoutines[0].FloatValue)
assert.Equal("__name__=go_goroutines", goRoutines[0].Label)

metricWithSpaces := metrics["metric_with_spaces_in_label"]
assert.Equal(72, metricWithSpaces[0].FloatValue)
assert.Equal(float64(72), metricWithSpaces[0].FloatValue)
assert.Equal("__name__=metric_with_spaces_in_label\xffname=Network Agent", metricWithSpaces[0].Label)

metricWithMultipleLabels := metrics["metric_with_multiple_labels"]
assert.Equal(81, metricWithMultipleLabels[0].FloatValue)
assert.Equal(float64(81), metricWithMultipleLabels[0].FloatValue)
assert.Equal("__name__=metric_with_multiple_labels\xfflabel1=One\xfflabel2=Two\xfflabel3=Three", metricWithMultipleLabels[0].Label)
}

Expand Down Expand Up @@ -215,7 +215,7 @@ go_goroutines 16
assert.Len(metrics, 1)

goRoutines := metrics["go_goroutines"]
assert.Equal(goRoutines[0].FloatValue, 16)
assert.Equal(goRoutines[0].FloatValue, float64(16))
}

func TestPrometheusFiltersMetricsCountLimit(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions info/v2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ type RequestOptions struct {
Count int `json:"count"`
// Whether to include stats for child subcontainers.
Recursive bool `json:"recursive"`
// Update stats if they are older than MaxAge
// nil indicates no update, and 0 will always trigger an update.
MaxAge *time.Duration `json:"max_age"`
}

type ProcessInfo struct {
Expand Down
3 changes: 0 additions & 3 deletions info/v2/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ func InstCpuStats(last, cur *v1.ContainerStats) (*CpuInstStats, error) {
return nil, fmt.Errorf("different number of cpus")
}
timeDelta := cur.Timestamp.Sub(last.Timestamp)
if timeDelta <= 100*time.Millisecond {
return nil, fmt.Errorf("time delta unexpectedly small")
}
// Nanoseconds to gain precision and avoid having zero seconds if the
// difference between the timestamps is just under a second
timeDeltaNs := uint64(timeDelta.Nanoseconds())
Expand Down
10 changes: 0 additions & 10 deletions info/v2/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,6 @@ func TestInstCpuStats(t *testing.T) {
},
nil,
},
// Unexpectedly small time delta
{
&v1.ContainerStats{
Timestamp: time.Unix(100, 0),
},
&v1.ContainerStats{
Timestamp: time.Unix(100, 0).Add(30 * time.Millisecond),
},
nil,
},
// Different number of cpus
{
&v1.ContainerStats{
Expand Down
109 changes: 75 additions & 34 deletions manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

units "github.com/docker/go-units"
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/util/clock"
)

// Housekeeping interval.
Expand All @@ -65,8 +67,11 @@ type containerData struct {
housekeepingInterval time.Duration
maxHousekeepingInterval time.Duration
allowDynamicHousekeeping bool
lastUpdatedTime time.Time
infoLastUpdatedTime time.Time
statsLastUpdatedTime time.Time
lastErrorTime time.Time
// used to track time
clock clock.Clock

// Decay value used for load average smoothing. Interval length of 10 seconds is used.
loadDecay float64
Expand All @@ -77,6 +82,9 @@ type containerData struct {
// Tells the container to stop.
stop chan bool

// Tells the container to immediately collect stats
onDemandChan chan chan struct{}

// Runs custom metric collectors.
collectorManager collector.CollectorManager

Expand Down Expand Up @@ -110,16 +118,43 @@ func (c *containerData) Stop() error {
}

func (c *containerData) allowErrorLogging() bool {
if time.Since(c.lastErrorTime) > time.Minute {
c.lastErrorTime = time.Now()
if c.clock.Since(c.lastErrorTime) > time.Minute {
c.lastErrorTime = c.clock.Now()
return true
}
return false
}

// OnDemandHousekeeping requests an immediate housekeeping pass for the
// container and blocks until that pass has completed or a stop signal is
// received. If the stats were last updated no longer than maxAge ago, it
// returns immediately without requesting anything.
//
// It is intended to complement periodic housekeeping, and it causes the timer
// for periodic housekeeping to reset. Use it sparingly: calling it frequently
// can have serious performance costs.
func (c *containerData) OnDemandHousekeeping(maxAge time.Duration) {
	if c.clock.Since(c.statsLastUpdatedTime) <= maxAge {
		// Stats are still fresh enough; nothing to request.
		return
	}

	// Queue a completion channel for the housekeeping loop. It is closed once
	// housekeeping has finished. The send blocks if onDemandChan is full.
	done := make(chan struct{})
	c.onDemandChan <- done

	// Wait for either a stop signal or the completion notification.
	select {
	case <-c.stop:
	case <-done:
	}
}

// notifyOnDemand releases every OnDemandHousekeeping call whose request is
// currently queued on onDemandChan by closing its completion channel. It does
// not block: it returns as soon as no further request is queued.
func (c *containerData) notifyOnDemand() {
	for {
		select {
		case waiter := <-c.onDemandChan:
			// Signal this caller that housekeeping is finished, then look
			// for the next queued request.
			close(waiter)
			continue
		default:
		}
		// Queue is empty.
		return
	}
}

func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, error) {
// Get spec and subcontainers.
if time.Since(c.lastUpdatedTime) > 5*time.Second {
if c.clock.Since(c.infoLastUpdatedTime) > 5*time.Second {
err := c.updateSpec()
if err != nil {
return nil, err
Expand All @@ -130,7 +165,7 @@ func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo,
return nil, err
}
}
c.lastUpdatedTime = time.Now()
c.infoLastUpdatedTime = c.clock.Now()
}
// Make a copy of the info for the user.
c.lock.Lock()
Expand Down Expand Up @@ -310,7 +345,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
return processes, nil
}

func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool) (*containerData, error) {
func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, clock clock.Clock) (*containerData, error) {
if memoryCache == nil {
return nil, fmt.Errorf("nil memory storage")
}
Expand All @@ -332,6 +367,8 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
loadAvg: -1.0, // negative value indicates uninitialized.
stop: make(chan bool, 1),
collectorManager: collectorManager,
onDemandChan: make(chan chan struct{}, 100),
clock: clock,
}
cont.info.ContainerReference = ref

Expand Down Expand Up @@ -362,7 +399,7 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
}

// Determine when the next housekeeping should occur.
func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Time {
func (self *containerData) nextHousekeepingInterval() time.Duration {
if self.allowDynamicHousekeeping {
var empty time.Time
stats, err := self.memoryCache.RecentStats(self.info.Name, empty, empty, 2)
Expand All @@ -385,7 +422,7 @@ func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Tim
}
}

return lastHousekeeping.Add(jitter(self.housekeepingInterval, 1.0))
return jitter(self.housekeepingInterval, 1.0)
}

// TODO(vmarmol): Implement stats collecting as a custom collector.
Expand All @@ -411,24 +448,19 @@ func (c *containerData) housekeeping() {

// Housekeep every second.
glog.V(3).Infof("Start housekeeping for container %q\n", c.info.Name)
lastHousekeeping := time.Now()
houseKeepingTimer := c.clock.NewTimer(0 * time.Second)
defer houseKeepingTimer.Stop()
for {
select {
case <-c.stop:
// Stop housekeeping when signaled.
if !c.housekeepingTick(houseKeepingTimer.C(), longHousekeeping) {
return
default:
// Perform housekeeping.
start := time.Now()
c.housekeepingTick()

// Log if housekeeping took too long.
duration := time.Since(start)
if duration >= longHousekeeping {
glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration)
}
// Stop and drain the timer so that it is safe to reset it
if !houseKeepingTimer.Stop() {
select {
case <-houseKeepingTimer.C():
default:
}
}

// Log usage if asked to do so.
if c.logUsage {
const numSamples = 60
Expand All @@ -455,26 +487,35 @@ func (c *containerData) housekeeping() {
glog.Infof("[%s] %.3f cores (average: %.3f cores), %s of memory", c.info.Name, instantUsageInCores, usageInCores, usageInHuman)
}
}

next := c.nextHousekeeping(lastHousekeeping)

// Schedule the next housekeeping. Sleep until that time.
if time.Now().Before(next) {
time.Sleep(next.Sub(time.Now()))
} else {
next = time.Now()
}
lastHousekeeping = next
houseKeepingTimer.Reset(c.nextHousekeepingInterval())
}
}

func (c *containerData) housekeepingTick() {
// housekeepingTick waits for one of three events: a stop signal, an on-demand
// housekeeping request, or the periodic timer firing. Unless stopped, it then
// performs a single stats update.
//
// It returns false if a stop signal was received, in which case no update is
// attempted. Otherwise it returns true, whether or not the update succeeded.
// An update that takes at least longHousekeeping is logged at verbosity 3.
func (c *containerData) housekeepingTick(timer <-chan time.Time, longHousekeeping time.Duration) bool {
	select {
	case <-c.stop:
		// Stop housekeeping when signaled.
		return false
	case requester := <-c.onDemandChan:
		// The request that triggered this tick is notified last, once the
		// whole tick has completed.
		defer close(requester)
	case <-timer:
	}

	begin := c.clock.Now()
	// Update failures are only logged when allowErrorLogging permits it.
	if err := c.updateStats(); err != nil && c.allowErrorLogging() {
		glog.Infof("Failed to update stats for container \"%s\": %s", c.info.Name, err)
	}

	// Log if housekeeping took too long.
	if elapsed := c.clock.Since(begin); elapsed >= longHousekeeping {
		glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, elapsed)
	}

	// Release any other on-demand requests that queued up in the meantime,
	// then record when the stats were last refreshed.
	c.notifyOnDemand()
	c.statsLastUpdatedTime = c.clock.Now()
	return true
}

func (c *containerData) updateSpec() error {
Expand Down Expand Up @@ -550,7 +591,7 @@ func (c *containerData) updateStats() error {
var customStatsErr error
cm := c.collectorManager.(*collector.GenericCollectorManager)
if len(cm.Collectors) > 0 {
if cm.NextCollectionTime.Before(time.Now()) {
if cm.NextCollectionTime.Before(c.clock.Now()) {
customStats, err := c.updateCustomStats()
if customStats != nil {
stats.CustomMetrics = customStats
Expand Down
Loading

0 comments on commit 17dcf1c

Please sign in to comment.