Skip to content

Commit 4cd8861

Browse files
committed
fix not closing jmx connector sometimes
1 parent 323b4b9 commit 4cd8861

File tree

7 files changed

+58
-45
lines changed

7 files changed

+58
-45
lines changed

src/main/java/com/spotify/reaper/cassandra/JmxProxy.java

+44-31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.common.base.Function;
1717
import com.google.common.base.Optional;
1818
import com.google.common.collect.Lists;
19+
1920
import com.spotify.reaper.ReaperException;
2021
import com.spotify.reaper.core.Cluster;
2122
import com.spotify.reaper.service.RingRange;
@@ -73,12 +74,14 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
7374
private final StorageServiceMBean ssProxy;
7475
private final Optional<RepairStatusHandler> repairStatusHandler;
7576
private final String host;
77+
private final JMXServiceURL jmxUrl;
7678
private final String clusterName;
7779

78-
private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnector jmxConnector,
79-
StorageServiceMBean ssProxy, ObjectName ssMbeanName,
80-
MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) {
80+
private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXServiceURL jmxUrl,
81+
JMXConnector jmxConnector, StorageServiceMBean ssProxy, ObjectName ssMbeanName,
82+
MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) {
8183
this.host = host;
84+
this.jmxUrl = jmxUrl;
8285
this.jmxConnector = jmxConnector;
8386
this.ssMbeanName = ssMbeanName;
8487
this.mbeanServer = mbeanServer;
@@ -88,11 +91,12 @@ private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnecto
8891
this.clusterName = Cluster.toSymbolicName(ssProxy.getClusterName());
8992
}
9093

91-
94+
9295
/**
9396
* @see JmxProxy#connect(Optional, String, int, String, String)
9497
*/
95-
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, String username, String password)
98+
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, String username,
99+
String password)
96100
throws ReaperException {
97101
assert null != host : "null host given to JmxProxy.connect()";
98102
String[] parts = host.split(":");
@@ -103,22 +107,24 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, Stri
103107
}
104108
}
105109

106-
110+
107111
/**
108112
* Connect to JMX interface on the given host and port.
109113
*
110-
* @param handler Implementation of {@link RepairStatusHandler} to process incoming notifications
111-
* of repair events.
112-
* @param host hostname or ip address of Cassandra node
113-
* @param port port number to use for JMX connection
114+
* @param handler Implementation of {@link RepairStatusHandler} to process incoming
115+
* notifications
116+
* of repair events.
117+
* @param host hostname or ip address of Cassandra node
118+
* @param port port number to use for JMX connection
114119
* @param username username to use for JMX authentication
115120
* @param password password to use for JMX authentication
116121
*/
117-
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int port, String username, String password)
122+
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int port,
123+
String username, String password)
118124
throws ReaperException {
119-
JMXServiceURL jmxUrl;
120125
ObjectName ssMbeanName;
121126
ObjectName cmMbeanName;
127+
JMXServiceURL jmxUrl;
122128
try {
123129
jmxUrl = new JMXServiceURL(String.format(JMX_URL, host, port));
124130
ssMbeanName = new ObjectName(SS_OBJECT_NAME);
@@ -129,7 +135,7 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
129135
}
130136
try {
131137
Map<String, Object> env = new HashMap<String, Object>();
132-
if(username != null && password != null) {
138+
if (username != null && password != null) {
133139
String[] creds = {username, password};
134140
env.put(JMXConnector.CREDENTIALS, creds);
135141
}
@@ -139,12 +145,13 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
139145
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
140146
CompactionManagerMBean cmProxy =
141147
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
142-
JmxProxy proxy =
143-
new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy);
148+
JmxProxy proxy = new JmxProxy(handler, host, jmxUrl, jmxConn, ssProxy, ssMbeanName,
149+
mbeanServerConn, cmProxy);
144150
// registering a listener throws bunch of exceptions, so we do it here rather than in the
145151
// constructor
146152
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
147-
LOG.debug(String.format("JMX connection to %s properly connected.", host));
153+
LOG.debug(String.format("JMX connection to %s properly connected: %s",
154+
host, jmxUrl.toString()));
148155
return proxy;
149156
} catch (IOException | InstanceNotFoundException e) {
150157
LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port));
@@ -293,13 +300,12 @@ public void cancelAllRepairs() {
293300

294301
/**
295302
* Checks if table exists in the cluster by instantiating a MBean for that table.
296-
*
297303
*/
298304
public boolean tableExists(String ks, String cf) {
299305
try {
300306
String type = cf.contains(".") ? "IndexColumnFamilies" : "ColumnFamilies";
301307
String nameStr = String.format("org.apache.cassandra.db:type=*%s,keyspace=%s,columnfamily=%s",
302-
type, ks, cf);
308+
type, ks, cf);
303309
Set<ObjectName> beans = mbeanServer.queryNames(new ObjectName(nameStr), null);
304310
if (beans.isEmpty() || beans.size() != 1) {
305311
return false;
@@ -308,7 +314,7 @@ public boolean tableExists(String ks, String cf) {
308314
JMX.newMBeanProxy(mbeanServer, bean, ColumnFamilyStoreMBean.class);
309315
} catch (MalformedObjectNameException | IOException e) {
310316
String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf,
311-
e.getMessage());
317+
e.getMessage());
312318
LOG.warn(errMsg);
313319
return false;
314320
}
@@ -323,7 +329,7 @@ public boolean tableExists(String ks, String cf) {
323329
* @return Repair command number, or 0 if nothing to repair
324330
*/
325331
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
326-
RepairParallelism repairParallelism, Collection<String> columnFamilies) {
332+
RepairParallelism repairParallelism, Collection<String> columnFamilies) {
327333
checkNotNull(ssProxy, "Looks like the proxy is not connected");
328334
String cassandraVersion = ssProxy.getReleaseVersion();
329335
boolean canUseDatacenterAware = false;
@@ -336,27 +342,27 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
336342
+ "host %s, with repair parallelism %s, in cluster with Cassandra "
337343
+ "version '%s' (can use DATACENTER_AWARE '%s'), "
338344
+ "for column families: %s",
339-
beginToken.toString(), endToken.toString(), keyspace, this.host,
340-
repairParallelism, cassandraVersion, canUseDatacenterAware,
341-
columnFamilies);
345+
beginToken.toString(), endToken.toString(), keyspace, this.host,
346+
repairParallelism, cassandraVersion, canUseDatacenterAware,
347+
columnFamilies);
342348
LOG.info(msg);
343349
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
344350
if (canUseDatacenterAware) {
345351
return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
346-
repairParallelism.ordinal(), null, null,
347-
columnFamilies
348-
.toArray(new String[columnFamilies.size()]));
352+
repairParallelism.ordinal(), null, null,
353+
columnFamilies
354+
.toArray(new String[columnFamilies.size()]));
349355
} else {
350356
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
351357
+ " falling back to SEQUENTIAL repair.",
352-
cassandraVersion);
358+
cassandraVersion);
353359
repairParallelism = RepairParallelism.SEQUENTIAL;
354360
}
355361
}
356362
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
357363
return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
358-
snapshotRepair, false,
359-
columnFamilies.toArray(new String[columnFamilies.size()]));
364+
snapshotRepair, false,
365+
columnFamilies.toArray(new String[columnFamilies.size()]));
360366
}
361367

