Skip to content

Semaphore exception when publishing with confirmation #1818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
patriktiain opened this issue Mar 27, 2025 · 26 comments · May be fixed by #1819
Open

Semaphore exception when publishing with confirmation #1818

patriktiain opened this issue Mar 27, 2025 · 26 comments · May be fixed by #1819
Assignees
Labels
Milestone

Comments

@patriktiain
Copy link

Describe the bug

Found on client version 7.1.1

I'm running in parallel allowing multiple threads using the same IChannel instance to publish message with confirmation.

When we use the version 6 of the client we had a channel pool to make sure that the channel wasn't shared between threads but when we upgraded to version 7 and got noticed that it might be thread safe (From discussion like this one #1721). We have started to test with a more liberal channel pool that allows multiple threads publish on the same IChannel instance.

After a period testing we have found multiple semaphore exceptions among a burst of confirmation fails. All exceptions happened within one second.

System.InvalidOperationException: BUG FOUND - please report this exception (with stacktrace) here: https://github.com/rabbitmq/rabbitmq-dotnet-client/issuesh
 ---> System.Threading.SemaphoreFullException: Adding the specified count to the semaphore would cause it to exceed its maximum count.
   at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
   at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
   at RabbitMQ.Client.Impl.Channel.BasicPublishAsync[TProperties](String exchange, String routingKey, Boolean mandatory, TProperties basicProperties, ReadOnlyMemory`1 body, CancellationToken cancellationToken)

We are using RabbitMQ server version 3.11.28 and .Net 8.

Reproduction steps

1.Enable confirmation on channel
2.Publish in parallel on channel

Expected behavior

Additional context

No response

@lukebakken lukebakken self-assigned this Mar 27, 2025
@lukebakken lukebakken removed the bug label Mar 27, 2025
@lukebakken lukebakken added this to the 7.1.3 milestone Mar 27, 2025
@lukebakken lukebakken added the bug label Mar 27, 2025
@lukebakken
Copy link
Collaborator

lukebakken commented Mar 27, 2025

Related:

Hello, thanks for the report.

There are tests that publish in parallel using the same channel, so you must have found an edge case. There's also not enough information to reproduce this issue, so I hope you can help out with that.

Can you reproduce this issue easily, or easily enough? Please upgrade to version 7.1.2 and try to reproduce this issue using that version.

After a period testing we have found multiple semaphore exceptions among a burst of confirmation fails

Can you describe this further? Why did you get nacks from RabbitMQ for confirmations? Did RabbitMQ log anything at that time?

lukebakken added a commit that referenced this issue Mar 27, 2025
Fixes #1818

Start by modifying test to concurrently publish to the same `IChannel` instance.
@lukebakken lukebakken linked a pull request Mar 27, 2025 that will close this issue
@lukebakken
Copy link
Collaborator

@patriktiain please see the test I modified in this PR - #1819

Can you confirm that I'm publishing with the same channel using multiple threads (Tasks) in a similar manner to you? If not, could you provide code that publishes in the same manner as you?

lukebakken added a commit that referenced this issue Mar 27, 2025
Fixes #1818

Start by modifying test to concurrently publish to the same `IChannel` instance.
@patriktiain
Copy link
Author

Hi! Thank you for you fast response and contribution!

During a 18 hour test with 2.6 million publish this exception was thrown 10 times. So I wouldn't call it easy nor difficult but I don't think I can do it today. But I will check with our testing resources.

After a period testing we have found multiple semaphore exceptions among a burst of confirmation fails

I'm sorry for not being clear. I didn't mean confirmation Nack. I meant that the wait for the confirmation timed out. On this run we used a timeout of 5 seconds. So we get this SemaphoreException and a burst of Confirmation Timeouts.

I have looked through some system metrics from the test and I suspect some other process were using quite a lot of resources during the time we got these exceptions. Might have something to do with it or not, I don't know.

Did RabbitMQ log anything at that time?

The semaphore exception occurred:

2025-03-27 13:28:20.588
2025-03-27 13:28:20.599
2025-03-27 13:28:20.620
2025-03-27 13:28:20.641

2025-03-27 13:41:57.078
2025-03-27 13:41:57.096

2025-03-27 14:24:50.593
2025-03-27 14:24:50.603

The only RabbitMQ logs around that time were:

> 2025-03-27 13:16:57.396000+01:00 [warning] <0.3012.0> AMQP 0-9-1 client call timeout was 70000 ms, is updated to a safe effective value of 130000 ms
> 2025-03-27 13:56:51.880000+01:00 [warning] <0.2531.0> Channel is stopping with 1 pending publisher confirms
> 2025-03-27 15:40:00.118000+01:00 [warning] <0.2575.0> Channel is stopping with 1 pending publisher confirms

Can you confirm that I'm publishing with the same channel using multiple threads (Tasks) in a similar manner to you? If not, could you provide code that publishes in the same manner as you?

Yes, I would say this is similar. The only difference is:

  • We use different settings for the RateLimiter. We have set it to 150 concurrent calls and 50% throttling percentage.
  • We never check that we have reached the RateLimiter as you did in the test:
    if (i % 128 == 0) { await WaitPublishTasksAsync(publishTasks); }
    However we have some metrics gauging the number of threads that uses the same Channel and it in this case we can't see it even reach 100 per Channel.

I hope this provides enough information!

@michaelklishin
Copy link
Contributor

Within RabbitMQ itself, we have seen timeouts of 5 seconds — it happens to be a common default in parts of Erlang/OTP — to be guaranteed to produce false positives under substantial load. So the practical minimum we try to use is 15s, although, of course, there is no right answer since every deployment, host resources and what constitutes "substantial load" will vary.

Is the behavior the same with a 15s timeout, just to compare?

@patriktiain
Copy link
Author

Within RabbitMQ itself, we have seen timeouts of 5 seconds — it happens to be a common default in parts of Erlang/OTP — to be guaranteed to produce false positives under substantial load.

Interesting, I didn't know of any default timeouts in RabbitMQ. Is there any documentation or blog post I can read on this subject?

We can't use such a high publish confirmation timeout as we time restrictions from the client that created the message. Usually we use 2 seconds for confirmation timeout

@michaelklishin
Copy link
Contributor

michaelklishin commented Mar 28, 2025

I didn't know of any default timeouts in RabbitMQ

I was referring to operation timeouts between the components of RabbitMQ, different cluster members, and so on. There are no timeouts on client publishing operations (publisher confirms).

There is a delivery acknowledgement timeout but it is 30 minutes by default or so, and it does not apply to your case.

@lukebakken
Copy link
Collaborator

I'm sorry for not being clear. I didn't mean confirmation Nack. I meant that the wait for the confirmation timed out. On this run we used a timeout of 5 seconds.

Can you tell me how you are setting this confirmation timeout? I want to be sure I'm doing it exactly the same way. Thank you.

@patriktiain
Copy link
Author

By CancellationToken

try
{    
    await channel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, CreateCancellationTokenSource().Token);
}
    catch (TaskCanceledException ex)
{
    _log.Error(ex);
    throw new RabbitMqException($"Failed to receive publisher confirms (publishtimeout on '{routingKey}' with timeout {_publishingAckTimeoutMs}");
}

private CancellationTokenSource CreateCancellationTokenSource() =>
    new CancellationTokenSource(TimeSpan.FromMilliseconds(_publishingAckTimeoutMs));

Where _publishingAckTimeoutMs is the configurable timeout. In the last test it was 5000 ms.

Thank you!

@lukebakken
Copy link
Collaborator

Great, thank you. You're probably aware of this, but CancellationTokenSource is IDisposable. You may want to consider an explicit Dispose call, otherwise timers can be leaked. I ran into that issue with this library recently and had to fix it!

@patriktiain
Copy link
Author

Thank you! I didn't know that. I'll fix right away.

@lukebakken
Copy link
Collaborator

@patriktiain - can you re run your tests using version 7.1.3-alpha.1, available here?

https://www.myget.org/feed/rabbitmq-dotnet-client/package/nuget/RabbitMQ.Client

@patriktiain
Copy link
Author

Thank you, I'll try to test it out this week.

I tested with version 7.1.2 and got more SemephoreExceptions then before.

Also, which exception should you catch in case of confirmation timeout? I have catched the TaskCancellationException but with 7.1.2 I got a lot of:

Failed to publish message to queue. RabbitMQ.Client.Exceptions.PublishException: Exception of type 'RabbitMQ.Client.Exceptions.PublishException' was thrown.
   at RabbitMQ.Client.Impl.Channel.PublisherConfirmationInfo.MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
   at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
   at RabbitMQ.Client.Impl.Channel.BasicPublishAsync[TProperties](String exchange, String routingKey, Boolean mandatory, TProperties basicProperties, ReadOnlyMemory`1 body, CancellationToken cancellationToken)

