Skip to content
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced.

### Bug Fixes
* Fix a race condition in WAL size tracking caused by an unsafe iterator access after the container is changed.
* Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode.
Expand Down
34 changes: 29 additions & 5 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "utilities/trace/replayer_impl.h"
#include <thread>

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -1472,31 +1473,54 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
auto& wal = *it;
assert(wal.IsSyncing());

if (logs_.size() > 1) {
if (wal.number < logs_.back().number) {
// Inactive WAL
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"erasing log %" PRIu64
" presync size %" PRIu64 " flushed size %" PRIu64 " thread id %" PRIu64 "\n",
wal.number, wal.GetPreSyncSize(), wal.writer->file()->GetFlushedSize(), pthread_self());
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
} else {
assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"size doesn't match log %" PRIu64
" presync size %" PRIu64 " flushed size %" PRIu64 " thread id %" PRIu64 " \n",
wal.number, wal.GetPreSyncSize(), wal.writer->file()->GetFlushedSize(), pthread_self());
wal.FinishSync();
++it;
}
} else {
assert(wal.number == logs_.back().number);
// Active WAL
wal.FinishSync();
++it;
}
}
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].IsSyncing()));
log_sync_cv_.SignalAll();
}

// Walks `logs_` from the front and calls FinishSync() on every WAL whose
// number is <= `up_to`, then signals all waiters on `log_sync_cv_` and writes
// an info-log line with the first WAL number visited and `up_to`.
//
// REQUIRES: log_write_mutex_ is held (asserted on entry).
void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
  log_write_mutex_.AssertHeld();

  // Number of the first WAL visited by the loop; remains 0 when no WAL in
  // `logs_` has a number <= up_to.
  uint64_t first_wal_number = 0;
  for (auto& wal : logs_) {
    if (wal.number > up_to) {
      // Stop at the first WAL past the requested range.
      break;
    }
    if (first_wal_number == 0) {
      first_wal_number = wal.number;
    }
    wal.FinishSync();
  }

  log_sync_cv_.SignalAll();
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n",
                 first_wal_number, up_to);
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ class DBImpl : public DB {

IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size, int caller_id);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,8 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
}

if (s.ok() && flush_needed) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"flushing memtable thread id %" PRIu64 "\n", pthread_self());
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
if (immutable_db_options_.atomic_flush) {
Expand Down
13 changes: 12 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
earliest.number);
log_recycle_files_.push_back(earliest.number);
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting WAL log %" PRIu64 "\n", earliest.number);
job_context->log_delete_files.push_back(earliest.number);
}
if (job_context->size_log_to_delete == 0) {
Expand All @@ -317,7 +319,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
auto writer = log.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_, last seq number of WAL %" PRIu64 "\n",
log.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
logs_.pop_front();
}
// Current log cannot be obsolete.
Expand Down Expand Up @@ -491,6 +498,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Close log %" PRIu64
" from logs_, last Seq number in WAL %" PRIu64 "\n",
w->get_log_number(), w->GetLastSequence());
auto s = w->Close();
s.PermitUncheckedError();
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
log_file_number_size);
log_file_number_size, 0);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
Expand Down
48 changes: 42 additions & 6 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1412,10 +1412,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
int caller_id) {
assert(log_size != nullptr);

if (log_writer->file()->GetFileSize() == 0) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Start writing to WAL: [%" PRIu64 " ]",
log_writer->get_log_number());
}
if (log_writer->get_log_number() != logs_.back().number) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d",
log_writer->get_log_number(), logs_.back().number, caller_id);
}
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch);
log_writer->SetLastSequence(seq);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurrent calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
Expand Down Expand Up @@ -1468,7 +1481,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 1);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand All @@ -1495,11 +1508,33 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log_write_mutex_.Lock();
}

if (logs_.back().number != log_writer->get_log_number()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"new log file added after last write %" PRIu64 "writer log number %" PRIu64 "thread id %" PRIu64 "\n",
logs_.back().number, log_writer->get_log_number(), pthread_self());
}
bool found = false;
for (auto& log : logs_) {
io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (log.number == log_writer->get_log_number()) {
found = true;
}
if (!io_s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"WAL sync failed with log number %" PRIu64 "writer log number %" PRIu64 "thread id %" PRIu64 "\n",
log.number, log_writer->get_log_number(), pthread_self());
break;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"WAL sync completed with flush log number %" PRIu64 " current writer log number %" PRIu64 "presync size %" PRIu64
" flushed size %" PRIu64 "last sequence %" PRIu64 "thread id %" PRIu64 "\n", log.number, log_writer->get_log_number(), log.GetPreSyncSize(),
log.writer->file()->GetFlushedSize(), log.writer->GetLastSequence(), pthread_self());
}

if (!found) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"write log file not found %" PRIu64 "flushed size %" PRIu64 "last sequence %" PRIu64 "thread id %" PRIu64 "\n",
log_writer->get_log_number(), log_writer->file()->GetFlushedSize(), log_writer->GetLastSequence(), pthread_self());
}

if (UNLIKELY(needs_locking)) {
Expand Down Expand Up @@ -1530,6 +1565,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}

return io_s;
}

Expand Down Expand Up @@ -1569,7 +1605,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 2);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -2191,8 +2227,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64
". Immutable memtables: %d.\n",
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
". Immutable memtables: %d.thread id %" PRIu64 "\n",
cfd->GetName().c_str(), new_log_number, num_imm_unflushed, pthread_self());
mutex_.Lock();
if (recycle_log_number != 0) {
// Since renaming the file is done outside DB mutex, we need to ensure
Expand Down
45 changes: 45 additions & 0 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,51 @@ TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
Close();
}

TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
  // Regression test for a WAL that is appended to and then switched away from
  // in the window after `FlushWAL(true /* sync */)` has finished syncing but
  // before it untracks fully synced inactive logs. The buggy behavior was to
  // untrack such a WAL anyway, leaving its final append unsynced forever.
  Options opts = GetOptions();
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  opts.env = fault_injection_env.get();
  Reopen(opts);

  ASSERT_OK(Put("key1", "val1"));

  auto* const sync_point = SyncPoint::GetInstance();

  // While the first FlushWAL below sits at this sync point, append one more
  // record and switch the memtable.
  sync_point->SetCallBack("DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
                          [this](void* /* arg */) {
                            ASSERT_OK(Put("key2", "val2"));
                            ASSERT_OK(dbfull()->TEST_SwitchMemtable());
                          });
  sync_point->EnableProcessing();

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  // Turn the callback off so the remaining operations run undisturbed.
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();

  ASSERT_OK(Put("key3", "val3"));

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  Close();

  // Simulate full loss of unsynced data. Nothing should be dropped because
  // `FlushWAL(true /* sync */)` ran before `Close()`.
  fault_injection_env->DropUnsyncedFileData();

  Reopen(opts);

  ASSERT_EQ("val1", Get("key1"));
  ASSERT_EQ("val2", Get("key2"));
  ASSERT_EQ("val3", Get("key3"));

  // The DB must be closed before `fault_injection_env` goes out of scope.
  Close();
}

TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
Expand Down
1 change: 1 addition & 0 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
last_seq_ = 0;
}

Writer::~Writer() {
Expand Down
6 changes: 6 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -92,11 +93,16 @@ class Writer {

bool TEST_BufferIsEmpty();

// Stores `seq` in last_seq_, replacing whatever value was held before.
void SetLastSequence(SequenceNumber seq) {
  last_seq_ = seq;
}

// Returns the current value of last_seq_.
SequenceNumber GetLastSequence() const {
  return last_seq_;
}

private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
SequenceNumber last_seq_;

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
Expand Down