Skip to content

Commit e717ad8

Browse files
For Cassandra storage minimise the number of tombstones (NULL entries) in the repair_run table.
Remove RepairSegment.repairCommandId as no one was using/storing it. Make explicit which fields in RepairSegment can be null and which can be nulled. Hide the RepairSegment.Builder constructor. Re-read segments once leader-election is done, ensuring we're not writing back stale data. Add Preconditions and asserts to ensure the lifecycle state is correct and null fields only exist as/when appropriate. Preconditions are used where the check is fast and gives fail-fast behaviour. Asserts are used where the check is expensive and/or is a secondary check. ref: - #240 - #244
1 parent 22afbdb commit e717ad8

File tree

9 files changed

+186
-107
lines changed

9 files changed

+186
-107
lines changed

src/server/src/main/java/io/cassandrareaper/core/RepairSegment.java

+33-19
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.math.BigInteger;
2020
import java.util.UUID;
21+
import javax.annotation.Nullable;
2122

23+
import com.google.common.base.Preconditions;
2224
import org.joda.time.DateTime;
2325

2426
public final class RepairSegment {
@@ -30,23 +32,25 @@ public final class RepairSegment {
3032
private final int failCount;
3133
private final State state;
3234
private final String coordinatorHost;
33-
private final Integer repairCommandId; // received when triggering repair in Cassandra
3435
private final DateTime startTime;
3536
private final DateTime endTime;
3637

37-
private RepairSegment(Builder builder, UUID id) {
38+
private RepairSegment(Builder builder, @Nullable UUID id) {
3839
this.id = id;
3940
this.runId = builder.runId;
4041
this.repairUnitId = builder.repairUnitId;
4142
this.tokenRange = builder.tokenRange;
4243
this.failCount = builder.failCount;
4344
this.state = builder.state;
4445
this.coordinatorHost = builder.coordinatorHost;
45-
this.repairCommandId = builder.repairCommandId;
4646
this.startTime = builder.startTime;
4747
this.endTime = builder.endTime;
4848
}
4949

50+
public static Builder builder(RingRange tokenRange, UUID repairUnitId) {
51+
return new Builder(tokenRange, repairUnitId);
52+
}
53+
5054
public UUID getId() {
5155
return id;
5256
}
@@ -79,18 +83,17 @@ public RepairSegment.State getState() {
7983
return state;
8084
}
8185

86+
@Nullable
8287
public String getCoordinatorHost() {
8388
return coordinatorHost;
8489
}
8590

86-
public Integer getRepairCommandId() {
87-
return repairCommandId;
88-
}
89-
91+
@Nullable
9092
public DateTime getStartTime() {
9193
return startTime;
9294
}
9395

96+
@Nullable
9497
public DateTime getEndTime() {
9598
return endTime;
9699
}
@@ -105,19 +108,20 @@ public enum State {
105108
DONE
106109
}
107110

108-
public static class Builder {
111+
public static final class Builder {
109112

110113
public final RingRange tokenRange;
111114
private final UUID repairUnitId;
112115
private UUID runId;
113116
private int failCount;
114117
private State state;
115118
private String coordinatorHost;
116-
private Integer repairCommandId;
117119
private DateTime startTime;
118120
private DateTime endTime;
119121

120-
public Builder(RingRange tokenRange, UUID repairUnitId) {
122+
private Builder(RingRange tokenRange, UUID repairUnitId) {
123+
Preconditions.checkNotNull(tokenRange);
124+
Preconditions.checkNotNull(repairUnitId);
121125
this.repairUnitId = repairUnitId;
122126
this.tokenRange = tokenRange;
123127
this.failCount = 0;
@@ -131,12 +135,12 @@ private Builder(RepairSegment original) {
131135
failCount = original.failCount;
132136
state = original.state;
133137
coordinatorHost = original.coordinatorHost;
134-
repairCommandId = original.repairCommandId;
135138
startTime = original.startTime;
136139
endTime = original.endTime;
137140
}
138141

139142
public Builder withRunId(UUID runId) {
143+
Preconditions.checkNotNull(runId);
140144
this.runId = runId;
141145
return this;
142146
}
@@ -147,32 +151,42 @@ public Builder failCount(int failCount) {
147151
}
148152

149153
public Builder state(State state) {
154+
Preconditions.checkNotNull(state);
150155
this.state = state;
151156
return this;
152157
}
153158

154-
public Builder coordinatorHost(String coordinatorHost) {
159+
public Builder coordinatorHost(@Nullable String coordinatorHost) {
155160
this.coordinatorHost = coordinatorHost;
156161
return this;
157162
}
158163

159-
public Builder repairCommandId(Integer repairCommandId) {
160-
this.repairCommandId = repairCommandId;
161-
return this;
162-
}
164+
public Builder startTime(@Nullable DateTime startTime) {
165+
Preconditions.checkState(
166+
null != startTime || null == endTime,
167+
"unsetting startTime only permitted if endTime unset");
163168

164-
public Builder startTime(DateTime startTime) {
165169
this.startTime = startTime;
166170
return this;
167171
}
168172

169173
public Builder endTime(DateTime endTime) {
174+
Preconditions.checkNotNull(endTime);
170175
this.endTime = endTime;
171176
return this;
172177
}
173178

174-
public RepairSegment build(UUID id) {
175-
return new RepairSegment(this, id);
179+
public RepairSegment build(@Nullable UUID segmentId) {
180+
// a null segmentId is a special case where the storage uses a sequence for it
181+
Preconditions.checkNotNull(runId);
182+
Preconditions.checkState(null != startTime || null == endTime, "if endTime is set, so must startTime be set");
183+
Preconditions.checkState(null == endTime || State.DONE == state, "endTime can only be set if segment is DONE");
184+
185+
Preconditions.checkState(
186+
null != startTime || State.NOT_STARTED == state,
187+
"startTime can only be unset if segment is NOT_STARTED");
188+
189+
return new RepairSegment(this, segmentId);
176190
}
177191
}
178192
}

src/server/src/main/java/io/cassandrareaper/resources/CommonTools.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private static List<RepairSegment.Builder> createRepairSegments(
241241
RepairUnit repairUnit) {
242242

243243
List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
244-
tokenSegments.forEach(range -> repairSegmentBuilders.add(new RepairSegment.Builder(range, repairUnit.getId())));
244+
tokenSegments.forEach(range -> repairSegmentBuilders.add(RepairSegment.builder(range, repairUnit.getId())));
245245
return repairSegmentBuilders;
246246
}
247247

@@ -260,7 +260,7 @@ private static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRep
260260
.forEach(
261261
range
262262
-> repairSegmentBuilders.add(
263-
new RepairSegment.Builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));
263+
RepairSegment.builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));
264264

265265
return repairSegmentBuilders;
266266
}

src/server/src/main/java/io/cassandrareaper/service/RepairManager.java

+23-17
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,11 @@ public void resumeRunningRepairRuns(AppContext context) throws ReaperException {
113113
}
114114

115115
private void abortSegmentsWithNoLeader(
116-
AppContext context, RepairRun repairRun, Collection<RepairSegment> runningSegments) {
117-
if (context.storage instanceof IDistributedStorage
118-
|| !repairRunners.containsKey(repairRun.getId())) {
116+
AppContext context,
117+
RepairRun repairRun,
118+
Collection<RepairSegment> runningSegments) {
119+
120+
if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
119121
// When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
120122
// Any segment in RUNNING state but with no leader should be killed
121123
List<UUID> activeLeaders =
@@ -141,20 +143,24 @@ public void abortSegments(
141143
for (RepairSegment segment : runningSegments) {
142144
UUID leaderElectionId = repairUnit.getIncrementalRepair() ? repairRun.getId() : segment.getId();
143145
if (takeLead(context, leaderElectionId) || renewLead(context, leaderElectionId)) {
144-
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(
145-
segment.getCoordinatorHost(), context.config.getJmxConnectionTimeoutInSeconds())) {
146-
147-
SegmentRunner.abort(context, segment, jmxProxy);
148-
} catch (ReaperException e) {
149-
LOG.debug(
150-
"Tried to abort repair on segment {} marked as RUNNING, "
151-
+ "but the host was down (so abortion won't be needed)",
152-
segment.getId(),
153-
e);
154-
} finally {
155-
// if someone else does hold the lease, ie renewLead(..) was true,
156-
// then their writes to repair_run table and any call to releaseLead(..) will throw an exception
157-
releaseLead(context, leaderElectionId);
146+
// refresh segment once we're inside leader-election
147+
segment = context.storage.getRepairSegment(repairRun.getId(), segment.getId()).get();
148+
if (RepairSegment.State.RUNNING == segment.getState()) {
149+
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(
150+
segment.getCoordinatorHost(), context.config.getJmxConnectionTimeoutInSeconds())) {
151+
152+
SegmentRunner.abort(context, segment, jmxProxy);
153+
} catch (ReaperException e) {
154+
LOG.debug(
155+
"Tried to abort repair on segment {} marked as RUNNING, "
156+
+ "but the host was down (so abortion won't be needed)",
157+
segment.getId(),
158+
e);
159+
} finally {
160+
// if someone else does hold the lease, ie renewLead(..) was true,
161+
// then their writes to repair_run table and any call to releaseLead(..) will throw an exception
162+
releaseLead(context, leaderElectionId);
163+
}
158164
}
159165
}
160166
}

src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,6 @@ public SegmentRunner(
129129

130130
@Override
131131
public void run() {
132-
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
133-
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);
134132
if (takeLead()) {
135133
try {
136134
if (runRepair()) {
@@ -147,7 +145,7 @@ public void run() {
147145
}
148146
}
149147

150-
public static void postpone(AppContext context, RepairSegment segment, Optional<RepairUnit> repairUnit) {
148+
private static void postpone(AppContext context, RepairSegment segment, Optional<RepairUnit> repairUnit) {
151149
LOG.info("Postponing segment {}", segment.getId());
152150
try {
153151
context.storage.updateRepairSegment(
@@ -158,7 +156,6 @@ public static void postpone(AppContext context, RepairSegment segment, Optional<
158156
repairUnit.isPresent() && repairUnit.get().getIncrementalRepair()
159157
? segment.getCoordinatorHost()
160158
: null) // set coordinator host to null only for full repairs
161-
.repairCommandId(null)
162159
.startTime(null)
163160
.failCount(segment.getFailCount() + 1)
164161
.build(segment.getId()));
@@ -211,6 +208,7 @@ private static long getOpenFilesAmount() {
211208
private boolean runRepair() {
212209
LOG.debug("Run repair for segment #{}", segmentId);
213210
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
211+
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);
214212

215213
try (Timer.Context cxt = context.metricRegistry.timer(metricNameForRunRepair(segment)).time();
216214
JmxProxy coordinator = context.jmxConnectionFactory.connectAny(
@@ -292,8 +290,7 @@ protected Set<String> initialize() {
292290
}
293291

294292
long timeout = repairUnit.getIncrementalRepair() ? timeoutMillis * MAX_TIMEOUT_EXTENSIONS : timeoutMillis;
295-
context.storage.updateRepairSegment(
296-
segment.with().coordinatorHost(coordinator.getHost()).repairCommandId(commandId).build(segmentId));
293+
context.storage.updateRepairSegment(segment.with().coordinatorHost(coordinator.getHost()).build(segmentId));
297294
String eventMsg
298295
= String.format("Triggered repair of segment %s via host %s", segment.getId(), coordinator.getHost());
299296

0 commit comments

Comments
 (0)