Skip to content

Commit 362a610

Browse files
committed
Create obsconsumer package and switch connector produced attribute to data point
1 parent 0d58f3e commit 362a610

16 files changed

+918
-254
lines changed

service/internal/graph/connector.go

+98-141
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import (
2020
"go.opentelemetry.io/collector/service/internal/builders"
2121
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
2222
"go.opentelemetry.io/collector/service/internal/metadata"
23+
"go.opentelemetry.io/collector/service/internal/obsconsumer"
2324
)
2425

25-
const pipelineIDAttrKey = "otelcol.pipeline.id.output"
26+
const pipelineIDAttrKey = "otelcol.pipeline.id"
2627

2728
var _ consumerNode = (*connectorNode)(nil)
2829

@@ -79,67 +80,56 @@ func (n *connectorNode) buildTraces(
7980
builder *builders.ConnectorBuilder,
8081
nexts []baseConsumer,
8182
) error {
82-
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
83-
for _, next := range nexts {
84-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
85-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
86-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
87-
if err != nil {
88-
return err
89-
}
90-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerTraces{
91-
Traces: next.(consumer.Traces),
92-
itemCounter: tb.ConnectorProducedItems,
93-
}
94-
}
95-
next := connector.NewTracesRouter(consumers)
96-
9783
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
9884
if err != nil {
9985
return err
10086
}
10187

88+
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
89+
for _, next := range nexts {
90+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
91+
next.(consumer.Traces),
92+
tb.ConnectorProducedItems,
93+
obsconsumer.WithStaticDataPointAttribute(
94+
otelattr.String(
95+
pipelineIDAttrKey,
96+
next.(*capabilitiesNode).pipelineID.String(),
97+
),
98+
),
99+
)
100+
}
101+
next := connector.NewTracesRouter(consumers)
102+
102103
switch n.exprPipelineType {
103104
case pipeline.SignalTraces:
104105
n.Component, err = builder.CreateTracesToTraces(ctx, set, next)
105106
if err != nil {
106107
return err
107108
}
108-
n.consumer = obsConsumerTraces{
109-
// Connectors which might pass along data must inherit capabilities of all nexts
110-
Traces: capabilityconsumer.NewTraces(
111-
n.Component.(consumer.Traces),
112-
aggregateCap(n.Component.(consumer.Traces), nexts),
113-
),
114-
itemCounter: tb.ConnectorConsumedItems,
115-
}
109+
// Connectors which might pass along data must inherit capabilities of all nexts
110+
capConsumer := capabilityconsumer.NewTraces(
111+
n.Component.(consumer.Traces),
112+
aggregateCap(n.Component.(consumer.Traces), nexts),
113+
)
114+
n.consumer = obsconsumer.NewTraces(capConsumer, tb.ConnectorConsumedItems)
116115
case pipeline.SignalMetrics:
117116
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
118117
if err != nil {
119118
return err
120119
}
121-
n.consumer = obsConsumerMetrics{
122-
Metrics: n.Component.(consumer.Metrics),
123-
itemCounter: tb.ConnectorConsumedItems,
124-
}
120+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
125121
case pipeline.SignalLogs:
126122
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
127123
if err != nil {
128124
return err
129125
}
130-
n.consumer = obsConsumerLogs{
131-
Logs: n.Component.(consumer.Logs),
132-
itemCounter: tb.ConnectorConsumedItems,
133-
}
126+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
134127
case xpipeline.SignalProfiles:
135128
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
136129
if err != nil {
137130
return err
138131
}
139-
n.consumer = obsConsumerProfiles{
140-
Profiles: n.Component.(xconsumer.Profiles),
141-
itemCounter: tb.ConnectorConsumedItems,
142-
}
132+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
143133
}
144134
return nil
145135
}
@@ -150,67 +140,56 @@ func (n *connectorNode) buildMetrics(
150140
builder *builders.ConnectorBuilder,
151141
nexts []baseConsumer,
152142
) error {
153-
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
154-
for _, next := range nexts {
155-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
156-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
157-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
158-
if err != nil {
159-
return err
160-
}
161-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerMetrics{
162-
Metrics: next.(consumer.Metrics),
163-
itemCounter: tb.ConnectorProducedItems,
164-
}
165-
}
166-
next := connector.NewMetricsRouter(consumers)
167-
168143
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
169144
if err != nil {
170145
return err
171146
}
172147

148+
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
149+
for _, next := range nexts {
150+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
151+
next.(consumer.Metrics),
152+
tb.ConnectorProducedItems,
153+
obsconsumer.WithStaticDataPointAttribute(
154+
otelattr.String(
155+
pipelineIDAttrKey,
156+
next.(*capabilitiesNode).pipelineID.String(),
157+
),
158+
),
159+
)
160+
}
161+
next := connector.NewMetricsRouter(consumers)
162+
173163
switch n.exprPipelineType {
174164
case pipeline.SignalMetrics:
175165
n.Component, err = builder.CreateMetricsToMetrics(ctx, set, next)
176166
if err != nil {
177167
return err
178168
}
179-
n.consumer = obsConsumerMetrics{
180-
// Connectors which might pass along data must inherit capabilities of all nexts
181-
Metrics: capabilityconsumer.NewMetrics(
182-
n.Component.(consumer.Metrics),
183-
aggregateCap(n.Component.(consumer.Metrics), nexts),
184-
),
185-
itemCounter: tb.ConnectorConsumedItems,
186-
}
169+
// Connectors which might pass along data must inherit capabilities of all nexts
170+
capConsumer := capabilityconsumer.NewMetrics(
171+
n.Component.(consumer.Metrics),
172+
aggregateCap(n.Component.(consumer.Metrics), nexts),
173+
)
174+
n.consumer = obsconsumer.NewMetrics(capConsumer, tb.ConnectorConsumedItems)
187175
case pipeline.SignalTraces:
188176
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
189177
if err != nil {
190178
return err
191179
}
192-
n.consumer = obsConsumerTraces{
193-
Traces: n.Component.(consumer.Traces),
194-
itemCounter: tb.ConnectorConsumedItems,
195-
}
180+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
196181
case pipeline.SignalLogs:
197182
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
198183
if err != nil {
199184
return err
200185
}
201-
n.consumer = obsConsumerLogs{
202-
Logs: n.Component.(consumer.Logs),
203-
itemCounter: tb.ConnectorConsumedItems,
204-
}
186+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
205187
case xpipeline.SignalProfiles:
206188
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
207189
if err != nil {
208190
return err
209191
}
210-
n.consumer = obsConsumerProfiles{
211-
Profiles: n.Component.(xconsumer.Profiles),
212-
itemCounter: tb.ConnectorConsumedItems,
213-
}
192+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
214193
}
215194
return nil
216195
}
@@ -221,67 +200,56 @@ func (n *connectorNode) buildLogs(
221200
builder *builders.ConnectorBuilder,
222201
nexts []baseConsumer,
223202
) error {
224-
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
225-
for _, next := range nexts {
226-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
227-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
228-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
229-
if err != nil {
230-
return err
231-
}
232-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerLogs{
233-
Logs: next.(consumer.Logs),
234-
itemCounter: tb.ConnectorProducedItems,
235-
}
236-
}
237-
next := connector.NewLogsRouter(consumers)
238-
239203
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
240204
if err != nil {
241205
return err
242206
}
243207

208+
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
209+
for _, next := range nexts {
210+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
211+
next.(consumer.Logs),
212+
tb.ConnectorProducedItems,
213+
obsconsumer.WithStaticDataPointAttribute(
214+
otelattr.String(
215+
pipelineIDAttrKey,
216+
next.(*capabilitiesNode).pipelineID.String(),
217+
),
218+
),
219+
)
220+
}
221+
next := connector.NewLogsRouter(consumers)
222+
244223
switch n.exprPipelineType {
245224
case pipeline.SignalLogs:
246225
n.Component, err = builder.CreateLogsToLogs(ctx, set, next)
247226
if err != nil {
248227
return err
249228
}
250-
n.consumer = obsConsumerLogs{
251-
// Connectors which might pass along data must inherit capabilities of all nexts
252-
Logs: capabilityconsumer.NewLogs(
253-
n.Component.(consumer.Logs),
254-
aggregateCap(n.Component.(consumer.Logs), nexts),
255-
),
256-
itemCounter: tb.ConnectorConsumedItems,
257-
}
229+
// Connectors which might pass along data must inherit capabilities of all nexts
230+
capConsumer := capabilityconsumer.NewLogs(
231+
n.Component.(consumer.Logs),
232+
aggregateCap(n.Component.(consumer.Logs), nexts),
233+
)
234+
n.consumer = obsconsumer.NewLogs(capConsumer, tb.ConnectorConsumedItems)
258235
case pipeline.SignalTraces:
259236
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
260237
if err != nil {
261238
return err
262239
}
263-
n.consumer = obsConsumerTraces{
264-
Traces: n.Component.(consumer.Traces),
265-
itemCounter: tb.ConnectorConsumedItems,
266-
}
240+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
267241
case pipeline.SignalMetrics:
268242
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
269243
if err != nil {
270244
return err
271245
}
272-
n.consumer = obsConsumerMetrics{
273-
Metrics: n.Component.(consumer.Metrics),
274-
itemCounter: tb.ConnectorConsumedItems,
275-
}
246+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
276247
case xpipeline.SignalProfiles:
277248
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
278249
if err != nil {
279250
return err
280251
}
281-
n.consumer = obsConsumerProfiles{
282-
Profiles: n.Component.(xconsumer.Profiles),
283-
itemCounter: tb.ConnectorConsumedItems,
284-
}
252+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
285253
}
286254
return nil
287255
}
@@ -292,67 +260,56 @@ func (n *connectorNode) buildProfiles(
292260
builder *builders.ConnectorBuilder,
293261
nexts []baseConsumer,
294262
) error {
295-
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
296-
for _, next := range nexts {
297-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
298-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
299-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
300-
if err != nil {
301-
return err
302-
}
303-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerProfiles{
304-
Profiles: next.(xconsumer.Profiles),
305-
itemCounter: tb.ConnectorProducedItems,
306-
}
307-
}
308-
next := xconnector.NewProfilesRouter(consumers)
309-
310263
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
311264
if err != nil {
312265
return err
313266
}
314267

268+
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
269+
for _, next := range nexts {
270+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
271+
next.(xconsumer.Profiles),
272+
tb.ConnectorProducedItems,
273+
obsconsumer.WithStaticDataPointAttribute(
274+
otelattr.String(
275+
pipelineIDAttrKey,
276+
next.(*capabilitiesNode).pipelineID.String(),
277+
),
278+
),
279+
)
280+
}
281+
next := xconnector.NewProfilesRouter(consumers)
282+
315283
switch n.exprPipelineType {
316284
case xpipeline.SignalProfiles:
317285
n.Component, err = builder.CreateProfilesToProfiles(ctx, set, next)
318286
if err != nil {
319287
return err
320288
}
321-
n.consumer = obsConsumerProfiles{
322-
// Connectors which might pass along data must inherit capabilities of all nexts
323-
Profiles: capabilityconsumer.NewProfiles(
324-
n.Component.(xconsumer.Profiles),
325-
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
326-
),
327-
itemCounter: tb.ConnectorConsumedItems,
328-
}
289+
// Connectors which might pass along data must inherit capabilities of all nexts
290+
capConsumer := capabilityconsumer.NewProfiles(
291+
n.Component.(xconsumer.Profiles),
292+
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
293+
)
294+
n.consumer = obsconsumer.NewProfiles(capConsumer, tb.ConnectorConsumedItems)
329295
case pipeline.SignalTraces:
330296
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
331297
if err != nil {
332298
return err
333299
}
334-
n.consumer = obsConsumerTraces{
335-
Traces: n.Component.(consumer.Traces),
336-
itemCounter: tb.ConnectorConsumedItems,
337-
}
300+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
338301
case pipeline.SignalMetrics:
339302
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
340303
if err != nil {
341304
return err
342305
}
343-
n.consumer = obsConsumerMetrics{
344-
Metrics: n.Component.(consumer.Metrics),
345-
itemCounter: tb.ConnectorConsumedItems,
346-
}
306+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
347307
case pipeline.SignalLogs:
348308
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
349309
if err != nil {
350310
return err
351311
}
352-
n.consumer = obsConsumerLogs{
353-
Logs: n.Component.(consumer.Logs),
354-
itemCounter: tb.ConnectorConsumedItems,
355-
}
312+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
356313
}
357314
return nil
358315
}

0 commit comments

Comments
 (0)