Skip to content

Commit ad024f9

Browse files
rreddy-22janchilling
authored andcommitted
KAFKA-19003: Add forceTerminateTransaction command to CLI tools (apache#19276)
This patch is part of KIP-939 [Support Participation in 2PC](https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC) The kafka-transactions.sh tool will support a new command --forceTerminateTransaction It has one required argument --transactionalId that would take the transactional id for the transaction to be terminated. The command uses the existing Admin#fenceProducers method to forcefully abort the transaction associated with the specified transactional ID. Under the hood, it sends an InitProducerId request to the transaction coordinator with the given transactional ID and keepPreparedTxn = false by default. This is aligned with the functionality outlined in the KIP. We will be creating a new public method in the Admin Client **public TerminateTransactionResult forceTerminateTransaction(String transactionalId)**, and re-use the existing fence producer method. Reviewers: Artem Livshits <[email protected]>, Justine Olshan <[email protected]>
1 parent 023936f commit ad024f9

File tree

10 files changed

+297
-2
lines changed

10 files changed

+297
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+26
Original file line numberDiff line numberDiff line change
@@ -2049,4 +2049,30 @@ default DescribeClassicGroupsResult describeClassicGroups(Collection<String> gro
20492049
* Get the metrics kept by the adminClient
20502050
*/
20512051
Map<MetricName, ? extends Metric> metrics();
2052+
2053+
/**
2054+
* Force terminate a transaction for the given transactional ID with the default options.
2055+
* <p>
2056+
* This is a convenience method for {@link #forceTerminateTransaction(String, TerminateTransactionOptions)}
2057+
* with default options.
2058+
*
2059+
* @param transactionalId The ID of the transaction to terminate.
2060+
* @return The TerminateTransactionResult.
2061+
*/
2062+
default TerminateTransactionResult forceTerminateTransaction(String transactionalId) {
2063+
return forceTerminateTransaction(transactionalId, new TerminateTransactionOptions());
2064+
}
2065+
2066+
/**
2067+
* Force terminate a transaction for the given transactional ID.
2068+
* This operation aborts any ongoing transaction associated with the transactional ID.
2069+
* It's similar to fenceProducers but only targets a single transactional ID to handle
2070+
* long-running transactions when 2PC is enabled.
2071+
*
2072+
* @param transactionalId The ID of the transaction to terminate.
2073+
* @param options The options to use when terminating the transaction.
2074+
* @return The TerminateTransactionResult.
2075+
*/
2076+
TerminateTransactionResult forceTerminateTransaction(String transactionalId,
2077+
TerminateTransactionOptions options);
20522078
}

clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java

+5
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
283283
return delegate.abortTransaction(spec, options);
284284
}
285285

286+
@Override
287+
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
288+
return delegate.forceTerminateTransaction(transactionalId, options);
289+
}
290+
286291
@Override
287292
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
288293
return delegate.listTransactions(options);

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+29
Original file line numberDiff line numberDiff line change
@@ -4839,6 +4839,35 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
48394839
return new AbortTransactionResult(future.all());
48404840
}
48414841

