From bc91be9eaa6b5d50e5389f5c938b6ef46406cd4a Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Tue, 20 Jan 2026 23:56:45 +0800 Subject: [PATCH 1/5] Fix core dumps triggered by rocksdb compacting when shutdown bk --- .../bookie/storage/ldb/EntryLocationIndex.java | 16 ++++++++++++---- .../ldb/SingleDirectoryDbLedgerStorage.java | 3 +++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index a353b7cf7ee..05ef8a93dff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -26,6 +26,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; @@ -48,7 +50,7 @@ public class EntryLocationIndex implements Closeable { private final KeyValueStorage locationsDb; private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); private final EntryLocationIndexStats stats; - private boolean isCompacting; + private final AtomicBoolean compacting = new AtomicBoolean(false); public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, StatsLogger stats) throws IOException { @@ -203,15 +205,21 @@ public String getEntryLocationDBPath() { public void compact() throws IOException { try { - isCompacting = true; + if (!compacting.compareAndSet(false, true)) { + return; + } locationsDb.compact(); } finally { - isCompacting = false; + compacting.set(false); } } public boolean isCompacting() { - return isCompacting; + return compacting.get(); + } + + public boolean compareAndSetCompacting(boolean expectedValue, boolean newValue) { + return compacting.compareAndSet(expectedValue, newValue); } public void removeOffsetFromDeletedLedgers() throws IOException { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 774d10c158f..fe02ca19c22 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -347,6 +347,9 @@ public void shutdown() throws InterruptedException { try { flush(); + while (!entryLocationIndex.compareAndSetCompacting(false, true)) { + Thread.sleep(100); + } gcThread.shutdown(); entryLogger.close(); From 3e60a8d5a19a8a872675a4bba8b9fb3480281661 Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Wed, 21 Jan 2026 09:17:12 +0800 Subject: [PATCH 2/5] checkstyle --- .../bookkeeper/bookie/storage/ldb/EntryLocationIndex.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index 05ef8a93dff..2c9a160b88d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -25,9 +25,8 @@ import java.io.IOException; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; From 67a6ac2cceaf1c5348c89a237ad7c78a14c96999 Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Thu, 22 Jan 2026 00:47:16 +0800 Subject: [PATCH 3/5] address comment --- .../bookie/storage/ldb/EntryLocationIndex.java | 15 ++++++++++----- .../ldb/SingleDirectoryDbLedgerStorage.java | 3 --- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index 2c9a160b88d..6398af7fd80 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -25,8 +25,8 @@ import java.io.IOException; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; @@ -68,6 +68,15 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora @Override public void close() throws IOException { + while (!compacting.compareAndSet(false, true)) { + // Wait till the locationsDb stops compacting + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } locationsDb.close(); } @@ -217,10 +226,6 @@ public boolean isCompacting() { return compacting.get(); } - public boolean compareAndSetCompacting(boolean expectedValue, boolean newValue) { - return compacting.compareAndSet(expectedValue, newValue); - } - public void removeOffsetFromDeletedLedgers() throws IOException { Set ledgersToDelete = deletedLedgers.items(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index fe02ca19c22..774d10c158f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -347,9 +347,6 @@ public void shutdown() throws InterruptedException { try { flush(); - while (!entryLocationIndex.compareAndSetCompacting(false, true)) { - Thread.sleep(100); - } gcThread.shutdown(); entryLogger.close(); From 7018ea1ff70f3b8868e423114e7bcc8e073cfa33 Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Thu, 22 Jan 2026 09:08:50 +0800 Subject: [PATCH 4/5] log --- .../bookkeeper/bookie/storage/ldb/EntryLocationIndex.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index 6398af7fd80..14606f1ba91 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -68,8 +68,13 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora @Override public void close() throws IOException { + long start = System.currentTimeMillis(); + log.info("Closing EntryLocationIndex"); while (!compacting.compareAndSet(false, true)) { // Wait till the locationsDb stops compacting + if ((System.currentTimeMillis() - start) % 1000 == 0) { + log.info("Waiting the locationsDb stops compacting"); + } try { Thread.sleep(100); } catch (InterruptedException e) { @@ -78,6 +83,7 @@ public void close() throws IOException { } } locationsDb.close(); + log.info("Closed EntryLocationIndex cost: {} mills", System.currentTimeMillis() - start); } public long getLocation(long ledgerId, long entryId) throws IOException { From abee9aa1b5c1cd209c47d4bded776a13d1038dcc Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Thu, 22 Jan 2026 12:57:13 +0800 Subject: [PATCH 5/5] optimize and ut --- .../storage/ldb/EntryLocationIndex.java | 13 +++--- .../storage/ldb/EntryLocationIndexTest.java | 40 +++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index 14606f1ba91..35fe7cc6608 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -20,6 +20,7 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import java.io.Closeable; import java.io.IOException; @@ -49,7 +50,8 @@ public class EntryLocationIndex implements Closeable { private final KeyValueStorage locationsDb; private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); private final EntryLocationIndexStats stats; - private final AtomicBoolean compacting = new AtomicBoolean(false); + @VisibleForTesting + final AtomicBoolean compacting = new AtomicBoolean(false); public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, StatsLogger stats) throws IOException { @@ -68,22 +70,19 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora @Override public void close() throws IOException { - long start = System.currentTimeMillis(); log.info("Closing EntryLocationIndex"); while (!compacting.compareAndSet(false, true)) { // Wait till the locationsDb stops compacting - if ((System.currentTimeMillis() - start) % 1000 == 0) { - log.info("Waiting the locationsDb stops compacting"); - } + log.info("Waiting the locationsDb stops compacting"); try { - Thread.sleep(100); + Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); } } locationsDb.close(); - log.info("Closed EntryLocationIndex cost: {} mills", System.currentTimeMillis() - start); + log.info("Closed EntryLocationIndex"); } public long getLocation(long ledgerId, long entryId) throws IOException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java index 80fdfc7bd0e..b614f9e67b3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java @@ -21,14 +21,19 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider; +import org.awaitility.Awaitility; import org.junit.Test; +import org.junit.jupiter.api.Timeout; /** * Unit test for {@link EntryLocationIndex}. @@ -231,4 +236,39 @@ public void testEntryIndexLookupLatencyStats() throws IOException { assertEquals(1, lookupEntryLocationOpStats.getFailureCount()); assertEquals(1, lookupEntryLocationOpStats.getSuccessCount()); } + + @Test + @Timeout(60) + public void testClose() throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + + EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, + tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); + + // mock EntryLocationIndex is compacting + idx.compacting.set(true); + AtomicBoolean closeFlag = new AtomicBoolean(false); + AtomicLong closeEscapedMills = new AtomicLong(0); + new Thread(() -> { + try { + long start = System.currentTimeMillis(); + idx.close(); + closeEscapedMills.set(System.currentTimeMillis() - start); + closeFlag.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).start(); + long sleepMills = 10_000; + Thread.sleep(sleepMills); + assertFalse(closeFlag.get()); + + // mock EntryLocationIndex finish compacting + idx.compacting.set(false); + Awaitility.await().untilAsserted(() -> assertTrue(closeFlag.get())); + assertTrue(closeEscapedMills.get() >= sleepMills); + } }