44
44
import org .apache .rocketmq .common .message .Message ;
45
45
import org .apache .rocketmq .common .message .MessageQueue ;
46
46
import org .apache .rocketmq .common .topic .TopicValidator ;
47
+ import org .apache .rocketmq .common .utils .ThreadUtils ;
47
48
import org .apache .rocketmq .logging .org .slf4j .Logger ;
48
49
import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
49
50
import org .apache .rocketmq .remoting .RPCHook ;
@@ -54,6 +55,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
54
55
private static final Logger log = LoggerFactory .getLogger (AsyncTraceDispatcher .class );
55
56
private static final AtomicInteger COUNTER = new AtomicInteger ();
56
57
private static final AtomicInteger INSTANCE_NUM = new AtomicInteger (0 );
58
+ private static final long WAIT_FOR_SHUTDOWN = 5000L ;
57
59
private volatile boolean stopped = false ;
58
60
private final int traceInstanceId = INSTANCE_NUM .getAndIncrement ();
59
61
private final int batchNum ;
@@ -190,23 +192,19 @@ public boolean append(final Object ctx) {
190
192
191
193
@ Override
192
194
public void flush () {
193
- long end = System .currentTimeMillis () + 500 ;
194
- while (traceContextQueue .size () > 0 || appenderQueue .size () > 0 && System .currentTimeMillis () <= end ) {
195
+ while (traceContextQueue .size () > 0 ) {
195
196
try {
196
197
flushTraceContext (true );
197
198
} catch (Throwable throwable ) {
198
199
log .error ("flushTraceContext error" , throwable );
199
200
}
200
201
}
201
- if (appenderQueue .size () > 0 ) {
202
- log .error ("There are still some traces that haven't been sent " + traceContextQueue .size () + " " + appenderQueue .size ());
203
- }
204
202
}
205
203
206
204
@ Override
207
205
public void shutdown () {
208
206
flush ();
209
- this .traceExecutor . shutdown ( );
207
+ ThreadUtils . shutdownGracefully ( this .traceExecutor , WAIT_FOR_SHUTDOWN , TimeUnit . MILLISECONDS );
210
208
if (isStarted .get ()) {
211
209
traceProducer .shutdown ();
212
210
}
0 commit comments