Skip to content

Commit

Permalink
Distributor: do not propagate errors with non-utf8 characters (#10236)
Browse files Browse the repository at this point in the history
* Distributor: do not propagate errors with non-utf8 characters

Signed-off-by: Yuri Nikolic <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Yuri Nikolic <[email protected]>

* Lint ignore faillint for grpcstatus.FromError()

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Dec 15, 2024
1 parent dfc8917 commit 3410e68
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 2.15.0-rc.0

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145

Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,15 @@ func httpRetryableToOTLPRetryable(httpStatusCode int) int {
// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
validUTF8Msg := validUTF8Message(msg)
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
if server.IsHandledByHttpgrpcServer(reqCtx) {
w.Header().Set(server.ErrorMessageHeaderKey, msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
w.Header().Set(server.ErrorMessageHeaderKey, validUTF8Msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
}
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(status.New(grpcCode, msg).Proto())
respBytes, err := proto.Marshal(status.New(grpcCode, validUTF8Msg).Proto())
if err != nil {
level.Error(logger).Log("msg", "otlp response marshal failed", "err", err)
writeResponseFailedBody, _ := proto.Marshal(status.New(codes.Internal, "failed to marshal OTLP response").Proto())
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func handler(
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, code, retryCfg)
http.Error(w, msg, code)
http.Error(w, validUTF8Message(msg), code)
}
})
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,26 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...",
},

"invalid JSON with non-utf8 characters request returns 400": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
// \xf6 and \xd3 are not valid UTF8 characters, and they should be replaced with \UFFFD in the output.
Body: []byte("\n\xf6\x16\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 400,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|..."),
},
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|...",
},

"empty JSON is good request, with 200 status code": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Expand Down
14 changes: 13 additions & 1 deletion pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI
return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls))
} else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() {
m.invalidLabelValue.WithLabelValues(userID, group).Inc()
return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName)
return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, validUTF8Message(l.Value), unsafeMetricName)
} else if len(l.Value) > maxLabelValueLength {
m.labelValueTooLong.WithLabelValues(userID, group).Inc()
return fmt.Errorf(labelValueTooLongMsgFormat, l.Name, l.Value, mimirpb.FromLabelAdaptersToString(ls))
Expand Down Expand Up @@ -512,3 +512,15 @@ func getMetricAndEllipsis(ls []mimirpb.LabelAdapter) (string, string) {
}
return metric, ellipsis
}

// validUTF8ErrMessage ensures that the given message contains only valid utf8 characters.
// The presence of non-utf8 characters in some errors might break some crucial parts of distributor's logic.
// For example, if httpgrpc.HTTPServer.Handle() returns a httpgprc.Error containing a non-utf8 character,
// this error will not be propagated to httpgrpc.HTTPClient as a htttpgrpc.Error, but as a generic error,
// which might break some of Mimir internal logic.
// This is because golang's proto.Marshal(), which is used by gRPC internally, fails when it marshals the
// httpgrpc.Error containing non-utf8 character produced by httpgrpc.HTTPServer.Handle(), making the resulting
// error lose some important properties.
func validUTF8Message(msg string) string {
return strings.ToValidUTF8(msg, string(utf8.RuneError))
}
66 changes: 65 additions & 1 deletion pkg/distributor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ package distributor
import (
"errors"
"fmt"
"net/http"
"strings"
"testing"
"time"
"unicode/utf8"

"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
grpcstatus "google.golang.org/grpc/status"
golangproto "google.golang.org/protobuf/proto"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -212,7 +218,7 @@ func TestValidateLabels(t *testing.T) {
skipLabelCountValidation: false,
err: fmt.Errorf(
invalidLabelValueMsgFormat,
"label1", "abcdef", "foo",
"label1", "abc\ufffddef", "foo",
),
},
{
Expand Down Expand Up @@ -671,3 +677,61 @@ func tooManyLabelsArgs(series []mimirpb.LabelAdapter, limit int) []any {

return []any{len(series), limit, metric, ellipsis}
}

func TestValidUTF8Message(t *testing.T) {
testCases := map[string]struct {
body []byte
containsNonUTF8Characters bool
}{
"valid message returns no error": {
body: []byte("valid message"),
containsNonUTF8Characters: false,
},
"message containing only UTF8 characters returns no error": {
body: []byte("\n\ufffd\u0016\n\ufffd\u0002\n\u001D\n\u0011container.runtime\u0012\b\n\u0006docker\n'\n\u0012container.h"),
containsNonUTF8Characters: false,
},
"message containing non-UTF8 character returns an error": {
// \xf6 and \xd3 are not valid UTF8 characters.
body: []byte("\n\xf6\x1a\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"),
containsNonUTF8Characters: true,
},
}

for name, tc := range testCases {
for _, withValidation := range []bool{false, true} {
t.Run(fmt.Sprintf("%s withValidation: %v", name, withValidation), func(t *testing.T) {
msg := string(tc.body)
if withValidation {
msg = validUTF8Message(msg)
}
httpgrpcErr := httpgrpc.Error(http.StatusBadRequest, msg)

// gogo's proto.Marshal() correctly processes both httpgrpc errors with and without non-utf8 characters.
st, ok := grpcutil.ErrorToStatus(httpgrpcErr)
require.True(t, ok)
stBytes, err := proto.Marshal(st.Proto())
require.NoError(t, err)
require.NotNil(t, stBytes)

//lint:ignore faillint We want to explicitly use on grpcstatus.FromError()
grpcSt, ok := grpcstatus.FromError(httpgrpcErr)
require.True(t, ok)
stBytes, err = golangproto.Marshal(grpcSt.Proto())
if withValidation {
// Ensure that errors with validated messages can always be correctly marshaled.
require.NoError(t, err)
require.NotNil(t, stBytes)
} else {
if tc.containsNonUTF8Characters {
// Ensure that errors with non-validated non-utf8 messages cannot be correctly marshaled.
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, stBytes)
}
}
})
}
}
}

0 comments on commit 3410e68

Please sign in to comment.