Skip to content

Commit 9035f50

Browse files
authored
[Pull-based ingestion] make maxPollSize and pollTimeout in IngestionSource configurable (#17863)
* make maxPollSize and pollTimeout in IngestionSource configurable
  Signed-off-by: Yupeng Fu <[email protected]>
* changelog
  Signed-off-by: Yupeng Fu <[email protected]>
* comment
  Signed-off-by: Yupeng Fu <[email protected]>
--------
Signed-off-by: Yupeng Fu <[email protected]>
1 parent 155f892 commit 9035f50

File tree

6 files changed

+86
-8
lines changed

6 files changed

+86
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3232
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
3333
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
3434
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))
35+
- Allow maxPollSize and pollTimeout in IngestionSource to be configurable ([#17863](https://github.com/opensearch-project/OpenSearch/pull/17863))
3536

3637
### Changed
3738
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,30 @@ public Iterator<Setting<?>> settings() {
807807
Property.Dynamic
808808
);
809809

810+
/**
811+
* Defines the maximum poll size per batch for pull-based ingestion. Defaults to 1000; the minimum allowed value is 0.
812+
*/
813+
public static final String SETTING_INGESTION_SOURCE_MAX_POLL_SIZE = "index.ingestion_source.poll.max_batch_size";
814+
public static final Setting<Long> INGESTION_SOURCE_MAX_POLL_SIZE = Setting.longSetting(
815+
SETTING_INGESTION_SOURCE_MAX_POLL_SIZE,
816+
1000,
817+
0,
818+
Property.IndexScope,
819+
Property.Dynamic
820+
);
821+
822+
/**
823+
* Defines the poll timeout, in milliseconds, for pull-based ingestion. Defaults to 1000; the minimum allowed value is 0.
824+
*/
825+
public static final String SETTING_INGESTION_SOURCE_POLL_TIMEOUT = "index.ingestion_source.poll.timeout";
826+
public static final Setting<Integer> INGESTION_SOURCE_POLL_TIMEOUT = Setting.intSetting(
827+
SETTING_INGESTION_SOURCE_POLL_TIMEOUT,
828+
1000,
829+
0,
830+
Property.IndexScope,
831+
Property.Dynamic
832+
);
833+
810834
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
811835
"index.ingestion_source.param.",
812836
key -> new Setting<>(key, "", (value) -> {
@@ -1047,9 +1071,13 @@ public IngestionSource getIngestionSource() {
10471071

10481072
final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
10491073
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
1074+
final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings);
1075+
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
10501076
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
10511077
.setPointerInitReset(pointerInitReset)
10521078
.setErrorStrategy(errorStrategy)
1079+
.setMaxPollSize(maxPollSize)
1080+
.setPollTimeout(pollTimeout)
10531081
.build();
10541082
}
10551083
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,17 @@
99
package org.opensearch.cluster.metadata;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.common.settings.Settings;
1213
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1314
import org.opensearch.indices.pollingingest.StreamPoller;
1415

1516
import java.util.HashMap;
1617
import java.util.Map;
1718
import java.util.Objects;
1819

20+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
21+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
22+
1923
/**
2024
* Class encapsulating the configuration of an ingestion source.
2125
*/
@@ -25,17 +29,23 @@ public class IngestionSource {
2529
private final PointerInitReset pointerInitReset;
2630
private final IngestionErrorStrategy.ErrorStrategy errorStrategy;
2731
private final Map<String, Object> params;
32+
private final long maxPollSize;
33+
private final int pollTimeout;
2834

2935
private IngestionSource(
3036
String type,
3137
PointerInitReset pointerInitReset,
3238
IngestionErrorStrategy.ErrorStrategy errorStrategy,
33-
Map<String, Object> params
39+
Map<String, Object> params,
40+
long maxPollSize,
41+
int pollTimeout
3442
) {
3543
this.type = type;
3644
this.pointerInitReset = pointerInitReset;
3745
this.params = params;
3846
this.errorStrategy = errorStrategy;
47+
this.maxPollSize = maxPollSize;
48+
this.pollTimeout = pollTimeout;
3949
}
4050

4151
public String getType() {
@@ -54,6 +64,14 @@ public Map<String, Object> params() {
5464
return params;
5565
}
5666

67+
public long getMaxPollSize() {
68+
return maxPollSize;
69+
}
70+
71+
public int getPollTimeout() {
72+
return pollTimeout;
73+
}
74+
5775
@Override
5876
public boolean equals(Object o) {
5977
if (this == o) return true;
@@ -62,12 +80,14 @@ public boolean equals(Object o) {
6280
return Objects.equals(type, ingestionSource.type)
6381
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
6482
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
65-
&& Objects.equals(params, ingestionSource.params);
83+
&& Objects.equals(params, ingestionSource.params)
84+
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
85+
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout);
6686
}
6787

6888
@Override
6989
public int hashCode() {
70-
return Objects.hash(type, pointerInitReset, params, errorStrategy);
90+
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout);
7191
}
7292

7393
@Override
@@ -84,6 +104,10 @@ public String toString() {
84104
+ '\''
85105
+ ", params="
86106
+ params
107+
+ ", maxPollSize="
108+
+ maxPollSize
109+
+ ", pollTimeout="
110+
+ pollTimeout
87111
+ '}';
88112
}
89113

@@ -137,6 +161,8 @@ public static class Builder {
137161
private PointerInitReset pointerInitReset;
138162
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
139163
private Map<String, Object> params;
164+
private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY);
165+
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);
140166

141167
public Builder(String type) {
142168
this.type = type;
@@ -165,13 +191,23 @@ public Builder setParams(Map<String, Object> params) {
165191
return this;
166192
}
167193

194+
public Builder setMaxPollSize(long maxPollSize) {
195+
this.maxPollSize = maxPollSize;
196+
return this;
197+
}
198+
168199
public Builder addParam(String key, Object value) {
169200
this.params.put(key, value);
170201
return this;
171202
}
172203

204+
public Builder setPollTimeout(int pollTimeout) {
205+
this.pollTimeout = pollTimeout;
206+
return this;
207+
}
208+
173209
public IngestionSource build() {
174-
return new IngestionSource(type, pointerInitReset, errorStrategy, params);
210+
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout);
175211
}
176212

177213
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ public void start() {
121121
resetState,
122122
resetValue,
123123
ingestionErrorStrategy,
124-
initialPollerState
124+
initialPollerState,
125+
ingestionSource.getMaxPollSize(),
126+
ingestionSource.getPollTimeout()
125127
);
126128
streamPoller.start();
127129
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
public class DefaultStreamPoller implements StreamPoller {
3333
private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);
3434

35-
// TODO: make this configurable
3635
public static final long MAX_POLL_SIZE = 1000;
3736
public static final int POLL_TIMEOUT = 1000;
3837

@@ -77,7 +76,9 @@ public DefaultStreamPoller(
7776
ResetState resetState,
7877
String resetValue,
7978
IngestionErrorStrategy errorStrategy,
80-
State initialState
79+
State initialState,
80+
long maxPollSize,
81+
int pollTimeout
8182
) {
8283
this(
8384
startPointer,

server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public void testConstructorAndGetters() {
3636
assertEquals("1000", source.getPointerInitReset().getValue());
3737
assertEquals(DROP, source.getErrorStrategy());
3838
assertEquals(params, source.params());
39+
assertEquals(1000, source.getMaxPollSize());
40+
assertEquals(1000, source.getPollTimeout());
3941
}
4042

4143
public void testEquals() {
@@ -44,13 +46,17 @@ public void testEquals() {
4446
IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1)
4547
.setPointerInitReset(pointerInitReset)
4648
.setErrorStrategy(DROP)
49+
.setMaxPollSize(500)
50+
.setPollTimeout(500)
4751
.build();
4852

4953
Map<String, Object> params2 = new HashMap<>();
5054
params2.put("key", "value");
5155
IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2)
5256
.setPointerInitReset(pointerInitReset)
5357
.setErrorStrategy(DROP)
58+
.setMaxPollSize(500)
59+
.setPollTimeout(500)
5460
.build();
5561
assertTrue(source1.equals(source2));
5662
assertTrue(source2.equals(source1));
@@ -68,13 +74,17 @@ public void testHashCode() {
6874
IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1)
6975
.setPointerInitReset(pointerInitReset)
7076
.setErrorStrategy(DROP)
77+
.setMaxPollSize(500)
78+
.setPollTimeout(500)
7179
.build();
7280

7381
Map<String, Object> params2 = new HashMap<>();
7482
params2.put("key", "value");
7583
IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2)
7684
.setPointerInitReset(pointerInitReset)
7785
.setErrorStrategy(DROP)
86+
.setMaxPollSize(500)
87+
.setPollTimeout(500)
7888
.build();
7989
assertEquals(source1.hashCode(), source2.hashCode());
8090

@@ -93,7 +103,7 @@ public void testToString() {
93103
.setErrorStrategy(DROP)
94104
.build();
95105
String expected =
96-
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}}";
106+
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000}";
97107
assertEquals(expected, source.toString());
98108
}
99109
}

0 commit comments

Comments
 (0)