Skip to content

[improve][client]PIP-425:Support connecting with next available endpoint for multi-endpoint serviceUrls #24387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,34 @@ public interface ClientBuilder extends Serializable, Cloneable {
*/
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);

/**
* Configure the service URL recovery init backoff intervals.
*
* <p>When the client is unable to connect to the service URL, it will wait for a certain amount of time before
* trying to recover the service URL. The init backoff intervals can be configured using this method.
*
* <p>
* A value of 0 means don't need wait before retrying to connect to the failed service URL.
* @param serviceUrlRecoveryInitBackoffInterval the initial backoff interval for service URL recovery
* @param unit the time unit for the backoff interval
* @return the client builder instance
*/
ClientBuilder serviceUrlRecoveryInitBackoffInterval(long serviceUrlRecoveryInitBackoffInterval, TimeUnit unit);

/**
* Configure the service URL recovery max backoff interval.
*
* <p>When the client is unable to connect to the service URL, it will wait for a certain amount of time before
* trying to recover the service URL. The max backoff interval can be configured using this method.
*
* <p>
* A value of 0 means don't need wait before retrying to connect to the failed service URL.
* @param serviceUrlRecoveryMaxBackoffInterval the maximum backoff interval for service URL recovery
* @param unit the time unit for the backoff interval
* @return the client builder instance
*/
ClientBuilder serviceUrlRecoveryMaxBackoffInterval(long serviceUrlRecoveryMaxBackoffInterval, TimeUnit unit);

/**
* Configure the listenerName that the broker will return the corresponding `advertisedListener`.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.useTls = useTls;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.serviceNameResolver =
new PulsarServiceNameResolver(client.getConfiguration().getServiceUrlRecoveryInitBackoffIntervalMs(),
client.getConfiguration().getServiceUrlRecoveryMaxBackoffIntervalMs());
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);

Expand Down Expand Up @@ -188,8 +190,8 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadata(
serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled,
CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadataAsync(
topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
newFutureCreated.setValue(newFuture);
return newFuture;
Expand Down Expand Up @@ -281,19 +283,20 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
client.getCnxPool().releaseConnection(clientCnx);
});
}, lookupPinnedExecutor).exceptionally(connectionException -> {
serviceNameResolver.markHostAvailability(socketAddress, false);
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
return addressFuture;
}

private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) {

long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
Expand Down Expand Up @@ -356,8 +359,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand Down Expand Up @@ -403,12 +405,12 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(1, TimeUnit.MINUTES)
.create();
getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode,
getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
topicsPattern, topicsHash);
return topicsFuture;
}

private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
private void getTopicsUnderNamespace(
NamespaceName namespace,
Backoff backoff,
AtomicLong remainingTime,
Expand All @@ -418,7 +420,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
Expand Down Expand Up @@ -451,14 +453,13 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, getTopicsResultFuture,
getTopicsUnderNamespace(namespace, backoff, remainingTime, getTopicsResultFuture,
mode, topicsPattern, topicsHash);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}


@Override
public void close() throws Exception {
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ public ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider) {
return this;
}

@Override
public ClientBuilder serviceUrlRecoveryInitBackoffInterval(long serviceUrlRecoveryInitBackoffInterval,
TimeUnit unit) {
checkArgument(serviceUrlRecoveryInitBackoffInterval >= 0,
"serviceUrlRecoveryInitBackoffInterval needs to be >= 0");
conf.setServiceUrlRecoveryInitBackoffIntervalMs(unit.toMillis(serviceUrlRecoveryInitBackoffInterval));
return this;
}

@Override
public ClientBuilder serviceUrlRecoveryMaxBackoffInterval(long serviceUrlRecoveryMaxBackoffInterval,
TimeUnit unit) {
checkArgument(serviceUrlRecoveryMaxBackoffInterval >= 0,
"serviceUrlRecoveryMaxBackoffInterval needs to be >= 0");
conf.setServiceUrlRecoveryMaxBackoffIntervalMs(unit.toMillis(serviceUrlRecoveryMaxBackoffInterval));
return this;
}

@Override
public ClientBuilder listenerName(String listenerName) {
checkArgument(StringUtils.isNotBlank(listenerName), "Param listenerName must not be blank.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public int genRandomKeyToSelectCon() {
return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
}

public CompletableFuture<ClientCnx> getConnection(final ServiceNameResolver serviceNameResolver) {
InetSocketAddress address = serviceNameResolver.resolveHost();
CompletableFuture<ClientCnx> clientCnxCompletableFuture = getConnection(address);
clientCnxCompletableFuture.whenComplete(
(__, throwable) -> serviceNameResolver.markHostAvailability(address, throwable == null));
return clientCnxCompletableFuture;
}

public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress address) {
if (maxConnectionsPerHosts == 0) {
return getConnection(address, address, -1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.pulsar.common.util.Backoff;

@Data
@AllArgsConstructor
class EndpointStatus {
private InetSocketAddress socketAddress;
private Backoff recoverBackoff;
private long lastUpdateTimeStampMs;
private long nextDelayMsToRecover;
private boolean isAvailable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public class HttpClient implements Closeable {

protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this.authentication = conf.getAuthentication();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.serviceNameResolver = new PulsarServiceNameResolver(conf.getServiceUrlRecoveryInitBackoffIntervalMs(),
conf.getServiceUrlRecoveryMaxBackoffIntervalMs());
this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());

DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
Expand Down Expand Up @@ -114,7 +115,6 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
confBuilder.setSslEngineFactory(sslEngineFactory);



confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
} catch (Exception e) {
Expand Down Expand Up @@ -168,6 +168,8 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
// auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
serviceNameResolver.markHostAvailability(
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), false);
log.warn("[{}] Failed to perform http request at authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new PulsarClientException(ex));
Expand All @@ -194,10 +196,14 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {

builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
serviceNameResolver.markHostAvailability(
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), false);
log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
future.completeExceptionally(new PulsarClientException(t));
return;
}
serviceNameResolver.markHostAvailability(
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true);

// request not success
if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
Expand Down Expand Up @@ -266,4 +272,5 @@ protected void refreshSslContext() {
log.error("Failed to refresh SSL context", e);
}
}

}
Loading
Loading