Skip to content

Commit

Permalink
configurable flush time
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Jul 27, 2024
1 parent 7933862 commit d5c3121
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 92 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ GO_COVER_FILE=$(COVERAGE_DIR)/go-coverage.txt
include build/base.mk
include build/k8s.mk

$(ARTIFACTS)::
CGO_ENABLED=0 go build -trimpath -o $(BUILD_DIR)/$@ ./cmd/.
main:
CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath -o $(BUILD_DIR)/prom2parquet ./cmd/.

lint:
golangci-lint run
Expand Down
2 changes: 1 addition & 1 deletion build
Submodule build updated 4 files
+8 −10 base.mk
+19 −0 docker_tag.sh
+0 −11 get_unclean_sha.sh
+10 −3 k8s.mk
20 changes: 11 additions & 9 deletions cmd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package main
import (
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/acrlabs/prom2parquet/pkg/backends"
)

const (
prefixFlag = "prefix"
serverPortFlag = "server-port"
backendFlag = "backend"
backendRootFlag = "backend-root"
verbosityFlag = "verbosity"
prefixFlag = "prefix"
serverPortFlag = "server-port"
flushIntervalFlag = "flush-interval"
backendFlag = "backend"
backendRootFlag = "backend-root"
verbosityFlag = "verbosity"
)

//nolint:gochecknoglobals
Expand All @@ -35,10 +37,10 @@ var logLevelIDs = map[log.Level][]string{
}

type options struct {
port int

backend backends.StorageBackend
backendRoot string
port int
flushInterval time.Duration
backend backends.StorageBackend
backendRoot string

verbosity log.Level
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -26,6 +27,13 @@ func rootCmd() *cobra.Command {
},
}

root.PersistentFlags().DurationVar(
&opts.flushInterval,
flushIntervalFlag,
10*time.Minute,
"data flush interval",
)

root.PersistentFlags().IntVarP(
&opts.port,
serverPortFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (self *promserver) spawnWriter(ctx context.Context, channelName string) (ch
self.opts.backendRoot,
channelName,
self.opts.backend,
self.opts.flushInterval,
)
if err != nil {
return nil, fmt.Errorf("could not create writer for %s: %w", channelName, err)
Expand Down
2 changes: 2 additions & 0 deletions k8s/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(self):
image=image,
args=[
"/prom2parquet",
"--backend", "s3",
"--backend-root", "simkube",
],
).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG)

Expand Down
114 changes: 65 additions & 49 deletions pkg/parquet/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
const pageNum = 4

type Prom2ParquetWriter struct {
backend backends.StorageBackend
root string
prefix string
backend backends.StorageBackend
root string
prefix string
flushInterval time.Duration

currentFile string
nextFlushTime time.Time
pw *writer.ParquetWriter
currentFile string
pw *writer.ParquetWriter

clock clockwork.Clock
}
Expand All @@ -43,30 +43,50 @@ func NewProm2ParquetWriter(
ctx context.Context,
root, prefix string,
backend backends.StorageBackend,
flushInterval time.Duration,
) (*Prom2ParquetWriter, error) {
return &Prom2ParquetWriter{
backend: backend,
root: root,
prefix: prefix,
backend: backend,
root: root,
prefix: prefix,
flushInterval: flushInterval,

clock: clockwork.NewRealClock(),
}, nil
}

func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) {
if err := self.flush(); err != nil {
log.Errorf("could not flush writer: %v", err)
self.listen(stream, self.getFlushTimer(), nil)
}

func (self *Prom2ParquetWriter) listen(
stream <-chan prompb.TimeSeries,
flushTimer <-chan time.Time,
running chan<- bool, // used for testing
) {
if err := self.createBackendWriter(); err != nil {
log.Errorf("could not create backend writer: %v", err)
return
}
defer self.closeFile()

flushTicker := time.NewTicker(time.Minute)
// self.pw is a pointer to the writer instance, but it can get switched
// out from under us whenever we flush; go defer evaluates the function
// args when the defer call happens, not when the deferred function actually
// executes, so here we need to use a double pointer so that we can make
// sure we're closing the actual correct writer instance
defer func(pw **writer.ParquetWriter) {
closeFile(*pw)
close(running)
}(&self.pw)

if running != nil {
running <- true
}

for {
select {
case ts, ok := <-stream:
if !ok {
self.closeFile()
return
}

Expand All @@ -79,36 +99,29 @@ func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) {
log.Errorf("could not write datapoint: %v", err)
}
}
case <-flushTicker.C:
if time.Now().After(self.nextFlushTime) {
log.Infof("Flush triggered: %v >= %v", time.Now(), self.nextFlushTime)
if err := self.flush(); err != nil {
log.Errorf("could not flush data: %v", err)
return
}
case <-flushTimer:
flushTimer = self.getFlushTimer()
log.Infof("flush triggered for %v", self.currentFile)

// Run this in a separate goroutine so that writing the data
// to S3 (with throttling or whatever) doesn't block the new incoming
// datapoints
go closeFile(self.pw)
if err := self.createBackendWriter(); err != nil {
log.Errorf("could not create backend writer: %v", err)
return
}
}
}
}

func (self *Prom2ParquetWriter) closeFile() {
if self.pw != nil {
if err := self.pw.WriteStop(); err != nil {
log.Errorf("can't close parquet writer: %v", err)
}
self.pw = nil
}
}