4842+
/**
4843+
* Forcefully terminates an ongoing transaction for a given transactional ID.
4844+
* <p>
4845+
* This API is intended for well-formed but long-running transactions that are known to the
4846+
* transaction coordinator. It is primarily designed for supporting 2PC (two-phase commit) workflows,
4847+
* where a coordinator may need to unilaterally terminate a participant transaction that hasn't completed.
4848+
* </p>
4849+
*
4850+
* @param transactionalId The transactional ID whose active transaction should be forcefully terminated.
4851+
* @return a {@link TerminateTransactionResult} that can be used to await the operation result.
4852+
*/
4853+
@Override
4854+
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
4855+
// Simply leverage the existing fenceProducers implementation with a single transactional ID
4856+
FenceProducersOptions fenceOptions = new FenceProducersOptions();
4857+
if (options.timeoutMs() != null) {
4858+
fenceOptions.timeoutMs(options.timeoutMs());
4859+
}
4860+
4861+
FenceProducersResult fenceResult = fenceProducers(
4862+
Collections.singleton(transactionalId),
4863+
fenceOptions
4864+
);
4865+
4866+
// Convert the result to a TerminateTransactionResult
4867+
KafkaFuture<Void> future = fenceResult.fencedProducers().get(transactionalId);
4868+
return new TerminateTransactionResult(future);
4869+
}
4870+
48424871
@Override
48434872
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
48444873
AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> future =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
/**
21+
* Options for {@link Admin#forceTerminateTransaction(String, TerminateTransactionOptions)}.
22+
*/
23+
public class TerminateTransactionOptions extends AbstractOptions<TerminateTransactionOptions> {
24+
25+
@Override
26+
public String toString() {
27+
return "TerminateTransactionOptions{" +
28+
"timeoutMs=" + timeoutMs +
29+
'}';
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.KafkaFuture;
21+
22+
/**
23+
* The result of the {@link Admin#forceTerminateTransaction(String)} call.
24+
*/
25+
public class TerminateTransactionResult {
26+
27+
private final KafkaFuture<Void> future;
28+
29+
TerminateTransactionResult(KafkaFuture<Void> future) {
30+
this.future = future;
31+
}
32+
33+
/**
34+
* Return a future which indicates whether the transaction was successfully terminated.
35+
*/
36+
public KafkaFuture<Void> result() {
37+
return future;
38+
}
39+
}

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

+87
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.kafka.common.errors.TopicAuthorizationException;
7373
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
7474
import org.apache.kafka.common.errors.TopicExistsException;
75+
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
7576
import org.apache.kafka.common.errors.UnknownMemberIdException;
7677
import org.apache.kafka.common.errors.UnknownServerException;
7778
import org.apache.kafka.common.errors.UnknownTopicIdException;
@@ -9746,6 +9747,92 @@ public void testAbortTransactionFindLeaderAfterDisconnect() throws Exception {
97469747
}
97479748
}
97489749

9750+
@Test
9751+
public void testForceTerminateTransaction() throws Exception {
9752+
try (AdminClientUnitTestEnv env = mockClientEnv()) {
9753+
String transactionalId = "testForceTerminate";
9754+
Node transactionCoordinator = env.cluster().nodes().iterator().next();
9755+
9756+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
9757+
Errors.NONE,
9758+
transactionalId,
9759+
transactionCoordinator
9760+
));
9761+
9762+
// Complete the init PID request successfully
9763+
InitProducerIdResponseData initProducerIdResponseData = new InitProducerIdResponseData()
9764+
.setProducerId(5678)
9765+
.setProducerEpoch((short) 123);
9766+
9767+
env.kafkaClient().prepareResponseFrom(request ->
9768+
request instanceof InitProducerIdRequest,
9769+
new InitProducerIdResponse(initProducerIdResponseData),
9770+
transactionCoordinator
9771+
);
9772+
9773+
// Call force terminate and verify results
9774+
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId);
9775+
assertNull(result.result().get());
9776+
}
9777+
}
9778+
9779+
@Test
9780+
public void testForceTerminateTransactionWithError() throws Exception {
9781+
try (AdminClientUnitTestEnv env = mockClientEnv()) {
9782+
String transactionalId = "testForceTerminateError";
9783+
Node transactionCoordinator = env.cluster().nodes().iterator().next();
9784+
9785+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
9786+
Errors.NONE,
9787+
transactionalId,
9788+
transactionCoordinator
9789+
));
9790+
9791+
// Return an error from the InitProducerId request
9792+
env.kafkaClient().prepareResponseFrom(request ->
9793+
request instanceof InitProducerIdRequest,
9794+
new InitProducerIdResponse(new InitProducerIdResponseData()
9795+
.setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code())),
9796+
transactionCoordinator
9797+
);
9798+
9799+
// Call force terminate and verify error is propagated
9800+
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId);
9801+
ExecutionException exception = assertThrows(ExecutionException.class, () -> result.result().get());
9802+
assertTrue(exception.getCause() instanceof TransactionalIdAuthorizationException);
9803+
}
9804+
}
9805+
9806+
@Test
9807+
public void testForceTerminateTransactionWithCustomTimeout() throws Exception {
9808+
try (AdminClientUnitTestEnv env = mockClientEnv()) {
9809+
String transactionalId = "testForceTerminateTimeout";
9810+
Node transactionCoordinator = env.cluster().nodes().iterator().next();
9811+
9812+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
9813+
Errors.NONE,
9814+
transactionalId,
9815+
transactionCoordinator
9816+
));
9817+
9818+
// Complete the init PID request
9819+
InitProducerIdResponseData initProducerIdResponseData = new InitProducerIdResponseData()
9820+
.setProducerId(9012)
9821+
.setProducerEpoch((short) 456);
9822+
9823+
env.kafkaClient().prepareResponseFrom(request ->
9824+
request instanceof InitProducerIdRequest,
9825+
new InitProducerIdResponse(initProducerIdResponseData),
9826+
transactionCoordinator
9827+
);
9828+
9829+
// Use custom timeout
9830+
TerminateTransactionOptions options = new TerminateTransactionOptions().timeoutMs(10000);
9831+
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId, options);
9832+
assertNull(result.result().get());
9833+
}
9834+
}
9835+
97499836
@Test
97509837
public void testListTransactions() throws Exception {
97519838
try (AdminClientUnitTestEnv env = mockClientEnv()) {

clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,11 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
13821382
throw new UnsupportedOperationException("Not implemented yet");
13831383
}
13841384

1385+
@Override
1386+
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
1387+
throw new UnsupportedOperationException("Not implemented yet");
1388+
}
1389+
13851390
@Override
13861391
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
13871392
throw new UnsupportedOperationException("Not implemented yet");

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java

