Skip to content

Commit 6f52602

Browse files
Merge branch 'opensearch-project:main' into FeatureRequest17599
2 parents a979b7b + 20d56d2 commit 6f52602

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ public void run() {
313313
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
314314
boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException;
315315
if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
316+
logDroppedMessage(shardUpdateMessage);
316317
shardUpdateMessage = null;
317318
retryCount = 0;
318319
messageProcessorMetrics.failedMessageDroppedCounter.inc();
@@ -360,6 +361,13 @@ public void close() {
360361
closed = true;
361362
}
362363

364+
private void logDroppedMessage(ShardUpdateMessage shardUpdateMessage) {
365+
String id = shardUpdateMessage.autoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP
366+
? (String) shardUpdateMessage.parsedPayloadMap().get(ID)
367+
: "null";
368+
logger.warn("Exhausted retries, dropping message: _id:{}, pointer:{}", id, shardUpdateMessage.pointer().asString());
369+
}
370+
363371
/**
364372
* Tracks MessageProcessor metrics.
365373
*/

server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.indices.pollingingest;
1010

1111
import org.opensearch.action.DocWriteRequest;
12+
import org.opensearch.index.IngestionShardPointer;
1213
import org.opensearch.index.Message;
1314
import org.opensearch.index.engine.Engine;
1415
import org.opensearch.index.engine.FakeIngestionSource;
@@ -23,6 +24,7 @@
2324

2425
import java.io.IOException;
2526
import java.nio.charset.StandardCharsets;
27+
import java.util.Collections;
2628
import java.util.concurrent.ArrayBlockingQueue;
2729
import java.util.concurrent.TimeUnit;
2830

@@ -240,29 +242,28 @@ public void testMessageRetrySuccess() throws Exception {
240242
thread.interrupt();
241243
}
242244

243-
public void testMessageRetryFail() throws Exception {
245+
public void testDropPolicyMessageRetryFail() throws Exception {
244246
MessageProcessorRunnable.MessageProcessor processor = mock(MessageProcessorRunnable.MessageProcessor.class);
245247
DropIngestionErrorStrategy errorStrategy = new DropIngestionErrorStrategy("ingestion_source");
246248
MessageProcessorRunnable messageProcessorRunnable = new MessageProcessorRunnable(
247249
new ArrayBlockingQueue<>(5),
248250
processor,
249251
errorStrategy
250252
);
251-
messageProcessorRunnable.getBlockingQueue().put(new ShardUpdateMessage(null, null, null, 0));
253+
messageProcessorRunnable.getBlockingQueue()
254+
.put(new ShardUpdateMessage(mock(IngestionShardPointer.class), null, Collections.emptyMap(), 0));
255+
messageProcessorRunnable.getBlockingQueue()
256+
.put(new ShardUpdateMessage(mock(IngestionShardPointer.class), null, Collections.emptyMap(), -1));
252257

253-
doThrow(new RuntimeException()).doThrow(new RuntimeException())
254-
.doThrow(new RuntimeException())
255-
.doNothing()
256-
.when(processor)
257-
.process(any(), any());
258+
doThrow(new RuntimeException()).when(processor).process(any(), any());
258259

259260
Thread thread = new Thread(messageProcessorRunnable::run);
260261
thread.start();
261262
assertBusy(() -> {
262-
verify(processor, times(3)).process(any(), any());
263-
assertEquals(1, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageDroppedCounter().count());
264-
assertEquals(3, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageCounter().count());
265-
}, 1, TimeUnit.MINUTES);
263+
verify(processor, times(6)).process(any(), any());
264+
assertEquals(2, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageDroppedCounter().count());
265+
assertEquals(6, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageCounter().count());
266+
}, 2, TimeUnit.MINUTES);
266267

267268
messageProcessorRunnable.close();
268269
thread.interrupt();

0 commit comments

Comments
 (0)