Skip to content

[feat] Support get retained message count from MessageMetadata #24431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

gaoran10
Copy link
Contributor

Motivation

Currently, if the broker wants to determine the valid batch index in a compacted entry, it can only calculate it by examining the compactedOut flag in SingleMessageMetadata. Pulsar supports different protocols, if other protocols doesn't add SingleMessageMetadata for each message in a batch, such as Kafka protocol handler may build the payload in Kafka format.

Modifications

Provide a way to retrieve the retained message count from MessageMetadata.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@gaoran10 gaoran10 self-assigned this Jun 19, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 19, 2025
@gaoran10 gaoran10 closed this Jun 19, 2025
@gaoran10 gaoran10 reopened this Jun 19, 2025
@gaoran10 gaoran10 requested review from coderzc and lhotari June 19, 2025 10:51
@codelipenghui codelipenghui added this to the 4.1.0 milestone Jun 19, 2025
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to add this util method to the upstream. You can implement the same function if you have pulsar-broker as the dependency.

@BewareMyPower
Copy link
Contributor

I mean, this PR only adds a static method calculateTheLastBatchIndexInBatch, which does not depend on any private method.

I don't see any extra change in ServerCnx, did you miss something?

@BewareMyPower BewareMyPower dismissed their stale review June 20, 2025 03:44

I see it changes the semantic of getLargestBatchIndexWhenPossible

Comment on lines +161 to +167
if (RETAINED_MESSAGE_COUNT_PROPERTY.equals(kv.getKey())) {
try {
return Integer.parseInt(kv.getValue());
} catch (NumberFormatException e) {
// Ignore and return default value
return -1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this hacky behavior should be avoided. It gives a new meaning that if the value of "RetainedMessageCount" property is a valid integer, the value will represent the number of messages in the batch.

Comment on lines -2472 to -2478
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
singleMessagePayload.release();
if (singleMessageMetadata.isCompactedOut()){
continue;
}
lastBatchIndexInBatch = i;
Copy link
Contributor

@BewareMyPower BewareMyPower Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key point is here assumes the compaction batch should have SingleMessageMetadata for each single message. The behavior was introduced from #18877. However, it requires the message payload can always have a serialized SingleMessageMetadata if the batch size is greater than 1, which could be too strict.

This comment also makes sense: https://github.com/apache/pulsar/pull/18877/files#r1165000498

@@ -35,6 +35,7 @@ public abstract class Compactor {
private static final Logger log = LoggerFactory.getLogger(Compactor.class);
public static final String COMPACTION_SUBSCRIPTION = "__compaction";
public static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
public static final String RETAINED_MESSAGE_COUNT_PROPERTY = "RetainedMessageCount";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a concept of Pulsar?
Will the client side set the value for this property?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The background is complicated. I'm writing a proposal to fix the root cause.

This draft implementation might help you understand: https://github.com/BewareMyPower/pulsar/pull/42/files#diff-f2b074eb97e870ec15676e349d61de41fc94067e2b01e86335d05b4bc4019407

i.e. the customized compaction service could use a property to determine which messages are compacted out rather than checking the compactedOut field. The root cause is that each entry is guaranteed to have a MessageMetadata, even the message was produced via the protocol handler, but it's not guaranteed to have a SingleMessageMetadata.

You can also wait for my proposal (PIP-429) to be available

@coderzc
Copy link
Member

coderzc commented Jun 23, 2025

If we purpose are to ensure that the compaction task is successful, we only need to check lastCompactedPosition and lastDispatchablePosition beforen trigger compaction task. If lastDispatchablePosition <= lastCompactedPosition, then I just need to skip the compact task because there is no new data need to be compacted.

If we need make the Pulsar reader to read Kafka format data, then we need this change or Introduce a new client API(PIP-429).

@gaoran10 gaoran10 closed this Jun 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants