-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[FIXED] Subscriptions not closed in the cluster when using auto unsubscribe #551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good catch! I would like to discuss the need for a new broadcast function. It seems to be that setting max to 0 when gathering the subscriptions would be enough.
For tests, I like that you made the cluster check function more generic. We could modify checkClusterFormed to use yours.
server/client.go
Outdated
@@ -1344,7 +1344,7 @@ func (c *client) closeConnection() { | |||
// Forward on unsubscribes if we are not | |||
// a router ourselves. | |||
if c.typ != ROUTER { | |||
srv.broadcastUnSubscribe(sub) | |||
srv.broadcastForceUnSubscribe(sub) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could do without this new function by simply setting sub.max = 0
(with a comment ;-)) above in the loop where we gather the subscriptions. We are under the client's mutex lock that protects the subscription's object. https://github.com/nats-io/gnatsd/pull/551/files#diff-853eb184ac73cf9597d7833f6b89e9c9R1320
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that https://github.com/nats-io/gnatsd/pull/551/files#diff-853eb184ac73cf9597d7833f6b89e9c9R1320 is the right place for zeroing sub.max
, and no need new function)
server/route.go
Outdated
@@ -610,6 +610,13 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { | |||
s.broadcastInterestToRoutes(proto) | |||
} | |||
|
|||
func (s *Server) broadcastForceUnSubscribe(sub *subscription) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May not need that function.
server/routes_test.go
Outdated
cluster := []*Server{srvA, srvB} | ||
|
||
// Wait for route to form. | ||
checkClusterFor(t, routesCheck(len(cluster)-1), 10*time.Second, cluster...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see now that you use this function for different type of checks. However, to check for cluster formation, there is already a function. You could use:checkClusterFormed(t, srvA, srvB)
. I see that yours is more generic (I did something very similar in NATS Streaming server). Since checkClusterFormed()
is used quite a bit, we could change checkClusterFormed()
implementation to simply call into yours. Does not have to be part of this PR though...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe https://github.com/nats-io/gnatsd/blob/master/test/cluster_test.go is better place for this test, if we test the behavior of the cluster? I can move it in this PR
@ingosus I marked it as "Request changes" because I would like to open the discussion about where the max is set. Seems to be that it could be done in a different place but could be convinced otherwise. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@ingosus Thanks! Nice simple fix and test. |
Resolves #NNN
git pull --rebase origin master
)Resolves #549