Skip to content

Commit cdf5147

Browse files
Add methods to JmxProxy so that we know which datacenter each host is in.
If `allowUnreachableNodes=true`, then it is only required that we have metrics for all hosts within the coordinator's datacenter. Taking this approach allows us to remove the cached host metrics in the Cassandra backend.
1 parent 08bc563 commit cdf5147

File tree

8 files changed

+298
-402
lines changed

8 files changed

+298
-402
lines changed

src/main/java/com/spotify/reaper/cassandra/JmxProxy.java

+63-33
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.math.BigInteger;
2121
import java.net.InetSocketAddress;
2222
import java.net.MalformedURLException;
23+
import java.net.UnknownHostException;
2324
import java.rmi.server.RMIClientSocketFactory;
2425
import java.rmi.server.RMISocketFactory;
2526
import java.util.AbstractMap;
@@ -31,7 +32,6 @@
3132
import java.util.Map;
3233
import java.util.Set;
3334
import javax.rmi.ssl.SslRMIClientSocketFactory;
34-
import java.util.concurrent.Callable;
3535
import java.util.concurrent.ExecutionException;
3636
import java.util.concurrent.ExecutorService;
3737
import java.util.concurrent.Executors;
@@ -57,6 +57,7 @@
5757
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
5858
import org.apache.cassandra.gms.FailureDetector;
5959
import org.apache.cassandra.gms.FailureDetectorMBean;
60+
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
6061
import org.apache.cassandra.repair.RepairParallelism;
6162
import org.apache.cassandra.repair.messages.RepairOption;
6263
import org.apache.cassandra.service.ActiveRepairService;
@@ -66,7 +67,6 @@
6667
import org.slf4j.Logger;
6768
import org.slf4j.LoggerFactory;
6869

69-
import com.google.common.base.Function;
7070
import com.google.common.base.Optional;
7171
import com.google.common.collect.Lists;
7272
import com.google.common.collect.Maps;
@@ -100,6 +100,7 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
100100
private final ObjectName ssMbeanName;
101101
private final MBeanServerConnection mbeanServer;
102102
private final CompactionManagerMBean cmProxy;
103+
private final EndpointSnitchInfoMBean endpointSnitchMbean;
103104
private final Object ssProxy;
104105
private final Object fdProxy;
105106
private final Optional<RepairStatusHandler> repairStatusHandler;
@@ -110,9 +111,18 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
110111
public static final Integer JMX_CONNECTION_TIMEOUT = 5;
111112
public static final TimeUnit JMX_CONNECTION_TIMEOUT_UNIT = TimeUnit.SECONDS;
112113

