Skip to content

Commit f337e65

Browse files
committed
Fix for Cassandra 2.2 full repair bug
Fix for C* 2.1 when the dc collection and host collection are empty
1 parent aca6ee0 commit f337e65

File tree

3 files changed

+54
-51
lines changed

3 files changed

+54
-51
lines changed

pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
<name>Cassandra Reaper</name>
77
<groupId>com.spotify</groupId>
88
<artifactId>cassandra-reaper</artifactId>
9-
<version>0.3.1-SNAPSHOT</version>
9+
<version>0.3.2-SNAPSHOT</version>
1010
<packaging>jar</packaging>
1111

1212
<properties>
1313
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1414
<dropwizard.version>0.7.1</dropwizard.version>
1515
<dropwizard.cassandra.version>4.0.0</dropwizard.cassandra.version>
16-
<cassandra.version>2.1.16</cassandra.version>
17-
<cucumber.version>1.1.5</cucumber.version>
18-
</properties>
16+
<cassandra.version>2.2.7</cassandra.version>
17+
<cucumber.version>1.1.5</cucumber.version>
18+
</properties>
1919

2020
<dependencies>
2121
<!--compile scope -->

src/main/java/com/spotify/reaper/cassandra/JmxProxy.java

+47-44
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,10 @@ public String getCassandraVersion(){
353353
* For time being, we don't allow local nor snapshot repairs.
354354
*
355355
* @return Repair command number, or 0 if nothing to repair
356+
* @throws ReaperException
356357
*/
357358
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
358-
RepairParallelism repairParallelism, Collection<String> columnFamilies, boolean fullRepair) {
359+
RepairParallelism repairParallelism, Collection<String> columnFamilies, boolean fullRepair) throws ReaperException {
359360
checkNotNull(ssProxy, "Looks like the proxy is not connected");
360361
String cassandraVersion = getCassandraVersion();
361362
boolean canUseDatacenterAware = false;
@@ -373,49 +374,51 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
373374
columnFamilies);
374375
LOG.info(msg);
375376

376-
if(!cassandraVersion.startsWith("2.0")){
377-
if(fullRepair) {
378-
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
379-
if (canUseDatacenterAware) {
380-
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
381-
repairParallelism.ordinal(), null, null, fullRepair,
382-
columnFamilies
383-
.toArray(new String[columnFamilies.size()]));
384-
} else {
385-
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
386-
+ " falling back to SEQUENTIAL repair.",
387-
cassandraVersion);
388-
repairParallelism = RepairParallelism.SEQUENTIAL;
389-
}
390-
}
391-
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
392-
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
393-
snapshotRepair, false, fullRepair,
394-
columnFamilies.toArray(new String[columnFamilies.size()]));
395-
}
396-
else {
397-
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
398-
}
399-
}
400-
else {
401-
// Cassandra 2.0 compatibility
402-
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
403-
if (canUseDatacenterAware) {
404-
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
405-
repairParallelism.ordinal(), null, null,
406-
columnFamilies
407-
.toArray(new String[columnFamilies.size()]));
408-
} else {
409-
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
410-
+ " falling back to SEQUENTIAL repair.",
411-
cassandraVersion);
412-
repairParallelism = RepairParallelism.SEQUENTIAL;
413-
}
414-
}
415-
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
416-
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
417-
snapshotRepair, false,
418-
columnFamilies.toArray(new String[columnFamilies.size()]));
377+
try {
378+
if (!cassandraVersion.startsWith("2.0")) {
379+
if (fullRepair) {
380+
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
381+
if (canUseDatacenterAware) {
382+
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
383+
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
384+
columnFamilies.toArray(new String[columnFamilies.size()]));
385+
} else {
386+
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
387+
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
388+
repairParallelism = RepairParallelism.SEQUENTIAL;
389+
}
390+
}
391+
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
392+
393+
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
394+
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
395+
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
396+
columnFamilies.toArray(new String[columnFamilies.size()]));
397+
398+
} else {
399+
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
400+
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
401+
}
402+
} else {
403+
// Cassandra 2.0 compatibility
404+
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
405+
if (canUseDatacenterAware) {
406+
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
407+
keyspace, repairParallelism.ordinal(), null, null,
408+
columnFamilies.toArray(new String[columnFamilies.size()]));
409+
} else {
410+
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
411+
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
412+
repairParallelism = RepairParallelism.SEQUENTIAL;
413+
}
414+
}
415+
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
416+
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
417+
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));
418+
}
419+
} catch (Exception e) {
420+
LOG.error("Segment repair failed", e);
421+
throw new ReaperException(e);
419422
}
420423

421424
}

src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void timeoutTest() throws InterruptedException, ReaperException, Executio
8282

8383
context.jmxConnectionFactory = new JmxConnectionFactory() {
8484
@Override
85-
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) {
85+
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) throws ReaperException {
8686
JmxProxy jmx = mock(JmxProxy.class);
8787
when(jmx.getClusterName()).thenReturn("reaper");
8888
when(jmx.isConnectionAlive()).thenReturn(true);
@@ -145,7 +145,7 @@ public void successTest() throws InterruptedException, ReaperException, Executio
145145
context.storage = storage;
146146
context.jmxConnectionFactory = new JmxConnectionFactory() {
147147
@Override
148-
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) {
148+
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) throws ReaperException {
149149
JmxProxy jmx = mock(JmxProxy.class);
150150
when(jmx.getClusterName()).thenReturn("reaper");
151151
when(jmx.isConnectionAlive()).thenReturn(true);
@@ -219,7 +219,7 @@ public void failureTest() throws InterruptedException, ReaperException, Executio
219219
context.storage = storage;
220220
context.jmxConnectionFactory = new JmxConnectionFactory() {
221221
@Override
222-
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) {
222+
public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host) throws ReaperException {
223223
JmxProxy jmx = mock(JmxProxy.class);
224224
when(jmx.getClusterName()).thenReturn("reaper");
225225
when(jmx.isConnectionAlive()).thenReturn(true);

0 commit comments

Comments
 (0)