Skip to content

Commit c6f578b

Browse files
Create builder for IngestionState
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent b77d9bd commit c6f578b

File tree

9 files changed

+229
-21
lines changed

9 files changed

+229
-21
lines changed
Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88

99
package org.opensearch.plugin.kafka;
1010

11+
import com.carrotsearch.randomizedtesting.ThreadFilter;
1112
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
1213

1314
import org.apache.kafka.clients.producer.KafkaProducer;
1415
import org.apache.kafka.clients.producer.Producer;
1516
import org.apache.kafka.clients.producer.ProducerRecord;
1617
import org.apache.kafka.common.serialization.StringSerializer;
1718
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
19+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
1820
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
1921
import org.opensearch.action.search.SearchResponse;
2022
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -38,8 +40,8 @@
3840
import org.testcontainers.containers.KafkaContainer;
3941
import org.testcontainers.utility.DockerImageName;
4042

41-
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
42-
public class KafkaSingleNodeIT extends OpenSearchSingleNodeTestCase {
43+
@ThreadLeakFilters(filters = KafkaSingleNodeTests.TestContainerThreadLeakFilter.class)
44+
public class KafkaSingleNodeTests extends OpenSearchSingleNodeTestCase {
4345
private KafkaContainer kafka;
4446
private Producer<String, String> producer;
4547
private final String topicName = "test";
@@ -78,13 +80,22 @@ public void testPauseAndResumeAPIs() throws Exception {
7880
.build(),
7981
mappings
8082
);
83+
ensureGreen(indexName);
8184

8285
waitForState(() -> {
8386
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
8487
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
8588
return response.getHits().getTotalHits().value() == 2;
8689
});
8790

91+
ResumeIngestionResponse resumeResponse = client().admin()
92+
.indices()
93+
.resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
94+
.get();
95+
assertTrue(resumeResponse.isAcknowledged());
96+
assertFalse(resumeResponse.isShardsAcknowledged());
97+
assertEquals(1, resumeResponse.getShardFailures().length);
98+
8899
// pause ingestion
89100
client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
90101
waitForState(() -> {
@@ -159,4 +170,12 @@ protected void waitForState(Callable<Boolean> checkState) throws Exception {
159170
}, 1, TimeUnit.MINUTES);
160171
}
161172

173+
public static final class TestContainerThreadLeakFilter implements ThreadFilter {
174+
@Override
175+
public boolean reject(Thread t) {
176+
return t.getName().startsWith("testcontainers-pull-watchdog-")
177+
|| t.getName().startsWith("testcontainers-ryuk")
178+
|| t.getName().startsWith("stream-poller-consumer");
179+
}
180+
}
162181
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,18 @@ protected ShardIngestionState shardOperation(UpdateIngestionStateRequest request
142142
StreamPoller.ResetState resetState = getStreamPollerResetState(resetSettings);
143143
String resetValue = resetSettings != null ? resetSettings.getValue() : null;
144144
if (resetState != null && resetValue != null) {
145-
indexShard.updateShardIngestionState(new IngestionSettings(null, resetState, resetValue));
145+
IngestionSettings ingestionSettings = IngestionSettings.builder()
146+
.setResetState(resetState)
147+
.setResetValue(resetValue)
148+
.build();
149+
indexShard.updateShardIngestionState(ingestionSettings);
146150
}
147151
}
148152

149153
// update ingestion state
150154
if (request.getIngestionPaused() != null) {
151-
indexShard.updateShardIngestionState(new IngestionSettings(request.getIngestionPaused(), null, null));
155+
IngestionSettings ingestionSettings = IngestionSettings.builder().setIsPaused(request.getIngestionPaused()).build();
156+
indexShard.updateShardIngestionState(ingestionSettings);
152157
}
153158

154159
return indexShard.getIngestionState();

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,12 +545,12 @@ private void validateDocumentVersion(final Operation operation) throws IOExcepti
545545
*/
546546
public void updateIngestionSettings(IngestionSettings ingestionSettings) {
547547
// reset poller position and reinitialize poller
548-
if (ingestionSettings.resetState() != null && ingestionSettings.resetValue() != null) {
549-
resetStreamPoller(ingestionSettings.resetState(), ingestionSettings.resetValue());
548+
if (ingestionSettings.getResetState() != null && ingestionSettings.getResetValue() != null) {
549+
resetStreamPoller(ingestionSettings.getResetState(), ingestionSettings.getResetValue());
550550
}
551551

552552
// update ingestion state
553-
if (ingestionSettings.isPaused() != null) {
553+
if (ingestionSettings.getIsPaused() != null) {
554554
updateIngestionState(ingestionSettings);
555555
}
556556
}
@@ -559,7 +559,7 @@ public void updateIngestionSettings(IngestionSettings ingestionSettings) {
559559
* Update ingestion state of the poller.
560560
*/
561561
private void updateIngestionState(IngestionSettings ingestionSettings) {
562-
if (ingestionSettings.isPaused()) {
562+
if (ingestionSettings.getIsPaused()) {
563563
streamPoller.pause();
564564
} else {
565565
streamPoller.resume();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5486,7 +5486,10 @@ public void updateShardIngestionState(IndexMetadata indexMetadata) {
54865486
return;
54875487
}
54885488

5489-
updateShardIngestionState(new IngestionSettings(indexMetadata.getIngestionStatus().isPaused(), null, null));
5489+
IngestionSettings ingestionSettings = IngestionSettings.builder()
5490+
.setIsPaused(indexMetadata.getIngestionStatus().isPaused())
5491+
.build();
5492+
updateShardIngestionState(ingestionSettings);
54905493
}
54915494

54925495
/**

server/src/main/java/org/opensearch/indices/pollingingest/IngestionSettings.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,77 @@
1313

1414
/**
1515
* Holds the ingestion settings required to update the poller. All values are optional to support partial update.
16-
* @param isPaused Indicates if poller needs to be paused or resumed.
17-
* @param resetState Optional. Indicates target reset state of the poller.
18-
* @param resetValue Optional. Indicates target reset value (offset/timestamp/sequence number etc).
1916
*/
2017
@ExperimentalApi
21-
public record IngestionSettings(@Nullable Boolean isPaused, @Nullable StreamPoller.ResetState resetState, @Nullable String resetValue) {
18+
public class IngestionSettings {
19+
20+
// Indicates if poller needs to be paused or resumed.
21+
@Nullable
22+
private final Boolean isPaused;
23+
24+
// Indicates target reset state of the poller.
25+
@Nullable
26+
private final StreamPoller.ResetState resetState;
27+
28+
// Indicates target reset value (offset/timestamp/sequence number etc).
29+
@Nullable
30+
private final String resetValue;
31+
32+
private IngestionSettings(Builder builder) {
33+
this.isPaused = builder.isPaused;
34+
this.resetState = builder.resetState;
35+
this.resetValue = builder.resetValue;
36+
}
37+
38+
@Nullable
39+
public Boolean getIsPaused() {
40+
return isPaused;
41+
}
42+
43+
@Nullable
44+
public StreamPoller.ResetState getResetState() {
45+
return resetState;
46+
}
47+
48+
@Nullable
49+
public String getResetValue() {
50+
return resetValue;
51+
}
52+
53+
public static Builder builder() {
54+
return new Builder();
55+
}
56+
57+
/**
58+
* Builder for IngestionSettings. Only set the fields that need to be used for updating ingestion state.
59+
*/
60+
@ExperimentalApi
61+
public static class Builder {
62+
@Nullable
63+
private Boolean isPaused;
64+
@Nullable
65+
private StreamPoller.ResetState resetState;
66+
@Nullable
67+
private String resetValue;
68+
69+
public Builder setIsPaused(Boolean isPaused) {
70+
this.isPaused = isPaused;
71+
return this;
72+
}
73+
74+
public Builder setResetState(StreamPoller.ResetState resetState) {
75+
this.resetState = resetState;
76+
return this;
77+
}
78+
79+
public Builder setResetValue(String resetValue) {
80+
this.resetValue = resetValue;
81+
return this;
82+
}
83+
84+
public IngestionSettings build() {
85+
return new IngestionSettings(this);
86+
}
87+
}
88+
2289
}

server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/resume/ResumeIngestionRequestTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,37 @@ public void testFromXContentMissingInput() {
149149
);
150150
}
151151

152+
public void testFromXContentInvalidField() {
153+
String json = """
154+
{
155+
"reset_settings": [
156+
{
157+
"unknown_field": 0,
158+
"value": "123"
159+
}
160+
]
161+
}
162+
""";
163+
164+
assertThrows(
165+
IllegalArgumentException.class,
166+
() -> ResumeIngestionRequest.fromXContent(new String[] { "index" }, createParser(json))
167+
);
168+
}
169+
170+
public void testFromXContentInvalidJson() {
171+
String json = """
172+
{
173+
"reset_settings": {}
174+
}
175+
""";
176+
177+
assertThrows(
178+
IllegalArgumentException.class,
179+
() -> ResumeIngestionRequest.fromXContent(new String[] { "index" }, createParser(json))
180+
);
181+
}
182+
152183
private XContentParser createParser(String json) throws IOException {
153184
return createParser(JsonXContent.jsonXContent, new BytesArray(json).streamInput());
154185
}

server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,11 @@ public void testIngestionStateUpdate() {
197197
waitForResults(ingestionEngine, 6);
198198

199199
// pause ingestion
200-
ingestionEngine.updateIngestionSettings(new IngestionSettings(true, null, null));
200+
ingestionEngine.updateIngestionSettings(IngestionSettings.builder().setIsPaused(true).build());
201201
// resume ingestion with offset reset
202-
ingestionEngine.updateIngestionSettings(new IngestionSettings(null, StreamPoller.ResetState.RESET_BY_OFFSET, "0"));
202+
ingestionEngine.updateIngestionSettings(
203+
IngestionSettings.builder().setIsPaused(false).setResetState(StreamPoller.ResetState.RESET_BY_OFFSET).setResetValue("0").build()
204+
);
203205
ShardIngestionState resumedIngestionState = ingestionEngine.getIngestionState();
204206
assertEquals(false, resumedIngestionState.isPollerPaused());
205207

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.rest.action.admin.indices;
34+
35+
import org.opensearch.rest.RestRequest;
36+
import org.opensearch.test.OpenSearchTestCase;
37+
import org.opensearch.test.rest.FakeRestRequest;
38+
import org.opensearch.transport.client.node.NodeClient;
39+
40+
import java.io.IOException;
41+
import java.util.HashMap;
42+
import java.util.Map;
43+
44+
import static org.mockito.Mockito.mock;
45+
46+
public class RestResumeIngestionActionTests extends OpenSearchTestCase {
47+
48+
private RestResumeIngestionAction action;
49+
50+
@Override
51+
public void setUp() throws Exception {
52+
super.setUp();
53+
action = new RestResumeIngestionAction();
54+
}
55+
56+
public void testPrepareRequest() throws IOException {
57+
Map<String, String> params = new HashMap<>();
58+
params.put("index", "test-index");
59+
params.put("cluster_manager_timeout", "30s");
60+
params.put("timeout", "60s");
61+
62+
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("/test-index/ingestion/_resume")
63+
.withMethod(RestRequest.Method.POST)
64+
.withParams(params)
65+
.build();
66+
67+
NodeClient client = mock(NodeClient.class);
68+
assertNotNull(action.prepareRequest(request, client));
69+
}
70+
71+
public void testRoutes() {
72+
assertEquals(1, action.routes().size());
73+
assertEquals("/" + "{index}" + "/ingestion/_resume", action.routes().get(0).getPath());
74+
assertEquals(RestRequest.Method.POST, action.routes().get(0).getMethod());
75+
}
76+
77+
public void testGetName() {
78+
assertEquals("resume_ingestion_action", action.getName());
79+
}
80+
81+
}

test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,13 @@ protected IndexService createIndexWithSimpleMappings(String index, Settings sett
382382
/**
383383
* Create a new index on the singleton node with the provided index settings and mappings source.
384384
*/
385-
protected IndexService createIndexWithMappingSource(String index, Settings settings, String mappingSource) {
386-
CreateIndexRequestBuilder createIndexRequestBuilder = client().admin().indices().prepareCreate(index).setSettings(settings);
387-
if (mappingSource != null) {
388-
createIndexRequestBuilder.setMapping(mappingSource);
389-
}
390-
return createIndex(index, createIndexRequestBuilder);
385+
protected IndexService createIndexWithMappingSource(String index, Settings settings, String mappingSource) {
386+
CreateIndexRequestBuilder createIndexRequestBuilder = client().admin().indices().prepareCreate(index).setSettings(settings);
387+
if (mappingSource != null) {
388+
createIndexRequestBuilder.setMapping(mappingSource);
391389
}
390+
return createIndex(index, createIndexRequestBuilder);
391+
}
392392

393393
protected IndexService createIndex(String index, CreateIndexRequestBuilder createIndexRequestBuilder) {
394394
assertAcked(createIndexRequestBuilder.get());

0 commit comments

Comments
 (0)