113-
private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXServiceURL jmxUrl,
114-
JMXConnector jmxConnector, Object ssProxy, ObjectName ssMbeanName,
115-
MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy, FailureDetectorMBean fdProxy) {
114+
private JmxProxy(
115+
Optional<RepairStatusHandler> handler,
116+
String host,
117+
JMXServiceURL jmxUrl,
118+
JMXConnector jmxConnector,
119+
Object ssProxy,
120+
ObjectName ssMbeanName,
121+
MBeanServerConnection mbeanServer,
122+
CompactionManagerMBean cmProxy,
123+
EndpointSnitchInfoMBean endpointSnitchMbean,
124+
FailureDetectorMBean fdProxy) {
125+
116126
this.host = host;
117127
this.jmxUrl = jmxUrl;
118128
this.jmxConnector = jmxConnector;
@@ -121,6 +131,7 @@ private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXServiceU
121131
this.ssProxy = ssProxy;
122132
this.repairStatusHandler = handler;
123133
this.cmProxy = cmProxy;
134+
this.endpointSnitchMbean = endpointSnitchMbean;
124135
this.clusterName = Cluster.toSymbolicName(((StorageServiceMBean) ssProxy).getClusterName());
125136
this.fdProxy = fdProxy;
126137
}
@@ -157,12 +168,19 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, Stri
157168
* @param password password to use for JMX authentication
158169
* @param addressTranslator if EC2MultiRegionAddressTranslator isn't null it will be used to translate addresses
159170
*/
160-
static JmxProxy connect(Optional<RepairStatusHandler> handler, String originalHost, int port,
161-
String username, String password, final EC2MultiRegionAddressTranslator addressTranslator, int connectionTimeout)
162-
throws ReaperException {
171+
static JmxProxy connect(
172+
Optional<RepairStatusHandler> handler,
173+
String originalHost,
174+
int port,
175+
String username,
176+
String password,
177+
final EC2MultiRegionAddressTranslator addressTranslator,
178+
int connectionTimeout) throws ReaperException {
179+
163180
ObjectName ssMbeanName;
164181
ObjectName cmMbeanName;
165182
ObjectName fdMbeanName;
183+
ObjectName endpointSnitchMbeanName;
166184
JMXServiceURL jmxUrl;
167185
String host = originalHost;
168186

@@ -177,35 +195,44 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String originalHo
177195
ssMbeanName = new ObjectName(SS_OBJECT_NAME);
178196
cmMbeanName = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
179197
fdMbeanName = new ObjectName(FailureDetector.MBEAN_NAME);
198+
endpointSnitchMbeanName = new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo");
180199
} catch (MalformedURLException | MalformedObjectNameException e) {
181200
LOG.error(String.format("Failed to prepare the JMX connection to %s:%s", host, port));
182201
throw new ReaperException("Failure during preparations for JMX connection", e);
183202
}
184203
try {
185-
Map<String, Object> env = new HashMap<String, Object>();
204+
Map<String, Object> env = new HashMap<>();
186205
if (username != null && password != null) {
187206
String[] creds = {username, password};
188207
env.put(JMXConnector.CREDENTIALS, creds);
189208
}
190209
env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());
191210
JMXConnector jmxConn = connectWithTimeout(jmxUrl, connectionTimeout, TimeUnit.SECONDS, env);
192211
MBeanServerConnection mbeanServerConn = jmxConn.getMBeanServerConnection();
193-
Object ssProxy =
194-
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
212+
Object ssProxy = JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
195213
String cassandraVersion = ((StorageServiceMBean) ssProxy).getReleaseVersion();
196214
if(cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")){
197-
ssProxy = JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean20.class);
215+
ssProxy = JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean20.class);
198216
}
199217

200-
CompactionManagerMBean cmProxy =
201-
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
218+
CompactionManagerMBean cmProxy = JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
219+
FailureDetectorMBean fdProxy = JMX.newMBeanProxy(mbeanServerConn, fdMbeanName, FailureDetectorMBean.class);
202220

221+
EndpointSnitchInfoMBean endpointSnitchProxy
222+
= JMX.newMBeanProxy(mbeanServerConn, endpointSnitchMbeanName, EndpointSnitchInfoMBean.class);
203223

204-
FailureDetectorMBean fdProxy =
205-
JMX.newMBeanProxy(mbeanServerConn, fdMbeanName, FailureDetectorMBean.class);
224+
JmxProxy proxy = new JmxProxy(
225+
handler,
226+
host,
227+
jmxUrl,
228+
jmxConn,
229+
ssProxy,
230+
ssMbeanName,
231+
mbeanServerConn,
232+
cmProxy,
233+
endpointSnitchProxy,
234+
fdProxy);
206235

207-
JmxProxy proxy = new JmxProxy(handler, host, jmxUrl, jmxConn, ssProxy, ssMbeanName,
208-
mbeanServerConn, cmProxy, fdProxy);
209236
// registering a listener throws a bunch of exceptions, so we do it here rather than in the
210237
// constructor
211238
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
@@ -233,23 +260,30 @@ public String getHost() {
233260
return host;
234261
}
235262

