Skip to content

[fix][broker]fix memory leak, messages lost, incorrect replication state if using multiple versions schema #24178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Apr 14, 2025

Motivation

Issue 1: memory leak when publishing messages with a broken schema

  • Pulsar client will discard messages which has a broken schema, but it never releases the message when discarding the message.
  • You can reproduce the issue with the new test testBrokenSchema.

Issue 2: incorrect replication/producer state

  • The internal producer of the replicator connected.
  • The producer tries to register a new schema, and the state was changed: Ready -> RegisteringSchema.
  • The topic's stats response shows there is a producer connected, but the value of replication.connected or producer.connected shows false
  • You can reproduce the issue with the new test testProducerConnectStateWhenRegisteringSchema

Issue 3: replication lost messages or is out of order

time / task internal producer of replicator
1 send async msg1 with a compatible schema
2 send asycn msg2 with a incompatible schema
3 send async msg1 with a compatible schema

Result:

  • msg1 was sent
  • msg2 was discarded due to incompatible schema
  • msg3 was sent

Issue 4: Reused a recycled SendCallback, which causes a dangerous issue

time / task client: publish with a broken schema broker-side: handle schema broker-side: disconnect client: close producer
1 Sends new schema to Broker
2 received the request of new schema registration
3 switch to metadata store threads or Bookie client threads
4 Disconnect producers due to the unloading topic or others, but the socket has not been closed yet
5 start to close producer
6 respond to client: Broken schema
7 Calls "op.callback.sendComplete()", but op is still in producer.pendingMessages[1]
8 op.callback was recycled`
9 client-side: disconnected
10 Calls failPendingMessages, which provides a failed callback for all pending messages

At step 10, the producer will call a failed callback on a recycled SendCallback which has been recycled at step 8, but the object SendCallback may be used by others, which will cause unexpected and dangenrous issues.

[1] https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L894

    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
        SchemaInfo schemaInfo = ...;
        getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
            if (ex instanceof PulsarClientException.IncompatibleSchemaException) {
                msg.setSchemaState(MessageImpl.SchemaState.Broken);
                callback.sendComplete(t, null); // This line calls "callback.sendComplete", but the callback was still related to "producer.pendingMessages"
            }
            ...
        });
    }

You can reproduce issues 3 and 4 with the new test testIncompatibleMultiVersionSchema , and you will get following various errors, but the test is not in order to reproduce a special case.

2025-04-14T12:35:08,452 - WARN  - [pulsar-io-64-13:AbstractEventExecutor] - A task raised an exception. Task: org.apache.pulsar.client.impl.ProducerImpl$$Lambda$1827/0x0000000500c1ba58@1d15c41a
java.lang.IllegalStateException: Some required fields are missing
	at org.apache.pulsar.common.api.proto.MessageMetadata.checkRequiredFields(MessageMetadata.java:1378) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.writeTo(MessageMetadata.java:945) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.serializeCommandSendWithSize(Commands.java:1705) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.newSend(Commands.java:585) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.newSend(Commands.java:532) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.sendMessage(ProducerImpl.java:970) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$serializeAndSendMessage$3(ProducerImpl.java:814) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.recoverProcessOpSendMsgFrom(ProducerImpl.java:2449) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$tryRegisterSchema$6(ProducerImpl.java:912) ~[classes/:?]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:173) ~[netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java) ~[netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:166) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) [netty-transport-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
...
...
2025-04-14T12:35:10,843 - WARN  - [pulsar-web-99-14:ProducerImpl] - [persistent://public/always-compatible/tp_-aa24e8d7-94b0-41fa-84df-e747895cafc6] [pulsar.repl.r1-->r2] Got exception while completing the callback for msg 1:
java.lang.NullPointerException: Cannot read field "replicatorId" because "x0" is null
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator.access$000(PersistentReplicator.java:75) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator$ProducerSendCallback.sendComplete(PersistentReplicator.java:401) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1658) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:2225) ~[classes/:?]
	at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:888) ~[?:?]
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1749) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:2215) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.closeAndClearPendingMessages(ProducerImpl.java:1219) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.closeAsync(ProducerImpl.java:1185) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractReplicator.doCloseProducerAsync(AbstractReplicator.java:385) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractReplicator.terminate(AbstractReplicator.java:407) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$close$52(PersistentTopic.java:1684) ~[classes/:?]
	at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1684) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1617) ~[classes/:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadNonPartitionedTopicAsync$115(PersistentTopicsBase.java:1124) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadNonPartitionedTopicAsync(PersistentTopicsBase.java:1124) ~[classes/:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$97(PersistentTopicsBase.java:947) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$99(PersistentTopicsBase.java:913) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadTopic(PersistentTopicsBase.java:903) ~[classes/:?]
	at org.apache.pulsar.broker.admin.v2.PersistentTopics.unloadTopic(PersistentTopics.java:1134) ~[classes/:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:359) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:312) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.apache.pulsar.broker.web.WebService$FilterInitializer$WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter.doFilter(WebService.java:336) ~[classes/:?]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[jetty-servlets-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:722) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
2025-04-14T12:35:10,843 - ERROR - [pulsar-web-99-14:PersistentReplicator] - [persistent://public/always-compatible/tp_-aa24e8d7-94b0-41fa-84df-e747895cafc6 | r1-->r2] Error producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: The producer pulsar.repl.r1-->r2 of the topic persistent://public/always-compatible/tp_-aa24e8d7-94b0-41fa-84df-e747895cafc6 was already closed when closing the producers
	at org.apache.pulsar.client.impl.ProducerImpl.closeAndClearPendingMessages(ProducerImpl.java:1216) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.closeAsync(ProducerImpl.java:1185) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractReplicator.doCloseProducerAsync(AbstractReplicator.java:385) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractReplicator.terminate(AbstractReplicator.java:407) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$close$52(PersistentTopic.java:1684) ~[classes/:?]
	at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1684) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1617) ~[classes/:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadNonPartitionedTopicAsync$115(PersistentTopicsBase.java:1124) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadNonPartitionedTopicAsync(PersistentTopicsBase.java:1124) ~[classes/:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$97(PersistentTopicsBase.java:947) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$99(PersistentTopicsBase.java:913) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadTopic(PersistentTopicsBase.java:903) ~[classes/:?]
	at org.apache.pulsar.broker.admin.v2.PersistentTopics.unloadTopic(PersistentTopics.java:1134) ~[classes/:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[jersey-common-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684) ~[jersey-server-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:359) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:312) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[jersey-container-servlet-core-2.42.jar:?]
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.apache.pulsar.broker.web.WebService$FilterInitializer$WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter.doFilter(WebService.java:336) ~[classes/:?]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[jetty-servlets-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[jetty-servlet-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:722) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [jetty-server-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [jetty-io-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) [jetty-util-9.4.56.v20240826.jar:9.4.56.v20240826]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.119.Final.jar:4.1.119.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
2025-04-14T12:35:10,843 - INFO  - [pulsar-web-99-14:ManagedCursorImpl] - [public/always-compatible/persistent/tp_-aa24e8d7-94b0-41fa-84df-e747895cafc6-pulsar.repl.r2] Rewind from 2:1 to 2:1
...
...
// A incorrect schema was registered
2025-04-14T12:35:16,257 - WARN  - [pulsar-client-io-187-3:HttpClient] - [http://localhost:59689/admin/v2/schemas/public/always-compatible/tp_-23441596-5a6b-4ea4-9723-1dd23f7cea43/schema/1] HTTP get request failed: Schema not found
2025-04-14T12:35:16,257 - INFO  - [main:AutoConsumeSchema] - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x01 for topic persistent://public/always-compatible/tp_-23441596-5a6b-4ea4-9723-1dd23f7cea43 : 
2025-04-14T12:35:16,257 - INFO  - [main:OneWayReplicatorTest] - received msg: [B@29caf222

Modifications

  • Fix the 3 issues that were described in Motivation
  • Print schemas in one line instead of pretty printing, which is helpful for searching and filtering.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/3.0.12 release/3.3.7 release/4.0.5 labels Apr 14, 2025
@poorbarcode poorbarcode added this to the 4.1.0 milestone Apr 14, 2025
@poorbarcode poorbarcode self-assigned this Apr 14, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 14, 2025
@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode force-pushed the fix/multiple_version_schema_publish branch from af91444 to 30a488c Compare April 16, 2025 02:13
@poorbarcode poorbarcode reopened this Apr 16, 2025
@codecov-commenter
Copy link

codecov-commenter commented Apr 17, 2025

Codecov Report

Attention: Patch coverage is 71.79487% with 22 lines in your changes missing coverage. Please review.

Project coverage is 74.28%. Comparing base (bbc6224) to head (8c51582).
Report is 1029 commits behind head on master.

Files with missing lines Patch % Lines
...va/org/apache/pulsar/client/impl/ProducerImpl.java 78.46% 10 Missing and 4 partials ⚠️
...pulsar/client/impl/GeoReplicationProducerImpl.java 0.00% 4 Missing and 1 partial ⚠️
.../apache/pulsar/client/impl/schema/SchemaUtils.java 60.00% 1 Missing and 1 partial ⚠️
...nt/impl/PulsarClientImplementationBindingImpl.java 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24178      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.70%     
+ Complexity    32624    32161     -463     
============================================
  Files          1877     1865      -12     
  Lines        139502   144644    +5142     
  Branches      15299    16519    +1220     
============================================
+ Hits         102638   107443    +4805     
+ Misses        28908    28731     -177     
- Partials       7956     8470     +514     
Flag Coverage Δ
inttests 26.79% <14.10%> (+2.20%) ⬆️
systests 23.28% <20.51%> (-1.05%) ⬇️
unittests 73.76% <71.79%> (+0.91%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/HandlerState.java 94.73% <100.00%> (+0.14%) ⬆️
...ache/pulsar/client/impl/schema/SchemaInfoImpl.java 80.00% <100.00%> (-1.49%) ⬇️
...nt/impl/PulsarClientImplementationBindingImpl.java 80.00% <0.00%> (ø)
.../apache/pulsar/client/impl/schema/SchemaUtils.java 72.72% <60.00%> (-2.85%) ⬇️
...pulsar/client/impl/GeoReplicationProducerImpl.java 47.78% <0.00%> (ø)
...va/org/apache/pulsar/client/impl/ProducerImpl.java 83.69% <78.46%> (+0.10%) ⬆️

... and 1070 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@poorbarcode poorbarcode requested a review from lhotari April 17, 2025 07:29
@@ -200,12 +200,13 @@ public static String getStringSchemaVersion(byte[] schemaVersionBytes) {
* @param schemaInfo the schema info
* @return the jsonified schema info
*/
public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there might be some external usage of this method, it could be useful to add a method with the original method signature for backwards compatibility with the prettyPrinting argument set to true.

@@ -117,6 +117,6 @@ public SchemaHash getSchemaHash() {

@Override
public String toString() {
return SchemaUtils.jsonifySchemaInfo(this);
return SchemaUtils.jsonifySchemaInfo(this, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to change the behavior in this PR? It seems that previously the output was pretty printed, but this changes it to be without pretty printing.

@@ -327,7 +327,7 @@ public KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo sche
* @return the jsonified schema info
*/
public String jsonifySchemaInfo(SchemaInfo schemaInfo) {
return SchemaUtils.jsonifySchemaInfo(schemaInfo);
return SchemaUtils.jsonifySchemaInfo(schemaInfo, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to change the behavior in this PR? It seems that previously the output was pretty printed, but this changes it to be without pretty printing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.12 release/3.3.7 release/4.0.5 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants