Skip to content

Commit

Permalink
CBG-4417 construct missing CV entry from HLV if not present (#7242)
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin authored Dec 13, 2024
1 parent cafd49e commit ddc841e
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 93 deletions.
6 changes: 4 additions & 2 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, do

// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *CouchbaseLiteMockPeer) WriteDocument(_ sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.TB().Logf("%s: Writing document %s", p, docID)
// this isn't yet collection aware, using single default collection
client := p.getSingleBlipClient()
// set an HLV here.
docVersion, err := client.btcRunner.PushRev(client.ID(), docID, rest.EmptyDocVersion(), body)
require.NoError(client.btcRunner.TB(), err)
docMetadata := DocMetadataFromDocVersion(docID, docVersion)
// FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing
docMetadata := DocMetadataFromDocVersion(client.btc.TB(), docID, docVersion)
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand All @@ -95,7 +97,7 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(_ sgbucket.DataStoreName, docI
var data []byte
require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
var found bool
data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV()})
data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV(c)})
if !assert.True(c, found, "Could not find docID:%+v on %p\nVersion %#v", docID, p, docVersion) {
return
}
Expand Down
89 changes: 50 additions & 39 deletions topologytest/couchbase_server_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"github.com/stretchr/testify/require"
)

// dummySystemXattr is created for XDCR testing. This prevents a document echo after an initial write. The dummy xattr also means that the document will always have xattrs when deleting it, which is necessary for WriteUpdateWithXattrs.
const dummySystemXattr = "_dummysystemxattr"

var metadataXattrNames = []string{base.VvXattrName, base.MouXattrName, base.SyncXattrName, dummySystemXattr}

// CouchbaseServerPeer represents an instance of a backing server (bucket). This is rosmar unless SG_TEST_BACKING_STORE=couchbase is set.
type CouchbaseServerPeer struct {
tb testing.TB
Expand Down Expand Up @@ -96,20 +101,19 @@ func (p *CouchbaseServerPeer) GetDocument(dsName sgbucket.DataStoreName, docID s

// CreateDocument creates a document on the peer. The test will fail if the document already exists.
func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.tb.Logf("%s: Creating document %s in bucket %s", p, docID, p.bucket.GetName())
p.tb.Logf("%s: Creating document %s", p, docID)
// create document with xattrs to prevent XDCR from doing a round trip replication in this scenario:
// CBS1: write document (cas1, no _vv)
// CBS1->CBS2: XDCR replication
// CBS2->CBS1: XDCR replication, creates a new _vv
cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{"userxattr": []byte(`{"dummy": "xattr"}`)}, nil, nil)
cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{dummySystemXattr: []byte(`{"dummy": "xattr"}`)}, nil, nil)
require.NoError(p.tb, err)
implicitHLV := db.NewHybridLogicalVector()
require.NoError(p.tb, implicitHLV.AddVersion(db.Version{SourceID: p.SourceID(), Value: cas}))
docMetadata := DocMetadata{
DocID: docID,
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
DocID: docID,
Cas: cas,
ImplicitHLV: implicitHLV,
}
return BodyAndVersion{
docMeta: docMetadata,
Expand All @@ -121,23 +125,16 @@ func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docI
// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.tb.Logf("%s: Writing document %s", p, docID)
var lastXattrs map[string][]byte
// write the document LWW, ignoring any in progress writes
callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) {
return body, nil, false, nil
callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
lastXattrs = xattrs
return sgbucket.UpdatedDoc{Doc: body}, nil
}
cas, err := p.getCollection(dsName).Update(docID, 0, callback)
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.tb, err)
docMetadata := DocMetadata{
DocID: docID,
// FIXME: this should actually probably show the HLV persisted, and then also the implicit CV
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
}
return BodyAndVersion{
docMeta: docMetadata,
docMeta: getDocVersion(docID, p, cas, lastXattrs),
body: body,
updatePeer: p.name,
}
Expand All @@ -146,19 +143,15 @@ func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID
// DeleteDocument deletes a document on the peer. The test will fail if the document does not exist.
func (p *CouchbaseServerPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) DocMetadata {
// delete the document, ignoring any in progress writes. We are allowed to delete a document that does not exist.
callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) {
return nil, nil, true, nil
var lastXattrs map[string][]byte
// write the document LWW, ignoring any in progress writes
callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
lastXattrs = xattrs
return sgbucket.UpdatedDoc{Doc: nil, IsTombstone: true, Xattrs: xattrs}, nil
}
cas, err := p.getCollection(dsName).Update(docID, 0, callback)
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.tb, err)
return DocMetadata{
DocID: docID,
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
}
return getDocVersion(docID, p, cas, lastXattrs)
}

// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
Expand Down Expand Up @@ -191,14 +184,14 @@ func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, d
var err error
var xattrs map[string][]byte
var cas uint64
docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, []string{base.VvXattrName})
docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, metadataXattrNames)
if !assert.NoError(c, err) {
return
}
// have to use p.tb instead of c because of the assert.CollectT doesn't implement TB
version = getDocVersion(docID, p, cas, xattrs)

assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes))
assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes))

}, totalWaitTime, pollInterval)
return docBytes
Expand Down Expand Up @@ -285,6 +278,20 @@ func (p *CouchbaseServerPeer) UpdateTB(tb *testing.T) {
p.tb = tb
}

// useImplicitHLV returns true if the document's HLV is not up to date and an HLV should be composed of current sourceID and cas.
func useImplicitHLV(doc DocMetadata) bool {
if doc.HLV == nil {
return true
}
if doc.HLV.CurrentVersionCAS == doc.Cas {
return false
}
if doc.Mou == nil {
return true
}
return doc.Mou.CAS() != doc.Cas
}

// getDocVersion returns a DocVersion from a cas and xattrs with _vv (hlv) and _sync (RevTreeID).
func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte) DocMetadata {
docVersion := DocMetadata{
Expand All @@ -298,11 +305,15 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte
hlvBytes, ok := xattrs[base.VvXattrName]
if ok {
require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.HLV))
} else {
docVersion.ImplicitCV = &db.Version{
SourceID: peer.SourceID(),
Value: cas,
}
if useImplicitHLV(docVersion) {
if docVersion.HLV == nil {
docVersion.ImplicitHLV = db.NewHybridLogicalVector()
} else {
require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.ImplicitHLV))
docVersion.ImplicitHLV = docVersion.HLV
}
require.NoError(peer.TB(), docVersion.ImplicitHLV.AddVersion(db.Version{SourceID: peer.SourceID(), Value: cas}))
}
sync, ok := xattrs[base.SyncXattrName]
if ok {
Expand All @@ -315,7 +326,7 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte

// getBodyAndVersion returns the body and version of a document from a sgbucket.DataStore.
func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (DocMetadata, db.Body) {
docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, []string{base.VvXattrName})
docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, metadataXattrNames)
require.NoError(peer.TB(), err)
// get hlv to construct DocVersion
var body db.Body
Expand Down
14 changes: 0 additions & 14 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, pee
}
}

// waitForVersionAndBodyOnNonActivePeers waits for a document to reach a specific version on all non-active peers. This is stub until CBG-4417 is implemented.
func waitForVersionAndBodyOnNonActivePeers(t *testing.T, dsName base.ScopeAndCollectionName, docID string, peers Peers, expectedVersion BodyAndVersion) {
for peerName := range peers.SortedPeers() {
if peerName == expectedVersion.updatePeer {
// skip peer the write came from
continue
}
peer := peers[peerName]
t.Logf("waiting for doc version %#v on %s, update written from %s", expectedVersion, peer, expectedVersion.updatePeer)
body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta)
requireBodyEqual(t, expectedVersion.body, body)
}
}

func waitForDeletion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string, deleteActor string) {
for peerName, peer := range peers {
if peer.Type() == PeerTypeCouchbaseLite {
Expand Down
5 changes: 2 additions & 3 deletions topologytest/multi_actor_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func TestMultiActorConflictUpdate(t *testing.T) {

docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description)
replications.Start()
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docVersion)
})
}
}
Expand Down Expand Up @@ -155,7 +154,7 @@ func TestMultiActorConflictResurrect(t *testing.T) {
lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description)
replications.Start()

waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, lastWriteVersion)
waitForVersionAndBody(t, collectionName, peers, docID, lastWriteVersion)
})
}
}
6 changes: 2 additions & 4 deletions topologytest/multi_actor_no_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func TestMultiActorUpdate(t *testing.T) {
for peerName := range peers.SortedPeers() {
docID := getDocID(t) + "_" + peerName
docBodyAndVersion := docVersionList[peerName]
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docBodyAndVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docBodyAndVersion)
}

})
Expand Down Expand Up @@ -126,8 +125,7 @@ func TestMultiActorResurrect(t *testing.T) {
for updatePeerName := range peers {
docID := getDocID(t) + "_" + updatePeerName
docVersion := docVersionList[updatePeerName]
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docVersion)
}
})
}
Expand Down
16 changes: 8 additions & 8 deletions topologytest/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestPeerImplementation(t *testing.T) {
updateBody := []byte(`{"op": "update"}`)
updateVersion := peer.WriteDocument(collectionName, docID, updateBody)
require.NotEmpty(t, updateVersion.docMeta.CV)
require.NotEqual(t, updateVersion.docMeta.CV(), createVersion.docMeta.CV())
require.NotEqual(t, updateVersion.docMeta.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, updateVersion.docMeta.RevTreeID)
} else {
Expand All @@ -374,9 +374,9 @@ func TestPeerImplementation(t *testing.T) {

// Delete
deleteVersion := peer.DeleteDocument(collectionName, docID)
require.NotEmpty(t, deleteVersion.CV())
require.NotEqual(t, deleteVersion.CV(), updateVersion.docMeta.CV())
require.NotEqual(t, deleteVersion.CV(), createVersion.docMeta.CV())
require.NotEmpty(t, deleteVersion.CV(t))
require.NotEqual(t, deleteVersion.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, deleteVersion.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, deleteVersion.RevTreeID)
} else {
Expand All @@ -390,10 +390,10 @@ func TestPeerImplementation(t *testing.T) {

resurrectionBody := []byte(`{"op": "resurrection"}`)
resurrectionVersion := peer.WriteDocument(collectionName, docID, resurrectionBody)
require.NotEmpty(t, resurrectionVersion.docMeta.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), deleteVersion.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), updateVersion.docMeta.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), createVersion.docMeta.CV())
require.NotEmpty(t, resurrectionVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), deleteVersion.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, resurrectionVersion.docMeta.RevTreeID)
} else {
Expand Down
2 changes: 1 addition & 1 deletion topologytest/sync_gateway_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (p *SyncGatewayPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID
// Only assert on CV since RevTreeID might not be present if this was a Couchbase Server write
bodyBytes, err := doc.BodyBytes(ctx)
assert.NoError(c, err)
assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes))
assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes))
}, totalWaitTime, pollInterval)
return doc.Body(ctx)
}
Expand Down
45 changes: 23 additions & 22 deletions topologytest/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,32 @@ package topologytest

import (
"fmt"
"testing"

"github.com/couchbase/sync_gateway/db"
"github.com/couchbase/sync_gateway/rest"
"github.com/stretchr/testify/require"
)

// DocMetadata is a struct that contains metadata about a document. It contains the relevant information for testing versions of documents, as well as debugging information.
type DocMetadata struct {
DocID string // DocID is the document ID
RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present
HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present
Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present
Cas uint64 // Cas is the cas value of the document
ImplicitCV *db.Version // ImplicitCV is the version of the document, if there was no HLV
DocID string // DocID is the document ID
RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present
HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present
Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present
Cas uint64 // Cas is the cas value of the document
ImplicitHLV *db.HybridLogicalVector // ImplicitHLV is the version of the document, if there was no HLV
}

// CV returns the current version of the document.
func (v DocMetadata) CV() db.Version {
if v.HLV == nil {
// If there is no HLV, then the version is implicit from the current ver@sourceID
if v.ImplicitCV == nil {
return db.Version{}
}
return *v.ImplicitCV
}
return db.Version{
SourceID: v.HLV.SourceID,
Value: v.HLV.Version,
func (v DocMetadata) CV(t require.TestingT) db.Version {
if v.ImplicitHLV != nil {
return *v.ImplicitHLV.ExtractCurrentVersionFromHLV()
} else if v.HLV != nil {
return *v.HLV.ExtractCurrentVersionFromHLV()
}
require.FailNow(t, "no hlv available %#v", v)
return db.Version{}
}

// DocMetadataFromDocument returns a DocVersion from the given document.
Expand All @@ -52,14 +50,17 @@ func DocMetadataFromDocument(doc *db.Document) DocMetadata {
}

func (v DocMetadata) GoString() string {
return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitCV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitCV)
return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV)
}

// DocMetadataFromDocVersion returns metadata DocVersion from the given document and version.
func DocMetadataFromDocVersion(docID string, version rest.DocVersion) DocMetadata {
func DocMetadataFromDocVersion(t testing.TB, docID string, version rest.DocVersion) DocMetadata {
// FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing
hlv := db.NewHybridLogicalVector()
require.NoError(t, hlv.AddVersion(version.CV))
return DocMetadata{
DocID: docID,
RevTreeID: version.RevTreeID,
ImplicitCV: &version.CV,
DocID: docID,
RevTreeID: version.RevTreeID,
ImplicitHLV: hlv,
}
}

0 comments on commit ddc841e

Please sign in to comment.