Skip to content

Commit 47e384d

Browse files
djaglowskijaronoff97
authored andcommitted
Add forward connector (open-telemetry#6763)
1 parent 8dcd76d commit 47e384d

File tree

10 files changed

+740
-0
lines changed

10 files changed

+740
-0
lines changed

.chloggen/forwardconnector.yaml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: new_component
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: forwardconnector
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add forward connector
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [6763]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

.github/dependabot.yml

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ updates:
2626
directory: "/confmap"
2727
schedule:
2828
interval: "weekly"
29+
- package-ecosystem: "gomod"
30+
directory: "/connector/forwardconnector"
31+
schedule:
32+
interval: "weekly"
2933
- package-ecosystem: "gomod"
3034
directory: "/consumer"
3135
schedule:

connector/forwardconnector/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common

connector/forwardconnector/README.md

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Forward Connector
2+
3+
The `forwardconnector` can be used to chain pipelines of the same type together.
4+
For example, it can replicate a signal to multiple pipelines so that each pipeline
5+
can process the signal independently in varying ways. Alternately, it can be used to
6+
merge two pipelines together so they can be processed as one pipeline.
7+
8+
## Supported connection types
9+
10+
Connectors are always used in two or more pipelines. Therefore, support and stability
11+
are defined per _pair of signal types_. The pipeline in which a connector is used as
12+
an exporter is referred to below as the "Exporter pipeline". Likewise, the pipeline in
13+
which the connector is used as a receiver is referred to below as the "Receiver pipeline".
14+
15+
| Exporter pipeline | Receiver pipeline | Stability |
16+
| ----------------- | ----------------- | ----------------- |
17+
| traces | traces | [in development] |
18+
| metrics | metrics | [in development] |
19+
| logs | logs | [in development] |
20+
21+
[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development

connector/forwardconnector/doc.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package forwardconnector passes signals from one pipeline to another.
16+
package forwardconnector // import "go.opentelemetry.io/collector/connector/forwardconnector"

connector/forwardconnector/forward.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package forwardconnector // import "go.opentelemetry.io/collector/connector/forwardconnector"
16+
17+
import (
18+
"context"
19+
20+
"go.opentelemetry.io/collector/component"
21+
"go.opentelemetry.io/collector/connector"
22+
"go.opentelemetry.io/collector/consumer"
23+
"go.opentelemetry.io/collector/internal/sharedcomponent"
24+
)
25+
26+
const (
27+
typeStr = "forward"
28+
)
29+
30+
type forwardFactory struct {
31+
// This is the map of already created forward connectors for particular configurations.
32+
// We maintain this map because the Factory is asked trace, metric, and log receivers
33+
// separately but they must not create separate objects. When the connector is shutdown
34+
// it should be removed from this map so the same configuration can be recreated successfully.
35+
*sharedcomponent.SharedComponents
36+
}
37+
38+
// NewFactory returns a connector.Factory.
39+
func NewFactory() connector.Factory {
40+
f := &forwardFactory{sharedcomponent.NewSharedComponents()}
41+
return connector.NewFactory(
42+
typeStr,
43+
createDefaultConfig,
44+
connector.WithTracesToTraces(f.createTracesToTraces, component.StabilityLevelDevelopment),
45+
connector.WithMetricsToMetrics(f.createMetricsToMetrics, component.StabilityLevelDevelopment),
46+
connector.WithLogsToLogs(f.createLogsToLogs, component.StabilityLevelDevelopment),
47+
)
48+
}
49+
50+
// createDefaultConfig creates the default configuration.
51+
func createDefaultConfig() component.Config {
52+
return &struct{}{}
53+
}
54+
55+
// createTracesToTraces creates a trace receiver based on provided config.
56+
func (f *forwardFactory) createTracesToTraces(
57+
_ context.Context,
58+
set connector.CreateSettings,
59+
cfg component.Config,
60+
nextConsumer consumer.Traces,
61+
) (connector.Traces, error) {
62+
comp := f.GetOrAdd(cfg, func() component.Component {
63+
return &forward{}
64+
})
65+
66+
conn := comp.Unwrap().(*forward)
67+
conn.Traces = nextConsumer
68+
return conn, nil
69+
}
70+
71+
// createMetricsToMetrics creates a metrics receiver based on provided config.
72+
func (f *forwardFactory) createMetricsToMetrics(
73+
_ context.Context,
74+
set connector.CreateSettings,
75+
cfg component.Config,
76+
nextConsumer consumer.Metrics,
77+
) (connector.Metrics, error) {
78+
comp := f.GetOrAdd(cfg, func() component.Component {
79+
return &forward{}
80+
})
81+
82+
conn := comp.Unwrap().(*forward)
83+
conn.Metrics = nextConsumer
84+
return conn, nil
85+
}
86+
87+
// createLogsToLogs creates a log receiver based on provided config.
88+
func (f *forwardFactory) createLogsToLogs(
89+
_ context.Context,
90+
set connector.CreateSettings,
91+
cfg component.Config,
92+
nextConsumer consumer.Logs,
93+
) (connector.Logs, error) {
94+
comp := f.GetOrAdd(cfg, func() component.Component {
95+
return &forward{}
96+
})
97+
98+
conn := comp.Unwrap().(*forward)
99+
conn.Logs = nextConsumer
100+
return conn, nil
101+
}
102+
103+
// forward is used to pass signals directly from one pipeline to another.
104+
// This is useful when there is a need to replicate data and process it in more
105+
// than one way. It can also be used to join pipelines together.
106+
type forward struct {
107+
consumer.Traces
108+
consumer.Metrics
109+
consumer.Logs
110+
component.StartFunc
111+
component.ShutdownFunc
112+
}
113+
114+
func (c *forward) Capabilities() consumer.Capabilities {
115+
return consumer.Capabilities{MutatesData: false}
116+
}
+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package forwardconnector
15+
16+
import (
17+
"context"
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
22+
"go.opentelemetry.io/collector/component/componenttest"
23+
"go.opentelemetry.io/collector/connector/connectortest"
24+
"go.opentelemetry.io/collector/consumer/consumertest"
25+
"go.opentelemetry.io/collector/pdata/plog"
26+
"go.opentelemetry.io/collector/pdata/pmetric"
27+
"go.opentelemetry.io/collector/pdata/ptrace"
28+
)
29+
30+
func TestForward(t *testing.T) {
31+
f := NewFactory()
32+
cfg := f.CreateDefaultConfig()
33+
assert.Equal(t, &struct{}{}, cfg)
34+
35+
ctx := context.Background()
36+
set := connectortest.NewNopCreateSettings()
37+
host := componenttest.NewNopHost()
38+
39+
tracesSink := new(consumertest.TracesSink)
40+
tracesToTraces, err := f.CreateTracesToTraces(ctx, set, cfg, tracesSink)
41+
assert.NoError(t, err)
42+
assert.NotNil(t, tracesToTraces)
43+
44+
metricsSink := new(consumertest.MetricsSink)
45+
metricsToMetrics, err := f.CreateMetricsToMetrics(ctx, set, cfg, metricsSink)
46+
assert.NoError(t, err)
47+
assert.NotNil(t, metricsToMetrics)
48+
49+
logsSink := new(consumertest.LogsSink)
50+
logsToLogs, err := f.CreateLogsToLogs(ctx, set, cfg, logsSink)
51+
assert.NoError(t, err)
52+
assert.NotNil(t, logsToLogs)
53+
54+
assert.NoError(t, tracesToTraces.Start(ctx, host))
55+
assert.NoError(t, metricsToMetrics.Start(ctx, host))
56+
assert.NoError(t, logsToLogs.Start(ctx, host))
57+
58+
assert.NoError(t, tracesToTraces.ConsumeTraces(ctx, ptrace.NewTraces()))
59+
60+
assert.NoError(t, metricsToMetrics.ConsumeMetrics(ctx, pmetric.NewMetrics()))
61+
assert.NoError(t, metricsToMetrics.ConsumeMetrics(ctx, pmetric.NewMetrics()))
62+
63+
assert.NoError(t, logsToLogs.ConsumeLogs(ctx, plog.NewLogs()))
64+
assert.NoError(t, logsToLogs.ConsumeLogs(ctx, plog.NewLogs()))
65+
assert.NoError(t, logsToLogs.ConsumeLogs(ctx, plog.NewLogs()))
66+
67+
assert.NoError(t, tracesToTraces.Shutdown(ctx))
68+
assert.NoError(t, metricsToMetrics.Shutdown(ctx))
69+
assert.NoError(t, logsToLogs.Shutdown(ctx))
70+
71+
assert.Equal(t, 1, len(tracesSink.AllTraces()))
72+
assert.Equal(t, 2, len(metricsSink.AllMetrics()))
73+
assert.Equal(t, 3, len(logsSink.AllLogs()))
74+
}

connector/forwardconnector/go.mod

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
module go.opentelemetry.io/collector/connector/forwardconnector
2+
3+
go 1.18
4+
5+
require (
6+
github.com/stretchr/testify v1.8.1
7+
go.opentelemetry.io/collector v0.67.0
8+
go.opentelemetry.io/collector/component v0.67.0
9+
go.opentelemetry.io/collector/consumer v0.67.0
10+
go.opentelemetry.io/collector/pdata v1.0.0-rc1
11+
)
12+
13+
require (
14+
github.com/davecgh/go-spew v1.1.1 // indirect
15+
github.com/gogo/protobuf v1.3.2 // indirect
16+
github.com/golang/protobuf v1.5.2 // indirect
17+
github.com/json-iterator/go v1.1.12 // indirect
18+
github.com/knadh/koanf v1.4.4 // indirect
19+
github.com/mitchellh/copystructure v1.2.0 // indirect
20+
github.com/mitchellh/mapstructure v1.5.0 // indirect
21+
github.com/mitchellh/reflectwalk v1.0.2 // indirect
22+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
23+
github.com/modern-go/reflect2 v1.0.2 // indirect
24+
github.com/pmezard/go-difflib v1.0.0 // indirect
25+
go.opentelemetry.io/collector/confmap v0.67.0 // indirect
26+
go.opentelemetry.io/collector/featuregate v0.67.0 // indirect
27+
go.opentelemetry.io/otel v1.11.2 // indirect
28+
go.opentelemetry.io/otel/metric v0.34.0 // indirect
29+
go.opentelemetry.io/otel/trace v1.11.2 // indirect
30+
go.uber.org/atomic v1.10.0 // indirect
31+
go.uber.org/multierr v1.8.0 // indirect
32+
go.uber.org/zap v1.24.0 // indirect
33+
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
34+
golang.org/x/sys v0.3.0 // indirect
35+
golang.org/x/text v0.4.0 // indirect
36+
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
37+
google.golang.org/grpc v1.51.0 // indirect
38+
google.golang.org/protobuf v1.28.1 // indirect
39+
gopkg.in/yaml.v3 v3.0.1 // indirect
40+
)
41+
42+
replace go.opentelemetry.io/collector => ../../
43+
44+
replace go.opentelemetry.io/collector/component => ../../component
45+
46+
replace go.opentelemetry.io/collector/pdata => ../../pdata
47+
48+
replace go.opentelemetry.io/collector/semconv => ../../semconv
49+
50+
replace go.opentelemetry.io/collector/extension/zpagesextension => ../../extension/zpagesextension
51+
52+
replace go.opentelemetry.io/collector/featuregate => ../../featuregate
53+
54+
replace go.opentelemetry.io/collector/consumer => ../../consumer
55+
56+
replace go.opentelemetry.io/collector/confmap => ../../confmap

0 commit comments

Comments
 (0)