Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4417 construct missing CV entry from HLV if not present #7242

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, do

// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *CouchbaseLiteMockPeer) WriteDocument(_ sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.TB().Logf("%s: Writing document %s", p, docID)
// this isn't yet collection aware, using single default collection
client := p.getSingleBlipClient()
// set an HLV here.
docVersion, err := client.btcRunner.PushRev(client.ID(), docID, rest.EmptyDocVersion(), body)
require.NoError(client.btcRunner.TB(), err)
docMetadata := DocMetadataFromDocVersion(docID, docVersion)
// FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing
docMetadata := DocMetadataFromDocVersion(client.btc.TB(), docID, docVersion)
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand All @@ -95,7 +97,7 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(_ sgbucket.DataStoreName, docI
var data []byte
require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
var found bool
data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV()})
data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV(c)})
if !assert.True(c, found, "Could not find docID:%+v on %p\nVersion %#v", docID, p, docVersion) {
return
}
Expand Down
89 changes: 50 additions & 39 deletions topologytest/couchbase_server_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"github.com/stretchr/testify/require"
)

// dummySystemXattr is created for XDCR testing. This prevents a document echo after an initial write. The dummy xattr also means that the document will always have xattrs when deleting it, which is necessary for WriteUpdateWithXattrs.
const dummySystemXattr = "_dummysystemxattr"

var metadataXattrNames = []string{base.VvXattrName, base.MouXattrName, base.SyncXattrName, dummySystemXattr}

// CouchbaseServerPeer represents an instance of a backing server (bucket). This is rosmar unless SG_TEST_BACKING_STORE=couchbase is set.
type CouchbaseServerPeer struct {
tb testing.TB
Expand Down Expand Up @@ -96,20 +101,19 @@ func (p *CouchbaseServerPeer) GetDocument(dsName sgbucket.DataStoreName, docID s

// CreateDocument creates a document on the peer. The test will fail if the document already exists.
func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.tb.Logf("%s: Creating document %s in bucket %s", p, docID, p.bucket.GetName())
p.tb.Logf("%s: Creating document %s", p, docID)
// create document with xattrs to prevent XDCR from doing a round trip replication in this scenario:
// CBS1: write document (cas1, no _vv)
// CBS1->CBS2: XDCR replication
// CBS2->CBS1: XDCR replication, creates a new _vv
cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{"userxattr": []byte(`{"dummy": "xattr"}`)}, nil, nil)
cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{dummySystemXattr: []byte(`{"dummy": "xattr"}`)}, nil, nil)
require.NoError(p.tb, err)
implicitHLV := db.NewHybridLogicalVector()
require.NoError(p.tb, implicitHLV.AddVersion(db.Version{SourceID: p.SourceID(), Value: cas}))
docMetadata := DocMetadata{
DocID: docID,
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
DocID: docID,
Cas: cas,
ImplicitHLV: implicitHLV,
}
return BodyAndVersion{
docMeta: docMetadata,
Expand All @@ -121,23 +125,16 @@ func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docI
// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
p.tb.Logf("%s: Writing document %s", p, docID)
var lastXattrs map[string][]byte
// write the document LWW, ignoring any in progress writes
callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) {
return body, nil, false, nil
callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
lastXattrs = xattrs
return sgbucket.UpdatedDoc{Doc: body}, nil
}
cas, err := p.getCollection(dsName).Update(docID, 0, callback)
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.tb, err)
docMetadata := DocMetadata{
DocID: docID,
// FIXME: this should actually probably show the HLV persisted, and then also the implicit CV
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
}
return BodyAndVersion{
docMeta: docMetadata,
docMeta: getDocVersion(docID, p, cas, lastXattrs),
body: body,
updatePeer: p.name,
}
Expand All @@ -146,19 +143,15 @@ func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID
// DeleteDocument deletes a document on the peer. The test will fail if the document does not exist.
func (p *CouchbaseServerPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) DocMetadata {
// delete the document, ignoring any in progress writes. We are allowed to delete a document that does not exist.
callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) {
return nil, nil, true, nil
var lastXattrs map[string][]byte
// write the document LWW, ignoring any in progress writes
callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
lastXattrs = xattrs
return sgbucket.UpdatedDoc{Doc: nil, IsTombstone: true, Xattrs: xattrs}, nil
}
cas, err := p.getCollection(dsName).Update(docID, 0, callback)
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.tb, err)
return DocMetadata{
DocID: docID,
Cas: cas,
ImplicitCV: &db.Version{
SourceID: p.SourceID(),
Value: cas,
},
}
return getDocVersion(docID, p, cas, lastXattrs)
}

// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
Expand Down Expand Up @@ -191,14 +184,14 @@ func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, d
var err error
var xattrs map[string][]byte
var cas uint64
docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, []string{base.VvXattrName})
docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, metadataXattrNames)
if !assert.NoError(c, err) {
return
}
// have to use p.tb instead of c because of the assert.CollectT doesn't implement TB
version = getDocVersion(docID, p, cas, xattrs)

assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes))
assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes))

}, totalWaitTime, pollInterval)
return docBytes
Expand Down Expand Up @@ -285,6 +278,20 @@ func (p *CouchbaseServerPeer) UpdateTB(tb *testing.T) {
p.tb = tb
}

// useImplicitHLV returns true if the document's HLV is not up to date and an HLV should be composed of current sourceID and cas.
func useImplicitHLV(doc DocMetadata) bool {
if doc.HLV == nil {
return true
}
if doc.HLV.CurrentVersionCAS == doc.Cas {
return false
}
if doc.Mou == nil {
return true
}
return doc.Mou.CAS() != doc.Cas
}

// getDocVersion returns a DocVersion from a cas and xattrs with _vv (hlv) and _sync (RevTreeID).
func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte) DocMetadata {
docVersion := DocMetadata{
Expand All @@ -298,11 +305,15 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte
hlvBytes, ok := xattrs[base.VvXattrName]
if ok {
require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.HLV))
} else {
docVersion.ImplicitCV = &db.Version{
SourceID: peer.SourceID(),
Value: cas,
}
if useImplicitHLV(docVersion) {
if docVersion.HLV == nil {
docVersion.ImplicitHLV = db.NewHybridLogicalVector()
} else {
require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.ImplicitHLV))
docVersion.ImplicitHLV = docVersion.HLV
}
require.NoError(peer.TB(), docVersion.ImplicitHLV.AddVersion(db.Version{SourceID: peer.SourceID(), Value: cas}))
}
sync, ok := xattrs[base.SyncXattrName]
if ok {
Expand All @@ -315,7 +326,7 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte

// getBodyAndVersion returns the body and version of a document from a sgbucket.DataStore.
func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (DocMetadata, db.Body) {
docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, []string{base.VvXattrName})
docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, metadataXattrNames)
require.NoError(peer.TB(), err)
// get hlv to construct DocVersion
var body db.Body
Expand Down
14 changes: 0 additions & 14 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, pee
}
}

// waitForVersionAndBodyOnNonActivePeers waits for a document to reach a specific version on all non-active peers. This is stub until CBG-4417 is implemented.
func waitForVersionAndBodyOnNonActivePeers(t *testing.T, dsName base.ScopeAndCollectionName, docID string, peers Peers, expectedVersion BodyAndVersion) {
for peerName := range peers.SortedPeers() {
if peerName == expectedVersion.updatePeer {
// skip peer the write came from
continue
}
peer := peers[peerName]
t.Logf("waiting for doc version %#v on %s, update written from %s", expectedVersion, peer, expectedVersion.updatePeer)
body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta)
requireBodyEqual(t, expectedVersion.body, body)
}
}

func waitForDeletion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string, deleteActor string) {
for peerName, peer := range peers {
if peer.Type() == PeerTypeCouchbaseLite {
Expand Down
5 changes: 2 additions & 3 deletions topologytest/multi_actor_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func TestMultiActorConflictUpdate(t *testing.T) {

docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description)
replications.Start()
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docVersion)
})
}
}
Expand Down Expand Up @@ -155,7 +154,7 @@ func TestMultiActorConflictResurrect(t *testing.T) {
lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description)
replications.Start()

waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, lastWriteVersion)
waitForVersionAndBody(t, collectionName, peers, docID, lastWriteVersion)
})
}
}
6 changes: 2 additions & 4 deletions topologytest/multi_actor_no_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func TestMultiActorUpdate(t *testing.T) {
for peerName := range peers.SortedPeers() {
docID := getDocID(t) + "_" + peerName
docBodyAndVersion := docVersionList[peerName]
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docBodyAndVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docBodyAndVersion)
}

})
Expand Down Expand Up @@ -126,8 +125,7 @@ func TestMultiActorResurrect(t *testing.T) {
for updatePeerName := range peers {
docID := getDocID(t) + "_" + updatePeerName
docVersion := docVersionList[updatePeerName]
// FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists
waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion)
waitForVersionAndBody(t, collectionName, peers, docID, docVersion)
}
})
}
Expand Down
16 changes: 8 additions & 8 deletions topologytest/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestPeerImplementation(t *testing.T) {
updateBody := []byte(`{"op": "update"}`)
updateVersion := peer.WriteDocument(collectionName, docID, updateBody)
require.NotEmpty(t, updateVersion.docMeta.CV)
require.NotEqual(t, updateVersion.docMeta.CV(), createVersion.docMeta.CV())
require.NotEqual(t, updateVersion.docMeta.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, updateVersion.docMeta.RevTreeID)
} else {
Expand All @@ -374,9 +374,9 @@ func TestPeerImplementation(t *testing.T) {

// Delete
deleteVersion := peer.DeleteDocument(collectionName, docID)
require.NotEmpty(t, deleteVersion.CV())
require.NotEqual(t, deleteVersion.CV(), updateVersion.docMeta.CV())
require.NotEqual(t, deleteVersion.CV(), createVersion.docMeta.CV())
require.NotEmpty(t, deleteVersion.CV(t))
require.NotEqual(t, deleteVersion.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, deleteVersion.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, deleteVersion.RevTreeID)
} else {
Expand All @@ -390,10 +390,10 @@ func TestPeerImplementation(t *testing.T) {

resurrectionBody := []byte(`{"op": "resurrection"}`)
resurrectionVersion := peer.WriteDocument(collectionName, docID, resurrectionBody)
require.NotEmpty(t, resurrectionVersion.docMeta.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), deleteVersion.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), updateVersion.docMeta.CV())
require.NotEqual(t, resurrectionVersion.docMeta.CV(), createVersion.docMeta.CV())
require.NotEmpty(t, resurrectionVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), deleteVersion.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, resurrectionVersion.docMeta.RevTreeID)
} else {
Expand Down
2 changes: 1 addition & 1 deletion topologytest/sync_gateway_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (p *SyncGatewayPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID
// Only assert on CV since RevTreeID might not be present if this was a Couchbase Server write
bodyBytes, err := doc.BodyBytes(ctx)
assert.NoError(c, err)
assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes))
assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes))
}, totalWaitTime, pollInterval)
return doc.Body(ctx)
}
Expand Down
45 changes: 23 additions & 22 deletions topologytest/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,32 @@ package topologytest

import (
"fmt"
"testing"

"github.com/couchbase/sync_gateway/db"
"github.com/couchbase/sync_gateway/rest"
"github.com/stretchr/testify/require"
)

// DocMetadata is a struct that contains metadata about a document. It contains the relevant information for testing versions of documents, as well as debugging information.
type DocMetadata struct {
DocID string // DocID is the document ID
RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present
HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present
Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present
Cas uint64 // Cas is the cas value of the document
ImplicitCV *db.Version // ImplicitCV is the version of the document, if there was no HLV
DocID string // DocID is the document ID
RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present
HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present
Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present
Cas uint64 // Cas is the cas value of the document
ImplicitHLV *db.HybridLogicalVector // ImplicitHLV is the version of the document, if there was no HLV
}

// CV returns the current version of the document.
func (v DocMetadata) CV() db.Version {
if v.HLV == nil {
// If there is no HLV, then the version is implicit from the current ver@sourceID
if v.ImplicitCV == nil {
return db.Version{}
}
return *v.ImplicitCV
}
return db.Version{
SourceID: v.HLV.SourceID,
Value: v.HLV.Version,
func (v DocMetadata) CV(t require.TestingT) db.Version {
if v.ImplicitHLV != nil {
return *v.ImplicitHLV.ExtractCurrentVersionFromHLV()
} else if v.HLV != nil {
return *v.HLV.ExtractCurrentVersionFromHLV()
}
require.FailNow(t, "no hlv available %#v", v)
return db.Version{}
}

// DocMetadataFromDocument returns a DocVersion from the given document.
Expand All @@ -52,14 +50,17 @@ func DocMetadataFromDocument(doc *db.Document) DocMetadata {
}

func (v DocMetadata) GoString() string {
return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitCV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitCV)
return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV)
}

// DocMetadataFromDocVersion returns metadata DocVersion from the given document and version.
func DocMetadataFromDocVersion(docID string, version rest.DocVersion) DocMetadata {
func DocMetadataFromDocVersion(t testing.TB, docID string, version rest.DocVersion) DocMetadata {
// FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing
hlv := db.NewHybridLogicalVector()
require.NoError(t, hlv.AddVersion(version.CV))
return DocMetadata{
DocID: docID,
RevTreeID: version.RevTreeID,
ImplicitCV: &version.CV,
DocID: docID,
RevTreeID: version.RevTreeID,
ImplicitHLV: hlv,
}
}
Loading