Skip to content

Commit 966d8a9

Browse files
big-cirtishun
andauthored
Add custom connection validation to ConnectionPoolSupport #3081 (#3138)
* Add custom connection validation to ConnectionPoolSupport * Polishing --------- Co-authored-by: Tihomir Mateev <[email protected]>
1 parent b7ef59e commit 966d8a9

File tree

3 files changed

+135
-9
lines changed

3 files changed

+135
-9
lines changed

Diff for: src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java

+70-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.atomic.AtomicReference;
77
import java.util.concurrent.locks.Lock;
88
import java.util.concurrent.locks.ReentrantLock;
9+
import java.util.function.Predicate;
910
import java.util.function.Supplier;
1011

1112
import org.apache.commons.pool2.BasePooledObjectFactory;
@@ -60,6 +61,7 @@
6061
* </pre>
6162
*
6263
* @author Mark Paluch
64+
* @author dae won
6365
* @since 4.3
6466
*/
6567
public abstract class ConnectionPoolSupport {
@@ -69,7 +71,8 @@ private ConnectionPoolSupport() {
6971

7072
/**
7173
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. Allocated instances are wrapped and must not be
72-
* returned with {@link ObjectPool#returnObject(Object)}.
74+
* returned with {@link ObjectPool#returnObject(Object)}. By default, connections are validated by checking their
75+
* {@link StatefulConnection#isOpen()} method.
7376
*
7477
* @param connectionSupplier must not be {@code null}.
7578
* @param config must not be {@code null}.
@@ -78,30 +81,67 @@ private ConnectionPoolSupport() {
7881
*/
7982
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
8083
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config) {
81-
return createGenericObjectPool(connectionSupplier, config, true);
84+
return createGenericObjectPool(connectionSupplier, config, true, (c) -> c.isOpen());
8285
}
8386

8487
/**
85-
* Creates a new {@link GenericObjectPool} using the {@link Supplier}.
88+
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. Allocated instances are wrapped and must not be
89+
* returned with {@link ObjectPool#returnObject(Object)}.
90+
*
91+
* @param connectionSupplier must not be {@code null}.
92+
* @param config must not be {@code null}.
93+
* @param validationPredicate a {@link Predicate} to help validate connections
94+
* @param <T> connection type.
95+
* @return the connection pool.
96+
*/
97+
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
98+
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, Predicate<T> validationPredicate) {
99+
return createGenericObjectPool(connectionSupplier, config, true, validationPredicate);
100+
}
101+
102+
/**
103+
* Creates a new {@link GenericObjectPool} using the {@link Supplier}. By default, connections are validated by checking
104+
* their {@link StatefulConnection#isOpen()} method.
86105
*
87106
* @param connectionSupplier must not be {@code null}.
88107
* @param config must not be {@code null}.
89108
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
90-
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
109+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
91110
* when invoking {@link StatefulConnection#close()}.
92111
* @param <T> connection type.
93112
* @return the connection pool.
94113
*/
95114
@SuppressWarnings("unchecked")
96115
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
97116
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections) {
117+
return createGenericObjectPool(connectionSupplier, config, wrapConnections, (c) -> c.isOpen());
118+
}
119+
120+
/**
121+
* Creates a new {@link GenericObjectPool} using the {@link Supplier}.
122+
*
123+
* @param connectionSupplier must not be {@code null}.
124+
* @param config must not be {@code null}.
125+
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
126+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
127+
* when invoking {@link StatefulConnection#close()}.
128+
* @param validationPredicate a {@link Predicate} to help validate connections
129+
* @param <T> connection type.
130+
* @return the connection pool.
131+
*/
132+
@SuppressWarnings("unchecked")
133+
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
134+
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections,
135+
Predicate<T> validationPredicate) {
98136

99137
LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
100138
LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");
139+
LettuceAssert.notNull(validationPredicate, "Connection validator must not be null");
101140

102141
AtomicReference<Origin<T>> poolRef = new AtomicReference<>();
103142

104-
GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {
143+
GenericObjectPool<T> pool = new GenericObjectPool<T>(
144+
new RedisPooledObjectFactory<>(connectionSupplier, validationPredicate), config) {
105145

106146
@Override
107147
public T borrowObject() throws Exception {
@@ -144,20 +184,38 @@ public void returnObject(T obj) {
144184
*
145185
* @param connectionSupplier must not be {@code null}.
146186
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
147-
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
187+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
148188
* when invoking {@link StatefulConnection#close()}.
149189
* @param <T> connection type.
150190
* @return the connection pool.
151191
*/
152192
@SuppressWarnings("unchecked")
153193
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
154194
Supplier<T> connectionSupplier, boolean wrapConnections) {
195+
return createSoftReferenceObjectPool(connectionSupplier, wrapConnections, (c) -> c.isOpen());
196+
}
197+
198+
/**
199+
* Creates a new {@link SoftReferenceObjectPool} using the {@link Supplier}.
200+
*
201+
* @param connectionSupplier must not be {@code null}.
202+
* @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
203+
* {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connections that are returned to the pool
204+
* when invoking {@link StatefulConnection#close()}.
205+
* @param validationPredicate a {@link Predicate} to help validate connections
206+
* @param <T> connection type.
207+
* @return the connection pool.
208+
*/
209+
@SuppressWarnings("unchecked")
210+
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
211+
Supplier<T> connectionSupplier, boolean wrapConnections, Predicate<T> validationPredicate) {
155212

156213
LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
157214

158215
AtomicReference<Origin<T>> poolRef = new AtomicReference<>();
159216

160-
SoftReferenceObjectPool<T> pool = new SoftReferenceObjectPool<T>(new RedisPooledObjectFactory<>(connectionSupplier)) {
217+
SoftReferenceObjectPool<T> pool = new SoftReferenceObjectPool<T>(
218+
new RedisPooledObjectFactory<>(connectionSupplier, validationPredicate)) {
161219

162220
private final Lock lock = new ReentrantLock();
163221

@@ -200,8 +258,11 @@ private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>
200258

201259
private final Supplier<T> connectionSupplier;
202260

203-
RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
261+
private final Predicate<T> validationPredicate;
262+
263+
RedisPooledObjectFactory(Supplier<T> connectionSupplier, Predicate<T> validationPredicate) {
204264
this.connectionSupplier = connectionSupplier;
265+
this.validationPredicate = validationPredicate;
205266
}
206267

207268
@Override
@@ -221,7 +282,7 @@ public PooledObject<T> wrap(T obj) {
221282

222283
@Override
223284
public boolean validateObject(PooledObject<T> p) {
224-
return p.getObject().isOpen();
285+
return this.validationPredicate.test(p.getObject());
225286
}
226287

227288
}

Diff for: src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java

+22
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,28 @@ void shouldWorkWithPooledConnection() throws Exception {
112112
pool.close();
113113
}
114114

115+
@Test
116+
void shouldWorkWithPooledConnectionAndCustomValidation() throws Exception {
117+
118+
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
119+
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>(), connection -> {
120+
try {
121+
return "PONG".equals(connection.sync().ping());
122+
} catch (Exception e) {
123+
return false;
124+
}
125+
});
126+
127+
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
128+
129+
RedisCommandFactory factory = new RedisCommandFactory(connection);
130+
SimpleCommands commands = factory.getCommands(SimpleCommands.class);
131+
commands.get("foo");
132+
}
133+
134+
pool.close();
135+
}
136+
115137
@Test
116138
void shouldWorkWithAsyncPooledConnection() {
117139

Diff for: src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java

+43
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,27 @@ void genericPoolShouldWorkWithPlainConnections() throws Exception {
131131
pool.close();
132132
}
133133

134+
@Test
135+
void genericPoolShouldWorkWithValidationPredicate() throws Exception {
136+
137+
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
138+
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
139+
try {
140+
return "PONG".equals(connection.sync().ping());
141+
} catch (Exception e) {
142+
return false;
143+
}
144+
});
145+
146+
borrowAndReturn(pool);
147+
148+
StatefulRedisConnection<String, String> connection = pool.borrowObject();
149+
assertThat(Proxy.isProxyClass(connection.getClass())).isFalse();
150+
pool.returnObject(connection);
151+
152+
pool.close();
153+
}
154+
134155
@Test
135156
void softReferencePoolShouldWorkWithPlainConnections() throws Exception {
136157

@@ -147,6 +168,28 @@ void softReferencePoolShouldWorkWithPlainConnections() throws Exception {
147168
pool.close();
148169
}
149170

171+
@Test
172+
void softReferencePoolShouldWorkWithValidationPredicate() throws Exception {
173+
174+
SoftReferenceObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
175+
.createSoftReferenceObjectPool(() -> client.connect(), false, connection -> {
176+
try {
177+
return "PONG".equals(connection.sync().ping());
178+
} catch (Exception e) {
179+
return false;
180+
}
181+
});
182+
183+
borrowAndReturn(pool);
184+
185+
StatefulRedisConnection<String, String> connection = pool.borrowObject();
186+
assertThat(Proxy.isProxyClass(connection.getClass())).isFalse();
187+
pool.returnObject(connection);
188+
189+
connection.close();
190+
pool.close();
191+
}
192+
150193
@Test
151194
void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception {
152195

0 commit comments

Comments
 (0)