Skip to content

Commit 138f6b3

Browse files
In the Cassandra storage make all write statements async.
ref: - #124 - #129
1 parent a7104ba commit 138f6b3

File tree

1 file changed

+71
-54
lines changed

1 file changed

+71
-54
lines changed

src/main/java/com/spotify/reaper/storage/CassandraStorage.java

+71-54
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,13 @@
33
import java.math.BigInteger;
44
import java.util.Collection;
55
import java.util.Collections;
6-
import java.util.Comparator;
76
import java.util.Date;
87
import java.util.List;
98
import java.util.Set;
109
import java.util.UUID;
10+
import java.util.concurrent.ExecutionException;
1111
import java.util.stream.Collectors;
1212

13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
1613
import com.datastax.driver.core.BatchStatement;
1714
import com.datastax.driver.core.CodecRegistry;
1815
import com.datastax.driver.core.ConsistencyLevel;
@@ -23,13 +20,12 @@
2320
import com.datastax.driver.core.ResultSetFuture;
2421
import com.datastax.driver.core.Row;
2522
import com.datastax.driver.core.Session;
26-
import com.datastax.driver.core.SocketOptions;
2723
import com.datastax.driver.core.utils.UUIDs;
2824
import com.google.common.base.Optional;
29-
import com.google.common.collect.ComparisonChain;
3025
import com.google.common.collect.ImmutableList;
3126
import com.google.common.collect.Lists;
3227
import com.google.common.collect.Sets;
28+
import com.google.common.util.concurrent.Futures;
3329
import com.spotify.reaper.ReaperApplication;
3430
import com.spotify.reaper.ReaperApplicationConfiguration;
3531
import com.spotify.reaper.core.Cluster;
@@ -47,14 +43,15 @@
4743
import com.spotify.reaper.service.RingRange;
4844
import com.spotify.reaper.storage.cassandra.DateTimeCodec;
4945
import com.spotify.reaper.storage.cassandra.Migration003;
50-
46+
import io.dropwizard.setup.Environment;
5147
import org.apache.cassandra.repair.RepairParallelism;
5248
import org.cognitor.cassandra.migration.Database;
5349
import org.cognitor.cassandra.migration.MigrationRepository;
5450
import org.cognitor.cassandra.migration.MigrationTask;
5551
import org.joda.time.DateTime;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
5654

57-
import io.dropwizard.setup.Environment;
5855
import systems.composable.dropwizard.cassandra.CassandraFactory;
5956

6057
public final class CassandraStorage implements IStorage, IDistributedStorage {
@@ -103,10 +100,10 @@ public CassandraStorage(ReaperApplicationConfiguration config, Environment envir
103100
CassandraFactory cassandraFactory = config.getCassandraFactory();
104101
// all INSERT and DELETE statement prepared in this class are idempotent
105102
cassandraFactory.setQueryOptions(java.util.Optional.of(new QueryOptions().setDefaultIdempotence(true)));
106-
cassandra = config.getCassandraFactory().build(environment);
107-
103+
cassandra = cassandraFactory.build(environment);
108104
if(config.getActivateQueryLogger())
109105
cassandra.register(QueryLogger.builder().build());
106+
110107
CodecRegistry codecRegistry = cassandra.getConfiguration().getCodecRegistry();
111108
codecRegistry.register(new DateTimeCodec());
112109
session = cassandra.connect(config.getCassandraFactory().getKeyspace());
@@ -171,12 +168,7 @@ public Collection<Cluster> getClusters() {
171168

172169
@Override
173170
public boolean addCluster(Cluster cluster) {
174-
try {
175-
session.execute(insertClusterPrepStmt.bind(cluster.getName(), cluster.getPartitioner(), cluster.getSeedHosts()));
176-
} catch (Exception e) {
177-
LOG.warn("failed inserting cluster with name: {}", cluster.getName(), e);
178-
return false;
179-
}
171+
session.execute(insertClusterPrepStmt.bind(cluster.getName(), cluster.getPartitioner(), cluster.getSeedHosts()));
180172
return true;
181173
}
182174

@@ -187,24 +179,25 @@ public boolean updateCluster(Cluster newCluster) {
187179

188180
@Override
189181
public Optional<Cluster> getCluster(String clusterName) {
190-
Cluster cluster = null;
191-
for(Row clusterRow : session.execute(getClusterPrepStmt.bind(clusterName))){
192-
cluster = new Cluster(clusterRow.getString("name"), clusterRow.getString("partitioner"), clusterRow.getSet("seed_hosts", String.class));
193-
}
194-
195-
return Optional.fromNullable(cluster);
182+
Row r = session.execute(getClusterPrepStmt.bind(clusterName)).one();
183+
184+
return r != null
185+
? Optional.fromNullable(
186+
new Cluster(r.getString("name"), r.getString("partitioner"), r.getSet("seed_hosts", String.class)))
187+
: Optional.absent();
196188
}
197189

198190
@Override
199191
public Optional<Cluster> deleteCluster(String clusterName) {
200-
session.execute(deleteClusterPrepStmt.bind(clusterName));
192+
session.executeAsync(deleteClusterPrepStmt.bind(clusterName));
201193
return Optional.fromNullable(new Cluster(clusterName, null, null));
202194
}
203195

204196
@Override
205197
public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builder> newSegments) {
206198
RepairRun newRepairRun = repairRun.build(UUIDs.timeBased());
207199
BatchStatement repairRunBatch = new BatchStatement(BatchStatement.Type.UNLOGGED);
200+
List<ResultSetFuture> futures = Lists.newArrayList();
208201

209202
repairRunBatch.add(insertRepairRunPrepStmt.bind(
210203
newRepairRun.getId(),
@@ -222,9 +215,6 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
222215
newRepairRun.getSegmentCount(),
223216
newRepairRun.getRepairParallelism().toString()));
224217

225-
session.execute(insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId()));
226-
session.execute(insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId()));
227-
228218
for(RepairSegment.Builder builder:newSegments){
229219
RepairSegment segment = builder.withRunId(newRepairRun.getId()).build(UUIDs.timeBased());
230220

@@ -241,17 +231,39 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
241231
segment.getFailCount()));
242232

243233
if(100 == repairRunBatch.size()){
244-
session.execute(repairRunBatch);
234+
futures.add(session.executeAsync(repairRunBatch));
245235
repairRunBatch = new BatchStatement(BatchStatement.Type.UNLOGGED);
246236
}
247237
}
248-
session.execute(repairRunBatch);
238+
futures.add(session.executeAsync(repairRunBatch));
239+
futures.add(session.executeAsync(insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId())));
240+
futures.add(session.executeAsync(insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId())));
241+
242+
try {
243+
Futures.allAsList(futures).get();
244+
} catch (InterruptedException | ExecutionException ex) {
245+
LOG.error("failed to quorum insert new repair run " + newRepairRun.getId(), ex);
246+
}
249247
return newRepairRun;
250248
}
251249

