Skip to content

[StatsD receiver] Change OpenCensus type to OpenTelemetry type for statsD receiver #2733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The Following settings are optional:

- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)

- `enable_metric_type: true`(default value is false): Enbale the statsd receiver to be able to emit the mertic type(gauge, counter, timer(in the future), histogram(in the future)) as a lable.
- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

Example:

Expand Down
67 changes: 67 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package protocol

import (
"time"

"go.opentelemetry.io/collector/consumer/pdata"
)

func buildCounterMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
dp := pdata.NewIntDataPoint()
dp.SetValue(parsedMetric.intvalue)
dp.SetTimestamp(pdata.TimestampFromTime(timeNow))
for i, key := range parsedMetric.labelKeys {
dp.LabelsMap().Insert(key, parsedMetric.labelValues[i])
}

nm := pdata.NewMetric()
nm.SetName(parsedMetric.description.name)
if parsedMetric.unit != "" {
nm.SetUnit(parsedMetric.unit)
}
nm.SetDataType(pdata.MetricDataTypeIntSum)
nm.IntSum().DataPoints().Append(dp)
nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
nm.IntSum().SetIsMonotonic(true)

ilm := pdata.NewInstrumentationLibraryMetrics()
ilm.Metrics().Append(nm)

return ilm
}

func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
dp := pdata.NewDoubleDataPoint()
dp.SetValue(parsedMetric.floatvalue)
dp.SetTimestamp(pdata.TimestampFromTime(timeNow))
for i, key := range parsedMetric.labelKeys {
dp.LabelsMap().Insert(key, parsedMetric.labelValues[i])
}

nm := pdata.NewMetric()
nm.SetName(parsedMetric.description.name)
if parsedMetric.unit != "" {
nm.SetUnit(parsedMetric.unit)
}
nm.SetDataType(pdata.MetricDataTypeDoubleGauge)
nm.DoubleGauge().DataPoints().Append(dp)

ilm := pdata.NewInstrumentationLibraryMetrics()
ilm.Metrics().Append(nm)

return ilm
}
76 changes: 76 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package protocol

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestBuildCounterMetric(t *testing.T) {
timeNow := time.Now()
metricDescription := statsDMetricdescription{
name: "testCounter",
}
parsedMetric := statsDMetric{
description: metricDescription,
intvalue: 32,
unit: "meter",
labelKeys: []string{"mykey"},
labelValues: []string{"myvalue"},
}
metric := buildCounterMetric(parsedMetric, timeNow)
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
expectedMetric.Metrics().Resize(1)
expectedMetric.Metrics().At(0).SetName("testCounter")
expectedMetric.Metrics().At(0).SetUnit("meter")
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeIntSum)
expectedMetric.Metrics().At(0).IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
expectedMetric.Metrics().At(0).IntSum().SetIsMonotonic(true)
expectedMetric.Metrics().At(0).IntSum().DataPoints().Resize(1)
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).SetValue(32)
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).LabelsMap().Insert("mykey", "myvalue")
assert.Equal(t, metric, expectedMetric)
}

func TestBuildGaugeMetric(t *testing.T) {
timeNow := time.Now()
metricDescription := statsDMetricdescription{
name: "testGauge",
}
parsedMetric := statsDMetric{
description: metricDescription,
floatvalue: 32.3,
unit: "meter",
labelKeys: []string{"mykey", "mykey2"},
labelValues: []string{"myvalue", "myvalue2"},
}
metric := buildGaugeMetric(parsedMetric, timeNow)
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
expectedMetric.Metrics().Resize(1)
expectedMetric.Metrics().At(0).SetName("testGauge")
expectedMetric.Metrics().At(0).SetUnit("meter")
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeDoubleGauge)
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().Resize(1)
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).SetValue(32.3)
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Insert("mykey", "myvalue")
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Insert("mykey2", "myvalue2")
assert.Equal(t, metric, expectedMetric)
}
4 changes: 2 additions & 2 deletions receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
package protocol

import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"go.opentelemetry.io/collector/consumer/pdata"
)

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize(enableMetricType bool) error
GetMetrics() []*metricspb.Metric
GetMetrics() pdata.Metrics
Aggregate(line string) error
}
124 changes: 29 additions & 95 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
"strings"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand All @@ -39,8 +38,8 @@ const TagMetricType = "metric_type"

// StatsDParser supports the Parse method for parsing StatsD messages with Tags.
type StatsDParser struct {
gauges map[statsDMetricdescription]*metricspb.Metric
counters map[statsDMetricdescription]*metricspb.Metric
gauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
counters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
enableMetricType bool
}

Expand All @@ -51,10 +50,9 @@ type statsDMetric struct {
floatvalue float64
addition bool
unit string
metricType metricspb.MetricDescriptor_Type
sampleRate float64
labelKeys []*metricspb.LabelKey
labelValues []*metricspb.LabelValue
labelKeys []string
labelValues []string
}

