From d5c3121d7acde0bc9ff770c4995b69c59e8887e9 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Wed, 24 Jul 2024 15:24:39 -0700 Subject: [PATCH] configurable flush time --- Makefile | 4 +- build | 2 +- cmd/args.go | 20 ++++--- cmd/root.go | 8 +++ cmd/server.go | 1 + k8s/main.py | 2 + pkg/parquet/writer.go | 114 +++++++++++++++++++++---------------- pkg/parquet/writer_test.go | 97 +++++++++++++++++++++---------- 8 files changed, 156 insertions(+), 92 deletions(-) diff --git a/Makefile b/Makefile index 59edbd1..39d754c 100644 --- a/Makefile +++ b/Makefile @@ -5,8 +5,8 @@ GO_COVER_FILE=$(COVERAGE_DIR)/go-coverage.txt include build/base.mk include build/k8s.mk -$(ARTIFACTS):: - CGO_ENABLED=0 go build -trimpath -o $(BUILD_DIR)/$@ ./cmd/. +main: + CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath -o $(BUILD_DIR)/prom2parquet ./cmd/. lint: golangci-lint run diff --git a/build b/build index 9424a15..928b9f6 160000 --- a/build +++ b/build @@ -1 +1 @@ -Subproject commit 9424a1527722991c6db1ffefa4961a8825cf8782 +Subproject commit 928b9f67bb413d3968ea6f100a7e628afe0def6c diff --git a/cmd/args.go b/cmd/args.go index 309e208..efeaa4d 100644 --- a/cmd/args.go +++ b/cmd/args.go @@ -3,6 +3,7 @@ package main import ( "sort" "strings" + "time" log "github.com/sirupsen/logrus" @@ -10,11 +11,12 @@ import ( ) const ( - prefixFlag = "prefix" - serverPortFlag = "server-port" - backendFlag = "backend" - backendRootFlag = "backend-root" - verbosityFlag = "verbosity" + prefixFlag = "prefix" + serverPortFlag = "server-port" + flushIntervalFlag = "flush-interval" + backendFlag = "backend" + backendRootFlag = "backend-root" + verbosityFlag = "verbosity" ) //nolint:gochecknoglobals @@ -35,10 +37,10 @@ var logLevelIDs = map[log.Level][]string{ } type options struct { - port int - - backend backends.StorageBackend - backendRoot string + port int + flushInterval time.Duration + backend backends.StorageBackend + backendRoot string verbosity log.Level } diff --git a/cmd/root.go b/cmd/root.go index b8d9ab5..3391566 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -26,6 +27,13 @@ func rootCmd() *cobra.Command { }, } + root.PersistentFlags().DurationVar( + &opts.flushInterval, + flushIntervalFlag, + 10*time.Minute, + "data flush interval", + ) + root.PersistentFlags().IntVarP( &opts.port, serverPortFlag, diff --git a/cmd/server.go b/cmd/server.go index e4ed14a..f67d961 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -149,6 +149,7 @@ func (self *promserver) spawnWriter(ctx context.Context, channelName string) (ch self.opts.backendRoot, channelName, self.opts.backend, + self.opts.flushInterval, ) if err != nil { return nil, fmt.Errorf("could not create writer for %s: %w", channelName, err) diff --git a/k8s/main.py b/k8s/main.py index 03690d6..fb40f6a 100755 --- a/k8s/main.py +++ b/k8s/main.py @@ -27,6 +27,8 @@ def __init__(self): image=image, args=[ "/prom2parquet", + "--backend", "s3", + "--backend-root", "simkube", ], ).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG) diff --git a/pkg/parquet/writer.go b/pkg/parquet/writer.go index 746c75e..2360cfc 100644 --- a/pkg/parquet/writer.go +++ b/pkg/parquet/writer.go @@ -17,13 +17,13 @@ import ( const pageNum = 4 type Prom2ParquetWriter struct { - backend backends.StorageBackend - root string - prefix string + backend backends.StorageBackend + root string + prefix string + flushInterval time.Duration - currentFile string - nextFlushTime time.Time - pw *writer.ParquetWriter + currentFile string + pw *writer.ParquetWriter clock clockwork.Clock } @@ -43,30 +43,50 @@ func NewProm2ParquetWriter( ctx context.Context, root, prefix string, backend backends.StorageBackend, + flushInterval time.Duration, ) (*Prom2ParquetWriter, error) { return &Prom2ParquetWriter{ - backend: backend, - root: root, - prefix: prefix, + backend: backend, + root: root, + prefix: prefix, + flushInterval: flushInterval, clock: clockwork.NewRealClock(), }, nil } func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) { - if err := self.flush(); err != nil { - log.Errorf("could not flush writer: %v", err) + self.listen(stream, self.getFlushTimer(), nil) +} + +func (self *Prom2ParquetWriter) listen( + stream <-chan prompb.TimeSeries, + flushTimer <-chan time.Time, + running chan<- bool, // used for testing +) { + if err := self.createBackendWriter(); err != nil { + log.Errorf("could not create backend writer: %v", err) return } - defer self.closeFile() - flushTicker := time.NewTicker(time.Minute) + // self.pw is a pointer to the writer instance, but it can get switched + // out from under us whenever we flush; go defer evaluates the function + // args when the defer call happens, not when the deferred function actually + // executes, so here we need to use a double pointer so that we can make + // sure we're closing the actual correct writer instance + defer func(pw **writer.ParquetWriter) { + closeFile(*pw) + close(running) + }(&self.pw) + + if running != nil { + running <- true + } for { select { case ts, ok := <-stream: if !ok { - self.closeFile() return } @@ -79,36 +99,29 @@ func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) { log.Errorf("could not write datapoint: %v", err) } } - case <-flushTicker.C: - if time.Now().After(self.nextFlushTime) { - log.Infof("Flush triggered: %v >= %v", time.Now(), self.nextFlushTime) - if err := self.flush(); err != nil { - log.Errorf("could not flush data: %v", err) - return - } + case <-flushTimer: + flushTimer = self.getFlushTimer() + log.Infof("flush triggered for %v", self.currentFile) + + // Run this in a separate goroutine so that writing the data + // to S3 (with throttling or whatever) doesn't block the new incoming + // datapoints + go closeFile(self.pw) + if err := self.createBackendWriter(); err != nil { + log.Errorf("could not create backend writer: %v", err) + return } } } } -func (self *Prom2ParquetWriter) closeFile() { - if self.pw != nil { - if err := self.pw.WriteStop(); err != nil { - log.Errorf("can't close parquet writer: %v", err) - } - self.pw = nil - } -} - -func (self *Prom2ParquetWriter) flush() error { - now := self.clock.Now().UTC() - - self.closeFile() +func (self *Prom2ParquetWriter) createBackendWriter() error { + basename := self.now().Truncate(self.flushInterval).Format("20060102150405") + self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, basename) - self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, now.Format("2006010215")) fw, err := backends.ConstructBackendForFile(self.root, self.currentFile, self.backend) if err != nil { - return fmt.Errorf("can't create storage backend writer: %w", err) + return fmt.Errorf("can't create storage backend: %w", err) } pw, err := writer.NewParquetWriter(fw, new(DataPoint), pageNum) @@ -118,21 +131,24 @@ func (self *Prom2ParquetWriter) flush() error { pw.CompressionType = parquet.CompressionCodec_SNAPPY self.pw = pw - self.advanceFlushTime(&now) return nil } -func (self *Prom2ParquetWriter) advanceFlushTime(now *time.Time) { - nextHour := now.Add(time.Hour) - self.nextFlushTime = time.Date( - nextHour.Year(), - nextHour.Month(), - nextHour.Day(), - nextHour.Hour(), - 0, - 0, - 0, - nextHour.Location(), - ) +func (self *Prom2ParquetWriter) getFlushTimer() <-chan time.Time { + now := self.now() + nextFlushTime := now.Truncate(self.flushInterval).Add(self.flushInterval) + return time.After(nextFlushTime.Sub(now)) +} + +func (self *Prom2ParquetWriter) now() time.Time { + return self.clock.Now().UTC() +} + +func closeFile(pw *writer.ParquetWriter) { + if pw != nil { + if err := pw.WriteStop(); err != nil { + log.Errorf("can't close parquet writer: %v", err) + } + } } diff --git a/pkg/parquet/writer_test.go b/pkg/parquet/writer_test.go index f6b7836..4109d09 100644 --- a/pkg/parquet/writer_test.go +++ b/pkg/parquet/writer_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/prometheus/prometheus/prompb" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/xitongsys/parquet-go-source/mem" @@ -12,49 +13,83 @@ import ( "github.com/acrlabs/prom2parquet/pkg/backends" ) -func TestFlush(t *testing.T) { - fs := afero.NewMemMapFs() - mem.SetInMemFileFs(&fs) +func newTestProm2ParquetWriter(cl clockwork.Clock) *Prom2ParquetWriter { + return &Prom2ParquetWriter{ + backend: backends.Memory, + root: "/test", + prefix: "prefix/kube_node_stuff", + flushInterval: 127 * time.Second, - w := Prom2ParquetWriter{ - backend: backends.Memory, - root: "/test", - prefix: "prefix/kube_node_stuff", - - clock: clockwork.NewFakeClockAt(time.Time{}), - } - - err := w.flush() - assert.Nil(t, err) - - exists, err := afero.Exists(fs, "/test/prefix/kube_node_stuff/0001010100.parquet") - if err != nil { - panic(err) + clock: cl, } - assert.True(t, exists) } -func TestAdvanceFlushTime(t *testing.T) { - w := Prom2ParquetWriter{} - +func TestListen(t *testing.T) { cases := map[string]struct { - now time.Time - expected time.Time + flush bool + expectedFiles []string }{ - "same day": { - now: time.Date(2024, 02, 20, 21, 34, 45, 0, time.UTC), - expected: time.Date(2024, 02, 20, 22, 0, 0, 0, time.UTC), + "no flush": { + expectedFiles: []string{ + "/test/prefix/kube_node_stuff/00010101000000.parquet", + }, }, - "next day": { - now: time.Date(2024, 02, 20, 23, 34, 45, 0, time.UTC), - expected: time.Date(2024, 02, 21, 0, 0, 0, 0, time.UTC), + "flush": { + flush: true, + expectedFiles: []string{ + "/test/prefix/kube_node_stuff/00010101000000.parquet", + "/test/prefix/kube_node_stuff/00010101000207.parquet", + }, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { - w.advanceFlushTime(&tc.now) - assert.Equal(t, tc.expected, w.nextFlushTime) + fs := afero.NewMemMapFs() + mem.SetInMemFileFs(&fs) + + cl := clockwork.NewFakeClockAt(time.Time{}) + w := newTestProm2ParquetWriter(cl) + stream := make(chan prompb.TimeSeries, 1) + flushTimer := make(chan time.Time, 1) + running := make(chan bool, 1) + + go w.listen(stream, flushTimer, running) + + // First block to make sure that all the setup is done (writer created, defer created) + <-running + + if tc.flush { + cl.Advance(w.flushInterval + time.Second) + flushTimer <- w.clock.Now() + } + + close(stream) + + // Next block until the end of the defer block to ensure the final flush is complete + <-running + + for _, filename := range tc.expectedFiles { + exists, err := afero.Exists(fs, filename) + if err != nil { + panic(err) + } + assert.True(t, exists) + } }) } } + +func TestCreateBackendWriter(t *testing.T) { + w := newTestProm2ParquetWriter(clockwork.NewFakeClockAt(time.Date(2024, 3, 7, 10, 14, 30, 0, time.UTC))) + err := w.createBackendWriter() + assert.Nil(t, err) + + // time.Truncate(d) returns the result of rounding down to the nearest multiple of d since the zero time. + // In this test, d = 127 seconds, and the zero time is always 0001-01-01T00:00:00Z. The given test time + // is 2024-03-07T10:14:30Z. There are 63845403270 seconds between the zero time and the test time; + // 63845403270 // 127 = 502719710, 502719710 * 127 = 63845403170, and 0001-01-01T00:00:00Z + 63845403170 seconds + // is 2024-03-07T10:12:50Z. + assert.Equal(t, w.currentFile, "prefix/kube_node_stuff/20240307101250.parquet") + assert.NotNil(t, w.pw) +}