diff --git a/settings.go b/settings.go index 5d7e916..505f938 100644 --- a/settings.go +++ b/settings.go @@ -6,13 +6,13 @@ import "github.com/kelseyhightower/envconfig" // variables to setup its settings. type Settings struct { // Use statsd as a stats sink. - UseStatsd bool `envconfig:"USE_STATSD" default:"true"` + UseStatsd bool `envconfig:"USE_STATSD" default:"true"` // Address where statsd is running at. - StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` + StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` // Port where statsd is listening at. - StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` - // Flushing interval. - FlushIntervalS int `envconfig:"GOSTATS_FLUSH_INTERVAL_SECONDS" default:"5"` + StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` + // Flushing interval. + FlushIntervalS int `envconfig:"GOSTATS_FLUSH_INTERVAL_SECONDS" default:"5"` } // GetSettings returns the Settings gostats will run with. diff --git a/stats.go b/stats.go index ae9b1eb..c3fcea3 100644 --- a/stats.go +++ b/stats.go @@ -267,8 +267,8 @@ type timer struct { sink Sink } -func (t *timer) time(timev time.Duration) { - t.sink.FlushTimer(t.name, float64(timev/time.Microsecond)) +func (t *timer) time(dur time.Duration) { + t.sink.FlushTimer(t.name, float64(dur/time.Microsecond)) } func (t *timer) AddValue(value float64) { @@ -289,23 +289,30 @@ func (ts *timespan) Complete() { } type statStore struct { - sync.Mutex - counters map[string]*counter - gauges map[string]*gauge - timers map[string]*timer - sink Sink + countersMtx sync.RWMutex + counters map[string]*counter + + gaugesMtx sync.RWMutex + gauges map[string]*gauge + + timersMtx sync.RWMutex + timers map[string]*timer + + genMtx sync.RWMutex statGenerators []StatGenerator - export bool + + sink Sink + export bool } func (s *statStore) Flush() { - s.Lock() - defer s.Unlock() - + s.genMtx.RLock() for _, g := range s.statGenerators { g.GenerateStats() } + s.genMtx.RUnlock() + s.countersMtx.RLock() for name, cv := range s.counters { value := cv.latch() @@ -316,11 +323,14 @@ func (s *statStore) Flush() { s.sink.FlushCounter(name, value) } + s.countersMtx.RUnlock() + s.gaugesMtx.RLock() for name, gv := range s.gauges { value := gv.Value() s.sink.FlushGauge(name, value) } + s.gaugesMtx.RUnlock() } func (s *statStore) Start(ticker *time.Ticker) { @@ -328,14 +338,13 @@ func (s *statStore) Start(ticker *time.Ticker) { } func (s *statStore) AddStatGenerator(statGenerator StatGenerator) { - s.Lock() - defer s.Unlock() - + s.genMtx.Lock() s.statGenerators = append(s.statGenerators, statGenerator) + s.genMtx.Unlock() } func (s *statStore) run(ticker *time.Ticker) { - for _ = range ticker.C { + for range ticker.C { s.Flush() } } @@ -344,33 +353,38 @@ func (s *statStore) Store() Store { return s } -func (registry *statStore) Scope(name string) Scope { - return subScope{registry: registry, name: name} +func (s *statStore) Scope(name string) Scope { + return subScope{registry: s, name: name} } -func (registry *statStore) NewCounter(name string) Counter { - registry.Lock() - defer registry.Unlock() +func (s *statStore) NewCounter(name string) Counter { + s.countersMtx.RLock() + c, ok := s.counters[name] + s.countersMtx.RUnlock() - counterv, ok := registry.counters[name] if ok { - return counterv + return c } else { - counterv = &counter{} - registry.counters[name] = counterv - if registry.export && expvar.Get(name) == nil { - expvar.Publish(name, counterv) + c = &counter{} + + s.countersMtx.Lock() + s.counters[name] = c + s.countersMtx.Unlock() + + if s.export && expvar.Get(name) == nil { + expvar.Publish(name, c) } - return counterv + + return c } } -func (registry *statStore) NewCounterWithTags(name string, tags map[string]string) Counter { +func (s *statStore) NewCounterWithTags(name string, tags map[string]string) Counter { serializedTags := serializeTags(tags) - return registry.NewCounter(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewCounter(fmt.Sprintf("%s%s", name, serializedTags)) } -func (registry *statStore) NewPerInstanceCounter(name string, tags map[string]string) Counter { +func (s *statStore) NewPerInstanceCounter(name string, tags map[string]string) Counter { if tags == nil { tags = make(map[string]string, 1) } @@ -379,32 +393,37 @@ func (registry *statStore) NewPerInstanceCounter(name string, tags map[string]st tags["_f"] = "i" } serializedTags := serializeTags(tags) - return registry.NewCounter(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewCounter(fmt.Sprintf("%s%s", name, serializedTags)) } -func (registry *statStore) NewGauge(name string) Gauge { - registry.Lock() - defer registry.Unlock() +func (s *statStore) NewGauge(name string) Gauge { + s.gaugesMtx.RLock() + g, ok := s.gauges[name] + s.gaugesMtx.RUnlock() - gaugev, ok := registry.gauges[name] if ok { - return gaugev + return g } else { - gaugev = &gauge{} - registry.gauges[name] = gaugev - if registry.export && expvar.Get(name) == nil { - expvar.Publish(name, gaugev) + g = &gauge{} + + s.gaugesMtx.Lock() + s.gauges[name] = g + s.gaugesMtx.Unlock() + + if s.export && expvar.Get(name) == nil { + expvar.Publish(name, g) } - return gaugev + + return g } } -func (registry *statStore) NewGaugeWithTags(name string, tags map[string]string) Gauge { +func (s *statStore) NewGaugeWithTags(name string, tags map[string]string) Gauge { serializedTags := serializeTags(tags) - return registry.NewGauge(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewGauge(fmt.Sprintf("%s%s", name, serializedTags)) } -func (registry *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gauge { +func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gauge { if tags == nil { tags = make(map[string]string, 1) } @@ -413,29 +432,33 @@ func (registry *statStore) NewPerInstanceGauge(name string, tags map[string]stri tags["_f"] = "i" } serializedTags := serializeTags(tags) - return registry.NewGauge(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewGauge(fmt.Sprintf("%s%s", name, serializedTags)) } -func (registry *statStore) NewTimer(name string) Timer { - registry.Lock() - defer registry.Unlock() +func (s *statStore) NewTimer(name string) Timer { + s.timersMtx.RLock() + t, ok := s.timers[name] + s.timersMtx.RUnlock() - timerv, ok := registry.timers[name] if ok { - return timerv + return t } else { - timerv = &timer{name: name, sink: registry.sink} - registry.timers[name] = timerv - return timerv + t = &timer{name: name, sink: s.sink} + + s.timersMtx.Lock() + s.timers[name] = t + s.timersMtx.Unlock() + + return t } } -func (registry *statStore) NewTimerWithTags(name string, tags map[string]string) Timer { +func (s *statStore) NewTimerWithTags(name string, tags map[string]string) Timer { serializedTags := serializeTags(tags) - return registry.NewTimer(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewTimer(fmt.Sprintf("%s%s", name, serializedTags)) } -func (registry *statStore) NewPerInstanceTimer(name string, tags map[string]string) Timer { +func (s *statStore) NewPerInstanceTimer(name string, tags map[string]string) Timer { if tags == nil { tags = make(map[string]string, 1) } @@ -444,7 +467,7 @@ func (registry *statStore) NewPerInstanceTimer(name string, tags map[string]stri tags["_f"] = "i" } serializedTags := serializeTags(tags) - return registry.NewTimer(fmt.Sprintf("%s%s", name, serializedTags)) + return s.NewTimer(fmt.Sprintf("%s%s", name, serializedTags)) } func (s subScope) Scope(name string) Scope { diff --git a/stats_test.go b/stats_test.go index 0580e1a..db2b8e0 100644 --- a/stats_test.go +++ b/stats_test.go @@ -1,8 +1,11 @@ package stats import ( + "math/rand" + "strconv" "sync" "testing" + "time" ) // Ensure flushing and adding generators does not race @@ -29,3 +32,21 @@ func TestStats(t *testing.T) { wg.Wait() } + +var bmID = "" +var bmVal = uint64(0) + +func BenchmarkStore_MutexContention(b *testing.B) { + s := NewStore(&nullSink{}, false) + t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics + defer t.Stop() + go s.Start(t) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bmID = strconv.Itoa(rand.Intn(1000)) + c := s.NewCounter(bmID) + c.Inc() + bmVal = c.Value() + } +} diff --git a/tcp_sink_test.go b/tcp_sink_test.go index c2d6123..86a8c40 100644 --- a/tcp_sink_test.go +++ b/tcp_sink_test.go @@ -3,23 +3,31 @@ package stats import ( "fmt" "strings" + "sync" "testing" ) type testStatSink struct { + sync.Mutex record string } func (s *testStatSink) FlushCounter(name string, value uint64) { + s.Lock() s.record += fmt.Sprintf("%s:%d|c\n", name, value) + s.Unlock() } func (s *testStatSink) FlushGauge(name string, value uint64) { + s.Lock() s.record += fmt.Sprintf("%s:%d|g\n", name, value) + s.Unlock() } func (s *testStatSink) FlushTimer(name string, value float64) { + s.Lock() s.record += fmt.Sprintf("%s:%f|ms\n", name, value) + s.Unlock() } func TestCreateTimer(t *testing.T) {