Skip to content

Commit 284c18d

Browse files
committed
Increase Kafka topic polling time in test to 10 seconds
We've seen multiple failures where the 3 second timeout was breached. This happens in the test setup of many different tests so multiple individual test failures have been reported. Also replace deprecated method in Kafka API and simplify a couple other code paths. Signed-off-by: Andrew Ross <[email protected]>
1 parent d907511 commit 284c18d

File tree

1 file changed

+8
-12
lines changed
  • plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka

1 file changed

+8
-12
lines changed

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,33 +53,29 @@ public static void createTopic(String topicName, int numOfPartitions, String boo
5353
}
5454

5555
// validates topic is created
56-
await().atMost(3, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
56+
await().atMost(10, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
5757
}
5858

5959
public static boolean checkTopicExistence(String topicName, String bootstrapServers) {
6060
return getAdminClient(bootstrapServers, (client -> {
61-
Map<String, KafkaFuture<TopicDescription>> topics = client.describeTopics(List.of(topicName)).values();
61+
Map<String, KafkaFuture<TopicDescription>> topics = client.describeTopics(List.of(topicName)).topicNameValues();
6262

6363
try {
6464
return topics.containsKey(topicName) && topics.get(topicName).get().name().equals(topicName);
65-
} catch (InterruptedException e) {
66-
LOGGER.error("error on checkTopicExistence", e);
67-
return false;
68-
} catch (ExecutionException e) {
65+
} catch (InterruptedException | ExecutionException e) {
6966
LOGGER.error("error on checkTopicExistence", e);
7067
return false;
7168
}
7269
}));
7370
}
7471

7572
private static <Rep> Rep getAdminClient(String bootstrapServer, Function<AdminClient, Rep> function) {
76-
AdminClient adminClient = KafkaAdminClient.create(
77-
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test")
78-
);
79-
try {
73+
try (
74+
AdminClient adminClient = KafkaAdminClient.create(
75+
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test")
76+
)
77+
) {
8078
return function.apply(adminClient);
81-
} finally {
82-
adminClient.close();
8379
}
8480
}
8581
}

0 commit comments

Comments
 (0)