Skip to content

Commit 02d9190

Browse files
authored
Merge 6aa3f5c into 95c99f0
2 parents 95c99f0 + 6aa3f5c commit 02d9190

9 files changed

+23
-119
lines changed

go.sum

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:z
134134
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
135135
github.com/coredns/coredns v1.1.2 h1:bAFHrSsBeTeRG5W3Nf2su3lUGw7Npw2UKeCJm/3A638=
136136
github.com/coredns/coredns v1.1.2/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0=
137+
github.com/coreos/bbolt v1.3.4 h1:0VqjxUwoTLxM3PmsSIk0hI2ao6gTtButQ2z8FT4//yo=
137138
github.com/coreos/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
138139
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
139140
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -227,7 +228,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
227228
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
228229
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
229230
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
230-
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
231231
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
232232
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4=
233233
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=

protocol/dubbo/dubbo_invoker_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,6 @@ func InitTest(t *testing.T) (protocol.Protocol, *common.URL) {
105105
ConnectionNum: 2,
106106
HeartbeatPeriod: "5s",
107107
SessionTimeout: "20s",
108-
PoolTTL: 600,
109-
PoolSize: 64,
110108
GettySessionParam: getty.GettySessionParam{
111109
CompressEncoding: false,
112110
TcpNoDelay: true,

protocol/dubbo/dubbo_protocol_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ func initDubboInvokerTest() {
6464
ConnectionNum: 1,
6565
HeartbeatPeriod: "3s",
6666
SessionTimeout: "20s",
67-
PoolTTL: 600,
68-
PoolSize: 64,
6967
GettySessionParam: getty.GettySessionParam{
7068
CompressEncoding: false,
7169
TcpNoDelay: true,

remoting/getty/config.go

-6
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,6 @@ type (
9595
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
9696
sessionTimeout time.Duration
9797

98-
// Connection Pool
99-
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
100-
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
101-
10298
// grpool
10399
GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"`
104100
QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"`
@@ -116,8 +112,6 @@ func GetDefaultClientConfig() ClientConfig {
116112
ConnectionNum: 16,
117113
HeartbeatPeriod: "30s",
118114
SessionTimeout: "180s",
119-
PoolSize: 4,
120-
PoolTTL: 600,
121115
GrPoolSize: 200,
122116
QueueLen: 64,
123117
QueueNumber: 10,

remoting/getty/getty_client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (c *Client) Connect(url *common.URL) error {
148148
initClient(url.Protocol)
149149
c.conf = *clientConf
150150
// new client
151-
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
151+
c.pool = newGettyRPCClientConnPool(c)
152152
c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false)
153153

154154
// codec

remoting/getty/getty_client_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func getClient(url *common.URL) *Client {
9393
}
9494

9595
func testClient_Call(t *testing.T, svr *Server, url *common.URL, c *Client) {
96-
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
96+
c.pool = newGettyRPCClientConnPool(c)
9797

9898
testGetBigPkg(t, c)
9999
testGetUser(t, c)
@@ -342,8 +342,6 @@ func InitTest(t *testing.T) (*Server, *common.URL) {
342342
ConnectionNum: 2,
343343
HeartbeatPeriod: "5s",
344344
SessionTimeout: "20s",
345-
PoolTTL: 600,
346-
PoolSize: 64,
347345
GettySessionParam: GettySessionParam{
348346
CompressEncoding: false,
349347
TcpNoDelay: true,

remoting/getty/pool.go

+17-82
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
229229
}
230230
}()
231231
if removeFlag {
232-
c.pool.safeRemove(c)
232+
c.pool.resetConn()
233233
c.close()
234234
}
235235
}
@@ -325,123 +325,58 @@ func (c *gettyRPCClient) close() error {
325325

326326
type gettyRPCClientPool struct {
327327
rpcClient *Client
328-
size int // size of []*gettyRPCClient
329-
ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
330328
sslEnabled bool
329+
closed bool
331330

332331
sync.Mutex
333-
conns []*gettyRPCClient
332+
conn *gettyRPCClient
334333
}
335334

336-
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
335+
func newGettyRPCClientConnPool(rpcClient *Client) *gettyRPCClientPool {
337336
return &gettyRPCClientPool{
338337
rpcClient: rpcClient,
339-
size: size,
340-
ttl: int64(ttl.Seconds()),
341-
// init capacity : 2
342-
conns: make([]*gettyRPCClient, 0, 2),
338+
closed: false,
343339
}
344340
}
345341

346342
func (p *gettyRPCClientPool) close() {
347343
p.Lock()
348-
conns := p.conns
349-
p.conns = nil
344+
conn := p.conn
345+
p.conn = nil
346+
p.closed = true
350347
p.Unlock()
351-
for _, conn := range conns {
348+
if conn != nil {
352349
conn.close()
353350
}
354351
}
355352

356353
func (p *gettyRPCClientPool) getGettyRpcClient(addr string) (*gettyRPCClient, error) {
354+
p.Lock()
355+
defer p.Unlock()
357356
conn, connErr := p.get()
358357
if connErr == nil && conn == nil {
359358
// create new conn
360359
rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr)
361360
if rpcErr == nil {
362-
p.put(rpcClientConn)
361+
p.conn = rpcClientConn
363362
}
364363
return rpcClientConn, perrors.WithStack(rpcErr)
365364
}
366365
return conn, perrors.WithStack(connErr)
367366
}
368367

369368
func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
370-
now := time.Now().Unix()
371-
372-
p.Lock()
373-
defer p.Unlock()
374-
if p.conns == nil {
369+
if p.closed {
375370
return nil, errClientPoolClosed
376371
}
377-
for num := len(p.conns); num > 0; {
378-
var conn *gettyRPCClient
379-
if num != 1 {
380-
conn = p.conns[rand.Int31n(int32(num))]
381-
} else {
382-
conn = p.conns[0]
383-
}
384-
// This will recreate gettyRpcClient for remove last position
385-
// p.conns = p.conns[:len(p.conns)-1]
386-
387-
if d := now - conn.getActive(); d > p.ttl {
388-
p.remove(conn)
389-
go conn.close()
390-
num = len(p.conns)
391-
continue
392-
}
393-
conn.updateActive(now) // update active time
394-
return conn, nil
372+
if p.conn != nil {
373+
return p.conn, nil
395374
}
396375
return nil, nil
397376
}
398377

399-
func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
400-
if conn == nil || conn.getActive() == 0 {
401-
return
402-
}
403-
p.Lock()
404-
defer p.Unlock()
405-
if p.conns == nil {
406-
return
407-
}
408-
// check whether @conn has existed in p.conns or not.
409-
for i := range p.conns {
410-
if p.conns[i] == conn {
411-
return
412-
}
413-
}
414-
if len(p.conns) >= p.size {
415-
// delete @conn from client pool
416-
// p.remove(conn)
417-
conn.close()
418-
return
419-
}
420-
p.conns = append(p.conns, conn)
421-
}
422-
423-
func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
424-
if conn == nil || conn.getActive() == 0 {
425-
return
426-
}
427-
428-
if p.conns == nil {
429-
return
430-
}
431-
432-
if len(p.conns) > 0 {
433-
for idx, c := range p.conns {
434-
if conn == c {
435-
p.conns = append(p.conns[:idx], p.conns[idx+1:]...)
436-
break
437-
}
438-
}
439-
}
440-
}
441-
442-
func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
378+
func (p *gettyRPCClientPool) resetConn() {
443379
p.Lock()
444380
defer p.Unlock()
445-
446-
p.remove(conn)
381+
p.conn = nil
447382
}

remoting/getty/pool_test.go

+3-20
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package getty
1919

2020
import (
2121
"testing"
22-
"time"
2322
)
2423

2524
import (
@@ -28,24 +27,8 @@ import (
2827

2928
func TestGetConnFromPool(t *testing.T) {
3029
var rpcClient Client
31-
32-
clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second))
33-
34-
var conn1 gettyRPCClient
35-
conn1.active = time.Now().Unix()
36-
clientPoll.put(&conn1)
37-
assert.Equal(t, 1, len(clientPoll.conns))
38-
39-
var conn2 gettyRPCClient
40-
conn2.active = time.Now().Unix()
41-
clientPoll.put(&conn2)
42-
assert.Equal(t, 1, len(clientPoll.conns))
43-
conn, err := clientPoll.get()
44-
assert.Nil(t, err)
45-
assert.Equal(t, &conn1, conn)
46-
time.Sleep(6 * time.Second)
47-
conn, err = clientPoll.get()
48-
assert.Nil(t, conn)
30+
clientPoll := newGettyRPCClientConnPool(&rpcClient)
31+
cli, err := clientPoll.get()
32+
assert.Nil(t, cli)
4933
assert.Nil(t, err)
50-
assert.Equal(t, 0, len(clientPoll.conns))
5134
}

remoting/getty/readwriter_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ func getServer(t *testing.T) (*Server, *common.URL) {
9191
ConnectionNum: 2,
9292
HeartbeatPeriod: "5s",
9393
SessionTimeout: "20s",
94-
PoolTTL: 600,
95-
PoolSize: 64,
9694
GettySessionParam: GettySessionParam{
9795
CompressEncoding: false,
9896
TcpNoDelay: true,

0 commit comments

Comments
 (0)