[fix][broker] Fix the non-persistent topic's replicator always getting the error "Producer send queue is full" when replicationProducerQueueSize is set to a small value #24424

Open: wants to merge 7 commits into master

poorbarcode (Contributor) commented Jun 17, 2025

Motivation

  • The configuration replicationProducerQueueSize limits the rate of replication by bounding the replicator producer's send queue.
  • PersistentReplicator implements this rate limiting.
  • Issue: NonPersistentReplicator does not implement it, yet the configuration still applies to NonPersistentReplicator. If users set replicationProducerQueueSize to a small value, replication on non-persistent topics fails with the error below:
2025-06-13T11:53:28,121+0000 [pulsar-io-6-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [non-persistent://xxx/xxx/xxx-partition-0 | c1-->c2] Error producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
2025-06-13T11:53:28,458+0000 [pulsar-io-6-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [non-persistent://cvc/cvc-publisher/health-readiness-partition-0 | bdrck-cvc-pulsar-live-us-east4-->bdrck-cvc-pulsar-live-us-central1] Error producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]

Modifications

  • Make replicationProducerQueueSize no longer affect NonPersistentReplicator.
  • Rename the logger of NonPersistentReplicator: PersistentReplicator -> NonPersistentReplicator
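
The failure mode described in the Motivation, and the effect of lifting the limit, can be modeled with a small standalone sketch. This is a simplified illustration and not the code changed in this PR: `ReplicatorQueueSketch`, `SketchProducer`, `trySend`, and `countRejected` are hypothetical names, and treating a limit of 0 as "no limit" is a convention of this sketch only. It assumes a send path that fails fast instead of blocking when the pending queue is full, which is what the stack trace above suggests.

```java
import java.util.concurrent.Semaphore;

/** Simplified model of a fail-fast producer send queue; not Pulsar's ProducerImpl. */
class ReplicatorQueueSketch {

    /** Hypothetical stand-in for a producer whose pending-send queue may be bounded. */
    static final class SketchProducer {
        // One permit per free slot in the pending queue; null means "no limit".
        private final Semaphore permits;

        SketchProducer(int maxPendingMessages) {
            this.permits = maxPendingMessages > 0 ? new Semaphore(maxPendingMessages) : null;
        }

        /** Returns false (the "queue is full" case) instead of blocking the caller. */
        boolean trySend(String message) {
            return permits == null || permits.tryAcquire();
        }

        /** Frees one slot, as would happen once the remote side acknowledges a send. */
        void onSendComplete() {
            if (permits != null) {
                permits.release();
            }
        }
    }

    /** Sends a burst of messages with no acknowledgements in between and counts rejections. */
    static int countRejected(int maxPendingMessages, int burst) {
        SketchProducer producer = new SketchProducer(maxPendingMessages);
        int rejected = 0;
        for (int i = 0; i < burst; i++) {
            if (!producer.trySend("msg-" + i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // A small limit rejects everything beyond the first two in-flight messages.
        System.out.println("limit=2, burst=5 -> rejected " + countRejected(2, 5));
        // Without a limit, the same burst is accepted in full.
        System.out.println("no limit, burst=5 -> rejected " + countRejected(0, 5));
    }
}
```

In this model, a sender that does no rate limiting of its own fills a small queue immediately during a burst, so every further send is rejected until an acknowledgement frees a slot. Removing the limit for that sender avoids the rejections.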

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode added this to the 4.1.0 milestone Jun 17, 2025
@poorbarcode self-assigned this Jun 17, 2025
@github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label Jun 17, 2025
poorbarcode and others added 2 commits June 19, 2025 10:31
…onpersistent/NonPersistentReplicator.java

Co-authored-by: Penghui Li
codelipenghui (Contributor) commented:
/pulsarbot run-failure-checks

codecov-commenter commented:

Codecov Report

Attention: Patch coverage is 28.57143% with 5 lines in your changes missing coverage. Please review.

Project coverage is 74.25%. Comparing base (bbc6224) to head (ca8e09a).
Report is 1158 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...service/nonpersistent/NonPersistentReplicator.java | 28.57% | 5 Missing ⚠️ |

@@             Coverage Diff              @@
##             master   #24424      +/-   ##
============================================
+ Coverage     73.57%   74.25%   +0.67%     
- Complexity    32624    32698      +74     
============================================
  Files          1877     1868       -9     
  Lines        139502   145403    +5901     
  Branches      15299    16637    +1338     
============================================
+ Hits         102638   107964    +5326     
+ Misses        28908    28876      -32     
- Partials       7956     8563     +607     

| Flag | Coverage Δ |
|---|---|
| inttests | 26.86% <28.57%> (+2.27%) ⬆️ |
| systests | 23.33% <0.00%> (-0.99%) ⬇️ |
| unittests | 73.74% <28.57%> (+0.90%) ⬆️ |

Flags with carried forward coverage won't be shown.

| Files with missing lines | Coverage Δ |
|---|---|
| ...ache/pulsar/broker/service/AbstractReplicator.java | 67.96% <ø> (-17.04%) ⬇️ |
| ...ar/client/impl/conf/ProducerConfigurationData.java | 85.29% <ø> (-6.48%) ⬇️ |
| ...service/nonpersistent/NonPersistentReplicator.java | 63.80% <28.57%> (+0.44%) ⬆️ |

... and 1085 files with indirect coverage changes

Labels: doc-not-needed (Your PR changes do not impact docs), ready-to-test, release/3.0.13, release/3.3.8, release/4.0.6, type/bug (The PR fixed a bug or issue reported a bug)
3 participants