@@ -19,6 +19,7 @@ import (
19
19
"fmt"
20
20
"sync"
21
21
"testing"
22
+ "time"
22
23
23
24
"github.com/Shopify/sarama"
24
25
"github.com/stretchr/testify/assert"
@@ -72,7 +73,7 @@ func TestReceiverStartConsume(t *testing.T) {
72
73
}
73
74
74
75
func TestReceiver_error (t * testing.T ) {
75
- zcore , o := observer .New (zapcore .ErrorLevel )
76
+ zcore , logObserver := observer .New (zapcore .ErrorLevel )
76
77
logger := zap .New (zcore )
77
78
78
79
expectedErr := fmt .Errorf ("handler error" )
@@ -86,7 +87,10 @@ func TestReceiver_error(t *testing.T) {
86
87
err := c .Start (context .Background (), nil )
87
88
require .NoError (t , err )
88
89
c .Shutdown (context .Background ())
89
- assert .True (t , o .FilterField (zap .Error (expectedErr )).Len () > 0 )
90
+ waitUntil (func () bool {
91
+ return logObserver .FilterField (zap .Error (expectedErr )).Len () > 0
92
+ }, 100 , time .Millisecond * 100 )
93
+ assert .True (t , logObserver .FilterField (zap .Error (expectedErr )).Len () > 0 )
90
94
}
91
95
92
96
func TestConsumerGroupHandler (t * testing.T ) {
@@ -278,3 +282,12 @@ func (t testConsumerGroup) Errors() <-chan error {
278
282
func (t testConsumerGroup ) Close () error {
279
283
return nil
280
284
}
285
+
286
+ func waitUntil (f func () bool , iterations int , sleepInterval time.Duration ) {
287
+ for i := 0 ; i < iterations ; i ++ {
288
+ if f () {
289
+ return
290
+ }
291
+ time .Sleep (sleepInterval )
292
+ }
293
+ }
0 commit comments