Skip to content

Commit b4287d5

Browse files
authored
Merge pull request #45826 from ozangunalp/kafka_transactions_doc
Added kafka transaction / hibernate transaction chaining tests
2 parents b15ffb4 + 806fcec commit b4287d5

File tree

13 files changed

+337
-13
lines changed

13 files changed

+337
-13
lines changed

docs/src/main/asciidoc/kafka.adoc

+93-12
Original file line numberDiff line numberDiff line change
@@ -3191,23 +3191,104 @@ public class FruitProducer {
31913191
@Consumes(MediaType.APPLICATION_JSON)
31923192
@Bulkhead(1)
31933193
public Uni<Void> post(Fruit fruit) {
3194-
Context context = Vertx.currentContext(); // <2>
3195-
return sf.withTransaction(session -> // <3>
3196-
kafkaTx.withTransaction(emitter -> // <4>
3197-
session.persist(fruit).invoke(() -> emitter.send(fruit)) // <5>
3198-
).emitOn(context::runOnContext) // <6>
3199-
);
3194+
return sf.withTransaction(session -> // <2>
3195+
kafkaTx.withTransaction(emitter -> // <3>
3196+
session.persist(fruit).invoke(() -> emitter.send(fruit)) // <4>
3197+
));
32003198
}
32013199
}
32023200
----
32033201

32043202
<1> Inject the Hibernate Reactive `SessionFactory`.
3205-
<2> Capture the caller Vert.x context.
3206-
<3> Begin a Hibernate Reactive transaction.
3207-
<4> Begin a Kafka transaction.
3208-
<5> Persist the payload and send the entity to Kafka.
3209-
<6> The Kafka transaction terminates on the Kafka producer sender thread.
3210-
We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context we started it.
3203+
<2> Begin a Hibernate Reactive transaction.
3204+
<3> Begin a Kafka transaction.
3205+
<4> Persist the payload and send the entity to Kafka.
3206+
3207+
Alternatively, you can use the `@WithTransaction` annotation to start a transaction and commit it when the method returns:
3208+
3209+
[source, java]
3210+
----
3211+
import jakarta.inject.Inject;
3212+
import jakarta.ws.rs.Consumes;
3213+
import jakarta.ws.rs.POST;
3214+
import jakarta.ws.rs.Path;
3215+
import jakarta.ws.rs.core.MediaType;
3216+
3217+
import org.eclipse.microprofile.faulttolerance.Bulkhead;
3218+
import org.eclipse.microprofile.reactive.messaging.Channel;
3219+
3220+
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
3221+
import io.smallrye.mutiny.Uni;
3222+
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
3223+
3224+
@Path("/")
3225+
public class FruitProducer {
3226+
3227+
@Channel("kafka") KafkaTransactions<Fruit> kafkaTx;
3228+
3229+
@POST
3230+
@Path("/fruits")
3231+
@Consumes(MediaType.APPLICATION_JSON)
3232+
@Bulkhead(1)
3233+
@WithTransaction // <1>
3234+
public Uni<Void> post(Fruit fruit) {
3235+
return kafkaTx.withTransaction(emitter -> // <2>
3236+
fruit.persist().invoke(() -> emitter.send(fruit)) // <3>
3237+
);
3238+
}
3239+
}
3240+
----
3241+
3242+
<1> Start a Hibernate Reactive transaction and commit it when the method returns.
3243+
<2> Begin a Kafka transaction.
3244+
<3> Persist the payload and send the entity to Kafka.
3245+
3246+
[[chaining-kafka-transactions-with-hibernate-orm-transactions]]
3247+
=== Chaining Kafka Transactions with Hibernate ORM transactions
3248+
3249+
While `KafkaTransactions` provide a reactive API on top of Mutiny to manage Kafka transactions,
3250+
you can still chain Kafka transactions with blocking Hibernate ORM transactions.
3251+
3252+
[source, java]
3253+
----
3254+
import jakarta.transaction.Transactional;
3255+
import jakarta.ws.rs.POST;
3256+
import jakarta.ws.rs.Path;
3257+
3258+
import org.eclipse.microprofile.reactive.messaging.Channel;
3259+
3260+
import io.quarkus.logging.Log;
3261+
import io.smallrye.mutiny.Uni;
3262+
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
3263+
3264+
@Path("/")
3265+
public class FruitProducer {
3266+
3267+
@Channel("kafka") KafkaTransactions<Pet> emitter;
3268+
3269+
@POST
3270+
@Path("/fruits")
3271+
@Consumes(MediaType.APPLICATION_JSON)
3272+
@Bulkhead(1)
3273+
@Transactional // <1>
3274+
public void post(Fruit fruit) {
3275+
emitter.withTransaction(e -> { // <2>
3276+
// if id is attributed by the database, will need to flush to get it
3277+
// fruit.persistAndFlush();
3278+
fruit.persist(); // <3>
3279+
Log.infov("Persisted fruit {0}", p);
3280+
e.send(p); // <4>
3281+
return Uni.createFrom().voidItem();
3282+
}).await().indefinitely(); // <5>
3283+
}
3284+
}
3285+
----
3286+
3287+
<1> Start a Hibernate ORM transaction. The transaction is committed when the method returns.
3288+
<2> Begin a Kafka transaction.
3289+
<3> Persist the payload.
3290+
<4> Send the entity to Kafka inside the Kafka transaction.
3291+
<5> Wait on the returned `Uni` for the Kafka transaction to complete.
32113292

