Skip to content

Commit 7607be6

Browse files
committed
* Making it possible to override the default context propagation in RabbitMQActivitySource to make it possible to add OpenTelemetry Baggage propagation
* Adding a separate OpenTelemetry integration package to make it easier to integrate with OTel * Adding OTel tests to test trace context and baggage propagation
1 parent e0367ad commit 7607be6

15 files changed

+967
-161
lines changed

RabbitMQDotNetClient.sln

+13
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
4040
EndProject
4141
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
4242
EndProject
43+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
44+
EndProject
45+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OTel", "projects\Test\OTel\OTel.csproj", "{33E86EAF-C269-4336-8E5C-71418AE360A2}"
46+
EndProject
4347
Global
4448
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4549
Debug|Any CPU = Debug|Any CPU
@@ -90,6 +94,14 @@ Global
9094
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
9195
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
9296
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
97+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
98+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
99+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
100+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
101+
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
102+
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
103+
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
104+
{33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.Build.0 = Release|Any CPU
93105
EndGlobalSection
94106
GlobalSection(SolutionProperties) = preSolution
95107
HideSolutionNode = FALSE
@@ -104,6 +116,7 @@ Global
104116
{F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
105117
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
106118
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
119+
{33E86EAF-C269-4336-8E5C-71418AE360A2} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
107120
EndGlobalSection
108121
GlobalSection(ExtensibilityGlobals) = postSolution
109122
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# RabbitMQ .NET Client - OAuth2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
5+
<NoWarn>$(NoWarn);CS1591</NoWarn>
6+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
7+
<AssemblyTitle>RabbitMQ OpenTelemetry Integration Package for .NET</AssemblyTitle>
8+
<Authors>VMware</Authors>
9+
<Company>VMware, Inc. or its affiliates.</Company>
10+
<Copyright>Copyright © 2007-2023 VMware, Inc. or its affiliates.</Copyright>
11+
<Description>The RabbitMQ OAuth2 Client Library for .NET enables OAuth2 token refresh for RabbitMQ.Client</Description>
12+
<GenerateDocumentationFile>true</GenerateDocumentationFile>
13+
<PackageIcon>icon.png</PackageIcon>
14+
<PackageLicenseExpression>Apache-2.0 OR MPL-2.0</PackageLicenseExpression>
15+
<PackageProjectUrl>https://www.rabbitmq.com/dotnet.html</PackageProjectUrl>
16+
<PackageTags>rabbitmq, amqp, oauth2</PackageTags>
17+
<Product>RabbitMQ</Product>
18+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
19+
<RepositoryUrl>https://github.com/rabbitmq/rabbitmq-dotnet-client.git</RepositoryUrl>
20+
<IncludeSymbols>true</IncludeSymbols>
21+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
22+
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
23+
<SignAssembly>true</SignAssembly>
24+
<MinVerTagPrefix>otel-</MinVerTagPrefix>
25+
<MinVerVerbosity>minimal</MinVerVerbosity>
26+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
27+
<PackageOutputPath>../../packages</PackageOutputPath>
28+
<PackageReadmeFile>README.md</PackageReadmeFile>
29+
<LangVersion>7.3</LangVersion>
30+
</PropertyGroup>
31+
32+
<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
33+
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
34+
<Deterministic>true</Deterministic>
35+
<EmbedUntrackedSources>true</EmbedUntrackedSources>
36+
</PropertyGroup>
37+
38+
<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
39+
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
40+
</ItemGroup>
41+
42+
<ItemGroup>
43+
<None Remove="icon.png" />
44+
<Content Include="icon.png" PackagePath="" />
45+
<None Include="README.md" Pack="true" PackagePath="/" />
46+
<InternalsVisibleTo Include="Unit" />
47+
<InternalsVisibleTo Include="Benchmarks" />
48+
</ItemGroup>
49+
50+
<ItemGroup>
51+
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
52+
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
53+
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="all" />
54+
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
55+
</ItemGroup>
56+
57+
<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
58+
</ItemGroup>
59+
60+
<ItemGroup>
61+
<ProjectReference Include="../RabbitMQ.Client/RabbitMQ.Client.csproj" />
62+
</ItemGroup>
63+
64+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace RabbitMQ.Client.OpenTelemetry
2+
{
3+
public class RabbitMQOpenTelemetryConfiguration
4+
{
5+
public bool PropagateBaggage { get; set; } = true;
6+
public bool UseRoutingKeyAsOperationName { get; set; } = true;
7+
public bool IncludePublishers { get; set; } = true;
8+
public bool IncludeSubscribers { get; set; } = true;
9+
}
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Text;
6+
using OpenTelemetry.Context.Propagation;
7+
using RabbitMQ.Client;
8+
using RabbitMQ.Client.OpenTelemetry;
9+
10+
namespace OpenTelemetry.Trace
11+
{
12+
public static class OpenTelemetryExtensions
13+
{
14+
internal static TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;
15+
16+
public static TracerProviderBuilder AddRabbitMQ(this TracerProviderBuilder builder,
17+
RabbitMQOpenTelemetryConfiguration configuration)
18+
{
19+
if (configuration.PropagateBaggage)
20+
{
21+
s_propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
22+
{
23+
new TraceContextPropagator(), new BaggagePropagator()
24+
});
25+
}
26+
else
27+
{
28+
s_propagator = new TraceContextPropagator();
29+
}
30+
31+
RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName;
32+
RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
33+
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
34+
35+
if (configuration.IncludeSubscribers)
36+
{
37+
builder.AddSource(RabbitMQActivitySource.SubscriberSourceName);
38+
}
39+
40+
if (configuration.IncludePublishers)
41+
{
42+
builder.AddSource(RabbitMQActivitySource.PublisherSourceName);
43+
}
44+
45+
return builder;
46+
}
47+
48+
private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
49+
{
50+
// Extract the PropagationContext of the upstream parent from the message headers.
51+
var parentContext = s_propagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
52+
Baggage.Current = parentContext.Baggage;
53+
return parentContext.ActivityContext;
54+
}
55+
56+
private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object> carrier, string key)
57+
{
58+
try
59+
{
60+
if (carrier.TryGetValue(key, out object value))
61+
{
62+
byte[] bytes = value as byte[];
63+
return new[] { Encoding.UTF8.GetString(bytes) };
64+
}
65+
}
66+
catch (Exception)
67+
{
68+
//this.logger.LogError(ex, "Failed to extract trace context.");
69+
}
70+
71+
return Enumerable.Empty<string>();
72+
}
73+
74+
private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object> props)
75+
{
76+
// Inject the current Activity's context into the message headers.
77+
s_propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
78+
}
79+
80+
private static void OpenTelemetryContextSetter(IDictionary<string, object> carrier, string key, string value)
81+
{
82+
carrier[key] = Encoding.UTF8.GetBytes(value);
83+
}
84+
}
85+
}
6.61 KB
Loading

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

+5-1
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,7 @@ static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception
848848
static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection<string>
849849
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
850850
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
851-
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
851+
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
852852
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
853853
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
854854
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
@@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
956956
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
957957
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
958958
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
959+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
960+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
961+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
962+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
959963
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
<InternalsVisibleTo Include="Integration" />
5656
<InternalsVisibleTo Include="SequentialIntegration" />
5757
<InternalsVisibleTo Include="Benchmarks" />
58+
<InternalsVisibleTo Include="OTel" />
5859
</ItemGroup>
5960

6061
<ItemGroup>

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

+77-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.client.impl;
3637

@@ -86,17 +87,58 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
8687
/// <remarks>
8788
/// The publication occurs with mandatory=false and immediate=false.
8889
/// </remarks>
89-
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
90+
public static async ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, T basicProperties, ReadOnlyMemory<byte> body)
9091
where T : IReadOnlyBasicProperties, IAmqpHeader
9192
{
92-
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
93+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
94+
? RabbitMQActivitySource.Send(addr.RoutingKey, addr.ExchangeName, body.Length)
95+
: default;
96+
97+
if (sendActivity != null)
98+
{
99+
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
100+
await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, props, body);
101+
}
102+
else
103+
{
104+
await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
105+
}
106+
}
107+
108+
public static async ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
109+
{
110+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
111+
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
112+
: default;
113+
114+
if (sendActivity != null)
115+
{
116+
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
117+
await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory);
118+
}
119+
else
120+
{
121+
await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
122+
}
93123
}
94124

95-
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
96-
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
125+
public static async ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
126+
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
127+
{
128+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
129+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
130+
: default;
97131

98-
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
99-
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
132+
if (sendActivity != null)
133+
{
134+
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
135+
await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory);
136+
}
137+
else
138+
{
139+
await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
140+
}
141+
}
100142

101143
#nullable disable
102144

@@ -185,5 +227,34 @@ public static Task CloseAsync(this IChannel channel, ushort replyCode, string re
185227
{
186228
return channel.CloseAsync(replyCode, replyText, false);
187229
}
230+
231+
private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(TProperties basicProperties,
232+
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
233+
{
234+
// This activity is marked as recorded, so let's propagate the trace and span ids.
235+
if (sendActivity.IsAllDataRequested)
236+
{
237+
if (!string.IsNullOrEmpty(basicProperties.CorrelationId))
238+
{
239+
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId);
240+
}
241+
}
242+
243+
BasicProperties props = default;
244+
if (basicProperties is BasicProperties properties)
245+
{
246+
props = properties;
247+
}
248+
else if (basicProperties is EmptyBasicProperty)
249+
{
250+
props = new BasicProperties();
251+
}
252+
253+
var headers = props.Headers ?? new Dictionary<string, object>();
254+
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
255+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
256+
props.Headers = headers;
257+
return props;
258+
}
188259
}
189260
}

0 commit comments

Comments
 (0)