Skip to content

Convert Zipkin receiver and exporter to use OTLP and fix translation bugs #1446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4fd5004
fixed all Jaeger translation diffs from correctness tests
kbrockhoff Jun 27, 2020
9d8e891
fixed Jaeger translations where Resource is missing or empty
kbrockhoff Jun 27, 2020
23e5237
improve test coverage
kbrockhoff Jun 29, 2020
36d90b5
Merge branch 'master' into fix-jaeger-translations
kbrockhoff Jul 3, 2020
fd565c9
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 10, 2020
3d39683
initial dev of otlp to zipkin translation
kbrockhoff Jul 10, 2020
cab9be8
initial dev of otlp to zipkin translation
kbrockhoff Jul 11, 2020
76451ec
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 15, 2020
140bb5f
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 18, 2020
149ba26
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 22, 2020
6739625
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 22, 2020
b08ee12
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 25, 2020
bde1771
initial dev of zipkin v1 to otlp translation
kbrockhoff Jul 25, 2020
bd76ad3
change zipkin receiver and exporter to use internal traces
kbrockhoff Jul 25, 2020
85c3391
improve event/annotation translations
kbrockhoff Jul 25, 2020
e59740e
fix broken zipkin tests
kbrockhoff Jul 27, 2020
c43074f
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 28, 2020
65bffa2
convert zipken receiver/exporter to new factory signature
kbrockhoff Jul 28, 2020
1fb224b
fixed receiver config
kbrockhoff Jul 28, 2020
3101bc7
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 1, 2020
f3b7b75
add code from master
kbrockhoff Aug 1, 2020
8c536aa
fix broken test
kbrockhoff Aug 1, 2020
95168a1
fix PR requested changes
kbrockhoff Aug 1, 2020
c228643
translation fixes
kbrockhoff Aug 1, 2020
972ddac
improve perf
kbrockhoff Aug 3, 2020
2f19c3a
added more test coverage
kbrockhoff Aug 3, 2020
98d4a8b
changed timestamp validations to only check to nearest millisecond
kbrockhoff Aug 7, 2020
2cc7420
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 7, 2020
86cfabd
fix new lint issues and PR comments
kbrockhoff Aug 8, 2020
8b98e7b
improve timestamp validation
kbrockhoff Aug 8, 2020
f989072
improve timestamp validation
kbrockhoff Aug 8, 2020
d6a7585
fix missed timestamp validation
kbrockhoff Aug 8, 2020
daa24ae
added missing goldendata examples
kbrockhoff Aug 8, 2020
f749072
fix generator tests
kbrockhoff Aug 8, 2020
acdce2b
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 11, 2020
325ffbe
fix PR comments
kbrockhoff Aug 11, 2020
50616c8
improve test coverage
kbrockhoff Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions exporter/zipkinexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package zipkinexporter

import (
"context"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
)
Expand Down Expand Up @@ -48,4 +51,7 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, "zipkin/2", e1.(*Config).Name())
assert.Equal(t, "https://somedest:1234/api/v2/spans", e1.(*Config).Endpoint)
assert.Equal(t, "proto", e1.(*Config).Format)
params := component.ExporterCreateParams{Logger: zap.NewNop()}
_, err = factory.CreateTraceExporter(context.Background(), params, e1)
require.NoError(t, err)
}
8 changes: 5 additions & 3 deletions exporter/zipkinexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ const (
defaultServiceName string = "<missing service name>"
)

// NewFactory creates a factory for OTLP exporter.
// NewFactory creates a factory for Zipkin exporter
func NewFactory() component.ExporterFactory {
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(createTraceExporter))
}