32123293
== Logging
32133294

integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java

+16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.quarkus.it.kafka.fruit.Fruit;
1717
import io.quarkus.it.kafka.people.PeopleState;
1818
import io.quarkus.it.kafka.people.Person;
19+
import io.quarkus.it.kafka.pet.Pet;
1920
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
2021

2122
@Path("/kafka")
@@ -41,6 +42,21 @@ public List<Person> getPeople() {
4142
return receivers.getPeople();
4243
}
4344

45+
@GET
46+
@Path("/pets")
47+
@Produces(MediaType.APPLICATION_JSON)
48+
@Transactional
49+
public List<Pet> getPets() {
50+
return receivers.getPets();
51+
}
52+
53+
@GET
54+
@Path("/pets-consumed")
55+
@Produces(MediaType.APPLICATION_JSON)
56+
public List<Pet> getConsumedPets() {
57+
return receivers.getPetsConsumed();
58+
}
59+
4460
@GET
4561
@Path("/people-state")
4662
@Produces(MediaType.APPLICATION_JSON)

integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java

+16
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import io.quarkus.it.kafka.fruit.FruitDto;
1717
import io.quarkus.it.kafka.people.PeopleState;
1818
import io.quarkus.it.kafka.people.Person;
19+
import io.quarkus.it.kafka.pet.Pet;
1920
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
2021

