@@ -157,47 +157,44 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Conne
157
157
}
158
158
159
159
private CompletableFuture <StatefulRedisConnection <K , V >> getWriteConnection (int slot ) {
160
+ if (writers [slot ] == null ) {
161
+ stateLock .lock ();
162
+ try {
163
+ if (writers [slot ] == null ) {
164
+ RedisClusterNode master = partitions .getMasterBySlot (slot );
165
+ if (master == null ) {
166
+ clusterEventListener .onUncoveredSlot (slot );
167
+ return Futures .failed (new PartitionSelectorException (
168
+ "Cannot determine a partition for slot " + slot + "." , partitions .clone ()));
169
+ }
160
170
161
- CompletableFuture <StatefulRedisConnection <K , V >> writer ;// avoid races when reconfiguring partitions.
171
+ // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
172
+ // host because the nodeId can be handled by a different host.
173
+ RedisURI uri = master .getUri ();
174
+ ConnectionKey key = new ConnectionKey (ConnectionIntent .WRITE , uri .getHost (), uri .getPort ());
162
175
163
- stateLock .lock ();
164
- try {
165
- writer = writers [slot ];
166
- } finally {
167
- stateLock .unlock ();
168
- }
176
+ ConnectionFuture <StatefulRedisConnection <K , V >> future = getConnectionAsync (key );
169
177
170
- if (writer == null ) {
171
- RedisClusterNode master = partitions .getMasterBySlot (slot );
172
- if (master == null ) {
173
- clusterEventListener .onUncoveredSlot (slot );
174
- return Futures .failed (new PartitionSelectorException ("Cannot determine a partition for slot " + slot + "." ,
175
- partitions .clone ()));
176
- }
178
+ return future .thenApply (connection -> {
177
179
178
- // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
179
- // host because the nodeId can be handled by a different host.
180
- RedisURI uri = master . getUri ();
181
- ConnectionKey key = new ConnectionKey ( ConnectionIntent . WRITE , uri . getHost (), uri . getPort () );
182
-
183
- ConnectionFuture < StatefulRedisConnection < K , V >> future = getConnectionAsync ( key );
184
-
185
- return future . thenApply ( connection -> {
180
+ stateLock . lock ();
181
+ try {
182
+ if ( writers [ slot ] == null ) {
183
+ writers [ slot ] = CompletableFuture . completedFuture ( connection );
184
+ }
185
+ } finally {
186
+ stateLock . unlock ();
187
+ }
186
188
187
- stateLock .lock ();
188
- try {
189
- if (writers [slot ] == null ) {
190
- writers [slot ] = CompletableFuture .completedFuture (connection );
191
- }
192
- } finally {
193
- stateLock .unlock ();
189
+ return connection ;
190
+ }).toCompletableFuture ();
194
191
}
195
-
196
- return connection ;
197
- }). toCompletableFuture ();
192
+ } finally {
193
+ stateLock . unlock () ;
194
+ }
198
195
}
199
196
200
- return writer ;
197
+ return writers [ slot ] ;
201
198
}
202
199
203
200
private CompletableFuture <StatefulRedisConnection <K , V >> getReadConnection (int slot ) {
0 commit comments