Skip to content

Commit abb11af

Browse files
FrankYang0529janchilling
authored andcommitted
KAFKA-18949 add consumer protocol to testDeleteRecordsAfterCorruptRecords (apache#19317)
The `PlaintextAdminIntegrationTest#testDeleteRecordsAfterCorruptRecords` was only enabled for classic protocol. Add consumer protocol to it. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 7b430b8 commit abb11af

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -1565,17 +1565,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
15651565
}
15661566

15671567
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
1568-
@MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
1568+
@MethodSource(Array("getTestGroupProtocolParametersAll"))
15691569
def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = {
15701570
val config = new Properties()
15711571
config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
15721572
createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
15731573

15741574
client = createAdminClient
15751575

1576-
val consumer = createConsumer()
1577-
subscribeAndWaitForAssignment(topic, consumer)
1578-
15791576
val producer = createProducer()
15801577
def sendRecords(begin: Int, end: Int) = {
15811578
val futures = (begin until end).map( i => {
@@ -1608,7 +1605,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
16081605
newContent.flip()
16091606
Files.write(logFilePath, newContent.array(), StandardOpenOption.TRUNCATE_EXISTING)
16101607

1611-
consumer.seekToBeginning(Collections.singletonList(topicPartition))
1608+
val overrideConfig = new Properties
1609+
overrideConfig.setProperty("auto.offset.reset", "earliest")
1610+
val consumer = createConsumer(configOverrides = overrideConfig)
1611+
consumer.subscribe(Seq(topic).asJava)
16121612
assertEquals("Encountered corrupt message when fetching offset 0 for topic-partition topic-0",
16131613
assertThrows(classOf[KafkaException], () => consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)
16141614

0 commit comments

Comments
 (0)