Skip to content

Commit

Permalink
Implement Delete on the BlipTesterClient and enable delete-related to…
Browse files Browse the repository at this point in the history
…pology tests
  • Loading branch information
bbrks committed Dec 19, 2024
1 parent d3a48c3 commit abdb9a4
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 57 deletions.
6 changes: 3 additions & 3 deletions rest/blip_api_attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) {
bodyText := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}`
rev := NewDocVersionFromFakeRev("2-abc")
// FIXME CBG-4400: docID: doc1 was not found on the client - expecting to update doc based on parentVersion RevID: 2-abc
err := btcRunner.StoreRevOnClient(btc.id, docID, &rev, []byte(bodyText))
_, err := btcRunner.AddRev(btc.id, docID, &rev, []byte(bodyText))
require.NoError(t, err)

bodyText = `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":2,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestBlipLegacyAttachNameChange(t *testing.T) {
docVersion := client1.GetDocVersion(docID)

// Store the document and attachment on the test client
err := btcRunner.StoreRevOnClient(client1.id, docID, &docVersion, rawDoc)
_, err := btcRunner.AddRev(client1.id, docID, &docVersion, rawDoc)
// FIXME CBG-4400: docID: doc was not found on the client - expecting to update doc based on parentVersion RevID: 1-5fc93bd36377008f96fdae2719c174ed
require.NoError(t, err)

Expand Down Expand Up @@ -672,7 +672,7 @@ func TestBlipLegacyAttachDocUpdate(t *testing.T) {

// Store the document and attachment on the test client
// FIXME CBG-4400: docID: doc was not found on the client - expecting to update doc based on parentVersion RevID: 1-5fc93bd36377008f96fdae2719c174ed
err := btcRunner.StoreRevOnClient(client1.id, docID, &version, rawDoc)
_, err := btcRunner.AddRev(client1.id, docID, &version, rawDoc)
require.NoError(t, err)
btcRunner.AttachmentsLock(client1.id).Lock()
btcRunner.Attachments(client1.id)[digest] = attBody
Expand Down
60 changes: 49 additions & 11 deletions rest/utilities_testing_blip_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,32 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *DocVersio
return latestRev.body, &latestRev.version
}

// IsTombstoned returns true if the latest version of the doc is a tombstone.
func (btcc *BlipTesterCollectionClient) IsTombstoned(docID string) (bool, error) {
doc, exists := btcc.getClientDoc(docID)
if !exists {
return false, base.ErrNotFound
}
rev, err := doc.latestRev()
if err != nil {
return false, err
}
return rev.isDelete, nil
}

// IsVersionTombstone returns true if the given version is found and is a tombstone.
func (btcc *BlipTesterCollectionClient) IsVersionTombstone(docID string, version DocVersion) (bool, error) {
doc, exists := btcc.getClientDoc(docID)
if !exists {
return false, base.ErrNotFound
}
rev, err := doc.getRev(version)
if err != nil {
return false, err
}
return rev.isDelete, nil
}

// getClientDoc returns the clientDoc for the given docID, if it exists.
func (btcc *BlipTesterCollectionClient) getClientDoc(docID string) (*clientDoc, bool) {
btcc.seqLock.RLock()
Expand Down Expand Up @@ -1175,6 +1201,7 @@ type proposeChangeBatchEntry struct {
revTreeIDHistory []string
hlvHistory db.HybridLogicalVector
latestServerVersion DocVersion
isDelete bool
}

func (e proposeChangeBatchEntry) historyStr() string {
Expand All @@ -1200,7 +1227,7 @@ func proposeChangesEntryForDoc(doc *clientDoc) proposeChangeBatchEntry {
}
revisionHistory = append(revisionHistory, doc._revisionsBySeq[seq].version.RevTreeID)
}
return proposeChangeBatchEntry{docID: doc.id, version: latestRev.version, revTreeIDHistory: revisionHistory, hlvHistory: latestRev.HLV, latestServerVersion: doc._latestServerVersion}
return proposeChangeBatchEntry{docID: doc.id, version: latestRev.version, revTreeIDHistory: revisionHistory, hlvHistory: latestRev.HLV, latestServerVersion: doc._latestServerVersion, isDelete: latestRev.isDelete}
}

// StartPull will begin a push replication with the given options between the client and server
Expand Down Expand Up @@ -1318,6 +1345,12 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
docBody := doc._revisionsBySeq[doc._seqsByVersions[change.version]].body
doc.lock.RUnlock()

