Skip to content

Commit 2f9e90e

Browse files
committed
Add /cluster/x/runs endpoint. Candidate for replacing /cluster/x
1 parent 84a4f7c commit 2f9e90e

File tree

10 files changed

+245
-28
lines changed

10 files changed

+245
-28
lines changed

src/main/java/com/spotify/reaper/resources/ClusterResource.java

+29
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@
1919
import com.fasterxml.jackson.core.JsonProcessingException;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
22+
import com.fasterxml.jackson.databind.util.ISO8601DateFormat;
23+
import com.fasterxml.jackson.datatype.joda.JodaModule;
2224
import com.spotify.reaper.AppContext;
2325
import com.spotify.reaper.ReaperException;
2426
import com.spotify.reaper.cassandra.JmxProxy;
2527
import com.spotify.reaper.core.Cluster;
2628
import com.spotify.reaper.core.RepairRun;
29+
import com.spotify.reaper.resources.view.ClusterRun;
2730
import com.spotify.reaper.resources.view.ClusterStatus;
2831
import com.spotify.reaper.resources.view.KeyspaceStatus;
2932
import com.spotify.reaper.resources.view.hierarchy.HCluster;
33+
import com.spotify.reaper.storage.PostgresStorage;
3034

35+
import org.joda.time.format.ISODateTimeFormat;
3136
import org.slf4j.Logger;
3237
import org.slf4j.LoggerFactory;
3338

39+
import sun.nio.cs.ISO_8859_2;
40+
3441
import java.net.URI;
3542
import java.net.URL;
43+
import java.text.DateFormat;
3644
import java.util.ArrayList;
3745
import java.util.Collection;
3846
import java.util.Collections;
@@ -114,6 +122,27 @@ public Response getClusterHierarchy(@PathParam("cluster_name") String clusterNam
114122
}
115123
}
116124

125+
@GET
126+
@Path("/{cluster_name}/runs")
127+
public Response getClusterRunOverview(
128+
@PathParam("cluster_name") String clusterName,
129+
@QueryParam("limit") Optional<Integer> limit) {
130+
PostgresStorage ps = (PostgresStorage) context.storage;
131+
132+
Collection<ClusterRun> view =
133+
ps.getClusterRunOverview(clusterName, limit.or(10));
134+
135+
ObjectMapper objectMapper = new ObjectMapper()
136+
.setPropertyNamingStrategy(
137+
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES)
138+
.registerModule(new JodaModule());
139+
try {
140+
return Response.ok().entity(objectMapper.writeValueAsString(view)).build();
141+
} catch (JsonProcessingException e) {
142+
return Response.serverError().entity("JSON processing failed").build();
143+
}
144+
}
145+
117146
@POST
118147
public Response addCluster(
119148
@Context UriInfo uriInfo,

src/main/java/com/spotify/reaper/resources/RepairRunResource.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public Response modifyRunState(
244244
}
245245

246246
int segmentsRepaired =
247-
context.storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.DONE);
247+
context.storage.getSegmentAmountForRepairRunWithState(repairRunId, RepairSegment.State.DONE);
248248

249249
RepairRun.RunState newState;
250250
try {
@@ -343,7 +343,8 @@ private RepairRunStatus getRepairRunStatus(RepairRun repairRun) {
343343
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId());
344344
assert repairUnit.isPresent() : "no repair unit found with id: " + repairRun.getRepairUnitId();
345345
int segmentsRepaired =
346-
context.storage.getSegmentAmountForRepairRun(repairRun.getId(), RepairSegment.State.DONE);
346+
context.storage.getSegmentAmountForRepairRunWithState(repairRun.getId(),
347+
RepairSegment.State.DONE);
347348
return new RepairRunStatus(repairRun, repairUnit.get(), segmentsRepaired);
348349
}
349350