2122
@ApplicationScoped
2223
public class KafkaReceivers {
2324

2425
private final List<Person> people = new CopyOnWriteArrayList<>();
2526

27+
private final List<Pet> pets = new CopyOnWriteArrayList<>();
28+
2629
@Channel("fruits-persisted")
2730
Emitter<FruitDto> emitter;
2831

@@ -33,6 +36,11 @@ public CompletionStage<Void> persist(Fruit fruit) {
3336
return emitter.send(new FruitDto(fruit));
3437
}
3538

39+
@Incoming("pets-in")
40+
public void persist(Pet pet) {
41+
pets.add(pet);
42+
}
43+
3644
@Incoming("people-in")
3745
public CompletionStage<Void> consume(Message<Person> msg) {
3846
CheckpointMetadata<PeopleState> store = CheckpointMetadata.fromMessage(msg);
@@ -57,4 +65,12 @@ public List<Person> getPeople() {
5765
return people;
5866
}
5967

68+
public List<Pet> getPets() {
69+
return Pet.listAll();
70+
}
71+
72+
public List<Pet> getPetsConsumed() {
73+
return pets;
74+
}
75+
6076
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.quarkus.it.kafka.pet;
2+
3+
import jakarta.persistence.Entity;
4+
import jakarta.persistence.GeneratedValue;
5+
import jakarta.persistence.GenerationType;
6+
import jakarta.persistence.Id;
7+
8+
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
9+
10+
@Entity
11+
public class Pet extends PanacheEntityBase {
12+
13+
@Id
14+
@GeneratedValue(strategy = GenerationType.IDENTITY)
15+
public Long id;
16+
17+
public String name;
18+
19+
public Pet(String name) {
20+
this.name = name;
21+
}
22+
23+
public Pet() {
24+
// Jackson will use this constructor.
25+
}
26+
27+
@Override
28+
public String toString() {
29+
return this.getClass().getSimpleName() + "<" + id + ">";
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.quarkus.it.kafka.pet;
2+
3+
import jakarta.transaction.Transactional;
4+
import jakarta.ws.rs.POST;
5+
import jakarta.ws.rs.Path;
6+
7+
import org.eclipse.microprofile.reactive.messaging.Channel;
8+
9+
import io.quarkus.logging.Log;
10+
import io.smallrye.mutiny.Uni;
11+
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
12+
13+
@Path("/kafka")
14+
public class PetProducer {
15+
16+
@Channel("pets")
17+
KafkaTransactions<Pet> emitter;
18+
19+
@POST
20+
@Path("/pets")
21+
@Transactional
22+
public void post(String name) {
23+
Log.infov("Sending pet {0}", name);
24+
Pet pet = new Pet(name);
25+
emitter.withTransaction(e -> {
26+
pet.persist();
27+
if (pet.name.equals("bad")) {
28+
throw new IllegalArgumentException("bad pet");
29+
}
30+
Log.infov("Persisted pet {0}", pet);
31+
e.send(pet);
32+
return Uni.createFrom().voidItem();
33+
}).await().indefinitely();
34+
}
35+
}

integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ mp.messaging.incoming.people-in.auto.commit.interval.ms=500
2525
mp.messaging.incoming.people-in.group.id=people-checkpoint
2626

2727
quarkus.datasource.devservices.enabled=true
28-
quarkus.hibernate-orm.packages=io.quarkus.it.kafka.fruit
28+
quarkus.hibernate-orm.packages=io.quarkus.it.kafka.fruit,io.quarkus.it.kafka.pet
29+
2930
quarkus.datasource."people".devservices.enabled=true
3031
quarkus.hibernate-orm."people".datasource=people
3132
quarkus.hibernate-orm."people".packages=io.quarkus.it.kafka.people
33+
34+
mp.messaging.incoming.pets-in.topic=pets
35+
mp.messaging.incoming.pets-in.read-committed=true

integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java

+27
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.quarkus.it.kafka.fruit.Fruit;
1818
import io.quarkus.it.kafka.people.PeopleState;
1919
import io.quarkus.it.kafka.people.Person;
20+
import io.quarkus.it.kafka.pet.Pet;
2021
import io.quarkus.test.common.QuarkusTestResource;
2122
import io.quarkus.test.junit.QuarkusTest;
2223
import io.quarkus.test.kafka.InjectKafkaCompanion;
@@ -74,4 +75,30 @@ public void testPeople() {
7475
});
7576
}
7677

78+
protected static final TypeRef<List<Pet>> PET_TYPE_REF = new TypeRef<List<Pet>>() {
79+
};
80+
81+
@Test
82+
public void testPets() {
83+
await().untilAsserted(() -> Assertions.assertEquals(0, get("/kafka/pets").as(PET_TYPE_REF).size()));
84+
85+
given().body("cat").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
86+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
87+
given().body("dog").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
88+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
89+
given().body("bad").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
90+
.assertThat().statusCode(is(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
91+
given().body("mouse").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
92+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
93+
given().body("rabbit").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
94+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
95+
given().body("fish").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
96+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
97+
given().body("hamster").contentType(ContentType.TEXT).when().post("/kafka/pets").then()
98+
.assertThat().statusCode(is(Response.Status.NO_CONTENT.getStatusCode()));
99+
100+
await().untilAsserted(() -> Assertions.assertEquals(6, get("/kafka/pets").as(PET_TYPE_REF).size()));
101+
await().untilAsserted(() -> Assertions.assertEquals(6, get("/kafka/pets-consumed").as(PET_TYPE_REF).size()));
102+
}
103+
77104
}

integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java

+14
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ public List<Person> getPeople() {
3737
return receivers.getPeople();
3838
}
3939

40+
@GET
41+
@Path("/pets")
42+
@Produces(MediaType.APPLICATION_JSON)
43+
public Uni<List<Pet>> getPets() {
44+
return receivers.getPets();
45+
}
46+
47+
@GET
48+
@Path("/pets-consumed")
49+
@Produces(MediaType.APPLICATION_JSON)
50+
public List<Pet> getConsumedPets() {
51+
return receivers.getConsumedPets();
52+
}
53+
4054
@GET
4155
@Path("/people-state/{consumerGroupId}/{topic}/{partition}")
4256
@Produces(MediaType.APPLICATION_JSON)

integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java

+15
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class KafkaReceivers {
2424

2525
private final List<Person> people = new CopyOnWriteArrayList<>();
2626

27+
private final List<Pet> pets = new CopyOnWriteArrayList<>();
28+
2729
@Incoming("fruits-in")
2830
@Outgoing("fruits-persisted")
2931
@WithTransaction
@@ -45,6 +47,11 @@ public void consumeFruit(Fruit fruit) {
4547
assert Objects.equals(ContextLocals.get("fruit-id").get(), fruit.id);
4648
}
4749

50+
@Incoming("pets-in")
51+
public void consumePet(Pet pet) {
52+
pets.add(pet);
53+
}
54+
4855
@Incoming("people-in")
4956
public CompletionStage<Void> consume(Message<Person> msg) {
5057
CheckpointMetadata<PeopleState> store = CheckpointMetadata.fromMessage(msg);
@@ -70,4 +77,12 @@ public List<Person> getPeople() {
7077
return people;
7178
}
7279

80+
@WithSession
81+
public Uni<List<Pet>> getPets() {
82+
return Pet.listAll();
83+
}
84+
85+
public List<Pet> getConsumedPets() {
86+
return pets;
87+
}
7388
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.quarkus.it.kafka;
2+
3+
import jakarta.persistence.Entity;
4+
5+
import io.quarkus.hibernate.reactive.panache.PanacheEntity;
6+
7+
@Entity
8+
public class Pet extends PanacheEntity {
9+
10+
public String name;
11+
12+
public Pet(String name) {
13+
this.name = name;
14+
}
15+
16+
public Pet() {
17+
// Jackson will use this constructor.
18+
}
19+
}

0 commit comments

Comments
 (0)