adding tests, some refactoring
drmorr0 committed Feb 23, 2024
1 parent 01b4b70 commit 8d9070f
Showing 11 changed files with 327 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -13,7 +13,7 @@ lint:

test:
mkdir -p $(COVERAGE_DIR)
go test -coverprofile=$(GO_COVER_FILE) ./...
go test -v -coverprofile=$(GO_COVER_FILE) ./...

cover:
go tool cover -func=$(GO_COVER_FILE)
37 changes: 26 additions & 11 deletions README.md
@@ -1,8 +1,11 @@
![build status](https://github.com/acrlabs/prom2parquet/actions/workflows/verify.yml/badge.svg)

# prom2parquet

Remote write target for Prometheus that saves metrics to parquet files

**This should be considered an alpha project**
**⚠️ This should be considered an alpha project. ⚠️**
In particular, the schema for the saved Parquet files is likely to change in the future.

## Overview

@@ -25,30 +25,31 @@ Usage:
prom2parquet [flags]
Flags:
--clean-local-storage delete pod-local parquet files upon flush
--backend backend supported backends for saving parquet files
(valid options: local, s3/aws) (default local)
--backend-root string root path/location for the specified backend (e.g. bucket name for AWS S3)
(default "/data")
-h, --help help for prom2parquet
--prefix string directory prefix for saving parquet files
--remote remote supported remote endpoints for saving parquet files
(valid options: none, s3/aws) (default none)
-p, --server-port int port for the remote write endpoint to listen on (default 1234)
-v, --verbosity verbosity log level (valid options: debug, error, fatal, info, panic, trace, warning/warn)
(default info)
```

Here is a brief overview of the options:

### clean-local-storage
### backend

To reduce pod-local storage, you can configure prom2parquet to remove all parquet files after they've been written
(currently once per hour). This is generally not very useful unless you've also configured a remote storage option.
Where to store the Parquet files; currently supports pod-local storage and AWS S3.

### prefix
### backend-root

This option provides a prefix that can be used to differentiate between metrics collections.
"Root" location for the backend storage. For pod-local storage this is the base directory; for AWS S3 it is the
bucket name.

### remote
### prefix

Whether to save the parquet files to some remote storage; currently the only supported remote storage option is AWS S3.
This option provides a prefix that can be used to differentiate between metrics collections.
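
For example, a hypothetical invocation combining the backend, backend-root, and prefix options might look like the
following (the flag names come from the usage output above; the bucket name and prefix value are placeholders, not
project defaults):

```
# flag names are from the usage output above; the values here are illustrative only
prom2parquet --backend s3 --backend-root my-metrics-bucket --prefix cluster-a
```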

### server-port

@@ -89,6 +93,17 @@ the executable, create and push the Docker images, and deploy to the configured
All build artifacts are placed in the `.build/` subdirectory. You can remove this directory or run `make clean` to
clean up.

### Testing

Run `make test` to run all the unit/integration tests. If you're testing with pod-local storage and want to flush the
Parquet files to disk without terminating the pod (e.g., so you can copy them elsewhere), you can send the process a
SIGUSR1:

```
> kubectl exec prom2parquet-pod -- kill -s SIGUSR1 <pid>
> kubectl cp prom2parquet-pod:/path/to/files ./
```

### Code of Conduct

Applied Computing Research Labs has a strict code of conduct we expect all contributors to adhere to. Please read the
2 changes: 1 addition & 1 deletion cmd/args.go
@@ -19,7 +19,7 @@ const (

//nolint:gochecknoglobals
var supportedBackendIDs = map[backends.StorageBackend][]string{
backends.Local: {"none"},
backends.Local: {"local"},
backends.S3: {"s3", "aws"},
}

52 changes: 29 additions & 23 deletions cmd/server.go
@@ -26,7 +26,9 @@ type promserver struct {
opts *options
channels map[string]chan prompb.TimeSeries

m sync.RWMutex
m sync.RWMutex
flushChannel chan os.Signal
killChannel chan os.Signal
}

func newServer(opts *options) *promserver {
@@ -37,18 +39,18 @@ func newServer(opts *options) *promserver {
httpserv: &http.Server{Addr: fulladdr, Handler: mux, ReadHeaderTimeout: 10 * time.Second},
opts: opts,
channels: map[string]chan prompb.TimeSeries{},

flushChannel: make(chan os.Signal, 1),
killChannel: make(chan os.Signal, 1),
}
mux.HandleFunc("/receive", s.metricsReceive)

return s
}

func (self *promserver) run() {
flushChannel := make(chan os.Signal, 1)
signal.Notify(flushChannel, syscall.SIGUSR1)

killChannel := make(chan os.Signal, 1)
signal.Notify(killChannel, syscall.SIGTERM)
signal.Notify(self.flushChannel, syscall.SIGUSR1)
signal.Notify(self.killChannel, syscall.SIGTERM)

endChannel := make(chan struct{}, 1)

@@ -59,15 +61,15 @@ func (self *promserver) run() {
}()

go func() {
<-killChannel
<-self.killChannel
self.handleShutdown()
close(endChannel)
}()

go func() {
<-flushChannel
log.Infof("SIGUSR1 received")
self.stopServer(true)
<-self.flushChannel
log.Infof("SIGUSR1 received; sleeping indefinitely")
self.stopServer()
}()

log.Infof("server listening on %s", self.httpserv.Addr)
@@ -82,15 +84,15 @@ func (self *promserver) handleShutdown() {
}
}()

self.stopServer(false)
self.stopServer()
timer := time.AfterFunc(shutdownTime, func() {
os.Exit(0)
})

<-timer.C
}

func (self *promserver) stopServer(stayAlive bool) {
func (self *promserver) stopServer() {
log.Infof("flushing all data files")
for _, ch := range self.channels {
close(ch)
@@ -101,11 +103,6 @@ func (self *promserver) stopServer(stayAlive bool) {
if err := self.httpserv.Shutdown(ctxTimeout); err != nil {
log.Errorf("failed shutting server down: %v", err)
}

if stayAlive {
log.Infof("sleeping indefinitely")
select {}
}
}

func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request) {
@@ -115,10 +112,18 @@ func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request)
return
}

for _, ts := range body.Timeseries {
if err := self.sendTimeseries(req.Context(), body.Timeseries); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

func (self *promserver) sendTimeseries(ctx context.Context, timeserieses []prompb.TimeSeries) (err error) {
for _, ts := range timeserieses {
// I'm not 100% sure which of these things would be recreated/shadowed below, so to be safe
// I'm just declaring everything upfront
var ch chan prompb.TimeSeries
var ok bool
var err error

nameLabel, _ := lo.Find(ts.Labels, func(i prompb.Label) bool { return i.Name == model.MetricNameLabel })
metricName := nameLabel.Value
@@ -130,15 +135,16 @@ func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request)
self.m.RUnlock()

if !ok {
ch, err = self.spawnWriter(req.Context(), metricName)
ch, err = self.spawnWriter(ctx, metricName)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return fmt.Errorf("could not spawn timeseries writer for %s: %w", metricName, err)
}
}

ch <- ts
}

return nil
}

func (self *promserver) spawnWriter(ctx context.Context, metricName string) (chan prompb.TimeSeries, error) {
@@ -159,7 +165,7 @@ func (self *promserver) spawnWriter(ctx context.Context, metricName string) (cha
ch := make(chan prompb.TimeSeries)
self.channels[metricName] = ch

go writer.Listen(ch)
go writer.Listen(ch) //nolint:contextcheck // the req context and the backend creation context should be separate

return ch, nil
}
140 changes: 140 additions & 0 deletions cmd/server_test.go
@@ -0,0 +1,140 @@
package main

import (
"context"
"strings"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"

	"github.com/acrlabs/prom2parquet/pkg/backends" // needed for backends.Memory below; import path assumed from the repo layout
)

const metricName = "kube_node_stuff"

// These tests are actually super-hacky and brittle, but I'm not too sure how to do these tests a different way. It
// would be nice to not just be comparing log outputs to see which code paths were taken, for one thing...
//
// Also, these tests are sortof inherently race-y, as evidenced by this delightful constant:
const sleepTime = 100 * time.Millisecond

func TestServerRun(t *testing.T) {
cases := map[string]struct {
operations func(*promserver)
expectedLogEntries []string
forbiddenLogEntries []string
}{
"kill": {
operations: func(s *promserver) { close(s.killChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"shutting down",
},
forbiddenLogEntries: []string{
"SIGUSR1 received",
"recovered from panic",
},
},
"flush": {
operations: func(s *promserver) { close(s.flushChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"SIGUSR1 received",
},
forbiddenLogEntries: []string{
"shutting down",
"recovered from panic",
},
},
"flush then kill": {
operations: func(s *promserver) { close(s.flushChannel); time.Sleep(sleepTime); close(s.killChannel) },
expectedLogEntries: []string{
"server failed",
"flushing all data files",
"SIGUSR1 received",
"shutting down",
"recovered from panic",
},
forbiddenLogEntries: []string{},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
logs := test.NewGlobal()
srv := newServer(&options{})
srv.channels["foo"] = make(chan prompb.TimeSeries)
go srv.run()

tc.operations(srv)
time.Sleep(sleepTime)

entries := logs.AllEntries()

for _, expected := range tc.expectedLogEntries {
assert.GreaterOrEqual(t, len(lo.Filter(entries, func(e *log.Entry, _ int) bool {
return strings.Contains(e.Message, expected)
})), 1)
}

for _, forbidden := range tc.forbiddenLogEntries {
assert.Len(t, lo.Filter(entries, func(e *log.Entry, _ int) bool {
return strings.Contains(e.Message, forbidden)
}), 0)
}
})
}
}

func TestSendTimeseries(t *testing.T) {
srv := newServer(&options{})
srv.channels[metricName] = make(chan prompb.TimeSeries)

ts := prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: model.MetricNameLabel,
Value: metricName,
},
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "buz",
},
},
Samples: []prompb.Sample{
{
Value: 1.0,
Timestamp: 0,
},
{
Value: 2.0,
Timestamp: 1,
},
},
}

go func() {
err := srv.sendTimeseries(context.TODO(), []prompb.TimeSeries{ts})
assert.Nil(t, err)
}()

val := <-srv.channels[metricName]
assert.Equal(t, ts, val)
}

func TestSpawnWriter(t *testing.T) {
srv := newServer(&options{backend: backends.Memory})
_, err := srv.spawnWriter(context.TODO(), metricName)
assert.Nil(t, err)
assert.Contains(t, srv.channels, metricName)
}
2 changes: 0 additions & 2 deletions k8s/main.py
@@ -29,8 +29,6 @@ def __init__(self):
args=[
"/prom2parquet",
"--prefix", "testing",
"--backend", "s3",
"--backend-root", "simkube",
],
).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG)

6 changes: 5 additions & 1 deletion pkg/backends/backend.go
@@ -49,8 +49,12 @@ func ConstructBackendForFile( //nolint:ireturn // this is fine

case Memory:
log.Warnf("using in-memory backend, this is intended for testing only!")
fullPath, err := filepath.Abs(fmt.Sprintf("%s/%s", root, file))
if err != nil {
return nil, fmt.Errorf("can't construct local path %s/%s: %w", root, file, err)
}

fw, err := mem.NewMemFileWriter(file, nil)
fw, err := mem.NewMemFileWriter(fullPath, nil)
if err != nil {
return nil, fmt.Errorf("can't create in-memory writer: %w", err)
}