Commit 2cd733c

KAFKA-17184: Fix the error thrown while accessing the RemoteIndexCache (#19462)
For segments that are uploaded to remote storage, RemoteIndexCache caches the fetched offset, timestamp, and transaction index entries on the first invocation to remote; subsequent invocations are served from the local copy.

The remote indexes that are cached locally are removed in two cases:

1. Remote segments are deleted due to a breach of retention size/time or start-offset.
2. The cached indexes exceed the remote-log-index-cache size limit of 1 GB (default).

There are two layers of locks in the RemoteIndexCache: the first-layer lock on the RemoteIndexCache and the second-layer lock on the RemoteIndexCache#Entry.

**Issue**

1. The first-layer lock coordinates the remote-log reader and deleter threads. To ensure that the reader and deleter threads do not block each other, we only take `lock.readLock()` when accessing/deleting the cached index entries.
2. The issue happens when both the reader and deleter threads have taken the readLock and the deleter thread then marks the index as `markedForCleanup`. The reader thread, which still holds the `indexEntry`, gets an IllegalStateException when accessing it.
3. This is a concurrency issue: we mark the entry as `markedForCleanup` before removing it from the cache. See the RemoteIndexCache#remove and RemoteIndexCache#removeAll methods.
4. When an entry is evicted from the cache due to a breach of the maxSize of 1 GB, the cache removes that entry before calling the evictionListener, and all the operations are performed atomically by the Caffeine cache.

**Solution**

1. When the deleter thread marks an Entry for deletion, we rename the underlying index files with ".deleted" as the suffix and add a job to the remote-log-index-cleaner thread, which performs the actual cleanup. Previously, the indexes were not accessible once the entry was marked for deletion. Now, the renamed files can still be accessed (through an entry that is about to be removed and is held by a reader thread) until they are removed from disk.
2. Similar to local-log index/segment deletion, once the files are renamed with ".deleted" as the suffix, the actual deletion of the file happens after the `file.delete.delay.ms` delay of 1 minute. The renamed index files are deleted after 30 seconds.
3. During this time, if the same index entry is fetched again from remote, it does not conflict with the deleted entry because the file names are different.

Reviewers: Satish Duggana <[email protected]>
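The rename-then-delayed-delete approach described under **Solution** could look roughly like the sketch below. It is an illustration only, not the code from this commit: `DeferredIndexCleanup`, `Entry`, `markForCleanup`, and `scheduleCleanup` are hypothetical names, a plain `ScheduledExecutorService` stands in for the remote-log-index-cleaner thread, and a single file stands in for the offset, timestamp, and transaction indexes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and structure are assumptions, not the Kafka implementation.
class DeferredIndexCleanup {
    static final String DELETED_SUFFIX = ".deleted";

    // Stand-in for a cached index entry backed by one file on disk.
    static final class Entry {
        private Path file;

        Entry(Path file) {
            this.file = file;
        }

        // Rename the file instead of invalidating the entry, so a reader that
        // already holds this entry can keep using it until the file is deleted.
        synchronized void markForCleanup() throws IOException {
            String name = file.getFileName().toString();
            if (name.endsWith(DELETED_SUFFIX)) {
                return; // already marked
            }
            Path renamed = file.resolveSibling(name + DELETED_SUFFIX);
            Files.move(file, renamed, StandardCopyOption.ATOMIC_MOVE);
            file = renamed;
        }

        // Reads go through the current path, whether or not it has been renamed.
        synchronized byte[] read() throws IOException {
            return Files.readAllBytes(file);
        }

        synchronized void cleanup() throws IOException {
            Files.deleteIfExists(file);
        }

        synchronized Path file() {
            return file;
        }
    }

    // Hand the actual deletion to a background cleaner after a delay.
    static ScheduledFuture<?> scheduleCleanup(ScheduledExecutorService cleaner, Entry entry, long delayMs) {
        return cleaner.schedule(() -> {
            try {
                entry.cleanup();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("remote-index-cache");
        Path index = Files.write(dir.resolve("segment.index"), new byte[] {1, 2, 3});
        Entry entry = new Entry(index);
        ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();

        entry.markForCleanup();
        // A reader still holding the entry can read the renamed file.
        System.out.println("readable after mark: " + (entry.read().length == 3));

        scheduleCleanup(cleaner, entry, 100).get();
        System.out.println("file removed: " + !Files.exists(entry.file()));

        cleaner.shutdown();
        Files.deleteIfExists(dir);
    }
}
```

Because the renamed file no longer occupies the original name, a fresh fetch of the same index from remote can create a new file under that name while the old one waits for deletion.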
1 parent 8df7002 commit 2cd733c

4 files changed, +296 -175 lines changed

core/src/test/scala/unit/kafka/utils/SchedulerTest.scala

+20
@@ -207,4 +207,24 @@ class SchedulerTest {
       tickExecutor.shutdownNow()
     }
   }
+
+  @Test
+  def testPendingTaskSize(): Unit = {
+    val latch1 = new CountDownLatch(1)
+    val latch2 = new CountDownLatch(2)
+    val task1 = new Runnable {
+      override def run(): Unit = {
+        latch1.await()
+      }
+    }
+    scheduler.scheduleOnce("task1", task1, 0)
+    scheduler.scheduleOnce("task2", () => latch2.countDown(), 5)
+    scheduler.scheduleOnce("task3", () => latch2.countDown(), 5)
+    assertEquals(2, scheduler.pendingTaskSize())
+    latch1.countDown()
+    latch2.await()
+    assertEquals(0, scheduler.pendingTaskSize())
+    scheduler.shutdown()
+    assertEquals(0, scheduler.pendingTaskSize())
+  }
 }

server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java

+4
@@ -187,4 +187,8 @@ public boolean taskRunning(ScheduledFuture<?> task) {
         ScheduledThreadPoolExecutor e = executor;
         return e != null && e.getQueue().contains(task);
     }
+
+    public int pendingTaskSize() {
+        return isStarted() ? executor.getQueue().size() : 0;
+    }
 }
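
To see what `pendingTaskSize()` reports, the following standalone sketch mirrors the scenario in `testPendingTaskSize` with a single worker thread. `MiniScheduler` is a hypothetical stand-in, not KafkaScheduler; it assumes, as in the diff, that the pending count is the size of the `ScheduledThreadPoolExecutor` work queue and that shutdown clears the executor field. The `started` latch is an addition in this sketch so that the count is checked only after the worker has dequeued the first task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a scheduler that exposes its queued-task count.
class MiniScheduler {
    private volatile ScheduledThreadPoolExecutor executor;

    synchronized void startup(int threads) {
        if (executor != null) {
            throw new IllegalStateException("already started");
        }
        executor = new ScheduledThreadPoolExecutor(threads);
    }

    synchronized void shutdown() throws InterruptedException {
        ScheduledThreadPoolExecutor e = executor;
        if (e != null) {
            executor = null; // assumption: the field is cleared on shutdown
            e.shutdown();
            e.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    void scheduleOnce(Runnable task, long delayMs) {
        ScheduledThreadPoolExecutor e = executor;
        if (e == null) {
            throw new IllegalStateException("not started");
        }
        e.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    // Tasks waiting in the queue; a task that a worker is running is not counted.
    int pendingTaskSize() {
        ScheduledThreadPoolExecutor e = executor; // read the field once
        return e != null ? e.getQueue().size() : 0;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniScheduler scheduler = new MiniScheduler();
        scheduler.startup(1);

        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(2);

        // Occupy the only worker thread until `release` is counted down.
        scheduler.scheduleOnce(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, 0);
        started.await();

        scheduler.scheduleOnce(done::countDown, 5);
        scheduler.scheduleOnce(done::countDown, 5);
        System.out.println("pending while blocked: " + scheduler.pendingTaskSize());

        release.countDown();
        done.await();
        System.out.println("pending after drain: " + scheduler.pendingTaskSize());

        scheduler.shutdown();
        System.out.println("pending after shutdown: " + scheduler.pendingTaskSize());
    }
}
```

The sketch copies the executor into a local variable before use, the same pattern `taskRunning` follows in the diff context, which avoids dereferencing a field that a concurrent shutdown may have cleared.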
