Description
Motivation
There are many users who need to use tag messages. The implementation of
this part has also been discussed before:
We need a plug-in mechanism so that developers can decide which messages are delivered to consumers and which are not.
This supports tag messages, and users can also use the same extension point to implement other features.
Goal
In this PIP, we only implement the entry filter framework at the broker level. Support for the namespace and topic levels will follow in later PRs.
API Changes And Implementation
Broker
Add the option entryFilterClassName to broker.conf and ServiceConfiguration. The default value is null.
Add a factory class. If entryFilterClassName is set, the factory is created and then initializes the filter.
public interface EntriesFilterProvider {

    // Use `EntriesFilterProvider` to create `EntriesFilter` through reflection
    EntriesFilter createEntriesFilter(Subscription subscription);

    // The default implementation class of EntriesFilterProvider
    // (a class nested in an interface is implicitly public and static)
    public class DefaultEntriesFilterProviderImpl implements EntriesFilterProvider {

        public DefaultEntriesFilterProviderImpl(ServiceConfiguration serviceConfiguration) {
            //...
        }

        @Override
        public EntriesFilter createEntriesFilter(Subscription subscription) {
            //...
        }
    }
}
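As a rough illustration of the factory step, the sketch below loads a filter class by name through reflection and returns null when no class name is configured. The types `SketchFilter`, `AcceptAllFilter`, and `FilterLoader` are hypothetical stand-ins, not Pulsar classes, and the actual provider may load plugins differently.

```java
// Hypothetical stand-ins for illustration only; none of these are Pulsar classes.
interface SketchFilter {
    boolean accept(String payload);
}

class AcceptAllFilter implements SketchFilter {
    @Override
    public boolean accept(String payload) {
        return true;
    }
}

class FilterLoader {
    // Returns null when no class name is configured, mirroring the null
    // default of entryFilterClassName.
    static SketchFilter load(String className) {
        if (className == null || className.isEmpty()) {
            return null;
        }
        try {
            // Look up the class by name and call its no-argument constructor.
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (SketchFilter) instance;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create filter: " + className, e);
        }
    }
}
```

Returning null for an unset class name lets the caller keep a simple `entryFilter != null` check on the dispatch path.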
Add new interfaces so that plugins do not rely on existing interfaces inside Pulsar. This way, even if the internal interfaces change in the future, the plugin can continue to run.
// The interface that needs to be implemented in the plugin
public interface EntryFilter {

    public FilterResult filterEntries(Entry entry, FilterContext context);

    public static class FilterContext {
        EntryBatchSizes batchSizes;
        SendMessageInfo sendMessageInfo;
        EntryBatchIndexesAcks indexesAcks;
        ManagedCursor cursor;
        boolean isReplayRead;
        MessageMetadata msgMetadata;
        // ...
    }

    enum FilterResult {
        ACCEPT, // deliver to the Consumer
        REJECT, // skip the message
        // ...
    }
}
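To make the plugin contract more concrete, here is a minimal sketch of a tag filter that accepts or rejects an entry based on one message property. All types are simplified, hypothetical stand-ins (the proposed FilterContext carries Pulsar's MessageMetadata and more), and the `tag` property key is an assumption chosen for illustration.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for the proposed types.
enum SketchFilterResult { ACCEPT, REJECT }

class SketchContext {
    // Stands in for the properties carried by the message metadata.
    final Map<String, String> messageProperties;

    SketchContext(Map<String, String> messageProperties) {
        this.messageProperties = messageProperties;
    }
}

class TagEntryFilter {
    private final Set<String> wantedTags;

    TagEntryFilter(Set<String> wantedTags) {
        this.wantedTags = wantedTags;
    }

    // Accepts an entry only if its "tag" property (an assumed key) is one of
    // the wanted tags; entries without a tag are rejected.
    SketchFilterResult filterEntry(SketchContext context) {
        String tag = context.messageProperties.get("tag");
        if (tag != null && wantedTags.contains(tag)) {
            return SketchFilterResult.ACCEPT;
        }
        return SketchFilterResult.REJECT;
    }
}
```

Whether untagged entries should be rejected or accepted is a policy choice for the plugin author; this sketch rejects them.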
Why filter a single entry instead of a list of entries?
The existing Dispatchers already traverse the entry list and parse the metadata of each Entry. We can reuse this data to avoid traversing the entry list twice and parsing the metadata twice.
Call the plugin in AbstractBaseDispatcher#filterEntriesForConsumer
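if (entryFilter != null) {
    // context is the FilterContext built for the current entry
    FilterResult result = entryFilter.filterEntries(entry, context);
    if (FilterResult.REJECT.equals(result)) {
        entries.set(i, null);
        // Acknowledge before releasing so the entry is not used after release
        subscription.acknowledgeMessage(entry);
        entry.release();
        continue;
    }
}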
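The single-pass idea can be sketched without any Pulsar types: walk the entry list once, null out rejected entries in place, and record which ones were rejected so they can be acknowledged for the subscription. `DispatchSketch` and its string entries are hypothetical; the real dispatcher works on Entry objects and parsed metadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; entries are plain strings instead of Pulsar Entry objects.
class DispatchSketch {
    // Nulls out rejected entries in place and returns their indexes so the
    // caller can acknowledge them on behalf of the subscription.
    static List<Integer> filterInPlace(List<String> entries, Predicate<String> accept) {
        List<Integer> rejected = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            String entry = entries.get(i);
            if (entry == null) {
                continue; // already removed by an earlier step
            }
            if (!accept.test(entry)) {
                rejected.add(i);
                entries.set(i, null);
            }
        }
        return rejected;
    }
}
```

Keeping the list positions stable (setting slots to null rather than removing them) matches the `entries.set(i, null)` call in the snippet above.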
Client
Add an optional attribute subscriptionProperties to the subscription, which can be set when subscribing to a topic. These properties cannot be modified afterwards; to change them, the subscription must be deleted and created again.
a. This attribute needs to be added to the subscribe command
b. For persistent subscriptions, these properties should be stored in ZooKeeper with the subscription
c. REST API (create-subscription) should also support setting subscriptionProperties
Example:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topic(topic)
    .subscriptionName("sub")
    .subscriptionProperties(new HashMap<String, String>())
    .subscribe();
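One way a filter could use these properties is to read its configuration from them when it is created for a subscription. The sketch below parses a comma-separated list of tags from a property map. The `tags` key and the `SubscriptionTagParser` helper are assumptions made for illustration; the proposal does not define any property names.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; "tags" is an assumed property key, not part of the proposal.
class SubscriptionTagParser {
    // Splits the "tags" property on commas, trims whitespace, and drops
    // empty items. A missing property yields an empty set.
    static Set<String> wantedTags(Map<String, String> subscriptionProperties) {
        String raw = subscriptionProperties.getOrDefault("tags", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }
}
```

Because the properties are immutable for the lifetime of the subscription, a filter could parse them once at creation time rather than on every entry.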
Rejected Alternatives
- Plug-in Dispatcher
If we make the dispatcher pluggable, we must define a limited, private but "stable" API. Enrico's suggestion is to define particular needs and then add features that make single, specific parts of the dispatcher pluggable.
- Create a message dimension filter
At this stage, we can only do Entry-level filtering. If individual Messages inside an Entry are filtered on the Broker side, the subsequent consumer ack will run into problems.
Therefore, to use this filter we must set enableBatching=false, which is the same requirement as for delayed messages.