51
51
import org .springframework .data .redis .listener .ChannelTopic ;
52
52
import org .springframework .data .redis .listener .RedisMessageListenerContainer ;
53
53
import org .springframework .data .redis .listener .Topic ;
54
+ import org .springframework .integration .support .locks .CustomTtlLock ;
55
+ import org .springframework .integration .support .locks .CustomTtlLockRegistry ;
54
56
import org .springframework .integration .support .locks .ExpirableLockRegistry ;
55
57
import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
56
58
import org .springframework .util .Assert ;
90
92
* @since 4.0
91
93
*
92
94
*/
93
- public final class RedisLockRegistry implements ExpirableLockRegistry , DisposableBean {
95
+ public final class RedisLockRegistry implements ExpirableLockRegistry , CustomTtlLockRegistry , DisposableBean {
94
96
95
97
private static final Log LOGGER = LogFactory .getLog (RedisLockRegistry .class );
96
98
@@ -225,6 +227,11 @@ public void setRedisLockType(RedisLockType redisLockType) {
225
227
226
228
@ Override
227
229
public Lock obtain (Object lockKey ) {
230
+ return this .obtainCustomTtlLock (lockKey );
231
+ }
232
+
233
+ @ Override
234
+ public CustomTtlLock obtainCustomTtlLock (Object lockKey ) {
228
235
Assert .isInstanceOf (String .class , lockKey );
229
236
String path = (String ) lockKey ;
230
237
this .lock .lock ();
@@ -296,7 +303,7 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
296
303
};
297
304
}
298
305
299
- private abstract class RedisLock implements Lock {
306
+ private abstract class RedisLock implements CustomTtlLock {
300
307
301
308
private static final String OBTAIN_LOCK_SCRIPT = """
302
309
local lockClientId = redis.call('GET', KEYS[1])
@@ -334,11 +341,12 @@ public long getLockedAt() {
334
341
/**
335
342
* Attempt to acquire a lock in redis.
336
343
* @param time the maximum time(milliseconds) to wait for the lock, -1 infinity
344
+ * @param expireAfter the time-to-live(milliseconds) for the lock status data
337
345
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
338
346
* @throws InterruptedException –
339
347
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
340
348
*/
341
- protected abstract boolean tryRedisLockInner (long time ) throws ExecutionException , InterruptedException ;
349
+ protected abstract boolean tryRedisLockInner (long time , long expireAfter ) throws ExecutionException , InterruptedException ;
342
350
343
351
/**
344
352
* Unlock the lock using the unlink method in redis.
@@ -352,10 +360,16 @@ public long getLockedAt() {
352
360
353
361
@ Override
354
362
public final void lock () {
363
+ this .lock (RedisLockRegistry .this .expireAfter , TimeUnit .MILLISECONDS );
364
+ }
365
+
366
+ @ Override
367
+ public void lock (long customTtl , TimeUnit customTtlUnit ) {
355
368
this .localLock .lock ();
356
369
while (true ) {
357
370
try {
358
- if (tryRedisLock (-1L )) {
371
+ long customTtlInMilliseconds = TimeUnit .MILLISECONDS .convert (customTtl , customTtlUnit );
372
+ if (tryRedisLock (-1L , customTtlInMilliseconds )) {
359
373
return ;
360
374
}
361
375
}
@@ -382,7 +396,7 @@ public final void lockInterruptibly() throws InterruptedException {
382
396
this .localLock .lockInterruptibly ();
383
397
while (true ) {
384
398
try {
385
- if (tryRedisLock (-1L )) {
399
+ if (tryRedisLock (-1L , RedisLockRegistry . this . expireAfter )) {
386
400
return ;
387
401
}
388
402
}
@@ -411,12 +425,18 @@ public final boolean tryLock() {
411
425
412
426
@ Override
413
427
public final boolean tryLock (long time , TimeUnit unit ) throws InterruptedException {
428
+ return this .tryLock (time , unit , RedisLockRegistry .this .expireAfter , TimeUnit .MILLISECONDS );
429
+ }
430
+
431
+ @ Override
432
+ public boolean tryLock (long time , TimeUnit unit , long customTtl , TimeUnit customTtlUnit ) throws InterruptedException {
414
433
if (!this .localLock .tryLock (time , unit )) {
415
434
return false ;
416
435
}
417
436
try {
418
437
long waitTime = TimeUnit .MILLISECONDS .convert (time , unit );
419
- boolean acquired = tryRedisLock (waitTime );
438
+ long customTtlInMilliseconds = TimeUnit .MILLISECONDS .convert (customTtl , customTtlUnit );
439
+ boolean acquired = tryRedisLock (waitTime , customTtlInMilliseconds );
420
440
if (!acquired ) {
421
441
this .localLock .unlock ();
422
442
}
@@ -429,8 +449,8 @@ public final boolean tryLock(long time, TimeUnit unit) throws InterruptedExcepti
429
449
return false ;
430
450
}
431
451
432
- private boolean tryRedisLock (long time ) throws ExecutionException , InterruptedException {
433
- final boolean acquired = tryRedisLockInner (time );
452
+ private boolean tryRedisLock (long time , long expireAfter ) throws ExecutionException , InterruptedException {
453
+ final boolean acquired = tryRedisLockInner (time , expireAfter );
434
454
if (acquired ) {
435
455
if (LOGGER .isDebugEnabled ()) {
436
456
LOGGER .debug ("Acquired lock; " + this );
@@ -440,11 +460,11 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
440
460
return acquired ;
441
461
}
442
462
443
- protected final Boolean obtainLock () {
463
+ protected final Boolean obtainLock (long expireAfter ) {
444
464
return RedisLockRegistry .this .redisTemplate
445
465
.execute (OBTAIN_LOCK_REDIS_SCRIPT , Collections .singletonList (this .lockKey ),
446
466
RedisLockRegistry .this .clientId ,
447
- String .valueOf (RedisLockRegistry . this . expireAfter ));
467
+ String .valueOf (expireAfter ));
448
468
}
449
469
450
470
@ Override
@@ -598,8 +618,8 @@ private RedisPubSubLock(String path) {
598
618
}
599
619
600
620
@ Override
601
- protected boolean tryRedisLockInner (long time ) throws ExecutionException , InterruptedException {
602
- return subscribeLock (time );
621
+ protected boolean tryRedisLockInner (long time , long expireAfter ) throws ExecutionException , InterruptedException {
622
+ return subscribeLock (time , expireAfter );
603
623
}
604
624
605
625
@ Override
@@ -618,9 +638,9 @@ private boolean removeLockKeyWithScript(RedisScript<Boolean> redisScript) {
618
638
RedisLockRegistry .this .clientId , RedisLockRegistry .this .unLockChannelKey ));
619
639
}
620
640
621
- private boolean subscribeLock (long time ) throws ExecutionException , InterruptedException {
641
+ private boolean subscribeLock (long time , long expireAfter ) throws ExecutionException , InterruptedException {
622
642
final long expiredTime = System .currentTimeMillis () + time ;
623
- if (obtainLock ()) {
643
+ if (obtainLock (expireAfter )) {
624
644
return true ;
625
645
}
626
646
@@ -635,7 +655,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
635
655
Future <String > future =
636
656
RedisLockRegistry .this .unlockNotifyMessageListener .subscribeLock (this .lockKey );
637
657
//DCL
638
- if (obtainLock ()) {
658
+ if (obtainLock (expireAfter )) {
639
659
return true ;
640
660
}
641
661
try {
@@ -645,7 +665,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
645
665
}
646
666
catch (TimeoutException ignore ) {
647
667
}
648
- if (obtainLock ()) {
668
+ if (obtainLock (expireAfter )) {
649
669
return true ;
650
670
}
651
671
}
@@ -737,18 +757,18 @@ private RedisSpinLock(String path) {
737
757
}
738
758
739
759
@ Override
740
- protected boolean tryRedisLockInner (long time ) throws InterruptedException {
760
+ protected boolean tryRedisLockInner (long time , long expireAfter ) throws InterruptedException {
741
761
long now = System .currentTimeMillis ();
742
762
if (time == -1L ) {
743
- while (!obtainLock ()) {
763
+ while (!obtainLock (expireAfter )) {
744
764
Thread .sleep (100 ); //NOSONAR
745
765
}
746
766
return true ;
747
767
}
748
768
else {
749
769
long expire = now + TimeUnit .MILLISECONDS .convert (time , TimeUnit .MILLISECONDS );
750
770
boolean acquired ;
751
- while (!(acquired = obtainLock ()) && System .currentTimeMillis () < expire ) { //NOSONAR
771
+ while (!(acquired = obtainLock (expireAfter )) && System .currentTimeMillis () < expire ) { //NOSONAR
752
772
Thread .sleep (100 ); //NOSONAR
753
773
}
754
774
return acquired ;
0 commit comments