Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
*/
package org.apache.bookkeeper.bookie.storage.ldb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
Expand All @@ -48,7 +50,8 @@ public class EntryLocationIndex implements Closeable {
private final KeyValueStorage locationsDb;
private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build();
private final EntryLocationIndexStats stats;
private boolean isCompacting;
@VisibleForTesting
final AtomicBoolean compacting = new AtomicBoolean(false);

public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
StatsLogger stats) throws IOException {
Expand All @@ -67,7 +70,19 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora

@Override
public void close() throws IOException {
log.info("Closing EntryLocationIndex");
while (!compacting.compareAndSet(false, true)) {
// Wait till the locationsDb stops compacting
log.info("Waiting the locationsDb stops compacting");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
locationsDb.close();
log.info("Closed EntryLocationIndex");
}

public long getLocation(long ledgerId, long entryId) throws IOException {
Expand Down Expand Up @@ -203,15 +218,17 @@ public String getEntryLocationDBPath() {

public void compact() throws IOException {
try {
isCompacting = true;
if (!compacting.compareAndSet(false, true)) {
return;
}
locationsDb.compact();
} finally {
isCompacting = false;
compacting.set(false);
}
}

public boolean isCompacting() {
return isCompacting;
return compacting.get();
}

public void removeOffsetFromDeletedLedgers() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
package org.apache.bookkeeper.bookie.storage.ldb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.jupiter.api.Timeout;

/**
* Unit test for {@link EntryLocationIndex}.
Expand Down Expand Up @@ -231,4 +236,39 @@ public void testEntryIndexLookupLatencyStats() throws IOException {
assertEquals(1, lookupEntryLocationOpStats.getFailureCount());
assertEquals(1, lookupEntryLocationOpStats.getSuccessCount());
}

@Test
@Timeout(60)
public void testClose() throws Exception {
File tmpDir = File.createTempFile("bkTest", ".dir");
tmpDir.delete();
tmpDir.mkdir();
tmpDir.deleteOnExit();

EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);

// mock EntryLocationIndex is compacting
idx.compacting.set(true);
AtomicBoolean closeFlag = new AtomicBoolean(false);
AtomicLong closeEscapedMills = new AtomicLong(0);
new Thread(() -> {
try {
long start = System.currentTimeMillis();
idx.close();
closeEscapedMills.set(System.currentTimeMillis() - start);
closeFlag.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
long sleepMills = 10_000;
Thread.sleep(sleepMills);
assertFalse(closeFlag.get());

// mock EntryLocationIndex finish compacting
idx.compacting.set(false);
Awaitility.await().untilAsserted(() -> assertTrue(closeFlag.get()));
assertTrue(closeEscapedMills.get() >= sleepMills);
}
}
Loading