Skip to content

Change OpenCensus receiver to the new interfaces #1556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -54,16 +55,16 @@ func TestCreateTraceExporter(t *testing.T) {
// exporter keeps trying to update its connection state in the background
// so unless there is a receiver enabled the stop call can return different
// results. Standing up a receiver to ensure that stop don't report errors.
rcvFactory := &opencensusreceiver.Factory{}
rcvFactory := opencensusreceiver.NewFactory()
require.NotNil(t, rcvFactory)
rcvCfg := rcvFactory.CreateDefaultConfig().(*opencensusreceiver.Config)
rcvCfg.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)

rcv, err := rcvFactory.CreateTraceReceiver(
context.Background(),
zap.NewNop(),
component.ReceiverCreateParams{Logger: zap.NewNop()},
rcvCfg,
new(exportertest.SinkTraceExporterOld))
new(exportertest.SinkTraceExporter))
require.NotNil(t, rcv)
require.Nil(t, err)
require.Nil(t, rcv.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
2 changes: 1 addition & 1 deletion receiver/opencensusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

Expand Down
41 changes: 20 additions & 21 deletions receiver/opencensusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,28 @@ package opencensusreceiver
import (
"context"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

const (
// The value of "type" key in configuration.
typeStr = "opencensus"
)

// Factory is the Factory for receiver.
type Factory struct {
}

// Type gets the type of the ocReceiver config created by this Factory.
func (f *Factory) Type() configmodels.Type {
return typeStr
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(createTraceReceiver),
receiverhelper.WithMetrics(createMetricsReceiver))
}

// CreateDefaultConfig creates the default configuration for receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
Expand All @@ -58,14 +55,13 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
func createTraceReceiver(
_ context.Context,
_ *zap.Logger,
_ component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumerOld,
nextConsumer consumer.TraceConsumer,
) (component.TraceReceiver, error) {
r, err := f.createReceiver(cfg)
r, err := createReceiver(cfg)
if err != nil {
return nil, err
}
Expand All @@ -75,10 +71,13 @@ func (f *Factory) CreateTraceReceiver(
return r, nil
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(_ context.Context, _ *zap.Logger, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {

r, err := f.createReceiver(cfg)
func createMetricsReceiver(
_ context.Context,
_ component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
if err != nil {
return nil, err
}
Expand All @@ -88,7 +87,7 @@ func (f *Factory) CreateMetricsReceiver(_ context.Context, _ *zap.Logger, cfg co
return r, nil
}

func (f *Factory) createReceiver(cfg configmodels.Receiver) (*ocReceiver, error) {
func createReceiver(cfg configmodels.Receiver) (*ocReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
Expand Down
34 changes: 14 additions & 20 deletions receiver/opencensusreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -33,30 +34,28 @@ import (
)

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
cfg := createDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateReceiver(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
cfg := createDefaultConfig()

config := cfg.(*Config)
config.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)

tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := createTraceReceiver(context.Background(), params, cfg, nil)
assert.NotNil(t, tReceiver)
assert.NoError(t, err)

mReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, nil)
mReceiver, err := createMetricsReceiver(context.Background(), params, cfg, nil)
assert.NotNil(t, mReceiver)
assert.NoError(t, err)
}

func TestCreateTraceReceiver(t *testing.T) {
factory := Factory{}
defaultReceiverSettings := configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
Expand Down Expand Up @@ -109,26 +108,23 @@ func TestCreateTraceReceiver(t *testing.T) {
},
}
ctx := context.Background()
logger := zap.NewNop()
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(exportertest.SinkTraceExporterOld)
tr, err := factory.CreateTraceReceiver(ctx, logger, tt.cfg, sink)
tr, err := createTraceReceiver(ctx, params, tt.cfg, exportertest.NewNopTraceExporter())
if (err != nil) != tt.wantErr {
t.Errorf("factory.CreateTraceReceiver() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tr != nil {
err := tr.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Start() error = %v", err)
tr.Shutdown(context.Background())
require.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, tr.Shutdown(context.Background()))
}
})
}
}

func TestCreateMetricReceiver(t *testing.T) {
factory := Factory{}
defaultReceiverSettings := configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
Expand Down Expand Up @@ -188,19 +184,17 @@ func TestCreateMetricReceiver(t *testing.T) {
},
},
}
logger := zap.NewNop()
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(exportertest.SinkMetricsExporterOld)
tc, err := factory.CreateMetricsReceiver(context.Background(), logger, tt.cfg, sink)
tc, err := createMetricsReceiver(context.Background(), params, tt.cfg, exportertest.NewNopMetricsExporter())
if (err != nil) != tt.wantErr {
t.Errorf("factory.CreateMetricsReceiver() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tc != nil {
err := tc.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Start() error = %v", err)
tc.Shutdown(context.Background())
require.NoError(t, tc.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, tc.Shutdown(context.Background()))
}
})
}
Expand Down
7 changes: 4 additions & 3 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
)

// Receiver is the type used to handle metrics from OpenCensus exporters.
type Receiver struct {
agentmetricspb.UnimplementedMetricsServiceServer
instanceName string
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer
}