if change.isDelete {
revRequest.Properties[db.RevMessageDeleted] = "1"
// SG doesn't like nil bodies - transform the tombstone into an empty body
docBody = []byte(base.EmptyDocument)
}

if serverDeltas && btcc.parent.ClientDeltas && ok && !serverRev.isDelete {
base.DebugfCtx(ctx, base.KeySGTest, "specifying last known server version as deltaSrc for doc %s = %v", change.docID, change.latestServerVersion)
var deltaSrc string
Expand Down Expand Up @@ -1511,6 +1544,7 @@ func (btr *BlipTesterReplicator) sendMsg(msg *blip.Message) (err error) {
}

// upsertDoc will create or update the doc based on whether parentVersion is passed or not. Enforces MVCC update.
// body can be nil and the update will be treated as a tombstone/delete.
func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *DocVersion, body []byte) (*clientDocRev, error) {
btc.seqLock.Lock()
defer btc.seqLock.Unlock()
Expand Down Expand Up @@ -1574,7 +1608,7 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do

btc._seqLast++
newSeq := btc._seqLast
rev := clientDocRev{clientSeq: newSeq, version: docVersion, body: body, HLV: hlv}
rev := clientDocRev{clientSeq: newSeq, version: docVersion, body: body, HLV: hlv, isDelete: body == nil}
doc.addNewRev(rev)

btc._seqStore[newSeq] = doc
Expand All @@ -1587,6 +1621,15 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do
return &rev, nil
}

// Delete creates a tombstone for the document.
func (btc *BlipTesterCollectionClient) Delete(docID string, parentVersion *DocVersion) (DocVersion, error) {
newRev, err := btc.upsertDoc(docID, parentVersion, nil)
if err != nil {
return DocVersion{}, err
}
return newRev.version, nil
}

// AddRev creates a revision on the client.
// The rev ID is always: "N-abc", where N is rev generation for predictability.
func (btc *BlipTesterCollectionClient) AddRev(docID string, parentVersion *DocVersion, body []byte) (DocVersion, error) { // Inline attachment processing
Expand Down Expand Up @@ -1743,11 +1786,6 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe
return &newRev.version, nil
}

func (btc *BlipTesterCollectionClient) StoreRevOnClient(docID string, parentVersion *DocVersion, body []byte) error {
_, err := btc.upsertDoc(docID, parentVersion, body)
return err
}

