Skip to content

Commit 5ec6e9c

Browse files
authored
[Pull-based Ingestion] disable push-API for indexing in ingestionEngine (#17768)
--------- Signed-off-by: Yupeng Fu <[email protected]>
1 parent 3823169 commit 5ec6e9c

File tree

7 files changed

+68
-2
lines changed

7 files changed

+68
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
2828
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
2929
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
30+
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
3031

3132
### Changed
3233
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/OpenSearchServerException.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,5 +1224,13 @@ public static void registerExceptions() {
12241224
V_3_0_0
12251225
)
12261226
);
1227+
registerExceptionHandle(
1228+
new OpenSearchExceptionHandle(
1229+
org.opensearch.index.engine.IngestionEngineException.class,
1230+
org.opensearch.index.engine.IngestionEngineException::new,
1231+
176,
1232+
V_3_0_0
1233+
)
1234+
);
12271235
}
12281236
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,16 @@ protected Set<IngestionShardPointer> fetchPersistedOffsets(DirectoryReader direc
145145

146146
@Override
147147
public IndexResult index(Index index) throws IOException {
148+
throw new IngestionEngineException("push-based indexing is not supported in ingestion engine, use streaming source instead");
149+
}
150+
151+
/**
152+
* Indexes the document into the engine. This is used internally by the stream poller only.
153+
* @param index the index request
154+
* @return the index result
155+
* @throws IOException if an error occurs
156+
*/
157+
public IndexResult indexInternal(Index index) throws IOException {
148158
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
149159
ensureOpen();
150160
final IndexResult indexResult;
@@ -168,7 +178,7 @@ private void addDocs(final List<ParseContext.Document> docs, final IndexWriter i
168178

169179
@Override
170180
public DeleteResult delete(Delete delete) throws IOException {
171-
return null;
181+
throw new IngestionEngineException("push-based deletion is not supported in ingestion engine, use streaming source instead");
172182
}
173183

174184
@Override
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.OpenSearchWrapperException;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.rest.RestStatus;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* Exception thrown when there is an error in the ingestion engine.
20+
*
21+
* @opensearch.internal
22+
*/
23+
public class IngestionEngineException extends OpenSearchException implements OpenSearchWrapperException {
24+
public IngestionEngineException(String message) {
25+
super(message);
26+
}
27+
28+
public IngestionEngineException(StreamInput in) throws IOException {
29+
super(in);
30+
}
31+
32+
@Override
33+
public RestStatus status() {
34+
return RestStatus.BAD_REQUEST;
35+
}
36+
}

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ protected void process(Message message, IngestionShardPointer pointer) {
119119
Engine.Operation operation = getOperation(payload, pointer);
120120
switch (operation.operationType()) {
121121
case INDEX:
122-
engine.index((Engine.Index) operation);
122+
engine.indexInternal((Engine.Index) operation);
123123
break;
124124
case DELETE:
125125
engine.delete((Engine.Delete) operation);

server/src/test/java/org/opensearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.opensearch.core.xcontent.XContentLocation;
8888
import org.opensearch.crypto.CryptoRegistryException;
8989
import org.opensearch.env.ShardLockObtainFailedException;
90+
import org.opensearch.index.engine.IngestionEngineException;
9091
import org.opensearch.index.engine.RecoveryEngineException;
9192
import org.opensearch.index.query.QueryShardException;
9293
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException;
@@ -896,6 +897,7 @@ public void testIds() {
896897
ids.put(173, ViewAlreadyExistsException.class);
897898
ids.put(174, InvalidIndexContextException.class);
898899
ids.put(175, ResponseLimitBreachedException.class);
900+
ids.put(176, IngestionEngineException.class);
899901
ids.put(10001, IndexCreateBlockException.class);
900902

901903
Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();

server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicLong;
3737

38+
import org.mockito.Mockito;
39+
3840
import static org.awaitility.Awaitility.await;
3941
import static org.mockito.ArgumentMatchers.any;
4042
import static org.mockito.Mockito.doThrow;
@@ -128,6 +130,13 @@ public void testRecovery() throws IOException {
128130
waitForResults(ingestionEngine, 4);
129131
}
130132

133+
public void testPushAPIFailures() {
134+
Engine.Index indexMock = Mockito.mock(Engine.Index.class);
135+
assertThrows(IngestionEngineException.class, () -> ingestionEngine.index(indexMock));
136+
Engine.Delete deleteMock = Mockito.mock(Engine.Delete.class);
137+
assertThrows(IngestionEngineException.class, () -> ingestionEngine.delete(deleteMock));
138+
}
139+
131140
public void testCreationFailure() throws IOException {
132141
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
133142
FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);

0 commit comments

Comments
 (0)