Is this another type of Exception? If yes, is it safest to dispose the channel after such a exception?

Thanks again!

@lukebakken
Copy link
Collaborator

@patriktiain - PublishException is thrown when the broker (RabbitMQ) sends basic.nack or basic.return for a message. You should not close the channel in this case, instead, figure out why the message was rejected by the broker. It will happen if you have a queue length limit, and reject-publish overflow behavior, for one case. If you publish messages with mandatory: true, and they are not routed to a queue, basic.return will be sent to your client, and PublishException thrown.

@patriktiain
Copy link
Author

Thank you for clearing that out!

@lukebakken
Copy link
Collaborator

@patriktiain I've made some comments related to this issue here - #1674 (reply in thread)

If you are using a 5 second timeout for confirmations from RabbitMQ, and experiencing those timeouts, your server is probably overloaded. Also, version 3.11 is very, very old and no longer supported. Just FYI.

Thanks for testing 7.1.3-alpha.1

@lukebakken
Copy link
Collaborator

@patriktiain - did you find time to test 7.1.3-alpha.1? I hate to nag but I'd like to release 7.1.3 and you're the only person that can trigger this issue.

@patriktiain
Copy link
Author

patriktiain commented Apr 8, 2025

Hi! Sorry, we just went through regression so the resources has used up by testers.