263+
public String getDataCenter() {
264+
// return endpointSnitchMbean.getDatacenter(); // not available until Cassandra-3.0
265+
return getDataCenter(host);
266+
}
267+
268+
public String getDataCenter(String host) {
269+
try {
270+
return endpointSnitchMbean.getDatacenter(host);
271+
} catch (UnknownHostException ex) {
272+
throw new IllegalArgumentException(ex);
273+
}
274+
}
275+
236276
/**
237277
* @return list of tokens in the cluster
238278
*/
239279
public List<BigInteger> getTokens() {
240280
checkNotNull(ssProxy, "Looks like the proxy is not connected");
281+
241282
return Lists.transform(
242-
Lists.newArrayList(((StorageServiceMBean) ssProxy).getTokenToEndpointMap().keySet()),
243-
new Function<String, BigInteger>() {
244-
@Override
245-
public BigInteger apply(String s) {
246-
return new BigInteger(s);
247-
}
248-
});
283+
Lists.newArrayList(((StorageServiceMBean) ssProxy).getTokenToEndpointMap().keySet()), s -> new BigInteger(s));
249284
}
250285

251-
public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
252-
throws ReaperException {
286+
public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) throws ReaperException {
253287
checkNotNull(ssProxy, "Looks like the proxy is not connected");
254288
try {
255289
return ((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace);
@@ -259,8 +293,7 @@ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
259293
}
260294
}
261295

262-
public List<RingRange> getRangesForLocalEndpoint(String keyspace)
263-
throws ReaperException {
296+
public List<RingRange> getRangesForLocalEndpoint(String keyspace) throws ReaperException {
264297
checkNotNull(ssProxy, "Looks like the proxy is not connected");
265298
List<RingRange> localRanges = Lists.newArrayList();
266299
try {
@@ -274,7 +307,6 @@ public List<RingRange> getRangesForLocalEndpoint(String keyspace)
274307
}
275308
});
276309

277-
278310
LOG.info("LOCAL RANGES {}", localRanges);
279311
return localRanges;
280312
} catch (Exception e) {
@@ -294,8 +326,7 @@ public String getLocalEndpoint() {
294326
@NotNull
295327
public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange) {
296328
checkNotNull(ssProxy, "Looks like the proxy is not connected");
297-
Set<Map.Entry<List<String>, List<String>>> entries =
298-
((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace).entrySet();
329+
Set<Map.Entry<List<String>, List<String>>> entries = ((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace).entrySet();
299330
for (Map.Entry<List<String>, List<String>> entry : entries) {
300331
BigInteger rangeStart = new BigInteger(entry.getKey().get(0));
301332
BigInteger rangeEnd = new BigInteger(entry.getKey().get(1));
@@ -396,7 +427,6 @@ public boolean isRepairRunning() {
396427
return isRepairRunningPre22() || isRepairRunningPost22() || isValidationCompactionRunning();
397428
}
398429

399-
400430
/**
401431
* @return true if any repairs are running on the node.
402432
*/

src/main/java/com/spotify/reaper/service/RepairRunner.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.spotify.reaper.storage.IDistributedStorage;
4545
import java.util.Collection;
4646
import java.util.UUID;
47+
import java.util.stream.Collectors;
4748

4849
public class RepairRunner implements Runnable {
4950

@@ -325,9 +326,9 @@ private boolean repairSegment(final int rangeIndex, final UUID segmentId, RingRa
325326

326327
List<String> potentialCoordinators;
327328
if(!repairUnit.getIncrementalRepair()) {
328-
// full repair
329+
// full repair
329330
try {
330-
potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange);
331+
potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange);
331332
} catch (RuntimeException e) {
332333
LOG.warn("Couldn't get token ranges from coordinator: #{}", e);
333334
return true;
@@ -370,11 +371,11 @@ public void onSuccess(Object ignored) {
370371
@Override
371372
public void onFailure(Throwable t) {
372373
currentlyRunningSegments.set(rangeIndex, null);
373-
LOG.error("Executing SegmentRunner failed: {}", t.getMessage());
374+
LOG.error("Executing SegmentRunner failed", t);
374375
}
375376
});
376377
} catch (ReaperException ex) {
377-
LOG.error("Executing SegmentRunner failed: {}", ex.getMessage());
378+
LOG.error("Executing SegmentRunner failed", ex);
378379
}
379380

380381
return true;

0 commit comments

Comments
 (0)