Skip to content

Commit db4e74b

Browse files
authored
MINOR: Add Functional Interface annotation to interfaces used by Lambdas (#19234)
Adds the FunctionalInterface annotation to relevant Kafka Streams classes. While this is not strictly required for Java, it's still best practice and also useful for better integration with other JVM languages, for example Clojure, to allow using these interfaces as lambdas. Reviewers: Matthias J. Sax <[email protected]>
1 parent f1bb29b commit db4e74b

26 files changed

+26
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
2727
* may be executed in any thread calling {@link Consumer#poll(java.time.Duration) poll()}.
2828
*/
29+
@FunctionalInterface
2930
public interface OffsetCommitCallback {
3031

3132
/**

streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
3939
* @see Reducer
4040
*/
41+
@FunctionalInterface
4142
public interface Aggregator<K, V, VAgg> {
4243

4344
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
*
3131
* @see KStream#foreach(ForeachAction)
3232
*/
33+
@FunctionalInterface
3334
public interface ForeachAction<K, V> {
3435

3536
/**

streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
3131
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
3232
*/
33+
@FunctionalInterface
3334
public interface Initializer<VAgg> {
3435

3536
/**

streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @see KTable#groupBy(KeyValueMapper, Grouped)
4545
* @see KTable#toStream(KeyValueMapper)
4646
*/
47+
@FunctionalInterface
4748
public interface KeyValueMapper<K, V, VR> {
4849

4950
/**

streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* @param <K> key type
2424
* @param <V> aggregate value type
2525
*/
26+
@FunctionalInterface
2627
public interface Merger<K, V> {
2728

2829
/**

streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* @see KTable#filter(Predicate)
3232
* @see KTable#filterNot(Predicate)
3333
*/
34+
@FunctionalInterface
3435
public interface Predicate<K, V> {
3536

3637
/**

streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* @see SessionWindowedKStream#reduce(Reducer, Materialized)
3737
* @see Aggregator
3838
*/
39+
@FunctionalInterface
3940
public interface Reducer<V> {
4041

4142
/**

streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier} instead.
3939
*/
4040
@Deprecated
41+
@FunctionalInterface
4142
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider, Supplier<Transformer<K, V, R>> {
4243

4344
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* @see KTable#leftJoin(KTable, ValueJoiner)
4141
* @see KTable#outerJoin(KTable, ValueJoiner)
4242
*/
43+
@FunctionalInterface
4344
public interface ValueJoiner<V1, V2, VR> {
4445

4546
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoinerWithKey.java

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
4545
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey, Named)
4646
*/
47+
@FunctionalInterface
4748
public interface ValueJoinerWithKey<K1, V1, V2, VR> {
4849

4950
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
* @see KTable#mapValues(ValueMapper)
3838
* @see KTable#mapValues(ValueMapperWithKey)
3939
*/
40+
@FunctionalInterface
4041
public interface ValueMapper<V, VR> {
4142

4243
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* @see KTable#mapValues(ValueMapperWithKey)
3939
*/
4040

41+
@FunctionalInterface
4142
public interface ValueMapperWithKey<K, V, VR> {
4243

4344
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
3737
*/
3838
@Deprecated
39+
@FunctionalInterface
3940
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
4041

4142
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* @see Transformer
3636
* @see TransformerSupplier
3737
*/
38+
@FunctionalInterface
3839
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> {
3940

4041
/**

streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* It is expected that implementations of this class will not call the {@link StateRestoreCallback#restore(byte[],
2929
* byte[])} method.
3030
*/
31+
@FunctionalInterface
3132
public interface BatchingStateRestoreCallback extends StateRestoreCallback {
3233

3334
/**

streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
*
2424
* @see Punctuator
2525
*/
26+
@FunctionalInterface
2627
public interface Cancellable {
2728

2829
/**

streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*
2727
* @see Cancellable
2828
*/
29+
@FunctionalInterface
2930
public interface Punctuator {
3031

3132
/**

streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Restoration logic for log-backed state stores upon restart,
2121
* it takes one record at a time from the logs to apply to the restoring state.
2222
*/
23+
@FunctionalInterface
2324
public interface StateRestoreCallback {
2425

2526
void restore(byte[] key, byte[] value);

streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
5151
* @see Topology#addSink(String, String, StreamPartitioner, String...)
5252
*/
53+
@FunctionalInterface
5354
public interface StreamPartitioner<K, V> {
5455

5556
/**

streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord}.
2424
* The extracted timestamp is defined as milliseconds.
2525
*/
26+
@FunctionalInterface
2627
public interface TimestampExtractor {
2728

2829
/**

streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
/**
2020
* An interface that allows to dynamically determine the name of the Kafka topic to send at the sink node of the topology.
2121
*/
22+
@FunctionalInterface
2223
public interface TopicNameExtractor<K, V> {
2324

2425
/**

streams/src/main/java/org/apache/kafka/streams/processor/api/FixedKeyProcessor.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* @param <VIn> the type of input values
3030
* @param <VOut> the type of output values
3131
*/
32+
@FunctionalInterface
3233
public interface FixedKeyProcessor<KIn, VIn, VOut> {
3334

3435
/**

streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @param <KOut> the type of output keys
3131
* @param <VOut> the type of output values
3232
*/
33+
@FunctionalInterface
3334
public interface Processor<KIn, VIn, KOut, VOut> {
3435

3536
/**

streams/src/main/java/org/apache/kafka/streams/processor/api/WrappedFixedKeyProcessorSupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
2525
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
2626
*/
27+
@FunctionalInterface
2728
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> {
2829

2930
}

streams/src/main/java/org/apache/kafka/streams/processor/api/WrappedProcessorSupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
2525
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
2626
*/
27+
@FunctionalInterface
2728
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> {
2829

2930
}

0 commit comments

Comments
 (0)