Skip to content

Commit b3bc674

Browse files
authored
MINOR: Cleanups in connect:runtime (#19452)
Various cleanups in Connect: - use enhanced switch - remove dead code - convert classes to records Reviewers: Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]>, Ken Huang <[email protected]>
1 parent 270948b commit b3bc674

File tree

56 files changed

+203
-614
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+203
-614
lines changed

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@
7373

7474
import java.io.ByteArrayOutputStream;
7575
import java.io.PrintStream;
76-
import java.io.UnsupportedEncodingException;
7776
import java.nio.charset.StandardCharsets;
7877
import java.util.ArrayList;
7978
import java.util.Collection;
@@ -1082,12 +1081,8 @@ protected final boolean maybeAddConfigErrors(
10821081

10831082
private String trace(Throwable t) {
10841083
ByteArrayOutputStream output = new ByteArrayOutputStream();
1085-
try {
1086-
t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
1087-
return output.toString(StandardCharsets.UTF_8.name());
1088-
} catch (UnsupportedEncodingException e) {
1089-
return null;
1090-
}
1084+
t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8));
1085+
return output.toString(StandardCharsets.UTF_8);
10911086
}
10921087

10931088
/*

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private void validate(String connectorName, VersionRange range) throws ConnectEx
5454
}
5555
}
5656

57-
private Connector lookup(String connectorName, VersionRange range) throws Exception {
57+
private Connector lookup(String connectorName, VersionRange range) {
5858
String version = range == null ? LATEST_VERSION : range.toString();
5959
if (connectors.containsKey(connectorName) && connectors.get(connectorName).containsKey(version)) {
6060
return connectors.get(connectorName).get(version);
@@ -73,7 +73,7 @@ private Connector lookup(String connectorName, VersionRange range) throws Except
7373
}
7474
}
7575

76-
public Connector getConnector(String connectorName, VersionRange range) throws Exception {
76+
public Connector getConnector(String connectorName, VersionRange range) {
7777
validate(connectorName, range);
7878
return lookup(connectorName, range);
7979
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java

+2-18
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationS
372372
try {
373373
final String typeConfig = prefix + "type";
374374
final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
375-
@SuppressWarnings("unchecked") final Transformation<R> transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig);
375+
final Transformation<R> transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig);
376376
Map<String, Object> configs = originalsWithPrefix(prefix);
377377
Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
378378
Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
@@ -382,7 +382,6 @@ public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationS
382382
String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
383383
final String predicateTypeConfig = predicatePrefix + "type";
384384
final String predicateVersionConfig = predicatePrefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
385-
@SuppressWarnings("unchecked")
386385
Predicate<R> predicate = getTransformationOrPredicate(plugins, predicateTypeConfig, predicateVersionConfig);
387386
predicate.configure(originalsWithPrefix(predicatePrefix));
388387
Plugin<Predicate<R>> predicatePlugin = metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
@@ -781,22 +780,7 @@ public boolean visible(String name, Map<String, Object> parsedConfig) {
781780
}
782781
}
783782

784-
private static class ConverterDefaults {
785-
private final String type;
786-
private final String version;
787-
788-
public ConverterDefaults(String type, String version) {
789-
this.type = type;
790-
this.version = version;
791-
}
792-
793-
public String type() {
794-
return type;
795-
}
796-
797-
public String version() {
798-
return version;
799-
}
783+
private record ConverterDefaults(String type, String version) {
800784
}
801785

802786
public static class PluginVersionValidator implements ConfigDef.Validator {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java

+19-22
Original file line numberDiff line numberDiff line change
@@ -428,24 +428,22 @@ private TransactionBoundaryManager buildTransactionManager(
428428
SourceConnectorConfig sourceConfig,
429429
WorkerTransactionContext transactionContext) {
430430
TransactionBoundary boundary = sourceConfig.transactionBoundary();
431-
switch (boundary) {
432-
case POLL:
433-
return new TransactionBoundaryManager() {
434-
@Override
435-
protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
436-
return true;
437-
}
438-
439-
@Override
440-
protected boolean shouldCommitFinalTransaction() {
441-
return true;
442-
}
443-
};
431+
return switch (boundary) {
432+
case POLL -> new TransactionBoundaryManager() {
433+
@Override
434+
protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
435+
return true;
436+
}
444437

445-
case INTERVAL:
438+
@Override
439+
protected boolean shouldCommitFinalTransaction() {
440+
return true;
441+
}
442+
};
443+
case INTERVAL -> {
446444
long transactionBoundaryInterval = Optional.ofNullable(sourceConfig.transactionBoundaryInterval())
447445
.orElse(workerConfig.offsetCommitInterval());
448-
return new TransactionBoundaryManager() {
446+
yield new TransactionBoundaryManager() {
449447
private final long commitInterval = transactionBoundaryInterval;
450448
private long lastCommit;
451449

@@ -465,14 +463,14 @@ protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
465463
}
466464

467465
@Override
468-
protected boolean shouldCommitFinalTransaction() {
466+
protected boolean shouldCommitFinalTransaction() {
469467
return true;
470468
}
471469
};
472-
473-
case CONNECTOR:
470+
}
471+
case CONNECTOR -> {
474472
Objects.requireNonNull(transactionContext, "Transaction context must be provided when using connector-defined transaction boundaries");
475-
return new TransactionBoundaryManager() {
473+
yield new TransactionBoundaryManager() {
476474
@Override
477475
protected boolean shouldCommitFinalTransaction() {
478476
return shouldCommitTransactionForBatch(time.milliseconds());
@@ -513,9 +511,8 @@ private void maybeAbortTransaction() {
513511
transactionOpen = false;
514512
}
515513
};
516-
default:
517-
throw new IllegalArgumentException("Unrecognized transaction boundary: " + boundary);
518-
}
514+
}
515+
};
519516
}
520517

521518
TransactionMetricsGroup transactionMetricsGroup() {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java

+1-31
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.Collection;
3838
import java.util.List;
3939
import java.util.Map;
40-
import java.util.Objects;
4140

4241
/**
4342
* <p>
@@ -392,35 +391,6 @@ enum ConfigReloadAction {
392391
RESTART
393392
}
394393

395-
class Created<T> {
396-
private final boolean created;
397-
private final T result;
398-
399-
public Created(boolean created, T result) {
400-
this.created = created;
401-
this.result = result;
402-
}
403-
404-
public boolean created() {
405-
return created;
406-
}
407-
408-
public T result() {
409-
return result;
410-
}
411-
412-
@Override
413-
public boolean equals(Object o) {
414-
if (this == o) return true;
415-
if (o == null || getClass() != o.getClass()) return false;
416-
Created<?> created1 = (Created<?>) o;
417-
return Objects.equals(created, created1.created) &&
418-
Objects.equals(result, created1.result);
419-
}
420-
421-
@Override
422-
public int hashCode() {
423-
return Objects.hash(created, result);
424-
}
394+
record Created<T>(boolean created, T result) {
425395
}
426396
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import org.apache.kafka.connect.util.ConnectorTaskId;
2323

2424
import java.util.Collection;
25-
import java.util.Collections;
2625
import java.util.Objects;
27-
import java.util.stream.Collectors;
2826

2927
/**
3028
* An immutable restart plan per connector.
@@ -45,13 +43,10 @@ public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo)
4543
this.request = Objects.requireNonNull(request, "RestartRequest name may not be null");
4644
this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null");
4745
// Collect the task IDs to stop and restart (may be none)
48-
this.idsToRestart = Collections.unmodifiableList(
49-
stateInfo.tasks()
50-
.stream()
51-
.filter(this::isRestarting)
52-
.map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id()))
53-
.collect(Collectors.toList())
54-
);
46+
this.idsToRestart = stateInfo.tasks()
47+
.stream()
48+
.filter(this::isRestarting)
49+
.map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())).toList();
5550
}
5651

5752
/**

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java

+10-62
Original file line numberDiff line numberDiff line change
@@ -17,61 +17,25 @@
1717
package org.apache.kafka.connect.runtime;
1818

1919
import org.apache.kafka.connect.connector.Connector;
20-
import org.apache.kafka.connect.connector.Task;
2120

2221
import java.util.Objects;
2322

2423
/**
2524
* A request to restart a connector and/or task instances.
2625
* <p>
2726
* The natural order is based first upon the connector name and then requested restart behaviors.
28-
* If two requests have the same connector name, then the requests are ordered based on the
27+
* If two requests have the same connector name, then the requests are ordered based on the
2928
* probable number of tasks/connector this request is going to restart.
29+
* @param connectorName the name of the connector; may not be null
30+
* @param onlyFailed true if only failed instances should be restarted
31+
* @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted
3032
*/
31-
public class RestartRequest implements Comparable<RestartRequest> {
33+
public record RestartRequest(String connectorName,
34+
boolean onlyFailed,
35+
boolean includeTasks) implements Comparable<RestartRequest> {
3236

33-
private final String connectorName;
34-
private final boolean onlyFailed;
35-
private final boolean includeTasks;
36-
37-
/**
38-
* Create a new request to restart a connector and optionally its tasks.
39-
*
40-
* @param connectorName the name of the connector; may not be null
41-
* @param onlyFailed true if only failed instances should be restarted
42-
* @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted
43-
*/
44-
public RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) {
45-
this.connectorName = Objects.requireNonNull(connectorName, "Connector name may not be null");
46-
this.onlyFailed = onlyFailed;
47-
this.includeTasks = includeTasks;
48-
}
49-
50-
/**
51-
* Get the name of the connector.
52-
*
53-
* @return the connector name; never null
54-
*/
55-
public String connectorName() {
56-
return connectorName;
57-
}
58-
59-
/**
60-
* Determine whether only failed instances be restarted.
61-
*
62-
* @return true if only failed instances should be restarted, or false if all applicable instances should be restarted
63-
*/
64-
public boolean onlyFailed() {
65-
return onlyFailed;
66-
}
67-
68-
/**
69-
* Determine whether {@link Task} instances should also be restarted in addition to the {@link Connector} instance.
70-
*
71-
* @return true if the connector and task instances should be restarted, or false if just the connector should be restarted
72-
*/
73-
public boolean includeTasks() {
74-
return includeTasks;
37+
public RestartRequest {
38+
Objects.requireNonNull(connectorName, "Connector name may not be null");
7539
}
7640

7741
/**
@@ -108,6 +72,7 @@ public int compareTo(RestartRequest o) {
10872
int result = connectorName.compareTo(o.connectorName);
10973
return result == 0 ? impactRank() - o.impactRank() : result;
11074
}
75+
11176
//calculates an internal rank for the restart request based on the probable number of tasks/connector this request is going to restart
11277
private int impactRank() {
11378
if (onlyFailed && !includeTasks) { //restarts only failed connector so least impactful
@@ -121,23 +86,6 @@ private int impactRank() {
12186
return 3;
12287
}
12388

124-
@Override
125-
public boolean equals(Object o) {
126-
if (this == o) {
127-
return true;
128-
}
129-
if (o == null || getClass() != o.getClass()) {
130-
return false;
131-
}
132-
RestartRequest that = (RestartRequest) o;
133-
return onlyFailed == that.onlyFailed && includeTasks == that.includeTasks && Objects.equals(connectorName, that.connectorName);
134-
}
135-
136-
@Override
137-
public int hashCode() {
138-
return Objects.hash(connectorName, onlyFailed, includeTasks);
139-
}
140-
14189
@Override
14290
public String toString() {
14391
return "restart request for {" + "connectorName='" + connectorName + "', onlyFailed=" + onlyFailed + ", includeTasks=" + includeTasks + '}';

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SessionKey.java

+5-46
Original file line numberDiff line numberDiff line change
@@ -22,53 +22,12 @@
2222

2323
/**
2424
* A session key, which can be used to validate internal REST requests between workers.
25+
* @param key the actual cryptographic key to use for request validation; may not be null
26+
* @param creationTimestamp the time at which the key was generated
2527
*/
26-
public class SessionKey {
28+
public record SessionKey(SecretKey key, long creationTimestamp) {
2729

28-
private final SecretKey key;
29-
private final long creationTimestamp;
30-
31-
/**
32-
* Create a new session key with the given key value and creation timestamp
33-
* @param key the actual cryptographic key to use for request validation; may not be null
34-
* @param creationTimestamp the time at which the key was generated
35-
*/
36-
public SessionKey(SecretKey key, long creationTimestamp) {
37-
this.key = Objects.requireNonNull(key, "Key may not be null");
38-
this.creationTimestamp = creationTimestamp;
39-
}
40-
41-
/**
42-
* Get the cryptographic key to use for request validation.
43-
*
44-
* @return the cryptographic key; may not be null
45-
*/
46-
public SecretKey key() {
47-
return key;
48-
}
49-
50-
/**
51-
* Get the time at which the key was generated.
52-
*
53-
* @return the time at which the key was generated
54-
*/
55-
public long creationTimestamp() {
56-
return creationTimestamp;
57-
}
58-
59-
@Override
60-
public boolean equals(Object o) {
61-
if (this == o)
62-
return true;
63-
if (o == null || getClass() != o.getClass())
64-
return false;
65-
SessionKey that = (SessionKey) o;
66-
return creationTimestamp == that.creationTimestamp
67-
&& key.equals(that.key);
68-
}
69-
70-
@Override
71-
public int hashCode() {
72-
return Objects.hash(key, creationTimestamp);
30+
public SessionKey {
31+
Objects.requireNonNull(key, "Key may not be null");
7332
}
7433
}

0 commit comments

Comments
 (0)