@@ -389,7 +390,8 @@ public Response listRepairRuns(@QueryParam("state") Optional<String> state) {
389390
}
390391
Optional<RepairUnit> runsUnit = context.storage.getRepairUnit(run.getRepairUnitId());
391392
int segmentsRepaired =
392-
context.storage.getSegmentAmountForRepairRun(run.getId(), RepairSegment.State.DONE);
393+
context.storage.getSegmentAmountForRepairRunWithState(run.getId(),
394+
RepairSegment.State.DONE);
393395
if (runsUnit.isPresent()) {
394396
runStatuses.add(new RepairRunStatus(run, runsUnit.get(), segmentsRepaired));
395397
} else {
@@ -455,7 +457,7 @@ public Response deleteRepairRun(@PathParam("id") Long runId,
455457
"Repair run with id \"" + runId + "\" is not owned by the user you defined: "
456458
+ owner.get()).build();
457459
}
458-
if (context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING) > 0) {
460+
if (context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.RUNNING) > 0) {
459461
return Response.status(Response.Status.FORBIDDEN).entity(
460462
"Repair run with id \"" + runId
461463
+ "\" has a running segment, which must be waited to finish before deleting").build();
@@ -464,7 +466,7 @@ public Response deleteRepairRun(@PathParam("id") Long runId,
464466
Optional<RepairUnit> unitPossiblyDeleted =
465467
context.storage.getRepairUnit(runToDelete.get().getRepairUnitId());
466468
int segmentsRepaired =
467-
context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.DONE);
469+
context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.DONE);
468470
Optional<RepairRun> deletedRun = context.storage.deleteRepairRun(runId);
469471
if (deletedRun.isPresent()) {
470472
RepairRunStatus repairRunStatus =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.spotify.reaper.resources.view;
15+
16+
import com.spotify.reaper.core.RepairRun;
17+
import com.spotify.reaper.resources.CommonTools;
18+
import com.spotify.reaper.storage.postgresql.RepairRunMapper;
19+
20+
import org.apache.commons.lang.time.DurationFormatUtils;
21+
import org.joda.time.DateTime;
22+
import org.joda.time.Duration;
23+
import org.skife.jdbi.v2.StatementContext;
24+
import org.skife.jdbi.v2.tweak.ResultSetMapper;
25+
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
29+
public class ClusterRun {
30+
31+
public final int runId;
32+
public final String cluserName;
33+
public final String keyspaceName;
34+
public final String[] columnFamilies;
35+
public final int segmentsRepaired;
36+
public final int totalSegments;
37+
public final RepairRun.RunState state;
38+
private final DateTime startTime;
39+
public String getStartTime() {
40+
return CommonTools.dateTimeToISO8601(startTime);
41+
}
42+
private final Duration duration;
43+
public String getDuration() {
44+
if (null == duration) {
45+
return null;
46+
}
47+
return DurationFormatUtils.formatDurationWords(duration.getMillis(), false, false);
48+
}
49+
public final String cause;
50+
public final String owner;
51+
public final String lastEvent;
52+
private final DateTime estimatedTimeOfArrival;
53+
public String getEstimatedTimeOfArrival() {
54+
return CommonTools.dateTimeToISO8601(estimatedTimeOfArrival);
55+
}
56+
57+
public ClusterRun(int runId, String clusterName, String keyspaceName, String[] columnFamilies,
58+
int segmentsRepaired, int totalSegments, RepairRun.RunState state, DateTime startTime,
59+
DateTime endTime, String cause, String owner, String lastEvent) {
60+
this.runId = runId;
61+
this.cluserName = clusterName;
62+
this.keyspaceName = keyspaceName;
63+
this.columnFamilies = columnFamilies;
64+
this.segmentsRepaired = segmentsRepaired;
65+
this.totalSegments = totalSegments;
66+
this.state = state;
67+
this.startTime = startTime;
68+
this.cause = cause;
69+
this.owner = owner;
70+
this.lastEvent = lastEvent;
71+
72+
if (startTime == null) {
73+
estimatedTimeOfArrival = null;
74+
duration = null;
75+
} else if (endTime == null) {
76+
duration = null;
77+
if (state == RepairRun.RunState.ERROR || state == RepairRun.RunState.DELETED) {
78+
estimatedTimeOfArrival = null;
79+
} else {
80+
long now = DateTime.now().getMillis();
81+
estimatedTimeOfArrival = new DateTime(
82+
now + (now - startTime.getMillis()) /
83+
segmentsRepaired * (totalSegments - segmentsRepaired));
84+
}
85+
} else {
86+
estimatedTimeOfArrival = null;
87+
duration = new Duration(startTime.toInstant(), endTime.toInstant());
88+
}
89+
}
90+
91+
public static class Mapper implements ResultSetMapper<ClusterRun> {
92+
93+
@Override
94+
public ClusterRun map(int index, ResultSet r, StatementContext ctx) throws SQLException {
95+
int runId = r.getInt("id");
96+
String clusterName = r.getString("cluster_name");
97+
String keyspaceName = r.getString("keyspace_name");
98+
String[] columnFamilies = (String[]) r.getArray("column_families").getArray();
99+
int segmentsRepaired = (int)r.getLong("count");
100+
int totalSegments = r.getInt("segment_count");
101+
RepairRun.RunState state = RepairRun.RunState.valueOf(r.getString("state"));
102+
DateTime startTime = RepairRunMapper.getDateTimeOrNull(r, "start_time");
103+
DateTime endTime = RepairRunMapper.getDateTimeOrNull(r, "end_time");
104+
String cause = r.getString("cause");
105+
String owner = r.getString("owner");
106+
String lastEvent = r.getString("last_event");
107+
return new ClusterRun(runId, clusterName, keyspaceName, columnFamilies, segmentsRepaired,
108+
totalSegments, state, startTime, endTime, cause, owner, lastEvent);
109+
}
110+
}
111+
}

src/main/java/com/spotify/reaper/service/RepairRunner.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ private void end() {
144144
private void startNextSegment() throws ReaperException {
145145
// Currently not allowing parallel repairs.
146146
assert
147-
context.storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.RUNNING) == 0;
147+
context.storage.getSegmentAmountForRepairRunWithState(repairRunId,
148+
RepairSegment.State.RUNNING) == 0;
148149
Optional<RepairSegment> nextSegment = context.storage.getNextFreeSegment(repairRunId);
149150
if (nextSegment.isPresent()) {
150151
repairSegment(nextSegment.get().getId(), nextSegment.get().getTokenRange());

src/main/java/com/spotify/reaper/storage/IStorage.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,
9191

9292
Optional<RepairSegment> getRepairSegment(long id);
9393

94+
Collection<RepairSegment> getRepairSegmentsForRun(long runId);
95+
9496
Optional<RepairSegment> getNextFreeSegment(long runId);
9597

9698
Optional<RepairSegment> getNextFreeSegmentInRange(long runId, RingRange range);
@@ -99,7 +101,9 @@ Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,
99101

100102
Collection<Long> getRepairRunIdsForCluster(String clusterName);
101103

102-
int getSegmentAmountForRepairRun(long runId, RepairSegment.State state);
104+
int getSegmentAmountForRepairRun(long runId);
105+
106+
int getSegmentAmountForRepairRunWithState(long runId, RepairSegment.State state);
103107

104108
RepairSchedule addRepairSchedule(RepairSchedule.Builder repairSchedule);
105109

src/main/java/com/spotify/reaper/storage/MemoryStorage.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private int deleteRepairSegmentsForRun(long runId) {
193193
public Optional<RepairRun> deleteRepairRun(long id) {
194194
RepairRun deletedRun = repairRuns.remove(id);
195195
if (deletedRun != null) {
196-
if (getSegmentAmountForRepairRun(id, RepairSegment.State.RUNNING) == 0) {
196+
if (getSegmentAmountForRepairRunWithState(id, RepairSegment.State.RUNNING) == 0) {
197197
deleteRepairUnit(deletedRun.getRepairUnitId());
198198
deleteRepairSegmentsForRun(id);
199199
deletedRun = deletedRun.with().runState(RepairRun.RunState.DELETED).build(id);
@@ -257,6 +257,11 @@ public Optional<RepairSegment> getRepairSegment(long id) {
257257
return Optional.fromNullable(repairSegments.get(id));
258258
}
259259

260+
@Override
261+
public Collection<RepairSegment> getRepairSegmentsForRun(long runId) {
262+
return repairSegmentsByRunId.get(runId).values();
263+
}
264+
260265
@Override
261266
public Optional<RepairSegment> getNextFreeSegment(long runId) {
262267
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
@@ -302,7 +307,13 @@ public Collection<Long> getRepairRunIdsForCluster(String clusterName) {
302307
}
303308

304309
@Override
305-
public int getSegmentAmountForRepairRun(long runId, RepairSegment.State state) {
310+
public int getSegmentAmountForRepairRun(long runId) {
311+
Map<Long, RepairSegment> segmentsMap = repairSegmentsByRunId.get(runId);
312+
return segmentsMap == null ? 0 : segmentsMap.size();
313+
}
314+
315+
@Override
316+
public int getSegmentAmountForRepairRunWithState(long runId, RepairSegment.State state) {
306317
Map<Long, RepairSegment> segmentsMap = repairSegmentsByRunId.get(runId);
307318
int amount = 0;
308319
if (null != segmentsMap) {

src/main/java/com/spotify/reaper/storage/PostgresStorage.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.spotify.reaper.core.RepairSchedule;
2424
import com.spotify.reaper.core.RepairSegment;
2525
import com.spotify.reaper.core.RepairUnit;
26+
import com.spotify.reaper.resources.view.ClusterRun;
2627
import com.spotify.reaper.service.RingRange;
2728
import com.spotify.reaper.storage.postgresql.BigIntegerArgumentFactory;
2829
import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL;
@@ -198,7 +199,8 @@ public Optional<RepairRun> deleteRepairRun(long id) {
198199
IStoragePostgreSQL pg = getPostgresStorage(h);
199200
RepairRun runToDelete = pg.getRepairRun(id);
200201
if (runToDelete != null) {
201-
int segmentsRunning = pg.getSegmentAmountForRepairRun(id, RepairSegment.State.RUNNING);
202+
int segmentsRunning = pg.getSegmentAmountForRepairRunWithState(id,
203+
RepairSegment.State.RUNNING);
202204
if (segmentsRunning == 0) {
203205
pg.deleteRepairSegmentsForRun(runToDelete.getId());
204206
pg.deleteRepairRun(id);
@@ -325,6 +327,13 @@ public Optional<RepairSegment> getRepairSegment(long id) {
325327
return Optional.fromNullable(result);
326328
}
327329

330+
@Override
331+
public Collection<RepairSegment> getRepairSegmentsForRun(long runId) {
332+
try (Handle h = jdbi.open()) {
333+
return getPostgresStorage(h).getRepairSegmentsForRun(runId);
334+
}
335+
}
336+
328337
@Override
329338
public Optional<RepairSegment> getNextFreeSegment(long runId) {
330339
RepairSegment result;
@@ -349,7 +358,7 @@ public Collection<RepairSegment> getSegmentsWithState(long runId,
349358
RepairSegment.State segmentState) {
350359
Collection<RepairSegment> result;
351360
try (Handle h = jdbi.open()) {
352-
result = getPostgresStorage(h).getRepairSegmentForRunWithState(runId, segmentState);
361+
result = getPostgresStorage(h).getRepairSegmentsForRunWithState(runId, segmentState);
353362
}
354363
return result;
355364
}
@@ -364,10 +373,17 @@ public Collection<Long> getRepairRunIdsForCluster(String clusterName) {
364373
}
365374

366375
@Override
367-
public int getSegmentAmountForRepairRun(long runId, RepairSegment.State state) {
376+
public int getSegmentAmountForRepairRun(long runId) {
377+
try (Handle h = jdbi.open()) {
378+
return getPostgresStorage(h).getSegmentAmountForRepairRun(runId);
379+
}
380+
}
381+
382+
@Override
383+
public int getSegmentAmountForRepairRunWithState(long runId, RepairSegment.State state) {
368384
int result;
369385
try (Handle h = jdbi.open()) {
370-
result = getPostgresStorage(h).getSegmentAmountForRepairRun(runId, state);
386+
result = getPostgresStorage(h).getSegmentAmountForRepairRunWithState(runId, state);
371387
}
372388
return result;
373389
}
@@ -440,4 +456,10 @@ public Optional<RepairSchedule> deleteRepairSchedule(long id) {
440456
}
441457
return Optional.fromNullable(result);
442458
}
459+
460+
public Collection<ClusterRun> getClusterRunOverview(String clusterName, int limit) {
461+
try (Handle h = jdbi.open()) {
462+
return getPostgresStorage(h).getClusterRunOverview(clusterName, limit);
463+
}
464+
}
443465
}

0 commit comments

Comments
 (0)