I tried it out today though. The application crashed unfortunately. I restarted it and it seams to be running now. We will attach some load within the hour.

Here is the exception from EventViewer.

Description: The application requested process termination through System.Environment.FailFast.
   RabbitMQ.Client.Impl.BasicCancelAsyncRpcContinuation.DoHandleCommandAsync(IncomingCommand cmd)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at RabbitMQ.Client.Impl.BasicCancelAsyncRpcContinuation.DoHandleCommandAsync(IncomingCommand cmd)
   at RabbitMQ.Client.Impl.AsyncRpcContinuation`1.HandleCommandAsync(IncomingCommand cmd)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at RabbitMQ.Client.Impl.AsyncRpcContinuation`1.HandleCommandAsync(IncomingCommand cmd)
   at RabbitMQ.Client.Impl.Channel.HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at RabbitMQ.Client.Impl.Channel.HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
   at RabbitMQ.Client.Impl.Session.HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
   at RabbitMQ.Client.Framing.Connection.ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at RabbitMQ.Client.Framing.Connection.ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
   at RabbitMQ.Client.Framing.Connection.ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AsyncStateMachineBox`1.MoveNext(Thread threadPoolThread)
   at System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(IAsyncStateMachineBox box, Boolean allowInlining)
   at System.Threading.Tasks.Task.RunContinuations(Object continuationObject)
   at System.Runtime.CompilerServices.AsyncValueTaskMethodBuilder.SetResult()
   at RabbitMQ.Client.Impl.InboundFrame.ReadFromPipeAsync(PipeReader reader, UInt32 maxInboundMessageBodySize, InboundFrame frame, CancellationToken mainLoopCancellationToken)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AsyncStateMachineBox`1.MoveNext(Thread threadPoolThread)
   at System.IO.Pipelines.StreamPipeReader.<ReadInternalAsync>g__Core|40_0(StreamPipeReader reader, Nullable`1 minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

@lukebakken
Copy link
Collaborator

lukebakken commented Apr 8, 2025

@patriktiain your environment is triggering some unusual behavior! The stack trace you provide is from hitting this code:

https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/main/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs#L153

The message The application requested process termination through System.Environment.FailFast can be generated by hitting a Debug.Assert, so maybe this line is being hit:

https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/main/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs#L283

If that line is hit, it means that the incoming consumer tag for the basic.cancel isn't what is expected. This seems to suggest that however you are using threading / Tasks, it doesn't jive well with this library.

Let me know how your testing goes.

lukebakken added a commit that referenced this issue Apr 8, 2025
Fixes #1818

Start by modifying test to concurrently publish to the same `IChannel` instance.
@lukebakken
Copy link
Collaborator

@patriktiain - if it's not too much trouble, please use 7.1.3-alpha.2 that I just published here:

https://www.myget.org/feed/rabbitmq-dotnet-client/package/nuget/RabbitMQ.Client

It has my latest changes from PR #1819. Thanks again.

@patriktiain
Copy link
Author

Hi! We started to load yesterday on 7.1.3-alpha1 on the test server and unfortunate we still se the exception.

System.InvalidOperationException: BUG FOUND - please report this exception (with stacktrace) here: https://github.com/rabbitmq/rabbitmq-dotnet-client/issues
---> System.Threading.SemaphoreFullException: Adding the specified count to the semaphore would cause it to exceed its maximum count.
at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.BasicPublishAsync[TProperties](String exchange, String routingKey, Boolean mandatory, TProperties basicProperties, ReadOnlyMemory`1 body, CancellationToken cancellationToken)

