@@ -72,6 +72,7 @@
 import org.apache.kafka.server.share.session.ShareSessionKey;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.SystemTimer;
@@ -115,6 +116,7 @@
 import scala.jdk.javaapi.CollectionConverters;

 import static kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1089,28 +1091,63 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException {
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
-        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp0, 1);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp2, 1);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp3, 1);

-        sharePartitionManager = SharePartitionManagerBuilder.builder()
-            .withReplicaManager(mockReplicaManager)
-            .withTimer(mockTimer)
-            .withBrokerTopicStats(brokerTopicStats)
-            .build();
-
         SharePartition sp0 = mock(SharePartition.class);
         SharePartition sp1 = mock(SharePartition.class);
         SharePartition sp2 = mock(SharePartition.class);
         SharePartition sp3 = mock(SharePartition.class);

+        // Mock the share partitions corresponding to the topic partitions.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
+        // Mock the share partitions to initialize instantaneously without any error.
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp2.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp3.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        // Required mocks so that the share partitions can acquire records.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+        when(sp3.canAcquireRecords()).thenReturn(true);
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        // Mocks to have fetch offset metadata match for share partitions, to avoid any extra calls to replicaManager.readFromLog.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+        when(sp3.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+
+        // Mock nextFetchOffset() for the share partitions to reflect their moving fetch offsets.
         when(sp0.nextFetchOffset()).thenReturn((long) 1, (long) 15, (long) 6, (long) 30, (long) 25);
         when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 1, (long) 18, (long) 5);
         when(sp2.nextFetchOffset()).thenReturn((long) 10, (long) 25, (long) 26);
         when(sp3.nextFetchOffset()).thenReturn((long) 20, (long) 15, (long) 23, (long) 16);

+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .withBrokerTopicStats(brokerTopicStats)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
         doAnswer(invocation -> {
             assertEquals(1, sp0.nextFetchOffset());
             assertEquals(4, sp1.nextFetchOffset());
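The nextFetchOffset() stubs above rely on Mockito's varargs thenReturn, which hands out the listed values on successive invocations and then keeps repeating the last one. A minimal standalone sketch of that behavior, assuming only Mockito on the classpath; the Cursor interface is hypothetical, standing in for SharePartition:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ConsecutiveStubbingSketch {

    // Hypothetical stand-in for SharePartition, used only for illustration.
    interface Cursor {
        long nextFetchOffset();
    }

    public static void main(String[] args) {
        Cursor cursor = mock(Cursor.class);
        // Varargs thenReturn: each call yields the next value in order,
        // and the final value repeats once the list is exhausted.
        when(cursor.nextFetchOffset()).thenReturn(1L, 15L, 6L);
        System.out.println(cursor.nextFetchOffset()); // 1
        System.out.println(cursor.nextFetchOffset()); // 15
        System.out.println(cursor.nextFetchOffset()); // 6
        System.out.println(cursor.nextFetchOffset()); // 6 (last value repeats)
    }
}

This is why the test can assert a specific sequence of fetch offsets across the concurrently submitted fetches.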
@@ -1146,10 +1183,14 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException {
         int threadCount = 100;
         ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

+        FetchParams fetchParams = new FetchParams(
+            FetchRequest.ORDINARY_CONSUMER_ID, -1, 200,
+            1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
+
         try {
             for (int i = 0; i != threadCount; ++i) {
                 executorService.submit(() -> {
-                    sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 0,
+                    sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, 0,
                         MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
                 });
                 // We are blocking the main thread at an interval of 10 threads so that the currently running executorService threads can complete.