
Commit 0a3b372

Support shard pointer reset in resume API
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 54af445 commit 0a3b372

File tree: 39 files changed (+941, −163 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Use QueryCoordinatorContext for the rewrite in validate API. ([#18272](https://github.com/opensearch-project/OpenSearch/pull/18272))
 - Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
 - Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
+- Support consumer reset in Resume API for pull-based ingestion ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))

 ### Changed
 - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))
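
Note: alongside the new Resume API support, this commit renames the initial-reset values of the `ingestion_source.pointer.init.reset` index setting from `rewind_by_offset`/`rewind_by_timestamp` to `reset_by_offset`/`reset_by_timestamp`, as the test diffs below show. A minimal sketch of index creation with the renamed setting (topic name and reset value are illustrative, mirroring the Kafka test changes below):

    Settings settings = Settings.builder()
        .put("ingestion_source.type", "kafka")
        .put("ingestion_source.pointer.init.reset", "reset_by_offset")   // was "rewind_by_offset"
        .put("ingestion_source.pointer.init.reset.value", "1")
        .put("ingestion_source.param.topic", "test")
        .build();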

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 2 additions & 2 deletions

@@ -85,7 +85,7 @@ public void testKafkaIngestion_RewindByTimeStamp() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
+            .put("ingestion_source.pointer.init.reset", "reset_by_timestamp")
             // 1739459500000 is the timestamp of the first message
             // 1739459800000 is the timestamp of the second message
             // by resetting to 1739459600000, only the second message will be ingested
@@ -115,7 +115,7 @@ public void testKafkaIngestion_RewindByOffset() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
             .put("ingestion_source.pointer.init.reset.value", "1")
             .put("ingestion_source.param.topic", "test")
             .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 10 additions & 0 deletions

@@ -15,6 +15,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -176,6 +177,15 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
         return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(indexName)).get();
     }

+    protected ResumeIngestionResponse resumeIngestion(
+        String index,
+        int shard,
+        ResumeIngestionRequest.ResetSettings.ResetMode mode,
+        String value
+    ) throws ExecutionException, InterruptedException {
+        return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(index, shard, mode, value)).get();
+    }
+
     protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
         createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
     }
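
The new overload targets a single shard and carries the reset mode and value. A minimal usage sketch of this base-class helper, mirroring how the tests below call it (index name, shard id, and offset value are illustrative):

    // Resume shard 0 and rewind its consumer to offset 0 before polling restarts.
    ResumeIngestionResponse response = resumeIngestion(
        "testindex",
        0,
        ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET,
        "0"
    );
    assertTrue(response.isAcknowledged());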

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaSingleNodeIT.java

Lines changed: 165 additions & 0 deletions

@@ -0,0 +1,165 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.transport.client.Requests;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
 * This IT mainly covers high-level scenarios for test coverage purposes.
 */
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
public class KafkaSingleNodeIT extends OpenSearchSingleNodeTestCase {
    private KafkaContainer kafka;
    private Producer<String, String> producer;
    private final String topicName = "test";
    private final String indexName = "testindex";
    private final String mappings = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}";

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Collections.singleton(KafkaPlugin.class);
    }

    @Before
    public void setup() {
        setupKafka();
    }

    @After
    public void cleanup() {
        stopKafka();
    }

    public void testPauseAndResumeAPIs() throws Exception {
        produceData("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");
        produceData("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");

        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .build(),
            mappings
        );

        waitForState(() -> {
            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
            SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
            return response.getHits().getTotalHits().value() == 2;
        });

        // pause ingestion
        client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
        waitForState(() -> {
            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
            return ingestionState.getFailedShards() == 0
                && Arrays.stream(ingestionState.getShardStates())
                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
        });

        produceData("{\"_id\":\"1\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");
        produceData("{\"_id\":\"2\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");

        // resume ingestion with offset reset
        client().admin()
            .indices()
            .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
            .get();
        waitForState(() -> {
            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
            return Arrays.stream(ingestionState.getShardStates())
                .allMatch(
                    state -> state.isPollerPaused() == false
                        && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
                );
        });

        // validate duplicate messages are skipped
        waitForState(() -> {
            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
                .getPollingIngestStats();
            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;
        });
    }

    private void setupKafka() {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
            // disable topic auto creation
            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
        kafka.start();

        // setup producer
        String bootstrapServers = kafka.getBootstrapServers();
        KafkaUtils.createTopic(topicName, 1, bootstrapServers);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafka.getBootstrapServers());
        producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }

    private void stopKafka() {
        if (producer != null) {
            producer.close();
        }

        if (kafka != null) {
            kafka.stop();
        }
    }

    private void produceData(String payload) {
        producer.send(new ProducerRecord<>(topicName, null, 1739459500000L, "null", payload));
    }

    protected GetIngestionStateResponse getIngestionState(String indexName) throws ExecutionException, InterruptedException {
        return client().admin().indices().getIngestionState(Requests.getIngestionStateRequest(indexName)).get();
    }

    protected void waitForState(Callable<Boolean> checkState) throws Exception {
        assertBusy(() -> {
            if (checkState.call() == false) {
                fail("Provided state requirements not met");
            }
        }, 1, TimeUnit.MINUTES);
    }
}
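
The final assertion encodes the replay semantics: resetting shard 0 to offset 0 re-delivers the two original version-1 messages, which the engine detects as already processed and skips as duplicates, while the two version-2 updates produced during the pause are applied.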

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 127 additions & 0 deletions
@@ -12,6 +12,7 @@

 import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -498,6 +499,132 @@ public void testClusterWriteBlock() throws Exception {
         waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
     }

+    public void testOffsetUpdateOnBlockErrorPolicy() throws Exception {
+        // setup nodes and index using block strategy
+        // produce one invalid message to block the processor
+        produceData("1", "name1", "21");
+        produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}");
+        produceData("2", "name2", "22");
+        produceData("3", "name3", "24");
+        produceData("4", "name4", "24");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "block")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
+        );
+
+        ensureGreen(indexName);
+        // expect only 1 document to be successfully indexed
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+        // revalidate that only 1 document is visible
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // update offset to skip past the invalid message
+        ResumeIngestionResponse resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return Arrays.stream(ingestionState.getShardStates())
+                .allMatch(
+                    state -> state.isPollerPaused() == false
+                        && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
+                );
+        });
+
+        // validate remaining messages are successfully indexed
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+        PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+            .getPollingIngestStats();
+        assertThat(stats.getConsumerStats().totalDuplicateMessageSkippedCount(), is(0L));
+    }
+
+    public void testConsumerResetByTimestamp() throws Exception {
+        produceData("1", "name1", "21", 100, "index");
+        produceData("2", "name2", "22", 105, "index");
+        produceData("3", "name3", "24", 110, "index");
+        produceData("4", "name4", "24", 120, "index");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "drop")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
+        );
+
+        ensureGreen(indexName);
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+
+        // expect error response since ingestion is not yet paused
+        ResumeIngestionResponse resumeResponse = resumeIngestion(
+            indexName,
+            0,
+            ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP,
+            "100"
+        );
+        assertTrue(resumeResponse.isAcknowledged());
+        assertFalse(resumeResponse.isShardsAcknowledged());
+        assertEquals(1, resumeResponse.getShardFailures().length);
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+
+        // reset consumer to a timestamp after the first message was produced
+        resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 3;
+        });
+    }
+
     private void verifyRemoteStoreEnabled(String node) {
         GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
         String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
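
Two behaviors worth noting from these tests: a reset-carrying resume against a shard that is not paused is acknowledged at the request level but reports per-shard failures, so callers should pause ingestion first; and resetting to timestamp 102 re-delivers the three messages produced at timestamps 105, 110, and 120, which are then skipped as duplicates (hence the expected skip count of 3).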

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaOffset.java

Lines changed: 1 addition & 6 deletions
@@ -38,11 +38,6 @@ public byte[] serialize() {
         return buffer.array();
     }

-    @Override
-    public String asString() {
-        return String.valueOf(offset);
-    }
-
     @Override
     public Field asPointField(String fieldName) {
         return new LongPoint(fieldName, offset);
@@ -63,7 +58,7 @@ public long getOffset() {

     @Override
     public String toString() {
-        return "KafkaOffset{" + "offset=" + offset + '}';
+        return String.valueOf(offset);
     }

     @Override
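
The effect of this change, sketched below (offset value illustrative): the removed asString() accessor is folded into toString(), which now yields the bare offset instead of the verbose wrapper format.

    KafkaOffset pointer = new KafkaOffset(12345L);
    // before: pointer.toString() -> "KafkaOffset{offset=12345}", pointer.asString() -> "12345"
    // after:  pointer.toString() -> "12345"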

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaOffsetTests.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ public void testAsString() {
         long offset = 12345L;
         KafkaOffset kafkaOffset = new KafkaOffset(offset);

-        Assert.assertEquals("The string representation should be correct", String.valueOf(offset), kafkaOffset.asString());
+        Assert.assertEquals("The string representation should be correct", String.valueOf(offset), kafkaOffset.toString());
     }

     public void testAsPointField() {

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kinesis")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
             .put("ingestion_source.pointer.init.reset.value", sequenceNumber)
             .put("ingestion_source.param.stream", "test")
             .put("ingestion_source.param.region", localstack.getRegion())
