Skip to content

[Prometheus Remote Write Exporter] Handling Staleness flag from OTLP. #6679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 💡 Enhancements 💡

- `prometheusremotewriteexporter`: Handle the staleness flag from OTLP by writing Prometheus stale markers (#6679)

## 🛑 Breaking changes 🛑

## 🚀 New components 🚀
Expand Down
91 changes: 88 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -380,7 +381,20 @@ func Test_PushMetrics(t *testing.T) {

emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])

checkFunc := func(t *testing.T, r *http.Request, expected int) {
// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])

staleNaNSummaryBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSummary])

staleNaNIntGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntGauge])

staleNaNDoubleGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNDoubleGauge])

staleNaNIntSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntSum])

staleNaNSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSum])

checkFunc := func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
Expand All @@ -397,15 +411,19 @@ func Test_PushMetrics(t *testing.T) {
ok := proto.Unmarshal(dest, wr)
require.Nil(t, ok)
assert.EqualValues(t, expected, len(wr.Timeseries))
if isStaleMarker {
assert.True(t, value.IsStaleNaN(wr.Timeseries[0].Samples[0].Value))
}
}

tests := []struct {
name string
md *pdata.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int)
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
}{
{
"invalid_type_case",
Expand All @@ -414,6 +432,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"intSum_case",
Expand All @@ -422,6 +441,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"doubleSum_case",
Expand All @@ -430,6 +450,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"doubleGauge_case",
Expand All @@ -438,6 +459,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"intGauge_case",
Expand All @@ -446,6 +468,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"histogram_case",
Expand All @@ -454,6 +477,7 @@ func Test_PushMetrics(t *testing.T) {
12,
http.StatusAccepted,
false,
false,
},
{
"summary_case",
Expand All @@ -462,6 +486,7 @@ func Test_PushMetrics(t *testing.T) {
10,
http.StatusAccepted,
false,
false,
},
{
"unmatchedBoundBucketHist_case",
Expand All @@ -470,6 +495,7 @@ func Test_PushMetrics(t *testing.T) {
5,
http.StatusAccepted,
false,
false,
},
{
"5xx_case",
Expand All @@ -478,6 +504,7 @@ func Test_PushMetrics(t *testing.T) {
5,
http.StatusServiceUnavailable,
true,
false,
},
{
"emptyGauge_case",
Expand All @@ -486,6 +513,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptyCumulativeSum_case",
Expand All @@ -494,6 +522,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptyCumulativeHistogram_case",
Expand All @@ -502,6 +531,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptySummary_case",
Expand All @@ -510,14 +540,69 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"staleNaNIntGauge_case",
&staleNaNIntGaugeBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNDoubleGauge_case",
&staleNaNDoubleGaugeBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNIntSum_case",
&staleNaNIntSumBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNSum_case",
&staleNaNSumBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNHistogram_case",
&staleNaNHistogramBatch,
checkFunc,
6,
http.StatusAccepted,
false,
true,
},
{
"staleNaNSummary_case",
&staleNaNSummaryBatch,
checkFunc,
5,
http.StatusAccepted,
false,
true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tt.reqTestFunc != nil {
tt.reqTestFunc(t, r, tt.expectedTimeSeries)
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
}
w.WriteHeader(tt.httpResponseCode)
}))
Expand Down
27 changes: 26 additions & 1 deletion exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -329,6 +330,9 @@ func addSingleNumberDataPoint(pt pdata.NumberDataPoint, resource pdata.Resource,
case pdata.MetricValueTypeDouble:
sample.Value = pt.DoubleVal()
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sample.Value = math.Float64frombits(value.StaleNaN)
}
addSample(tsMap, sample, labels, metric)
}

Expand All @@ -344,6 +348,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: pt.Sum(),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sum.Value = math.Float64frombits(value.StaleNaN)
}

sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric)
Expand All @@ -353,6 +360,10 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(pt.Count()),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
count.Value = math.Float64frombits(value.StaleNaN)
}

countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric)

Expand All @@ -373,6 +384,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(cumulativeCount),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
bucket.Value = math.Float64frombits(value.StaleNaN)
}
Comment on lines +387 to +389
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it part of the Prometheus protocol that the stale marker is set on all the buckets?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether it is documented, but the observed behavior is that the stale marker is produced for individual buckets that disappear. Thus I would expect that when the entire histogram disappears each bucket should be marked stale.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aneurysm9 FYI, this needs a follow-up on our proto/specification to ensure that we also send the bucket definitions when we set the flag, because this was not clear, to me at least.

boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
labels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
sig := addSample(tsMap, bucket, labels, metric)
Expand All @@ -385,6 +399,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(cumulativeCount),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
infBucket.Value = math.Float64frombits(value.StaleNaN)
}
infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
sig := addSample(tsMap, infBucket, infLabels, metric)

Expand Down Expand Up @@ -431,7 +448,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: pt.Sum(),
Timestamp: time,
}

if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sum.Value = math.Float64frombits(value.StaleNaN)
}
sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric)

Expand All @@ -440,6 +459,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: float64(pt.Count()),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
count.Value = math.Float64frombits(value.StaleNaN)
}
countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric)

Expand All @@ -450,6 +472,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: qt.Value(),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
quantile.Value = math.Float64frombits(value.StaleNaN)
}
percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
qtlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName, quantileStr, percentileStr)
addSample(tsMap, quantile, qtlabels, metric)
Expand Down
Loading