Skip to content

Commit db5f18b

Browse files
authored
Merge pull request globalsign#16 from globalsign/bugfix/jameinel-max-txn-queue-length
Bound TXN queue lengths
2 parents db81f4c + ad5fa11 commit db5f18b

File tree

4 files changed

+131
-5
lines changed

4 files changed

+131
-5
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
1515
* Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2))
1616
* Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5))
1717
* Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7))
18-
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11))
1918
* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464))
19+
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16))
2020
* Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899))
2121

2222
---

txn/flusher.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,16 @@ NextDoc:
271271
change.Upsert = false
272272
chaos("")
273273
if _, err := cquery.Apply(change, &info); err == nil {
274+
if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength {
275+
// abort with TXN Queue too long, but remove the entry we just added
276+
innerErr := c.UpdateId(dkey.Id,
277+
bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}})
278+
if innerErr != nil {
279+
f.debugf("error while backing out of queue-too-long: %v", innerErr)
280+
}
281+
return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)",
282+
dkey.Id, dkey.C, len(info.Queue))
283+
}
274284
if info.Remove == "" {
275285
// Fast path, unless workload is insert/remove heavy.
276286
revno[dkey] = info.Revno

txn/txn.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,14 @@ const (
217217
// A Runner applies operations as part of a transaction onto any number
218218
// of collections within a database. See the Run method for details.
219219
type Runner struct {
220-
tc *mgo.Collection // txns
221-
sc *mgo.Collection // stash
222-
lc *mgo.Collection // log
220+
tc *mgo.Collection // txns
221+
sc *mgo.Collection // stash
222+
lc *mgo.Collection // log
223+
opts RunnerOptions // runtime options
223224
}
224225

226+
const defaultMaxTxnQueueLength = 1000
227+
225228
// NewRunner returns a new transaction runner that uses tc to hold its
226229
// transactions.
227230
//
@@ -233,7 +236,36 @@ type Runner struct {
233236
// will be used for implementing the transactional behavior of insert
234237
// and remove operations.
235238
func NewRunner(tc *mgo.Collection) *Runner {
236-
return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
239+
return &Runner{
240+
tc: tc,
241+
sc: tc.Database.C(tc.Name + ".stash"),
242+
lc: nil,
243+
opts: DefaultRunnerOptions(),
244+
}
245+
}
246+
247+
// RunnerOptions encapsulates ways you can tweak transaction Runner behavior.
248+
type RunnerOptions struct {
249+
// MaxTxnQueueLength is a way to limit bad behavior. Many operations on
250+
// transaction queues are O(N^2), and transaction queues growing too large
251+
// are usually indicative of a bug in behavior. This should be larger
252+
// than the maximum number of concurrent operations to a single document.
253+
// Normal operations are likely to only ever hit 10 or so, we use a default
254+
// maximum length of 1000.
255+
MaxTxnQueueLength int
256+
}
257+
258+
// SetOptions allows people to change some of the internal behavior of a Runner.
259+
func (r *Runner) SetOptions(opts RunnerOptions) {
260+
r.opts = opts
261+
}
262+
263+
// DefaultRunnerOptions defines default behavior for a Runner.
264+
// Users can use the DefaultRunnerOptions to only override specific behavior.
265+
func DefaultRunnerOptions() RunnerOptions {
266+
return RunnerOptions{
267+
MaxTxnQueueLength: defaultMaxTxnQueueLength,
268+
}
237269
}
238270

239271
var ErrAborted = fmt.Errorf("transaction aborted")

txn/txn_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,90 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
621621
}
622622
}
623623

624+
func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) {
625+
txn.SetDebug(false)
626+
txn.SetChaos(txn.Chaos{
627+
KillChance: 1,
628+
Breakpoint: "set-applying",
629+
})
630+
defer txn.SetChaos(txn.Chaos{})
631+
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
632+
c.Assert(err, IsNil)
633+
ops := []txn.Op{{
634+
C: "accounts",
635+
Id: 0,
636+
Update: M{"$inc": M{"balance": 100}},
637+
}}
638+
for i := 0; i < expectedQueueLength; i++ {
639+
err := s.runner.Run(ops, "", nil)
640+
c.Assert(err, Equals, txn.ErrChaos)
641+
}
642+
txn.SetDebug(true)
643+
// Now that we've filled up the queue, we should see that there are 1000
644+
// items in the queue, and the error applying a new one will change.
645+
var doc bson.M
646+
err = s.accounts.FindId(0).One(&doc)
647+
c.Assert(err, IsNil)
648+
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
649+
err = s.runner.Run(ops, "", nil)
650+
c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`)
651+
// The txn-queue should not have grown
652+
err = s.accounts.FindId(0).One(&doc)
653+
c.Assert(err, IsNil)
654+
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
655+
}
656+
657+
func (s *S) TestTxnQueueDefaultMaxSize(c *C) {
658+
s.runner.SetOptions(txn.DefaultRunnerOptions())
659+
s.checkTxnQueueLength(c, 1000)
660+
}
661+
662+
func (s *S) TestTxnQueueCustomMaxSize(c *C) {
663+
opts := txn.DefaultRunnerOptions()
664+
opts.MaxTxnQueueLength = 100
665+
s.runner.SetOptions(opts)
666+
s.checkTxnQueueLength(c, 100)
667+
}
668+
669+
func (s *S) TestTxnQueueUnlimited(c *C) {
670+
opts := txn.DefaultRunnerOptions()
671+
// A value of 0 should mean 'unlimited'
672+
opts.MaxTxnQueueLength = 0
673+
s.runner.SetOptions(opts)
674+
// it isn't possible to actually prove 'unlimited' but we can prove that
675+
// we at least can insert more than the default number of transactions
676+
// without getting a 'too many transactions' failure.
677+
txn.SetDebug(false)
678+
txn.SetChaos(txn.Chaos{
679+
KillChance: 1,
680+
// Use set-prepared because we are adding more transactions than
681+
// other tests, and this speeds up setup time a bit
682+
Breakpoint: "set-prepared",
683+
})
684+
defer txn.SetChaos(txn.Chaos{})
685+
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
686+
c.Assert(err, IsNil)
687+
ops := []txn.Op{{
688+
C: "accounts",
689+
Id: 0,
690+
Update: M{"$inc": M{"balance": 100}},
691+
}}
692+
for i := 0; i < 1100; i++ {
693+
err := s.runner.Run(ops, "", nil)
694+
c.Assert(err, Equals, txn.ErrChaos)
695+
}
696+
txn.SetDebug(true)
697+
var doc bson.M
698+
err = s.accounts.FindId(0).One(&doc)
699+
c.Assert(err, IsNil)
700+
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100)
701+
err = s.runner.Run(ops, "", nil)
702+
c.Check(err, Equals, txn.ErrChaos)
703+
err = s.accounts.FindId(0).One(&doc)
704+
c.Assert(err, IsNil)
705+
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101)
706+
}
707+
624708
func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
625709
// This test ensures that PurgeMissing can handle very large
626710
// txn-queue fields. Previous iterations of PurgeMissing would

0 commit comments

Comments
 (0)