Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Serverless issue - psubscribe error - how can I disable in Spring #3075

Closed
dreamstar-enterprises opened this issue Dec 14, 2024 · 8 comments
Labels
status: invalid An issue that we don't feel is valid

Comments

@dreamstar-enterprises
Copy link

dreamstar-enterprises commented Dec 14, 2024

Hi,

I'm using AWS Elasticache Serverless, and in the logs keep seeing the below errors.

I thought I had turned pub/sub events off in my app, but keep getting 2 errors and cannot see where I have gone wrong.

Can someone help?

Here are the 2 errors, and then my Spring Connection Factory Code:

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'psubscribe', with args beginning with: spring:session:event:0:created:*

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider

ERROR 1

14 December 2024 at 18:43 (UTC)
	
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'psubscribe', with args beginning with: spring:session:event:0:created:*
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:116) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:761) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:745) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.pubsub.PubSubCommandHandler.complete(PubSubCommandHandler.java:167) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:680) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:597) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1336) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff
14 December 2024 at 18:43 (UTC)
	
... 4 common frames omitted
	
[928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2)
	
bff

ERROR 2

2024-12-14T18:49:56.253Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider

at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

....

at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]

at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

CONNECTION FACTORY

**
 * Establishes a Redis Connection Factory with comprehensive configuration options.
 *
 * Provides configuration for:
 * - Connection pooling and lifecycle management
 * - Cluster and standalone deployment modes
 * - SSL/TLS security for production environments
 * - DNS resolution and caching
 * - Performance tuning (thread pools, buffers, queues)
 * - High availability features (topology refresh, failover)
 *
 * The factory supports different deployment profiles:
 * - Production: Clustered Redis with SSL
 * - Development: Standalone Redis without SSL
 *
 * @property profileProperties Configuration properties for active deployment profile
 * @since 1.0.0
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val profileProperties: ProfileProperties
) {

    /**
     * Client resources for Redis connections.
     *
     * Manages shared resources including:
     * - Thread pools for I/O and computation
     * - DNS resolution and caching
     * - Command latency metrics
     * - Connection lifecycle
     *
     * This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
     * Using lateinit as the resources are created after Spring context initialization.
     */
    private lateinit var clientResources: ClientResources

    companion object {

        private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)

        /**
         * Timeout configurations for Redis operations.
         * - Command timeout: Maximum time for command execution
         * - Connect timeout: Maximum time for connection establishment
         * - Topology refresh: Interval for cluster topology updates
         * - Adaptive refresh: Time window for topology change detection
         */
        private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
        private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
        private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
        private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
        private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
        private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L

        /**
         * Connection pool settings to optimize resource usage.
         * - Max total: Maximum number of connections in the pool
         * - Max/Min idle: Upper/Lower bounds for idle connections
         * - Max wait: Maximum time to wait for connection
         * - Eviction: Maintenance settings for idle connections
         */
        private const val MAX_TOTAL_CONNECTIONS = 100
        private const val MAX_IDLE_CONNECTIONS = 60
        private const val MIN_IDLE_CONNECTIONS = 20
        private const val MAX_WAIT_SECONDS = 120L
        private const val EVICTION_RUN_PERIOD_SECONDS = 120L
        private const val MIN_EVICTABLE_IDLE_MINUTES = 5L

        /**
         * Performance optimization parameters.
         * - Decode buffer ratio: Memory allocation for response buffers
         * - Request queue size: Maximum pending requests
         * - Latency publish interval: Metrics publication frequency
         * - Thread pool multiplier: Scaling factor for I/O threads
         */
        private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
        private const val REQUEST_QUEUE_SIZE = 2500
        private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
        private const val IO_THREAD_POOL_MULTIPLIER = 2
    }

    /**
     * Configures Redis keyspace notifications behavior.
     *
     * Returns NO_OP action to disable automatic configuration of keyspace
     * notifications, preventing potential connection issues during shutdown
     * related to Pub/Sub connections in Spring Session.
     *
     * @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
     */
    @Bean
    fun configureRedisAction(): ConfigureRedisAction {
        return ConfigureRedisAction.NO_OP
    }

    /* LETTUCE - reactive RedisConnectionFactory */
    /**
     * Creates the primary reactive Redis connection factory.
     *
     * Configures a Lettuce-based connection factory with:
     * - Profile-specific Redis deployment mode (clustered/standalone)
     * - Connection pooling
     * - Client resources (thread pools, DNS resolution)
     * - SSL for production environments
     *
     * @param clusterProperties Cluster node configuration
     * @param springDataRedisProperties Redis connection properties
     * @return Configured [ReactiveRedisConnectionFactory]
     */
    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
        springDataRedisProperties: SpringDataRedisProperties
    ): ReactiveRedisConnectionFactory {
        val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
        clientResources = createClientResources(springDataRedisProperties.host)
        val clientConfig = createLettuceClientConfig(
            clientResources,
            profileProperties.active ?: ProfileTypes.DEVELOPMENT.type
        )

        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            validateConnection = false
            setShareNativeConnection(true)

            // Pre-warm the connection pool
            repeat(MIN_IDLE_CONNECTIONS) {
                try {
                    this.connection.close()
                } catch (e: Exception) {
                    // Ignore pre-warm exceptions
                }
            }
        }
    }

    /**
     * Performs graceful cleanup of Redis client resources on shutdown.
     *
     * Features:
     * - Allows in-flight operations to complete
     * - Controlled shutdown with timeouts
     * - Resource cleanup verification
     * - Error handling
     *
     * Timeouts:
     * - Quiet period: 1 seconds for normal completion
     * - Force shutdown: 2 additional seconds if needed
     */
    @PreDestroy
    fun cleanup() {
        try {

            logger.debug("Starting Redis client resources cleanup")

            // Brief pause before cleanup to allow in-flight operations to complete
            Thread.sleep(100)

            // Shutdown client resources with a grace period
            clientResources.shutdown(
                SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS,
                SHUTDOWN_TIMEOUT_SECONDS,
                TimeUnit.SECONDS
            )

            logger.debug("Redis client resources cleanup completed successfully")
        } catch (ex: Exception) {
            logger.debug("Failed to cleanup Redis client resources", ex)
        }
    }

    /**
     * Properties class for Redis cluster configuration.
     *
     * @property nodes List of Redis nodes in format `host:port`
     */
    @Component
    internal class ClusterConfigurationProperties(
        springDataRedisProperties: SpringDataRedisProperties
    ) {
        /**
         * Get initial collection of known cluster nodes in format `host:port`.
         * @return
         */
        var nodes = listOf(
            "${springDataRedisProperties.host}:${springDataRedisProperties.port}",
        )
    }

    /**
     * Creates appropriate Redis configuration based on active profile.
     *
     * @param properties Redis connection properties
     * @param clusterProperties Cluster node configuration
     * @return [RedisConfiguration] for either clustered or standalone deployment
     */
    private fun createRedisConfiguration(
        properties: SpringDataRedisProperties,
        clusterProperties: ClusterConfigurationProperties
    ): RedisConfiguration = when {
        profileProperties.active == ProfileTypes.PRODUCTION.type &&
                properties.type == RedisConfigTypes.CLUSTERED.type -> {

            // Redis Cluster for production
            RedisClusterConfiguration(clusterProperties.nodes).apply {
                password = RedisPassword.of(properties.password)
            }
        }
        else -> {

            // Redis Standalone for non-production
            RedisStandaloneConfiguration().apply {
                hostName = properties.host
                port = properties.port
                password = RedisPassword.of(properties.password)
            }
        }
    }

    /**
     * Creates client resources with optimized thread pools and DNS resolution.
     *
     * @param host Redis host for DNS resolution
     * @return Configured [ClientResources]
     */
    private fun createClientResources(host: String) = DefaultClientResources.builder()
        .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
        .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
        .socketAddressResolver(createCachingDnsResolver(host))
        .commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
        .commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
        .build()

    /**
     * Creates Lettuce client configuration with pooling and security settings.
     *
     * Configures:
     * - Read preferences (replica preferred)
     * - Command timeouts
     * - Connection pooling
     * - SSL (for production)
     *
     * @param clientResources Configured client resources for connection management
     * @param activeProfile Current deployment profile
     * @return Configured [LettucePoolingClientConfiguration]
     */
    private fun createLettuceClientConfig(
        clientResources: ClientResources,
        activeProfile: String
    ): LettucePoolingClientConfiguration {
        val clusterClientOptions = createClusterClientOptions(activeProfile)

        return LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            .commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())
            .shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
            .shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
            // conditionally use sslOptions if profileProperties.active is 'prod'
            .apply {
                if (activeProfile == ProfileTypes.PRODUCTION.type) {
                    useSsl()
                }
            }
            .build()
    }

    /**
     * Creates cluster client options with comprehensive connection settings.
     *
     * Configures:
     * - Auto-reconnect behavior
     * - Connection validation
     * - Timeout settings
     * - Socket options
     * - Topology refresh
     * - Buffer and queue sizes
     * - SSL (for production)
     *
     * @param activeProfile Current deployment profile
     * @return Configured [ClusterClientOptions]
     */
    private fun createClusterClientOptions(activeProfile: String): ClusterClientOptions {
        val builder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(createTimeoutOptions())
            .socketOptions(createSocketOptions())
            .topologyRefreshOptions(createTopologyRefreshOptions())
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
            .requestQueueSize(REQUEST_QUEUE_SIZE)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
            .publishOnScheduler(true)
            .protocolVersion(ProtocolVersion.RESP3)

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (activeProfile == ProfileTypes.PRODUCTION.type) {
            builder.sslOptions(createSslOptions())
        }

        return builder.build()
    }

    /**
     * Creates socket options for Redis connections.
     *
     * Configures:
     * - Keep-alive settings
     * - TCP no-delay
     * - Connection timeouts
     *
     * @return Configured [SocketOptions]
     */
    private fun createSocketOptions() = SocketOptions.builder()
        .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
        .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
        .connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
        .build()

    /**
     * Creates timeout options for Redis commands.
     *
     * Configures:
     * - Fixed timeout duration
     * - Command timeout behavior
     *
     * @return Configured [TimeoutOptions]
     */
    private fun createTimeoutOptions() = TimeoutOptions.builder()
        .fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
        .timeoutCommands(true)
        .build()

    /**
     * Creates topology refresh options for Redis cluster.
     *
     * Configures:
     * - Periodic refresh intervals
     * - Dynamic refresh sources
     * - Stale connection handling
     * - Adaptive refresh triggers
     *
     * @return Configured [ClusterTopologyRefreshOptions]
     */
    private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
        .dynamicRefreshSources(true)
        .closeStaleConnections(true)
        .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
        .enableAllAdaptiveRefreshTriggers()
        .build()

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = true
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = 3
    }

    /**
     * Creates SSL options for secure Redis connections.
     *
     * Configures JDK-based SSL provider for Redis connections
     * in production environments.
     *
     * @return Configured [SslOptions]
     */
    private fun createSslOptions(): SslOptions = SslOptions.builder()
        .jdkSslProvider()
        .build()

    /**
     * Creates DNS resolver with caching capabilities.
     *
     * Implements:
     * - DNS resolution caching
     * - Hostname-to-IP mapping
     * - Fallback handling for resolution failures
     *
     * @param host Redis host to resolve
     * @return Configured [MappingSocketAddressResolver]
     */
    private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
        val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val addresses = dnsCache.computeIfAbsent(host) {
                try {
                    DnsResolvers.JVM_DEFAULT.resolve(host)
                } catch (e: UnknownHostException) {
                    emptyArray()
                }
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress
            if (hostAndPort.hostText == cacheIP) {
                HostAndPort.of(host, hostAndPort.port)
            } else {
                hostAndPort
            }
        }

        return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
    }

}
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Dec 14, 2024
@mp911de
Copy link
Member

mp911de commented Dec 16, 2024

spring:session:event is a prefix used by Spring Session. Spring Session uses Pub/Sub for session expiry. Because that is fundamental functionality, I think you cannot use Spring Session with Redis Serverless.

@Clete2
Copy link

Clete2 commented Feb 19, 2025

@mp911de apologies for commenting on a closed issue. I'm in a situation where we are also using Redis-backed sessions and using an ElastiCache Serverless instance setup by another team. We're trying to avoid the situation of having to make our own ElastiCache cluster, if at all possible.

If we do not plan to use @EventListener (from Spring docs) is there any harm to overriding the default RedisMessageListenerContainer bean to not have any subscriptions?

I am assuming that if we do not use session create/expire/destroy event listeners that there would be no impact to functionality. Or, is there internal functionality that we would break? Maybe some functionality that cleans up expired sessions.

@mp911de
Copy link
Member

mp911de commented Feb 20, 2025

How about not registering RedisMessageListenerContainer in the first place (or disabling auto-startup)? Overriding RedisMessageListenerContainer methods can lead to leaks if we decide to move functionality around.

@dreamstar-enterprises
Copy link
Author

I could not solve this with AWS Elasticache Serverless (using their Valkey or Redis offering)
So resorted to the less preferred option of AWS Elasticache Cluster
No errors then.
I might retry following mp911de suggestion above, but not sure where RedisMessageListenerContainer is in my code, to disable.
Would perhaps be nice to be able to use Spring Session with AWS Elasticache Serverless

@Clete2
Copy link

Clete2 commented Feb 20, 2025

RedisMessageListenerContainer is defined as a bean when RedisHttpSessionConfiguration is defined. I have yet to find where that is configured, but my assumption is that it is configured somehow when EnableRedisHttpSession annotation is processed. I don't think you or I are intentionally defining the RedisMessageListenerContainer.

@Clete2
Copy link

Clete2 commented Feb 21, 2025

@dreamstar-enterprises what version of Spring Boot are you using?

I did some testing on a very basic sample application this morning. I found that Spring Boot 2.x requires psubscribe when you use @EnableRedisHttpSession, but Spring Boot 3.x only requires it when you use an indexed session using @EnableRedisIndexedHttpSession.

For this very basic application, moving off of the legacy Spring Boot 2.x fixes the issue, so long as you do not need an indexed repository.

I'm working to validate this on a real world application.

@dreamstar-enterprises
Copy link
Author

dreamstar-enterprises commented Feb 21, 2025

Hi Clete,

I'm using :: Spring Boot :: (v3.3.7)

Here is my security chain config - and redis connection factory, but I still got the pub / sub issue (and a few other issues with Spring Security, Spring Redis, and Spring Session, since I started on my project in July 2024...)

I'm also using the reactive version, Spring Webflux.

Cheers,

/**
 * Security configuration for the BFF (Backend-For-Frontend) application.
 *
 * This configuration handles:
 * - OAuth2 client configuration
 * - CSRF protection
 * - CORS settings
 * - Session management
 * - Authentication flows
 * - Logout handling
 * - Security context management
 *
 * @property serverProperties Server configuration properties
 *
 * @see EnableWebFluxSecurity
 * @see EnableRedisIndexedWebSession
 */
@Configuration
@EnableWebFluxSecurity
@EnableRedisIndexedWebSession
internal class BffSecurityConfig (
    private val serverProperties: ServerProperties
) {

    /**
     * Creates the main security filter chain for client-side requests.
     *
     * Configures:
     * 1. Security matchers for protected routes
     * 2. Authentication handling
     * 3. CSRF protection
     * 4. CORS configuration
     * 5. Session management
     * 6. OAuth2 login flow
     * 7. Logout handling
     * 8. Back-channel logout (if enabled)
     *
     * @param http Base security configuration
     * @param nettyServerProperties Netty server settings
     * @param clientRegistrationRepository OAuth2 client registration repository
     *
     * @param loginProperties Login configuration
     *
     * @param serverCsrfTokenRepository CSRF token storage
     * @param spaCsrfTokenRequestHandler CSRF token handler for SPA
     * @param csrfProtectionMatcher CSRF protection matcher
     * @param csrfCookieWebFilter CSRF cookie filter
     *
     * @param corsConfig CORS configuration
     * @param corsProperties CORS settings
     *
     * @param reactiveRequestCache Request caching
     * @param reactiveSessionRegistry Session registry
     * @param maximumSessionsExceededHandler Session limit handler
     *
     * @param securityMatcherProperties SecurityMatcher settings
     * @param permitAllProperties Authorization settings
     *
     * @param oauthAuthorizationRequestResolver OAuth2 authorization resolver
     * @param redisAuthorizationRequestRepository OAuth2 authorization storage
     * @param serverPreAuthorizationCodeRedirectStrategy Authorization redirect handler
     * @param serverDelegatingAuthenticationSuccessHandler Authentication success handler
     * @param serverOAuth2AuthenticationFailureHandler Authentication failure handler
     *
     * @param redisSecurityContextRepository Security context storage
     * @param reactiveClientRegistrationRepository Client registration repository
     * @param redisAuthorizedClientRepository Authorized client storage
     *
     * @param logoutProperties Logout settings
     * @param logoutHandler Session logout handler
     * @param logoutSuccessHandler OAuth2 logout success handler
     * @param backChannelLogoutProperties Back-channel logout settings
     * @param noopSpringDataOidcSessionsStrategy OIDC session strategy
     *
     * @param authorizePostProcessor Authorization post-processor
     * @param httpPostProcessor HTTP security post-processor
     *
     * @param configurationExtra Extra Security-Chain configuration
     *
     * @return Configured SecurityWebFilterChain
     */
    @Bean
    @Order(Ordered.LOWEST_PRECEDENCE - 1)
    fun clientSecurityFilterChain(
        http: ServerHttpSecurity,
        nettyServerProperties: NettyServerProperties,
        clientRegistrationRepository: ReactiveClientRegistrationRepository,

        loginProperties: LoginProperties,

        serverCsrfTokenRepository: ServerCsrfTokenRepository,
        spaCsrfTokenRequestHandler: SPACsrfTokenRequestHandler,
        csrfProtectionMatcher: CsrfProtectionMatcher,
        csrfCookieWebFilter: CsrfWebCookieFilter,
        securityContextFilter: SecurityContextFilter,
        bffServerRateLimiterFilter: BFFServerRateLimiterFilter,

        corsConfig: CORSConfig,
        corsProperties: CorsProperties,

        reactiveRequestCache: ReactiveRequestCache,
        reactiveSessionRegistry: ReactiveSessionRegistry,
        maximumSessionsExceededHandler: CustomMaximumSessionsExceededHandler,

        securityMatcherProperties: SecurityMatcherProperties,
        permitAllProperties: PermitAllProperties,

        oauthAuthorizationRequestResolver: ServerOAuth2AuthorizationRequestResolver,
        redisAuthorizationRequestRepository: RedisAuthorizationRequestRepository,
        serverPreAuthorizationCodeRedirectStrategy: ServerPreAuthorizationCodeRedirectStrategy,
        serverDelegatingAuthenticationSuccessHandler: ServerDelegatingAuthenticationSuccessHandler,
        serverOAuth2AuthenticationFailureHandler: ServerOAuth2AuthenticationFailureHandler,

        redisSecurityContextRepository: RedisSecurityContextRepository,
        reactiveClientRegistrationRepository: ReactiveClientRegistrationRepository,
        redisAuthorizedClientRepository: RedisServerOAuth2AuthorizedClientRepository,

        logoutProperties: LogoutProperties,
        logoutHandler: ServerSessionLogoutHandler,
        logoutSuccessHandler: ServerOAuth2LogoutSuccessHandler,
        backChannelLogoutProperties: BackChannelLogoutProperties,
        noopSpringDataOidcSessionsStrategy: NoopSpringDataOidcSessionsStrategy,

        authorizePostProcessor: ClientAuthorizeExchangeSpecPostProcessor,
        httpPostProcessor: ClientReactiveHttpSecurityPostProcessor,

        configurationExtra: ClientConfigurationExtra,

        ): SecurityWebFilterChain {

        /* initialise logger */
        val logger = LoggerFactory.getLogger(SecurityWebFilterChain::class.java)

        /* apply security matchers to this filter chain */
        val clientRoutes: List<ServerWebExchangeMatcher> = securityMatcherProperties
            .securityMatchers
            .map { PathPatternParserServerWebExchangeMatcher(it) }
        logger.debug(
            "Applying client OAuth2 configuration for: {}",
            securityMatcherProperties.securityMatchers
        )
        http.securityMatcher(OrServerWebExchangeMatcher(clientRoutes))

        /* unauthenticated exception handler (re-directs to login uri) */
        loginProperties.LOGIN_URL.let { loginPath ->
            http.exceptionHandling { exceptionHandling ->
                exceptionHandling.authenticationEntryPoint(
                    RedirectServerAuthenticationEntryPoint(
                        UriComponentsBuilder.fromUri(
                            URI.create(serverProperties.bffClientServerUri)
                        ).path(loginPath).build().toString()
                    )
                )
            }
        }

        /* enable csrf */
        http.csrf { csrf ->
            csrf.csrfTokenRepository(serverCsrfTokenRepository)
            csrf.csrfTokenRequestHandler(spaCsrfTokenRequestHandler)
            csrf.requireCsrfProtectionMatcher(csrfProtectionMatcher)
        }

        /* configure cors */
        http.cors { cors ->
            cors.configurationSource(
                corsConfig.corsConfigurationSource()
            )
        }

        /* configure request cache */
        http.requestCache { cache ->
            cache.requestCache(reactiveRequestCache)
        }

        /* configure security context */
        http.securityContextRepository(redisSecurityContextRepository)

        /* configure session management */
        // this is also handled in the success handler, customConcurrentSessionControlSuccessHandler

        /* oauth2.0 client login */
        http.oauth2Login { oauth2 ->
            oauth2.authorizationRequestResolver(oauthAuthorizationRequestResolver)
            oauth2.authorizationRequestRepository(redisAuthorizationRequestRepository)

            oauth2.authorizationRedirectStrategy(serverPreAuthorizationCodeRedirectStrategy)
            oauth2.authenticationSuccessHandler(serverDelegatingAuthenticationSuccessHandler)
            oauth2.authenticationFailureHandler(serverOAuth2AuthenticationFailureHandler)

            oauth2.securityContextRepository(redisSecurityContextRepository)
            oauth2.clientRegistrationRepository(reactiveClientRegistrationRepository)
            oauth2.authorizedClientRepository(redisAuthorizedClientRepository)
            oauth2.oidcSessionRegistry(noopSpringDataOidcSessionsStrategy)
        }

        /* oauth2.0 client */
        http.oauth2Client {}

        /* configure logout (local session logout, then relying-party initiated logout) */
        http.logout { logoutSpec ->
            logoutSpec.logoutUrl(logoutProperties.LOGOUT_URL)
            logoutSpec.requiresLogout(ServerWebExchangeMatchers.pathMatchers(HttpMethod.GET, logoutProperties.LOGOUT_URL))
            logoutSpec.logoutHandler(logoutHandler)
            logoutSpec.logoutSuccessHandler(logoutSuccessHandler)
        }

        /* configure oidc backchannel-logout */
        // automatically creates: /logout/connect/back-channel/{registrationId}
        if(backChannelLogoutProperties.enabled) {
            http.oidcLogout { logout ->
                logout.backChannel { bc ->
                    bc.logoutUri(backChannelLogoutProperties.backChannelLogoutUri)
                }
                logout.clientRegistrationRepository(clientRegistrationRepository)
                logout.oidcSessionRegistry(noopSpringDataOidcSessionsStrategy)
            }
        }

        /* other filters */
        // apply security context filter at authenticaiton, and csrf filter after the logout handler
        // apply bff rate limiter filter (e.g. for health check endpoint)
        http.addFilterAt(securityContextFilter, SecurityWebFiltersOrder.AUTHENTICATION)
        http.addFilterAfter(bffServerRateLimiterFilter, SecurityWebFiltersOrder.AUTHENTICATION)
        http.addFilterAfter(csrfCookieWebFilter, SecurityWebFiltersOrder.LOGOUT)

        /* apply additional configurations */
        configurationExtra.configureClient(
            http,
            nettyServerProperties,
            corsProperties,
            permitAllProperties,
            authorizePostProcessor,
            httpPostProcessor
        )

        return http.build()
    }
}
// see here for more:
// https://docs.spring.io/spring-session/reference/web-session.html#websession-redis
// https://docs.spring.io/spring-session/reference/configuration/reactive-redis-indexed.html

/**
 * Establishes a Redis Connection Factory with comprehensive configuration options.
 *
 * Provides configuration for:
 * - Connection pooling and lifecycle management
 * - Cluster and standalone deployment modes
 * - SSL/TLS security for production environments
 * - DNS resolution and caching
 * - Performance tuning (thread pools, buffers, queues)
 * - High availability features (topology refresh, failover)
 *
 * The factory supports different deployment profiles:
 * - Production: Clustered Redis with SSL
 * - Development: Standalone Redis without SSL
 *
 * @property profileProperties Configuration properties for active deployment profile
 * @since 1.0.0
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val profileProperties: ProfileProperties
) {

    /**
     * Client resources for Redis connections.
     *
     * Manages shared resources including:
     * - Thread pools for I/O and computation
     * - DNS resolution and caching
     * - Command latency metrics
     * - Connection lifecycle
     *
     * This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
     * Using lateinit as the resources are created after Spring context initialization.
     */
    private lateinit var clientResources: ClientResources

    companion object {

        private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)

        /**
         * Timeout configurations for Redis operations.
         * - Command timeout: Maximum time for command execution
         * - Connect timeout: Maximum time for connection establishment
         * - Topology refresh: Interval for cluster topology updates
         * - Adaptive refresh: Time window for topology change detection
         */
        private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
        private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
        private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
        private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
        private const val SHUTDOWN_TIMEOUT_SECONDS = 10L
        private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 5L

        /**
         * Connection pool settings to optimize resource usage.
         * - Max total: Maximum number of connections in the pool
         * - Max/Min idle: Upper/Lower bounds for idle connections
         * - Max wait: Maximum time to wait for connection
         * - Eviction: Maintenance settings for idle connections
         */
        private const val MAX_TOTAL_CONNECTIONS = 100
        private const val MAX_IDLE_CONNECTIONS = 60
        private const val MIN_IDLE_CONNECTIONS = 20
        private const val MAX_WAIT_SECONDS = 30L

        // more conservative eviction settings
        private const val EVICTION_RUN_PERIOD_SECONDS = 120L
        private const val MIN_EVICTABLE_IDLE_MINUTES = 5L
        private const val NUM_TESTS_PER_EVICTION_RUN = 3

        /**
         * Performance optimization parameters.
         * - Decode buffer ratio: Memory allocation for response buffers
         * - Request queue size: Maximum pending requests
         * - Latency publish interval: Metrics publication frequency
         * - Thread pool multiplier: Scaling factor for I/O threads
         */
        private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
        private const val REQUEST_QUEUE_SIZE = 2500
        private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
        private const val IO_THREAD_POOL_MULTIPLIER = 2
    }

    /**
     * Configures Redis keyspace notifications behavior.
     *
     * Returns NO_OP action to disable automatic configuration of keyspace
     * notifications, preventing potential connection issues during shutdown
     * related to Pub/Sub connections in Spring Session.
     *
     * @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
     */
    @Bean
    fun configureRedisAction(): ConfigureRedisAction {
        return ConfigureRedisAction.NO_OP
    }

    /* LETTUCE - reactive RedisConnectionFactory */
    /**
     * Creates the primary reactive Redis connection factory.
     *
     * Configures a Lettuce-based connection factory with:
     * - Profile-specific Redis deployment mode (clustered/standalone)
     * - Connection pooling
     * - Client resources (thread pools, DNS resolution)
     * - SSL for production environments
     *
     * @param clusterProperties Cluster node configuration
     * @param springDataRedisProperties Redis connection properties
     * @return Configured [ReactiveRedisConnectionFactory]
     */
    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
        springDataRedisProperties: SpringDataRedisProperties,
    ): ReactiveRedisConnectionFactory {
        val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
        clientResources = createClientResources(springDataRedisProperties.host)
        val clientConfig = createLettuceClientConfig(
            clientResources,
            profileProperties.profileActive,
            springDataRedisProperties.type
        )

        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            setValidateConnection(false)
            setShareNativeConnection(true)
        }
    }

    /**
     * Properties class for Redis cluster configuration.
     *
     * @property nodes List of Redis nodes in format `host:port`
     */
    @Component
    internal class ClusterConfigurationProperties(
        springDataRedisProperties: SpringDataRedisProperties
    ) {
        /**
         * Get initial collection of known cluster nodes in format `host:port`.
         * @return
         */
        var nodes = listOf(
            "${springDataRedisProperties.host}:${springDataRedisProperties.port}",
        )
    }

    /**
     * Creates appropriate Redis configuration based on active profile.
     *
     * @param properties Redis connection properties
     * @param clusterProperties Cluster node configuration
     * @return [RedisConfiguration] for either clustered or standalone deployment
     */
    private fun createRedisConfiguration(
        properties: SpringDataRedisProperties,
        clusterProperties: ClusterConfigurationProperties
    ): RedisConfiguration = when {
        profileProperties.profileActive == ProfileTypes.PRODUCTION.type &&
                properties.type == RedisConfigTypes.CLUSTERED.type -> {

            // Redis Cluster for production
            RedisClusterConfiguration(clusterProperties.nodes).apply {
                password = RedisPassword.of(properties.password)
            }
        }
        else -> {

            // Redis Standalone for non-production
            RedisStandaloneConfiguration().apply {
                hostName = properties.host
                port = properties.port
                password = RedisPassword.of(properties.password)
            }
        }
    }

    /**
     * Creates client resources with optimized thread pools and DNS resolution.
     *
     * @param host Redis host for DNS resolution
     * @return Configured [ClientResources]
     */
    private fun createClientResources(host: String) = DefaultClientResources.builder()
        .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
        .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
        .socketAddressResolver(createCachingDnsResolver(host))
        .commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
        .commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
        .build()

    /**
     * Creates Lettuce client configuration with pooling and security settings.
     *
     * Configures:
     * - Read preferences (replica preferred)
     * - Command timeouts
     * - Connection pooling
     * - SSL (for production)
     *
     * @param clientResources Configured client resources for connection management
     * @param activeProfile Current deployment profile
     * @return Configured [LettucePoolingClientConfiguration]
     */
    private fun createLettuceClientConfig(
        clientResources: ClientResources,
        activeProfile: String?,
        redisConfigType: String?
    ): LettucePoolingClientConfiguration {
        val clusterClientOptions = createClusterClientOptions(activeProfile, redisConfigType)

        return LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            .commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())
            .shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
            .shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
            // conditionally use sslOptions if profileProperties.profileActive is 'prod'
            .apply {
                if (activeProfile == ProfileTypes.PRODUCTION.type) {
                    useSsl()
                }
            }
            .build()
    }

    /**
     * Creates cluster client options with comprehensive connection settings.
     *
     * Configures:
     * - Auto-reconnect behavior
     * - Connection validation
     * - Timeout settings
     * - Socket options
     * - Topology refresh
     * - Buffer and queue sizes
     * - SSL (for production)
     *
     * @param activeProfile Current deployment profile
     * @return Configured [ClusterClientOptions]
     */
    private fun createClusterClientOptions(activeProfile: String?, redisConfigType: String?): ClusterClientOptions {
        val builder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(createTimeoutOptions())
            .socketOptions(createSocketOptions())
            .topologyRefreshOptions(createTopologyRefreshOptions())
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
            .requestQueueSize(REQUEST_QUEUE_SIZE)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
            .publishOnScheduler(true)
            .protocolVersion(ProtocolVersion.RESP3)

        // conditionally use sslOptions if profileProperties.profileActive is 'prod'
        if (activeProfile == ProfileTypes.PRODUCTION.type) {
            builder.sslOptions(createSslOptions())
        }

        // conditionally use connected nodes if redisConfigType is 'clustered'
        if (redisConfigType == RedisConfigTypes.CLUSTERED.type) {
            builder.nodeFilter { node ->
                node.isConnected
            }
        }

        return builder.build()
    }

    /**
     * Creates socket options for Redis connections.
     *
     * Configures:
     * - Keep-alive settings
     * - TCP no-delay
     * - Connection timeouts
     *
     * @return Configured [SocketOptions]
     */
    private fun createSocketOptions() = SocketOptions.builder()
        .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
        .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
        .connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
        .build()

    /**
     * Creates timeout options for Redis commands.
     *
     * Configures:
     * - Fixed timeout duration
     * - Command timeout behavior
     *
     * @return Configured [TimeoutOptions]
     */
    private fun createTimeoutOptions() = TimeoutOptions.builder()
        .fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
        .timeoutCommands(true)
        .build()

    /**
     * Creates topology refresh options for Redis cluster.
     *
     * Configures:
     * - Periodic refresh intervals
     * - Dynamic refresh sources
     * - Stale connection handling
     * - Adaptive refresh triggers
     *
     * @return Configured [ClusterTopologyRefreshOptions]
     */
    private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
        .dynamicRefreshSources(true)
        .closeStaleConnections(true)
        .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
        .enableAllAdaptiveRefreshTriggers()
        .build()

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = false // <<-- ENSURE THIS IS KEPT AS FALSE FOR A.W.S. ELASTICACHE CLUSTERED or SERVERLESS
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
    }

    /**
     * Creates SSL options for secure Redis connections.
     *
     * Configures JDK-based SSL provider for Redis connections
     * in production environments.
     *
     * @return Configured [SslOptions]
     */
    private fun createSslOptions(): SslOptions = SslOptions.builder()
        .jdkSslProvider()
        .build()

    /**
     * Creates DNS resolver with caching capabilities.
     *
     * Implements:
     * - DNS resolution caching
     * - Hostname-to-IP mapping
     * - Fallback handling for resolution failures
     *
     * @param host Redis host to resolve
     * @return Configured [MappingSocketAddressResolver]
     */
    private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
        val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val addresses = dnsCache.computeIfAbsent(host) {
                try {
                    DnsResolvers.JVM_DEFAULT.resolve(host)
                } catch (e: UnknownHostException) {
                    emptyArray()
                }
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress
            if (hostAndPort.hostText == cacheIP) {
                HostAndPort.of(host, hostAndPort.port)
            } else {
                hostAndPort
            }
        }

        return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
    }

}

@Clete2
Copy link

Clete2 commented Feb 24, 2025

@dreamstar-enterprises @EnableRedisIndexedWebSession is the reason.

I read the code that sets up the session configuration. In 2.7, both annotations (indexed and regular) use pSubscribe to subscribe to events. In 3.x, only indexed annotation creates subscriptions.

You should read this to determine if you need the indexed repository: https://docs.spring.io/spring-session/reference/getting-started/using-redis.html#choosing-between-regular-and-indexed

If you need the indexed repository, I am afraid you have no choice but to move to clustered mode. For us, we are fortunate that we don't need the indexed repository, nor do we utilize lifecycle events.

Edit:

As you can see, in 3.x, the RedisMessageListenerContainer, which creates subscriptions, is only defined when using EnableRedisIndexedHttpSession, but in 2.x there is no choice between indexed vs non-indexed. There was only one annotation back then.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: invalid An issue that we don't feel is valid
Projects
None yet
Development

No branches or pull requests

4 participants