Skip to content

KAFKA-19003: Add forceTerminateTransaction command to CLI tools #19276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -2049,4 +2049,30 @@ default DescribeClassicGroupsResult describeClassicGroups(Collection<String> gro
* Get the metrics kept by the adminClient
*/
Map<MetricName, ? extends Metric> metrics();

/**
* Force terminate a transaction for the given transactional ID with the default options.
* <p>
* This is a convenience method for {@link #forceTerminateTransaction(String, TerminateTransactionOptions)}
* with default options.
*
* @param transactionalId The ID of the transaction to terminate.
* @return The TerminateTransactionResult.
*/
default TerminateTransactionResult forceTerminateTransaction(String transactionalId) {
return forceTerminateTransaction(transactionalId, new TerminateTransactionOptions());
}

/**
* Force terminate a transaction for the given transactional ID.
* This operation aborts any ongoing transaction associated with the transactional ID.
* It's similar to fenceProducers but only targets a single transactional ID to handle
* long-running transactions when 2PC is enabled.
*
* @param transactionalId The ID of the transaction to terminate.
* @param options The options to use when terminating the transaction.
* @return The TerminateTransactionResult.
*/
TerminateTransactionResult forceTerminateTransaction(String transactionalId,
TerminateTransactionOptions options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
return delegate.abortTransaction(spec, options);
}

@Override
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
return delegate.forceTerminateTransaction(transactionalId, options);
}

@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
return delegate.listTransactions(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4839,6 +4839,35 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
return new AbortTransactionResult(future.all());
}

/**
* Forcefully terminates an ongoing transaction for a given transactional ID.
* <p>
* This API is intended for well-formed but long-running transactions that are known to the
* transaction coordinator. It is primarily designed for supporting 2PC (two-phase commit) workflows,
* where a coordinator may need to unilaterally terminate a participant transaction that hasn't completed.
* </p>
*
* @param transactionalId The transactional ID whose active transaction should be forcefully terminated.
* @return a {@link TerminateTransactionResult} that can be used to await the operation result.
*/
@Override
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
// Simply leverage the existing fenceProducers implementation with a single transactional ID
FenceProducersOptions fenceOptions = new FenceProducersOptions();
if (options.timeoutMs() != null) {
fenceOptions.timeoutMs(options.timeoutMs());
}

FenceProducersResult fenceResult = fenceProducers(
Collections.singleton(transactionalId),
fenceOptions
);

// Convert the result to a TerminateTransactionResult
KafkaFuture<Void> future = fenceResult.fencedProducers().get(transactionalId);
return new TerminateTransactionResult(future);
}

@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> future =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

/**
* Options for {@link Admin#forceTerminateTransaction(String, TerminateTransactionOptions)}.
*/
public class TerminateTransactionOptions extends AbstractOptions<TerminateTransactionOptions> {

@Override
public String toString() {
return "TerminateTransactionOptions{" +
"timeoutMs=" + timeoutMs +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;

/**
* The result of the {@link Admin#forceTerminateTransaction(String)} call.
*/
public class TerminateTransactionResult {

private final KafkaFuture<Void> future;

TerminateTransactionResult(KafkaFuture<Void> future) {
this.future = future;
}

/**
* Return a future which indicates whether the transaction was successfully terminated.
*/
public KafkaFuture<Void> result() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
Expand Down Expand Up @@ -9746,6 +9747,92 @@ public void testAbortTransactionFindLeaderAfterDisconnect() throws Exception {
}
}

@Test
public void testForceTerminateTransaction() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
String transactionalId = "testForceTerminate";
Node transactionCoordinator = env.cluster().nodes().iterator().next();

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
Errors.NONE,
transactionalId,
transactionCoordinator
));

// Complete the init PID request successfully
InitProducerIdResponseData initProducerIdResponseData = new InitProducerIdResponseData()
.setProducerId(5678)
.setProducerEpoch((short) 123);

env.kafkaClient().prepareResponseFrom(request ->
request instanceof InitProducerIdRequest,
new InitProducerIdResponse(initProducerIdResponseData),
transactionCoordinator
);

// Call force terminate and verify results
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId);
assertNull(result.result().get());
}
}

@Test
public void testForceTerminateTransactionWithError() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
String transactionalId = "testForceTerminateError";
Node transactionCoordinator = env.cluster().nodes().iterator().next();

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
Errors.NONE,
transactionalId,
transactionCoordinator
));

// Return an error from the InitProducerId request
env.kafkaClient().prepareResponseFrom(request ->
request instanceof InitProducerIdRequest,
new InitProducerIdResponse(new InitProducerIdResponseData()
.setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code())),
transactionCoordinator
);