func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte, revGen int) (outputBody []byte, err error) {
if bytes.Contains(inputBody, []byte(db.BodyAttachments)) {
var newDocJSON map[string]interface{}
Expand Down Expand Up @@ -1999,6 +2037,10 @@ func (btcRunner *BlipTestClientRunner) AddRev(clientID uint32, docID string, ver
return btcRunner.SingleCollection(clientID).AddRev(docID, version, body)
}

func (btcRunner *BlipTestClientRunner) Delete(clientID uint32, docID string, version *DocVersion) (DocVersion, error) {
return btcRunner.SingleCollection(clientID).Delete(docID, version)
}

func (btcRunner *BlipTestClientRunner) PushUnsolicitedRev(clientID uint32, docID string, parentVersion *DocVersion, body []byte) (*DocVersion, error) {
return btcRunner.SingleCollection(clientID).PushUnsolicitedRev(docID, parentVersion, body)
}
Expand All @@ -2015,10 +2057,6 @@ func (btcRunner *BlipTestClientRunner) saveAttachment(clientID uint32, contentTy
return btcRunner.SingleCollection(clientID).saveAttachment(contentType, attachmentData)
}

func (btcRunner *BlipTestClientRunner) StoreRevOnClient(clientID uint32, docID string, parentVersion *DocVersion, body []byte) error {
return btcRunner.SingleCollection(clientID).StoreRevOnClient(docID, parentVersion, body)
}

func (btcRunner *BlipTestClientRunner) PushRevWithHistory(clientID uint32, docID string, parentVersion *DocVersion, body []byte, revCount, prunedRevCount int) (*DocVersion, error) {
return btcRunner.SingleCollection(clientID).PushRevWithHistory(docID, parentVersion, body, revCount, prunedRevCount)
}
Expand Down
46 changes: 36 additions & 10 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package topologytest

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -113,8 +114,17 @@ func (p *CouchbaseLiteMockPeer) WriteDocument(dsName sgbucket.DataStoreName, doc
}

// DeleteDocument deletes a document on the peer. The test will fail if the document does not exist.
func (p *CouchbaseLiteMockPeer) DeleteDocument(sgbucket.DataStoreName, string) DocMetadata {
return DocMetadata{}
func (p *CouchbaseLiteMockPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) DocMetadata {
p.TB().Logf("%s: Deleting document %s", p, docID)
client := p.getSingleSGBlipClient().CollectionClient(dsName)
_, parentMeta := p.getLatestDocVersion(dsName, docID)
parentVersion := rest.EmptyDocVersion()
if parentMeta != nil {
parentVersion = &db.DocVersion{CV: parentMeta.CV(p.TB())}
}
docVersion, err := client.Delete(docID, parentVersion)
require.NoError(p.TB(), err)
return DocMetadataFromDocVersion(p.TB(), docID, docVersion)
}

// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
Expand All @@ -135,20 +145,36 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName,
}

// WaitForDeletion waits for a document to be deleted. This document must be a tombstone. The test will fail if the document still exists after 20s.
func (p *CouchbaseLiteMockPeer) WaitForDeletion(_ sgbucket.DataStoreName, _ string) {
require.Fail(p.TB(), "WaitForDeletion not yet implemented CBG-4257")
func (p *CouchbaseLiteMockPeer) WaitForDeletion(dsName sgbucket.DataStoreName, docID string) {
client := p.getSingleSGBlipClient().CollectionClient(dsName)
require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
isTombstone, err := client.IsTombstoned(docID)
require.NoError(c, err)
require.True(c, isTombstone, "expected docID %s on peer %s to be deleted", docID, p)
}, totalWaitTime, pollInterval)
}

// WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(_ sgbucket.DataStoreName, _ string, _ DocMetadata) {
require.Fail(p.TB(), "WaitForTombstoneVersion not yet implemented CBG-4257")
func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata) {
client := p.getSingleSGBlipClient().CollectionClient(dsName)
expectedVersion := db.DocVersion{CV: expected.CV(p.TB())}
require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
isTombstone, err := client.IsVersionTombstone(docID, expectedVersion)
require.NoError(c, err)
require.True(c, isTombstone, "expected docID %s on peer %s to be deleted", docID, p)
}, totalWaitTime, pollInterval)
}

// RequireDocNotFound asserts that a document does not exist on the peer.
func (p *CouchbaseLiteMockPeer) RequireDocNotFound(sgbucket.DataStoreName, string) {
// not implemented yet in blip client tester
// _, err := p.btcRunner.GetDoc(p.btc.id, docID)
// base.RequireDocNotFoundError(p.btcRunner.TB(), err)
func (p *CouchbaseLiteMockPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) {
client := p.getSingleSGBlipClient().CollectionClient(dsName)
isTombstone, err := client.IsTombstoned(docID)
if err == nil {
require.True(p.TB(), isTombstone, "expected docID %s on peer %s to be deleted or not exist", docID, p)
}
if !errors.Is(err, base.ErrNotFound) {
require.NoError(p.TB(), err)
}
}

// Close will shut down the peer and close any active replications on the peer.
Expand Down
6 changes: 1 addition & 5 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, pee
}

