Skip to content

Commit c0c76e6

Browse files
Passing IndicesService to ingest processor factory with processor params (#10307)
* Passing IngestService to processor factory with processor params Signed-off-by: Martin Gaievski <[email protected]> * Fixed typo in the changlelog entry for this PR Signed-off-by: Martin Gaievski <[email protected]> --------- Signed-off-by: Martin Gaievski <[email protected]>
1 parent 379acf3 commit c0c76e6

File tree

6 files changed

+35
-18
lines changed

6 files changed

+35
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
103103
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
104104
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
105105
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
106+
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))
106107

107108
### Dependencies
108109
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.opensearch.index.IndexSettings;
7373
import org.opensearch.index.VersionType;
7474
import org.opensearch.index.analysis.AnalysisRegistry;
75+
import org.opensearch.indices.IndicesService;
7576
import org.opensearch.plugins.IngestPlugin;
7677
import org.opensearch.script.ScriptService;
7778
import org.opensearch.threadpool.ThreadPool;
@@ -128,7 +129,8 @@ public IngestService(
128129
ScriptService scriptService,
129130
AnalysisRegistry analysisRegistry,
130131
List<IngestPlugin> ingestPlugins,
131-
Client client
132+
Client client,
133+
IndicesService indicesService
132134
) {
133135
this.clusterService = clusterService;
134136
this.scriptService = scriptService;
@@ -143,7 +145,8 @@ public IngestService(
143145
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC),
144146
this,
145147
client,
146-
threadPool.generic()::execute
148+
threadPool.generic()::execute,
149+
indicesService
147150
)
148151
);
149152
this.threadPool = threadPool;

server/src/main/java/org/opensearch/ingest/Processor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.common.util.concurrent.ThreadContext;
3737
import org.opensearch.env.Environment;
3838
import org.opensearch.index.analysis.AnalysisRegistry;
39+
import org.opensearch.indices.IndicesService;
3940
import org.opensearch.script.ScriptService;
4041
import org.opensearch.threadpool.Scheduler;
4142

@@ -156,6 +157,8 @@ class Parameters {
156157
*/
157158
public final Client client;
158159

160+
public final IndicesService indicesService;
161+
159162
public Parameters(
160163
Environment env,
161164
ScriptService scriptService,
@@ -165,7 +168,8 @@ public Parameters(
165168
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
166169
IngestService ingestService,
167170
Client client,
168-
Consumer<Runnable> genericExecutor
171+
Consumer<Runnable> genericExecutor,
172+
IndicesService indicesService
169173
) {
170174
this.env = env;
171175
this.scriptService = scriptService;
@@ -176,6 +180,7 @@ public Parameters(
176180
this.ingestService = ingestService;
177181
this.client = client;
178182
this.genericExecutor = genericExecutor;
183+
this.indicesService = indicesService;
179184
}
180185

181186
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -629,15 +629,6 @@ protected Node(
629629
metricsRegistry = metricsRegistryFactory.getMetricsRegistry();
630630
resourcesToClose.add(tracer::close);
631631
resourcesToClose.add(metricsRegistry::close);
632-
final IngestService ingestService = new IngestService(
633-
clusterService,
634-
threadPool,
635-
this.environment,
636-
scriptService,
637-
analysisModule.getAnalysisRegistry(),
638-
pluginsService.filterPlugins(IngestPlugin.class),
639-
client
640-
);
641632

642633
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
643634
final UsageService usageService = new UsageService();
@@ -823,6 +814,17 @@ protected Node(
823814
recoverySettings
824815
);
825816

817+
final IngestService ingestService = new IngestService(
818+
clusterService,
819+
threadPool,
820+
this.environment,
821+
scriptService,
822+
analysisModule.getAnalysisRegistry(),
823+
pluginsService.filterPlugins(IngestPlugin.class),
824+
client,
825+
indicesService
826+
);
827+
826828
final AliasValidator aliasValidator = new AliasValidator();
827829

828830
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);

server/src/test/java/org/opensearch/ingest/IngestServiceTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.opensearch.core.xcontent.XContentBuilder;
6868
import org.opensearch.index.IndexSettings;
6969
import org.opensearch.index.VersionType;
70+
import org.opensearch.indices.IndicesService;
7071
import org.opensearch.plugins.IngestPlugin;
7172
import org.opensearch.script.MockScriptEngine;
7273
import org.opensearch.script.Script;
@@ -149,7 +150,8 @@ public void testIngestPlugin() {
149150
null,
150151
null,
151152
Collections.singletonList(DUMMY_PLUGIN),
152-
client
153+
client,
154+
mock(IndicesService.class)
153155
);
154156
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
155157
assertTrue(factories.containsKey("foo"));
@@ -167,7 +169,8 @@ public void testIngestPluginDuplicate() {
167169
null,
168170
null,
169171
Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN),
170-
client
172+
client,
173+
mock(IndicesService.class)
171174
)
172175
);
173176
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
@@ -182,7 +185,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
182185
null,
183186
null,
184187
Collections.singletonList(DUMMY_PLUGIN),
185-
client
188+
client,
189+
mock(IndicesService.class)
186190
);
187191
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
188192
.source(emptyMap())
@@ -1485,7 +1489,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
14851489
null,
14861490
null,
14871491
Arrays.asList(testPlugin),
1488-
client
1492+
client,
1493+
mock(IndicesService.class)
14891494
);
14901495
ingestService.addIngestClusterStateListener(ingestClusterStateListener);
14911496

@@ -1702,7 +1707,7 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory>
17021707
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
17031708
return processors;
17041709
}
1705-
}), client);
1710+
}), client, mock(IndicesService.class));
17061711
}
17071712

17081713
private CompoundProcessor mockCompoundProcessor() {

server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2209,7 +2209,8 @@ public void onFailure(final Exception e) {
22092209
scriptService,
22102210
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
22112211
Collections.emptyList(),
2212-
client
2212+
client,
2213+
indicesService
22132214
),
22142215
transportShardBulkAction,
22152216
client,

0 commit comments

Comments
 (0)