Skip to content

Commit

Permalink
Finer-grained mutex control on Store (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodaine authored Apr 18, 2017
1 parent 3370670 commit 9c6c900
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 62 deletions.
10 changes: 5 additions & 5 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import "github.com/kelseyhightower/envconfig"
// variables to setup its settings.
type Settings struct {
// Use statsd as a stats sink.
UseStatsd bool `envconfig:"USE_STATSD" default:"true"`
UseStatsd bool `envconfig:"USE_STATSD" default:"true"`
// Address where statsd is running at.
StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"`
StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"`
// Port where statsd is listening at.
StatsdPort int `envconfig:"STATSD_PORT" default:"8125"`
// Flushing interval.
FlushIntervalS int `envconfig:"GOSTATS_FLUSH_INTERVAL_SECONDS" default:"5"`
StatsdPort int `envconfig:"STATSD_PORT" default:"8125"`
// Flushing interval.
FlushIntervalS int `envconfig:"GOSTATS_FLUSH_INTERVAL_SECONDS" default:"5"`
}

// GetSettings returns the Settings gostats will run with.
Expand Down
137 changes: 80 additions & 57 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ type timer struct {
sink Sink
}

func (t *timer) time(timev time.Duration) {
t.sink.FlushTimer(t.name, float64(timev/time.Microsecond))
func (t *timer) time(dur time.Duration) {
t.sink.FlushTimer(t.name, float64(dur/time.Microsecond))
}

func (t *timer) AddValue(value float64) {
Expand All @@ -289,23 +289,30 @@ func (ts *timespan) Complete() {
}

type statStore struct {
sync.Mutex
counters map[string]*counter
gauges map[string]*gauge
timers map[string]*timer
sink Sink
countersMtx sync.RWMutex
counters map[string]*counter

gaugesMtx sync.RWMutex
gauges map[string]*gauge

timersMtx sync.RWMutex
timers map[string]*timer

genMtx sync.RWMutex
statGenerators []StatGenerator
export bool

sink Sink
export bool
}

func (s *statStore) Flush() {
s.Lock()
defer s.Unlock()

s.genMtx.RLock()
for _, g := range s.statGenerators {
g.GenerateStats()
}
s.genMtx.RUnlock()

s.countersMtx.RLock()
for name, cv := range s.counters {
value := cv.latch()

Expand All @@ -316,26 +323,28 @@ func (s *statStore) Flush() {

s.sink.FlushCounter(name, value)
}
s.countersMtx.RUnlock()

s.gaugesMtx.RLock()
for name, gv := range s.gauges {
value := gv.Value()
s.sink.FlushGauge(name, value)
}
s.gaugesMtx.RUnlock()
}

func (s *statStore) Start(ticker *time.Ticker) {
s.run(ticker)
}

func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {
s.Lock()
defer s.Unlock()

s.genMtx.Lock()
s.statGenerators = append(s.statGenerators, statGenerator)
s.genMtx.Unlock()
}

func (s *statStore) run(ticker *time.Ticker) {
for _ = range ticker.C {
for range ticker.C {
s.Flush()
}
}
Expand All @@ -344,33 +353,38 @@ func (s *statStore) Store() Store {
return s
}

func (registry *statStore) Scope(name string) Scope {
return subScope{registry: registry, name: name}
func (s *statStore) Scope(name string) Scope {
return subScope{registry: s, name: name}
}

func (registry *statStore) NewCounter(name string) Counter {
registry.Lock()
defer registry.Unlock()
func (s *statStore) NewCounter(name string) Counter {
s.countersMtx.RLock()
c, ok := s.counters[name]
s.countersMtx.RUnlock()

counterv, ok := registry.counters[name]
if ok {
return counterv
return c
} else {
counterv = &counter{}
registry.counters[name] = counterv
if registry.export && expvar.Get(name) == nil {
expvar.Publish(name, counterv)
c = &counter{}

s.countersMtx.Lock()
s.counters[name] = c
s.countersMtx.Unlock()

if s.export && expvar.Get(name) == nil {
expvar.Publish(name, c)
}
return counterv

return c
}
}

func (registry *statStore) NewCounterWithTags(name string, tags map[string]string) Counter {
func (s *statStore) NewCounterWithTags(name string, tags map[string]string) Counter {
serializedTags := serializeTags(tags)
return registry.NewCounter(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewCounter(fmt.Sprintf("%s%s", name, serializedTags))
}

func (registry *statStore) NewPerInstanceCounter(name string, tags map[string]string) Counter {
func (s *statStore) NewPerInstanceCounter(name string, tags map[string]string) Counter {
if tags == nil {
tags = make(map[string]string, 1)
}
Expand All @@ -379,32 +393,37 @@ func (registry *statStore) NewPerInstanceCounter(name string, tags map[string]st
tags["_f"] = "i"
}
serializedTags := serializeTags(tags)
return registry.NewCounter(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewCounter(fmt.Sprintf("%s%s", name, serializedTags))
}

func (registry *statStore) NewGauge(name string) Gauge {
registry.Lock()
defer registry.Unlock()
func (s *statStore) NewGauge(name string) Gauge {
s.gaugesMtx.RLock()
g, ok := s.gauges[name]
s.gaugesMtx.RUnlock()

gaugev, ok := registry.gauges[name]
if ok {
return gaugev
return g
} else {
gaugev = &gauge{}
registry.gauges[name] = gaugev
if registry.export && expvar.Get(name) == nil {
expvar.Publish(name, gaugev)
g = &gauge{}

s.gaugesMtx.Lock()
s.gauges[name] = g
s.gaugesMtx.Unlock()

if s.export && expvar.Get(name) == nil {
expvar.Publish(name, g)
}
return gaugev

return g
}
}

func (registry *statStore) NewGaugeWithTags(name string, tags map[string]string) Gauge {
func (s *statStore) NewGaugeWithTags(name string, tags map[string]string) Gauge {
serializedTags := serializeTags(tags)
return registry.NewGauge(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewGauge(fmt.Sprintf("%s%s", name, serializedTags))
}

func (registry *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gauge {
func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gauge {
if tags == nil {
tags = make(map[string]string, 1)
}
Expand All @@ -413,29 +432,33 @@ func (registry *statStore) NewPerInstanceGauge(name string, tags map[string]stri
tags["_f"] = "i"
}
serializedTags := serializeTags(tags)
return registry.NewGauge(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewGauge(fmt.Sprintf("%s%s", name, serializedTags))
}

func (registry *statStore) NewTimer(name string) Timer {
registry.Lock()
defer registry.Unlock()
func (s *statStore) NewTimer(name string) Timer {
s.timersMtx.RLock()
t, ok := s.timers[name]
s.timersMtx.RUnlock()

timerv, ok := registry.timers[name]
if ok {
return timerv
return t
} else {
timerv = &timer{name: name, sink: registry.sink}
registry.timers[name] = timerv
return timerv
t = &timer{name: name, sink: s.sink}

s.timersMtx.Lock()
s.timers[name] = t
s.timersMtx.Unlock()

return t
}
}

func (registry *statStore) NewTimerWithTags(name string, tags map[string]string) Timer {
func (s *statStore) NewTimerWithTags(name string, tags map[string]string) Timer {
serializedTags := serializeTags(tags)
return registry.NewTimer(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewTimer(fmt.Sprintf("%s%s", name, serializedTags))
}

func (registry *statStore) NewPerInstanceTimer(name string, tags map[string]string) Timer {
func (s *statStore) NewPerInstanceTimer(name string, tags map[string]string) Timer {
if tags == nil {
tags = make(map[string]string, 1)
}
Expand All @@ -444,7 +467,7 @@ func (registry *statStore) NewPerInstanceTimer(name string, tags map[string]stri
tags["_f"] = "i"
}
serializedTags := serializeTags(tags)
return registry.NewTimer(fmt.Sprintf("%s%s", name, serializedTags))
return s.NewTimer(fmt.Sprintf("%s%s", name, serializedTags))
}

func (s subScope) Scope(name string) Scope {
Expand Down
21 changes: 21 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package stats

import (
"math/rand"
"strconv"
"sync"
"testing"
"time"
)

// Ensure flushing and adding generators does not race
Expand All @@ -29,3 +32,21 @@ func TestStats(t *testing.T) {

wg.Wait()
}

var bmID = ""
var bmVal = uint64(0)

func BenchmarkStore_MutexContention(b *testing.B) {
s := NewStore(&nullSink{}, false)
t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics
defer t.Stop()
go s.Start(t)

b.ResetTimer()
for i := 0; i < b.N; i++ {
bmID = strconv.Itoa(rand.Intn(1000))
c := s.NewCounter(bmID)
c.Inc()
bmVal = c.Value()
}
}
8 changes: 8 additions & 0 deletions tcp_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@ package stats
import (
"fmt"
"strings"
"sync"
"testing"
)

type testStatSink struct {
sync.Mutex
record string
}

func (s *testStatSink) FlushCounter(name string, value uint64) {
s.Lock()
s.record += fmt.Sprintf("%s:%d|c\n", name, value)
s.Unlock()
}

func (s *testStatSink) FlushGauge(name string, value uint64) {
s.Lock()
s.record += fmt.Sprintf("%s:%d|g\n", name, value)
s.Unlock()
}

func (s *testStatSink) FlushTimer(name string, value float64) {
s.Lock()
s.record += fmt.Sprintf("%s:%f|ms\n", name, value)
s.Unlock()
}

func TestCreateTimer(t *testing.T) {
Expand Down

0 comments on commit 9c6c900

Please sign in to comment.