+7
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@
141141
import org.apache.kafka.clients.admin.RemoveRaftVoterResult;
142142
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
143143
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
144+
import org.apache.kafka.clients.admin.TerminateTransactionOptions;
145+
import org.apache.kafka.clients.admin.TerminateTransactionResult;
144146
import org.apache.kafka.clients.admin.UnregisterBrokerOptions;
145147
import org.apache.kafka.clients.admin.UnregisterBrokerResult;
146148
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
@@ -416,6 +418,11 @@ public AbortTransactionResult abortTransaction(final AbortTransactionSpec spec,
416418
return adminDelegate.abortTransaction(spec, options);
417419
}
418420

421+
@Override
422+
public TerminateTransactionResult forceTerminateTransaction(final String transactionalId, final TerminateTransactionOptions options) {
423+
return adminDelegate.forceTerminateTransaction(transactionalId, options);
424+
}
425+
419426
@Override
420427
public ListTransactionsResult listTransactions(final ListTransactionsOptions options) {
421428
return adminDelegate.listTransactions(options);

tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,42 @@ void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
254254
}
255255
}
256256

257+
static class ForceTerminateTransactionsCommand extends TransactionsCommand {
258+
259+
ForceTerminateTransactionsCommand(Time time) {
260+
super(time);
261+
}
262+
263+
@Override
264+
String name() {
265+
return "forceTerminateTransaction";
266+
}
267+
268+
@Override
269+
void addSubparser(Subparsers subparsers) {
270+
Subparser subparser = subparsers.addParser(name())
271+
.description("Force abort an ongoing transaction on transactionalId")
272+
.help("Force abort an ongoing transaction on transactionalId (requires administrative privileges)");
273+
274+
subparser.addArgument("--transactionalId")
275+
.help("transactional id")
276+
.action(store())
277+
.type(String.class)
278+
.required(true);
279+
}
280+
281+
@Override
282+
void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
283+
String transactionalId = ns.getString("transactionalId");
284+
285+
try {
286+
admin.forceTerminateTransaction(transactionalId).result().get();
287+
} catch (ExecutionException e) {
288+
printErrorAndExit("Failed to force terminate transactionalId `" + transactionalId + "`", e.getCause());
289+
}
290+
}
291+
}
292+
257293
static class DescribeProducersCommand extends TransactionsCommand {
258294
static final List<String> HEADERS = asList(
259295
"ProducerId",
@@ -990,7 +1026,8 @@ static void execute(
9901026
new DescribeTransactionsCommand(time),
9911027
new DescribeProducersCommand(time),
9921028
new AbortTransactionCommand(time),
993-
new FindHangingTransactionsCommand(time)
1029+
new FindHangingTransactionsCommand(time),
1030+
new ForceTerminateTransactionsCommand(time)
9941031
);
9951032

9961033
ArgumentParser parser = buildBaseParser();

tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.clients.admin.ListTransactionsOptions;
3030
import org.apache.kafka.clients.admin.ListTransactionsResult;
3131
import org.apache.kafka.clients.admin.ProducerState;
32+
import org.apache.kafka.clients.admin.TerminateTransactionResult;
3233
import org.apache.kafka.clients.admin.TopicDescription;
3334
import org.apache.kafka.clients.admin.TransactionDescription;
3435
import org.apache.kafka.clients.admin.TransactionListing;
@@ -239,6 +240,35 @@ public void testListTransactions(boolean hasDurationFilter) throws Exception {
239240
assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
240241
}
241242

243+
@Test
244+
public void testForceTerminateTransaction() throws Exception {
245+
String transactionalId = "foo";
246+
String[] args = new String[] {
247+
"--bootstrap-server",
248+
"localhost:9092",
249+
"forceTerminateTransaction",
250+
"--transactionalId",
251+
transactionalId
252+
};
253+
254+
TerminateTransactionResult terminateTransactionResult = Mockito.mock(TerminateTransactionResult.class);
255+
KafkaFuture<Void> future = KafkaFuture.completedFuture(null);
256+
Mockito.when(terminateTransactionResult.result()).thenReturn(future);
257+
Mockito.when(admin.forceTerminateTransaction(transactionalId)).thenReturn(terminateTransactionResult);
258+
259+
execute(args);
260+
assertNormalExit();
261+
}
262+
263+
@Test
264+
public void testForceTerminateTransactionTransactionalIdRequired() throws Exception {
265+
assertCommandFailure(new String[]{
266+
"--bootstrap-server",
267+
"localhost:9092",
268+
"force-terminate"
269+
});
270+
}
271+
242272
@Test
243273
public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
244274
assertCommandFailure(new String[]{
@@ -1066,5 +1096,4 @@ private void assertCommandFailure(String[] args) throws Exception {
10661096
assertTrue(exitProcedure.hasExited());
10671097
assertEquals(1, exitProcedure.statusCode());
10681098
}
1069-
10701099
}

0 commit comments

Comments
 (0)