362368
/**
@@ -404,11 +410,18 @@ public boolean isConnectionAlive() {
404410
*/
405411
@Override
406412
public void close() throws ReaperException {
413+
LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl));
407414
try {
408415
mbeanServer.removeNotificationListener(ssMbeanName, this);
416+
} catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) {
417+
LOG.warn("failed on removing notification listener");
418+
e.printStackTrace();
419+
}
420+
try {
409421
jmxConnector.close();
410-
} catch (IOException | InstanceNotFoundException | ListenerNotFoundException e) {
411-
throw new ReaperException(e);
422+
} catch (IOException e) {
423+
LOG.warn("failed closing a JMX connection");
424+
e.printStackTrace();
412425
}
413426
}
414427

src/main/java/com/spotify/reaper/resources/ClusterResource.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ClusterResource(AppContext context) {
5959

6060
@GET
6161
public Response getClusterList() {
62-
LOG.info("get cluster list called");
62+
LOG.debug("get cluster list called");
6363
Collection<Cluster> clusters = context.storage.getClusters();
6464
List<String> clusterNames = new ArrayList<>();
6565
for (Cluster cluster : clusters) {
@@ -73,7 +73,7 @@ public Response getClusterList() {
7373
public Response getCluster(
7474
@PathParam("cluster_name") String clusterName,
7575
@QueryParam("limit") Optional<Integer> limit) {
76-
LOG.info("get cluster called with cluster_name: {}", clusterName);
76+
LOG.debug("get cluster called with cluster_name: {}", clusterName);
7777
return viewCluster(clusterName, limit, Optional.<URI>absent());
7878
}
7979

@@ -107,7 +107,7 @@ public Response addCluster(
107107
LOG.error("POST on cluster resource called without seedHost");
108108
return Response.status(400).entity("query parameter \"seedHost\" required").build();
109109
}
110-
LOG.info("add cluster called with seedHost: {}", seedHost.get());
110+
LOG.debug("add cluster called with seedHost: {}", seedHost.get());
111111

112112
Cluster newCluster;
113113
try {

src/main/java/com/spotify/reaper/resources/PingResource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class PingResource {
2929

3030
@GET
3131
public String answerPing() {
32-
LOG.info("ping called");
32+
LOG.debug("ping called");
3333
return String.format("Cassandra Reaper ping resource: PONG");
3434
}
3535

src/main/java/com/spotify/reaper/resources/RepairRunResource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ private Response abortRun(RepairRun repairRun, RepairUnit repairUnit, int segmen
327327
@GET
328328
@Path("/{id}")
329329
public Response getRepairRun(@PathParam("id") Long repairRunId) {
330-
LOG.info("get repair_run called with: id = {}", repairRunId);
330+
LOG.debug("get repair_run called with: id = {}", repairRunId);
331331
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
332332
if (repairRun.isPresent()) {
333333
return Response.ok().entity(getRepairRunStatus(repairRun.get())).build();
@@ -343,7 +343,7 @@ public Response getRepairRun(@PathParam("id") Long repairRunId) {
343343
@GET
344344
@Path("/cluster/{cluster_name}")
345345
public Response getRepairRunsForCluster(@PathParam("cluster_name") String clusterName) {
346-
LOG.info("get repair run for cluster called with: cluster_name = {}", clusterName);
346+
LOG.debug("get repair run for cluster called with: cluster_name = {}", clusterName);
347347
Collection<RepairRun> repairRuns = context.storage.getRepairRunsForCluster(clusterName);
348348
Collection<RepairRunStatus> repairRunViews = new ArrayList<>();
349349
for (RepairRun repairRun : repairRuns) {

src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ private Response resumeSchedule(RepairSchedule repairSchedule, RepairUnit repair
259259
@GET
260260
@Path("/{id}")
261261
public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) {
262-
LOG.info("get repair_schedule called with: id = {}", repairScheduleId);
262+
LOG.debug("get repair_schedule called with: id = {}", repairScheduleId);
263263
Optional<RepairSchedule> repairSchedule = context.storage.getRepairSchedule(repairScheduleId);
264264
if (repairSchedule.isPresent()) {
265265
return Response.ok().entity(getRepairScheduleStatus(repairSchedule.get())).build();
@@ -275,7 +275,7 @@ public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) {
275275
@GET
276276
@Path("/cluster/{cluster_name}")
277277
public Response getRepairSchedulesForCluster(@PathParam("cluster_name") String clusterName) {
278-
LOG.info("get repair schedules for cluster called with: cluster_name = {}", clusterName);
278+
LOG.debug("get repair schedules for cluster called with: cluster_name = {}", clusterName);
279279
Collection<RepairSchedule> repairSchedules =
280280
context.storage.getRepairSchedulesForCluster(clusterName);
281281
Collection<RepairScheduleStatus> repairScheduleViews = new ArrayList<>();
@@ -319,6 +319,7 @@ private URI buildRepairScheduleURI(UriInfo uriInfo, RepairSchedule repairSchedul
319319
*/
320320
@GET
321321
public Response listSchedules() {
322+
LOG.debug("list all repair schedules called");
322323
List<RepairScheduleStatus> scheduleStatuses = Lists.newArrayList();
323324
Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
324325
for (RepairSchedule schedule : schedules) {

src/main/java/com/spotify/reaper/service/RepairManager.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import com.spotify.reaper.AppContext;
1010
import com.spotify.reaper.ReaperException;
11+
import com.spotify.reaper.cassandra.JmxProxy;
1112
import com.spotify.reaper.core.RepairRun;
1213
import com.spotify.reaper.core.RepairSegment;
1314

@@ -59,9 +60,9 @@ public void resumeRunningRepairRuns(AppContext context) {
5960
Collection<RepairSegment> runningSegments =
6061
context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING);
6162
for (RepairSegment segment : runningSegments) {
62-
try {
63-
SegmentRunner.abort(context, segment,
64-
context.jmxConnectionFactory.connect(segment.getCoordinatorHost()));
63+
try (JmxProxy jmxProxy = context.jmxConnectionFactory
64+
.connect(segment.getCoordinatorHost())) {
65+
SegmentRunner.abort(context, segment, jmxProxy);
6566
} catch (ReaperException e) {
6667
LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down"
6768
+ " (so abortion won't be needed)", segment.getId());

src/main/java/com/spotify/reaper/service/SegmentRunner.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
import java.lang.management.ManagementFactory;
3838
import java.lang.management.OperatingSystemMXBean;
39-
import java.net.SocketException;
4039
import java.util.Collection;
4140
import java.util.Map;
4241
import java.util.concurrent.TimeUnit;
@@ -353,8 +352,7 @@ protected void tryClearSnapshots(String message) {
353352
String repairId = parseRepairId(message);
354353
if (repairId != null) {
355354
for (String involvedNode : potentialCoordinators) {
356-
try {
357-
JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode);
355+
try (JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode)) {
358356
// there is no way of telling if the snapshot was cleared or not :(
359357
jmx.clearSnapshot(repairId, keyspace);
360358
} catch (ReaperException e) {

0 commit comments

Comments
 (0)