// Call force terminate and verify error is propagated
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId);
ExecutionException exception = assertThrows(ExecutionException.class, () -> result.result().get());
assertTrue(exception.getCause() instanceof TransactionalIdAuthorizationException);
}
}

@Test
public void testForceTerminateTransactionWithCustomTimeout() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
String transactionalId = "testForceTerminateTimeout";
Node transactionCoordinator = env.cluster().nodes().iterator().next();

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
Errors.NONE,
transactionalId,
transactionCoordinator
));

// Complete the init PID request
InitProducerIdResponseData initProducerIdResponseData = new InitProducerIdResponseData()
.setProducerId(9012)
.setProducerEpoch((short) 456);

env.kafkaClient().prepareResponseFrom(request ->
request instanceof InitProducerIdRequest,
new InitProducerIdResponse(initProducerIdResponseData),
transactionCoordinator
);

// Use custom timeout
TerminateTransactionOptions options = new TerminateTransactionOptions().timeoutMs(10000);
TerminateTransactionResult result = env.adminClient().forceTerminateTransaction(transactionalId, options);
assertNull(result.result().get());
}
}

@Test
public void testListTransactions() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,11 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public TerminateTransactionResult forceTerminateTransaction(String transactionalId, TerminateTransactionOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@
import org.apache.kafka.clients.admin.RemoveRaftVoterResult;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.kafka.clients.admin.TerminateTransactionOptions;
import org.apache.kafka.clients.admin.TerminateTransactionResult;
import org.apache.kafka.clients.admin.UnregisterBrokerOptions;
import org.apache.kafka.clients.admin.UnregisterBrokerResult;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
Expand Down Expand Up @@ -416,6 +418,11 @@ public AbortTransactionResult abortTransaction(final AbortTransactionSpec spec,
return adminDelegate.abortTransaction(spec, options);
}

@Override
public TerminateTransactionResult forceTerminateTransaction(final String transactionalId, final TerminateTransactionOptions options) {
return adminDelegate.forceTerminateTransaction(transactionalId, options);
}

@Override
public ListTransactionsResult listTransactions(final ListTransactionsOptions options) {
return adminDelegate.listTransactions(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,42 @@ void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
}
}

static class ForceTerminateTransactionsCommand extends TransactionsCommand {

ForceTerminateTransactionsCommand(Time time) {
super(time);
}

@Override
String name() {
return "forceTerminateTransaction";
}

@Override
void addSubparser(Subparsers subparsers) {
Subparser subparser = subparsers.addParser(name())
.description("Force abort an ongoing transaction on transactionalId")
.help("Force abort an ongoing transaction on transactionalId (requires administrative privileges)");

subparser.addArgument("--transactionalId")
.help("transactional id")
.action(store())
.type(String.class)
.required(true);
}

@Override
void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
String transactionalId = ns.getString("transactionalId");

try {
admin.forceTerminateTransaction(transactionalId).result().get();
} catch (ExecutionException e) {
printErrorAndExit("Failed to force terminate transactionalId `" + transactionalId + "`", e.getCause());
}
}
}

static class DescribeProducersCommand extends TransactionsCommand {
static final List<String> HEADERS = asList(
"ProducerId",
Expand Down Expand Up @@ -990,7 +1026,8 @@ static void execute(
new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time),
new AbortTransactionCommand(time),
new FindHangingTransactionsCommand(time)
new FindHangingTransactionsCommand(time),
new ForceTerminateTransactionsCommand(time)
);

ArgumentParser parser = buildBaseParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TerminateTransactionResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
Expand Down Expand Up @@ -239,6 +240,35 @@ public void testListTransactions(boolean hasDurationFilter) throws Exception {
assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
}

@Test
public void testForceTerminateTransaction() throws Exception {
String transactionalId = "foo";
String[] args = new String[] {
"--bootstrap-server",
"localhost:9092",
"forceTerminateTransaction",
"--transactionalId",
transactionalId
};

TerminateTransactionResult terminateTransactionResult = Mockito.mock(TerminateTransactionResult.class);
KafkaFuture<Void> future = KafkaFuture.completedFuture(null);
Mockito.when(terminateTransactionResult.result()).thenReturn(future);
Mockito.when(admin.forceTerminateTransaction(transactionalId)).thenReturn(terminateTransactionResult);

execute(args);
assertNormalExit();
}

@Test
public void testForceTerminateTransactionTransactionalIdRequired() throws Exception {
assertCommandFailure(new String[]{
"--bootstrap-server",
"localhost:9092",
"force-terminate"
});
}

@Test
public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
assertCommandFailure(new String[]{
Expand Down Expand Up @@ -1066,5 +1096,4 @@ private void assertCommandFailure(String[] args) throws Exception {
assertTrue(exitProcedure.hasExited());
assertEquals(1, exitProcedure.statusCode());
}

}