From 3adc8c2d00a6e5e5f8ed19cb6bfa48e8addc295a Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 13:25:04 -0700 Subject: [PATCH 1/4] upgrade dskit. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b1e21026878..a84b8815416 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa + github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index a500f0c2419..e6f92bf56ec 100644 --- a/go.sum +++ b/go.sum @@ -1256,8 +1256,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa h1:jfSQq+jfs1q0cZr9n4u9g6Wz3423VJVE9grnLpx+eS4= -github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1 h1:Po2a56CzhVeDbGo786TsdAaS87RG0ZelRkV7f0ilCs0= +github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009101240-fa97d35e871f h1:nsrRsQHfpqs6dWxErIOS3gD6R20H/9XM0ItykNtBFW8= From aed46ffa26a985893a3a9eb7ac9b3c07a057afc0 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 13:29:53 -0700 Subject: [PATCH 2/4] upgrade dskit --- go.mod | 2 +- go.sum | 4 +- .../dskit/kv/memberlist/memberlist_client.go | 88 ++++++++++++++--- .../dskit/kv/memberlist/tcp_transport.go | 99 +++++++++++++++---- vendor/github.com/grafana/dskit/ring/ring.go | 4 +- vendor/modules.txt | 2 +- 6 files changed, 164 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index a84b8815416..0ff5100a661 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1 + github.com/grafana/dskit v0.0.0-20241011202249-8e7752e59bab github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index e6f92bf56ec..67aaa10bb4f 100644 --- a/go.sum +++ b/go.sum @@ -1256,8 +1256,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1 h1:Po2a56CzhVeDbGo786TsdAaS87RG0ZelRkV7f0ilCs0= -github.com/grafana/dskit v0.0.0-20241011194340-a36b0ccbd9f1/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241011202249-8e7752e59bab h1:jrQwxBzwH5bjLUWXpaoTbq6BenhCCo2EjBLX10Hm0d4= +github.com/grafana/dskit v0.0.0-20241011202249-8e7752e59bab/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009101240-fa97d35e871f h1:nsrRsQHfpqs6dWxErIOS3gD6R20H/9XM0ItykNtBFW8= diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 1d96363fe3f..452798e04e7 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -137,6 +137,7 @@ type KVConfig struct { GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` + NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") + f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") @@ -251,6 +253,10 @@ type KV struct { watchers map[string][]chan string prefixWatchers map[string][]chan string + // Delayed notifications for watchers + notifMu sync.Mutex + keyNotifications map[string]struct{} + // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex @@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - workersChannels: make(map[string]chan valueUpdate), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + keyNotifications: make(map[string]struct{}), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error { return errFailedToJoinCluster } + if m.cfg.NotifyInterval > 0 { + // Start delayed key notifications. + notifTicker := time.NewTicker(m.cfg.NotifyInterval) + defer notifTicker.Stop() + go m.monitorKeyNotifications(ctx, notifTicker.C) + } + var tickerChan <-chan time.Time if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { t := time.NewTicker(m.cfg.RejoinInterval) @@ -905,7 +919,59 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st } } +// notifyWatchers sends notification to all watchers of given key. If delay is +// enabled, it accumulates them for later sending. func (m *KV) notifyWatchers(key string) { + if m.cfg.NotifyInterval <= 0 { + m.notifyWatchersSync(key) + return + } + + m.notifMu.Lock() + defer m.notifMu.Unlock() + m.keyNotifications[key] = struct{}{} +} + +// monitorKeyNotifications sends accumulated notifications to all watchers of +// respective keys when the given channel ticks. +func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { + if m.cfg.NotifyInterval <= 0 { + panic("sendNotifications called with NotifyInterval <= 0") + } + + for { + select { + case <-tickChan: + m.sendKeyNotifications() + case <-ctx.Done(): + return + } + } +} + +// sendKeyNotifications sends accumulated notifications to watchers of respective keys. +func (m *KV) sendKeyNotifications() { + newNotifs := func() map[string]struct{} { + // Grab and clear accumulated notifications. + m.notifMu.Lock() + defer m.notifMu.Unlock() + + if len(m.keyNotifications) == 0 { + return nil + } + newMap := make(map[string]struct{}) + notifs := m.keyNotifications + m.keyNotifications = newMap + return notifs + } + + for key := range newNotifs() { + m.notifyWatchersSync(key) + } +} + +// notifyWatcherSync immediately sends notification to all watchers of given key. +func (m *KV) notifyWatchersSync(key string) { m.watchersMu.Lock() defer m.watchersMu.Unlock() diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 751ad1163a9..2010d3919a2 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" @@ -52,7 +51,13 @@ type TCPTransportConfig struct { // Timeout for writing packet data. Zero = no timeout. PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` - // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on + // Maximum number of concurrent writes to other nodes. + MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"` + + // Timeout for acquiring one of the concurrent write slots. + AcquireWriterTimeout time.Duration `yaml:"acquire_writer_timeout" category:"advanced"` + + // Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-" category:"advanced"` // Where to put custom metrics. nil = don't register. @@ -73,12 +78,19 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 2*time.Second, "Timeout used when connecting to other nodes to send packet.") f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.") + f.IntVar(&cfg.MaxConcurrentWrites, prefix+"memberlist.max-concurrent-writes", 3, "Maximum number of concurrent writes to other nodes.") + f.DurationVar(&cfg.AcquireWriterTimeout, prefix+"memberlist.acquire-writer-timeout", 250*time.Millisecond, "Timeout for acquiring one of the concurrent write slots. After this time, the message will be dropped.") f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.") f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.") cfg.TLS.RegisterFlagsWithPrefix(prefix+"memberlist", f) } +type writeRequest struct { + b []byte + addr string +} + // TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream // operations ("packet" and "stream" are terms used by memberlist). // It uses a new TCP connections for each operation. There is no connection reuse. @@ -91,7 +103,11 @@ type TCPTransport struct { tcpListeners []net.Listener tlsConfig *tls.Config - shutdown atomic.Int32 + shutdownMu sync.RWMutex + shutdown bool + writeCh chan writeRequest // this channel is protected by shutdownMu + + writeWG sync.WaitGroup advertiseMu sync.RWMutex advertiseAddr string @@ -119,11 +135,21 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr // Build out the new transport. var ok bool + concurrentWrites := config.MaxConcurrentWrites + if concurrentWrites <= 0 { + concurrentWrites = 1 + } t := TCPTransport{ cfg: config, logger: log.With(logger, "component", "memberlist TCPTransport"), packetCh: make(chan *memberlist.Packet), connCh: make(chan net.Conn), + writeCh: make(chan writeRequest), + } + + for i := 0; i < concurrentWrites; i++ { + t.writeWG.Add(1) + go t.writeWorker() } var err error @@ -205,7 +231,10 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) { for { conn, err := tcpLn.Accept() if err != nil { - if s := t.shutdown.Load(); s == 1 { + t.shutdownMu.RLock() + isShuttingDown := t.shutdown + t.shutdownMu.RUnlock() + if isShuttingDown { break } @@ -424,29 +453,49 @@ func (t *TCPTransport) getAdvertisedAddr() string { // WriteTo is a packet-oriented interface that fires off the given // payload to the given address. func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { - t.sentPackets.Inc() - t.sentPacketsBytes.Add(float64(len(b))) + t.shutdownMu.RLock() + defer t.shutdownMu.RUnlock() // Unlock at the end to protect the chan + if t.shutdown { + return time.Time{}, errors.New("transport is shutting down") + } - err := t.writeTo(b, addr) - if err != nil { + // Send the packet to the write workers + // If this blocks for too long (as configured), abort and log an error. + select { + case <-time.After(t.cfg.AcquireWriterTimeout): + level.Warn(t.logger).Log("msg", "WriteTo failed to acquire a writer. Dropping message", "timeout", t.cfg.AcquireWriterTimeout, "addr", addr) t.sentPacketsErrors.Inc() - - logLevel := level.Warn(t.logger) - if strings.Contains(err.Error(), "connection refused") { - // The connection refused is a common error that could happen during normal operations when a node - // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. - logLevel = t.debugLog() - } - logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) - // WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors, // but memberlist library doesn't seem to cope with that very well. That is why we return nil instead. return time.Now(), nil + case t.writeCh <- writeRequest{b: b, addr: addr}: + // OK } return time.Now(), nil } +func (t *TCPTransport) writeWorker() { + defer t.writeWG.Done() + for req := range t.writeCh { + b, addr := req.b, req.addr + t.sentPackets.Inc() + t.sentPacketsBytes.Add(float64(len(b))) + err := t.writeTo(b, addr) + if err != nil { + t.sentPacketsErrors.Inc() + + logLevel := level.Warn(t.logger) + if strings.Contains(err.Error(), "connection refused") { + // The connection refused is a common error that could happen during normal operations when a node + // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. + logLevel = t.debugLog() + } + logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) + } + } +} + func (t *TCPTransport) writeTo(b []byte, addr string) error { // Open connection, write packet header and data, data hash, close. Simple. c, err := t.getConnection(addr, t.cfg.PacketDialTimeout) @@ -559,17 +608,31 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn { // Shutdown is called when memberlist is shutting down; this gives the // transport a chance to clean up any listeners. +// This will avoid log spam about errors when we shut down. func (t *TCPTransport) Shutdown() error { + t.shutdownMu.Lock() // This will avoid log spam about errors when we shut down. - t.shutdown.Store(1) + if t.shutdown { + t.shutdownMu.Unlock() + return nil // already shut down + } + + // Set the shutdown flag and close the write channel. + t.shutdown = true + close(t.writeCh) + t.shutdownMu.Unlock() // Rip through all the connections and shut them down. for _, conn := range t.tcpListeners { _ = conn.Close() } + // Wait until all write workers have finished. + t.writeWG.Wait() + // Block until all the listener threads have died. t.wg.Wait() + return nil } diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index c8db7da50c6..d47eb8fe256 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -215,13 +215,13 @@ type Ring struct { // Number of registered instances per zone. instancesCountPerZone map[string]int - // Nubmber of registered instances with tokens per zone. + // Number of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int // Number of registered instances are writable and have tokens. writableInstancesWithTokensCount int - // Nubmber of registered instances with tokens per zone that are writable. + // Number of registered instances with tokens per zone that are writable. writableInstancesWithTokensCountPerZone map[string]int // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. diff --git a/vendor/modules.txt b/vendor/modules.txt index 2eda84b5813..c295f412ba8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -611,7 +611,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa +# github.com/grafana/dskit v0.0.0-20241011202249-8e7752e59bab ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast From f9e994aa82af36ac65792f61261506b5c28d768a Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 13:39:44 -0700 Subject: [PATCH 3/4] Regenerate docs, reference-help. --- cmd/mimir/config-descriptor.json | 33 +++++++++++++++++++ cmd/mimir/help-all.txt.tmpl | 6 ++++ .../configuration-parameters/index.md | 14 ++++++++ 3 files changed, 53 insertions(+) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index b80c3b08b5c..056c87e0e17 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -16041,6 +16041,17 @@ "fieldType": "boolean", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "notify_interval", + "required": false, + "desc": "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "memberlist.notify-interval", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "advertise_addr", @@ -16233,6 +16244,28 @@ "fieldType": "duration", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "max_concurrent_writes", + "required": false, + "desc": "Maximum number of concurrent writes to other nodes.", + "fieldValue": null, + "fieldDefaultValue": 3, + "fieldFlag": "memberlist.max-concurrent-writes", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "acquire_writer_timeout", + "required": false, + "desc": "Timeout for acquiring one of the concurrent write slots. After this time, the message will be dropped.", + "fieldValue": null, + "fieldDefaultValue": 250000000, + "fieldFlag": "memberlist.acquire-writer-timeout", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "tls_enabled", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9b210d947bb..afed2ba684a 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1709,6 +1709,8 @@ Usage of ./cmd/mimir/mimir: Size of memory ballast to allocate. -memberlist.abort-if-join-fails If this node fails to join memberlist cluster, abort. + -memberlist.acquire-writer-timeout duration + Timeout for acquiring one of the concurrent write slots. After this time, the message will be dropped. (default 250ms) -memberlist.advertise-addr string Gossip address to advertise to other members in the cluster. Used for NAT traversal. -memberlist.advertise-port int @@ -1739,6 +1741,8 @@ Usage of ./cmd/mimir/mimir: Timeout for leaving memberlist cluster. (default 20s) -memberlist.left-ingesters-timeout duration How long to keep LEFT ingesters in the ring. (default 5m0s) + -memberlist.max-concurrent-writes int + Maximum number of concurrent writes to other nodes. (default 3) -memberlist.max-join-backoff duration Max backoff duration to join other cluster members. (default 1m0s) -memberlist.max-join-retries int @@ -1749,6 +1753,8 @@ Usage of ./cmd/mimir/mimir: Min backoff duration to join other cluster members. (default 1s) -memberlist.nodename string Name of the node in memberlist cluster. Defaults to hostname. + -memberlist.notify-interval duration + How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay. -memberlist.packet-dial-timeout duration Timeout used when connecting to other nodes to send packet. (default 2s) -memberlist.packet-write-timeout duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 5db94ddba41..4ab17090b42 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2967,6 +2967,11 @@ The `memberlist` block configures the Gossip memberlist. # CLI flag: -memberlist.compression-enabled [compression_enabled: | default = true] +# (advanced) How frequently to notify watchers when a key changes. Can reduce +# CPU activity in large memberlist deployments. 0 to notify without delay. +# CLI flag: -memberlist.notify-interval +[notify_interval: | default = 0s] + # Gossip address to advertise to other members in the cluster. Used for NAT # traversal. # CLI flag: -memberlist.advertise-addr @@ -3059,6 +3064,15 @@ The `memberlist` block configures the Gossip memberlist. # CLI flag: -memberlist.packet-write-timeout [packet_write_timeout: | default = 5s] +# (advanced) Maximum number of concurrent writes to other nodes. +# CLI flag: -memberlist.max-concurrent-writes +[max_concurrent_writes: | default = 3] + +# (advanced) Timeout for acquiring one of the concurrent write slots. After this +# time, the message will be dropped. +# CLI flag: -memberlist.acquire-writer-timeout +[acquire_writer_timeout: | default = 250ms] + # (advanced) Enable TLS on the memberlist transport layer. # CLI flag: -memberlist.tls-enabled [tls_enabled: | default = false] From 7e7b874754198e6ed87948655e66467ab64ecd8d Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 14:06:22 -0700 Subject: [PATCH 4/4] Update changelog. --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 899c6190749..e530644230b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,10 @@ * [ENHANCEMENT] Add SASL plain authentication support to Kafka client used by the experimental ingest storage. Configure SASL credentials via the following settings: #9584 * `-ingest-storage.kafka.sasl-password` * `-ingest-storage.kafka.sasl-username` +* [ENHANCEMENT] memberlist: TCP transport write path is now non-blocking, and is configurable by new flags: #9594 + * `-memberlist.max-concurrent-writes` + * `-memberlist.acquire-writer-timeout` +* [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508