Skip to content

Commit

Permalink
CBG-4331: legacy rev handling for version 4 replication protocol (#7239)
Browse files Browse the repository at this point in the history
* CBG-4331: legacy rev handling for version 4 replication protocol

* tidy up + fix for test flake

* update comment

* updated to address comments

* fix incorrect redaction
  • Loading branch information
gregns1 authored Dec 13, 2024
1 parent 407a5e0 commit dcc98f1
Show file tree
Hide file tree
Showing 3 changed files with 474 additions and 17 deletions.
30 changes: 20 additions & 10 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
defer func() {
bh.replicationStats.HandleChangesTime.Add(time.Since(startTime).Nanoseconds())
}()
changesContainLegacyRevs := false // keep track if proposed changes have legacy revs for delta sync purposes
versionVectorProtocol := bh.useHLV()

for i, change := range changeList {
docID := change[0].(string)
Expand All @@ -832,9 +834,16 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
var status ProposedRevStatus
var currentRev string
if bh.useHLV() {

changeIsVector := false
if versionVectorProtocol {
// only check if rev is vector in VV replication mode
changeIsVector = strings.Contains(rev, "@")
}
if versionVectorProtocol && changeIsVector {
status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID)
} else {
changesContainLegacyRevs = true
status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID)
}
if status == ProposedRev_OK_IsNew {
Expand Down Expand Up @@ -866,8 +875,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
output.Write([]byte("]"))
response := rq.Response()
// Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() {
// Disable delta sync for protocol versions < 4 or changes batches that have legacy revs in them, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() && !changesContainLegacyRevs {
base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response")
response.Properties[ChangesResponseDeltas] = trueProperty
}
Expand All @@ -887,13 +896,13 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen
} else if base.IsFleeceDeltaError(err) {
// Something went wrong in the diffing library. We want to know about this!
base.WarnfCtx(ctx, "Falling back to full body replication. Error generating delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
} else if err == base.ErrDeltaSourceIsTombstone {
base.TracefCtx(ctx, base.KeySync, "Falling back to full body replication. Delta source %s is tombstone. Unable to generate delta to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
} else if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

if redactedRev != nil {
Expand All @@ -909,12 +918,12 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen

if revDelta == nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

resendFullRevisionFunc := func() error {
base.InfofCtx(ctx, base.KeySync, "Resending revision as full body. Peer couldn't process delta %s from %s to %s for key %s", base.UD(revDelta.DeltaBytes), deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

base.TracefCtx(ctx, base.KeySync, "docID: %s - delta: %v", base.UD(docID), base.UD(string(revDelta.DeltaBytes)))
Expand Down Expand Up @@ -1059,7 +1068,8 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
historyStr := rq.Properties[RevMessageHistory]
var incomingHLV *HybridLogicalVector
// Build history/HLV
if !bh.useHLV() {
changeIsVector := strings.Contains(rev, "@")
if !bh.useHLV() || !changeIsVector {
newDoc.RevID = rev
history = []string{rev}
if historyStr != "" {
Expand Down Expand Up @@ -1287,7 +1297,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// If the doc is a tombstone we want to allow conflicts when running SGR2
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
if bh.useHLV() {
if bh.useHLV() && changeIsVector {
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc)
} else if bh.conflictResolver != nil {
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
Expand Down
27 changes: 20 additions & 7 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,15 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if err != nil {
return err
}
versionVectorProtocol := bsc.useHLV()

for i, knownRevsArrayInterface := range answer {
seq := changeArray[i][0].(SequenceID)
docID := changeArray[i][1].(string)
rev := changeArray[i][2].(string)

if knownRevsArray, ok := knownRevsArrayInterface.([]interface{}); ok {
legacyRev := false
deltaSrcRevID := ""
knownRevs := knownRevsByDoc[docID]
if knownRevs == nil {
Expand All @@ -358,10 +360,10 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
// revtree clients. For HLV clients, use the cv as deltaSrc
if bsc.useDeltas && len(knownRevsArray) > 0 {
if revID, ok := knownRevsArray[0].(string); ok {
if bsc.useHLV() {
if versionVectorProtocol {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", docID)
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID))
deltaSrcRevID = "" // will force falling back to full body replication below
} else {
deltaSrcRevID = msgHLV.GetCurrentVersionString()
Expand All @@ -375,7 +377,12 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
for _, rev := range knownRevsArray {
if revID, ok := rev.(string); ok {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err == nil {
if err != nil {
// assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication
if versionVectorProtocol {
legacyRev = true
}
} else {
// extract cv as string
revID = msgHLV.GetCurrentVersionString()
}
Expand All @@ -394,7 +401,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if deltaSrcRevID != "" && bsc.useHLV() {
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
} else {
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx, legacyRev)
}
if err != nil {
return err
Expand Down Expand Up @@ -652,11 +659,11 @@ func (bsc *BlipSyncContext) sendNoRev(sender *blip.Sender, docID, revID string,
}

// Pushes a revision body to the client
func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int, legacyRev bool) error {

var originalErr error
var docRev DocumentRevision
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
if !bsc.useHLV() {
docRev, originalErr = handleChangesResponseCollection.GetRev(ctx, docID, revID, true, nil)
} else {
// extract CV string rev representation
Expand Down Expand Up @@ -743,13 +750,19 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende
bsc.replicationStats.SendReplacementRevCount.Add(1)
}
var history []string
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
if !bsc.useHLV() {
history = toHistory(docRev.History, knownRevs, maxHistory)
} else {
if docRev.hlvHistory != "" {
history = append(history, docRev.hlvHistory)
}
}
if legacyRev {
// append current revID and rest of rev tree after hlv history
revTreeHistory := toHistory(docRev.History, knownRevs, maxHistory)
history = append(history, docRev.RevID)
history = append(history, revTreeHistory...)
}

properties := blipRevMessageProperties(history, docRev.Deleted, seq, replacedRevID)
if base.LogDebugEnabled(ctx, base.KeySync) {
Expand Down
Loading

0 comments on commit dcc98f1

Please sign in to comment.