
[fix] Don't use ForkJoinPool when using CompletableFuture.thenXXXAsync #24383

Open
wants to merge 1 commit into
base: master

dao-jun
Member

@dao-jun dao-jun commented Jun 5, 2025

Motivation

Currently, in some places we don't pass an executor to CompletableFuture.thenComposeAsync/thenApplyAsync/thenAcceptAsync, so the callbacks run on ForkJoinPool.commonPool.

This may lead to three problems:

  1. ForkJoinPool.commonPool is managed by the JVM, so it is outside Pulsar's control.
  2. The default thread count of ForkJoinPool.commonPool is Math.max(1, Runtime.getRuntime().availableProcessors() - 1), which is not enough when the broker is under heavy load.
  3. The callbacks can run on a different thread each time we call thenComposeAsync/thenApplyAsync/thenAcceptAsync, which may lead to thread-safety issues.
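
To make the default behaviour described above concrete, here is a minimal sketch that prints the common pool's parallelism and the thread that runs a thenApplyAsync callback when no executor is passed. The class and method names are hypothetical and not part of Pulsar. Note that, per the CompletableFuture Javadoc, when the common pool's parallelism is below two the JDK creates a new thread per async task instead of using the pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not Pulsar code.
public class DefaultAsyncExecutorDemo {

    // Returns the name of the thread that runs a thenApplyAsync callback
    // when no executor argument is supplied.
    static String defaultAsyncThreadName() {
        return CompletableFuture.completedFuture("x")
                .thenApplyAsync(v -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors    = " + Runtime.getRuntime().availableProcessors());
        System.out.println("commonPool parallelism = " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("callback thread        = " + defaultAsyncThreadName());
    }
}
```

On a multi-core machine the callback thread is typically named ForkJoinPool.commonPool-worker-N, which is one way to spot these callbacks in a thread dump.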

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@dao-jun dao-jun self-assigned this Jun 5, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 5, 2025
@@ -307,11 +307,13 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
Member

Suggested change
future = future.thenComposeAsync(unused ->
future = future.thenCompose(unused ->

Member

You can only change this.

@@ -307,11 +307,13 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
- getAuthorizationService().revokePermissionAsync(topicNamePartition, role));
+ getAuthorizationService().revokePermissionAsync(topicNamePartition, role),
+ pulsar().getOrderedExecutor().chooseThread(role));
}
}
return future.thenComposeAsync(unused ->
Member

Suggested change
return future.thenComposeAsync(unused ->
return future.thenCompose(unused ->

Member

@nodece nodece left a comment

LGTM

Member

@lhotari lhotari left a comment

One change that should be avoided is replacing then*Async calls with then* calls. The reason for this is that in many cases there's a clear reason why then*Async has been used. By default, the dependent stages of a CompletableFuture run on the thread that completes it. When the future is completed by the metadata service, the whole completion chain is executed on the metadata service thread.

I'd rather see the then*Async calls use a thread pool that has more than one thread. There's a high risk of deadlocks if changes are made to use a single-threaded executor when most environments run with at least 2 cores.

ForkJoinPool has some logic to add more threads when it detects that another thread is blocked. That's also why it's used by default for CompletableFuture.then*Async methods since it will avoid deadlocks this way.
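
The single-threaded executor risk can be illustrated with a small sketch. It assumes a task that blocks on a future whose async continuation is scheduled on the same pool; the class name, method name, and 500 ms timeout are hypothetical choices for the demonstration, not Pulsar code. With one thread the continuation is queued behind the blocked task and never runs in time; with two threads it completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not Pulsar code.
public class SharedPoolStallDemo {

    // Returns true if the dependent stage finishes within 500 ms when the
    // blocking task and the async continuation share a pool of the given size.
    static boolean completesInTime(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            // The continuation is submitted to 'pool' once 'source' completes.
            CompletableFuture<String> dependent = source.thenApplyAsync(s -> s + "!", pool);

            return pool.submit(() -> {
                source.complete("done");
                try {
                    // Blocks a pool thread while waiting for work queued on the same pool.
                    dependent.get(500, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false;
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread : " + completesInTime(1));
        System.out.println("2 threads: " + completesInTime(2));
    }
}
```

Without the timeout, the one-thread case would wait forever, which is the deadlock pattern being warned about.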

@lhotari
Member

lhotari commented Jun 5, 2025

3. The callbacks can run on a different thread each time we call thenComposeAsync/thenApplyAsync/thenAcceptAsync, which may lead to thread-safety issues.

As long as the mutations don't happen after an object has been passed through a completable future, the receiving thread will observe the same state of the object. A lot of Pulsar's "thread safety" relies on this at the moment.
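
A minimal sketch of that hand-off pattern, under the assumption that all writes to the object happen before the future is completed and none happen afterwards. Counter and the other names are hypothetical. The field is deliberately plain (not volatile, not synchronized); visibility comes from the happens-before edge that java.util.concurrent establishes between submitting a task to an executor and running it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not Pulsar code.
public class HandOffDemo {

    // Plain mutable state: no volatile, no locks.
    static final class Counter {
        int value;
    }

    // Mutates a Counter on the calling thread, hands it off through a future,
    // and returns the value observed by the callback on another thread.
    static int observedValue() {
        ExecutorService receiver = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Counter> handOff = new CompletableFuture<>();
            CompletableFuture<Integer> observed = handOff.thenApplyAsync(c -> c.value, receiver);

            Counter counter = new Counter();
            counter.value = 42;         // every mutation happens before the hand-off
            handOff.complete(counter);  // no writes to 'counter' after this line
            return observed.join();
        } finally {
            receiver.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedValue());
    }
}
```

The guarantee breaks as soon as the producing thread keeps writing to the object after completing the future, which is the case a concrete thread-safety report would need to show.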

@BewareMyPower
Contributor

BewareMyPower commented Jun 5, 2025

The reason for this is that in many cases there's a clear reason why then*Async has been used.

+1.

Generally, thenXxxAsync performs worse because of the thread switch, but it is used to keep the callback from blocking the completion thread for too long. Take this PR's change for example:

        return provider.isSuperUser(role, authenticationData, conf).thenCompose(isSuperUser -> {
            if (isSuperUser) {
                return CompletableFuture.completedFuture(true);
            } else {
                return provider.canLookupAsync(topicName, role, authenticationData);
            }
        });

The implementation of provider.canLookupAsync could be a customized one. If it has a bug, canLookupAsync might block the completing thread (usually the metadata store's I/O thread) for some time. What's worse, it might cause a deadlock (e.g. by calling a metadata store operation synchronously inside the metadata store's callback).
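
The difference in where the callback runs can be seen in a small sketch. The thread name "metadata-io" is a hypothetical stand-in for the metadata store's I/O thread, and the class is not Pulsar code. Both callbacks are registered before the future is completed, so the non-async one runs on the completing thread while the async one is moved elsewhere.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not Pulsar code.
public class CompletionThreadDemo {

    // Returns { thread that ran the thenApply callback,
    //           thread that ran the thenApplyAsync callback }.
    static String[] callbackThreads() {
        CompletableFuture<String> lookup = new CompletableFuture<>();

        // Registered before completion: runs on whichever thread calls complete().
        CompletableFuture<String> sync =
                lookup.thenApply(v -> Thread.currentThread().getName());
        // Always handed to the default async executor.
        CompletableFuture<String> async =
                lookup.thenApplyAsync(v -> Thread.currentThread().getName());

        // Stand-in for an I/O thread completing the future.
        Thread io = new Thread(() -> lookup.complete("ok"), "metadata-io");
        io.start();

        return new String[] { sync.join(), async.join() };
    }

    public static void main(String[] args) {
        String[] threads = callbackThreads();
        System.out.println("thenApply      ran on " + threads[0]);
        System.out.println("thenApplyAsync ran on " + threads[1]);
    }
}
```

Any slow or blocking work inside the first callback would therefore be charged to the "metadata-io" thread.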

Regarding other places that replace the ForkJoinPool with Pulsar's internal executors, it depends.

ForkJoinPool.commonPool is managed by the JVM, so it is outside Pulsar's control.

Executors managed by Pulsar are not guaranteed to be better than ForkJoinPool. Actually, ForkJoinPool should be better than Pulsar's existing thread model. Almost all executor services managed by Pulsar have a fixed number of threads, like FixedThreadPool. Once a thread has too many pending tasks, they have to wait for the previous tasks to finish, even if another thread is idle.
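
The "idle thread cannot help" effect can be sketched with two single-threaded lanes selected by key hash. This is a simplified, hypothetical stand-in for a key-pinned executor and is not the actual OrderedExecutor implementation; the names, the key "role-a", and the 300 ms sleep are all illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not Pulsar code.
public class PinnedLaneDemo {

    static ExecutorService lane(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // The same key always maps to the same single-threaded lane.
    static ExecutorService choose(ExecutorService[] lanes, String key) {
        return lanes[Math.floorMod(key.hashCode(), lanes.length)];
    }

    // Submits a slow task and then a quick task for the same key and returns
    // how long (ms) after submission the quick task actually started.
    static long quickTaskDelayMillis() {
        ExecutorService[] lanes = { lane("lane-0"), lane("lane-1") };
        try {
            long start = System.nanoTime();
            choose(lanes, "role-a").execute(() -> {
                try {
                    Thread.sleep(300); // slow task occupying the lane
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Queued behind the slow task even though the other lane is idle.
            return CompletableFuture
                    .supplyAsync(() -> (System.nanoTime() - start) / 1_000_000,
                            choose(lanes, "role-a"))
                    .join();
        } finally {
            for (ExecutorService l : lanes) {
                l.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("quick task started after ~" + quickTaskDelayMillis() + " ms");
    }
}
```

A work-stealing pool would let the idle worker pick up the quick task; the trade-off is that per-key ordering is no longer guaranteed.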

The callbacks can run on a different thread each time we call thenComposeAsync/thenApplyAsync/thenAcceptAsync, which may lead to thread-safety issues.

It would be better to give a concrete example showing how this could result in a thread-safety issue. Make sure you know the happens-before relationship in Java.

@zhanghaou
Contributor

When the ForkJoinPool becomes blocked or runs into deadlock-like behavior, it can stall all dependent tasks. New tasks get stuck in the work queue, leading to widespread performance degradation.

Additionally, ForkJoinPool is designed for workloads that can be recursively split into independent subtasks. If the task isn't inherently decomposable, using ForkJoinPool won't improve performance and may introduce unnecessary overhead.

I think it's reasonable to replace the ForkJoinPool, but going with a single-threaded pool is risky.

@BewareMyPower
Contributor

If the task isn't inherently decomposable, using ForkJoinPool won't improve performance and may introduce unnecessary overhead.

That's only one typical case. From the official documentation:

as well as when many small tasks are submitted to the pool from external clients.

A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

Most tasks in Pulsar are small tasks like these, chained together, and each one usually just starts another asynchronous task.

var future = runTask1().thenCompose(result1 -> runTask2(result1)).thenApply(result2 -> runTask3(result2));
future.whenComplete((result, e) -> {
    if (e == null) {
        future2.complete(result);
    } else {
        future2.completeExceptionally(e);
    }
});

Then future2 might trigger the callback somewhere else.

For a specific example, take the transactionExecutorProvider, whose number of threads is determined by numTransactionReplayThreadPoolSize, which defaults to the number of processors.

  1. It never makes sense to use the number of processors as the default. I guess it's the default only because everyone else does that.
  2. This executor executes many small tasks, even the callbacks that run when a future is completed.
    private void init() {
        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
                .thenAcceptAsync(init -> {
                    if (init) {
                        initPendingAckStore();
                    } else {
                        completeHandleFuture();
                    }
                }, internalPinnedExecutor)

The caller should be responsible for switching threads before continuing; the internal code should not do it.

@zhanghaou
Contributor

1. Although, as stated in the official documentation, ForkJoinPool provides benefits such as thread reuse and resource efficiency (e.g., automatic thread reclamation), in certain scenarios, it may actually lead to performance degradation.
This is because when a task is submitted to the ForkJoinPool, if the previously used threads have already been reclaimed, the system may need to recreate those threads, introducing additional overhead.

2. We cannot guarantee that all tasks submitted to the ForkJoinPool are lightweight and non-blocking.
In practice, for example within Pulsar's Protocol Handler or Broker Interceptor, developers are free to implement custom logic that may execute tasks using ForkJoinPool. This unrestricted usage increases the risk of introducing blocking operations, which can eventually saturate or block the entire pool.

3. Using a dedicated thread pool results in clearer code structure, allowing developers to easily identify which thread pool is executing the current task. This not only improves code maintainability but also facilitates debugging and performance tuning.

4. When the ForkJoinPool encounters blocking issues, it becomes difficult to diagnose.
For instance, in a jstack output, some threads may appear to be in a WAITING state, while they are actually blocked by tasks. This kind of "false idleness" can be misleading during diagnosis and significantly increases the difficulty of identifying the root cause.
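
As an illustration of point 3, here is a minimal sketch of a dedicated pool whose threads carry a recognizable name prefix, so that callbacks are easy to attribute in jstack output or logs. The prefix "demo-callbacks", the pool size, and the class name are hypothetical and do not correspond to an existing Pulsar executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not Pulsar code.
public class NamedPoolDemo {

    // Creates daemon threads named "<prefix>-0", "<prefix>-1", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    // Runs a callback on the dedicated pool and returns the thread name it saw.
    static String callbackThreadName() {
        ExecutorService pool = Executors.newFixedThreadPool(2, named("demo-callbacks"));
        try {
            return CompletableFuture.completedFuture("x")
                    .thenApplyAsync(v -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

Naming the threads helps diagnosis, but as discussed elsewhere in this thread it does not by itself settle which pool a given callback should run on.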

@nodece
Member

nodece commented Jun 13, 2025

By default, CompletableFuture executes continuations (thenApply, thenCompose, etc.) on the same thread that completes the future. In Pulsar, this can sometimes be a critical thread, such as a metadata service thread. If we're not careful, this behavior can lead to overloaded threads on hot paths and introduce performance regressions.

That said, in Pulsar we typically avoid switching threads unless it's explicitly necessary. We prefer to execute continuation logic on the callback thread, which helps reduce context switching and improves CPU efficiency. However, this also means we need to be mindful of how much work we're doing in these continuations, especially if the callback thread is part of a limited thread pool.

Using then*Async with the default ForkJoinPool.commonPool introduces behavior that is outside Pulsar's control and can be problematic under load. If thread switching is needed, it’s better to specify a Pulsar-managed executor to maintain better control explicitly and avoid contention with unrelated workloads.

@BewareMyPower
Contributor

If thread switching is needed, it’s better to specify a Pulsar-managed executor

The issue is that it is very ambiguous where a Pulsar-managed executor should be used.

I agree that there is no silver bullet, so we must be careful about which thread pool to use. This PR's title is misleading because it implies that the built-in common ForkJoinPool is harmful. For short tasks that need thread switching, it is a proper choice.

It's reasonable to switch thenApplyAsync(f) to thenApply(f) or thenApplyAsync(f, existingExecutor) in some places. But please explain it with a convincing reason, not just "ForkJoinPool is worse than Pulsar managed thread pools". As I've mentioned, many Pulsar-managed thread pools are not really well taken care of. Much code is written without careful thinking.

@BewareMyPower
Contributor

BewareMyPower commented Jun 13, 2025

Therefore, I'd like to talk about the use of thenXXX or thenXXXAsync case by case.

Take the case I've mentioned before:

We should replace thenAcceptAsync with thenAccept here. It should be the caller's responsibility to determine whether to switch the thread. Switching the thread internally adds unnecessary thread switching cost.

However, it's hard to know which thread will complete the future of checkInitializedBefore, so a safer solution is to change

return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {

to thenComposeAsync. The callback calls a synchronized code block that acquires the lock of PersistentSubscription. The question becomes, which thread pool should be used? For the purpose of avoiding deadlocks, the default common pool could be the best choice: Pulsar runs its own tasks on its internal executors, so those tasks can never deadlock with work running on the common ForkJoinPool. Although the code block is synchronized, the logic inside it never blocks.

BTW, I see Pulsar's default executor is used here:

Unfortunately, it's really ambiguous where PulsarService#executor should be used. As far as I can see, people just pick whichever thread pool is available as the second argument of thenXXXAsync.

Labels
area/broker doc-not-needed Your PR changes do not impact docs ready-to-test
5 participants