Skip to content

KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in StreamsConfig #18297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
<li> See <a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a> and
<a href="https://cwiki.apache.org/confluence/x/8ItyEg">KIP-1050</a> for more details </li>
</ul>
<li>
The <code>window.size.ms</code> and <code>window.inner.serde.class</code> in stream config are deprecated.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be in the 4.0 section.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The <code>window.size.ms</code> and <code>window.inner.serde.class</code> in stream config are deprecated.
The <code>window.size.ms</code> and <code>window.inner.serde.class</code> in `StreamsConfig` are deprecated. Use the corresponding string constants defined in `TimeWindowedSerializer`, `TimeWindowedDeserializer`, `SessionWindowedSerializer` and `SessionWindowedDeserializer` instead.

Something like that? Could also be the actual string constants. But it's good to make these notes actionable.

</li>
</ul>
</li>
</ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,9 +1065,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
Expand All @@ -1089,9 +1092,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
Expand Down Expand Up @@ -1123,7 +1129,7 @@ private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator,
"--property", "key.deserializer." + StreamsConfig.WINDOWED_INNER_CLASS_SERDE + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer.window.size.ms=500",
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;

Expand Down Expand Up @@ -262,7 +263,7 @@ private <K, V> boolean processKeyValueAndVerifyCount(
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);


final List<KeyValueTimestamp<K, V>> actual =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
Expand Down
23 changes: 21 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
Expand Down Expand Up @@ -823,13 +827,28 @@ public class StreamsConfig extends AbstractConfig {
+ CONFIG_ERROR_MSG
+ "\"NO_OPTIMIZATION\" by default.";

/** {@code windowed.inner.class.serde} */
/**
* {@code windowed.inner.class.serde}
*
* @deprecated since 4.0.0.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.1.0

* Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link TimeWindowedSerializer}.
* Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link TimeWindowedDeserializer}.
* Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link SessionWindowedSerializer}.
* Use {@link SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link SessionWindowedDeserializer}.
*/
@Deprecated
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
"in an error as it is meant to be used only from Plain consumer client.";

/** {@code window.size.ms} */
/**
* {@code window.size.ms}
*
* @deprecated since 4.0.0.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.1.0

* Use {@link TimeWindowedDeserializer#WINDOW_SIZE_MS_CONFIG} for {@link TimeWindowedDeserializer}.
*/
@Deprecated
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.SessionKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>> {

/**
* Configuration key for the windowed inner deserializer class.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put any more information into this (and the other) java doc comments about the constants? Since this is a public string constant now, the use of the constant should ideally be clear from the javadoc comments.

*/
public static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";

private final Logger log = LoggerFactory.getLogger(SessionWindowedDeserializer.class);

private Deserializer<T> inner;

// Default constructor needed by Kafka
Expand All @@ -36,34 +46,43 @@ public SessionWindowedDeserializer(final Deserializer<T> inner) {
this.inner = inner;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);

Serde<T> windowInnerClassSerde = null;
String deserializerConfigFrom = WINDOWED_INNER_DESERIALIZER_CLASS;
String windowedInnerDeserializerClassConfig = (String) configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
if (windowedInnerDeserializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
deserializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerDeserializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_DESERIALIZER_CLASS);
}
}

if (windowedInnerClassSerdeConfig != null) {
Serde<T> windowedInnerDeserializerClass = null;
if (windowedInnerDeserializerClassConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
windowedInnerDeserializerClass = Utils.newInstance(windowedInnerDeserializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
throw new ConfigException(deserializerConfigFrom, windowedInnerDeserializerClassConfig,
"Serde class " + windowedInnerDeserializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName())) {
if (inner != null && windowedInnerDeserializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class deserializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.deserializer().getClass().getName() + ").");
" is different from the one set in " + deserializerConfigFrom + " config " +
"(" + windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
} else if (inner == null && windowedInnerDeserializerClassConfig == null) {
throw new IllegalArgumentException("Inner class deserializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
"or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowInnerClassSerde.deserializer();
inner = windowedInnerDeserializerClass.deserializer();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,20 @@
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.SessionKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {

/**
* Configuration key for the windowed inner serializer class.
*/
public static final String WINDOWED_INNER_SERIALIZER_CLASS = "windowed.inner.serializer.class";

private final Logger log = LoggerFactory.getLogger(SessionWindowedSerializer.class);

private Serializer<T> inner;

// Default constructor needed by Kafka
Expand All @@ -37,32 +47,42 @@ public SessionWindowedSerializer(final Serializer<T> inner) {
this.inner = inner;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
Serde<T> windowInnerClassSerde = null;
if (windowedInnerClassSerdeConfig != null) {
String serializerConfigFrom = WINDOWED_INNER_SERIALIZER_CLASS;
String windowedInnerSerializerClassConfig = (String) configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
if (windowedInnerSerializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
serializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerSerializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_SERIALIZER_CLASS);
}
}
Serde<T> windowedInnerSerializerClass = null;
if (windowedInnerSerializerClassConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
windowedInnerSerializerClass = Utils.newInstance(windowedInnerSerializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
throw new ConfigException(serializerConfigFrom, windowedInnerSerializerClassConfig,
"Serde class " + windowedInnerSerializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName())) {
if (inner != null && windowedInnerSerializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class serializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.serializer().getClass().getName() + ").");
" is different from the one set in " + serializerConfigFrom + " config " +
"(" + windowedInnerSerializerClass.serializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
} else if (inner == null && windowedInnerSerializerClassConfig == null) {
throw new IllegalArgumentException("Inner class serializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
"or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowInnerClassSerde.serializer();
inner = windowedInnerSerializerClass.serializer();
}

@Override
Expand Down
Loading