Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Spring Cloud Stream Event Externalization support #1047

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spring-modulith-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<module>spring-modulith-events-messaging</module>
<module>spring-modulith-events-mongodb</module>
<module>spring-modulith-events-neo4j</module>
<module>spring-modulith-events-scs</module>
</modules>

<profiles>
Expand Down
132 changes: 132 additions & 0 deletions spring-modulith-events/spring-modulith-events-scs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Spring-Modulith Events Externalizer for Spring Cloud Stream

[![Maven Central](https://img.shields.io/maven-central/v/io.zenwave360.sdk/spring-modulith-events-scs.svg?label=Maven%20Central&logo=apachemaven)](https://search.maven.org/artifact/io.zenwave360.sdk/spring-modulith-events-scs)
[![build](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/workflows/Build/badge.svg)](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml)
[![coverage](https://raw.githubusercontent.com/ZenWave360/spring-modulith-events-spring-cloud-stream/badges/jacoco.svg)](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml)
[![branches coverage](https://raw.githubusercontent.com/ZenWave360/spring-modulith-events-spring-cloud-stream/badges/branches.svg)](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml)
[![GitHub](https://img.shields.io/github/license/ZenWave360/spring-modulith-events-spring-cloud-stream)](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/blob/main/LICENSE)

Spring-Modulith Events Externalizer that uses Spring Cloud Stream supporting both JSON and Avro serialization formats.

## Getting Started

### Dependency
Add the following Maven dependency to your project:

```xml
<dependency>
<groupId>io.zenwave360.sdk</groupId>
<artifactId>spring-modulith-events-scs</artifactId>
<version>${spring-modulith-events-scs.version}</version>
</dependency>
```

### Configuration
Use `@EnableSpringCloudStreamEventExternalization` annotation to enable Spring Cloud Stream event externalization in your Spring configuration:

```java
@Configuration
@EnableSpringCloudStreamEventExternalization
public class SpringCloudStreamEventsConfig {
// Additional configurations (if needed)
}
```

This configuration ensures that, in addition to events annotated with `@Externalized`, all events of type `org.springframework.messaging.Message` with a header named `SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_EVENT_HEADER` will be externalized and routed to their specified destination using the value of this header as the routing target.

---

## Event Serialization

Using the transactional event publication log requires serializing events to a format that can be stored in a database. Since the generic type of `Message<?>` payload is lost when using the default `JacksonEventSerializer`, this library adds an extra `_class` field to preserve payload type information, allowing for complete deserialization to its original type.

This library provides support for POJO (JSON) and Avro serialization formats for `Message<?>` payloads.

### Avro Serialization

Avro serialization needs `com.fasterxml.jackson.dataformat.avro.AvroMapper` class present in the classpath. In order to use Avro serialization, you need to add the following dependency to your project:

```xml
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
</dependency>
```

---

## Routing Events

### Programmatic Routing for `Message<?`> events

You can define routing targets programmatically using a Message header:

```java
public class CustomerEventsProducer implements ICustomerEventsProducer {

private final ApplicationEventPublisher applicationEventPublisher;

public void onCustomerCreated(CustomerCreated event) {
Message<CustomerCreated> message = MessageBuilder.withPayload(event)
.setHeader(
SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER,
"customer-created") // <- target binding name
.build();
applicationEventPublisher.publishEvent(message);
}
}
```

### Annotation-Based Routing for POJO Events

Leverage the `@Externalized` annotation to define the target binding name and routing key:

```java
@Externalized("customer-created::#{#this.getLastname()}")
class CustomerCreated {

public String getLastname() {
// Return the customer's last name
}
}
```

### Configure Spring Cloud Stream destination

Configure Spring Cloud Stream destination for your bindings as usual in `application.yml`:

```yaml
spring:
cloud:
stream:
bindings:
customer-created:
destination: customer-created-topic
```
### Routing Key
`SpringCloudStreamEventExternalizer` dynamically sets the appropriate Message header (e.g., `kafka_messageKey` or `rabbit_routingKey`) from your routing key based on the channel binder type, if the routing header is not already present.

- KafkaMessageChannelBinder: `kafka_messageKey`
- RabbitMessageChannelBinder: `rabbit_routingKey`
- KinesisMessageChannelBinder: `partitionKey`
- PubSubMessageChannelBinder: `pubsub_orderingKey`
- EventHubsMessageChannelBinder: `partitionKey`
- SolaceMessageChannelBinder: `solace_messageKey`
- PulsarMessageChannelBinder: `pulsar_key`

---

## Using Snapshot Versions
In order to test snapshot versions of this library, add the following repository to your Maven configuration:

```xml
<repository>
<id>gh</id>
<url>https://raw.githubusercontent.com/ZenWave360/maven-snapshots/refs/heads/main</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
```
149 changes: 149 additions & 0 deletions spring-modulith-events/spring-modulith-events-scs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>

<name>Spring Modulith - Events - Spring Cloud Stream support</name>
<artifactId>spring-modulith-events-scs</artifactId>

<properties>
<module.name>org.springframework.modulith.events.scs</module.name>

<!-- integration testing versions -->
<spring-boot.version>3.4.0</spring-boot.version>
<spring-cloud.version>2024.0.0</spring-cloud.version>
<spring-cloud-stream-schema.version>2.2.1.RELEASE</spring-cloud-stream-schema.version>
<avro.version>1.11.4</avro.version>

<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>

<developers>
<developer>
<name>Ivan Garcia Sainz-Aja</name>
<email>[email protected]</email>
<organization>ZenWave360</organization>
<organizationUrl>https://github.com/ZenWave360</organizationUrl>
</developer>
</developers>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<!-- optional -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<optional>true</optional>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream-schema.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-starter-jdbc</artifactId>
<version>${parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<!-- Force alphabetical order to have a reproducible build -->
<runOrder>alphabetical</runOrder>
<excludes>
<exclude>**/*IT*</exclude>
<exclude>**/*IntTest*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.springframework.modulith.events.scs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import org.springframework.modulith.events.core.EventSerializer;

import java.util.Map;

public class AvroEventSerializer extends MessageEventSerializer implements EventSerializer {

private AvroMapper avroMapper;

public AvroEventSerializer(ObjectMapper jacksonMapper) {
super(jacksonMapper);
this.avroMapper = AvroMapper.builder().build();
}

public AvroEventSerializer(AvroMapper avroMapper, ObjectMapper jacksonMapper) {
super(jacksonMapper);
this.avroMapper = avroMapper;
}

protected Map<String, Object> serializeToMap(Object payload) {
ObjectNode objectNode = avroMapper.valueToTree(payload);
objectNode.remove("specificData"); // TODO: remove this recursively
return avroMapper.convertValue(objectNode, Map.class);
}

}
Loading