Skip to content

Commit ee74217

Browse files
Add Marshal/MarshalJSON to Request/Response otlpgrpc messages (#4085)
* Add Marshal/MarshalJSON to Request/Response otlpgrpc messages Signed-off-by: Bogdan Drutu <[email protected]> * Update model/otlpgrpc/logs.go Co-authored-by: Anthony Mirabella <[email protected]> Co-authored-by: Anthony Mirabella <[email protected]>
1 parent ee30ab7 commit ee74217

File tree

7 files changed

+357
-135
lines changed

7 files changed

+357
-135
lines changed

exporter/otlphttpexporter/otlp.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737
"go.opentelemetry.io/collector/consumer/consumererror"
3838
"go.opentelemetry.io/collector/exporter/exporterhelper"
3939
"go.opentelemetry.io/collector/internal/middleware"
40-
"go.opentelemetry.io/collector/model/otlp"
40+
"go.opentelemetry.io/collector/model/otlpgrpc"
4141
"go.opentelemetry.io/collector/model/pdata"
4242
)
4343

@@ -51,12 +51,6 @@ type exporter struct {
5151
logger *zap.Logger
5252
}
5353

54-
var (
55-
tracesMarshaler = otlp.NewProtobufTracesMarshaler()
56-
metricsMarshaler = otlp.NewProtobufMetricsMarshaler()
57-
logsMarshaler = otlp.NewProtobufLogsMarshaler()
58-
)
59-
6054
const (
6155
headerRetryAfter = "Retry-After"
6256
maxHTTPResponseReadBytes = 64 * 1024
@@ -100,7 +94,9 @@ func (e *exporter) start(_ context.Context, host component.Host) error {
10094
}
10195

10296
func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) error {
103-
request, err := tracesMarshaler.MarshalTraces(td)
97+
tr := otlpgrpc.NewTracesRequest()
98+
tr.SetTraces(td)
99+
request, err := tr.Marshal()
104100
if err != nil {
105101
return consumererror.Permanent(err)
106102
}
@@ -109,15 +105,19 @@ func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) error {
109105
}
110106

111107
func (e *exporter) pushMetrics(ctx context.Context, md pdata.Metrics) error {
112-
request, err := metricsMarshaler.MarshalMetrics(md)
108+
tr := otlpgrpc.NewMetricsRequest()
109+
tr.SetMetrics(md)
110+
request, err := tr.Marshal()
113111
if err != nil {
114112
return consumererror.Permanent(err)
115113
}
116114
return e.export(ctx, e.metricsURL, request)
117115
}
118116

