Skip to content

Commit 9fbdee5

Browse files
authored
Merge pull request #1809 from rabbitmq/rabbitmq-dotnet-client-1802
Address `ObjectDisposedException`
2 parents bdbfa3b + 9a8d57e commit 9fbdee5

File tree

7 files changed

+395
-78
lines changed

7 files changed

+395
-78
lines changed

projects/Applications/CreateChannel/Program.cs

+30-6
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131

3232
using System;
3333
using System.Diagnostics;
34-
using System.Threading;
34+
using System.Globalization;
35+
using System.Runtime.ExceptionServices;
3536
using System.Threading.Tasks;
3637

3738
using RabbitMQ.Client;
39+
using RabbitMQ.Client.Exceptions;
3840

3941
namespace CreateChannel
4042
{
@@ -44,11 +46,11 @@ public static class Program
4446
private const int ChannelsToOpen = 50;
4547

4648
private static int channelsOpened;
47-
private static AutoResetEvent doneEvent;
49+
private readonly static TaskCompletionSource<bool> s_tcs = new();
4850

4951
public static async Task Main()
5052
{
51-
doneEvent = new AutoResetEvent(false);
53+
AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;
5254

5355
var connectionFactory = new ConnectionFactory { };
5456
await using IConnection connection = await connectionFactory.CreateConnectionAsync();
@@ -67,26 +69,48 @@ public static async Task Main()
6769

6870
for (int j = 0; j < channels.Length; j++)
6971
{
72+
if (j % 2 == 0)
73+
{
74+
try
75+
{
76+
await channels[j].QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
77+
}
78+
catch (Exception)
79+
{
80+
}
81+
}
7082
await channels[j].DisposeAsync();
7183
}
7284
}
7385

74-
doneEvent.Set();
86+
s_tcs.SetResult(true);
7587
});
7688

7789
Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
7890
Console.WriteLine();
7991
Console.WriteLine("Opened");
80-
while (!doneEvent.WaitOne(500))
92+
while (false == s_tcs.Task.IsCompleted)
8193
{
94+
await Task.Delay(500);
8295
Console.WriteLine($"{channelsOpened,5}");
8396
}
8497
watch.Stop();
8598
Console.WriteLine($"{channelsOpened,5}");
8699
Console.WriteLine();
87100
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
101+
}
102+
103+
private static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);
88104

89-
Console.ReadLine();
105+
private static void CurrentDomain_FirstChanceException(object sender, FirstChanceExceptionEventArgs e)
106+
{
107+
if (e.Exception is OperationInterruptedException)
108+
{
109+
}
110+
else
111+
{
112+
Console.Error.WriteLine("{0} [ERROR] {1}", Now, e.Exception);
113+
}
90114
}
91115
}
92116
}

projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs

+23
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
9696
return _tcsConfiguredTaskAwaitable.GetAwaiter();
9797
}
9898

99+
public abstract ProtocolCommandId[] HandledProtocolCommandIds { get; }
100+
99101
public async Task HandleCommandAsync(IncomingCommand cmd)
100102
{
101103
try
@@ -203,6 +205,9 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout,
203205
{
204206
}
205207

208+
public override ProtocolCommandId[] HandledProtocolCommandIds
209+
=> [ProtocolCommandId.ConnectionSecure, ProtocolCommandId.ConnectionTune];
210+
206211
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
207212
{
208213
if (cmd.CommandId == ProtocolCommandId.ConnectionSecure)
@@ -240,6 +245,9 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan
240245
_expectedCommandId = expectedCommandId;
241246
}
242247

248+
public override ProtocolCommandId[] HandledProtocolCommandIds
249+
=> [_expectedCommandId];
250+
243251
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
244252
{
245253
if (cmd.CommandId == _expectedCommandId)
@@ -297,6 +305,9 @@ public BasicConsumeAsyncRpcContinuation(IAsyncBasicConsumer consumer, IConsumerD
297305
_consumerDispatcher = consumerDispatcher;
298306
}
299307

308+
public override ProtocolCommandId[] HandledProtocolCommandIds
309+
=> [ProtocolCommandId.BasicConsumeOk];
310+
300311
protected override async Task DoHandleCommandAsync(IncomingCommand cmd)
301312
{
302313
if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk)
@@ -326,6 +337,9 @@ public BasicGetAsyncRpcContinuation(Func<ulong, ulong> adjustDeliveryTag,
326337
_adjustDeliveryTag = adjustDeliveryTag;
327338
}
328339

340+
public override ProtocolCommandId[] HandledProtocolCommandIds
341+
=> [ProtocolCommandId.BasicGetOk, ProtocolCommandId.BasicGetEmpty];
342+
329343
internal DateTime StartTime { get; } = DateTime.UtcNow;
330344

331345
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
@@ -441,6 +455,9 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellati
441455
{
442456
}
443457

458+
public override ProtocolCommandId[] HandledProtocolCommandIds
459+
=> [ProtocolCommandId.QueueDeclareOk];
460+
444461
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
445462
{
446463
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
@@ -481,6 +498,9 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellatio
481498
{
482499
}
483500

501+
public override ProtocolCommandId[] HandledProtocolCommandIds
502+
=> [ProtocolCommandId.QueueDeleteOk];
503+
484504
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
485505
{
486506
if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk)
@@ -504,6 +524,9 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellation
504524
{
505525
}
506526

527+
public override ProtocolCommandId[] HandledProtocolCommandIds
528+
=> [ProtocolCommandId.QueuePurgeOk];
529+
507530
protected override Task DoHandleCommandAsync(IncomingCommand cmd)
508531
{
509532
if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk)

0 commit comments

Comments
 (0)