@@ -562,7 +562,8 @@ public String getCassandraVersion(){
562
562
* @throws ReaperException
563
563
*/
564
564
public int triggerRepair (BigInteger beginToken , BigInteger endToken , String keyspace ,
565
- RepairParallelism repairParallelism , Collection <String > columnFamilies , boolean fullRepair ) throws ReaperException {
565
+ RepairParallelism repairParallelism , Collection <String > columnFamilies , boolean fullRepair ,
566
+ Collection <String > datacenters ) throws ReaperException {
566
567
checkNotNull (ssProxy , "Looks like the proxy is not connected" );
567
568
String cassandraVersion = getCassandraVersion ();
568
569
boolean canUseDatacenterAware = false ;
@@ -586,11 +587,13 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
586
587
}
587
588
try {
588
589
if (cassandraVersion .startsWith ("2.0" ) || cassandraVersion .startsWith ("1." )) {
589
- return triggerRepairPre2dot1 (repairParallelism , keyspace , columnFamilies , beginToken , endToken );
590
+ return triggerRepairPre2dot1 (repairParallelism , keyspace , columnFamilies , beginToken , endToken , datacenters . size () > 0 ? datacenters : null );
590
591
} else if (cassandraVersion .startsWith ("2.1" )){
591
- return triggerRepair2dot1 (fullRepair , repairParallelism , keyspace , columnFamilies , beginToken , endToken , cassandraVersion );
592
+ return triggerRepair2dot1 (fullRepair , repairParallelism , keyspace , columnFamilies , beginToken , endToken ,
593
+ cassandraVersion , datacenters .size () > 0 ? datacenters : null );
592
594
} else {
593
- return triggerRepairPost2dot2 (fullRepair , repairParallelism , keyspace , columnFamilies , beginToken , endToken , cassandraVersion );
595
+ return triggerRepairPost2dot2 (fullRepair , repairParallelism , keyspace , columnFamilies , beginToken , endToken ,
596
+ cassandraVersion , datacenters );
594
597
}
595
598
} catch (Exception e ) {
596
599
LOG .error ("Segment repair failed" , e );
@@ -599,7 +602,9 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
599
602
}
600
603
601
604
602
- public int triggerRepairPost2dot2 (boolean fullRepair , RepairParallelism repairParallelism , String keyspace , Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken , String cassandraVersion ) {
605
+ public int triggerRepairPost2dot2 (boolean fullRepair , RepairParallelism repairParallelism , String keyspace ,
606
+ Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken , String cassandraVersion ,
607
+ Collection <String > datacenters ) {
603
608
Map <String , String > options = new HashMap <>();
604
609
605
610
options .put (RepairOption .PARALLELISM_KEY , repairParallelism .getName ());
@@ -613,26 +618,29 @@ public int triggerRepairPost2dot2(boolean fullRepair, RepairParallelism repairPa
613
618
options .put (RepairOption .RANGES_KEY , beginToken .toString () + ":" + endToken .toString ());
614
619
}
615
620
616
- // options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(specificDataCenters , ","));
621
+ options .put (RepairOption .DATACENTERS_KEY , StringUtils .join (datacenters , "," ));
617
622
//options.put(RepairOption.HOSTS_KEY, StringUtils.join(specificHosts, ","));
618
623
619
624
return ((StorageServiceMBean ) ssProxy ).repairAsync (keyspace , options );
620
625
}
621
626
622
- public int triggerRepair2dot1 (boolean fullRepair , RepairParallelism repairParallelism , String keyspace , Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken , String cassandraVersion ) {
627
+ public int triggerRepair2dot1 (boolean fullRepair , RepairParallelism repairParallelism , String keyspace ,
628
+ Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken , String cassandraVersion ,
629
+ Collection <String > datacenters ) {
623
630
if (fullRepair ) {
624
631
// full repair
625
632
if (repairParallelism .equals (RepairParallelism .DATACENTER_AWARE )) {
626
633
return ((StorageServiceMBean ) ssProxy ).forceRepairRangeAsync (beginToken .toString (), endToken .toString (),
627
- keyspace , repairParallelism .ordinal (), cassandraVersion .startsWith ("2.2" )?new HashSet <String >():null , cassandraVersion .startsWith ("2.2" )?new HashSet <String >():null , fullRepair ,
634
+ keyspace , repairParallelism .ordinal (), datacenters ,
635
+ cassandraVersion .startsWith ("2.2" ) ? new HashSet <String >() : null , fullRepair ,
628
636
columnFamilies .toArray (new String [columnFamilies .size ()]));
629
637
}
630
638
631
639
boolean snapshotRepair = repairParallelism .equals (RepairParallelism .SEQUENTIAL );
632
640
633
641
return ((StorageServiceMBean ) ssProxy ).forceRepairRangeAsync (beginToken .toString (), endToken .toString (),
634
642
keyspace , snapshotRepair ? RepairParallelism .SEQUENTIAL .ordinal () : RepairParallelism .PARALLEL .ordinal (),
635
- cassandraVersion . startsWith ( "2.2" )? new HashSet < String >(): null , cassandraVersion .startsWith ("2.2" )? new HashSet <String >(): null , fullRepair ,
643
+ datacenters , cassandraVersion .startsWith ("2.2" ) ? new HashSet <String >() : null , fullRepair ,
636
644
columnFamilies .toArray (new String [columnFamilies .size ()]));
637
645
638
646
}
@@ -642,11 +650,12 @@ public int triggerRepair2dot1(boolean fullRepair, RepairParallelism repairParall
642
650
fullRepair , columnFamilies .toArray (new String [columnFamilies .size ()]));
643
651
}
644
652
645
- public int triggerRepairPre2dot1 (RepairParallelism repairParallelism , String keyspace , Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken ) {
653
+ public int triggerRepairPre2dot1 (RepairParallelism repairParallelism , String keyspace ,
654
+ Collection <String > columnFamilies , BigInteger beginToken , BigInteger endToken , Collection <String > datacenters ) {
646
655
// Cassandra 1.2 and 2.0 compatibility
647
656
if (repairParallelism .equals (RepairParallelism .DATACENTER_AWARE )) {
648
657
return ((StorageServiceMBean20 ) ssProxy ).forceRepairRangeAsync (beginToken .toString (), endToken .toString (),
649
- keyspace , repairParallelism .ordinal (), null , null ,
658
+ keyspace , repairParallelism .ordinal (), datacenters , null ,
650
659
columnFamilies .toArray (new String [columnFamilies .size ()]));
651
660
}
652
661
boolean snapshotRepair = repairParallelism .equals (RepairParallelism .SEQUENTIAL );
@@ -855,8 +864,8 @@ private static RMIClientSocketFactory getRMIClientSocketFactory() {
855
864
class ColumnFamilyStoreMBeanIterator
856
865
implements Iterator <Map .Entry <String , ColumnFamilyStoreMBean >> {
857
866
858
- private Iterator <ObjectName > resIter ;
859
- private MBeanServerConnection mbeanServerConn ;
867
+ private final Iterator <ObjectName > resIter ;
868
+ private final MBeanServerConnection mbeanServerConn ;
860
869
861
870
public ColumnFamilyStoreMBeanIterator (MBeanServerConnection mbeanServerConn )
862
871
throws MalformedObjectNameException , NullPointerException , IOException {
0 commit comments