|
29 | 29 | import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
|
30 | 30 | import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
|
31 | 31 | import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
|
| 32 | +import org.apache.kafka.test.TestUtils; |
32 | 33 |
|
33 | 34 | import org.junit.jupiter.api.AfterEach;
|
34 | 35 |
|
35 | 36 | import java.io.IOException;
|
36 | 37 | import java.util.Arrays;
|
37 | 38 | import java.util.Collections;
|
| 39 | +import java.util.Map; |
38 | 40 | import java.util.concurrent.CountDownLatch;
|
39 | 41 | import java.util.concurrent.ExecutionException;
|
40 | 42 | import java.util.concurrent.TimeUnit;
|
@@ -307,4 +309,16 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th
|
307 | 309 | assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
|
308 | 310 | }
|
309 | 311 |
|
| 312 | + @ClusterTest |
| 313 | + public void testInitializationFailure() throws IOException, InterruptedException { |
| 314 | + try (TopicBasedRemoteLogMetadataManager rlmm = new TopicBasedRemoteLogMetadataManager()) { |
| 315 | + // configure rlmm without bootstrap servers, so it will fail to initialize admin client. |
| 316 | + Map<String, Object> configs = Map.of( |
| 317 | + TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR, TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(), |
| 318 | + TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID, 0 |
| 319 | + ); |
| 320 | + rlmm.configure(configs); |
| 321 | + TestUtils.waitForCondition(rlmm::isInitializationFailed, "Initialization should fail"); |
| 322 | + } |
| 323 | + } |
310 | 324 | }
|
0 commit comments