// CreateDefaultConfig creates the default configuration for exporter.
func createDefaultConfig() configmodels.Exporter {
return &Config{
ExporterSettings: configmodels.ExporterSettings{
Expand All @@ -60,12 +61,13 @@ func createDefaultConfig() configmodels.Exporter {
}
}

// CreateTraceExporter creates a trace exporter based on this config.
func createTraceExporter(
_ context.Context,
_ component.ExporterCreateParams,
cfg configmodels.Exporter,
config configmodels.Exporter,
) (component.TraceExporter, error) {
zc := cfg.(*Config)
zc := config.(*Config)

if zc.Endpoint == "" {
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/215
Expand Down
12 changes: 8 additions & 4 deletions exporter/zipkinexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,28 @@ import (
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateInstanceViaFactory(t *testing.T) {
cfg := createDefaultConfig()
factory := NewFactory()

cfg := factory.CreateDefaultConfig()
params := component.ExporterCreateParams{Logger: zap.NewNop()}

// Default config doesn't have default endpoint so creating from it should
// fail.
ze, err := createTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
ze, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.Error(t, err)
assert.Nil(t, ze)

// URL doesn't have a default value so set it directly.
zeCfg := cfg.(*Config)
zeCfg.Endpoint = "http://some.location.org:9411/api/v2/spans"
ze, err = createTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
ze, err = factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NotNil(t, ze)
}
53 changes: 53 additions & 0 deletions exporter/zipkinexporter/test_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkinexporter

import (
"encoding/json"
"testing"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/require"
)

func unmarshalZipkinSpanArrayToMap(t *testing.T, jsonStr string) map[zipkinmodel.ID]*zipkinmodel.SpanModel {
var i interface{}

err := json.Unmarshal([]byte(jsonStr), &i)
require.NoError(t, err)

results := make(map[zipkinmodel.ID]*zipkinmodel.SpanModel)

switch x := i.(type) {
case []interface{}:
for _, j := range x {
span := jsonToSpan(t, j)
results[span.ID] = span
}
default:
span := jsonToSpan(t, x)
results[span.ID] = span
}
return results
}

func jsonToSpan(t *testing.T, j interface{}) *zipkinmodel.SpanModel {
b, err := json.Marshal(j)
require.NoError(t, err)
span := &zipkinmodel.SpanModel{}
err = span.UnmarshalJSON(b)
require.NoError(t, err)
return span
}
29 changes: 9 additions & 20 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"fmt"
"net/http"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/internaldata"
"go.opentelemetry.io/collector/translator/trace/zipkin"
)

Expand All @@ -51,7 +49,7 @@ func newTraceExporter(config *Config) (component.TraceExporter, error) {
if err != nil {
return nil, err
}
zexp, err := exporterhelper.NewTraceExporter(config, ze.pushTraceData)
zexp, err := exporterhelper.NewTraceExporter(config, ze.PushTraceData)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -83,39 +81,30 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) {
return ze, nil
}

func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
numSpans := td.SpanCount()
octds := internaldata.TraceDataToOC(td)

tbatch := make([]*zipkinmodel.SpanModel, 0, numSpans)
for _, octd := range octds {
for _, span := range octd.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(octd.Node, octd.Resource, span, ze.defaultServiceName)
if err != nil {
return numSpans, consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
tbatch = append(tbatch, zs)
}
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
tbatch, err := zipkin.InternalTracesToZipkinSpans(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return numSpans, consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return numSpans, fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

resp, err := ze.client.Do(req)
if err != nil {
return numSpans, fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return numSpans, fmt.Errorf("failed the request with status code %d", resp.StatusCode)
return td.SpanCount(), fmt.Errorf("failed the request with status code %d", resp.StatusCode)
}
return 0, nil
}
45 changes: 29 additions & 16 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/testutil"
)
Expand All @@ -51,7 +53,7 @@ import (
// The rest of the fields should match up exactly
func TestZipkinExporter_roundtripJSON(t *testing.T) {
buf := new(bytes.Buffer)
var sizes []int64
sizes := []int64{}
cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s, _ := io.Copy(buf, r.Body)
sizes = append(sizes, s)
Expand All @@ -65,15 +67,16 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
},
Format: "json",
}
zexp, err := newTraceExporter(config)
tes, err := newTraceExporter(config)
assert.NoError(t, err)
require.NotNil(t, zexp)
require.NotNil(t, tes)

// The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use
// a mock to ensure that this happens as intended.
mzr := newMockZipkinReporter(cst.URL)

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumer{tes})
addr := testutil.GetAvailableLocalAddress(t)
cfg := &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Expand Down Expand Up @@ -114,7 +117,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
},
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010db",
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010dc",
"kind": "SERVER","name": "put",
"timestamp": 1472470996199000,"duration": 207000,
"localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"},
Expand All @@ -124,23 +127,31 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
{"timestamp": 1472470996403000,"value": "bar"}
],
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
}]
`, `
[{
},
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91386",
"id": "4d1e00c0db9010db",
"id": "4d1e00c0db9010dd",
"kind": "SERVER",
"name": "put",
"timestamp": 1472470996199000,
"duration": 207000
}]
`}
for i, s := range wants {
want := testutil.GenerateNormalizedJSON(t, s)
want := unmarshalZipkinSpanArrayToMap(t, s)
gotBytes := buf.Next(int(sizes[i]))
got := testutil.GenerateNormalizedJSON(t, string(gotBytes))
assert.Equal(t, want, got)
got := unmarshalZipkinSpanArrayToMap(t, string(gotBytes))
for id, expected := range want {
actual, ok := got[id]
assert.True(t, ok)
assert.Equal(t, expected.ID, actual.ID)
assert.Equal(t, expected.Name, actual.Name)
assert.Equal(t, expected.TraceID, actual.TraceID)
assert.Equal(t, expected.Timestamp, actual.Timestamp)
assert.Equal(t, expected.Duration, actual.Duration)
assert.Equal(t, expected.Kind, actual.Kind)
}
}
}

Expand All @@ -151,7 +162,7 @@ type mockZipkinReporter struct {
serializer zipkinreporter.SpanSerializer
}

var _ zipkinreporter.Reporter = (*mockZipkinReporter)(nil)
var _ (zipkinreporter.Reporter) = (*mockZipkinReporter)(nil)

func (r *mockZipkinReporter) Send(span zipkinmodel.SpanModel) {
r.batch = append(r.batch, &span)
Expand Down Expand Up @@ -235,7 +246,7 @@ const zipkinSpansJSONJavaLibrary = `
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91386",
"id": "4d1e00c0db9010db",
"id": "4d1e00c0db9010dc",
"kind": "SERVER",
"name": "put",
"timestamp": 1472470996199000,
Expand Down Expand Up @@ -267,7 +278,7 @@ const zipkinSpansJSONJavaLibrary = `
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91386",
"id": "4d1e00c0db9010db",
"id": "4d1e00c0db9010dd",
"kind": "SERVER",
"name": "put",
"timestamp": 1472470996199000,
Expand All @@ -283,7 +294,8 @@ func TestZipkinExporter_invalidFormat(t *testing.T) {
Format: "foobar",
}
f := NewFactory()
_, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config)
params := component.ExporterCreateParams{Logger: zap.NewNop()}
_, err := f.CreateTraceExporter(context.Background(), params, config)
require.Error(t, err)
}

Expand All @@ -304,7 +316,7 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
},
Format: "proto",
}
zexp, err := newTraceExporter(config)
tes, err := newTraceExporter(config)
require.NoError(t, err)

// The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use
Expand All @@ -314,6 +326,7 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
mzr.serializer = zipkinproto.SpanSerializer{}

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumer{tes})
port := testutil.GetAvailablePort(t)
cfg := &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Expand Down
1 change: 1 addition & 0 deletions internal/goldendataset/pict_tracing_input_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
ResourceK8sOnPrem PICTInputResource = "K8sOnPrem"
ResourceK8sCloud PICTInputResource = "K8sCloud"
ResourceFaas PICTInputResource = "Faas"
ResourceExec PICTInputResource = "Exec"
)

// Enumerates the number and kind of instrumentation library instances that can be generated.
Expand Down
13 changes: 13 additions & 0 deletions internal/goldendataset/resource_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func GenerateResource(rscID PICTInputResource) *otlpresource.Resource {
attrs = generateCloudK8sAttributes()
case ResourceFaas:
attrs = generateFassAttributes()
case ResourceExec:
attrs = generateExecAttributes()
default:
attrs = generateEmptyAttributes()
}
Expand Down Expand Up @@ -141,3 +143,14 @@ func generateFassAttributes() map[string]interface{} {
attrMap[conventions.AttributeCloudZone] = "us-central1-a"
return attrMap
}

func generateExecAttributes() map[string]interface{} {
attrMap := make(map[string]interface{})
attrMap[conventions.AttributeProcessExecutableName] = "otelcol"
attrMap[conventions.AttributeProcessCommandLine] =
"--config=/etc/otel-collector-config.yaml --mem-ballast-size-mib=683"
attrMap[conventions.AttributeProcessExecutablePath] = "/usr/local/bin/otelcol"
attrMap[conventions.AttributeProcessID] = 2020
attrMap[conventions.AttributeProcessOwner] = "otel"
return attrMap
}
2 changes: 1 addition & 1 deletion internal/goldendataset/resource_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func TestGenerateResource(t *testing.T) {
resourceIds := []PICTInputResource{ResourceNil, ResourceEmpty, ResourceVMOnPrem, ResourceVMCloud, ResourceK8sOnPrem,
ResourceK8sCloud, ResourceFaas}
ResourceK8sCloud, ResourceFaas, ResourceExec}
for _, rscID := range resourceIds {
rsc := GenerateResource(rscID)
if rscID == ResourceNil {
Expand Down
Loading