// New creates a new ocmetrics.Receiver reference.
func New(instanceName string, nextConsumer consumer.MetricsConsumerOld) (*Receiver, error) {
func New(instanceName string, nextConsumer consumer.MetricsConsumer) (*Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, md cons

var consumerErr error
if len(md.Metrics) > 0 {
consumerErr = ocr.nextConsumer.ConsumeMetricsData(ctx, md)
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}))
}

obsreport.EndMetricsReceiveOp(
Expand Down
19 changes: 13 additions & 6 deletions receiver/opencensusreceiver/ocmetrics/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/obsreport"
Expand All @@ -49,7 +50,7 @@ import (
// accept nodes from downstream sources, but if a node isn't specified in
// an exportMetrics request, assume it is from the last received and non-nil node.
func TestExportMultiplexing(t *testing.T) {
metricSink := new(exportertest.SinkMetricsExporterOld)
metricSink := new(exportertest.SinkMetricsExporter)

port, doneFn := ocReceiverOnGRPCServer(t, metricSink)
defer doneFn()
Expand Down Expand Up @@ -115,7 +116,10 @@ func TestExportMultiplexing(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
resultsMapping[nodeToKey(md.Node)] = append(resultsMapping[nodeToKey(md.Node)], md.Metrics...)
ocmds := pdatautil.MetricsToMetricsData(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
}
}

// First things first, we expect exactly 3 unique keys
Expand Down Expand Up @@ -158,7 +162,7 @@ func TestExportMultiplexing(t *testing.T) {
// The first message without a Node MUST be rejected and teardown the connection.
// See https://github.com/census-instrumentation/opencensus-service/issues/53
func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) {
metricSink := new(exportertest.SinkMetricsExporterOld)
metricSink := new(exportertest.SinkMetricsExporter)

port, doneFn := ocReceiverOnGRPCServer(t, metricSink)
defer doneFn()
Expand Down Expand Up @@ -232,7 +236,7 @@ func TestExportProtocolConformation_metricsInFirstMessage(t *testing.T) {
"https://github.com/census-instrumentation/opencensus-service/issues/225",
)

metricSink := new(exportertest.SinkMetricsExporterOld)
metricSink := new(exportertest.SinkMetricsExporter)

port, doneFn := ocReceiverOnGRPCServer(t, metricSink)
defer doneFn()
Expand All @@ -255,7 +259,10 @@ func TestExportProtocolConformation_metricsInFirstMessage(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
resultsMapping[nodeToKey(md.Node)] = append(resultsMapping[nodeToKey(md.Node)], md.Metrics...)
ocmds := pdatautil.MetricsToMetricsData(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
}
}

if g, w := len(resultsMapping), 1; g != w {
Expand Down Expand Up @@ -309,7 +316,7 @@ func nodeToKey(n *commonpb.Node) string {
return string(blob)
}

func ocReceiverOnGRPCServer(t *testing.T, sr consumer.MetricsConsumerOld) (int, func()) {
func ocReceiverOnGRPCServer(t *testing.T, sr consumer.MetricsConsumer) (int, func()) {
ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)

Expand Down
6 changes: 3 additions & 3 deletions receiver/opencensusreceiver/octrace/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestEnsureRecordedMetrics(t *testing.T) {
require.NoError(t, err)
defer doneFn()

port, doneReceiverFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporterOld())
port, doneReceiverFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporter())
defer doneReceiverFn()

n := 20
Expand All @@ -66,7 +66,7 @@ func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) {
require.NoError(t, err)
defer doneFn()

port, doneFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporterOld())
port, doneFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporter())
defer doneFn()

n := 20
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
trace.RegisterExporter(ocSpansSaver)
defer trace.UnregisterExporter(ocSpansSaver)

port, doneFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporterOld())
port, doneFn := ocReceiverOnGRPCServer(t, exportertest.NewNopTraceExporter())
defer doneFn()

traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port)
Expand Down
7 changes: 4 additions & 3 deletions receiver/opencensusreceiver/octrace/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/translator/internaldata"
)

const (
Expand All @@ -39,12 +40,12 @@ const (
// Receiver is the type used to handle spans from OpenCensus exporters.
type Receiver struct {
agenttracepb.UnimplementedTraceServiceServer
nextConsumer consumer.TraceConsumerOld
nextConsumer consumer.TraceConsumer
instanceName string
}

// New creates a new opencensus.Receiver reference.
func New(instanceName string, nextConsumer consumer.TraceConsumerOld, opts ...Option) (*Receiver, error) {
func New(instanceName string, nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
Expand Down Expand Up @@ -156,7 +157,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, traceda
var err error
numSpans := len(tracedata.Spans)
if numSpans != 0 {
err = ocr.nextConsumer.ConsumeTraceData(ctx, tracedata)
err = ocr.nextConsumer.ConsumeTraces(ctx, internaldata.OCToTraceData(tracedata))
}

obsreport.EndTraceDataReceiveOp(ctx, receiverDataFormat, numSpans, err)
Expand Down
Loading