Skip to content

Commit 926b1cf

Browse files
committed
KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in StreamsConfig
Signed-off-by: PoAn Yang <[email protected]>
1 parent b4be178 commit 926b1cf

14 files changed

+389
-119
lines changed

docs/upgrade.html

+3
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,9 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
253253
been updated to enable the replication of topics that appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
254254
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1074%3A+Allow+the+replication+of+user+internal+topics">KIP-1074</a> for more details.
255255
</li>
256+
<li>
257+
The <code>window.size.ms</code> and <code>window.inner.serde.class</code> in stream config are deprecated.
258+
</li>
256259
</ul>
257260
</li>
258261
</ul>

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -1061,9 +1061,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<
10611061
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
10621062
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
10631063
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
1064-
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
1065-
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
1066-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
1064+
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
1065+
if (keyDeserializer instanceof TimeWindowedDeserializer) {
1066+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
1067+
Serdes.serdeFrom(innerClass).getClass().getName());
1068+
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
1069+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
10671070
Serdes.serdeFrom(innerClass).getClass().getName());
10681071
}
10691072
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1085,9 +1088,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
10851088
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
10861089
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
10871090
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
1088-
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
1089-
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
1090-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
1091+
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
1092+
if (keyDeserializer instanceof TimeWindowedDeserializer) {
1093+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
1094+
Serdes.serdeFrom(innerClass).getClass().getName());
1095+
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
1096+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
10911097
Serdes.serdeFrom(innerClass).getClass().getName());
10921098
}
10931099
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1119,7 +1125,7 @@ private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
11191125
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
11201126
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
11211127
"--property", "key.separator=" + keySeparator,
1122-
"--property", "key.deserializer." + StreamsConfig.WINDOWED_INNER_CLASS_SERDE + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
1128+
"--property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
11231129
"--property", "key.deserializer.window.size.ms=500",
11241130
};
11251131

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.streams.kstream.JoinWindows;
3333
import org.apache.kafka.streams.kstream.KStream;
3434
import org.apache.kafka.streams.kstream.Produced;
35+
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
3536
import org.apache.kafka.streams.kstream.ValueJoiner;
3637
import org.apache.kafka.test.TestUtils;
3738

