|
21 | 21 | import java.util.function.Function;
|
22 | 22 |
|
23 | 23 | import io.netty.buffer.ByteBuf;
|
| 24 | +import io.netty.buffer.ByteBufAllocator; |
24 | 25 | import io.netty.buffer.CompositeByteBuf;
|
25 | 26 | import io.rsocket.Payload;
|
26 | 27 | import io.rsocket.RSocket;
|
27 | 28 | import io.rsocket.frame.FrameType;
|
| 29 | +import io.rsocket.metadata.CompositeMetadataCodec; |
28 | 30 | import io.rsocket.metadata.RoutingMetadata;
|
29 | 31 | import io.rsocket.metadata.TracingMetadataCodec;
|
30 | 32 | import io.rsocket.metadata.WellKnownMimeType;
|
@@ -104,34 +106,40 @@ <T> Mono<T> setSpan(Function<Payload, Mono<T>> input, Payload payload, FrameType
|
104 | 106 | log.debug("Extracted result from context or thread local " + span);
|
105 | 107 | }
|
106 | 108 | final Payload newPayload = PayloadUtils.cleanTracingMetadata(payload, new HashSet<>(propagator.fields()));
|
107 |
| - TraceContext traceContext = span.context(); |
| 109 | + final TraceContext traceContext = span.context(); |
| 110 | + final CompositeByteBuf metadata = (CompositeByteBuf) newPayload.metadata(); |
108 | 111 | if (this.isZipkinPropagationEnabled) {
|
109 |
| - injectDefaultZipkinRSocketHeaders(newPayload, traceContext); |
| 112 | + injectDefaultZipkinRSocketHeaders(metadata, traceContext); |
110 | 113 | }
|
111 |
| - this.propagator.inject(traceContext, (CompositeByteBuf) newPayload.metadata(), this.setter); |
| 114 | + this.propagator.inject(traceContext, metadata, this.setter); |
112 | 115 | return input.apply(newPayload).doOnError(span::error).doFinally(signalType -> span.end());
|
113 | 116 | });
|
114 | 117 | }
|
115 | 118 |
|
116 |
| - private void injectDefaultZipkinRSocketHeaders(Payload newPayload, TraceContext traceContext) { |
| 119 | + void injectDefaultZipkinRSocketHeaders(CompositeByteBuf metadata, TraceContext traceContext) { |
117 | 120 | TracingMetadataCodec.Flags flags = traceContext.sampled() == null ? TracingMetadataCodec.Flags.UNDECIDED
|
118 | 121 | : traceContext.sampled() ? TracingMetadataCodec.Flags.SAMPLE : TracingMetadataCodec.Flags.NOT_SAMPLE;
|
119 | 122 | String traceId = traceContext.traceId();
|
120 | 123 | long[] traceIds = EncodingUtils.fromString(traceId);
|
121 | 124 | long[] spanId = EncodingUtils.fromString(traceContext.spanId());
|
122 | 125 | long[] parentSpanId = EncodingUtils.fromString(traceContext.parentId());
|
123 | 126 | boolean isTraceId128Bit = traceIds.length == 2;
|
| 127 | + |
| 128 | + final ByteBufAllocator allocator = metadata.alloc(); |
124 | 129 | if (isTraceId128Bit) {
|
125 |
| - TracingMetadataCodec.encode128(newPayload.metadata().alloc(), traceIds[0], traceIds[1], spanId[0], |
126 |
| - EncodingUtils.fromString(traceContext.parentId())[0], flags); |
| 130 | + CompositeMetadataCodec.encodeAndAddMetadata(metadata, allocator, |
| 131 | + WellKnownMimeType.MESSAGE_RSOCKET_TRACING_ZIPKIN, |
| 132 | + TracingMetadataCodec.encode128(allocator, traceIds[0], traceIds[1], spanId[0], |
| 133 | + EncodingUtils.fromString(traceContext.parentId())[0], flags)); |
127 | 134 | }
|
128 | 135 | else {
|
129 |
| - TracingMetadataCodec.encode64(newPayload.metadata().alloc(), traceIds[0], spanId[0], parentSpanId[0], |
130 |
| - flags); |
| 136 | + CompositeMetadataCodec.encodeAndAddMetadata(metadata, allocator, |
| 137 | + WellKnownMimeType.MESSAGE_RSOCKET_TRACING_ZIPKIN, |
| 138 | + TracingMetadataCodec.encode64(allocator, traceIds[0], spanId[0], parentSpanId[0], flags)); |
131 | 139 | }
|
132 | 140 | }
|
133 | 141 |
|
134 |
| - private Span.Builder spanBuilder(ContextView contextView) { |
| 142 | + Span.Builder spanBuilder(ContextView contextView) { |
135 | 143 | Span.Builder spanBuilder = this.tracer.spanBuilder();
|
136 | 144 | if (contextView.hasKey(TraceContext.class)) {
|
137 | 145 | spanBuilder = spanBuilder.setParent(contextView.get(TraceContext.class));
|
|
0 commit comments