252250
@Override
253251
public boolean updateRepairRun(RepairRun repairRun) {
254-
session.execute(insertRepairRunPrepStmt.bind(repairRun.getId(), repairRun.getClusterName(), repairRun.getRepairUnitId(), repairRun.getCause(), repairRun.getOwner(), repairRun.getRunState().toString(), repairRun.getCreationTime(), repairRun.getStartTime(), repairRun.getEndTime(), repairRun.getPauseTime(), repairRun.getIntensity(), repairRun.getLastEvent(), repairRun.getSegmentCount(), repairRun.getRepairParallelism().toString()));
252+
session.execute(insertRepairRunPrepStmt.bind(
253+
repairRun.getId(),
254+
repairRun.getClusterName(),
255+
repairRun.getRepairUnitId(),
256+
repairRun.getCause(),
257+
repairRun.getOwner(),
258+
repairRun.getRunState().toString(),
259+
repairRun.getCreationTime(),
260+
repairRun.getStartTime(),
261+
repairRun.getEndTime(),
262+
repairRun.getPauseTime(),
263+
repairRun.getIntensity(),
264+
repairRun.getLastEvent(),
265+
repairRun.getSegmentCount(),
266+
repairRun.getRepairParallelism().toString()));
255267
return true;
256268
}
257269

