Skip to content

refactor listenDirEvent #1589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
"github.com/dubbogo/go-zookeeper/zk"
gxset "github.com/dubbogo/gost/container/set"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"

Expand Down Expand Up @@ -80,15 +81,20 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
logger.Errorf("zookeeper client start error ,error message is %v", err)
return nil, err
}
err = c.client.Create(c.rootPath)
if err != nil && err != zk.ErrNodeExists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.Is

return nil, err
}

// Before handle client restart, we need to ensure that the zk dynamic configuration successfully start and create the configuration directory
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)

// Start listener
c.listener = zookeeper.NewZkEventListener(c.client)
c.cacheListener = NewCacheListener(c.rootPath)

err = c.client.Create(c.rootPath)
c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
return c, err
return c, nil
}

func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/apache/dubbo-go-hessian2 v1.9.3
github.com/creasty/defaults v1.5.2
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.19
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a
github.com/dubbogo/grpc-go v1.42.5-triple
github.com/dubbogo/triple v1.1.3
github.com/emicklei/go-restful/v3 v3.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4D
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A=
github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a h1:RLUCy1Rftro4EmUmqWQCdofwgo9mzPbrZ6d6xWgZNwo=
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/grpc-go v1.42.5-triple h1:Ed5z/ikkpdZHBMA4mTEthQFTQeKlHtkdAsQrZjTbFk8=
github.com/dubbogo/grpc-go v1.42.5-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
Expand Down
11 changes: 8 additions & 3 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (r *zkRegistry) InitListeners() {

// CreatePath creates the path in the registry center of zookeeper
func (r *zkRegistry) CreatePath(path string) error {
return r.ZkClient().Create(path)
err := r.ZkClient().Create(path)
if err != nil && err != zk.ErrNodeExists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, using errors.Is() is better than !=.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think != is just ok

return err
}
return nil
}

// DoRegister actually do the register job in the registry center of zookeeper
Expand Down Expand Up @@ -218,17 +222,18 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
}
logger.Infof("[Zookeeper Registry] Registry instance with root = %s, node = %s", root, node)
err = r.client.Create(root)
if err != nil {
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
return perrors.WithStack(err)
}

// try to register the node
// Try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err == nil {
return nil
}

// Maybe the node did exist, then we need to delete it first and recreate it
if perrors.Cause(err) == zk.ErrNodeExists {
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
Expand Down
48 changes: 14 additions & 34 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,52 +228,32 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
if err == nil {
ttl = timeout
} else {
logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL)
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// get current children for a zkPath
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkRootPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkRootPath, &event)
if err == errNilNode {
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkRootPath)
l.client.UnregisterEvent(zkRootPath, &event)
return
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)

// May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
l.client.UnregisterEvent(zkRootPath, &event)
continue
case <-l.exit:
l.client.UnregisterEvent(zkRootPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkRootPath)
return
case <-event:
logger.Debugf("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkRootPath, &event)
l.handleZkNodeEvent(zkRootPath, nil, listener)
continue
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.")
continue
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
Expand All @@ -283,7 +263,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// listen l service node
// Build the children path
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
Expand All @@ -294,7 +274,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkNodePath)
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath)
continue
}

Expand All @@ -308,13 +288,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li

l.client.RUnlock()
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkNodePath, perrors.WithStack(err))
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
}
logger.Debugf("Get children!{%s}", zkNodePath)
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Get children!{%s}", zkNodePath)
if !listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("[Zookeeper Listener] listen dubbo service key{%s}", zkNodePath)
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
Expand Down Expand Up @@ -366,7 +346,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
ticker = time.NewTicker(tickerTTL)
}
case zkEvent := <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type == zk.EventNodeChildrenChanged {
Expand Down