-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[ADDED] Account Support #755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Note this work is backwards compatible, but only works for a single server. The cluster updates will be a separate PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few comments so far. Will continue review.
for _, im := range ims { | ||
// We have a match for a local subscription with an import from another account. | ||
// We will create a shadow subscription. | ||
nsub := *sub // copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have a clone() here? This copy does not copy much ;-), the []byte will still point to sub, so are the pointers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that is ok, I just wanted to not overwrite anything in the main sub when doing subject or reply rewriting.
server/client.go
Outdated
c.mu.Unlock() | ||
|
||
for sid, sub := range subs { | ||
if c.sl != nil && !c.canSubscribe(sub.subject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
canSubscribe() needs locking. During a reload, the client's perms may be changed (under the client's lock).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -143,6 +144,8 @@ type client struct { | |||
ncs string | |||
out outbound | |||
srv *Server | |||
acc *Account | |||
sl *Sublist |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first it looked like a client had its own sublist, but I think this is just a reference to server's sublist. Did you do that so that you don't have to do c.srv.sl
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This brings the point that client should always have c.srv
!= nil, and we would not have many places where we check for that. It's probably only in few tests that we create client directly, maybe it would be worth "fixing" those instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since sl has it's own locking did not want to have to potentially lock c.acc in performance paths.
server/server.go
Outdated
return s.opts.AllowNewAccounts | ||
} | ||
|
||
func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this and below: do we need them to be exported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I know that you don't like to name returned parameters (and me too somehow), but then we need to document them ;-) (here the boolean is true if did not exist and now registered, but could have been the opposite).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if people embed the server and want to configure without a config file they might, but agree we should double check. No problem on name parameters in some cases, agree here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment and named params.
c.parse(op) | ||
for cp := range c.pcd { | ||
cp.mu.Lock() | ||
cp.flags.clear(flushOutbound) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to understand why we need this? The only place this flag is set is in flushOutbound() itself and is cleared when this function returns. The purpose of the flag was to prevent re-entrant calls since flushOutbound() releases client's lock when doing actual write to socket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests do not run a flush Go routine, and by default data (msgs) are expected to be sent via that Go routine, so we need to flush the pcd list in the test, which we do. What I was finding was that the flag was set every once in while so the manual flush was failing, so I clear the flag to force it in place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more comments...
) | ||
|
||
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { | ||
opts := defaultServerOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a t.Helper()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok will do.
server/accounts_test.go
Outdated
|
||
func TestAccountIsolation(t *testing.T) { | ||
s, fooAcc, barAcc := simpleAccountServer(t) | ||
if fooAcc == nil || barAcc == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed since simpleAccountServer()
would have failed if those were not properly registered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
if err != nil { | ||
t.Fatalf("Error for client 'bar' from server: %v", err) | ||
} | ||
if !strings.HasPrefix(l, "PONG\r\n") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this proves it. You first did a go cbar.parse([]byte("SUB foo 1\r\nPING\r\nPING\r\n"))
which sent 2 PINGs, then consumed one before doing foo
work. Ideally, you would then send the second PING after that, not prior, and then getting PONG would better prove that nothing went to bar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take a look.
s = New(&opts) | ||
|
||
c, _, _ = newClientForServer(s) | ||
err := c.parse(connectOp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should you add a PING after the connect and get the PONG to show that there was no issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take a look.
t.Fatalf("Expected to see 2 accounts in opts, got %d", la) | ||
} | ||
|
||
if lu := len(opts.Users); lu != 4 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That brings a point: you can't have same user name across multiple accounts then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, was discussed in the DD specifically. Do you feel we should allow?
if err != nil { | ||
t.Fatalf("Error for client 'bar' from server: %v", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we consume PONG here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do with ReadString.
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) | ||
} | ||
checkPayload(crBar, []byte("hello\r\n"), t) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we test that since bar only import *
, it would not get say foo.bar
(even though foo exports ">")? Or there may be another test checking that already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested elsewhere.
server/accounts_test.go
Outdated
t.Fatalf("Error registering client with 'bar' account: %v", err) | ||
} | ||
|
||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong comment again..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. | ||
t.Fatalf("Error adding account export to client foo: %v", err) | ||
} | ||
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addStreamImport()
checks if prefix's last character is .
or not, and if not, adds it. We have tests without the .
and here with it, which is good. Maybe a comment above this line to say that .
is added automatically but testing that it works if explicitly specified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
server/accounts_test.go
Outdated
t.Fatalf("Error registering client with 'bar' account: %v", err) | ||
} | ||
|
||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, when I started the work originally we just had import and export. Once stream and service became evident I should have updated the original tests. Thanks..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, I meant the // Public ...
comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will take a look in a bit the others that you mentioned. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting close...
server/accounts_test.go
Outdated
t.Fatalf("Error registering client with 'bar' account: %v", err) | ||
} | ||
|
||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, what I actually meant was the presence of // Public with no accounts defined.
is wrong since you provide an account.
server/accounts_test.go
Outdated
t.Fatalf("Error registering client with 'bar' account: %v", err) | ||
} | ||
|
||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above :-)
server/client.go
Outdated
c.mu.Unlock() | ||
|
||
for _, sub := range removed { | ||
c.sl.Remove(sub) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or could skip this and before or after the loop call c.sl.RemoveBatch(removed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call.. Will change..
server/accounts_test.go
Outdated
t.Fatalf("Error registering client with 'bar' account: %v", err) | ||
} | ||
|
||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, I meant the // Public ...
comment
server/server.go
Outdated
@@ -769,13 +833,14 @@ func (s *Server) copyInfo() Info { | |||
|
|||
func (s *Server) createClient(conn net.Conn) *client { | |||
// Snapshot server options. | |||
// TODO(dlc) - This can get expensive. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it is not. There is no copy/clone. s.getOpts() is just returning the pointer to the options under the optsMu lock, so that it is safe with the reload code which switch s.opts with new options pointer under that same lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, much better then ;)
} | ||
} | ||
|
||
func TestImportExportConfigFailures(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this test, please check that returned error contains string that you expect. I noticed that some errors are just malformed config (say expect map got string) and not at all what is the expected error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok will take a look. This errors from the config parsing?
server/accounts_test.go
Outdated
cf = createConfFile(t, []byte(` | ||
accounts { | ||
nats.io { | ||
exports = [{service: {account: nats.io, subject:"foo.*"}] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a missing bracket in this one (so right now the error is a parsing error)
diff --git a/server/accounts_test.go b/server/accounts_test.go
index eee29b3..dda4a25 100644
--- a/server/accounts_test.go
+++ b/server/accounts_test.go
@@ -423,13 +423,15 @@ func TestImportExportConfigFailures(t *testing.T) {
cf = createConfFile(t, []byte(`
accounts {
nats.io {
- exports = [{service: {account: nats.io, subject:"foo.*"}]
+ exports = [{service: {account: nats.io, subject:"foo.*"}}]
}
}
`))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -312,22 +320,33 @@ func (c *client) initClient() { | |||
} | |||
} | |||
|
|||
// RegisterWithAccount will register the given user with a specific | |||
// account. This will change the subject namespace. | |||
func (c *client) registerWithAccount(acc *Account) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this is called only if a client lib would send the account name in CONNECT. So how does this work if one defined an account in the server and list users there. I would have imagined that if a regular client connects with that user name, it would be associated with the account, which does not seem to be the case. Could you clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this was for programmatic not file parsing. But this has been fixed.
server/client.go
Outdated
if shouldForward { | ||
c.srv.broadcastSubscribe(sub) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Check to see if we need to create a shadow subscription due to imports | ||
// in other accounts. | ||
// Assume lock is held |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually not, this function grabs the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
79f78ac
to
b453133
Compare
Signed-off-by: Derek Collison <[email protected]>
a732ede
to
14cdda8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just few comments
Signed-off-by: Derek Collison <[email protected]>
This PR introduces accounts and account isolation. Users are assigned to one account per connection and that defines their subject space.
We also introduce streams and services. These are ways to share messages across accounts with the proper permissions. Streams are pub/sub and Services are usually req/reply semantics.
We allow explicit permissions to services to also generate implicit permissions in realtime for one time use responses.
We also have support for configuration parsing.
/cc @nats-io/core