diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index a353b7cf7ee..35fe7cc6608 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -20,12 +20,14 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import java.io.Closeable; import java.io.IOException; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; @@ -48,7 +50,8 @@ public class EntryLocationIndex implements Closeable { private final KeyValueStorage locationsDb; private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); private final EntryLocationIndexStats stats; - private boolean isCompacting; + @VisibleForTesting + final AtomicBoolean compacting = new AtomicBoolean(false); public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, StatsLogger stats) throws IOException { @@ -67,7 +70,19 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora @Override public void close() throws IOException { + log.info("Closing EntryLocationIndex"); + while (!compacting.compareAndSet(false, true)) { + // Wait till the locationsDb stops compacting + log.info("Waiting the locationsDb stops compacting"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } locationsDb.close(); + log.info("Closed EntryLocationIndex"); } public long getLocation(long ledgerId, long entryId) throws IOException { @@ -203,15 +218,17 @@ public String getEntryLocationDBPath() { public void compact() throws IOException { try { - isCompacting = true; + if (!compacting.compareAndSet(false, true)) { + return; + } locationsDb.compact(); } finally { - isCompacting = false; + compacting.set(false); } } public boolean isCompacting() { - return isCompacting; + return compacting.get(); } public void removeOffsetFromDeletedLedgers() throws IOException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java index 80fdfc7bd0e..b614f9e67b3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java @@ -21,14 +21,19 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider; +import org.awaitility.Awaitility; import org.junit.Test; +import org.junit.jupiter.api.Timeout; /** * Unit test for {@link EntryLocationIndex}. @@ -231,4 +236,39 @@ public void testEntryIndexLookupLatencyStats() throws IOException { assertEquals(1, lookupEntryLocationOpStats.getFailureCount()); assertEquals(1, lookupEntryLocationOpStats.getSuccessCount()); } + + @Test + @Timeout(60) + public void testClose() throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + + EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, + tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); + + // mock EntryLocationIndex is compacting + idx.compacting.set(true); + AtomicBoolean closeFlag = new AtomicBoolean(false); + AtomicLong closeEscapedMills = new AtomicLong(0); + new Thread(() -> { + try { + long start = System.currentTimeMillis(); + idx.close(); + closeEscapedMills.set(System.currentTimeMillis() - start); + closeFlag.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).start(); + long sleepMills = 10_000; + Thread.sleep(sleepMills); + assertFalse(closeFlag.get()); + + // mock EntryLocationIndex finish compacting + idx.compacting.set(false); + Awaitility.await().untilAsserted(() -> assertTrue(closeFlag.get())); + assertTrue(closeEscapedMills.get() >= sleepMills); + } }