119117
func (e *exporter) pushLogs(ctx context.Context, ld pdata.Logs) error {
120-
request, err := logsMarshaler.MarshalLogs(ld)
118+
tr := otlpgrpc.NewLogsRequest()
119+
tr.SetLogs(ld)
120+
request, err := tr.Marshal()
121121
if err != nil {
122122
return consumererror.Permanent(err)
123123
}

model/otlpgrpc/logs.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,18 @@
1515
package otlpgrpc
1616

1717
import (
18+
"bytes"
1819
"context"
1920

21+
"github.com/gogo/protobuf/jsonpb"
2022
"google.golang.org/grpc"
2123

2224
"go.opentelemetry.io/collector/model/internal"
2325
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
2426
"go.opentelemetry.io/collector/model/pdata"
2527
)
2628

27-
// TODO: Consider to add `LogsRequest`. If we add non pdata properties we can add them to the request.
29+
var jsonMarshaler = &jsonpb.Marshaler{}
2830

2931
// LogsResponse represents the response for gRPC client/server.
3032
type LogsResponse struct {
@@ -36,6 +38,20 @@ func NewLogsResponse() LogsResponse {
3638
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
3739
}
3840

41+
// Marshal marshals LogsResponse into proto bytes.
42+
func (lr LogsResponse) Marshal() ([]byte, error) {
43+
return lr.orig.Marshal()
44+
}
45+
46+
// MarshalJSON marshals LogsResponse into JSON bytes.
47+
func (lr LogsResponse) MarshalJSON() ([]byte, error) {
48+
var buf bytes.Buffer
49+
if err := jsonMarshaler.Marshal(&buf, lr.orig); err != nil {
50+
return nil, err
51+
}
52+
return buf.Bytes(), nil
53+
}
54+
3955
// LogsRequest represents the response for gRPC client/server.
4056
type LogsRequest struct {
4157
orig *otlpcollectorlog.ExportLogsServiceRequest
@@ -46,6 +62,20 @@ func NewLogsRequest() LogsRequest {
4662
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
4763
}
4864

65+
// Marshal marshals LogsRequest into proto bytes.
66+
func (lr LogsRequest) Marshal() ([]byte, error) {
67+
return lr.orig.Marshal()
68+
}
69+
70+
// MarshalJSON marshals LogsRequest into JSON bytes.
71+
func (lr LogsRequest) MarshalJSON() ([]byte, error) {
72+
var buf bytes.Buffer
73+
if err := jsonMarshaler.Marshal(&buf, lr.orig); err != nil {
74+
return nil, err
75+
}
76+
return buf.Bytes(), nil
77+
}
78+
4979
func (lr LogsRequest) SetLogs(ld pdata.Logs) {
5080
lr.orig.ResourceLogs = internal.LogsToOtlp(ld.InternalRep()).ResourceLogs
5181
}

model/otlpgrpc/metrics.go

+33-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package otlpgrpc
1616

1717
import (
18+
"bytes"
1819
"context"
1920

2021
"google.golang.org/grpc"
@@ -36,6 +37,20 @@ func NewMetricsResponse() MetricsResponse {
3637
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
3738
}
3839

40+
// Marshal marshals MetricsResponse into proto bytes.
41+
func (mr MetricsResponse) Marshal() ([]byte, error) {
42+
return mr.orig.Marshal()
43+
}
44+
45+
// MarshalJSON marshals MetricsResponse into JSON bytes.
46+
func (mr MetricsResponse) MarshalJSON() ([]byte, error) {
47+
var buf bytes.Buffer
48+
if err := jsonMarshaler.Marshal(&buf, mr.orig); err != nil {
49+
return nil, err
50+
}
51+
return buf.Bytes(), nil
52+
}
53+
3954
// MetricsRequest represents the response for gRPC client/server.
4055
type MetricsRequest struct {
4156
orig *otlpcollectormetrics.ExportMetricsServiceRequest
@@ -46,12 +61,26 @@ func NewMetricsRequest() MetricsRequest {
4661
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
4762
}
4863

49-
func (lr MetricsRequest) SetMetrics(ld pdata.Metrics) {
50-
lr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
64+
// Marshal marshals MetricsRequest into proto bytes.
65+
func (mr MetricsRequest) Marshal() ([]byte, error) {
66+
return mr.orig.Marshal()
67+
}
68+
69+
// MarshalJSON marshals LogsRequest into JSON bytes.
70+
func (mr MetricsRequest) MarshalJSON() ([]byte, error) {
71+
var buf bytes.Buffer
72+
if err := jsonMarshaler.Marshal(&buf, mr.orig); err != nil {
73+
return nil, err
74+
}
75+
return buf.Bytes(), nil
76+
}
77+
78+
func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
79+
mr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
5180
}
5281

53-
func (lr MetricsRequest) Metrics() pdata.Metrics {
54-
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(lr.orig))
82+
func (mr MetricsRequest) Metrics() pdata.Metrics {
83+
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(mr.orig))
5584
}
5685

5786
// MetricsClient is the client API for OTLP-GRPC Metrics service.

model/otlpgrpc/traces.go

+33-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package otlpgrpc
1616

1717
import (
18+
"bytes"
1819
"context"
1920

2021
"google.golang.org/grpc"
@@ -36,6 +37,20 @@ func NewTracesResponse() TracesResponse {
3637
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
3738
}
3839

40+
// Marshal marshals TracesResponse into proto bytes.
41+
func (tr TracesResponse) Marshal() ([]byte, error) {
42+
return tr.orig.Marshal()
43+
}
44+
45+
// MarshalJSON marshals TracesResponse into JSON bytes.
46+
func (tr TracesResponse) MarshalJSON() ([]byte, error) {
47+
var buf bytes.Buffer
48+
if err := jsonMarshaler.Marshal(&buf, tr.orig); err != nil {
49+
return nil, err
50+
}
51+
return buf.Bytes(), nil
52+
}
53+
3954
// TracesRequest represents the response for gRPC client/server.
4055
type TracesRequest struct {
4156
orig *otlpcollectortrace.ExportTraceServiceRequest
@@ -46,12 +61,26 @@ func NewTracesRequest() TracesRequest {
4661
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
4762
}
4863

49-
func (lr TracesRequest) SetTraces(ld pdata.Traces) {
50-
lr.orig.ResourceSpans = internal.TracesToOtlp(ld.InternalRep()).ResourceSpans
64+
// Marshal marshals TracesRequest into proto bytes.
65+
func (tr TracesRequest) Marshal() ([]byte, error) {
66+
return tr.orig.Marshal()
67+
}
68+
69+
// MarshalJSON marshals TracesRequest into JSON bytes.
70+
func (tr TracesRequest) MarshalJSON() ([]byte, error) {
71+
var buf bytes.Buffer
72+
if err := jsonMarshaler.Marshal(&buf, tr.orig); err != nil {
73+
return nil, err
74+
}
75+
return buf.Bytes(), nil
76+
}
77+
78+
func (tr TracesRequest) SetTraces(td pdata.Traces) {
79+
tr.orig.ResourceSpans = internal.TracesToOtlp(td.InternalRep()).ResourceSpans
5180
}
5281

53-
func (lr TracesRequest) Traces() pdata.Traces {
54-
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(lr.orig))
82+
func (tr TracesRequest) Traces() pdata.Traces {
83+
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(tr.orig))
5584
}
5685

5786
// TracesClient is the client API for OTLP-GRPC Traces service.

0 commit comments

Comments
 (0)