Skip to content

Commit

Permalink
Fix locking for inplace_update_support == true
Browse files Browse the repository at this point in the history
  • Loading branch information
ajkr committed May 7, 2024
1 parent 4a6e1e6 commit 7cf0c8e
Showing 1 changed file with 6 additions and 25 deletions.
31 changes: 6 additions & 25 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,11 @@ static bool SaveValue(void* arg, const char* entry) {
assert(s != nullptr);
assert(!s->value || !s->columns);

std::unique_ptr<ReadLock> read_lock;
if (s->inplace_update_support) {
read_lock.reset(new ReadLock(s->mem->GetLock(s->key->user_key())));
}

if (s->protection_bytes_per_key > 0) {
*(s->status) = MemTable::VerifyEntryChecksum(
entry, s->protection_bytes_per_key, s->allow_data_in_errors);
Expand Down Expand Up @@ -1035,10 +1040,6 @@ static bool SaveValue(void* arg, const char* entry) {
return false;
}

if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}

Slice v = GetLengthPrefixedSlice(key_ptr + key_length);

*(s->status) = Status::OK();
Expand All @@ -1049,21 +1050,13 @@ static bool SaveValue(void* arg, const char* entry) {
s->columns->SetPlainValue(v);
}

if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}

*(s->found_final_value) = true;
*(s->is_blob_index) = true;

return false;
}
case kTypeValue:
case kTypeValuePreferredSeqno: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}

Slice v = GetLengthPrefixedSlice(key_ptr + key_length);

if (type == kTypeValuePreferredSeqno) {
Expand Down Expand Up @@ -1100,10 +1093,6 @@ static bool SaveValue(void* arg, const char* entry) {
s->columns->SetPlainValue(v);
}

if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}

*(s->found_final_value) = true;

if (s->is_blob_index != nullptr) {
Expand All @@ -1113,10 +1102,6 @@ static bool SaveValue(void* arg, const char* entry) {
return false;
}
case kTypeWideColumnEntity: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}

Slice v = GetLengthPrefixedSlice(key_ptr + key_length);

*(s->status) = Status::OK();
Expand Down Expand Up @@ -1158,10 +1143,6 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = s->columns->SetWideColumnValue(v);
}

if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}

*(s->found_final_value) = true;

if (s->is_blob_index != nullptr) {
Expand Down Expand Up @@ -1499,9 +1480,9 @@ Status MemTable::Update(SequenceNumber seq, ValueType value_type,

// Update value, if new value size <= previous value size
if (new_size <= prev_size) {
WriteLock wl(GetLock(lkey.user_key()));
char* p =
EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), value.size());
assert((unsigned)((p + value.size()) - entry) ==
(unsigned)(VarintLength(key_length) + key_length +
Expand Down

0 comments on commit 7cf0c8e

Please sign in to comment.