Skip to content

TestBinder+Kafka not working as intended (tombstone records) #2971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
rnferreira opened this issue Jul 9, 2024 · 5 comments
Closed

TestBinder+Kafka not working as intended (tombstone records) #2971

rnferreira opened this issue Jul 9, 2024 · 5 comments
Milestone

Comments

@rnferreira
Copy link

rnferreira commented Jul 9, 2024

Greetings, everyone! I encountered an issue with the most recent release of Spring Cloud Stream + Kafka binder (release train 2023.0.2) when using KafkaNull (tombstone records) and TestBinder. With version 2023.0.1, all tests asserting the output of functions that returned KafkaNull passed, whereas, in this new version, they don't. The output is now transformed into byte[], and the payload now has the byte equivalent of "{}".

Here are the relevant snippets (extremely minimalist):

build.gradle:

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.1'
    id 'io.spring.dependency-management' version '1.1.5'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2023.0.2")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation "org.springframework.cloud:spring-cloud-starter-stream-kafka"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
//    testImplementation 'org.springframework.cloud:spring-cloud-starter-contract-stub-runner'

}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('test') {
    useJUnitPlatform()
}

Function configuration:

@SpringBootApplication
public class Gh2971Application {

    public static void main(String[] args) {
        SpringApplication.run(Gh2971Application.class, args);
    }

    @Bean
    public Function<Message<?>,Message<?>> myFunction() {
        return v -> MessageBuilder.withPayload(KafkaNull.INSTANCE).build();
    }
}

Test:

// 
@ImportAutoConfiguration(TestChannelBinderConfiguration.class)
@SpringBootTest(properties = {"spring.cloud.function.definition=myFunction"})
class Gh2971ApplicationTests {

    @Value("${spring.cloud.stream.bindings.myFunction-in-0.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.myFunction-out-0.destination}")
    private String outputTopic;

    @Autowired
    private InputDestination input;
    @Autowired
    private OutputDestination output;

    @Test
    void test() {
        var message =
                MessageBuilder.withPayload(KafkaNull.INSTANCE)
                        .build();
        this.input.send(message, this.inputTopic);

        var received = this.output.receive(0L, this.outputTopic);
        assertThat(received)
                .isNotNull()
                .extracting(Message::getPayload)
                .isEqualTo(KafkaNull.INSTANCE);
    }
}

Application properties:

spring.application.name=gh-2971
spring.cloud.function.definition=myFunction
spring.cloud.stream.bindings.myFunction-in-0.destination=in
spring.cloud.stream.bindings.myFunction-out-0.destination=out
spring.cloud.stream.kafka.binder.consumer-properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.spring.json.use.type.headers=false

Expected Behavior:

The test passes.

Actual Behavior:

The test fails with the following message:

org.opentest4j.AssertionFailedError: 
expected: org.springframework.kafka.support.KafkaNull@c317472
 but was: [123, 125]
Expected :org.springframework.kafka.support.KafkaNull@c317472
Actual   :[123, 125]

Additional context:
Spring Cloud Stream version: 4.1.2
Kafka Binder version: 4.1.2
Test Binder: 4.1.2

@olegz
Copy link
Contributor

olegz commented Jul 9, 2024

I can'r see what could have changed, but i just plugged in your sample and i see the same 2 bytes with 4.1.1 as I do with 4.1.2

GenericMessage [payload=byte[2], 

@rnferreira
Copy link
Author

Hi Oleg! Thank you for getting back to me so quickly. Your comment pointed me in the right direction. If I include the dependency org.springframework.cloud:spring-cloud-starter-contract-stub-runner, the test passes when I use 2023.0.1 and fails on `2023.0.2'.

Was this always the behaviour? Should we now assume that the tombstone is transformed into those two bytes?

Please advise.

PS: I updated my original comment with more context.

@olegz
Copy link
Contributor

olegz commented Jul 9, 2024

I have to assume that contract is doing something, but what you can do is compile a simple project with a test and attach it as zip or push it to github so I can take a look

@rnferreira
Copy link
Author

@rnferreira
Copy link
Author

Quick update: I dug deeper and found that the method org.springframework.cloud.function.context.config.JsonMessageConverter#convertToInternal is converting the KafkaNull payload into an empty JSON ({}).

Please let me know whether you think this is worth "fixing" or not.

@olegz olegz closed this as completed in 07e7d81 Jul 17, 2024
@olegz olegz added this to the 4.1.4 milestone Jul 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants