@@ -74,7 +74,6 @@ class PooledClusterConnectionProvider<K, V>
74
74
75
75
private static final InternalLogger logger = InternalLoggerFactory .getInstance (PooledClusterConnectionProvider .class );
76
76
77
- // Contains NodeId-identified and HostAndPort-identified connections.
78
77
private final Lock stateLock = new ReentrantLock ();
79
78
80
79
private final boolean debugEnabled = logger .isDebugEnabled ();
@@ -157,44 +156,39 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Conne
157
156
}
158
157
159
158
private CompletableFuture <StatefulRedisConnection <K , V >> getWriteConnection (int slot ) {
160
- if (writers [slot ] == null ) {
161
- stateLock .lock ();
162
- try {
163
- if (writers [slot ] == null ) {
164
- RedisClusterNode master = partitions .getMasterBySlot (slot );
165
- if (master == null ) {
166
- clusterEventListener .onUncoveredSlot (slot );
167
- return Futures .failed (new PartitionSelectorException (
168
- "Cannot determine a partition for slot " + slot + "." , partitions .clone ()));
169
- }
170
159
171
- // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
172
- // host because the nodeId can be handled by a different host.
173
- RedisURI uri = master . getUri () ;
174
- ConnectionKey key = new ConnectionKey ( ConnectionIntent . WRITE , uri . getHost (), uri . getPort ());
160
+ CompletableFuture < StatefulRedisConnection < K , V >> writer = writers [ slot ];
161
+ if ( writer != null ) {
162
+ return writer ;
163
+ }
175
164
176
- ConnectionFuture <StatefulRedisConnection <K , V >> future = getConnectionAsync (key );
165
+ RedisClusterNode master = partitions .getMasterBySlot (slot );
166
+ if (master == null ) {
167
+ clusterEventListener .onUncoveredSlot (slot );
168
+ return Futures .failed (
169
+ new PartitionSelectorException ("Cannot determine a partition for slot " + slot + "." , partitions .clone ()));
170
+ }
177
171
178
- return future .thenApply (connection -> {
172
+ // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
173
+ // host because the nodeId can be handled by a different host.
174
+ RedisURI uri = master .getUri ();
175
+ ConnectionKey key = new ConnectionKey (ConnectionIntent .WRITE , uri .getHost (), uri .getPort ());
179
176
180
- stateLock .lock ();
181
- try {
182
- if (writers [slot ] == null ) {
183
- writers [slot ] = CompletableFuture .completedFuture (connection );
184
- }
185
- } finally {
186
- stateLock .unlock ();
187
- }
177
+ ConnectionFuture <StatefulRedisConnection <K , V >> future = getConnectionAsync (key );
178
+
179
+ return future .thenApply (connection -> {
188
180
189
- return connection ;
190
- }).toCompletableFuture ();
181
+ stateLock .lock ();
182
+ try {
183
+ if (writers [slot ] == null ) {
184
+ writers [slot ] = CompletableFuture .completedFuture (connection );
191
185
}
192
186
} finally {
193
187
stateLock .unlock ();
194
188
}
195
- }
196
189
197
- return writers [slot ];
190
+ return connection ;
191
+ }).toCompletableFuture ();
198
192
}
199
193
200
194
private CompletableFuture <StatefulRedisConnection <K , V >> getReadConnection (int slot ) {
@@ -651,7 +645,6 @@ public ReadFrom getReadFrom() {
651
645
}
652
646
653
647
/**
654
- *
655
648
* @return number of connections.
656
649
*/
657
650
long getConnectionCount () {
@@ -682,8 +675,8 @@ private static RuntimeException connectionAttemptRejected(String message) {
682
675
}
683
676
684
677
private boolean validateClusterNodeMembership () {
685
- return redisClusterClient .getClusterClientOptions () == null
686
- || redisClusterClient . getClusterClientOptions () .isValidateClusterNodeMembership ();
678
+ return redisClusterClient .getClusterClientOptions () == null || redisClusterClient . getClusterClientOptions ()
679
+ .isValidateClusterNodeMembership ();
687
680
}
688
681
689
682
/**
0 commit comments