Skip to content

Commit 559129a

Browse files
committed
EMF: Store the initial value for cumulative metrics and skip sending to backend
1 parent 858456f commit 559129a

8 files changed

+430
-351
lines changed

exporter/awsemfexporter/datapoint.go

+45-17
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, timestampMs t
3232
countDelta := metricEntry.count
3333
if prev != nil {
3434
prevSummaryEntry := prev.RawValue.(summaryMetricEntry)
35-
summaryDelta = summaryDelta - prevSummaryEntry.sum
36-
countDelta = countDelta - prevSummaryEntry.count
35+
summaryDelta = metricEntry.sum - prevSummaryEntry.sum
36+
countDelta = metricEntry.count - prevSummaryEntry.count
37+
} else {
38+
return summaryMetricEntry{summaryDelta, countDelta}, false
3739
}
3840
return summaryMetricEntry{summaryDelta, countDelta}, true
3941
}
@@ -53,8 +55,11 @@ type DataPoint struct {
5355
// - pdata.SummaryDataPointSlice
5456
type DataPoints interface {
5557
Len() int
56-
// NOTE: At() is an expensive call as it calculates the metric's value
57-
At(i int) DataPoint
58+
// At gets the adjusted datapoint from the DataPointSlice at i-th index.
59+
// dataPoint: the adjusted data point
60+
// retained: indicates whether the data point is valid for further process
61+
// NOTE: It is an expensive call as it calculates the metric value.
62+
At(i int) (dataPoint DataPoint, retained bool)
5863
}
5964

6065
// deltaMetricMetadata contains the metadata required to perform rate/delta calculation
@@ -112,49 +117,67 @@ type summaryMetricEntry struct {
112117
}
113118

114119
// At retrieves the IntDataPoint at the given index and performs rate/delta calculation if necessary.
115-
func (dps IntDataPointSlice) At(i int) DataPoint {
120+
func (dps IntDataPointSlice) At(i int) (DataPoint, bool) {
116121
metric := dps.IntDataPointSlice.At(i)
117122
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
118123
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
119124

120125
var metricVal float64
121126
metricVal = float64(metric.Value())
127+
retained := true
122128
if dps.adjustToDelta {
123-
deltaVal, _ := deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
129+
var deltaVal interface{}
130+
deltaVal, retained = deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
124131
metricVal, metric.Timestamp().AsTime())
125-
metricVal = deltaVal.(float64)
132+
if !retained {
133+
return DataPoint{}, retained
134+
}
135+
// It should not happen in practice that the previous metric value is smaller than the current one.
136+
// If it happens, we assume that the metric is reset for some reason.
137+
if deltaVal.(float64) >= 0 {
138+
metricVal = deltaVal.(float64)
139+
}
126140
}
127141

128142
return DataPoint{
129143
Value: metricVal,
130144
Labels: labels,
131145
TimestampMs: timestampMs,
132-
}
146+
}, retained
133147
}
134148

135149
// At retrieves the DoubleDataPoint at the given index and performs rate/delta calculation if necessary.
136-
func (dps DoubleDataPointSlice) At(i int) DataPoint {
150+
func (dps DoubleDataPointSlice) At(i int) (DataPoint, bool) {
137151
metric := dps.DoubleDataPointSlice.At(i)
138152
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
139153
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
140154

141155
var metricVal float64
142156
metricVal = metric.Value()
157+
retained := true
143158
if dps.adjustToDelta {
144-
deltaVal, _ := deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
159+
var deltaVal interface{}
160+
deltaVal, retained = deltaMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
145161
metricVal, metric.Timestamp().AsTime())
146-
metricVal = deltaVal.(float64)
162+
if !retained {
163+
return DataPoint{}, retained
164+
}
165+
// It should not happen in practice that the previous metric value is smaller than the current one.
166+
// If it happens, we assume that the metric is reset for some reason.
167+
if deltaVal.(float64) >= 0 {
168+
metricVal = deltaVal.(float64)
169+
}
147170
}
148171

149172
return DataPoint{
150173
Value: metricVal,
151174
Labels: labels,
152175
TimestampMs: timestampMs,
153-
}
176+
}, retained
154177
}
155178

156179
// At retrieves the HistogramDataPoint at the given index.
157-
func (dps HistogramDataPointSlice) At(i int) DataPoint {
180+
func (dps HistogramDataPointSlice) At(i int) (DataPoint, bool) {
158181
metric := dps.HistogramDataPointSlice.At(i)
159182
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
160183
timestamp := unixNanoToMilliseconds(metric.Timestamp())
@@ -166,20 +189,25 @@ func (dps HistogramDataPointSlice) At(i int) DataPoint {
166189
},
167190
Labels: labels,
168191
TimestampMs: timestamp,
169-
}
192+
}, true
170193
}
171194

172195
// At retrieves the SummaryDataPoint at the given index.
173-
func (dps SummaryDataPointSlice) At(i int) DataPoint {
196+
func (dps SummaryDataPointSlice) At(i int) (DataPoint, bool) {
174197
metric := dps.SummaryDataPointSlice.At(i)
175198
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
176199
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
177200

178201
sum := metric.Sum()
179202
count := metric.Count()
203+
retained := true
180204
if dps.adjustToDelta {
181-
delta, _ := summaryMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
205+
var delta interface{}
206+
delta, retained = summaryMetricCalculator.Calculate(dps.metricName, mergeLabels(dps.deltaMetricMetadata, labels),
182207
summaryMetricEntry{metric.Sum(), metric.Count()}, metric.Timestamp().AsTime())
208+
if !retained {
209+
return DataPoint{}, retained
210+
}
183211
summaryMetricDelta := delta.(summaryMetricEntry)
184212
sum = summaryMetricDelta.sum
185213
count = summaryMetricDelta.count
@@ -198,7 +226,7 @@ func (dps SummaryDataPointSlice) At(i int) DataPoint {
198226
Value: metricVal,
199227
Labels: labels,
200228
TimestampMs: timestampMs,
201-
}
229+
}, retained
202230
}
203231

204232
// createLabels converts OTel StringMap labels to a map

0 commit comments

Comments
 (0)