Skip to content

Panache and Hibernate Reactive use different Vert.x local contexts #47441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
import org.hibernate.reactive.common.spi.Implementor;
import org.hibernate.reactive.context.Context.Key;
import org.hibernate.reactive.context.impl.BaseKey;
import org.hibernate.reactive.context.impl.ContextualDataStorage;
import org.hibernate.reactive.mutiny.Mutiny;
import org.hibernate.reactive.mutiny.Mutiny.Session;
import org.hibernate.reactive.mutiny.Mutiny.SessionFactory;
import org.hibernate.reactive.mutiny.Mutiny.Transaction;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ClientProxy;
import io.quarkus.arc.impl.LazyValue;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.context.storage.AccessMode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Static util methods for {@link Mutiny.Session}.
Expand Down Expand Up @@ -50,8 +52,8 @@ public Key<Session> get() {
});

// This key is used to indicate that a reactive session should be opened lazily (when needed) in the current vertx context
private static final String SESSION_ON_DEMAND_KEY = "hibernate.reactive.panache.sessionOnDemand";
private static final String SESSION_ON_DEMAND_OPENED_KEY = "hibernate.reactive.panache.sessionOnDemandOpened";
private static final Key<Boolean> SESSION_ON_DEMAND_KEY = new BaseKey<>(Boolean.class, "hibernate.reactive.panache.sessionOnDemand");
private static final Key<Boolean> SESSION_ON_DEMAND_OPENED_KEY = new BaseKey<>(Boolean.class, "hibernate.reactive.panache.sessionOnDemandOpened");

