Skip to content

Commit 143e457

Browse files
committed
A new type RingRange represents token ranges
1 parent 1f5b7c2 commit 143e457

File tree

10 files changed

+126
-89
lines changed

10 files changed

+126
-89
lines changed

src/main/java/com/spotify/reaper/core/RepairSegment.java

+17-33
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package com.spotify.reaper.core;
1515

16+
import com.spotify.reaper.service.RingRange;
17+
1618
import org.joda.time.DateTime;
1719

1820
import java.math.BigInteger;
@@ -23,8 +25,7 @@ public class RepairSegment {
2325
private final Integer repairCommandId; // received when triggering repair in Cassandra
2426
private final long columnFamilyId;
2527
private final long runId;
26-
private final BigInteger startToken; // open
27-
private final BigInteger endToken; // closed
28+
private final RingRange tokenRange;
2829
private final State state;
2930
private final DateTime startTime;
3031
private final DateTime endTime;
@@ -45,12 +46,8 @@ public long getRunId() {
4546
return runId;
4647
}
4748

48-
public BigInteger getStartToken() {
49-
return startToken;
50-
}
51-
52-
public BigInteger getEndToken() {
53-
return endToken;
49+
public RingRange getTokenRange() {
50+
return tokenRange;
5451
}
5552

5653
public State getState() {
@@ -68,12 +65,11 @@ public DateTime getEndTime() {
6865
public static RepairSegment getCopy(RepairSegment origSegment, State newState,
6966
int newRepairCommandId,
7067
DateTime newStartTime, DateTime newEndTime) {
71-
return new Builder(origSegment.getStartToken(),
72-
origSegment.getEndToken(), newState)
68+
return new Builder(origSegment.getRunId(), origSegment.getTokenRange(), newState)
7369
.columnFamilyId(origSegment.getColumnFamilyId())
7470
.repairCommandId(newRepairCommandId)
7571
.startTime(newStartTime)
76-
.endTime(newEndTime).build(origSegment.getRunId(), origSegment.getId());
72+
.endTime(newEndTime).build(origSegment.getId());
7773
}
7874

7975
public enum State {
@@ -83,31 +79,30 @@ public enum State {
8379
DONE
8480
}
8581

86-
private RepairSegment(Builder builder, long runId, long id) {
82+
private RepairSegment(Builder builder,long id) {
8783
this.id = id;
8884
this.repairCommandId = builder.repairCommandId;
8985
this.columnFamilyId = builder.columnFamilyId;
90-
this.runId = runId;
91-
this.startToken = builder.startToken;
92-
this.endToken = builder.endToken;
86+
this.runId = builder.runId;
87+
this.tokenRange = builder.tokenRange;
9388
this.state = builder.state;
9489
this.startTime = builder.startTime;
9590
this.endTime = builder.endTime;
9691
}
9792

9893
public static class Builder {
9994

100-
public final BigInteger startToken;
101-
public final BigInteger endToken;
95+
public final long runId;
96+
public final RingRange tokenRange;
10297
public final State state;
10398
private long columnFamilyId;
10499
private int repairCommandId;
105100
private DateTime startTime;
106101
private DateTime endTime;
107102

108-
public Builder(BigInteger startToken, BigInteger endToken, State state) {
109-
this.startToken = startToken;
110-
this.endToken = endToken;
103+
public Builder(long runId, RingRange tokenRange, State state) {
104+
this.runId = runId;
105+
this.tokenRange = tokenRange;
111106
this.state = state;
112107
}
113108

@@ -131,19 +126,8 @@ public Builder endTime(DateTime endTime) {
131126
return this;
132127
}
133128

134-
public RepairSegment build(long runId, long id) {
135-
return new RepairSegment(this, runId, id);
129+
public RepairSegment build(long id) {
130+
return new RepairSegment(this, id);
136131
}
137-
138-
@Override
139-
public String toString() {
140-
return String.format("(%s,%s]", startToken.toString(), endToken.toString());
141-
}
142-
}
143-
144-
public String toString() {
145-
return String.format("(%s,%s]",
146-
startToken.toString(),
147-
endToken.toString());
148132
}
149133
}

src/main/java/com/spotify/reaper/resources/TableResource.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.spotify.reaper.resources;
1515

1616
import com.google.common.base.Optional;
17+
import com.google.common.collect.Lists;
1718

1819
import com.spotify.reaper.ReaperApplicationConfiguration;
1920
import com.spotify.reaper.ReaperException;
@@ -23,6 +24,7 @@
2324
import com.spotify.reaper.core.RepairRun;
2425
import com.spotify.reaper.core.RepairSegment;
2526
import com.spotify.reaper.service.RepairRunner;
27+
import com.spotify.reaper.service.RingRange;
2628
import com.spotify.reaper.service.SegmentGenerator;
2729
import com.spotify.reaper.storage.IStorage;
2830

@@ -161,7 +163,7 @@ public Response addTable(@Context UriInfo uriInfo,
161163
}
162164

163165
// create segments
164-
List<RepairSegment.Builder> segments = null;
166+
List<RingRange> segments = null;
165167
String usedSeedHost = null;
166168
try {
167169
SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner());
@@ -170,9 +172,7 @@ public Response addTable(@Context UriInfo uriInfo,
170172
try {
171173
JmxProxy jmxProxy = JmxProxy.connect(host);
172174
List<BigInteger> tokens = jmxProxy.getTokens();
173-
segments = sg.generateSegments(existingTable.getSegmentCount(),
174-
tokens,
175-
existingTable);
175+
segments = sg.generateSegments(existingTable.getSegmentCount(), tokens);
176176
jmxProxy.close();
177177
usedSeedHost = host;
178178
break;
@@ -208,7 +208,14 @@ public Response addTable(@Context UriInfo uriInfo,
208208
// Notice that our RepairRun core object doesn't contain pointer to
209209
// the set of RepairSegments in the run, as they are accessed separately.
210210
// RepairSegment has a pointer to the RepairRun it lives in.
211-
storage.addRepairSegments(newRepairRun.getId(), segments);
211+
List<RepairSegment.Builder> repairSegments = Lists.newArrayList();
212+
for (RingRange range : segments) {
213+
repairSegments
214+
.add(new RepairSegment.Builder(newRepairRun.getId(), range,
215+
RepairSegment.State.NOT_STARTED)
216+
.columnFamilyId(existingTable.getId()));
217+
}
218+
storage.addRepairSegments(repairSegments);
212219

213220
RepairRunner.startNewRepairRun(storage, newRepairRun, usedSeedHost);
214221

src/main/java/com/spotify/reaper/service/RepairRunner.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public static void startNewRepairRun(IStorage storage, RepairRun repairRun,
8484
*/
8585
@Override
8686
public void run() {
87-
LOG.debug("RepairRunner run on RepairRun \"{}\" with start token \"{}\"",
88-
repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getStartToken());
87+
LOG.debug("RepairRunner run on RepairRun \"{}\" with token range \"{}\"",
88+
repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getTokenRange());
8989

9090
if (!checkJmxProxyInitialized()) {
9191
LOG.error("failed to initialize JMX proxy, retrying after {} seconds",
@@ -181,8 +181,8 @@ private void checkIfNeedToStartNextSegment() {
181181
return;
182182
} else if (currentSegment.getState() == RepairSegment.State.NOT_STARTED &&
183183
DateTime.now().isAfter(startNextSegmentEarliest)) {
184-
LOG.info("triggering repair on segment {} with start token {} on run id {}",
185-
currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId());
184+
LOG.info("triggering repair on segment #{} with token range {} on run id {}",
185+
currentSegment.getId(), currentSegment.getTokenRange(), repairRun.getId());
186186
newRepairCommandId = triggerRepair(currentSegment);
187187
} else if (currentSegment.getState() == RepairSegment.State.DONE) {
188188
LOG.warn("segment {} repair completed for run {}",
@@ -223,8 +223,9 @@ private void checkIfNeedToStartNextSegment() {
223223

224224
private int triggerRepair(RepairSegment segment) {
225225
ColumnFamily columnFamily = this.storage.getColumnFamily(segment.getColumnFamilyId());
226-
return this.jmxProxy.triggerRepair(segment.getStartToken(), segment.getEndToken(),
227-
columnFamily.getKeyspaceName(), columnFamily.getName());
226+
return this.jmxProxy
227+
.triggerRepair(segment.getTokenRange().getStart(), segment.getTokenRange().getEnd(),
228+
columnFamily.getKeyspaceName(), columnFamily.getName());
228229
}
229230

230231
private void changeCurrentRepairRunState(RepairRun.RunState newRunState) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.spotify.reaper.service;
15+
16+
import java.math.BigInteger;
17+
18+
public class RingRange {
19+
private final BigInteger start;
20+
private final BigInteger end;
21+
22+
public RingRange(BigInteger start, BigInteger end) {
23+
this.start = start;
24+
this.end = end;
25+
}
26+
27+
public BigInteger getStart() {
28+
return start;
29+
}
30+
31+
public BigInteger getEnd() {
32+
return end;
33+
}
34+
35+
public BigInteger span(BigInteger ringSize) {
36+
if (SegmentGenerator.greaterThanOrEqual(start, end)) {
37+
return end.subtract(start).add(ringSize);
38+
} else {
39+
return end.subtract(start);
40+
}
41+
}
42+
43+
public boolean encloses(RingRange other) {
44+
// TODO: unit test for this
45+
if (SegmentGenerator.lowerThanOrEqual(start, end)) {
46+
return SegmentGenerator.greaterThanOrEqual(other.start, start) &&
47+
SegmentGenerator.lowerThanOrEqual(other.end, end);
48+
} else if (SegmentGenerator.lowerThanOrEqual(other.start, other.end)) {
49+
return SegmentGenerator.greaterThanOrEqual(other.start, start) ||
50+
SegmentGenerator.lowerThanOrEqual(other.end, end);
51+
} else {
52+
return SegmentGenerator.greaterThanOrEqual(other.start, start) &&
53+
SegmentGenerator.lowerThanOrEqual(other.end, end);
54+
}
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return String.format("(%s,%s]", start.toString(), end.toString());
60+
}
61+
}

src/main/java/com/spotify/reaper/service/SegmentGenerator.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class SegmentGenerator {
3333
private static final Logger LOG = LoggerFactory.getLogger(SegmentGenerator.class);
3434

3535
private final String partitioner;
36-
private final BigInteger MIN_SEGMENT_SIZE = new BigInteger("100");
3736
private BigInteger RANGE_MIN;
3837
private BigInteger RANGE_MAX;
3938
private BigInteger RANGE_SIZE;
@@ -61,13 +60,12 @@ public SegmentGenerator(String partitioner) throws ReaperException {
6160
* @param ringTokens list of all start tokens in a cluster. They have to be in ring order.
6261
* @return a list containing at least {@code totalSegmentCount} repair segments.
6362
*/
64-
public List<RepairSegment.Builder> generateSegments(int totalSegmentCount,
65-
List<BigInteger> ringTokens,
66-
ColumnFamily table)
63+
public List<RingRange> generateSegments(int totalSegmentCount,
64+
List<BigInteger> ringTokens)
6765
throws ReaperException {
6866
int tokenRangeCount = ringTokens.size();
6967

70-
List<RepairSegment.Builder> repairSegments = Lists.newArrayList();
68+
List<RingRange> repairSegments = Lists.newArrayList();
7169
for (int i = 0; i < tokenRangeCount; i++) {
7270
BigInteger start = ringTokens.get(i);
7371
BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount);
@@ -111,19 +109,16 @@ public List<RepairSegment.Builder> generateSegments(int totalSegmentCount,
111109

112110
// Append the segments between the endpoints
113111
for (int j = 0; j < segmentCount; j++) {
114-
repairSegments.add(new RepairSegment.Builder(endpointTokens.get(j),
115-
endpointTokens.get(j + 1),
116-
RepairSegment.State.NOT_STARTED)
117-
.columnFamilyId(table.getId()));
112+
repairSegments.add(new RingRange(endpointTokens.get(j), endpointTokens.get(j + 1)));
118113
LOG.debug("Segment #{}: [{},{})", j + 1, endpointTokens.get(j),
119114
endpointTokens.get(j + 1));
120115
}
121116
}
122117

123118
// verify that the whole range is repaired
124119
BigInteger total = BigInteger.ZERO;
125-
for (RepairSegment.Builder segment : repairSegments) {
126-
BigInteger size = segment.endToken.subtract(segment.startToken);
120+
for (RingRange segment : repairSegments) {
121+
BigInteger size = segment.span(RANGE_SIZE);
127122
if (lowerThan(size, BigInteger.ZERO))
128123
size = size.add(RANGE_SIZE);
129124
total = total.add(size);

src/main/java/com/spotify/reaper/storage/IStorage.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.spotify.reaper.core.ColumnFamily;
1818
import com.spotify.reaper.core.RepairRun;
1919
import com.spotify.reaper.core.RepairSegment;
20+
import com.spotify.reaper.service.RingRange;
2021

2122
import java.math.BigInteger;
2223
import java.util.Collection;
@@ -48,15 +49,15 @@ public interface IStorage {
4849

4950
ColumnFamily getColumnFamily(String cluster, String keyspace, String table);
5051

51-
int addRepairSegments(long runId, Collection<RepairSegment.Builder> newSegments);
52+
int addRepairSegments(Collection<RepairSegment.Builder> newSegments);
5253

5354
boolean updateRepairSegment(RepairSegment newRepairSegment);
5455

5556
RepairSegment getRepairSegment(long id);
5657

5758
RepairSegment getNextFreeSegment(long runId);
5859

59-
RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end);
60+
RepairSegment getNextFreeSegmentInRange(long runId, RingRange range);
6061

6162

6263
}

src/main/java/com/spotify/reaper/storage/MemoryStorage.java

+5-20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.spotify.reaper.core.ColumnFamily;
2020
import com.spotify.reaper.core.RepairRun;
2121
import com.spotify.reaper.core.RepairSegment;
22+
import com.spotify.reaper.service.RingRange;
2223
import com.spotify.reaper.service.SegmentGenerator;
2324

2425
import java.math.BigInteger;
@@ -157,10 +158,10 @@ public ColumnFamily getColumnFamily(String cluster, String keyspace, String tabl
157158
}
158159

159160
@Override
160-
public int addRepairSegments(long runId, Collection<RepairSegment.Builder> segments) {
161+
public int addRepairSegments(Collection<RepairSegment.Builder> segments) {
161162
LinkedHashMap<Long, RepairSegment> newSegments = Maps.newLinkedHashMap();
162163
for (RepairSegment.Builder segment : segments) {
163-
RepairSegment newRepairSegment = segment.build(runId, SEGMENT_ID.incrementAndGet());
164+
RepairSegment newRepairSegment = segment.build(SEGMENT_ID.incrementAndGet());
164165
repairSegments.put(newRepairSegment.getId(), newRepairSegment);
165166
newSegments.put(newRepairSegment.getId(), newRepairSegment);
166167
}
@@ -196,27 +197,11 @@ public RepairSegment getNextFreeSegment(long runId) {
196197
return null;
197198
}
198199

199-
200-
public static boolean encloses(BigInteger rangeStart, BigInteger rangeEnd,
201-
BigInteger segmentStart, BigInteger segmentEnd) {
202-
// TODO: unit test for this
203-
if (SegmentGenerator.lowerThanOrEqual(rangeStart, rangeEnd)) {
204-
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
205-
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
206-
} else if (SegmentGenerator.lowerThanOrEqual(segmentStart, segmentEnd)) {
207-
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) ||
208-
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
209-
} else {
210-
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
211-
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
212-
}
213-
}
214-
215200
@Override
216-
public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) {
201+
public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) {
217202
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
218203
if (segment.getState() == RepairSegment.State.NOT_STARTED &&
219-
encloses(start, end, segment.getStartToken(), segment.getEndToken())) {
204+
range.encloses(segment.getTokenRange())) {
220205
return segment;
221206
}
222207
}

0 commit comments

Comments
 (0)