@@ -343,9 +355,9 @@ public Collection<RepairRun> getRepairRunsWithState(RunState runState) {
343355
public Optional<RepairRun> deleteRepairRun(UUID id) {
344356
Optional<RepairRun> repairRun = getRepairRun(id);
345357
if(repairRun.isPresent()){
346-
session.execute(deleteRepairRunPrepStmt.bind(id));
347-
session.execute(deleteRepairRunByClusterPrepStmt.bind(id, repairRun.get().getClusterName()));
348-
session.execute(deleteRepairRunByUnitPrepStmt.bind(id, repairRun.get().getRepairUnitId()));
358+
session.executeAsync(deleteRepairRunByUnitPrepStmt.bind(id, repairRun.get().getRepairUnitId()));
359+
session.executeAsync(deleteRepairRunByClusterPrepStmt.bind(id, repairRun.get().getClusterName()));
360+
session.executeAsync(deleteRepairRunPrepStmt.bind(id));
349361
}
350362
return repairRun;
351363
}
@@ -394,7 +406,7 @@ assert hasLeadOnSegment(newRepairSegment.getId())
394406
if (newRepairSegment.getStartTime() != null) {
395407
startTime = newRepairSegment.getStartTime().toDate();
396408
}
397-
session.executeAsync(insertRepairSegmentPrepStmt.bind(
409+
session.execute(insertRepairSegmentPrepStmt.bind(
398410
newRepairSegment.getRunId(),
399411
newRepairSegment.getId(),
400412
newRepairSegment.getRepairUnitId(),
@@ -553,12 +565,11 @@ public RepairSchedule addRepairSchedule(com.spotify.reaper.core.RepairSchedule.B
553565

554566
@Override
555567
public Optional<RepairSchedule> getRepairSchedule(UUID repairScheduleId) {
556-
RepairSchedule schedule = null;
557568
Row sched = session.execute(getRepairSchedulePrepStmt.bind(repairScheduleId)).one();
558-
if(sched!=null){
559-
schedule = createRepairScheduleFromRow(sched);
560-
}
561-
return Optional.fromNullable(schedule);
569+
570+
return sched != null
571+
? Optional.fromNullable(createRepairScheduleFromRow(sched))
572+
: Optional.absent();
562573
}
563574

564575
private RepairSchedule createRepairScheduleFromRow(Row repairScheduleRow){
@@ -625,18 +636,19 @@ public Collection<RepairSchedule> getAllRepairSchedules() {
625636
ResultSet scheduleResults = session.execute("SELECT * FROM repair_schedule_v1");
626637
for(Row scheduleRow:scheduleResults){
627638
schedules.add(createRepairScheduleFromRow(scheduleRow));
628-
629639
}
630-
640+
631641
return schedules;
632642
}
633643

634644
@Override
635645
public boolean updateRepairSchedule(RepairSchedule newRepairSchedule) {
636646
final Set<UUID> repairHistory = Sets.newHashSet();
637647
repairHistory.addAll(newRepairSchedule.getRunHistory());
648+
RepairUnit repairUnit = getRepairUnit(newRepairSchedule.getRepairUnitId()).get();
649+
List<ResultSetFuture> futures = Lists.newArrayList();
638650

639-
session.execute(insertRepairSchedulePrepStmt.bind(newRepairSchedule.getId(),
651+
futures.add(session.executeAsync(insertRepairSchedulePrepStmt.bind(newRepairSchedule.getId(),
640652
newRepairSchedule.getRepairUnitId(),
641653
newRepairSchedule.getState().toString(),
642654
newRepairSchedule.getDaysBetween(),
@@ -647,18 +659,22 @@ public boolean updateRepairSchedule(RepairSchedule newRepairSchedule) {
647659
newRepairSchedule.getIntensity(),
648660
newRepairSchedule.getCreationTime(),
649661
newRepairSchedule.getOwner(),
650-
newRepairSchedule.getPauseTime())
651-
);
652-
RepairUnit repairUnit = getRepairUnit(newRepairSchedule.getRepairUnitId()).get();
662+
newRepairSchedule.getPauseTime())));
653663

654-
session.execute(insertRepairScheduleByClusterAndKsPrepStmt
655-
.bind(repairUnit.getClusterName(), repairUnit.getKeyspaceName(), newRepairSchedule.getId()));
664+
futures.add(session.executeAsync(insertRepairScheduleByClusterAndKsPrepStmt
665+
.bind(repairUnit.getClusterName(), repairUnit.getKeyspaceName(), newRepairSchedule.getId())));
656666

657-
session.execute(insertRepairScheduleByClusterAndKsPrepStmt
658-
.bind(repairUnit.getClusterName(), " ", newRepairSchedule.getId()));
667+
futures.add(session.executeAsync(insertRepairScheduleByClusterAndKsPrepStmt
668+
.bind(repairUnit.getClusterName(), " ", newRepairSchedule.getId())));
659669

660-
session.execute(insertRepairScheduleByClusterAndKsPrepStmt
661-
.bind(" ", repairUnit.getKeyspaceName(), newRepairSchedule.getId()));
670+
futures.add(session.executeAsync(insertRepairScheduleByClusterAndKsPrepStmt
671+
.bind(" ", repairUnit.getKeyspaceName(), newRepairSchedule.getId())));
672+
673+
try {
674+
Futures.allAsList(futures).get();
675+
} catch (InterruptedException | ExecutionException ex) {
676+
LOG.error("failed to quorum update repair schedule " + newRepairSchedule.getId(), ex);
677+
}
662678

663679
return true;
664680
}
@@ -668,16 +684,17 @@ public Optional<RepairSchedule> deleteRepairSchedule(UUID id) {
668684
Optional<RepairSchedule> repairSchedule = getRepairSchedule(id);
669685
if(repairSchedule.isPresent()){
670686
RepairUnit repairUnit = getRepairUnit(repairSchedule.get().getRepairUnitId()).get();
671-
session.execute(deleteRepairSchedulePrepStmt.bind(repairSchedule.get().getId()));
672687

673-
session.execute(deleteRepairScheduleByClusterAndKsPrepStmt
688+
session.executeAsync(deleteRepairScheduleByClusterAndKsPrepStmt
674689
.bind(repairUnit.getClusterName(), repairUnit.getKeyspaceName(), repairSchedule.get().getId()));
675690

676-
session.execute(deleteRepairScheduleByClusterAndKsPrepStmt
691+
session.executeAsync(deleteRepairScheduleByClusterAndKsPrepStmt
677692
.bind(repairUnit.getClusterName(), " ", repairSchedule.get().getId()));
678693

679-
session.execute(deleteRepairScheduleByClusterAndKsPrepStmt
694+
session.executeAsync(deleteRepairScheduleByClusterAndKsPrepStmt
680695
.bind(" ", repairUnit.getKeyspaceName(), repairSchedule.get().getId()));
696+
697+
session.executeAsync(deleteRepairSchedulePrepStmt.bind(repairSchedule.get().getId()));
681698
}
682699

683700
return repairSchedule;

0 commit comments

Comments
 (0)