Skip to content

Commit 6e26ec0

Browse files
authored
MINOR: Update GroupCoordinator interface to use AuthorizableRequestContext instead of RequestContext (#19485)
This patch updates the `GroupCoordinator` interface to use `AuthorizableRequestContext` instead of using `RequestContext`. It makes the interface more generic. The only downside is that the request version in `AuthorizableRequestContext` is an `int` instead of a `short` so we had to adapt it in a few places. We opted for using `int` directly wherever possible. Reviewers: Chia-Ping Tsai <[email protected]>, Rajini Sivaram <[email protected]>
1 parent 18e4608 commit 6e26ec0

File tree

18 files changed

+142
-142
lines changed

18 files changed

+142
-142
lines changed

clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public static String maybeTruncateReason(final String reason) {
9696
*
9797
* @return whether a known member id is required or not.
9898
*/
99-
public static boolean requiresKnownMemberId(short apiVersion) {
99+
public static boolean requiresKnownMemberId(int apiVersion) {
100100
return apiVersion >= 4;
101101
}
102102

@@ -117,7 +117,7 @@ public static boolean requiresKnownMemberId(short apiVersion) {
117117
*/
118118
public static boolean requiresKnownMemberId(
119119
JoinGroupRequestData request,
120-
short apiVersion
120+
int apiVersion
121121
) {
122122
return request.groupInstanceId() == null
123123
&& request.memberId().equals(UNKNOWN_MEMBER_ID)
@@ -150,7 +150,7 @@ public static boolean requiresKnownMemberId(
150150
* @return whether the version supports skipping assignment.
151151
*/
152152

153-
public static boolean supportsSkippingAssignment(short apiVersion) {
153+
public static boolean supportsSkippingAssignment(int apiVersion) {
154154
return apiVersion >= 9;
155155
}
156156

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2236,7 +2236,7 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
22362236
short producerEpoch,
22372237
Duration timeout,
22382238
CoordinatorWriteOperation<S, T, U> op,
2239-
Short apiVersion
2239+
int apiVersion
22402240
) {
22412241
throwIfNotRunning();
22422242
log.debug("Scheduled execution of transactional write operation {}.", name);

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
105105
String transactionalId,
106106
long producerId,
107107
short producerEpoch,
108-
short apiVersion
108+
int apiVersion
109109
) throws KafkaException;
110110

111111
/**

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
129129
String transactionalId,
130130
long producerId,
131131
short producerEpoch,
132-
short apiVersion
132+
int apiVersion
133133
) throws KafkaException {
134134
return CompletableFuture.completedFuture(new VerificationGuard());
135135
}

core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class CoordinatorPartitionWriter(
107107
transactionalId: String,
108108
producerId: Long,
109109
producerEpoch: Short,
110-
apiVersion: Short
110+
apiVersion: Int
111111
): CompletableFuture[VerificationGuard] = {
112112
val transactionSupportedOperation = AddPartitionsToTxnManager.txnOffsetCommitRequestVersionToTransactionSupportedOperation(apiVersion)
113113
val future = new CompletableFuture[VerificationGuard]()

core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object AddPartitionsToTxnManager {
5151
}
5252
}
5353

54-
def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
54+
def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Int): TransactionSupportedOperation = {
5555
if (version > 4) {
5656
addPartition
5757
} else if (version > 3) {

core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class CoordinatorPartitionWriterTest {
178178
"transactional-id",
179179
10L,
180180
5.toShort,
181-
ApiKeys.TXN_OFFSET_COMMIT.latestVersion()
181+
ApiKeys.TXN_OFFSET_COMMIT.latestVersion().toInt
182182
)
183183

184184
if (error == Errors.NONE) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void validateOffsetCommit(
117117
String groupInstanceId,
118118
int generationIdOrMemberEpoch,
119119
boolean isTransactional,
120-
short apiVersion
120+
int apiVersion
121121

122122
) throws KafkaException;
123123

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java

+22-22
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@
4949
import org.apache.kafka.common.message.SyncGroupResponseData;
5050
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
5151
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
52-
import org.apache.kafka.common.requests.RequestContext;
5352
import org.apache.kafka.common.requests.TransactionResult;
5453
import org.apache.kafka.common.utils.BufferSupplier;
5554
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
5655
import org.apache.kafka.image.MetadataDelta;
5756
import org.apache.kafka.image.MetadataImage;
57+
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
5858

5959
import java.time.Duration;
6060
import java.util.List;
@@ -80,7 +80,7 @@ public interface GroupCoordinator {
8080
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
8181
*/
8282
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
83-
RequestContext context,
83+
AuthorizableRequestContext context,
8484
ConsumerGroupHeartbeatRequestData request
8585
);
8686

@@ -94,7 +94,7 @@ CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
9494
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
9595
*/
9696
CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
97-
RequestContext context,
97+
AuthorizableRequestContext context,
9898
StreamsGroupHeartbeatRequestData request
9999
);
100100

@@ -108,7 +108,7 @@ CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
108108
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
109109
*/
110110
CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
111-
RequestContext context,
111+
AuthorizableRequestContext context,
112112
ShareGroupHeartbeatRequestData request
113113
);
114114

@@ -123,7 +123,7 @@ CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
123123
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
124124
*/
125125
CompletableFuture<JoinGroupResponseData> joinGroup(
126-
RequestContext context,
126+
AuthorizableRequestContext context,
127127
JoinGroupRequestData request,
128128
BufferSupplier bufferSupplier
129129
);
@@ -139,7 +139,7 @@ CompletableFuture<JoinGroupResponseData> joinGroup(
139139
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
140140
*/
141141
CompletableFuture<SyncGroupResponseData> syncGroup(
142-
RequestContext context,
142+
AuthorizableRequestContext context,
143143
SyncGroupRequestData request,
144144
BufferSupplier bufferSupplier
145145
);
@@ -154,7 +154,7 @@ CompletableFuture<SyncGroupResponseData> syncGroup(
154154
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
155155
*/
156156
CompletableFuture<HeartbeatResponseData> heartbeat(
157-
RequestContext context,
157+
AuthorizableRequestContext context,
158158
HeartbeatRequestData request
159159
);
160160

@@ -168,7 +168,7 @@ CompletableFuture<HeartbeatResponseData> heartbeat(
168168
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
169169
*/
170170
CompletableFuture<LeaveGroupResponseData> leaveGroup(
171-
RequestContext context,
171+
AuthorizableRequestContext context,
172172
LeaveGroupRequestData request
173173
);
174174

@@ -182,7 +182,7 @@ CompletableFuture<LeaveGroupResponseData> leaveGroup(
182182
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
183183
*/
184184
CompletableFuture<ListGroupsResponseData> listGroups(
185-
RequestContext context,
185+
AuthorizableRequestContext context,
186186
ListGroupsRequestData request
187187
);
188188

@@ -196,7 +196,7 @@ CompletableFuture<ListGroupsResponseData> listGroups(
196196
* The error codes of the results are set to indicate the errors occurred during the execution.
197197
*/
198198
CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(
199-
RequestContext context,
199+
AuthorizableRequestContext context,
200200
List<String> groupIds
201201
);
202202

@@ -209,7 +209,7 @@ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroup
209209
* @return A future yielding the results or an exception.
210210
*/
211211
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
212-
RequestContext context,
212+
AuthorizableRequestContext context,
213213
List<String> groupIds
214214
);
215215

@@ -222,7 +222,7 @@ CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consum
222222
* @return A future yielding the results or an exception.
223223
*/
224224
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
225-
RequestContext context,
225+
AuthorizableRequestContext context,
226226
List<String> groupIds
227227
);
228228

@@ -235,7 +235,7 @@ CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streams
235235
* @return A future yielding the results or an exception.
236236
*/
237237
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGroupDescribe(
238-
RequestContext context,
238+
AuthorizableRequestContext context,
239239
List<String> groupIds
240240
);
241241

@@ -250,7 +250,7 @@ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGrou
250250
* The error codes of the results are set to indicate the errors occurred during the execution.
251251
*/
252252
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
253-
RequestContext context,
253+
AuthorizableRequestContext context,
254254
List<String> groupIds,
255255
BufferSupplier bufferSupplier
256256
);
@@ -265,7 +265,7 @@ CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> delet
265265
* The error codes of the results are set to indicate the errors occurred during the execution.
266266
*/
267267
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
268-
RequestContext context,
268+
AuthorizableRequestContext context,
269269
OffsetFetchRequestData.OffsetFetchRequestGroup request,
270270
boolean requireStable
271271
);
@@ -280,7 +280,7 @@ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets
280280
* The error codes of the results are set to indicate the errors occurred during the execution.
281281
*/
282282
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
283-
RequestContext context,
283+
AuthorizableRequestContext context,
284284
OffsetFetchRequestData.OffsetFetchRequestGroup request,
285285
boolean requireStable
286286
);
@@ -295,7 +295,7 @@ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffs
295295
* The error codes of the response are set to indicate the errors occurred during the execution.
296296
*/
297297
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
298-
RequestContext context,
298+
AuthorizableRequestContext context,
299299
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
300300
);
301301

@@ -309,7 +309,7 @@ CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffset
309309
* The error codes of the response are set to indicate the errors occurred during the execution.
310310
*/
311311
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
312-
RequestContext context,
312+
AuthorizableRequestContext context,
313313
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
314314
);
315315

@@ -323,7 +323,7 @@ CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffset
323323
* The error codes of the response are set to indicate the errors occurred during the execution.
324324
*/
325325
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
326-
RequestContext context,
326+
AuthorizableRequestContext context,
327327
DeleteShareGroupOffsetsRequestData request
328328
);
329329

@@ -338,7 +338,7 @@ CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
338338
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
339339
*/
340340
CompletableFuture<OffsetCommitResponseData> commitOffsets(
341-
RequestContext context,
341+
AuthorizableRequestContext context,
342342
OffsetCommitRequestData request,
343343
BufferSupplier bufferSupplier
344344
);
@@ -354,7 +354,7 @@ CompletableFuture<OffsetCommitResponseData> commitOffsets(
354354
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
355355
*/
356356
CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
357-
RequestContext context,
357+
AuthorizableRequestContext context,
358358
TxnOffsetCommitRequestData request,
359359
BufferSupplier bufferSupplier
360360
);
@@ -370,7 +370,7 @@ CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
370370
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
371371
*/
372372
CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
373-
RequestContext context,
373+
AuthorizableRequestContext context,
374374
OffsetDeleteRequestData request,
375375
BufferSupplier bufferSupplier
376376
);

0 commit comments

Comments
 (0)