type statsDMetricdescription struct {
Expand All @@ -64,32 +62,34 @@ type statsDMetricdescription struct {
}

func (p *StatsDParser) Initialize(enableMetricType bool) error {
p.gauges = make(map[statsDMetricdescription]*metricspb.Metric)
p.counters = make(map[statsDMetricdescription]*metricspb.Metric)
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.enableMetricType = enableMetricType
return nil
}

// get the metrics preparing for flushing and reset the state
func (p *StatsDParser) GetMetrics() []*metricspb.Metric {
var metrics []*metricspb.Metric
func (p *StatsDParser) GetMetrics() pdata.Metrics {
metrics := pdata.NewMetrics()
metrics.ResourceMetrics().Resize(1)
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Resize(0)

for _, metric := range p.gauges {
metrics = append(metrics, metric)
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(metric)
}

for _, metric := range p.counters {
metrics = append(metrics, metric)
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(metric)
}

p.gauges = make(map[statsDMetricdescription]*metricspb.Metric)
p.counters = make(map[statsDMetricdescription]*metricspb.Metric)
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)

return metrics
}

var timeNowFunc = func() int64 {
return time.Now().Unix()
var timeNowFunc = func() time.Time {
return time.Now()
}

//aggregate for each metric line
Expand All @@ -102,30 +102,25 @@ func (p *StatsDParser) Aggregate(line string) error {
case "g":
_, ok := p.gauges[parsedMetric.description]
if !ok {
metricPoint := buildPoint(parsedMetric)
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
} else {
if parsedMetric.addition {
savedValue := p.gauges[parsedMetric.description].GetTimeseries()[0].Points[0].GetDoubleValue()
savedValue := p.gauges[parsedMetric.description].Metrics().At(0).DoubleGauge().DataPoints().At(0).Value()
parsedMetric.floatvalue = parsedMetric.floatvalue + savedValue
metricPoint := buildPoint(parsedMetric)
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
} else {
metricPoint := buildPoint(parsedMetric)
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
}
}

case "c":
_, ok := p.counters[parsedMetric.description]
if !ok {
metricPoint := buildPoint(parsedMetric)
p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
} else {
savedValue := p.counters[parsedMetric.description].GetTimeseries()[0].Points[0].GetInt64Value()
savedValue := p.counters[parsedMetric.description].Metrics().At(0).IntSum().DataPoints().At(0).Value()
parsedMetric.intvalue = parsedMetric.intvalue + savedValue
metricPoint := buildPoint(parsedMetric)
p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
}
}

Expand Down Expand Up @@ -188,11 +183,8 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
if len(tagParts) != 2 {
return result, fmt.Errorf("invalid tag format: %s", tagParts)
}
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: tagParts[0]})
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
Value: tagParts[1],
HasValue: true,
})
result.labelKeys = append(result.labelKeys, tagParts[0])
result.labelValues = append(result.labelValues, tagParts[1])
kvs = append(kvs, attribute.String(tagParts[0], tagParts[1]))
}

Expand All @@ -207,7 +199,6 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
return result, fmt.Errorf("gauge: parse metric value string: %s", result.value)
}
result.floatvalue = f
result.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE
case "c":
f, err := strconv.ParseFloat(result.value, 64)
if err != nil {
Expand All @@ -218,7 +209,6 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
i = int64(f / result.sampleRate)
}
result.intvalue = i
result.metricType = metricspb.MetricDescriptor_GAUGE_INT64
}

// add metric_type dimension for all metrics
Expand All @@ -231,11 +221,9 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
case "c":
metricType = "counter"
}
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: TagMetricType})
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
Value: metricType,
HasValue: true,
})
result.labelKeys = append(result.labelKeys, TagMetricType)
result.labelValues = append(result.labelValues, metricType)

kvs = append(kvs, attribute.String(TagMetricType, metricType))
}

Expand All @@ -255,57 +243,3 @@ func contains(slice []string, element string) bool {
}
return false
}

func buildMetric(metric statsDMetric, point *metricspb.Point) *metricspb.Metric {
return &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: metric.description.name,
Type: metric.metricType,
LabelKeys: metric.labelKeys,
Unit: metric.unit,
},
Timeseries: []*metricspb.TimeSeries{
{
LabelValues: metric.labelValues,
Points: []*metricspb.Point{
point,
},
},
},
}
}

func buildPoint(parsedMetric statsDMetric) *metricspb.Point {
now := &timestamppb.Timestamp{
Seconds: timeNowFunc(),
}

switch parsedMetric.description.statsdMetricType {
case "c":
return buildCounterPoint(parsedMetric, now)
case "g":
return buildGaugePoint(parsedMetric, now)
}

return nil
}

func buildCounterPoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point {
point := &metricspb.Point{
Timestamp: now,
Value: &metricspb.Point_Int64Value{
Int64Value: parsedMetric.intvalue,
},
}
return point
}

func buildGaugePoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point {
point := &metricspb.Point{
Timestamp: now,
Value: &metricspb.Point_DoubleValue{
DoubleValue: parsedMetric.floatvalue,
},
}
return point
}
Loading