diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a1aeef8f7e..7be69ae4fd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [ENHANCEMENT] Distributor: Add validation to ensure remote write v2 requests contain at least one sample or histogram. #7201 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186 * [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145 diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index d15c5e51e2b..f00723aadff 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -197,6 +197,10 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni return v1Req, err } + if len(v2Ts.Samples) == 0 && len(v2Ts.Histograms) == 0 { + return v1Req, fmt.Errorf("TimeSeries must contain at least one sample or histogram for series %v", lbs.String()) + } + unit := symbols[v2Ts.Metadata.UnitRef] metricType := v2Ts.Metadata.Type shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "") diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index bf21863eea4..2fc2e478c84 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -452,30 +452,85 @@ func TestHandler_remoteWrite(t *testing.T) { flagext.DefaultValues(&limits) overrides := validation.NewOverrides(limits, nil) - t.Run("remote write v1", func(t *testing.T) { - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) - req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) - resp := httptest.NewRecorder() - handler.ServeHTTP(resp, req) - assert.Equal(t, http.StatusOK, resp.Code) - }) - t.Run("remote write v2", func(t *testing.T) { - ctx := context.Background() - ctx = user.InjectOrgID(ctx, "user-1") + tests := []struct { + name string + createBody func() ([]byte, bool) // returns (bodyBytes, isV2) + expectedStatus int + expectedBody string + verifyResponse func(resp *httptest.ResponseRecorder) + }{ + { + name: "remote write v1", + createBody: func() ([]byte, bool) { + return createPrometheusRemoteWriteProtobuf(t), false + }, + expectedStatus: http.StatusOK, + }, + { + name: "remote write v2", + createBody: func() ([]byte, bool) { + return createPrometheusRemoteWriteV2Protobuf(t), true + }, + expectedStatus: http.StatusNoContent, + verifyResponse: func(resp *httptest.ResponseRecorder) { + respHeader := resp.Header() + assert.Equal(t, "1", respHeader[rw20WrittenSamplesHeader][0]) + assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0]) + assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0]) + }, + }, + { + name: "remote write v2 with empty samples and histograms should return 400", + createBody: func() ([]byte, bool) { + // Create a request with a TimeSeries that has no samples and no histograms + reqProto := writev2.Request{ + Symbols: []string{"", "__name__", "foo"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Exemplars: []writev2.Exemplar{ + { + LabelsRefs: []uint32{}, + Value: 1.0, + Timestamp: time.Now().UnixMilli(), + }, + }, + }, + }, + } + reqBytes, err := reqProto.Marshal() + require.NoError(t, err) + return reqBytes, true + }, + expectedStatus: http.StatusBadRequest, + expectedBody: "TimeSeries must contain at least one sample or histogram for series {__name__=\"foo\"}", + }, + } - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) - req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) - req = req.WithContext(ctx) - resp := httptest.NewRecorder() - handler.ServeHTTP(resp, req) - assert.Equal(t, http.StatusNoContent, resp.Code) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) - // test header value - respHeader := resp.Header() - assert.Equal(t, "1", respHeader[rw20WrittenSamplesHeader][0]) - assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0]) - assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0]) - }) + body, isV2 := test.createBody() + req := createRequest(t, body, isV2) + req = req.WithContext(ctx) + + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + assert.Equal(t, test.expectedStatus, resp.Code) + + if test.expectedBody != "" { + assert.Contains(t, resp.Body.String(), test.expectedBody) + } + + if test.verifyResponse != nil { + test.verifyResponse(resp) + } + }) + } } func TestHandler_ContentTypeAndEncoding(t *testing.T) {