func waitForDeletion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string, deleteActor string) {
for peerName, peer := range peers {
if peer.Type() == PeerTypeCouchbaseLite {
t.Logf("skipping deletion check for Couchbase Lite peer %s, CBG-4432", peerName)
continue
}
for _, peer := range peers {
t.Logf("waiting for doc to be deleted on %s, written from %s", peer, deleteActor)
peer.WaitForDeletion(dsName, docID)
}
Expand Down
9 changes: 0 additions & 9 deletions topologytest/multi_actor_no_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ func TestMultiActorDelete(t *testing.T) {

for createPeerName, createPeer := range peers.ActivePeers() {
for deletePeerName, deletePeer := range peers {
// CBG-4432: implement delete document in blip tester
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}

docID := getDocID(t) + "_create=" + createPeerName + ",update=" + deletePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, topology.description))
createVersion := createPeer.CreateDocument(collectionName, docID, body1)
Expand Down Expand Up @@ -93,10 +88,6 @@ func TestMultiActorResurrect(t *testing.T) {

for createPeerName, createPeer := range peers.ActivePeers() {
for deletePeerName, deletePeer := range peers {
// CBG-4432: implement delete document in blip tester
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}
for resurrectPeerName, resurrectPeer := range peers {
docID := getDocID(t) + "_create=" + createPeerName + ",delete=" + deletePeerName + ",resurrect=" + resurrectPeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "resurrectPeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, resurrectPeer, topology.description))
Expand Down
18 changes: 6 additions & 12 deletions topologytest/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,40 +411,34 @@ func TestPeerImplementation(t *testing.T) {
require.Equal(t, updateVersion.docMeta, roundtripGetVersion)
require.JSONEq(t, string(updateBody), string(base.MustJSONMarshal(t, roundtripGetbody)))

if peer.Type() == PeerTypeCouchbaseLite {
// CBG-4432 Couchbase Lite peer does not support deletion yet
return
}

// Delete
deleteVersion := peer.DeleteDocument(collectionName, docID)
require.NotEmpty(t, deleteVersion.CV(t))
require.NotEqual(t, deleteVersion.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, deleteVersion.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, deleteVersion.RevTreeID)
} else {
if tc.peerOption.Type == PeerTypeSyncGateway {
require.NotEmpty(t, deleteVersion.RevTreeID)
require.NotEqual(t, deleteVersion.RevTreeID, createVersion.docMeta.RevTreeID)
require.NotEqual(t, deleteVersion.RevTreeID, updateVersion.docMeta.RevTreeID)
} else {
require.Empty(t, deleteVersion.RevTreeID)
}
peer.RequireDocNotFound(collectionName, docID)

// Resurrection

resurrectionBody := []byte(`{"op": "resurrection"}`)
resurrectionVersion := peer.WriteDocument(collectionName, docID, resurrectionBody)
require.NotEmpty(t, resurrectionVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), deleteVersion.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), updateVersion.docMeta.CV(t))
require.NotEqual(t, resurrectionVersion.docMeta.CV(t), createVersion.docMeta.CV(t))
if tc.peerOption.Type == PeerTypeCouchbaseServer {
require.Empty(t, resurrectionVersion.docMeta.RevTreeID)
} else {
if tc.peerOption.Type == PeerTypeSyncGateway {
require.NotEmpty(t, resurrectionVersion.docMeta.RevTreeID)
require.NotEqual(t, resurrectionVersion.docMeta.RevTreeID, createVersion.docMeta.RevTreeID)
require.NotEqual(t, resurrectionVersion.docMeta.RevTreeID, updateVersion.docMeta.RevTreeID)
require.NotEqual(t, resurrectionVersion.docMeta.RevTreeID, deleteVersion.RevTreeID)
} else {
require.Empty(t, resurrectionVersion.docMeta.RevTreeID)
}
peer.WaitForDocVersion(collectionName, docID, resurrectionVersion.docMeta)

Expand Down
6 changes: 0 additions & 6 deletions topologytest/single_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ func TestSingleActorDelete(t *testing.T) {
for activePeerID, activePeer := range peers.ActivePeers() {
t.Run(fmt.Sprintf("actor=%s", activePeerID), func(t *testing.T) {
updatePeersT(t, peers)
if activePeer.Type() == PeerTypeCouchbaseLite {
t.Skip("Skipping Couchbase Lite test, does not know how to push a deletion yet CBG-4433")
}

docID := getDocID(t)
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, activePeerID, topology.description))
Expand Down Expand Up @@ -114,9 +111,6 @@ func TestSingleActorResurrect(t *testing.T) {
for activePeerID, activePeer := range peers.ActivePeers() {
t.Run(fmt.Sprintf("actor=%s", activePeerID), func(t *testing.T) {
updatePeersT(t, peers)
if activePeer.Type() == PeerTypeCouchbaseLite {
t.Skip("Skipping Couchbase Lite test, does not know how to push a deletion yet CBG-4433")
}

docID := getDocID(t)
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, activePeerID, topology.description))
Expand Down
1 change: 0 additions & 1 deletion topologytest/topologies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ var Topologies = []Topology{
"sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1},
"sg2": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID2, Symmetric: true},
"cbl1": {Type: PeerTypeCouchbaseLite},
// TODO: CBG-4270, push replication only exists empemerally
"cbl2": {Type: PeerTypeCouchbaseLite, Symmetric: true},
},
replications: []PeerReplicationDefinition{
Expand Down

0 comments on commit abdb9a4

Please sign in to comment.