Skip to content

Add Marshal/MarshalJSON to Request/Response otlpgrpc messages #4085

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/middleware"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand All @@ -51,12 +51,6 @@ type exporter struct {
logger *zap.Logger
}

var (
tracesMarshaler = otlp.NewProtobufTracesMarshaler()
metricsMarshaler = otlp.NewProtobufMetricsMarshaler()
logsMarshaler = otlp.NewProtobufLogsMarshaler()
)

const (
headerRetryAfter = "Retry-After"
maxHTTPResponseReadBytes = 64 * 1024
Expand Down Expand Up @@ -100,7 +94,9 @@ func (e *exporter) start(_ context.Context, host component.Host) error {
}

func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) error {
request, err := tracesMarshaler.MarshalTraces(td)
tr := otlpgrpc.NewTracesRequest()
tr.SetTraces(td)
request, err := tr.Marshal()
if err != nil {
return consumererror.Permanent(err)
}
Expand All @@ -109,15 +105,19 @@ func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) error {
}

func (e *exporter) pushMetrics(ctx context.Context, md pdata.Metrics) error {
request, err := metricsMarshaler.MarshalMetrics(md)
tr := otlpgrpc.NewMetricsRequest()
tr.SetMetrics(md)
request, err := tr.Marshal()
if err != nil {
return consumererror.Permanent(err)
}
return e.export(ctx, e.metricsURL, request)
}

func (e *exporter) pushLogs(ctx context.Context, ld pdata.Logs) error {
request, err := logsMarshaler.MarshalLogs(ld)
tr := otlpgrpc.NewLogsRequest()
tr.SetLogs(ld)
request, err := tr.Marshal()
if err != nil {
return consumererror.Permanent(err)
}
Expand Down
32 changes: 31 additions & 1 deletion model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package otlpgrpc

import (
"bytes"
"context"

"github.com/gogo/protobuf/jsonpb"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/model/pdata"
)

// TODO: Consider to add `LogsRequest`. If we add non pdata properties we can add them to the request.
var jsonMarshaler = &jsonpb.Marshaler{}

// LogsResponse represents the response for gRPC client/server.
type LogsResponse struct {
Expand All @@ -36,6 +38,20 @@ func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
}

// Marshal marshals LogsResponse into JSON bytes.
func (lr LogsResponse) Marshal() ([]byte, error) {
return lr.orig.Marshal()
}

// MarshalJSON marshals LogsResponse into JSON bytes.
func (lr LogsResponse) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, lr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// LogsRequest represents the response for gRPC client/server.
type LogsRequest struct {
orig *otlpcollectorlog.ExportLogsServiceRequest
Expand All @@ -46,6 +62,20 @@ func NewLogsRequest() LogsRequest {
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
}

// Marshal marshals LogsRequest into proto bytes.
func (lr LogsRequest) Marshal() ([]byte, error) {
return lr.orig.Marshal()
}

// MarshalJSON marshals LogsRequest into JSON bytes.
func (lr LogsRequest) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, lr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (lr LogsRequest) SetLogs(ld pdata.Logs) {
lr.orig.ResourceLogs = internal.LogsToOtlp(ld.InternalRep()).ResourceLogs
}
Expand Down
37 changes: 33 additions & 4 deletions model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package otlpgrpc

import (
"bytes"
"context"

"google.golang.org/grpc"
Expand All @@ -36,6 +37,20 @@ func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// Marshal marshals MetricsResponse into proto bytes.
func (mr MetricsResponse) Marshal() ([]byte, error) {
return mr.orig.Marshal()
}

// MarshalJSON marshals MetricsResponse into JSON bytes.
func (mr MetricsResponse) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, mr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// MetricsRequest represents the response for gRPC client/server.
type MetricsRequest struct {
orig *otlpcollectormetrics.ExportMetricsServiceRequest
Expand All @@ -46,12 +61,26 @@ func NewMetricsRequest() MetricsRequest {
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
}

func (lr MetricsRequest) SetMetrics(ld pdata.Metrics) {
lr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
// Marshal marshals MetricsRequest into proto bytes.
func (mr MetricsRequest) Marshal() ([]byte, error) {
return mr.orig.Marshal()
}

// MarshalJSON marshals LogsRequest into JSON bytes.
func (mr MetricsRequest) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, mr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
mr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
}

func (lr MetricsRequest) Metrics() pdata.Metrics {
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(lr.orig))
func (mr MetricsRequest) Metrics() pdata.Metrics {
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(mr.orig))
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
Expand Down
37 changes: 33 additions & 4 deletions model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package otlpgrpc

import (
"bytes"
"context"

"google.golang.org/grpc"
Expand All @@ -36,6 +37,20 @@ func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
}

// Marshal marshals TracesResponse into proto bytes.
func (tr TracesResponse) Marshal() ([]byte, error) {
return tr.orig.Marshal()
}

// MarshalJSON marshals TracesResponse into JSON bytes.
func (tr TracesResponse) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, tr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// TracesRequest represents the response for gRPC client/server.
type TracesRequest struct {
orig *otlpcollectortrace.ExportTraceServiceRequest
Expand All @@ -46,12 +61,26 @@ func NewTracesRequest() TracesRequest {
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

func (lr TracesRequest) SetTraces(ld pdata.Traces) {
lr.orig.ResourceSpans = internal.TracesToOtlp(ld.InternalRep()).ResourceSpans
// Marshal marshals TracesRequest into proto bytes.
func (tr TracesRequest) Marshal() ([]byte, error) {
return tr.orig.Marshal()
}

// MarshalJSON marshals TracesRequest into JSON bytes.
func (tr TracesRequest) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
if err := jsonMarshaler.Marshal(&buf, tr.orig); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (tr TracesRequest) SetTraces(td pdata.Traces) {
tr.orig.ResourceSpans = internal.TracesToOtlp(td.InternalRep()).ResourceSpans
}

func (lr TracesRequest) Traces() pdata.Traces {
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(lr.orig))
func (tr TracesRequest) Traces() pdata.Traces {
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(tr.orig))
}

// TracesClient is the client API for OTLP-GRPC Traces service.
Expand Down
Loading