Skip to content

Commit 5aca35a

Browse files
authored
Fix object disposed exception during channel Recovery (#1648)
* Fix object disposed exception during channel Recovery Fixes #1647 * * Remove channels that do not recover successfully * * Hold the `_channelsSemaphore` for a shorter period of time.
1 parent db5bc23 commit 5aca35a

File tree

5 files changed

+109
-9
lines changed

5 files changed

+109
-9
lines changed

RabbitMQDotNetClient.sln

+8-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
4040
EndProject
4141
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
4242
EndProject
43-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
43+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
44+
EndProject
45+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
4446
EndProject
4547
Global
4648
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -96,6 +98,10 @@ Global
9698
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
9799
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
98100
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
101+
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
102+
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
103+
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
104+
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
99105
EndGlobalSection
100106
GlobalSection(SolutionProperties) = preSolution
101107
HideSolutionNode = FALSE
@@ -110,6 +116,7 @@ Global
110116
{F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
111117
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
112118
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
119+
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
113120
EndGlobalSection
114121
GlobalSection(ExtensibilityGlobals) = postSolution
115122
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

+22-6
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,19 @@ public IAsyncBasicConsumer? DefaultConsumer
144144

145145
public string? CurrentQueue => InnerChannel.CurrentQueue;
146146

147-
internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
147+
internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
148148
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
149149
{
150150
if (false == recordedEntitiesSemaphoreHeld)
151151
{
152152
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
153153
}
154154

155-
ThrowIfDisposed();
155+
if (_disposed)
156+
{
157+
return false;
158+
}
159+
156160
_connection = conn;
157161

158162
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
@@ -189,15 +193,27 @@ await newChannel.TxSelectAsync(cancellationToken)
189193
* chance that an invalid Channel will be used to handle a basic.deliver frame,
190194
* with the resulting basic.ack never getting sent out.
191195
*/
192-
_innerChannel = newChannel;
193196

194-
if (recoverConsumers)
197+
if (_disposed)
195198
{
196-
await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld)
199+
await newChannel.AbortAsync()
197200
.ConfigureAwait(false);
201+
return false;
198202
}
203+
else
204+
{
205+
_innerChannel = newChannel;
206+
207+
if (recoverConsumers)
208+
{
209+
await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld)
210+
.ConfigureAwait(false);
211+
}
212+
213+
_innerChannel.RunRecoveryEventHandlers(this);
199214

200-
_innerChannel.RunRecoveryEventHandlers(this);
215+
return true;
216+
}
201217
}
202218

203219
public async Task CloseAsync(ushort replyCode, string replyText, bool abort,

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

+35-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Linq;
3435
using System.Threading;
3536
using System.Threading.Tasks;
@@ -583,12 +584,44 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
583584
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
584585
}
585586

586-
foreach (AutorecoveringChannel channel in _channels)
587+
var channelsToRecover = new List<AutorecoveringChannel>();
588+
await _channelsSemaphore.WaitAsync(cancellationToken)
589+
.ConfigureAwait(false);
590+
try
591+
{
592+
channelsToRecover.AddRange(_channels);
593+
}
594+
finally
595+
{
596+
_channelsSemaphore.Release();
597+
}
598+
599+
var notRecoveredChannels = new List<AutorecoveringChannel>();
600+
foreach (AutorecoveringChannel channel in channelsToRecover)
587601
{
588-
await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
602+
bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
589603
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
590604
cancellationToken: cancellationToken)
591605
.ConfigureAwait(false);
606+
607+
if (false == recovered)
608+
{
609+
notRecoveredChannels.Add(channel);
610+
}
611+
}
612+
613+
await _channelsSemaphore.WaitAsync(cancellationToken)
614+
.ConfigureAwait(false);
615+
try
616+
{
617+
foreach (AutorecoveringChannel channel in notRecoveredChannels)
618+
{
619+
_channels.Remove(channel);
620+
}
621+
}
622+
finally
623+
{
624+
_channelsSemaphore.Release();
592625
}
593626
}
594627
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<RootNamespace>GH_1647</RootNamespace>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
<Nullable>enable</Nullable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="../../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
13+
</ItemGroup>
14+
15+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
2+
using System.Text;
3+
using RabbitMQ.Client;
4+
5+
ConnectionFactory connectionFactory = new()
6+
{
7+
AutomaticRecoveryEnabled = true,
8+
UserName = "guest",
9+
Password = "guest"
10+
};
11+
12+
var props = new BasicProperties();
13+
byte[] msg = Encoding.UTF8.GetBytes("test");
14+
using var connection = await connectionFactory.CreateConnectionAsync();
15+
for (int i = 0; i < 300; i++)
16+
{
17+
try
18+
{
19+
using var channel = await connection.CreateChannelAsync(); // New channel for each message
20+
await Task.Delay(1000);
21+
await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg);
22+
Console.WriteLine($"Sent message {i}");
23+
}
24+
catch (Exception ex)
25+
{
26+
Console.WriteLine($"Failed to send message {i}: {ex.Message}");
27+
await Task.Delay(1000);
28+
}
29+
}

0 commit comments

Comments
 (0)