/**
* Marks the current vertx duplicated context as "lazy" which indicates that a reactive session should be opened lazily if
Expand All @@ -63,17 +65,17 @@ public Key<Session> get() {
* @see #getSession()
*/
static <T> Uni<T> withSessionOnDemand(Supplier<Uni<T>> work) {
Context context = vertxContext();
if (context.getLocal(SESSION_ON_DEMAND_KEY) != null) {
Map<Key<Boolean>, Boolean> contextualDataMap = contextualDataMap(vertxContext());
if (contextualDataMap.get(SESSION_ON_DEMAND_KEY) != null) {
// context already marked - no need to set the key and close the session
return work.get();
} else {
// mark the lazy session
context.putLocal(SESSION_ON_DEMAND_KEY, true);
contextualDataMap.put(SESSION_ON_DEMAND_KEY, Boolean.TRUE);
// perform the work and eventually close the session and remove the key
return work.get().eventually(() -> {
context.removeLocal(SESSION_ON_DEMAND_KEY);
context.removeLocal(SESSION_ON_DEMAND_OPENED_KEY);
contextualDataMap.remove(SESSION_ON_DEMAND_KEY);
contextualDataMap.remove(SESSION_ON_DEMAND_OPENED_KEY);
return closeSession();
});
}
Expand Down Expand Up @@ -109,17 +111,17 @@ public static <T> Uni<T> withTransaction(Function<Transaction, Uni<T>> work) {
* @return a new {@link Uni}
*/
public static <T> Uni<T> withSession(Function<Mutiny.Session, Uni<T>> work) {
Context context = vertxContext();
Map<Key<Session>, Session> contextualDataMap = SessionOperations.<Session> contextualDataMap(vertxContext());
Key<Mutiny.Session> key = getSessionKey();
Mutiny.Session current = context.getLocal(key);
Mutiny.Session current = contextualDataMap.get(key);
if (current != null && current.isOpen()) {
// reactive session exists - reuse this session
return work.apply(current);
} else {
// reactive session does not exist - open a new one and close it when the returned Uni completes
return getSessionFactory()
.openSession()
.invoke(s -> context.putLocal(key, s))
.invoke(s -> contextualDataMap.put(key, s))
.chain(work::apply)
.eventually(SessionOperations::closeSession);
}
Expand All @@ -140,22 +142,24 @@ public static <T> Uni<T> withSession(Function<Mutiny.Session, Uni<T>> work) {
* @return the {@link Mutiny.Session}
*/
public static Uni<Mutiny.Session> getSession() {
Context context = vertxContext();
ContextInternal context = vertxContext();
Key<Mutiny.Session> key = getSessionKey();
Mutiny.Session current = context.getLocal(key);
final Mutiny.Session current = SessionOperations.<Session> contextualDataMap(context).get(key);
final Map<Key<Boolean>, Boolean> objectsDataMap = contextualDataMap(context);
if (current != null && current.isOpen()) {
// reuse the existing reactive session
return Uni.createFrom().item(current);
} else {
if (context.getLocal(SESSION_ON_DEMAND_KEY) != null) {
if (context.getLocal(SESSION_ON_DEMAND_OPENED_KEY) != null) {
if (objectsDataMap.get(SESSION_ON_DEMAND_KEY) != null) {
if (objectsDataMap.get(SESSION_ON_DEMAND_OPENED_KEY) != null) {
// a new reactive session is opened in a previous stage
return Uni.createFrom().item(SessionOperations::getCurrentSession);
} else {
// open a new reactive session and store it in the vertx duplicated context
// the context was marked as "lazy" which means that the session will be eventually closed
context.putLocal(SESSION_ON_DEMAND_OPENED_KEY, true);
return getSessionFactory().openSession().invoke(s -> context.putLocal(key, s));
objectsDataMap.put(SESSION_ON_DEMAND_OPENED_KEY, Boolean.TRUE);
return getSessionFactory().openSession().invoke(s -> SessionOperations
.<Session> contextualDataMap(context).put(key, s));
}
} else {
throw new IllegalStateException("No current Mutiny.Session found"
Expand All @@ -170,23 +174,30 @@ public static Uni<Mutiny.Session> getSession() {
* @return the current reactive session stored in the context, or {@code null} if no session exists
*/
public static Mutiny.Session getCurrentSession() {
Context context = vertxContext();
Mutiny.Session current = context.getLocal(getSessionKey());
Mutiny.Session current = SessionOperations.<Session> contextualDataMap(vertxContext()).get(getSessionKey());
if (current != null && current.isOpen()) {
return current;
}
return null;
}

private static <T> Map<Key<T>, T> contextualDataMap(ContextInternal vertxContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of duplicating this method here, we could make it public in Hibernate Reactive and keep the ContextualDataStorage.CONTEXTUAL_DATA_KEY private.

By doing so, the implementation details about how the ConcurrentMap is created/retrieved remains hidden, yet the consumers can manipulate the data it holds.

return vertxContext.getLocal(
ContextualDataStorage.CONTEXTUAL_DATA_KEY,
AccessMode.CONCURRENT,
ConcurrentHashMap::new);
}

/**
*
* @return the current vertx duplicated context
* @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the
* {@link VertxContextSafetyToggle}
*/
private static Context vertxContext() {
Context context = Vertx.currentContext();
private static ContextInternal vertxContext() {
ContextInternal context = ContextInternal.current();
if (context != null) {
// TODO: Update check for ContextItnernal
VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG);
return context;
} else {
Expand All @@ -195,11 +206,11 @@ private static Context vertxContext() {
}

static Uni<Void> closeSession() {
Context context = vertxContext();
Key<Mutiny.Session> key = getSessionKey();
Mutiny.Session current = context.getLocal(key);
Map<Key<Session>, Session> contextualDataMap = contextualDataMap(vertxContext());
Mutiny.Session current = contextualDataMap.get(key);
if (current != null && current.isOpen()) {
return current.close().eventually(() -> context.removeLocal(key));
return current.close().eventually(() -> contextualDataMap.remove(key));
}
return Uni.createFrom().voidItem();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.it.panache.reactive;

import io.quarkus.hibernate.reactive.panache.PanacheEntityBase;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;

@Entity
public class Counter extends PanacheEntityBase {

@Id
private Long id;
private int count = 0;

public Counter() {
}

public Counter(long id) {
this.id = id;
}

public Long getId() {
return id;
}

public int getCount() {
return count;
}

@Override
public String toString() {
return String.valueOf(count);
}

public void increase() {
this.count++;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.it.panache.reactive;

import org.hibernate.reactive.mutiny.Mutiny;

import org.junit.jupiter.api.Assertions;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.function.Function;
import java.util.function.Supplier;

import static jakarta.persistence.LockModeType.PESSIMISTIC_WRITE;

/**
* The goal if this class is to test the usage of {@link WithTransaction},
* you should not use {@link io.quarkus.hibernate.reactive.panache.Panache#withTransaction(Supplier)}
* nor {@link org.hibernate.reactive.mutiny.Mutiny.SessionFactory#withTransaction(Function)}
*/
@ApplicationScoped
@WithTransaction
public class WithTransactionCounterBean {

private static final Long ID_COUNTER = 42L;

@Inject
Mutiny.SessionFactory sessionFactory;

public Uni<Counter> createOrResetCounter() {
final Counter counter = new Counter(ID_COUNTER);
return sessionFactory.withStatelessSession(session -> session
.upsert(counter)
.replaceWith(counter)
);
}

public Uni<Counter> increaseCounterWithHR() {
return sessionFactory.withSession(session -> session
.find(Counter.class, ID_COUNTER, PESSIMISTIC_WRITE)
.invoke(Counter::increase)
);
}

public Uni<Counter> increaseCounterWithPanache() {
return Counter
.<Counter> findById(ID_COUNTER, PESSIMISTIC_WRITE)
.invoke(Counter::increase);
}

public Uni<Void> assertThatSessionsAreEqual() {
return sessionFactory.withSession(hrSession -> Panache
.getSession().chain(panacheSession -> {
if (panacheSession != hrSession) {
return Uni.createFrom().failure(new AssertionError("Sessions are different!"));
}
return Uni.createFrom().voidItem();
})
);
}

public Uni<Counter> findCounter() {
return sessionFactory.withSession(session -> session
.find(Counter.class, ID_COUNTER)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${postgres.reactive.url}

quarkus.hibernate-orm.database.generation=drop-and-create

quarkus.log.category."org.hibernate.reactive.context".level=TRACE

quarkus.hibernate-orm.log.sql=true
quarkus.hibernate-orm.log.format-sql=false
quarkus.hibernate-orm.log.highlight-sql=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.quarkus.it.panache.reactive;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import static io.quarkus.vertx.VertxContextSupport.subscribeAndAwait;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Hibernate Reactive and Panache store the created sessions in a local Vert.x context,
* and we need to make sure that they won't get lost.
* See issue <a href="https://github.com/quarkusio/quarkus/issues/47314">47314</a>
*
* @see io.quarkus.hibernate.reactive.panache.common.runtime.SessionOperations
* @see io.quarkus.hibernate.reactive.panache.common.WithTransaction
*/
@QuarkusTest
public class WithTransactionTest {
// How many times we want to increase the counter
private static final int INCREASES_NUM = 10;

@Inject
WithTransactionCounterBean counterBean;

@BeforeEach
public void createOrResetCounter() throws Throwable {
subscribeAndAwait(counterBean::createOrResetCounter);
}

@Test
void increaseCounterWithHibernateReactive() throws Throwable {
subscribeAndAwait(counterBean::increaseCounterWithHR);
Counter counter = subscribeAndAwait(counterBean::findCounter);

assertThat(counter.getCount()).isEqualTo(1);
}

@Test
void increaseCounterWithPanache() throws Throwable {
subscribeAndAwait(counterBean::increaseCounterWithPanache);
Counter counter = subscribeAndAwait(counterBean::findCounter);

assertThat(counter.getCount()).isEqualTo(1);
}

@Test
void shouldReuseExistingSessions() throws Throwable {
subscribeAndAwait(counterBean::assertThatSessionsAreEqual);
}

@Test
void increaseCounter() throws Throwable {
List<Supplier<Uni<Counter>>> suppliers = new ArrayList<>();
for (int i = 0; i < INCREASES_NUM; i++) {
suppliers.add(() -> counterBean.increaseCounterWithHR());
}

final List<Counter> results = new ArrayList<>();
suppliers.stream()
.parallel()
.forEach(uni -> increaseCounter(uni, results));

final Counter dbCounter = subscribeAndAwait(() -> counterBean.findCounter());
assertThat(dbCounter.getCount()).isEqualTo(INCREASES_NUM);
}

private static void increaseCounter(Supplier<Uni<Counter>> func, List<Counter> counters) {
try {
final var counter = subscribeAndAwait(func);
counters.add(counter);
} catch (final Throwable e) {
fail(e);
}
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
<antlr.version>4.13.0</antlr.version> <!-- version controlled by Hibernate ORM's needs -->
<bytebuddy.version>1.15.11</bytebuddy.version> <!-- version controlled by Hibernate ORM's needs -->
<hibernate-commons-annotations.version>7.0.3.Final</hibernate-commons-annotations.version> <!-- version controlled by Hibernate ORM's needs -->
<hibernate-reactive.version>2.4.6.Final</hibernate-reactive.version> <!-- highly sensitive to Hibernate ORM upgrades -->
<hibernate-reactive.version>2.4.7.Final</hibernate-reactive.version> <!-- highly sensitive to Hibernate ORM upgrades -->
<hibernate-validator.version>8.0.2.Final</hibernate-validator.version>
<hibernate-search.version>7.2.3.Final</hibernate-search.version>

Expand Down