@@ -262,7 +263,7 @@ private <K, V> boolean processKeyValueAndVerifyCount(
262263
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
263264
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
264265
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
265-
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
266+
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
266267

267268

268269
final List<KeyValueTimestamp<K, V>> actual =

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
444444
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
445445
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
446446
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
447-
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
448-
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
449-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
447+
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
448+
if (keyDeserializer instanceof TimeWindowedDeserializer) {
449+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
450+
Serdes.serdeFrom(innerClass).getClass().getName());
451+
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
452+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
450453
Serdes.serdeFrom(innerClass).getClass().getName());
451454
}
452455
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -478,9 +478,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
478478
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
479479
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
480480
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
481-
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
482-
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
483-
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
481+
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
482+
if (keyDeserializer instanceof TimeWindowedDeserializer) {
483+
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
484+
Serdes.serdeFrom(innerClass).getClass().getName());
485+
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
486+
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
484487
Serdes.serdeFrom(innerClass).getClass().getName());
485488
}
486489
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
import org.apache.kafka.streams.errors.StreamsException;
4646
import org.apache.kafka.streams.internals.StreamsConfigUtils;
4747
import org.apache.kafka.streams.internals.UpgradeFromValues;
48+
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
49+
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
50+
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
51+
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
4852
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
4953
import org.apache.kafka.streams.processor.TimestampExtractor;
5054
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
@@ -815,13 +819,28 @@ public class StreamsConfig extends AbstractConfig {
815819
+ CONFIG_ERROR_MSG
816820
+ "\"NO_OPTIMIZATION\" by default.";
817821

818-
/** {@code windowed.inner.class.serde} */
822+
/**
823+
* {@code windowed.inner.class.serde}
824+
*
825+
* @deprecated since 4.0.0.
826+
* Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link TimeWindowedSerializer}.
827+
* Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link TimeWindowedDeserializer}.
828+
* Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link SessionWindowedSerializer}.
829+
* Use {@link SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link SessionWindowedDeserializer}.
830+
*/
831+
@Deprecated
819832
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
820833
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
821834
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
822835
"in an error as it is meant to be used only from Plain consumer client.";
823836

824-
/** {@code window.size.ms} */
837+
/**
838+
* {@code window.size.ms}
839+
*
840+
* @deprecated since 4.0.0.
841+
* Use {@link TimeWindowedDeserializer#WINDOW_SIZE_MS_CONFIG} for {@link TimeWindowedDeserializer}.
842+
*/
843+
@Deprecated
825844
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
826845
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";
827846

streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java

+34-15
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,20 @@
2323
import org.apache.kafka.streams.StreamsConfig;
2424
import org.apache.kafka.streams.state.internals.SessionKeySchema;
2525

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
2629
import java.util.Map;
2730

2831
public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
2932

33+
/**
34+
* Configuration key for the windowed inner deserializer class.
35+
*/
36+
public static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";
37+
38+
private final Logger log = LoggerFactory.getLogger(SessionWindowedDeserializer.class);
39+
3040
private Deserializer<T> inner;
3141

3242
// Default constructor needed by Kafka
@@ -36,34 +46,43 @@ public SessionWindowedDeserializer(final Deserializer<T> inner) {
3646
this.inner = inner;
3747
}
3848

39-
@SuppressWarnings("unchecked")
49+
@SuppressWarnings({"deprecation", "unchecked"})
4050
@Override
4151
public void configure(final Map<String, ?> configs, final boolean isKey) {
42-
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
43-
44-
Serde<T> windowInnerClassSerde = null;
52+
String deserializerConfigFrom = WINDOWED_INNER_DESERIALIZER_CLASS;
53+
String windowedInnerDeserializerClassConfig = (String) configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
54+
if (windowedInnerDeserializerClassConfig == null) {
55+
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
56+
if (windowedInnerClassSerdeConfig != null) {
57+
deserializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
58+
windowedInnerDeserializerClassConfig = windowedInnerClassSerdeConfig;
59+
log.warn("Config {} is deprecated. Please use {} instead.",
60+
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_DESERIALIZER_CLASS);
61+
}
62+
}
4563

46-
if (windowedInnerClassSerdeConfig != null) {
64+
Serde<T> windowedInnerDeserializerClass = null;
65+
if (windowedInnerDeserializerClassConfig != null) {
4766
try {
48-
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
67+
windowedInnerDeserializerClass = Utils.newInstance(windowedInnerDeserializerClassConfig, Serde.class);
4968
} catch (final ClassNotFoundException e) {
50-
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
51-
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
69+
throw new ConfigException(deserializerConfigFrom, windowedInnerDeserializerClassConfig,
70+
"Serde class " + windowedInnerDeserializerClassConfig + " could not be found.");
5271
}
5372
}
5473

55-
if (inner != null && windowedInnerClassSerdeConfig != null) {
56-
if (!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName())) {
74+
if (inner != null && windowedInnerDeserializerClassConfig != null) {
75+
if (!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName())) {
5776
throw new IllegalArgumentException("Inner class deserializer set using constructor "
5877
+ "(" + inner.getClass().getName() + ")" +
59-
" is different from the one set in windowed.inner.class.serde config " +
60-
"(" + windowInnerClassSerde.deserializer().getClass().getName() + ").");
78+
" is different from the one set in " + deserializerConfigFrom + " config " +
79+
"(" + windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
6180
}
62-
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
81+
} else if (inner == null && windowedInnerDeserializerClassConfig == null) {
6382
throw new IllegalArgumentException("Inner class deserializer should be set either via constructor " +
64-
"or via the windowed.inner.class.serde config");
83+
"or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
6584
} else if (inner == null)
66-
inner = windowInnerClassSerde.deserializer();
85+
inner = windowedInnerDeserializerClass.deserializer();
6786
}
6887

6988
@Override

streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java

+34-14
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,20 @@
2424
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
2525
import org.apache.kafka.streams.state.internals.SessionKeySchema;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import java.util.Map;
2831

2932
public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
3033

34+
/**
35+
* Configuration key for the windowed inner serializer class.
36+
*/
37+
public static final String WINDOWED_INNER_SERIALIZER_CLASS = "windowed.inner.serializer.class";
38+
39+
private final Logger log = LoggerFactory.getLogger(SessionWindowedSerializer.class);
40+
3141
private Serializer<T> inner;
3242

3343
// Default constructor needed by Kafka
@@ -37,32 +47,42 @@ public SessionWindowedSerializer(final Serializer<T> inner) {
3747
this.inner = inner;
3848
}
3949

40-
@SuppressWarnings("unchecked")
50+
@SuppressWarnings({"deprecation", "unchecked"})
4151
@Override
4252
public void configure(final Map<String, ?> configs, final boolean isKey) {
43-
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
44-
Serde<T> windowInnerClassSerde = null;
45-
if (windowedInnerClassSerdeConfig != null) {
53+
String serializerConfigFrom = WINDOWED_INNER_SERIALIZER_CLASS;
54+
String windowedInnerSerializerClassConfig = (String) configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
55+
if (windowedInnerSerializerClassConfig == null) {
56+
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
57+
if (windowedInnerClassSerdeConfig != null) {
58+
serializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
59+
windowedInnerSerializerClassConfig = windowedInnerClassSerdeConfig;
60+
log.warn("Config {} is deprecated. Please use {} instead.",
61+
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_SERIALIZER_CLASS);
62+
}
63+
}
64+
Serde<T> windowedInnerSerializerClass = null;
65+
if (windowedInnerSerializerClassConfig != null) {
4666
try {
47-
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
67+
windowedInnerSerializerClass = Utils.newInstance(windowedInnerSerializerClassConfig, Serde.class);
4868
} catch (final ClassNotFoundException e) {
49-
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
50-
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
69+
throw new ConfigException(serializerConfigFrom, windowedInnerSerializerClassConfig,
70+
"Serde class " + windowedInnerSerializerClassConfig + " could not be found.");
5171
}
5272
}
5373

54-
if (inner != null && windowedInnerClassSerdeConfig != null) {
55-
if (!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName())) {
74+
if (inner != null && windowedInnerSerializerClassConfig != null) {
75+
if (!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName())) {
5676
throw new IllegalArgumentException("Inner class serializer set using constructor "
5777
+ "(" + inner.getClass().getName() + ")" +
58-
" is different from the one set in windowed.inner.class.serde config " +
59-
"(" + windowInnerClassSerde.serializer().getClass().getName() + ").");
78+
" is different from the one set in " + serializerConfigFrom + " config " +
79+
"(" + windowedInnerSerializerClass.serializer().getClass().getName() + ").");
6080
}
61-
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
81+
} else if (inner == null && windowedInnerSerializerClassConfig == null) {
6282
throw new IllegalArgumentException("Inner class serializer should be set either via constructor " +
63-
"or via the windowed.inner.class.serde config");
83+
"or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
6484
} else if (inner == null)
65-
inner = windowInnerClassSerde.serializer();
85+
inner = windowedInnerSerializerClass.serializer();
6686
}
6787

6888
@Override

0 commit comments

Comments
 (0)