Skip to content

Commit 546eb0b

Browse files
authored
spring-cloudGH-265: Add Kafka Supplier & Source (spring-cloud#474)
* Also add support for nested config props for `kafka-sink` Fixes spring-cloud#265
1 parent 6291e30 commit 546eb0b

File tree

24 files changed

+665
-2
lines changed

24 files changed

+665
-2
lines changed

Diff for: applications/sink/kafka-sink/README.adoc

+14
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@ $$use-template-converter$$:: $$Whether to use the template's message converter t
3333
$$bootstrap-servers$$:: $$Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.$$ *($$List<String>$$, default: `$$<none>$$`)*
3434
$$client-id$$:: $$ID to pass to the server when making requests. Used for server-side logging.$$ *($$String$$, default: `$$<none>$$`)*
3535
$$properties$$:: $$Additional properties, common to producers and consumers, used to configure the client.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
36+
37+
=== spring.kafka.producer
38+
39+
$$acks$$:: $$Number of acknowledgments the producer requires the leader to have received before considering a request complete.$$ *($$String$$, default: `$$<none>$$`)*
40+
$$batch-size$$:: $$Default batch size. A small batch size will make batching less common and may reduce throughput (a batch size of zero disables batching entirely).$$ *($$DataSize$$, default: `$$<none>$$`)*
41+
$$bootstrap-servers$$:: $$Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.$$ *($$List<String>$$, default: `$$<none>$$`)*
42+
$$buffer-memory$$:: $$Total memory size the producer can use to buffer records waiting to be sent to the server.$$ *($$DataSize$$, default: `$$<none>$$`)*
43+
$$client-id$$:: $$ID to pass to the server when making requests. Used for server-side logging.$$ *($$String$$, default: `$$<none>$$`)*
44+
$$compression-type$$:: $$Compression type for all data generated by the producer.$$ *($$String$$, default: `$$<none>$$`)*
45+
$$key-serializer$$:: $$Serializer class for keys.$$ *($$Class<?>$$, default: `$$<none>$$`)*
46+
$$properties$$:: $$Additional producer-specific properties used to configure the client.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
47+
$$retries$$:: $$When greater than zero, enables retrying of failed sends.$$ *($$Integer$$, default: `$$<none>$$`)*
48+
$$transaction-id-prefix$$:: $$When non empty, enables transaction support for producer.$$ *($$String$$, default: `$$<none>$$`)*
49+
$$value-serializer$$:: $$Serializer class for values.$$ *($$Class<?>$$, default: `$$<none>$$`)*
3650
//end::configuration-properties[]
3751

3852
//end::ref-doc[]
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
configuration-properties.classes=org.springframework.cloud.fn.consumer.kafka.KafkaPublisherProperties, \
2-
org.springframework.boot.autoconfigure.kafka.KafkaProperties
2+
org.springframework.boot.autoconfigure.kafka.KafkaProperties, \
3+
org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer

Diff for: applications/source/kafka-source/README.adoc

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//tag::ref-doc[]
2+
= Apache Kafka Source
3+
4+
This module consumes messages from Apache Kafka.
5+
6+
== Options
7+
8+
The **$$kafka$$** $$source$$ has the following options:
9+
10+
(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)
11+
12+
//tag::configuration-properties[]
13+
Properties grouped by prefix:
14+
15+
16+
=== kafka.supplier
17+
18+
$$ack-discarded$$:: $$Whether to acknowledge discarded records after 'RecordFilterStrategy'.$$ *($$Boolean$$, default: `$$<none>$$`)*
19+
$$record-filter$$:: $$SpEL expression for 'RecordFilterStrategy' with a 'ConsumerRecord' as a root object.$$ *($$Expression$$, default: `$$<none>$$`)*
20+
$$topic-pattern$$:: $$Apache Kafka topics pattern to subscribe.$$ *($$Pattern$$, default: `$$<none>$$`)*
21+
$$topics$$:: $$Apache Kafka topics to subscribe.$$ *($$String[]$$, default: `$$<none>$$`)*
22+
23+
=== spring.kafka
24+
25+
$$bootstrap-servers$$:: $$Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.$$ *($$List<String>$$, default: `$$<none>$$`)*
26+
$$client-id$$:: $$ID to pass to the server when making requests. Used for server-side logging.$$ *($$String$$, default: `$$<none>$$`)*
27+
$$properties$$:: $$Additional properties, common to producers and consumers, used to configure the client.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
28+
29+
=== spring.kafka.consumer
30+
31+
$$auto-commit-interval$$:: $$Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.$$ *($$Duration$$, default: `$$<none>$$`)*
32+
$$auto-offset-reset$$:: $$What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.$$ *($$String$$, default: `$$<none>$$`)*
33+
$$bootstrap-servers$$:: $$Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.$$ *($$List<String>$$, default: `$$<none>$$`)*
34+
$$client-id$$:: $$ID to pass to the server when making requests. Used for server-side logging.$$ *($$String$$, default: `$$<none>$$`)*
35+
$$enable-auto-commit$$:: $$Whether the consumer's offset is periodically committed in the background.$$ *($$Boolean$$, default: `$$<none>$$`)*
36+
$$fetch-max-wait$$:: $$Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".$$ *($$Duration$$, default: `$$<none>$$`)*
37+
$$fetch-min-size$$:: $$Minimum amount of data the server should return for a fetch request.$$ *($$DataSize$$, default: `$$<none>$$`)*
38+
$$group-id$$:: $$Unique string that identifies the consumer group to which this consumer belongs.$$ *($$String$$, default: `$$<none>$$`)*
39+
$$heartbeat-interval$$:: $$Expected time between heartbeats to the consumer coordinator.$$ *($$Duration$$, default: `$$<none>$$`)*
40+
$$isolation-level$$:: $$Isolation level for reading messages that have been written transactionally.$$ *($$IsolationLevel$$, default: `$$read-uncommitted$$`, possible values: `READ_UNCOMMITTED`,`READ_COMMITTED`)*
41+
$$key-deserializer$$:: $$Deserializer class for keys.$$ *($$Class<?>$$, default: `$$<none>$$`)*
42+
$$max-poll-records$$:: $$Maximum number of records returned in a single call to poll().$$ *($$Integer$$, default: `$$<none>$$`)*
43+
$$properties$$:: $$Additional consumer-specific properties used to configure the client.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
44+
$$value-deserializer$$:: $$Deserializer class for values.$$ *($$Class<?>$$, default: `$$<none>$$`)*
45+
46+
=== spring.kafka.listener
47+
48+
$$ack-count$$:: $$Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".$$ *($$Integer$$, default: `$$<none>$$`)*
49+
$$ack-mode$$:: $$Listener AckMode. See the spring-kafka documentation.$$ *($$AckMode$$, default: `$$<none>$$`, possible values: `RECORD`,`BATCH`,`TIME`,`COUNT`,`COUNT_TIME`,`MANUAL`,`MANUAL_IMMEDIATE`)*
50+
$$ack-time$$:: $$Time between offset commits when ackMode is "TIME" or "COUNT_TIME".$$ *($$Duration$$, default: `$$<none>$$`)*
51+
$$async-acks$$:: $$Support for asynchronous record acknowledgements. Only applies when spring.kafka.listener.ack-mode is manual or manual-immediate.$$ *($$Boolean$$, default: `$$<none>$$`)*
52+
$$auto-startup$$:: $$Whether to auto start the container.$$ *($$Boolean$$, default: `$$true$$`)*
53+
$$client-id$$:: $$Prefix for the listener's consumer client.id property.$$ *($$String$$, default: `$$<none>$$`)*
54+
$$concurrency$$:: $$Number of threads to run in the listener containers.$$ *($$Integer$$, default: `$$<none>$$`)*
55+
$$idle-between-polls$$:: $$Sleep interval between Consumer.poll(Duration) calls.$$ *($$Duration$$, default: `$$0$$`)*
56+
$$idle-event-interval$$:: $$Time between publishing idle consumer events (no data received).$$ *($$Duration$$, default: `$$<none>$$`)*
57+
$$idle-partition-event-interval$$:: $$Time between publishing idle partition consumer events (no data received for partition).$$ *($$Duration$$, default: `$$<none>$$`)*
58+
$$immediate-stop$$:: $$Whether the container stops after the current record is processed or after all the records from the previous poll are processed.$$ *($$Boolean$$, default: `$$false$$`)*
59+
$$log-container-config$$:: $$Whether to log the container configuration during initialization (INFO level).$$ *($$Boolean$$, default: `$$<none>$$`)*
60+
$$missing-topics-fatal$$:: $$Whether the container should fail to start if at least one of the configured topics are not present on the broker.$$ *($$Boolean$$, default: `$$false$$`)*
61+
$$monitor-interval$$:: $$Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.$$ *($$Duration$$, default: `$$<none>$$`)*
62+
$$no-poll-threshold$$:: $$Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.$$ *($$Float$$, default: `$$<none>$$`)*
63+
$$poll-timeout$$:: $$Timeout to use when polling the consumer.$$ *($$Duration$$, default: `$$<none>$$`)*
64+
$$type$$:: $$Listener type.$$ *($$Type$$, default: `$$single$$`)*
65+
//end::configuration-properties[]
66+
67+
//end::ref-doc[]

Diff for: applications/source/kafka-source/pom.xml

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
6+
<modelVersion>4.0.0</modelVersion>
7+
8+
<parent>
9+
<artifactId>stream-applications-core</artifactId>
10+
<groupId>org.springframework.cloud.stream.app</groupId>
11+
<version>4.0.0-SNAPSHOT</version>
12+
<relativePath>../../stream-applications-core/pom.xml</relativePath>
13+
</parent>
14+
15+
<artifactId>kafka-source</artifactId>
16+
<name>kafka-source</name>
17+
<description>Apache Kafka Source Apps</description>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.springframework.cloud.fn</groupId>
22+
<artifactId>kafka-supplier</artifactId>
23+
</dependency>
24+
25+
<dependency>
26+
<groupId>org.springframework.kafka</groupId>
27+
<artifactId>spring-kafka-test</artifactId>
28+
<scope>test</scope>
29+
</dependency>
30+
</dependencies>
31+
32+
<build>
33+
<plugins>
34+
<plugin>
35+
<groupId>org.apache.maven.plugins</groupId>
36+
<artifactId>maven-deploy-plugin</artifactId>
37+
<version>3.0.0</version>
38+
<configuration>
39+
<skip>false</skip>
40+
</configuration>
41+
</plugin>
42+
<plugin>
43+
<groupId>org.springframework.cloud</groupId>
44+
<artifactId>spring-cloud-dataflow-apps-docs-plugin</artifactId>
45+
</plugin>
46+
<plugin>
47+
<groupId>org.springframework.cloud</groupId>
48+
<artifactId>spring-cloud-dataflow-apps-generator-plugin</artifactId>
49+
<configuration>
50+
<application>
51+
<name>kafka</name>
52+
<type>source</type>
53+
<version>${project.version}</version>
54+
<configClass>AUTOCONFIGURATION</configClass>
55+
<functionDefinition>kafkaSupplier</functionDefinition>
56+
57+
<maven>
58+
<dependencies>
59+
<dependency>
60+
<groupId>org.springframework.cloud.fn</groupId>
61+
<artifactId>kafka-supplier</artifactId>
62+
</dependency>
63+
</dependencies>
64+
</maven>
65+
</application>
66+
67+
</configuration>
68+
</plugin>
69+
70+
</plugins>
71+
</build>
72+
73+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
configuration-properties.classes=org.springframework.cloud.fn.supplier.kafka.KafkaSupplierProperties, \
2+
org.springframework.boot.autoconfigure.kafka.KafkaProperties, \
3+
org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer, \
4+
org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.app.source.kafka;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.boot.autoconfigure.SpringBootApplication;
23+
import org.springframework.boot.test.context.SpringBootTest;
24+
import org.springframework.cloud.stream.binder.test.OutputDestination;
25+
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
26+
import org.springframework.context.annotation.Import;
27+
import org.springframework.kafka.core.KafkaTemplate;
28+
import org.springframework.kafka.test.context.EmbeddedKafka;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.test.annotation.DirtiesContext;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
/**
35+
* @author Artem Bilan
36+
*
37+
* @since 4.0
38+
*/
39+
@SpringBootTest(properties = {
40+
"kafka.supplier.topics=" + KafkaSourceTests.TEST_TOPIC,
41+
"spring.kafka.consumer.group-id=test-group",
42+
"spring.kafka.consumer.auto-offset-reset=earliest"
43+
})
44+
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
45+
@DirtiesContext
46+
public class KafkaSourceTests {
47+
48+
static final String TEST_TOPIC = "TEST_TOPIC";
49+
50+
@Test
51+
void fromKafkaToTestBinder(
52+
@Autowired KafkaTemplate<Object, Object> kafkaTemplate,
53+
@Autowired OutputDestination outputDestination) {
54+
55+
String testData = "some test data";
56+
kafkaTemplate.send(TEST_TOPIC, testData);
57+
kafkaTemplate.flush();
58+
59+
Message<byte[]> receive = outputDestination.receive(30_000, "kafkaSupplier-out-0");
60+
assertThat(receive).extracting(Message::getPayload).isEqualTo(testData.getBytes());
61+
}
62+
63+
@SpringBootApplication
64+
@Import(TestChannelBinderConfiguration.class)
65+
public static class KafkaSourceTestApplication {
66+
67+
}
68+
69+
}

Diff for: applications/source/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
<module>jdbc-source</module>
1717
<module>jms-source</module>
1818
<module>http-source</module>
19+
<module>kafka-source</module>
1920
<module>load-generator-source</module>
2021
<module>mail-source</module>
2122
<module>mongodb-source</module>

Diff for: functions/consumer/kafka-publisher/README.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ A `Consumer<Message<?>>` that allows to publish messages to Apache Kafka topic.
77

88
The `KafkaPublisherConfiguration` is an auto-configuration, so no need to import anything.
99

10-
The `Consumer<Message<?>> kafkaPublisher` bean can be injection into target service for producing data into Kafka topic.
10+
The `Consumer<Message<?>> kafkaPublisher` bean can be injected into the target service for producing data into Kafka topic.
1111

1212
## Configuration Options
1313

Diff for: functions/function-dependencies/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
<artifactId>jms-supplier</artifactId>
5858
<version>${project.version}</version>
5959
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.cloud.fn</groupId>
62+
<artifactId>kafka-supplier</artifactId>
63+
<version>${project.version}</version>
64+
</dependency>
6065
<dependency>
6166
<groupId>org.springframework.cloud.fn</groupId>
6267
<artifactId>mongodb-supplier</artifactId>

Diff for: functions/supplier/kafka-supplier/README.adoc

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Apache Kafka (Consumer) Supplier
2+
3+
A `Supplier` that allows to consume messages from Apache Kafka topic.
4+
5+
6+
## Beans for injection
7+
8+
The `KafkaSupplierConfiguration` is an auto-configuration, so no need to import anything.
9+
10+
The `Supplier<Flux<Message<?>>> kafkaSupplier` bean can be injected into the target service for consuming data from Kafka topics.
11+
12+
## Configuration Options
13+
14+
All configuration properties are prefixed with `kafka.supplier`.
15+
16+
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/kafka/KafkaSupplierProperties.java[KafkaSupplierProperties].
17+
Also, this artifact fully depends on Spring for Apache Kafka auto-configuration and injects a `ConcurrentKafkaListenerContainerFactory` from there.
18+
19+
A `ComponentCustomizer<KafkaMessageDrivenChannelAdapterSpec<?, ?, ?>>` bean can be added in the target project to provide any custom options for the `KafkaMessageDrivenChannelAdapterSpec` configuration used by the `kafkaSupplier`.
20+
21+
See more information about `KafkaMessageDrivenChannelAdapter` configuration and behavior in Spring Integration https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound[documentation].
22+
23+
## Tests
24+
25+
26+
## Other usage
27+
28+
See this https://github.com/spring-cloud/stream-applications/blob/master/applications/source/kafka-source/README.adoc[README] where this supplier is used to create a Spring Cloud Stream application where it makes an Apache Kafka source.

Diff for: functions/supplier/kafka-supplier/pom.xml

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
6+
<modelVersion>4.0.0</modelVersion>
7+
8+
<parent>
9+
<groupId>org.springframework.cloud.fn</groupId>
10+
<artifactId>spring-functions-parent</artifactId>
11+
<version>4.0.0-SNAPSHOT</version>
12+
<relativePath>../../spring-functions-parent/pom.xml</relativePath>
13+
</parent>
14+
15+
16+
<artifactId>kafka-supplier</artifactId>
17+
<name>kafka-supplier</name>
18+
<description>Apache Kafka Supplier based on Apache Kafka Consumer</description>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.springframework.integration</groupId>
23+
<artifactId>spring-integration-kafka</artifactId>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>org.springframework.kafka</groupId>
28+
<artifactId>spring-kafka-test</artifactId>
29+
<scope>test</scope>
30+
</dependency>
31+
</dependencies>
32+
33+
</project>

0 commit comments

Comments
 (0)