Skip to content

Commit

Permalink
Support remote write v2 by converting request
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 28, 2024
1 parent a78470b commit 5344155
Show file tree
Hide file tree
Showing 18 changed files with 1,365 additions and 115 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Support Prometheus remote write 2.0. #6330
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,11 @@ ha_tracker:
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
# request.
# CLI flag: -distributor.remote-write2-enabled
[remote_write2_enabled: <boolean> | default = false]
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Currently experimental features are:
- Distributor:
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
- Blocks storage bucket index
Expand Down
76 changes: 76 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
Expand Down Expand Up @@ -334,3 +335,78 @@ func CreateBlock(

return id, nil
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()

lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
for _, lbl := range additionalLabels {
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}
40 changes: 40 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"
Expand Down Expand Up @@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
var metricName string
attributes := make(map[string]any)
Expand Down Expand Up @@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

// Metadata runs a metadata query
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
Loading

0 comments on commit 5344155

Please sign in to comment.