diff --git a/db/db_versioned.go b/db/db_versioned.go index ea8c45a715..55b2240969 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -241,25 +241,20 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, } buckets[ns] = bucket } - var vn *versionedNamespace - if val := bucket.Get(_minKey); val == nil { + val := bucket.Get(_minKey) + if val == nil { // namespace not created yet - vn = &versionedNamespace{ - keyLen: uint32(size), - } - ve = append(ve, batch.NewWriteInfo( - batch.Put, ns, _minKey, vn.serialize(), - fmt.Sprintf("failed to create metadata for namespace %s", ns), - )) - } else { - if vn, err = deserializeVersionedNamespace(val); err != nil { - nonDBErr = true - return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) - } - if vn.keyLen != uint32(size) { - nonDBErr = true - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) - } + nonDBErr = true + return errors.Wrapf(ErrInvalid, "namespace %s has not been added", ns) + } + vn, err := deserializeVersionedNamespace(val) + if err != nil { + nonDBErr = true + return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) + } + if vn.keyLen != uint32(size) { + nonDBErr = true + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) } } // keep order of the writes same as the original batch @@ -268,71 +263,19 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, write = ve[i] ns = write.Namespace() key = write.Key() - val = write.Value() ) // get bucket bucket, ok := buckets[ns] if !ok { panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns)) } - // check key's last version - var ( - last uint64 - notexist bool - maxKey = keyForWrite(key, math.MaxUint64) - ) - c := bucket.Cursor() - k, _ := c.Seek(maxKey) - if k == nil || bytes.Compare(k, maxKey) == 1 { - k, _ = c.Prev() - if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { - // cursor is at the beginning/end of the bucket or smaller than minimum key - notexist = true - } + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) } - if !notexist { - _, last = parseKey(k) - } - switch write.WriteType() { - case batch.Put: - if bytes.Equal(key, _minKey) { - // create metadata for namespace - if err = bucket.Put(key, val); err != nil { - return errors.Wrap(err, write.Error()) - } - } else { - // wrong-size key should be caught in dedup(), but check anyway - if vnsize[ns] != len(key) { - panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) - } - if !notexist && version <= last { - // not allowed to perform write on an earlier version - nonDBErr = true - return ErrInvalid - } - if err = bucket.Put(keyForWrite(key, version), val); err != nil { - return errors.Wrap(err, write.Error()) - } - } - case batch.Delete: - if notexist { - continue - } - // wrong-size key should be caught in dedup(), but check anyway - if vnsize[ns] != len(key) { - panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) - } - if version < last { - // not allowed to perform delete on an earlier version - nonDBErr = true - return ErrInvalid - } - if err = bucket.Put(keyForDelete(key, version), nil); err != nil { - return errors.Wrap(err, write.Error()) - } - if err = bucket.Delete(keyForWrite(key, version)); err != nil { - return errors.Wrap(err, write.Error()) - } + nonDBErr, err = writeVersionedEntry(version, bucket, write) + if err != nil { + return err } } // write non-versioned keys @@ -382,6 +325,55 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, return nil } +func writeVersionedEntry(version uint64, bucket *bolt.Bucket, ve *batch.WriteInfo) (bool, error) { + var ( + key = ve.Key() + val = ve.Value() + last uint64 + notexist bool + maxKey = keyForWrite(key, math.MaxUint64) + ) + c := bucket.Cursor() + k, _ := c.Seek(maxKey) + if k == nil || bytes.Compare(k, maxKey) == 1 { + k, _ = c.Prev() + if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + notexist = true + } + } + if !notexist { + _, last = parseKey(k) + } + switch ve.WriteType() { + case batch.Put: + if !notexist && version <= last { + // not allowed to perform write on an earlier version + return true, ErrInvalid + } + if err := bucket.Put(keyForWrite(key, version), val); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + case batch.Delete: + if notexist { + return false, nil + } + if version < last { + // not allowed to perform delete on an earlier version + return true, ErrInvalid + } + if err := bucket.Put(keyForDelete(key, version), nil); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + if version == last { + if err := bucket.Delete(keyForWrite(key, version)); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + } + } + return false, nil +} + // dedup does 3 things: // 1. deduplicate entries in the batch, only keep the last write for each key // 2. splits entries into 2 slices according to the input namespace map diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 9fe1c795a5..b725368ef7 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -440,6 +440,7 @@ func TestCommitToDB(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } + r.ErrorContains(db.CommitToDB(1, nil, b), "has not been added") // create namespace r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1))))