Skip to content

Commit 1253f03

Browse files
Refactor the IStorage.getRepairUnit(UUID) method to return the RepairUnit instead of an Optional.
The RepairUnit is entirely an internal construct, a relationship and constraint against the RepairRun and Schedule tables. The RepairUnit UUID is only used as a reference from either of these other tables, so it makes little sense to wrap the value in an Optional. Furthermore the majority of the calling code did not deal with (or appropriately handle) an absent value. In the Cassandra storage backend the read and write statements have been degraded from CL.ONE to CL.QUORUM accordingly. This table is small and already has row cache enabled, so this should be of small consequence.
1 parent a4626dd commit 1253f03

14 files changed

+91
-147
lines changed

src/server/src/main/java/io/cassandrareaper/resources/RepairRunResource.java

+9-32
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import javax.ws.rs.core.UriInfo;
5555

5656
import com.google.common.base.Optional;
57-
import com.google.common.base.Preconditions;
5857
import com.google.common.collect.Lists;
5958
import com.google.common.collect.Sets;
6059
import org.apache.cassandra.repair.RepairParallelism;
@@ -374,15 +373,8 @@ public Response modifyRunState(
374373
if (!repairRun.isPresent()) {
375374
return Response.status(Status.NOT_FOUND).entity("repair run " + repairRunId + " doesn't exist").build();
376375
}
377-
final RepairRun.RunState newState = parseRunState(stateStr.get());
378-
379-
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
380-
if (!repairUnit.isPresent()) {
381-
String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found";
382-
LOG.error(errMsg);
383-
return Response.status(Status.CONFLICT).entity(errMsg).build();
384-
}
385376

377+
final RepairRun.RunState newState = parseRunState(stateStr.get());
386378
if (isUnitAlreadyRepairing(repairRun.get())) {
387379
String errMsg = "repair unit already has run " + repairRun.get().getRepairUnitId() + " in RUNNING state";
388380
LOG.error(errMsg);
@@ -439,13 +431,6 @@ public Response modifyRunIntensity(
439431
return Response.status(Status.NOT_FOUND).entity("repair run " + repairRunId + " doesn't exist").build();
440432
}
441433

442-
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
443-
if (!repairUnit.isPresent()) {
444-
String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found";
445-
LOG.error(errMsg);
446-
return Response.status(Response.Status.NOT_FOUND).entity(errMsg).build();
447-
}
448-
449434
if (RunState.PAUSED != repairRun.get().getRunState() && RunState.NOT_STARTED != repairRun.get().getRunState()) {
450435
return Response.status(Status.CONFLICT).entity("repair run must first be paused").build();
451436
}
@@ -633,10 +618,9 @@ public Response getRepairRunsForCluster(
633618
* @return only a status of a repair run, not the entire repair run info.
634619
*/
635620
private RepairRunStatus getRepairRunStatus(RepairRun repairRun) {
636-
final Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId());
637-
Preconditions.checkState(repairUnit.isPresent(), "no repair unit found with id: %s", repairRun.getRepairUnitId());
638-
final int segmentsRepaired = getSegmentAmountForRepairRun(repairRun.getId());
639-
return new RepairRunStatus(repairRun, repairUnit.get(), segmentsRepaired);
621+
RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId());
622+
int segmentsRepaired = getSegmentAmountForRepairRun(repairRun.getId());
623+
return new RepairRunStatus(repairRun, repairUnit, segmentsRepaired);
640624
}
641625

642626
/**
@@ -701,19 +685,12 @@ private List<RepairRunStatus> getRunStatuses(
701685
if (!desiredStates.isEmpty() && !desiredStates.contains(run.getRunState().name())) {
702686
continue;
703687
}
704-
final Optional<RepairUnit> runsUnit = context.storage.getRepairUnit(run.getRepairUnitId());
705-
if (runsUnit.isPresent()) {
706-
int segmentsRepaired = run.getSegmentCount();
707-
if (!run.getRunState().equals(RepairRun.RunState.DONE)) {
708-
segmentsRepaired = getSegmentAmountForRepairRun(run.getId());
709-
}
710-
711-
runStatuses.add(new RepairRunStatus(run, runsUnit.get(), segmentsRepaired));
712-
} else {
713-
final String errMsg = String.format("Found repair run %s with no associated repair unit", run.getId());
714-
LOG.error(errMsg);
715-
throw new ReaperException("Internal server error : " + errMsg);
688+
RepairUnit runsUnit = context.storage.getRepairUnit(run.getRepairUnitId());
689+
int segmentsRepaired = run.getSegmentCount();
690+
if (!run.getRunState().equals(RepairRun.RunState.DONE)) {
691+
segmentsRepaired = getSegmentAmountForRepairRun(run.getId());
716692
}
693+
runStatuses.add(new RepairRunStatus(run, runsUnit, segmentsRepaired));
717694
}
718695

719696
return runStatuses;

src/server/src/main/java/io/cassandrareaper/resources/RepairScheduleResource.java

+7-22
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,7 @@ public Response modifyState(
321321
.build();
322322
}
323323

324-
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairSchedule.get().getRepairUnitId());
325-
if (!repairUnit.isPresent()) {
326-
String errMsg = "repair unit with id " + repairSchedule.get().getRepairUnitId() + " not found";
327-
LOG.error(errMsg);
328-
return Response.status(Response.Status.NOT_FOUND).entity(errMsg).build();
329-
}
330-
324+
RepairUnit repairUnit = context.storage.getRepairUnit(repairSchedule.get().getRepairUnitId());
331325
RepairSchedule.State newState;
332326
try {
333327
newState = RepairSchedule.State.valueOf(state.get().toUpperCase());
@@ -410,10 +404,8 @@ public Response getRepairSchedulesForCluster(
410404
* @return RepairSchedule status for viewing
411405
*/
412406
private RepairScheduleStatus getRepairScheduleStatus(RepairSchedule repairSchedule) {
413-
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairSchedule.getRepairUnitId());
414-
Preconditions.checkState(
415-
repairUnit.isPresent(), "no repair unit found with id: " + repairSchedule.getRepairUnitId());
416-
return new RepairScheduleStatus(repairSchedule, repairUnit.get());
407+
RepairUnit repairUnit = context.storage.getRepairUnit(repairSchedule.getRepairUnitId());
408+
return new RepairScheduleStatus(repairSchedule, repairUnit);
417409
}
418410

