5
5
import java .util .List ;
6
6
import java .util .Set ;
7
7
import java .util .concurrent .atomic .AtomicBoolean ;
8
+ import java .util .stream .Collectors ;
8
9
9
10
import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
10
11
import org .slf4j .Logger ;
@@ -21,7 +22,7 @@ public class JedisSentinelPool extends JedisPoolAbstract {
21
22
@ Deprecated
22
23
protected static Logger log = LoggerFactory .getLogger (JedisSentinelPool .class );
23
24
24
- protected final GenericObjectPoolConfig <Jedis > poolConfig ;
25
+ @ Deprecated protected final GenericObjectPoolConfig <Jedis > poolConfig ;
25
26
private final JedisFactory factory ;
26
27
27
28
@ Deprecated protected int connectionTimeout ;
@@ -39,6 +40,8 @@ public class JedisSentinelPool extends JedisPoolAbstract {
39
40
@ Deprecated protected String sentinelPassword ;
40
41
@ Deprecated protected String sentinelClientName ;
41
42
43
+ private final JedisClientConfig sentinelClientConfig ;
44
+
42
45
protected final Set <MasterListener > masterListeners = new HashSet <>();
43
46
44
47
private volatile HostAndPort currentHostMaster ;
@@ -177,14 +180,33 @@ public JedisSentinelPool(String masterName, Set<String> sentinels,
177
180
178
181
public JedisSentinelPool (String masterName , Set <String > sentinels ,
179
182
final GenericObjectPoolConfig <Jedis > poolConfig , final JedisFactory factory ) {
183
+ this (masterName , parseHostAndPorts (sentinels ), poolConfig , factory ,
184
+ DefaultJedisClientConfig .builder ().build ());
185
+ }
186
+
187
+ public JedisSentinelPool (String masterName , Set <HostAndPort > sentinels ,
188
+ final GenericObjectPoolConfig <Jedis > poolConfig , final JedisClientConfig masteClientConfig ,
189
+ final JedisClientConfig sentinelClientConfig ) {
190
+ this (masterName , sentinels , poolConfig , new JedisFactory (masteClientConfig ), sentinelClientConfig );
191
+ }
192
+
193
+ public JedisSentinelPool (String masterName , Set <HostAndPort > sentinels ,
194
+ final GenericObjectPoolConfig <Jedis > poolConfig , final JedisFactory factory ,
195
+ final JedisClientConfig sentinelClientConfig ) {
180
196
super (poolConfig , factory );
197
+
181
198
this .poolConfig = poolConfig ;
182
199
this .factory = factory ;
200
+ this .sentinelClientConfig = sentinelClientConfig ;
183
201
184
202
HostAndPort master = initSentinels (sentinels , masterName );
185
203
initMaster (master );
186
204
}
187
205
206
+ private static Set <HostAndPort > parseHostAndPorts (Set <String > strings ) {
207
+ return strings .parallelStream ().map (str -> HostAndPort .parseString (str )).collect (Collectors .toSet ());
208
+ }
209
+
188
210
@ Override
189
211
public void destroy () {
190
212
for (MasterListener m : masterListeners ) {
@@ -212,51 +234,44 @@ private void initMaster(HostAndPort master) {
212
234
}
213
235
}
214
236
215
- private HostAndPort initSentinels (Set <String > sentinels , final String masterName ) {
237
+ private HostAndPort initSentinels (Set <HostAndPort > sentinels , final String masterName ) {
216
238
217
239
HostAndPort master = null ;
218
240
boolean sentinelAvailable = false ;
219
241
220
242
log .info ("Trying to find master from available Sentinels..." );
221
243
222
- for (String sentinel : sentinels ) {
223
- final HostAndPort hap = HostAndPort .parseString (sentinel );
244
+ for (HostAndPort sentinel : sentinels ) {
224
245
225
- log .debug ("Connecting to Sentinel {}" , hap );
246
+ log .debug ("Connecting to Sentinel {}" , sentinel );
226
247
227
- try (Jedis jedis = new Jedis (hap .getHost (), hap .getPort (), sentinelConnectionTimeout , sentinelSoTimeout )) {
228
- if (sentinelUser != null ) {
229
- jedis .auth (sentinelUser , sentinelPassword );
230
- } else if (sentinelPassword != null ) {
231
- jedis .auth (sentinelPassword );
232
- }
233
- if (sentinelClientName != null ) {
234
- jedis .clientSetname (sentinelClientName );
235
- }
248
+ try (Jedis jedis = new Jedis (sentinel , sentinelClientConfig )) {
236
249
237
250
List <String > masterAddr = jedis .sentinelGetMasterAddrByName (masterName );
238
251
239
252
// connected to sentinel...
240
253
sentinelAvailable = true ;
241
254
242
255
if (masterAddr == null || masterAddr .size () != 2 ) {
243
- log .warn ("Can not get master addr, master name: {}. Sentinel: {}" , masterName , hap );
256
+ log .warn ("Can not get master addr, master name: {}. Sentinel: {}" , masterName , sentinel );
244
257
continue ;
245
258
}
246
259
247
260
master = toHostAndPort (masterAddr );
248
261
log .debug ("Found Redis master at {}" , master );
249
262
break ;
250
263
} catch (JedisException e ) {
251
- // resolves #1036, it should handle JedisException there's another chance of raising JedisDataException
252
- log .warn ("Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one." , hap , e );
264
+ // resolves #1036, it should handle JedisException there's another chance
265
+ // of raising JedisDataException
266
+ log .warn (
267
+ "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one." ,
268
+ sentinel , e );
253
269
}
254
270
}
255
271
256
272
if (master == null ) {
257
273
if (sentinelAvailable ) {
258
- // can connect to sentinel, but master name seems to not
259
- // monitored
274
+ // can connect to sentinel, but master name seems to not monitored
260
275
throw new JedisException ("Can connect to sentinel, but " + masterName
261
276
+ " seems to be not monitored..." );
262
277
} else {
@@ -267,9 +282,9 @@ private HostAndPort initSentinels(Set<String> sentinels, final String masterName
267
282
268
283
log .info ("Redis master running at {}, starting Sentinel listeners..." , master );
269
284
270
- for (String sentinel : sentinels ) {
271
- final HostAndPort hap = HostAndPort . parseString ( sentinel );
272
- MasterListener masterListener = new MasterListener (masterName , hap .getHost (), hap .getPort ());
285
+ for (HostAndPort sentinel : sentinels ) {
286
+
287
+ MasterListener masterListener = new MasterListener (masterName , sentinel .getHost (), sentinel .getPort ());
273
288
// whether MasterListener threads are alive or not, process can be stopped
274
289
masterListener .setDaemon (true );
275
290
masterListeners .add (masterListener );
@@ -357,28 +372,22 @@ public void run() {
357
372
break ;
358
373
}
359
374
360
- j = new Jedis (host , port , sentinelConnectionTimeout , sentinelSoTimeout );
361
- if (sentinelUser != null ) {
362
- j .auth (sentinelUser , sentinelPassword );
363
- } else if (sentinelPassword != null ) {
364
- j .auth (sentinelPassword );
365
- }
366
- if (sentinelClientName != null ) {
367
- j .clientSetname (sentinelClientName );
368
- }
375
+ final HostAndPort hostPort = new HostAndPort (host , port );
376
+ j = new Jedis (hostPort , sentinelClientConfig );
369
377
370
378
// code for active refresh
371
379
List <String > masterAddr = j .sentinelGetMasterAddrByName (masterName );
372
380
if (masterAddr == null || masterAddr .size () != 2 ) {
373
- log .warn ("Can not get master addr, master name: {}. Sentinel: {}:{}." , masterName , host , port );
381
+ log .warn ("Can not get master addr, master name: {}. Sentinel: {}." , masterName ,
382
+ hostPort );
374
383
} else {
375
384
initMaster (toHostAndPort (masterAddr ));
376
385
}
377
386
378
387
j .subscribe (new JedisPubSub () {
379
388
@ Override
380
389
public void onMessage (String channel , String message ) {
381
- log .debug ("Sentinel {}:{} published: {}." , host , port , message );
390
+ log .debug ("Sentinel {} published: {}." , hostPort , message );
382
391
383
392
String [] switchMasterMsg = message .split (" " );
384
393
@@ -393,9 +402,8 @@ public void onMessage(String channel, String message) {
393
402
}
394
403
395
404
} else {
396
- log .error (
397
- "Invalid message received on Sentinel {}:{} on channel +switch-master: {}" , host ,
398
- port , message );
405
+ log .error ("Invalid message received on Sentinel {} on channel +switch-master: {}" ,
406
+ hostPort , message );
399
407
}
400
408
}
401
409
}, "+switch-master" );
@@ -427,7 +435,7 @@ public void shutdown() {
427
435
running .set (false );
428
436
// This isn't good, the Jedis object is not thread safe
429
437
if (j != null ) {
430
- j .disconnect ();
438
+ j .close ();
431
439
}
432
440
} catch (Exception e ) {
433
441
log .error ("Caught exception while shutting down: " , e );
0 commit comments