Skip to content

Commit cd7168d

Browse files
committed
[fix][ml] Don't estimate number of entries when ledgers are empty, return 1 instead (#24125)
(cherry picked from commit 228a98f)
1 parent 8b2dc3c commit cd7168d

File tree

2 files changed

+52
-9
lines changed

2 files changed

+52
-9
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java

+21-4
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
22-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
2322
import java.util.Collection;
2423
import java.util.Map;
2524
import java.util.NavigableMap;
@@ -97,8 +96,7 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
9796

9897
long estimatedEntryCount = 0;
9998
long remainingBytesSize = maxSizeBytes;
100-
// Start with a default estimated average size per entry, including any overhead
101-
long currentAvgSize = DEFAULT_ESTIMATED_ENTRY_SIZE + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
99+
long currentAvgSize = 0;
102100
// Get a collection of ledger info starting from the read position
103101
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
104102
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
@@ -159,7 +157,26 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
159157

160158
// Add any remaining bytes to the estimated entry count considering the current average entry size
161159
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
162-
estimatedEntryCount += remainingBytesSize / currentAvgSize;
160+
// need to find the previous non-empty ledger to find the average size
161+
if (currentAvgSize == 0) {
162+
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
163+
ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
164+
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
165+
long ledgerTotalSize = ledgerInfo.getSize();
166+
long ledgerTotalEntries = ledgerInfo.getEntries();
167+
// Skip processing ledgers that have no entries or size
168+
if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
169+
continue;
170+
}
171+
// Update the average entry size based on the current ledger's size and entry count
172+
currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
173+
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
174+
break;
175+
}
176+
}
177+
if (currentAvgSize > 0) {
178+
estimatedEntryCount += remainingBytesSize / currentAvgSize;
179+
}
163180
}
164181

165182
// Ensure at least one entry is always returned as the result

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java

+31-5
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
22-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
2322
import static org.testng.Assert.assertEquals;
2423
import java.util.HashSet;
2524
import java.util.NavigableMap;
@@ -31,7 +30,6 @@
3130
import org.testng.annotations.Test;
3231

3332
public class EntryCountEstimatorTest {
34-
3533
private NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersInfo;
3634
private Position readPosition;
3735
private Long lastLedgerId;
@@ -202,11 +200,39 @@ public void testWithOnlyLastLedgerWhichIsEmpty() {
202200
beforeLastKey.forEach(ledgersInfo::remove);
203201
lastLedgerTotalEntries = 0;
204202
lastLedgerTotalSize = 0;
203+
int result = estimateEntryCountByBytesSize(Integer.MAX_VALUE);
204+
// expect that result is 1 because the estimation couldn't be done
205+
assertEquals(result, 1);
206+
}
207+
208+
@Test
209+
public void testWithOnlySecondLastLedgerAndEmptyLastLedger() {
210+
readPosition = PositionImpl.LATEST;
211+
// remove all but the second last and last ledger
212+
long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
213+
Set<Long> beforeSecondLastKey = new HashSet<>(ledgersInfo.headMap(secondLastLedgerId).keySet());
214+
beforeSecondLastKey.forEach(ledgersInfo::remove);
215+
lastLedgerTotalEntries = 0;
216+
lastLedgerTotalSize = 0;
217+
long expectedEntries = 50;
218+
long requiredSize =
219+
expectedEntries * (2000 / 150 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
220+
int result = estimateEntryCountByBytesSize(requiredSize);
221+
assertEquals(result, expectedEntries);
222+
}
223+
224+
@Test
225+
public void testWithMultipleEmptyLedgers() {
226+
readPosition = PositionImpl.LATEST;
227+
long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
228+
MLDataFormats.ManagedLedgerInfo.LedgerInfo secondLastLedgerInfo = ledgersInfo.get(secondLastLedgerId);
229+
// make the second last ledger empty
230+
ledgersInfo.put(secondLastLedgerId, secondLastLedgerInfo.toBuilder().setEntries(0).setSize(0).build());
231+
lastLedgerTotalEntries = 0;
232+
lastLedgerTotalSize = 0;
205233
long expectedEntries = 50;
206-
// when last is empty, DEFAULT_ESTIMATED_ENTRY_SIZE + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY is used
207-
// for the average size per entry
208234
long requiredSize =
209-
expectedEntries * (DEFAULT_ESTIMATED_ENTRY_SIZE + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
235+
expectedEntries * (3000 / 200 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
210236
int result = estimateEntryCountByBytesSize(requiredSize);
211237
assertEquals(result, expectedEntries);
212238
}

0 commit comments

Comments
 (0)