419411
/**
@@ -438,17 +430,10 @@ public Response listSchedules(
438430
@QueryParam("keyspace") Optional<String> keyspaceName) {
439431

440432
List<RepairScheduleStatus> scheduleStatuses = Lists.newArrayList();
441-
Collection<RepairSchedule> schedules = getScheduleList(clusterName, keyspaceName);
442-
for (RepairSchedule schedule : schedules) {
443-
Optional<RepairUnit> unit = context.storage.getRepairUnit(schedule.getRepairUnitId());
444-
if (unit.isPresent()) {
445-
scheduleStatuses.add(new RepairScheduleStatus(schedule, unit.get()));
446-
} else {
447-
String errMsg = String.format("Found repair schedule %s with no associated repair unit", schedule.getId());
448-
LOG.error(errMsg);
449-
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
450-
}
451-
}
433+
getScheduleList(clusterName, keyspaceName).forEach((schedule) -> {
434+
RepairUnit unit = context.storage.getRepairUnit(schedule.getRepairUnitId());
435+
scheduleStatuses.add(new RepairScheduleStatus(schedule, unit));
436+
});
452437
return Response.ok().entity(scheduleStatuses).build();
453438
}
454439

src/server/src/main/java/io/cassandrareaper/service/ClusterRepairScheduler.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.atomic.AtomicInteger;
2929
import java.util.stream.Collectors;
3030

31-
import com.google.common.base.Optional;
3231
import com.google.common.base.Throwables;
3332
import com.google.common.collect.ImmutableSet;
3433
import com.google.common.collect.Sets;
@@ -179,11 +178,7 @@ private Set<String> keyspacesThatHaveSchedules(AppContext context, Cluster clust
179178
Collection<RepairSchedule> currentSchedules = context.storage.getRepairSchedulesForCluster(cluster.getName());
180179
return currentSchedules
181180
.stream()
182-
.map(
183-
repairSchedule -> {
184-
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairSchedule.getRepairUnitId());
185-
return repairUnit.get().getKeyspaceName();
186-
})
181+
.map(repairSchedule -> context.storage.getRepairUnit(repairSchedule.getRepairUnitId()).getKeyspaceName())
187182
.collect(Collectors.toSet());
188183
}
189184

src/server/src/main/java/io/cassandrareaper/service/RepairManager.java

+15-16
Original file line numberDiff line numberDiff line change
@@ -197,32 +197,31 @@ public void abortSegments(
197197
RepairRun repairRun,
198198
boolean forced,
199199
boolean postponeWithoutAborting) {
200-
RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()).get();
200+
201+
RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId());
202+
201203
for (RepairSegment segment : runningSegments) {
202-
LOG.debug(
203-
"Trying to abort stuck segment {} in repair run {}", segment.getId(), repairRun.getId());
204+
LOG.debug("Trying to abort stuck segment {} in repair run {}", segment.getId(), repairRun.getId());
204205
UUID leaderElectionId = repairUnit.getIncrementalRepair() ? repairRun.getId() : segment.getId();
205206
if (forced || takeLead(context, leaderElectionId) || renewLead(context, leaderElectionId)) {
206207
// refresh segment once we're inside leader-election
207208
segment = context.storage.getRepairSegment(repairRun.getId(), segment.getId()).get();
208209
if (RepairSegment.State.RUNNING == segment.getState()) {
209210
try {
210-
JmxProxy jmxProxy =
211-
context.jmxConnectionFactory.connect(
212-
Node.builder()
213-
.withClusterName(repairRun.getClusterName())
214-
.withHostname(segment.getCoordinatorHost())
215-
.build(),
216-
context.config.getJmxConnectionTimeoutInSeconds());
211+
Node node = Node.builder()
212+
.withClusterName(repairRun.getClusterName())
213+
.withHostname(segment.getCoordinatorHost())
214+
.build();
215+
216+
JmxProxy jmxProxy
217+
= context.jmxConnectionFactory.connect(node, context.config.getJmxConnectionTimeoutInSeconds());
217218

218219
SegmentRunner.abort(context, segment, jmxProxy);
219220
} catch (ReaperException | NumberFormatException | InterruptedException e) {
220-
LOG.debug(
221-
"Tried to abort repair on segment {} marked as RUNNING, "
222-
+ "but the host was down (so abortion won't be needed). "
223-
+ "Postponing the segment.",
224-
segment.getId(),
225-
e);
221+
String msg = "Tried to abort repair on segment {} marked as RUNNING, but the "
222+
+ "host was down (so abortion won't be needed). Postponing the segment.";
223+
224+
LOG.debug(msg, segment.getId(), e);
226225
SegmentRunner.postponeSegment(context, segment);
227226
} finally {
228227
// if someone else does hold the lease, ie renewLead(..) was true,

src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java

+8-11
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,17 @@ final class RepairRunner implements Runnable {
7272
assert repairRun.isPresent() : "No RepairRun with ID " + repairRunId + " found from storage";
7373
Optional<Cluster> cluster = context.storage.getCluster(repairRun.get().getClusterName());
7474
assert cluster.isPresent() : "No Cluster with name " + repairRun.get().getClusterName() + " found from storage";
75-
Optional<RepairUnit> repairUnitOpt = context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
76-
assert repairUnitOpt.isPresent() : "No RepairUnit with id " + repairRun.get().getRepairUnitId()
77-
+ " found in storage";
75+
RepairUnit repairUnitOpt = context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
7876
this.clusterName = cluster.get().getName();
7977

80-
JmxProxy jmx =
81-
this.context.jmxConnectionFactory.connectAny(
82-
cluster.get(), context.config.getJmxConnectionTimeoutInSeconds());
78+
JmxProxy jmx = this.context.jmxConnectionFactory
79+
.connectAny(cluster.get(), context.config.getJmxConnectionTimeoutInSeconds());
8380

84-
String keyspace = repairUnitOpt.get().getKeyspaceName();
81+
String keyspace = repairUnitOpt.getKeyspaceName();
8582
int parallelRepairs
8683
= getPossibleParallelRepairsCount(jmx.getRangeToEndpointMap(keyspace), jmx.getEndpointToHostId());
8784

88-
if ((repairUnitOpt.isPresent() && repairUnitOpt.get().getIncrementalRepair())) {
85+
if (repairUnitOpt.getIncrementalRepair()) {
8986
// with incremental repair, can't have more parallel repairs than nodes
9087
// Same goes for local mode
9188
parallelRepairs = 1;
@@ -104,8 +101,8 @@ final class RepairRunner implements Runnable {
104101
Collections2.transform(
105102
repairSegments, segment -> segment.getTokenRange().getBaseRange())));
106103

107-
String repairUnitClusterName = repairUnitOpt.get().getClusterName();
108-
String repairUnitKeyspaceName = repairUnitOpt.get().getKeyspaceName();
104+
String repairUnitClusterName = repairUnitOpt.getClusterName();
105+
String repairUnitKeyspaceName = repairUnitOpt.getKeyspaceName();
109106

110107
// below four metric names are duplicated, so monitoring systems can follow per cluster or per cluster and keyspace
111108
String metricNameForRepairProgressPerKeyspace
@@ -417,7 +414,7 @@ private boolean repairSegment(final int rangeIndex, final UUID segmentId, Segmen
417414
repairProgress = (float) amountDone / repairRun.getSegmentCount();
418415
}
419416

420-
RepairUnit repairUnit = context.storage.getRepairUnit(unitId).get();
417+
RepairUnit repairUnit = context.storage.getRepairUnit(unitId);
421418
String keyspace = repairUnit.getKeyspaceName();
422419
LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRunId);
423420

src/server/src/main/java/io/cassandrareaper/service/RepairScheduleService.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ public Optional<RepairSchedule> conflictingRepairSchedule(Cluster cluster, Repai
5656
.getRepairSchedulesForClusterAndKeyspace(repairUnit.getClusterName(), repairUnit.getKeyspaceName());
5757

5858
for (RepairSchedule sched : repairSchedules) {
59-
Optional<RepairUnit> repairUnitForSched = context.storage.getRepairUnit(sched.getRepairUnitId());
60-
Preconditions.checkState(repairUnitForSched.isPresent());
61-
Preconditions.checkState(repairUnitForSched.get().getClusterName().equals(repairUnit.getClusterName()));
62-
Preconditions.checkState(repairUnitForSched.get().getKeyspaceName().equals(repairUnit.getKeyspaceName()));
59+
RepairUnit repairUnitForSched = context.storage.getRepairUnit(sched.getRepairUnitId());
60+
Preconditions.checkState(repairUnitForSched.getClusterName().equals(repairUnit.getClusterName()));
61+
Preconditions.checkState(repairUnitForSched.getKeyspaceName().equals(repairUnit.getKeyspaceName()));
6362

64-
if (isConflictingSchedules(cluster, repairUnitForSched.get(), repairUnit)) {
63+
if (isConflictingSchedules(cluster, repairUnitForSched, repairUnit)) {
6564
return Optional.of(sched);
6665
}
6766
}

src/server/src/main/java/io/cassandrareaper/service/SchedulingManager.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,7 @@ private boolean manageSchedule(RepairSchedule schdle) {
138138
schedule.getRepairUnitId(),
139139
schedule.getId());
140140

141-
Optional<RepairUnit> fetchedUnit = context.storage.getRepairUnit(schedule.getRepairUnitId());
142-
if (!fetchedUnit.isPresent()) {
143-
LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId());
144-
return false;
145-
}
146-
RepairUnit repairUnit = fetchedUnit.get();
141+
RepairUnit repairUnit = context.storage.getRepairUnit(schedule.getRepairUnitId());
147142
if (repairRunAlreadyScheduled(schedule, repairUnit)) {
148143
return false;
149144
}

src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java

+7-15
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,14 @@ static void postponeSegment(AppContext context, RepairSegment segment) {
149149
postpone(context, segment, context.storage.getRepairUnit(segment.getRepairUnitId()));
150150
}
151151

152-
private static void postpone(
153-
AppContext context, RepairSegment segment, Optional<RepairUnit> repairUnit) {
152+
private static void postpone(AppContext context, RepairSegment segment, RepairUnit repairUnit) {
154153
LOG.info("Postponing segment {}", segment.getId());
155154
try {
156155
context.storage.updateRepairSegment(
157156
segment
158157
.reset()
159-
.withCoordinatorHost(
160-
repairUnit.isPresent() && repairUnit.get().getIncrementalRepair()
161-
? segment.getCoordinatorHost()
162-
: null) // set coordinator host to null only for full repairs
158+
// set coordinator host to null only for full repairs
159+
.withCoordinatorHost(repairUnit.getIncrementalRepair() ? segment.getCoordinatorHost() : null)
163160
.withFailCount(segment.getFailCount() + 1)
164161
.withId(segment.getId())
165162
.build());
@@ -417,18 +414,13 @@ private void processTriggeredSegment(
417414
}
418415
}
419416

420-
private static String metricNameForPostpone(Optional<RepairUnit> unit, RepairSegment segment) {
421-
return unit.isPresent()
422-
? MetricRegistry.name(
417+
private static String metricNameForPostpone(RepairUnit unit, RepairSegment segment) {
418+
return MetricRegistry.name(
423419
SegmentRunner.class,
424420
"postpone",
425421
Optional.fromNullable(segment.getCoordinatorHost()).or("null").replace('.', '-'),
426-
unit.get().getClusterName().replace('.', '-'),
427-
unit.get().getKeyspaceName())
428-
: MetricRegistry.name(
429-
SegmentRunner.class,
430-
"postpone",
431-
Optional.fromNullable(segment.getCoordinatorHost()).or("null").replace('.', '-'));
422+
unit.getClusterName().replace('.', '-'),
423+
unit.getKeyspaceName());
432424
}
433425

434426
private String metricNameForRepairing(RepairSegment rs) {

0 commit comments

Comments
 (0)