Skip to content

Commit 56591d2

Browse files
KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server module (#19390)
Rewrite these classes in Java and move them to the server module Reviewers: Mickael Maison <[email protected]>
1 parent 5148174 commit 56591d2

File tree

6 files changed

+279
-210
lines changed

6 files changed

+279
-210
lines changed

core/src/main/scala/kafka/server/AclApis.scala

+10-7
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ import org.apache.kafka.common.resource.ResourceType
3232
import org.apache.kafka.security.authorizer.AuthorizerUtils
3333
import org.apache.kafka.server.ProcessRole
3434
import org.apache.kafka.server.authorizer._
35+
import org.apache.kafka.server.purgatory.DelayedFuturePurgatory
3536

3637
import java.util
3738
import java.util.concurrent.CompletableFuture
39+
import java.util.stream.Collectors
3840
import scala.collection.mutable.ArrayBuffer
3941
import scala.collection.mutable
4042
import scala.jdk.CollectionConverters._
@@ -50,7 +52,7 @@ class AclApis(authHelper: AuthHelper,
5052
config: KafkaConfig) extends Logging {
5153
this.logIdent = "[AclApis-%s-%s] ".format(role, config.nodeId)
5254
private val alterAclsPurgatory =
53-
new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.nodeId)
55+
new DelayedFuturePurgatory("AlterAcls", config.nodeId)
5456

5557
def isClosed: Boolean = alterAclsPurgatory.isShutdown
5658

@@ -107,11 +109,11 @@ class AclApis(authHelper: AuthHelper,
107109
}
108110

109111
val future = new CompletableFuture[util.List[AclCreationResult]]()
110-
val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
112+
val createResults = auth.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
111113

112114
def sendResponseCallback(): Unit = {
113115
val aclCreationResults = allBindings.map { acl =>
114-
val result = errorResults.getOrElse(acl, createResults(validBindings.indexOf(acl)).get)
116+
val result = errorResults.getOrElse(acl, createResults.get(validBindings.indexOf(acl)).get)
115117
val creationResult = new AclCreationResult()
116118
result.exception.toScala.foreach { throwable =>
117119
val apiError = ApiError.fromThrowable(throwable)
@@ -123,7 +125,7 @@ class AclApis(authHelper: AuthHelper,
123125
}
124126
future.complete(aclCreationResults.asJava)
125127
}
126-
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
128+
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, () => sendResponseCallback())
127129

128130
future.thenApply[Unit] { aclCreationResults =>
129131
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -147,14 +149,15 @@ class AclApis(authHelper: AuthHelper,
147149

148150
val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
149151
val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
150-
.asScala.map(_.toCompletableFuture).toList
152+
.stream().map(_.toCompletableFuture).toList
151153

152154
def sendResponseCallback(): Unit = {
153-
val filterResults = deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
155+
val filterResults: util.List[DeleteAclsFilterResult] = deleteResults.stream().map(_.get)
156+
.map(DeleteAclsResponse.filterResult).collect(Collectors.toList())
154157
future.complete(filterResults)
155158
}
156159

157-
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
160+
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, () => sendResponseCallback())
158161
future.thenApply[Unit] { filterResults =>
159162
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
160163
new DeleteAclsResponse(

core/src/main/scala/kafka/server/DelayedFuture.scala

-107
This file was deleted.

core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala

-96
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.server.purgatory;
19+
20+
import org.apache.kafka.common.errors.TimeoutException;
21+
import org.apache.kafka.common.utils.LogContext;
22+
23+
import org.slf4j.Logger;
24+
25+
import java.util.List;
26+
import java.util.concurrent.CompletableFuture;
27+
28+
/**
29+
* A delayed operation using CompletionFutures that can be created by KafkaApis and watched
30+
* in a DelayedFuturePurgatory purgatory. This is used for ACL updates using async Authorizers.
31+
*/
32+
public class DelayedFuture<T> extends DelayedOperation {
33+
34+
private final Logger log = new LogContext().logger(DelayedFuture.class.getName());
35+
private final List<CompletableFuture<T>> futures;
36+
private final Runnable responseCallback;
37+
private final long timeoutMs;
38+
39+
public DelayedFuture(long timeoutMs, List<CompletableFuture<T>> futures, Runnable responseCallback) {
40+
super(timeoutMs);
41+
this.timeoutMs = timeoutMs;
42+
this.futures = futures;
43+
this.responseCallback = responseCallback;
44+
}
45+
46+
/**
47+
* The operation can be completed if all the futures have completed successfully
48+
* or failed with exceptions.
49+
*/
50+
@Override
51+
public boolean tryComplete() {
52+
log.trace("Trying to complete operation for {} futures", futures.size());
53+
54+
long pending = futures.stream().filter(future -> !future.isDone()).count();
55+
if (pending == 0) {
56+
log.trace("All futures have been completed or have errors, completing the delayed operation");
57+
return forceComplete();
58+
} else {
59+
log.trace("{} future still pending, not completing the delayed operation", pending);
60+
return false;
61+
}
62+
}
63+
64+
/**
65+
* Timeout any pending futures and invoke responseCallback. This is invoked when all
66+
* futures have completed or the operation has timed out.
67+
*/
68+
@Override
69+
public void onComplete() {
70+
List<CompletableFuture<T>> pendingFutures = futures.stream().filter(future -> !future.isDone()).toList();
71+
log.trace("Completing operation for {} futures, expired {}", futures.size(), pendingFutures.size());
72+
pendingFutures.forEach(future -> future.completeExceptionally(new TimeoutException("Request has been timed out after " + timeoutMs + " ms")));
73+
responseCallback.run();
74+
}
75+
76+
/**
77+
* This is invoked after onComplete(), so no actions required.
78+
*/
79+
@Override
80+
public void onExpiration() {
81+
// This is invoked after onComplete(), so no actions required.
82+
}
83+
}

0 commit comments

Comments
 (0)