Skip to content

Commit ed4cf97

Browse files
committed
* Reverting breaking changes for existing API methods
* Adding OTel tests for all publish methods
1 parent 7607be6 commit ed4cf97

File tree

4 files changed

+193
-80
lines changed

4 files changed

+193
-80
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,7 @@ static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception
848848
static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection<string>
849849
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
850850
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
851-
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
851+
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
852852
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
853853
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
854854
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

+9-76
Original file line numberDiff line numberDiff line change
@@ -87,58 +87,20 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
8787
/// <remarks>
8888
/// The publication occurs with mandatory=false and immediate=false.
8989
/// </remarks>
90-
public static async ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, T basicProperties, ReadOnlyMemory<byte> body)
90+
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties,
91+
ReadOnlyMemory<byte> body)
9192
where T : IReadOnlyBasicProperties, IAmqpHeader
9293
{
93-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
94-
? RabbitMQActivitySource.Send(addr.RoutingKey, addr.ExchangeName, body.Length)
95-
: default;
96-
97-
if (sendActivity != null)
98-
{
99-
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
100-
await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, props, body);
101-
}
102-
else
103-
{
104-
await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
105-
}
106-
}
107-
108-
public static async ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
109-
{
110-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
111-
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
112-
: default;
113-
114-
if (sendActivity != null)
115-
{
116-
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
117-
await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory);
118-
}
119-
else
120-
{
121-
await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
122-
}
94+
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
12395
}
12496

125-
public static async ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
126-
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
127-
{
128-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
129-
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
130-
: default;
97+
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
98+
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
99+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
131100

132-
if (sendActivity != null)
133-
{
134-
BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity);
135-
await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory);
136-
}
137-
else
138-
{
139-
await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
140-
}
141-
}
101+
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
102+
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
103+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
142104

143105
#nullable disable
144106

@@ -227,34 +189,5 @@ public static Task CloseAsync(this IChannel channel, ushort replyCode, string re
227189
{
228190
return channel.CloseAsync(replyCode, replyText, false);
229191
}
230-
231-
private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(TProperties basicProperties,
232-
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
233-
{
234-
// This activity is marked as recorded, so let's propagate the trace and span ids.
235-
if (sendActivity.IsAllDataRequested)
236-
{
237-
if (!string.IsNullOrEmpty(basicProperties.CorrelationId))
238-
{
239-
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId);
240-
}
241-
}
242-
243-
BasicProperties props = default;
244-
if (basicProperties is BasicProperties properties)
245-
{
246-
props = properties;
247-
}
248-
else if (basicProperties is EmptyBasicProperty)
249-
{
250-
props = new BasicProperties();
251-
}
252-
253-
var headers = props.Headers ?? new Dictionary<string, object>();
254-
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
255-
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
256-
props.Headers = headers;
257-
return props;
258-
}
259192
}
260193
}

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

+68-3
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,19 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
10361036
try
10371037
{
10381038
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1039-
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1039+
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
1040+
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
1041+
: default;
1042+
1043+
if (sendActivity != null)
1044+
{
1045+
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
1046+
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
1047+
}
1048+
else
1049+
{
1050+
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1051+
}
10401052
}
10411053
catch
10421054
{
@@ -1068,7 +1080,19 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
10681080
try
10691081
{
10701082
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1071-
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1083+
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
1084+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
1085+
: default;
1086+
1087+
if (sendActivity != null)
1088+
{
1089+
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
1090+
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
1091+
}
1092+
else
1093+
{
1094+
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1095+
}
10721096
}
10731097
catch
10741098
{
@@ -1100,7 +1124,19 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
11001124
try
11011125
{
11021126
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1103-
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1127+
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
1128+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
1129+
: default;
1130+
1131+
if (sendActivity != null)
1132+
{
1133+
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
1134+
await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
1135+
}
1136+
else
1137+
{
1138+
await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
1139+
}
11041140
}
11051141
catch
11061142
{
@@ -1783,5 +1819,34 @@ await CloseAsync(ea, false)
17831819
throw;
17841820
}
17851821
}
1822+
1823+
private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(TProperties basicProperties,
1824+
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
1825+
{
1826+
// This activity is marked as recorded, so let's propagate the trace and span ids.
1827+
if (sendActivity.IsAllDataRequested)
1828+
{
1829+
if (!string.IsNullOrEmpty(basicProperties.CorrelationId))
1830+
{
1831+
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId);
1832+
}
1833+
}
1834+
1835+
BasicProperties props = default;
1836+
if (basicProperties is BasicProperties properties)
1837+
{
1838+
props = properties;
1839+
}
1840+
else if (basicProperties is EmptyBasicProperty)
1841+
{
1842+
props = new BasicProperties();
1843+
}
1844+
1845+
var headers = props.Headers ?? new Dictionary<string, object>();
1846+
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1847+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
1848+
props.Headers = headers;
1849+
return props;
1850+
}
17861851
}
17871852
}

projects/Test/OTel/TestOpenTelemetry.cs

+115
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,121 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
189189
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
190190
}
191191
}
192+
193+
[Theory]
194+
[InlineData(true)]
195+
[InlineData(false)]
196+
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
197+
{
198+
var exportedItems = new List<Activity>();
199+
using (var tracer = Sdk.CreateTracerProviderBuilder()
200+
.AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration())
201+
.AddInMemoryExporter(exportedItems)
202+
.Build())
203+
{
204+
string baggageGuid = Guid.NewGuid().ToString();
205+
Baggage.SetBaggage("TestItem", baggageGuid);
206+
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
207+
await _channel.ConfirmSelectAsync();
208+
209+
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
210+
await Task.Delay(500);
211+
212+
string queueName = $"{Guid.NewGuid()}";
213+
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
214+
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
215+
byte[] consumeBody = null;
216+
var consumer = new EventingBasicConsumer(_channel);
217+
var consumerReceivedTcs =
218+
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
219+
consumer.Received += (o, a) =>
220+
{
221+
consumeBody = a.Body.ToArray();
222+
string baggageItem = Baggage.GetBaggage("TestItem");
223+
if (baggageItem == baggageGuid)
224+
{
225+
consumerReceivedTcs.SetResult(true);
226+
}
227+
else
228+
{
229+
consumerReceivedTcs.SetException(
230+
EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
231+
}
232+
};
233+
234+
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
235+
var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName);
236+
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
237+
await _channel.WaitForConfirmsOrDieAsync();
238+
Baggage.ClearBaggage();
239+
Assert.Null(Baggage.GetBaggage("TestItem"));
240+
241+
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
242+
Assert.True(await consumerReceivedTcs.Task);
243+
244+
await _channel.BasicCancelAsync(consumerTag);
245+
await Task.Delay(500);
246+
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
247+
}
248+
}
249+
250+
[Theory]
251+
[InlineData(true)]
252+
[InlineData(false)]
253+
public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
254+
{
255+
var exportedItems = new List<Activity>();
256+
using (var tracer = Sdk.CreateTracerProviderBuilder()
257+
.AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration())
258+
.AddInMemoryExporter(exportedItems)
259+
.Build())
260+
{
261+
string baggageGuid = Guid.NewGuid().ToString();
262+
Baggage.SetBaggage("TestItem", baggageGuid);
263+
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
264+
await _channel.ConfirmSelectAsync();
265+
266+
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
267+
await Task.Delay(500);
268+
269+
string queueName = $"{Guid.NewGuid()}";
270+
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
271+
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
272+
byte[] consumeBody = null;
273+
var consumer = new EventingBasicConsumer(_channel);
274+
var consumerReceivedTcs =
275+
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
276+
consumer.Received += (o, a) =>
277+
{
278+
consumeBody = a.Body.ToArray();
279+
string baggageItem = Baggage.GetBaggage("TestItem");
280+
if (baggageItem == baggageGuid)
281+
{
282+
consumerReceivedTcs.SetResult(true);
283+
}
284+
else
285+
{
286+
consumerReceivedTcs.SetException(
287+
EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
288+
}
289+
};
290+
291+
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
292+
CachedString exchange = new CachedString("");
293+
CachedString routingKey = new CachedString(queueName);
294+
await _channel.BasicPublishAsync(exchange, routingKey, sendBody);
295+
await _channel.WaitForConfirmsOrDieAsync();
296+
Baggage.ClearBaggage();
297+
Assert.Null(Baggage.GetBaggage("TestItem"));
298+
299+
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
300+
Assert.True(await consumerReceivedTcs.Task);
301+
302+
await _channel.BasicCancelAsync(consumerTag);
303+
await Task.Delay(500);
304+
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
305+
}
306+
}
192307

193308
[Theory]
194309
[InlineData(true)]

0 commit comments

Comments
 (0)