Skip to content

Commit b123b2f

Browse files
authored
Merge pull request globalsign#6 from globalsign/development
Merge development
2 parents 9b9ec8e + 5b87030 commit b123b2f

File tree

4 files changed

+122
-18
lines changed

4 files changed

+122
-18
lines changed

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ func (cluster *mongoCluster) removeServer(server *mongoServer) {
122122
other := cluster.servers.Remove(server)
123123
cluster.Unlock()
124124
if other != nil {
125-
other.Close()
125+
other.CloseIdle()
126126
log("Removed server ", server.Addr, " from cluster.")
127127
}
128-
server.Close()
128+
server.CloseIdle()
129129
}
130130

131131
type isMasterResult struct {

cluster_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
package mgo_test
2828

2929
import (
30+
"errors"
3031
"fmt"
3132
"io"
3233
"net"
3334
"strings"
3435
"sync"
36+
"sync/atomic"
3537
"time"
3638

3739
mgo "github.com/globalsign/mgo"
@@ -2087,3 +2089,72 @@ func (s *S) TestDoNotFallbackToMonotonic(c *C) {
20872089
c.Assert(q13b, Equals, q13a)
20882090
}
20892091
}
2092+
2093+
func (s *S) TestConnectServerFailed(c *C) {
2094+
dials := int32(0)
2095+
maxDials := 50
2096+
info := &mgo.DialInfo{
2097+
Addrs: []string{"localhost:40001"},
2098+
DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) {
2099+
n := atomic.AddInt32(&dials, 1)
2100+
if n == int32(maxDials/2) {
2101+
return nil, errors.New("expected dial failed")
2102+
}
2103+
return net.Dial("tcp", addr.String())
2104+
},
2105+
}
2106+
2107+
session, err := mgo.DialWithInfo(info)
2108+
c.Assert(err, IsNil)
2109+
defer session.Close()
2110+
2111+
mgo.ResetStats()
2112+
2113+
errs := make(chan error, 1)
2114+
var done int32
2115+
var finished sync.WaitGroup
2116+
var starting sync.WaitGroup
2117+
defer func() {
2118+
atomic.StoreInt32(&done, 1)
2119+
finished.Wait()
2120+
}()
2121+
for i := 0; i < maxDials; i++ {
2122+
finished.Add(1)
2123+
starting.Add(1)
2124+
go func(s0 *mgo.Session) {
2125+
defer finished.Done()
2126+
for i := 0; ; i++ {
2127+
if atomic.LoadInt32(&done) == 1 {
2128+
break
2129+
}
2130+
err := func(s0 *mgo.Session) error {
2131+
s := s0.Copy()
2132+
defer s.Close()
2133+
coll := s.DB("mydb").C("mycoll")
2134+
2135+
var ret []interface{}
2136+
return coll.Find(nil).All(&ret)
2137+
}(s0)
2138+
if err != nil {
2139+
select {
2140+
case errs <- err:
2141+
default:
2142+
}
2143+
}
2144+
if i == 0 {
2145+
starting.Done()
2146+
}
2147+
}
2148+
}(session)
2149+
time.Sleep(10 * time.Millisecond)
2150+
}
2151+
starting.Wait()
2152+
2153+
// no errors expect.
2154+
var opErr error
2155+
select {
2156+
case opErr = <-errs:
2157+
default:
2158+
}
2159+
c.Assert(opErr, IsNil)
2160+
}

server.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,16 @@ func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error)
187187
// Close forces closing all sockets that are alive, whether
188188
// they're currently in use or not.
189189
func (server *mongoServer) Close() {
190+
server.close(false)
191+
}
192+
193+
// CloseIdle closing all sockets that are idle,
194+
// sockets currently in use will be closed after idle.
195+
func (server *mongoServer) CloseIdle() {
196+
server.close(true)
197+
}
198+
199+
func (server *mongoServer) close(waitForIdle bool) {
190200
server.Lock()
191201
server.closed = true
192202
liveSockets := server.liveSockets
@@ -196,7 +206,11 @@ func (server *mongoServer) Close() {
196206
server.Unlock()
197207
logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
198208
for i, s := range liveSockets {
199-
s.Close()
209+
if waitForIdle {
210+
s.CloseAfterIdle()
211+
} else {
212+
s.Close()
213+
}
200214
liveSockets[i] = nil
201215
}
202216
for i := range unusedSockets {

socket.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,20 @@ type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
4040

4141
type mongoSocket struct {
4242
sync.Mutex
43-
server *mongoServer // nil when cached
44-
conn net.Conn
45-
timeout time.Duration
46-
addr string // For debugging only.
47-
nextRequestId uint32
48-
replyFuncs map[uint32]replyFunc
49-
references int
50-
creds []Credential
51-
logout []Credential
52-
cachedNonce string
53-
gotNonce sync.Cond
54-
dead error
55-
serverInfo *mongoServerInfo
43+
server *mongoServer // nil when cached
44+
conn net.Conn
45+
timeout time.Duration
46+
addr string // For debugging only.
47+
nextRequestId uint32
48+
replyFuncs map[uint32]replyFunc
49+
references int
50+
creds []Credential
51+
logout []Credential
52+
cachedNonce string
53+
gotNonce sync.Cond
54+
dead error
55+
serverInfo *mongoServerInfo
56+
closeAfterIdle bool
5657
}
5758

5859
type queryOpFlags uint32
@@ -268,10 +269,13 @@ func (socket *mongoSocket) Release() {
268269
if socket.references == 0 {
269270
stats.socketsInUse(-1)
270271
server := socket.server
272+
closeAfterIdle := socket.closeAfterIdle
271273
socket.Unlock()
272274
socket.LogoutAll()
273-
// If the socket is dead server is nil.
274-
if server != nil {
275+
if closeAfterIdle {
276+
socket.Close()
277+
} else if server != nil {
278+
// If the socket is dead server is nil.
275279
server.RecycleSocket(socket)
276280
}
277281
} else {
@@ -320,6 +324,21 @@ func (socket *mongoSocket) Close() {
320324
socket.kill(errors.New("Closed explicitly"), false)
321325
}
322326

327+
// CloseAfterIdle terminates an idle socket, which has a zero
328+
// reference, or marks the socket to be terminate after idle.
329+
func (socket *mongoSocket) CloseAfterIdle() {
330+
socket.Lock()
331+
if socket.references == 0 {
332+
socket.Unlock()
333+
socket.Close()
334+
logf("Socket %p to %s: idle and close.", socket, socket.addr)
335+
return
336+
}
337+
socket.closeAfterIdle = true
338+
socket.Unlock()
339+
logf("Socket %p to %s: close after idle.", socket, socket.addr)
340+
}
341+
323342
func (socket *mongoSocket) kill(err error, abend bool) {
324343
socket.Lock()
325344
if socket.dead != nil {

0 commit comments

Comments
 (0)