I have tried to figure out if we something strange setup in our environment. One thing could be that we have about 130 potential threads competing for 2 IChannels. This is not a setup that we think is viable in production but we wanted to test how high we can push the concurrency on this client.

I till try with the new build asap.

Thank you for continues support!

@tai-yi
Copy link

tai-yi commented Apr 10, 2025

I got same error but I found log on channel.BasicReturnAsync event as below.
Also use channel pooling😢

Client version: 7.1.0

RabbitMQ, Code:312 Reason:NO_ROUTE RoutingKey

@lukebakken
Copy link
Collaborator

@tai-yi please share what your code does in BasicReturnAsync. If you can, share all of your code. I have not yet been able to reproduce this issue.

@patriktiain - if possible, could we do a Zoom session so I could review your code? I'm sure it would give me insights into trying to reproduce this issue. Since you can't share your code, and I can't reproduce this issue, that is the best option at this time.

@tai-yi
Copy link

tai-yi commented Apr 15, 2025

@lukebakken
Sorry I cannot share all code, our production connected 2 RabbitMQ cluster.

private Task HandleBasicReturnAsync(object _, BasicReturnEventArgs args)
{
    Logger.LogWarning(
        "Message {id} was returned by RabbitMQ, Code:{code} Reason:{reason} RoutingKey {routingKey}",
        args.BasicProperties.MessageId,
        args.ReplyCode,
        args.ReplyText,
        args.RoutingKey);
    return Task.CompletedTask;
}

We use same code like below

public class RabbitMqPublisher   // DI Singleton service
{
    //instance has one MQ connection
    private readonly ObjectPool<PooledChannel> _channels;

  public RabbitMqPublisher(...)
  {
      _channels = new DefaultObjectPoolProvider().Create(new PooledChannelPolicy(this));
  }

private class PooledChannelPolicy(RabbitMqPublisher publisher) : PooledObjectPolicy<PooledChannel>
{
    public override PooledChannel Create()
    {
        return new PooledChannel(publisher);
    }
    public override bool Return(PooledChannel obj) => obj.Reusable;
}

}
  1. directly 3 nodes cluster (pod connect to each MQ node)

    No exception.

  2. connect 3 nodes cluster (gateway, not directly connect to MQ node)

    Got publish exception, there is no connection display in RabbitMQ console while get publish exception.
    Restart pod can resolve i t for few days( still got exception >2 days).
    Our production pod does not have permission to create a dotnet dump😂.

@tai-yi
Copy link

tai-yi commented Apr 15, 2025

I will try disable channel pooling on next release( maybe need 2 months)

@lukebakken
Copy link
Collaborator

@patriktiain - any word on testing the latest release?

FYI, I have started a new position that will take my time away from this project for a while, but I will continue to work on this issue when I can.

@patriktiain
Copy link
Author

@lukebakken, we have refactored a part of our RabbitMQ integration last week. Most of the changes are to use separate channels for publishing and topology. We will start load testing again this week. I haven't tested the new build.

Sure, we could have a zoom meeting next week if that is alright with you?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants