Skip to content

Commit d4cb992

Browse files
committed
spring-atticGH-1085: Allow KTable binding on the outbound
At the moment, Kafka Streams binder only allows KStream bindings on the outbound. There is a delegation mechanism in which we stil can use KStream for output binding while allowing the applications to provide a KTable type as the function return type. Update docs. Resolves spring-attic#1085
1 parent 80b707e commit d4cb992

File tree

4 files changed

+46
-17
lines changed

4 files changed

+46
-17
lines changed

docs/src/main/asciidoc/kafka-streams.adoc

+24-7
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,28 @@ The input from the three partial functions which are `KStream`, `GlobalKTable`,
252252
Input bindings are named as `enrichOrder-in-0`, `enrichOrder-in-1` and `enrichOrder-in-2` respectively. Output binding is named as `enrichOrder-out-0`.
253253

254254
With curried functions, you can virtually have any number of inputs. However, keep in mind that, anything more than a smaller number of inputs and partially applied functions for them as above in Java might lead to unreadable code.
255-
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
255+
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings, and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
256+
257+
===== Output Bindings
258+
259+
Kafka Streams binder allows types of either `KStream` or `KTable` as output bindings.
260+
Behind the scenes, the binder uses the `to` method on `KStream` to send the resultant records to the output topic.
261+
If the application provides a `KTable` as output in the function, the binder still uses this technique by delegating to the `to` method of `KStream`.
262+
263+
For example both functions below will work:
264+
265+
```
266+
@Bean
267+
public Function<KStream<String, String>, KTable<String, String>> foo() {
268+
return KStream::toTable;
269+
};
270+
}
271+
272+
@Bean
273+
public Function<KTable<String, String>, KStream<String, String>> bar() {
274+
return KTable::toStream;
275+
}
276+
```
256277

257278
===== Multiple Output Bindings
258279

@@ -383,8 +404,7 @@ The default output binding for this example becomes `curriedFoobar-out-0`.
383404

384405
====== Special note on using `KTable` as output in function composition
385406

386-
When using function composition, for intermediate functions, you can use `KTable` as output.
387-
For instance, lets say you have the following two functions.
407+
Lets say you have the following two functions.
388408

389409
```
390410
@Bean
@@ -399,10 +419,7 @@ public Function<KTable<String, String>, KStream<String, String>> bar() {
399419
}
400420
```
401421

402-
You can compose them as `foo|bar` although foo's output is `KTable`.
403-
In normal case, when you use `foo` as standalone, this will not work, as the binder does not support `KTable` as the final output.
404-
Note that in the example above, bar's output is still a `KStream`.
405-
We are only able to use `foo` which has a `KTable` output, since we are composing with another function that has `KStream` as its output.
422+
You can compose them as `foo|bar`, but keep in mind that the second function (`bar` in this case) must have a `KTable` as input since the first function (`foo`) has `KTable` as output.
406423

407424
==== Imperative programming model.
408425

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinder.java

-5
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,10 @@
4444
* @author Soby Chacko
4545
*/
4646
class KTableBinder extends
47-
// @checkstyle:off
4847
AbstractBinder<KTable<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
4948
implements
5049
ExtendedPropertiesBinder<KTable<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
5150

52-
// @checkstyle:on
53-
5451
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
5552

5653
private final KafkaTopicProvisioner kafkaTopicProvisioner;
@@ -111,9 +108,7 @@ public synchronized void stop() {
111108
@Override
112109
protected Binding<KTable<Object, Object>> doBindProducer(String name,
113110
KTable<Object, Object> outboundBindTarget,
114-
// @checkstyle:off
115111
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
116-
// @checkstyle:on
117112
throw new UnsupportedOperationException(
118113
"No producer level binding is allowed for KTable");
119114
}

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.streams.StreamsConfig;
4040
import org.apache.kafka.streams.Topology;
4141
import org.apache.kafka.streams.kstream.KStream;
42+
import org.apache.kafka.streams.kstream.KTable;
4243

4344
import org.springframework.beans.BeansException;
4445
import org.springframework.beans.factory.BeanFactory;
@@ -298,7 +299,13 @@ else if (Function.class.isAssignableFrom(bean.getClass()) || BiFunction.class.is
298299
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
299300
}
300301
else {
301-
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
302+
if (KTable.class.isAssignableFrom(result.getClass())) {
303+
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
304+
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
305+
}
306+
else {
307+
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
308+
}
302309
}
303310
}
304311
}
@@ -337,8 +344,14 @@ else if (Function.class.isAssignableFrom(bean.getClass()) || BiFunction.class.is
337344
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
338345
}
339346
else {
340-
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
341-
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
347+
if (KTable.class.isAssignableFrom(result.getClass())) {
348+
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
349+
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
350+
}
351+
else {
352+
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
353+
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
354+
}
342355
}
343356
}
344357
}

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ public void afterPropertiesSet() {
145145
}
146146

147147
if (outboundArgument != null && outboundArgument.getRawClass() != null && (!outboundArgument.isArray() &&
148-
outboundArgument.getRawClass().isAssignableFrom(KStream.class))) {
148+
(outboundArgument.getRawClass().isAssignableFrom(KStream.class) ||
149+
outboundArgument.getRawClass().isAssignableFrom(KTable.class)))) { //Allowing both KStream and KTable on the outbound.
149150
// if the type is array, we need to do a late binding as we don't know the number of
150151
// output bindings at this point in the flow.
151152

@@ -157,12 +158,15 @@ public void afterPropertiesSet() {
157158
if (outputBindingsIter.hasNext()) {
158159
outputBinding = outputBindingsIter.next();
159160
}
160-
161161
}
162162
else {
163163
outputBinding = String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX);
164164
}
165165
Assert.isTrue(outputBinding != null, "output binding is not inferred.");
166+
// We will only allow KStream targets on the outbound. If the user provides a KTable,
167+
// we still use the KStreamBinder to send it through the outbound.
168+
// In that case before sending, we do a cast from KTable to KStream.
169+
// See KafkaStreamsFunctionsProcessor#setupFunctionInvokerForKafkaStreams for details.
166170
KafkaStreamsBindableProxyFactory.this.outputHolders.put(outputBinding,
167171
new BoundTargetHolder(getBindingTargetFactory(KStream.class)
168172
.createOutput(outputBinding), true));

0 commit comments

Comments
 (0)