Incoming messages only processed when sending messages #490
Comments
I don't think you can use a queue in your scenario. With a queue, messages are delivered FIFO and each one goes to ONE receiver only. In this case you have both clients reading from the same queue, so they are effectively competing for every message. There is no guarantee that a message sent by client1 goes to client2; often the message is read back by the sending client itself. For your scenario you would want to use a topic with n subscribers. Each subscriber then gets a copy of every message and can filter out its own messages, so overall it looks like broadcasting to the other clients. |
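To make the queue vs. topic difference concrete, here is a rough toy model in Python. It is not amqpnetlite or broker code; `ToyQueue`, `ToyTopic` and the client ids are made-up names, and a real broker does far more than this.

```python
from collections import deque


class ToyQueue:
    """Point-to-point: each message is handed to exactly one receiver."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Whichever client asks first gets the message; the others never see it.
        return self._messages.popleft() if self._messages else None


class ToyTopic:
    """Publish/subscribe: every subscriber gets its own copy."""

    def __init__(self):
        self._subscriptions = {}

    def subscribe(self, client_id):
        self._subscriptions[client_id] = deque()

    def publish(self, sender_id, text):
        for inbox in self._subscriptions.values():
            inbox.append((sender_id, text))

    def receive(self, client_id):
        inbox = self._subscriptions[client_id]
        # Skip the copies of messages this client published itself.
        while inbox:
            sender_id, text = inbox.popleft()
            if sender_id != client_id:
                return text
        return None
```

With the queue, a sender that also receives can consume its own message before the other client sees it. With the topic, each client has its own inbox and simply drops its own messages.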
Thank you for responding. I am using a topic, however (specifying /topic/foo for source/target), and all messages are delivered to both clients, but only as part of sending a message. In the readme in the linked repro, you can see examples of how messages are received. Using RabbitMQ I am also seeing a queue created per connected client. Is there something more I should do when using a topic? EDIT: Just in case I didn't communicate it clearly: the repro I linked to was created solely for this issue and contains only the minimal code needed (i.e. I am not pointing you to some huge project full of other code :) |
I believe the issue you observed is described here. The SendAsync continuation is run on the connection's I/O thread, and Console.ReadLine() is blocking it, so the connection does not read from the transport until ReadLine returns. This is a danger of having blocking calls in an async code path. To confirm if this is true, you can try adding the following line before
|
Damn, you are right. I can also see that OnMessage gets called by the same thread that the main loop runs on. I thought the I/O thread was a dedicated thread created by amqpnetlite (meaning I should just avoid blocking it in the callback). Doing ReadLine in a Task.Run fixes the problem in this test app. What exactly determines what thread is used to receive messages? Is it the thread calling I need to go back to my real code base and check if it is indeed the same problem there. I will report back (and probably close the issue) ASAP. |
Playing around with it some more, I can see that callbacks always occur on the main thread of the running program (and calling receiverLink.Start in another thread does not change that). I can solve the problem in my sample app by running the "application logic" (the little loop) in a separate thread. In my real codebase (which is a simple notification API supporting different backends), I have a sender thread that reads outgoing messages from an in-memory queue. It sounds like I should do something similar for receiving messages, e.g. run a receiver thread calling the blocking Receive method. Or will I still have a connection pump on my main thread? Btw, you referenced https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md#threading but that section specifically talks about not blocking the callbacks (which I am not doing). Rather, it seems I am blocking the thread that happens to be where the "connection pump" is running. It kind of feels like it's "hijacking" my main thread :) If you can help me understand the "threading model" behind the scenes, I would be happy to do a little PR to extend the documentation and prevent others from hitting the same behavior. |
@xinchen10 Is it correctly understood that this is (essentially) the same problem as #237 ? I decided to resolve the problem in my real code base by simply not using the async APIs any more. There is no way that I can ensure that I (or users of my lib) won't block a thread pool thread for extended periods (and that thread might be the one running the pump), and that seems to be a requirement for using this async API. Instead I am creating a separate thread and using the sync API. PS: Interestingly, I have backends implemented in the library using MQTTNet, Redis and a few others with async APIs, and I have not encountered anything similar to this problem. |
Of course, you can. You just need to handle the continuation properly and not perform any client-specific work on the pump thread. Have a look at how I'm doing this in ArtemisNetClient, which uses this lib under the hood and is fully async. Consumer: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/Consumer.cs#L33 |
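One way to picture "no client-specific work on the pump thread" is a hand-off through an in-memory queue. The sketch below is a toy model in Python with hypothetical names (`pump`, `application_loop`, `demo_handoff`); it is not how ArtemisNetClient or amqpnetlite is implemented, only the general pattern.

```python
import queue
import threading


def pump(frames, inbox):
    """Toy pump: its callback only enqueues, so it never blocks for long."""
    for frame in frames:
        inbox.put(frame)  # unbounded queue, so put() returns immediately
    inbox.put(None)  # sentinel: no more messages


def application_loop(inbox, handle):
    """Runs on its own thread; free to block without stalling the pump."""
    while True:
        message = inbox.get()
        if message is None:
            return
        handle(message)


def demo_handoff(frames):
    inbox = queue.Queue()
    handled = []
    consumer = threading.Thread(
        target=application_loop, args=(inbox, handled.append)
    )
    consumer.start()
    pump(frames, inbox)  # in this toy, the calling thread plays the pump
    consumer.join()
    return handled
```

The handler passed to `application_loop` can take as long as it likes (or wait for user input) because the pump only ever pays for a queue insert.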
Thanks for replying, it was this kind of response I was trying to "provoke" :)
See, this is what I don't get (probably me being dense :). We keep talking about the pump thread, but as far as I can tell from the code it's an async pump, so it is not associated with any thread specifically, right? That is why I am so surprised that it cares about what happens in its continuation. Looking at #237 , I get the impression that it might be related to a synchronous continuation inside a lock? Normally when using an async API, I would not expect there to be any thread "affinity".
Looking at your producer code, it seems that you are using the sync API for sending: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/ProducerBase.cs#L110 ? Just to be clear, for my use case it is perfectly fine to use the sync API. I am looking at notifications with a very low rate and very few channels, so a bit of extra overhead is not a problem. However, I really would like to understand it :) Maybe if I did, I could help explain it to anyone else who is confused as well (and looking at #237 I do not seem to be alone). |
The pump is just a while(true) loop that reads bytes from the socket. If it receives enough data to parse a complete frame, it deserializes it and invokes the client-specific code like your The same applies when you are sending a message. When the other side receives it, it sends back an ack frame that completes this operation from your POV (again there's a callback that completes In my producer example I manually control completion via my own I hope that answers your question. |
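The effect of a blocking call inside code that the pump invokes inline can be reproduced with a small toy model. This is Python with made-up names, not the library's actual pump; `user_pressed_enter` merely stands in for a blocking call such as Console.ReadLine().

```python
import queue
import threading


def pump(frames, on_message):
    """Toy connection pump: read frames and run the handler inline."""
    while True:
        frame = frames.get()
        if frame is None:  # sentinel: connection closed
            return
        on_message(frame)  # the pump cannot read again until this returns


def demo_blocked_pump():
    frames = queue.Queue()
    delivered = []
    handler_entered = threading.Event()
    user_pressed_enter = threading.Event()  # stands in for a blocking read

    def blocking_handler(frame):
        delivered.append(frame)
        handler_entered.set()
        user_pressed_enter.wait()  # blocks the pump thread itself

    frames.put("m1")
    frames.put("m2")
    worker = threading.Thread(target=pump, args=(frames, blocking_handler))
    worker.start()

    handler_entered.wait()
    while_blocked = list(delivered)  # "m2" is still sitting in the transport

    user_pressed_enter.set()  # the blocking call finally returns
    frames.put(None)
    worker.join()
    return while_blocked, delivered
```

While the handler is blocked, only the first message has been delivered even though the second one has already arrived; it is processed as soon as the blocking call returns, which matches the "messages only show up when I send something" symptom.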
It is somewhat expected that you should not block in a callback (I mean, that would also apply with a dedicated receiver thread), but it is far less clear for a continuation.
So it is in fact because of I wonder if @xinchen10 would be open to a doc PR to clarify this a bit.
Thanks, I think I understand it now, and it also clarifies the discussion here: #237 . |
The library doesn't use dedicated threads for I/O operations, nor for internal library operations, so the term "pump thread" is not 100% correct. It is just a thread from the thread pool that executes the connection's pump at a given moment. The framework later added task creation and continuation options to run continuations asynchronously, but the problem is knowing about the potential issue and mitigating it with that option. Alternatively, you could just make it the default and not care about the performance penalty. We could add an option to the connection factory that controls how the user's callbacks and task continuations should be executed, but it goes back to the original question of what the default value should be and when it should be changed. PRs are always welcome. If we explain this better in the doc, it will help others. |
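The trade-off behind such an option can be sketched with a toy dispatcher. This is Python, not the library's code; `ToyDispatcher` and its `run_callbacks_async` flag are hypothetical and only illustrate "run inline on whichever thread is pumping" versus "hand off to a pool thread".

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyDispatcher:
    """Decides where user callbacks run (hypothetical option, not a real API)."""

    def __init__(self, run_callbacks_async, executor):
        self._run_callbacks_async = run_callbacks_async
        self._executor = executor

    def dispatch(self, callback, frame):
        if self._run_callbacks_async:
            # Extra scheduling cost, but the pump returns immediately.
            return self._executor.submit(callback, frame)
        # Cheapest path, but a slow callback stalls the pump.
        callback(frame)
        return None


def demo_dispatch():
    ran_on = {}

    def callback(label):
        ran_on[label] = threading.get_ident()

    with ThreadPoolExecutor(max_workers=1) as pool:
        ToyDispatcher(False, pool).dispatch(callback, "inline")
        future = ToyDispatcher(True, pool).dispatch(callback, "async")
        future.result()  # wait so the result is recorded before comparing

    pump_thread = threading.get_ident()  # the caller plays the pump here
    return ran_on["inline"] == pump_thread, ran_on["async"] == pump_thread
```

Inline dispatch runs the callback on the pumping thread, while the asynchronous variant moves it to a pool thread at the cost of an extra hop, which is the performance penalty mentioned above.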
@xinchen10 I will close this issue and am just writing up a small PR for the docs. |
I am new to AMQP and amqpnetlite, and have been struggling with some fairly basic functionality. I am trying to implement a basic pub/sub API using AMQP, and I keep having problems with messages only being received as part of sending a message, either immediately or after a short delay.
I have boiled it down to a very small example, with a dummy "chat" app, that I have made available here: https://github.com/petertiedemann/amqpnetlite-repro
The repo includes detailed repro steps and ready-to-run code, but the overall workflow is this:
The behavior I see depends a bit on the broker, but the general pattern is that at some point the client stops receiving incoming messages, and then, as soon as the client sends a message, the callback is processed.
My guess is that there is something simple that I missed (and that perhaps isn't documented?), perhaps related to link credit?