Skip to content

Commit

Permalink
CBG-4410 restructure multi actor non conflict tests
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Dec 13, 2024
1 parent ddc841e commit 2bcaafb
Showing 1 changed file with 69 additions and 81 deletions.
150 changes: 69 additions & 81 deletions topologytest/multi_actor_no_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,122 +10,110 @@ package topologytest

import (
"fmt"
"strings"
"testing"
)

func TestMultiActorCreate(t *testing.T) {
for _, topology := range append(simpleTopologies, Topologies...) {
t.Run(topology.description, func(t *testing.T) {
collectionName, peers, _ := setupTests(t, topology)
docVersionList := make(map[string]BodyAndVersion)
for activePeerName, activePeer := range peers {
docID := getDocID(t) + "_" + activePeerName
docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, activePeerName, topology.description))
docVersion := activePeer.CreateDocument(collectionName, docID, docBody)
docVersionList[activePeerName] = docVersion
}
for peerName := range peers {
docID := getDocID(t) + "_" + peerName
docBodyAndVersion := docVersionList[peerName]
waitForVersionAndBody(t, collectionName, peers, docID, docBodyAndVersion)
}
})
}
}
"github.com/couchbase/sync_gateway/base"
)

// TestMultiActorUpdate tests that a single actor can update a document that was created on a different peer.
// 1. start replications
// 2. create documents on each peer, to be updated by each other peer
// 3. wait for all documents to be replicated
// 4. update each document on a single peer, documents exist in pairwise create peer and update peer
// 5. wait for the hlv for updated documents to synchronized
func TestMultiActorUpdate(t *testing.T) {
for _, topology := range append(simpleTopologies, Topologies...) {
t.Run(topology.description, func(t *testing.T) {
if strings.Contains(topology.description, "CBL") {
t.Skip("Skipping Couchbase Lite test, returns unexpected body in proposeChanges: [304], CBG-4257")
}
collectionName, peers, _ := setupTests(t, topology)

docVersionList := make(map[string]BodyAndVersion)
for activePeerName, activePeer := range peers {
docID := getDocID(t) + "_" + activePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, activePeerName, topology.description))
createVersion := activePeer.CreateDocument(collectionName, docID, body1)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

newBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, activePeerName, topology.description))
updateVersion := activePeer.WriteDocument(collectionName, docID, newBody)
// store update version along with doc body and the current peer the update came in on
docVersionList[activePeerName] = updateVersion
}
// loop through peers again and assert all peers have updates
for peerName := range peers.SortedPeers() {
docID := getDocID(t) + "_" + peerName
docBodyAndVersion := docVersionList[peerName]
waitForVersionAndBody(t, collectionName, peers, docID, docBodyAndVersion)
}
for createPeerName, createPeer := range peers {
for updatePeerName, updatePeer := range peers {
if updatePeer.Type() == PeerTypeCouchbaseLite {
continue
}

docID := getDocID(t) + "_create=" + createPeerName + ",update=" + updatePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "updatePeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, updatePeer, topology.description))
createVersion := createPeer.CreateDocument(collectionName, docID, body1)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

newBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "updatePeer": "%s", "topology": "%s", "action": "update"}`, updatePeerName, createPeerName, updatePeerName, topology.description))
updateVersion := updatePeer.WriteDocument(collectionName, docID, newBody)

waitForVersionAndBody(t, collectionName, peers, docID, updateVersion)
}
}
})
}
}

// TestMultiActorDelete tests that a single actor can update a document that was created on a different peer.
// 1. start replications
// 2. create documents on each peer, to be updated by each other peer
// 3. wait for all documents to be replicated
// 4. delete each document on a single peer, documents exist in pairwise create peer and update peer
// 5. wait for the hlv for updated documents to synchronized
func TestMultiActorDelete(t *testing.T) {
for _, topology := range append(simpleTopologies, Topologies...) {
t.Run(topology.description, func(t *testing.T) {
if strings.Contains(topology.description, "CBL") {
t.Skip("Skipping Couchbase Lite test, does not know how to push a deletion yet CBG-4257")
}
collectionName, peers, _ := setupTests(t, topology)

for peerName := range peers {
docID := getDocID(t) + "_" + peerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, docID, topology.description))
createVersion := peers[peerName].CreateDocument(collectionName, docID, body1)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

for createPeerName, createPeer := range peers {
for deletePeerName, deletePeer := range peers {
if deletePeerName == peerName {
// continue till we find peer that write didn't originate from
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}

docID := getDocID(t) + "_create=" + createPeerName + ",update=" + deletePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, topology.description))
createVersion := createPeer.CreateDocument(collectionName, docID, body1)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

deleteVersion := deletePeer.DeleteDocument(collectionName, docID)
t.Logf("createVersion: %+v, deleteVersion: %+v", createVersion, deleteVersion)
t.Logf("waiting for document %s deletion on all peers", docID)
t.Logf("deleteVersion: %+v\n", deleteVersion) // FIXME: verify hlv in CBG-4416
waitForDeletion(t, collectionName, peers, docID, deletePeerName)
break
}
}
})
}
}

// TestMultiActorResurrect tests that a single actor can update a document that was created on a different peer.
// 1. start replications
// 2. create documents on each peer, to be updated by each other peer
// 3. wait for all documents to be replicated
// 4. delete each document on a single peer, documents exist in pairwise create peer and update peer
// 5. wait for the hlv for updated documents to synchronized
// 6. resurrect each document on a single peer
// 7. wait for the hlv for updated documents to be synchronized
func TestMultiActorResurrect(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("CBG-4419: this test fails xdcr with: could not write doc: cas mismatch: expected 0, really 1 -- xdcr.(*rosmarManager).processEvent() at rosmar_xdcr.go:201")
}
for _, topology := range append(simpleTopologies, Topologies...) {
t.Run(topology.description, func(t *testing.T) {
if strings.Contains(topology.description, "CBL") {
t.Skip("Skipping Couchbase Lite test, does not know how to push a deletion yet CBG-4257")
}

collectionName, peers, _ := setupTests(t, topology)

docVersionList := make(map[string]BodyAndVersion)
for activePeerName, activePeer := range peers {
docID := getDocID(t) + "_" + activePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, activePeerName, topology.description))
createVersion := activePeer.CreateDocument(collectionName, docID, body1)
t.Logf("createVersion: %+v for docID: %s", createVersion, docID)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

deleteVersion := activePeer.DeleteDocument(collectionName, docID)
t.Logf("createVersion: %+v, deleteVersion: %+v", createVersion, deleteVersion)
t.Logf("waiting for document %s deletion on all peers", docID)
waitForDeletion(t, collectionName, peers, docID, activePeerName)
// recreate doc and assert it arrives at all peers
resBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "resurrect"}`, activePeerName, topology.description))
updateVersion := activePeer.WriteDocument(collectionName, docID, resBody)
docVersionList[activePeerName] = updateVersion
}
for createPeerName, createPeer := range peers {
for deletePeerName, deletePeer := range peers {
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}
for resurrectPeerName, resurrectPeer := range peers {
docID := getDocID(t) + "_create=" + createPeerName + ",delete=" + deletePeerName + ",resurrect=" + resurrectPeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "resurrectPeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, resurrectPeer, topology.description))
createVersion := createPeer.CreateDocument(collectionName, docID, body1)
waitForVersionAndBody(t, collectionName, peers, docID, createVersion)

deleteVersion := deletePeer.DeleteDocument(collectionName, docID)
t.Logf("deleteVersion: %+v\n", deleteVersion) // FIXME: verify hlv in CBG-4416
waitForDeletion(t, collectionName, peers, docID, deletePeerName)

for updatePeerName := range peers {
docID := getDocID(t) + "_" + updatePeerName
docVersion := docVersionList[updatePeerName]
waitForVersionAndBody(t, collectionName, peers, docID, docVersion)
resBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "resurrectPeer": "%s", "topology": "%s", "action": "resurrect"}`, resurrectPeerName, createPeerName, deletePeer, resurrectPeer, topology.description))
resurrectVersion := resurrectPeer.WriteDocument(collectionName, docID, resBody)
waitForVersionAndBody(t, collectionName, peers, docID, resurrectVersion)
}
}
}
})
}
Expand Down

0 comments on commit 2bcaafb

Please sign in to comment.