Error retrieving state store during graceful shutdown #3067

Open
cedric-schaller opened this issue Jan 15, 2025 · 2 comments
@cedric-schaller

Scenario

Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the InteractiveQueryService.

Problem

Unfortunately, during a graceful shutdown, ongoing requests lead to the following error:

java.lang.IllegalStateException: Error retrieving state store: my-global-store-name-v0
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$1(InteractiveQueryService.java:153)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:217)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:103)
    ...

Analysis

Our analysis showed that this error is caused by the ordering of the SmartLifecycle phases. WebServerGracefulShutdownLifecycle, which handles the graceful shutdown, has phase Integer.MAX_VALUE - 1024, while StreamsBuilderFactoryManager has phase Integer.MAX_VALUE - 100. Because Spring stops SmartLifecycle beans in descending phase order, StreamsBuilderFactoryManager is stopped before the graceful shutdown is initiated, so requests still being processed during the graceful shutdown no longer have access to the global state store.

This is indeed visible in the logs:

2025-01-08 17:04:16.225 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147483547]
...
2025-01-08 17:04:16.572 DEBUG  org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl [Closing global storage engine my-global-store-name-v0]
...
2025-01-08 17:04:16.577 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Bean 'streamsBuilderFactoryManager' completed its stop procedure]
...
2025-01-08 17:04:16.584 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147482623]
2025-01-08 17:04:16.584 INFO   org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Commencing graceful shutdown. Waiting for active requests to complete]
2025-01-08 17:04:16.587 INFO   org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Graceful shutdown complete]
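
The ordering seen in these logs can be modelled in a few lines of plain Java. This is only a sketch of the "highest phase stops first" rule, not Spring's actual DefaultLifecycleProcessor; the class name, the method name and the bean name webServerGracefulShutdown are made up for illustration, while the two phase values are the ones quoted above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the stop ordering; not part of Spring or the binder.
class PhaseStopOrder {

    // Phase values as reported in this issue and visible in the logs.
    static final int STREAMS_MANAGER_PHASE = Integer.MAX_VALUE - 100;    // 2147483547
    static final int GRACEFUL_SHUTDOWN_PHASE = Integer.MAX_VALUE - 1024; // 2147482623

    /** Returns the bean names in the order they would be stopped: highest phase first. */
    static List<String> stopOrder(Map<String, Integer> phases) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(phases.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            names.add(entry.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Integer> phases = new LinkedHashMap<>();
        phases.put("webServerGracefulShutdown", GRACEFUL_SHUTDOWN_PHASE);
        phases.put("streamsBuilderFactoryManager", STREAMS_MANAGER_PHASE);
        // prints [streamsBuilderFactoryManager, webServerGracefulShutdown]
        System.out.println(stopOrder(phases));
    }
}
```

The two constants evaluate to exactly the phase numbers in the "Stopping beans in phase ..." log lines, which matches the observed order: the streams manager stops first, then the graceful shutdown begins.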

The Javadoc of StreamsBuilderFactoryManager highlights the fact that choosing a phase close to Integer.MAX_VALUE was a conscious decision:

* This {@link SmartLifecycle} class ensures that the bean created from it is started very
* late through the bootstrap process by setting the phase value closer to
* Integer.MAX_VALUE. This is to guarantee that the {@link StreamsBuilderFactoryBean} on a
* function with multiple bindings is only started after all the binding phases have completed successfully.

Also, the constant AbstractMessageListenerContainer.DEFAULT_PHASE, which is not used by StreamsBuilderFactoryManager but has the same value of Integer.MAX_VALUE - 100, is documented with the following comment (whose claim I was unfortunately unable to verify):

// The default org.springframework.context.SmartLifecycle phase for listener containers 2147483547.

As a final consideration, StreamsBuilderFactoryManager stops StreamsBuilderFactoryBeans in its stop() method (which takes place at phase Integer.MAX_VALUE - 100), but StreamsBuilderFactoryBean itself implements SmartLifecycle and its phase is defined to be Integer.MAX_VALUE - 1000. So we are wondering whether:

  1. both mechanisms are necessary?
  2. if so, do they need to have a different lifecycle phase?
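
To make the comparison behind these questions concrete, here is a small plain-Java check of the three phase values mentioned in this issue. It is a sketch only: the class, constant and method names are hypothetical, and it assumes nothing beyond the rule that a higher phase is stopped earlier.

```java
// Hypothetical helper for comparing the phases quoted in this issue.
class PhaseComparison {

    static final int FACTORY_MANAGER_PHASE = Integer.MAX_VALUE - 100;    // StreamsBuilderFactoryManager
    static final int FACTORY_BEAN_PHASE = Integer.MAX_VALUE - 1000;      // StreamsBuilderFactoryBean
    static final int GRACEFUL_SHUTDOWN_PHASE = Integer.MAX_VALUE - 1024; // WebServerGracefulShutdownLifecycle

    /** A higher phase is stopped earlier, so this is true when the bean stops before requests are drained. */
    static boolean stopsBeforeGracefulShutdown(int phase) {
        return phase > GRACEFUL_SHUTDOWN_PHASE;
    }

    public static void main(String[] args) {
        System.out.println("manager stops first: " + stopsBeforeGracefulShutdown(FACTORY_MANAGER_PHASE));
        System.out.println("factory bean stops first: " + stopsBeforeGracefulShutdown(FACTORY_BEAN_PHASE));
    }
}
```

Both values are above Integer.MAX_VALUE - 1024, so with the current numbers either mechanism on its own would stop Kafka Streams before the graceful shutdown starts.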

Expected behavior

Kafka Streams (and, in our specific case, its state stores) needs to remain available during the graceful shutdown phase. Hence StreamsBuilderFactoryManager (and potentially StreamsBuilderFactoryBean) should have a phase lower than that of WebServerGracefulShutdownLifecycle, so that it is stopped only after in-flight requests have completed.

Once the questions above have been clarified and the design is clear, I am happy to provide a fix.

To reproduce

Steps to reproduce the behavior:

  1. Define a state store
spring.cloud.stream.bindings.some-store-in-0.destination: some-topic
spring.cloud.stream.kafka.streams.some-store-in-0.consumer.materialized.as: some-store-v0
  @Bean
  public Consumer<GlobalKTable<String, SomeClass>> someStore() {
    ...
  }
  2. Have a REST controller access the state store via a service
private final InteractiveQueryService queryService;
...
ReadOnlyKeyValueStore<String, SomeClass> store = queryService.getQueryableStore("some-store-v0", QueryableStoreTypes.keyValueStore());
  3. Initiate a graceful shutdown while under load, so that requests are being processed while shutting down. Log statements similar to the ones above will be generated.

Version of the framework

Spring Boot 3.4.1 (current latest) and spring-cloud-stream 4.2 (current latest)

@sobychacko
Contributor

@cedric-schaller Is there a consistent way to reproduce the issue? Can you create a small application for us to triage it further?

@cedric-schaller
Author

@sobychacko Depending on what you want to achieve with this reproduction, I see 3 options:

  1. If your intention is to reproduce the log statements demonstrating that the state store is closed before the graceful shutdown is initiated, this is feasible.
  2. If you would also like to reproduce the IllegalStateException manually, I might be able to create a controller with a Thread.sleep() in it, so that a manually initiated GET request takes time to be answered, which would allow you to manually shut down the application just after the request has been sent and to thereby initiate the graceful shutdown process.
  3. If you need a fully automated integration test to reproduce the issue, then I think that would be pretty challenging (if not impossible?) and I must admit I would not know off the top of my head how to shut down the container as part of @SpringBootTest.
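
To illustrate what option 2 would exercise, here is a minimal plain-Java model of the race. It is not Spring code and not the proposed reproducer: FakeStore is a hypothetical stand-in for the state store, and latches replace the Thread.sleep() so that the outcome is deterministic. The slow request is accepted, the store is closed (as happens when the streams manager stops first), and only then does the request perform its lookup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of an in-flight request outliving the state store.
class ShutdownRaceModel {

    /** Stand-in for a state store that rejects reads once it has been closed. */
    static class FakeStore {
        private final AtomicBoolean open = new AtomicBoolean(true);

        void close() {
            open.set(false);
        }

        String get(String key) {
            if (!open.get()) {
                throw new IllegalStateException("Error retrieving state store: some-store-v0");
            }
            return "value-for-" + key;
        }
    }

    /** Runs one slow request and closes the store while that request is still in flight. */
    static String runScenario() {
        FakeStore store = new FakeStore();
        CountDownLatch requestStarted = new CountDownLatch(1);
        CountDownLatch storeClosed = new CountDownLatch(1);
        ExecutorService requestThread = Executors.newSingleThreadExecutor();
        try {
            Future<String> response = requestThread.submit(() -> {
                requestStarted.countDown();   // the request has been accepted
                storeClosed.await();          // stands in for the Thread.sleep() delay
                return store.get("some-key"); // the lookup happens after the close
            });
            requestStarted.await();
            store.close();                    // streams are stopped first (higher phase)
            storeClosed.countDown();
            return response.get();
        } catch (ExecutionException e) {
            // The request failed; report which exception it failed with.
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            requestThread.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints IllegalStateException
        System.out.println(runScenario());
    }
}
```

In a real reproducer the delay would sit in the controller and the close would be triggered by shutting down the application, but the sequence of events is the same.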

Please let me know what your thoughts on the matter are.
