Skip to content

Commit 96ef445

Browse files
authored
Datanode sniffer implementation for opensearch client (#22184)
* Datanode sniffer implementation for opensearch client * added license * code cleanup * fixed import * Added changelog
1 parent 31be465 commit 96ef445

File tree

14 files changed

+475
-156
lines changed

14 files changed

+475
-156
lines changed

changelog/unreleased/pr-22184.toml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type = "added"
2+
message = "Added datanode sniffer to opensearch clients where applicable"
3+
4+
issues = []
5+
pulls = ["22155"]

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/OpenSearch2Module.java

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.inject.assistedinject.FactoryModuleBuilder;
2020
import com.google.inject.binder.LinkedBindingBuilder;
21+
import com.google.inject.multibindings.Multibinder;
2122
import org.graylog.events.search.MoreSearchAdapter;
2223
import org.graylog.plugins.datanode.DatanodeUpgradeServiceAdapter;
2324
import org.graylog.plugins.views.migrations.V20200730000000_AddGl2MessageIdFieldAliasForEvents;
@@ -28,6 +29,12 @@
2829
import org.graylog.storage.opensearch2.client.OSCredentialsProvider;
2930
import org.graylog.storage.opensearch2.fieldtypes.streams.StreamsForFieldRetrieverOS2;
3031
import org.graylog.storage.opensearch2.migrations.V20170607164210_MigrateReopenedIndicesToAliasesClusterStateOS2;
32+
import org.graylog.storage.opensearch2.sniffer.SnifferBuilder;
33+
import org.graylog.storage.opensearch2.sniffer.SnifferFilter;
34+
import org.graylog.storage.opensearch2.sniffer.impl.DatanodesSniffer;
35+
import org.graylog.storage.opensearch2.sniffer.impl.NodeAttributesFilter;
36+
import org.graylog.storage.opensearch2.sniffer.impl.NodeLoggingFilter;
37+
import org.graylog.storage.opensearch2.sniffer.impl.OpensearchClusterSniffer;
3138
import org.graylog.storage.opensearch2.views.migrations.V20200730000000_AddGl2MessageIdFieldAliasForEventsOS2;
3239
import org.graylog2.indexer.IndexToolsAdapter;
3340
import org.graylog2.indexer.client.IndexerHostsAdapter;
@@ -93,6 +100,14 @@ protected void configure() {
93100
bind(CredentialsProvider.class).toProvider(OSCredentialsProvider.class);
94101
bindForSupportedVersion(DatanodeUpgradeServiceAdapter.class).to(DatanodeUpgradeServiceAdapterOS2.class);
95102

103+
Multibinder<SnifferBuilder> snifferBuilders = Multibinder.newSetBinder(binder(), SnifferBuilder.class);
104+
snifferBuilders.addBinding().to(OpensearchClusterSniffer.class);
105+
snifferBuilders.addBinding().to(DatanodesSniffer.class);
106+
107+
Multibinder<SnifferFilter> snifferFilters = Multibinder.newSetBinder(binder(), SnifferFilter.class);
108+
snifferFilters.addBinding().to(NodeAttributesFilter.class);
109+
snifferFilters.addBinding().to(NodeLoggingFilter.class);
110+
96111
bindForSupportedVersion(IndexerHostsAdapter.class).to(IndexerHostsAdapterOS2.class);
97112
}
98113

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RestClientProvider.java

+32-27
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.graylog.storage.opensearch2;
1818

1919
import com.google.common.base.Suppliers;
20+
import jakarta.annotation.Nonnull;
2021
import jakarta.inject.Inject;
2122
import jakarta.inject.Provider;
2223
import jakarta.inject.Singleton;
@@ -25,24 +26,28 @@
2526
import org.graylog.shaded.opensearch2.org.apache.http.client.CredentialsProvider;
2627
import org.graylog.shaded.opensearch2.org.opensearch.client.RestClient;
2728
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
28-
import org.graylog.shaded.opensearch2.org.opensearch.client.sniff.OpenSearchNodesSniffer;
29+
import org.graylog.shaded.opensearch2.org.opensearch.client.sniff.NodesSniffer;
30+
import org.graylog.shaded.opensearch2.org.opensearch.client.sniff.Sniffer;
31+
import org.graylog.storage.opensearch2.sniffer.SnifferAggregator;
32+
import org.graylog.storage.opensearch2.sniffer.SnifferBuilder;
33+
import org.graylog.storage.opensearch2.sniffer.SnifferFilter;
2934
import org.graylog2.configuration.ElasticsearchClientConfiguration;
3035
import org.graylog2.configuration.IndexerHosts;
3136
import org.graylog2.configuration.RunsWithDataNode;
3237
import org.graylog2.security.IndexerJwtAuthTokenProvider;
3338
import org.graylog2.security.TrustManagerAndSocketFactoryProvider;
3439
import org.graylog2.system.shutdown.GracefulShutdownService;
35-
import jakarta.annotation.Nonnull;
3640

3741
import java.net.URI;
3842
import java.util.List;
39-
import java.util.Locale;
40-
import java.util.concurrent.TimeUnit;
43+
import java.util.Set;
4144
import java.util.function.Supplier;
4245

4346
@Singleton
4447
public class RestClientProvider implements Provider<RestHighLevelClient> {
4548
private final Supplier<RestHighLevelClient> clientSupplier;
49+
private final Set<SnifferBuilder> snifferBuilders;
50+
private final Set<SnifferFilter> snifferFilters;
4651
private final GracefulShutdownService shutdownService;
4752
private final ElasticsearchClientConfiguration configuration;
4853
private final CredentialsProvider credentialsProvider;
@@ -58,46 +63,46 @@ public RestClientProvider(
5863
CredentialsProvider credentialsProvider,
5964
TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider,
6065
@RunsWithDataNode Boolean runsWithDataNode,
61-
IndexerJwtAuthTokenProvider indexerJwtAuthTokenProvider) {
66+
IndexerJwtAuthTokenProvider indexerJwtAuthTokenProvider,
67+
Set<SnifferBuilder> snifferBuilders,
68+
Set<SnifferFilter> snifferFilters
69+
) {
6270
this.shutdownService = shutdownService;
6371
this.configuration = configuration;
6472
this.credentialsProvider = credentialsProvider;
6573
this.trustManagerAndSocketFactoryProvider = trustManagerAndSocketFactoryProvider;
6674
this.runsWithDataNode = runsWithDataNode;
6775
this.indexerJwtAuthTokenProvider = indexerJwtAuthTokenProvider;
6876
this.clientSupplier = Suppliers.memoize(() -> createClient(hosts));
77+
this.snifferBuilders = snifferBuilders;
78+
this.snifferFilters = snifferFilters;
6979
}
7080

7181

7282
@Nonnull
7383
private RestHighLevelClient createClient(List<URI> hosts) {
7484
final RestHighLevelClient client = buildBasicRestClient(hosts);
85+
registerSniffers(client);
86+
return client;
87+
}
7588

76-
var sniffer = SnifferWrapper.create(
77-
client,
78-
TimeUnit.SECONDS.toMillis(5),
79-
configuration.discoveryFrequency(),
80-
mapDefaultScheme(configuration.defaultSchemeForDiscoveredNodes())
81-
);
89+
private void registerSniffers(RestHighLevelClient client) {
90+
final List<NodesSniffer> sniffers = snifferBuilders.stream()
91+
.filter(SnifferBuilder::enabled)
92+
.map(b -> b.create(client.getLowLevelClient()))
93+
.toList();
8294

83-
if (configuration.discoveryEnabled()) {
84-
sniffer.add(FilteredOpenSearchNodesSniffer.create(configuration.discoveryFilter()));
85-
}
86-
if (configuration.isNodeActivityLogger()) {
87-
sniffer.add(NodeListSniffer.create());
88-
}
95+
if (!sniffers.isEmpty()) {
96+
final List<SnifferFilter> filters = snifferFilters.stream().filter(SnifferFilter::enabled).toList();
97+
final SnifferAggregator snifferAggregator = new SnifferAggregator(sniffers, filters);
8998

90-
sniffer.build().ifPresent(s -> shutdownService.register(s::close));
91-
return client;
92-
}
99+
final Sniffer sniffer = Sniffer.builder(client.getLowLevelClient())
100+
.setSniffIntervalMillis(Math.toIntExact(configuration.discoveryFrequency().toMilliseconds()))
101+
.setNodesSniffer(snifferAggregator)
102+
.build();
93103

94-
private OpenSearchNodesSniffer.Scheme mapDefaultScheme(String defaultSchemeForDiscoveredNodes) {
95-
return switch (defaultSchemeForDiscoveredNodes.toUpperCase(Locale.ENGLISH)) {
96-
case "HTTP" -> OpenSearchNodesSniffer.Scheme.HTTP;
97-
case "HTTPS" -> OpenSearchNodesSniffer.Scheme.HTTPS;
98-
default ->
99-
throw new IllegalArgumentException("Invalid default scheme for discovered OS nodes: " + defaultSchemeForDiscoveredNodes);
100-
};
104+
shutdownService.register(sniffer::close);
105+
}
101106
}
102107

103108
@Override

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/SnifferWrapper.java

-74
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog.storage.opensearch2.sniffer;
18+
19+
import jakarta.annotation.Nonnull;
20+
import org.graylog.shaded.opensearch2.org.opensearch.client.Node;
21+
import org.graylog.shaded.opensearch2.org.opensearch.client.sniff.NodesSniffer;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Set;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.Stream;
32+
import java.util.function.Predicate;
33+
import java.util.function.Function;
34+
35+
/**
36+
* This aggregator has two functions. It allows more sniffers to work together, each providing its own list of available nodes,
37+
* even if other sniffers fail. Then it applies filters to the result. Each filter can modify the nodes list.
38+
*/
39+
public class SnifferAggregator implements org.graylog.shaded.opensearch2.org.opensearch.client.sniff.NodesSniffer {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(SnifferAggregator.class);
42+
43+
private final List<NodesSniffer> sniffers;
44+
private final List<SnifferFilter> filters;
45+
46+
public SnifferAggregator(List<NodesSniffer> sniffers, List<SnifferFilter> filters) {
47+
this.sniffers = sniffers;
48+
this.filters = filters;
49+
}
50+
51+
@Override
52+
public List<Node> sniff() throws IOException {
53+
List<Node> discoveredNodes = discoverNodes().stream()
54+
.filter(distinctByKey(n -> n.getHost().toURI()))
55+
.collect(Collectors.toCollection(ArrayList::new));
56+
57+
for (SnifferFilter sniffer : filters) {
58+
discoveredNodes = sniffer.filterNodes(discoveredNodes);
59+
}
60+
return discoveredNodes;
61+
}
62+
63+
@Nonnull
64+
private List<Node> discoverNodes() {
65+
return sniffers.stream().flatMap(SnifferAggregator::sniff).toList();
66+
}
67+
68+
@Nonnull
69+
private static Stream<Node> sniff(NodesSniffer sniffer) {
70+
try {
71+
return sniffer.sniff().stream();
72+
} catch (IOException e) {
73+
LOG.warn("Sniffer {} failed to sniff nodes: {}", sniffer.getClass(), e.getMessage());
74+
return Stream.empty();
75+
}
76+
}
77+
78+
private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
79+
Set<Object> seen = ConcurrentHashMap.newKeySet();
80+
return t -> seen.add(keyExtractor.apply(t));
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog.storage.opensearch2.sniffer;
18+
19+
import org.graylog.shaded.opensearch2.org.opensearch.client.RestClient;
20+
import org.graylog.shaded.opensearch2.org.opensearch.client.sniff.NodesSniffer;
21+
22+
public interface SnifferBuilder {
23+
/**
24+
* @return true if the configuration of this node allows that type of sniffer
25+
*/
26+
boolean enabled();
27+
28+
/**
29+
* @return Always a new instance of a sniffer.
30+
*/
31+
NodesSniffer create(RestClient restClient);
32+
}

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/NodesSniffer.java renamed to graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/sniffer/SnifferFilter.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
* along with this program. If not, see
1515
* <http://www.mongodb.com/licensing/server-side-public-license>.
1616
*/
17-
package org.graylog.storage.opensearch2;
17+
package org.graylog.storage.opensearch2.sniffer;
1818

1919
import org.graylog.shaded.opensearch2.org.opensearch.client.Node;
2020

2121
import java.io.IOException;
2222
import java.util.List;
2323

24-
public interface NodesSniffer {
25-
List<Node> sniff(List<Node> nodes) throws IOException;
24+
public interface SnifferFilter {
25+
boolean enabled();
26+
List<Node> filterNodes(List<Node> nodes) throws IOException;
2627
}

0 commit comments

Comments
 (0)