This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Unable to start processor when using KTable as output #1085

Closed
selcuksert opened this issue May 31, 2021 · 4 comments

@selcuksert

selcuksert commented May 31, 2021

Describe the issue

For a BiFunction processor that outputs a KTable (also reproducible with Function processors), the processor bean cannot be initialized because of a ClassCastException:


Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.KTableImpl cannot be cast to class org.apache.kafka.streams.kstream.KStream (org.apache.kafka.streams.kstream.internals.KTableImpl and org.apache.kafka.streams.kstream.KStream are in unnamed module of loader 'app')
	at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor.setupFunctionInvokerForKafkaStreams(KafkaStreamsFunctionProcessor.java:322) ~[spring-cloud-stream-binder-kafka-streams-3.1.3.jar:3.1.3]

Version of the framework

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2020.0.3</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <version>3.1.3</version>
</dependency>

Expected behavior
The processor should start. It worked with org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.0.12.RELEASE.

Additional context

The processor snippet:

    @Bean
    public BiFunction<KStream<String, Task>, GlobalKTable<String, User>, KTable<String, DetailedTask>> detail() {
        return (taskStream, userTable) ->
                taskStream.leftJoin(userTable, (key, task) -> task.getUserid(), (task, user) -> {
                    DetailedTask dt = new DetailedTask();
                    // leftJoin passes a null user when the GlobalKTable has no matching key
                    if (user != null) {
                        dt.setFirstname(user.getFirstname());
                        dt.setLastname(user.getLastname());
                    }
                    dt.setDetails(task.getDetails());
                    dt.setDuedate(task.getDuedate());
                    dt.setId(task.getId());
                    dt.setTitle(task.getTitle());
                    dt.setStatus(task.getStatus());
                    return dt;
                }).transformValues(TaskHeaderTransformer::new).toTable(Materialized.as(detailTable));
    }

    @Bean
    public Function<KStream<String, Task>, KTable<String, Long>> activity() {
        return (taskStream) -> {
            KTable<String, Long> taskCount = taskStream
                    .filter((taskId, task) -> task != null)
                    .map((taskId, task) -> KeyValue.pair(task.getUserid(), task))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                    .count().toStream().map((windowedUserId, count) -> KeyValue.pair(windowedUserId.key(), count))
                    .toTable(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(countTable)
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()));

            if (log.isDebugEnabled()) {
                taskCount.toStream().peek((k, v) -> log.debug("Task Count: {}->{}", k, v));
            }

            return taskCount;
        };
    }

Configuration:

spring:
  cloud:
    stream:
      function:
        definition: detail;activity
      bindings:
        detail-in-0:
          destination: tasks
          consumer:
            use-native-decoding: true
        detail-in-1:
          destination: users
          consumer:
            use-native-decoding: true
            materialized-as: user-table
        activity-in-0:
          destination: tasks
          consumer:
            use-native-decoding: true

Because the function result is cast to KStream, the following line causes a ClassCastException for KTable output bindings:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/9e4a1075d437f2e6aa7dad0a38d8286de94234a4/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java#L322
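To see why this surfaces as a ClassCastException rather than a clearer binding error, here is a minimal, stdlib-only model of the failure mode. `FakeStream`, `FakeTable`, `FakeTableImpl` and `FakeBinder.bindOutbound` are hypothetical stand-ins for `KStream`, `KTable`, `KTableImpl` and the cast in `KafkaStreamsFunctionProcessor`; they are not the binder's actual code. The only point is that an unconditional cast of a table implementation to the stream interface fails at runtime, because the implementation class does not implement that interface.

```java
import java.util.function.Function;

// Hypothetical stand-ins that only mimic the shape of KStream and KTable.
interface FakeStream {}

interface FakeTable {
    FakeStream toStream();
}

// Plays the role of KTableImpl: it implements the table interface only,
// so an instance of it is not a FakeStream.
final class FakeTableImpl implements FakeTable {
    @Override
    public FakeStream toStream() {
        return new FakeStream() {};
    }
}

final class FakeBinder {
    // Mimics a binder that assumes every function result is a stream.
    static FakeStream bindOutbound(Function<Object, Object> processor, Object input) {
        Object result = processor.apply(input);
        return (FakeStream) result; // ClassCastException when result is a FakeTableImpl
    }
}
```

In this model, `FakeBinder.bindOutbound(in -> new FakeTableImpl(), null)` throws a ClassCastException, while a processor that returns `new FakeTableImpl().toStream()` binds without error, which mirrors the `toStream()` workaround discussed below.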

Is there a new or known limitation on using a KTable output binding with the new version of the binder?

@sobychacko
Contributor

@selcuksert At the moment, the Kafka Streams binder does not support table types on the outbound. Is there a compelling reason for this requirement? This can easily be addressed by changing the output type to KStream and calling toStream() on your current return value. In either case (KTable or KStream), the end result is the same, i.e., the record is sent to a destination Kafka topic. If you can elaborate on your use case and it turns out to be necessary, we can consider adding this as an enhancement to the binder.

@selcuksert
Author

@sobychacko Thanks for the feedback. Actually, the aim is to store the data in a KTable state store, because the index space is large, and to query it efficiently through InteractiveQueryService. Additionally, the same key gets several updates in our data flow and we only care about the latest state, which matches the update semantics of a changelog stream rather than the insert semantics of a record stream. With log compaction in effect, we can also use the storage on the Kafka brokers more efficiently. That is why the output is routed through a KTable. I hope this provides clearer context.
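The difference between record-stream (insert) and changelog (update) semantics can be illustrated without Kafka at all. The sketch below is a simplified, hypothetical model: `ChangelogModel` and its methods are invented for illustration. A record stream keeps every update, while a table, and a log-compacted topic once compaction has run, keeps only the latest value per key. Real compaction runs asynchronously, so older records for a key may still be present at any given moment.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChangelogModel {
    // Record-stream (insert) semantics: every update is retained.
    static List<Map.Entry<String, String>> asRecordStream(List<Map.Entry<String, String>> updates) {
        return new ArrayList<>(updates);
    }

    // Changelog (update) semantics: a later value for a key replaces the
    // earlier one, which is what a table view or a fully compacted topic keeps.
    // A null value is treated as a tombstone that deletes the key.
    static Map<String, String> asTable(List<Map.Entry<String, String>> updates) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> update : updates) {
            if (update.getValue() == null) {
                latest.remove(update.getKey());
            } else {
                latest.put(update.getKey(), update.getValue());
            }
        }
        return latest;
    }

    // SimpleEntry is used because, unlike Map.entry, it allows null values.
    static Map.Entry<String, String> update(String key, String value) {
        return new SimpleEntry<>(key, value);
    }
}
```

For example, the updates `task-1=OPEN`, `task-2=OPEN`, `task-1=DONE` produce three records from `asRecordStream`, but `asTable` returns only `{task-1=DONE, task-2=OPEN}`, which is the storage saving that compaction aims for.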

Following your suggestion, I appended toStream() to the processor and the service now starts without any error. IMHO, it would be better to document this limitation in the official documentation, or to throw an exception with a more informative message rather than a ClassCastException.

In any case, I really appreciate your efforts in making Kafka Streams available to the Spring ecosystem.

@sobychacko
Contributor

@selcuksert That context is helpful, especially around log-compacted topics and conserving storage on the broker. I will keep this issue open while we think this through. Maybe we can indeed provide this as an enhancement in the binder. Thank you!

sobychacko added a commit to sobychacko/spring-cloud-stream-binder-kafka that referenced this issue Jul 15, 2021
At the moment, Kafka Streams binder only allows KStream bindings on the outbound.
There is a delegation mechanism in which we can still use KStream for the output binding
while allowing applications to provide a KTable type as the function return type.

Update docs.

Resolves spring-attic#1085
@olegz olegz closed this as completed in 54ac274 Jul 16, 2021
olegz pushed a commit that referenced this issue Jul 16, 2021

Resolves #1085

Resolves #1105
@sobychacko
Contributor

sobychacko commented Jul 16, 2021

@selcuksert We added the ability to use KTable as a function return type. See the relevant commit mentioned above. With this, you can define a function with a KTable return signature, and the binder converts it to a KStream and sends the records to Kafka. It will be part of the next 3.1.x release.
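The delegation idea from the commit message can be pictured with a similarly simplified model. This sketch is hypothetical: `StreamStandIn`, `TableStandIn` and `OutboundAdapter.toOutboundStream` are invented stand-ins rather than the binder's classes, and the actual change in the commit may differ in detail. A table result is converted through its `toStream()` before being bound, a stream result passes through unchanged, and any other type fails with a descriptive message instead of a bare ClassCastException, along the lines suggested earlier in the thread.

```java
import java.util.List;

// Hypothetical stand-in for KStream: just carries the records to send.
interface StreamStandIn {
    List<String> records();
}

// Hypothetical stand-in for KTable: exposes its updates as a stream.
interface TableStandIn {
    StreamStandIn toStream();
}

final class OutboundAdapter {
    // Normalizes a function's return value to the stream type that the
    // outbound binding knows how to send.
    static StreamStandIn toOutboundStream(Object result) {
        if (result instanceof StreamStandIn) {
            return (StreamStandIn) result;
        }
        if (result instanceof TableStandIn) {
            // Delegate: the table's updates are sent as a stream of records.
            return ((TableStandIn) result).toStream();
        }
        throw new IllegalStateException(
                "Unsupported outbound type "
                        + (result == null ? "null" : result.getClass().getName())
                        + "; expected a stream or a table");
    }
}
```

The design keeps a single stream-based output path and adapts table results at the edge, so the rest of the binding logic does not need to know about tables at all.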