func (self *Prom2ParquetWriter) flush() error {
now := self.clock.Now().UTC()

self.closeFile()
func (self *Prom2ParquetWriter) createBackendWriter() error {
basename := self.now().Truncate(self.flushInterval).Format("20060102150405")
self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, basename)

self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, now.Format("2006010215"))
fw, err := backends.ConstructBackendForFile(self.root, self.currentFile, self.backend)
if err != nil {
return fmt.Errorf("can't create storage backend writer: %w", err)
return fmt.Errorf("can't create storage backend: %w", err)
}

pw, err := writer.NewParquetWriter(fw, new(DataPoint), pageNum)
Expand All @@ -118,21 +131,24 @@ func (self *Prom2ParquetWriter) flush() error {
pw.CompressionType = parquet.CompressionCodec_SNAPPY

self.pw = pw
self.advanceFlushTime(&now)

return nil
}

func (self *Prom2ParquetWriter) advanceFlushTime(now *time.Time) {
nextHour := now.Add(time.Hour)
self.nextFlushTime = time.Date(
nextHour.Year(),
nextHour.Month(),
nextHour.Day(),
nextHour.Hour(),
0,
0,
0,
nextHour.Location(),
)
func (self *Prom2ParquetWriter) getFlushTimer() <-chan time.Time {
now := self.now()
nextFlushTime := now.Truncate(self.flushInterval).Add(self.flushInterval)
return time.After(nextFlushTime.Sub(now))
}

func (self *Prom2ParquetWriter) now() time.Time {
return self.clock.Now().UTC()
}

func closeFile(pw *writer.ParquetWriter) {
if pw != nil {
if err := pw.WriteStop(); err != nil {
log.Errorf("can't close parquet writer: %v", err)
}
}
}
97 changes: 66 additions & 31 deletions pkg/parquet/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,91 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/prometheus/prometheus/prompb"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/xitongsys/parquet-go-source/mem"

"github.com/acrlabs/prom2parquet/pkg/backends"
)

func TestFlush(t *testing.T) {
fs := afero.NewMemMapFs()
mem.SetInMemFileFs(&fs)
func newTestProm2ParquetWriter(cl clockwork.Clock) *Prom2ParquetWriter {
return &Prom2ParquetWriter{
backend: backends.Memory,
root: "/test",
prefix: "prefix/kube_node_stuff",
flushInterval: 127 * time.Second,

w := Prom2ParquetWriter{
backend: backends.Memory,
root: "/test",
prefix: "prefix/kube_node_stuff",

clock: clockwork.NewFakeClockAt(time.Time{}),
}

err := w.flush()
assert.Nil(t, err)

exists, err := afero.Exists(fs, "/test/prefix/kube_node_stuff/0001010100.parquet")
if err != nil {
panic(err)
clock: cl,
}
assert.True(t, exists)
}

func TestAdvanceFlushTime(t *testing.T) {
w := Prom2ParquetWriter{}

func TestListen(t *testing.T) {
cases := map[string]struct {
now time.Time
expected time.Time
flush bool
expectedFiles []string
}{
"same day": {
now: time.Date(2024, 02, 20, 21, 34, 45, 0, time.UTC),
expected: time.Date(2024, 02, 20, 22, 0, 0, 0, time.UTC),
"no flush": {
expectedFiles: []string{
"/test/prefix/kube_node_stuff/00010101000000.parquet",
},
},
"next day": {
now: time.Date(2024, 02, 20, 23, 34, 45, 0, time.UTC),
expected: time.Date(2024, 02, 21, 0, 0, 0, 0, time.UTC),
"flush": {
flush: true,
expectedFiles: []string{
"/test/prefix/kube_node_stuff/00010101000000.parquet",
"/test/prefix/kube_node_stuff/00010101000207.parquet",
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
w.advanceFlushTime(&tc.now)
assert.Equal(t, tc.expected, w.nextFlushTime)
fs := afero.NewMemMapFs()
mem.SetInMemFileFs(&fs)

cl := clockwork.NewFakeClockAt(time.Time{})
w := newTestProm2ParquetWriter(cl)
stream := make(chan prompb.TimeSeries, 1)
flushTimer := make(chan time.Time, 1)
running := make(chan bool, 1)

go w.listen(stream, flushTimer, running)

// First block to make sure that all the setup is done (writer created, defer created)
<-running

if tc.flush {
cl.Advance(w.flushInterval + time.Second)
flushTimer <- w.clock.Now()
}

close(stream)

// Next block until the end of the defer block to ensure the final flush is complete
<-running

for _, filename := range tc.expectedFiles {
exists, err := afero.Exists(fs, filename)
if err != nil {
panic(err)
}
assert.True(t, exists)
}
})
}
}

func TestCreateBackendWriter(t *testing.T) {
w := newTestProm2ParquetWriter(clockwork.NewFakeClockAt(time.Date(2024, 3, 7, 10, 14, 30, 0, time.UTC)))
err := w.createBackendWriter()
assert.Nil(t, err)

// time.Truncate(d) returns the result of rounding down to the nearest multiple of d since the zero time.
// In this test, d = 127 seconds, and the zero time is always 0001-01-01T00:00:00Z. The given test time
// is 2024-03-07T10:14:30Z. There are 63845403270 seconds between the zero time and the test time;
// 63845403270 // 127 = 502719710, 502719710 * 127 = 63845403170, and 0001-01-01T00:00:00Z + 63845403170 seconds
// is 2024-03-07T10:12:50Z.
assert.Equal(t, w.currentFile, "prefix/kube_node_stuff/20240307101250.parquet")
assert.NotNil(t, w.pw)
}

0 comments on commit d5c3121

Please sign in to comment.