@@ -2,7 +2,9 @@ package dht
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"sync"
7
+ "sync/atomic"
6
8
"testing"
7
9
"time"
8
10
@@ -12,7 +14,6 @@ import (
12
14
13
15
func init () {
14
16
DialQueueScalingMutePeriod = 0
15
- DialQueueMaxIdle = 1 * time .Second
16
17
}
17
18
18
19
func TestDialQueueErrorsWithTooManyConsumers (t * testing.T ) {
@@ -41,13 +42,14 @@ func TestDialQueueErrorsWithTooManyConsumers(t *testing.T) {
41
42
}
42
43
43
44
func TestDialQueueGrowsOnSlowDials (t * testing.T ) {
45
+ DialQueueMaxIdle = 10 * time .Minute
46
+
44
47
in := queue .NewChanQueue (context .Background (), queue .NewXORDistancePQ ("test" ))
45
48
hang := make (chan struct {})
46
49
47
- var wg sync.WaitGroup
48
- wg .Add (19 ) // we expect 19 workers
50
+ var cnt int32
49
51
dialFn := func (ctx context.Context , p peer.ID ) error {
50
- wg . Done ( )
52
+ atomic . AddInt32 ( & cnt , 1 )
51
53
<- hang
52
54
return nil
53
55
}
@@ -65,26 +67,25 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
65
67
time .Sleep (100 * time .Millisecond )
66
68
}
67
69
68
- doneCh := make (chan struct {})
70
+ for i := 0 ; i < 20 ; i ++ {
71
+ if atomic .LoadInt32 (& cnt ) > int32 (DialQueueMinParallelism ) {
72
+ return
73
+ }
74
+ time .Sleep (100 * time .Millisecond )
75
+ }
69
76
70
- // wait in a goroutine in case the test fails and we block.
71
- go func () {
72
- defer close (doneCh )
73
- wg .Wait ()
74
- }()
77
+ t .Errorf ("expected 19 concurrent dials, got %d" , atomic .LoadInt32 (& cnt ))
75
78
76
- select {
77
- case <- doneCh :
78
- case <- time .After (2 * time .Second ):
79
- t .Error ("expected 19 concurrent dials, got less" )
80
- }
81
79
}
82
80
83
81
func TestDialQueueShrinksWithNoConsumers (t * testing.T ) {
82
+ // reduce interference from the other shrink path.
83
+ DialQueueMaxIdle = 10 * time .Minute
84
+
84
85
in := queue .NewChanQueue (context .Background (), queue .NewXORDistancePQ ("test" ))
85
86
hang := make (chan struct {})
86
87
87
- var wg sync.WaitGroup
88
+ wg := new ( sync.WaitGroup )
88
89
wg .Add (13 )
89
90
dialFn := func (ctx context.Context , p peer.ID ) error {
90
91
wg .Done ()
@@ -94,48 +95,55 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
94
95
95
96
dq := newDialQueue (context .Background (), "test" , in , dialFn , 3 )
96
97
97
- // Enqueue 13 jobs, one per worker we'll grow to.
98
- for i := 0 ; i < 13 ; i ++ {
99
- in . EnqChan <- peer . ID ( i )
100
- }
98
+ defer func () {
99
+ recover ()
100
+ fmt . Println ( dq . nWorkers )
101
+ }()
101
102
102
103
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
103
104
// and immediately returnable.
104
105
for i := 0 ; i < 3 ; i ++ {
105
106
_ = dq .Consume ()
106
- time .Sleep (100 * time .Millisecond )
107
107
}
108
108
109
- waitForWg (t , & wg , 2 * time .Second )
109
+ // Enqueue 13 jobs, one per worker we'll grow to.
110
+ for i := 0 ; i < 13 ; i ++ {
111
+ in .EnqChan <- peer .ID (i )
112
+ }
113
+
114
+ waitForWg (t , wg , 2 * time .Second )
110
115
111
116
// Release a few dialFn, but not all of them because downscaling happens when workers detect there are no
112
117
// consumers to consume their values. So the other three will be these witnesses.
113
- for i := 0 ; i < 10 ; i ++ {
118
+ for i := 0 ; i < 3 ; i ++ {
114
119
hang <- struct {}{}
115
120
}
116
121
117
122
// allow enough time for signalling and dispatching values to outstanding consumers.
118
- time .Sleep (500 * time .Millisecond )
123
+ time .Sleep (1 * time .Second )
119
124
120
- // unblock the other three .
121
- hang <- struct {}{}
122
- hang <- struct {}{}
123
- hang <- struct {}{ }
125
+ // unblock the rest .
126
+ for i := 0 ; i < 10 ; i ++ {
127
+ hang <- struct {}{}
128
+ }
124
129
130
+ wg = new (sync.WaitGroup )
125
131
// we should now only have 6 workers, because all the shrink events will have been honoured.
126
132
wg .Add (6 )
127
133
128
- // enqueue more jobs
129
- for i := 0 ; i < 20 ; i ++ {
134
+ // enqueue more jobs.
135
+ for i := 0 ; i < 6 ; i ++ {
130
136
in .EnqChan <- peer .ID (i )
131
137
}
132
138
133
139
// let's check we have 6 workers hanging.
134
- waitForWg (t , & wg , 2 * time .Second )
140
+ waitForWg (t , wg , 2 * time .Second )
135
141
}
136
142
137
143
// Inactivity = workers are idle because the DHT query is progressing slow and is producing too few peers to dial.
138
- func TestDialQueueShrinksWithInactivity (t * testing.T ) {
144
+ func TestDialQueueShrinksWithWhenIdle (t * testing.T ) {
145
+ DialQueueMaxIdle = 1 * time .Second
146
+
139
147
in := queue .NewChanQueue (context .Background (), queue .NewXORDistancePQ ("test" ))
140
148
hang := make (chan struct {})
141
149
0 commit comments