Skip to content

[improve][broker] Support consumer side delayed messages #24372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Denovo1998
Copy link
Contributor

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

When consumers encounter transient failures while processing messages, a common requirement is to retry processing after a certain delay. Before this feature, developers typically resorted to:

  1. Using Retry/Dead Letter Topics (DLQ): Consumers would republish failed messages to a dedicated retry topic. This approach suffered from write amplification (message produced again) and read amplification (additional consumption from the retry topic), and increased architectural complexity.
  2. Application-Level Custom Delay Logic: Implementing delay logic using external components (e.g., databases, scheduling frameworks) significantly increased system complexity and external dependencies.

These existing solutions introduced varying degrees of overhead, complexity, or inflexibility. This change aims to provide a native, efficient mechanism for consumer-side delayed message redelivery.

Modifications

This change introduces the capability for Pulsar consumers to negatively acknowledge (Nack) messages with a specified custom delay. The key modifications include:

  1. Client API Extension:

    • New methods negativeAcknowledge(MessageId messageId, long delay, TimeUnit unit) and negativeAcknowledge(Message<?> message, long delay, TimeUnit unit) have been added to the org.apache.pulsar.client.api.Consumer interface.
  2. Protocol Enhancement:

    • The CommandRedeliverUnacknowledgedMessages protobuf command has been augmented with an optional delay_at_time (uint64) field. This field carries the absolute timestamp at which the message is expected to be redelivered.
  3. Broker-Side Core Logic Adjustments:

    • ServerCnx: Updated to recognize and process the delay_at_time field in CommandRedeliverUnacknowledgedMessages.
    • Consumer / Subscription / Dispatcher:
      • Relevant interfaces and implementations have been extended to handle redelivery requests with a delayAtTime.
      • PersistentDispatcherMultipleConsumers (and its classic variant) now leverage the topic-level DelayedDeliveryTracker to manage Nacked messages with a specified delay. The message's position and the target delayAtTime are added to the tracker, which triggers redelivery when the specified time is reached.
    • This feature is primarily effective for Shared and Key_Shared subscription types to provide precise delay semantics.

With these modifications, consumers can directly request the broker to redeliver a message after a specific delay, leading to:

  • Elimination of write and read amplification: No need to republish messages to other topics.
  • Simplified application architecture: Removes the need for external delay mechanisms or complex application-level logic.
  • Fine-grained control: Allows consumers to dynamically set retry delays per message based on specific conditions.
  • Enhanced resource efficiency: Leverages the broker's internal delayed message delivery mechanism.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Jun 2, 2025
@Denovo1998
Copy link
Contributor Author

Currently, there seem to be some issues with this implementation. (Note, this PIP only discusses the implementation of BucketDelayedDeliveryTracker)

If a delayed message in the LastMutableBucket has not been flushed to the Bookie, and a failure occurs, the data in the LastMutableBucket will be lost. However, this will not have any impact, as after restarting, the messages will still be read from the MackDelete position onwards, and the Bucket will be rebuilt. This is why the data in the Bucket can be deleted as long as it is read (without requiring client Ack).

If we send a command to add a delayed message from the consumer side, and it fails to be persisted (sealBucketAndAsyncPersistent) in the LastMutableBucket, and the Broker crashes, the command will be lost. We cannot wait until the Seal Bucket condition is triggered to return the delayed message command cancellation success, because we do not know how long it will take.

This part is very important and needs to be resolved. I will think about how to solve this part, and everyone is welcome to discuss it with me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required Your PR changes impact docs and you will update later.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant