Skip to content

Commit

Permalink
Merge branch 'develop' into sync2/advance-on-timer
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed Dec 20, 2024
2 parents 84a9e25 + ef8ba69 commit ae9a9b7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 38 deletions.
10 changes: 10 additions & 0 deletions sync2/fptree/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import (

var ErrEasySplitFailed = errEasySplitFailed

func (ft *FPTree) FingerprintInternal(
x, y rangesync.KeyBytes,
limit int,
needNext bool,
) (fpr FPResult, err error) {
ft.np.lockRead()
defer ft.np.unlockRead()
return ft.fingerprintInterval(x, y, limit, needNext)
}

func (ft *FPTree) EasySplit(x, y rangesync.KeyBytes, limit int) (sr SplitResult, err error) {
return ft.easySplit(x, y, limit)
}
Expand Down
59 changes: 36 additions & 23 deletions sync2/fptree/fptree.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ type FPResult struct {

// SplitResult represents the result of a split operation.
type SplitResult struct {
// The two parts of the inteval
// The two parts of the interval
Part0, Part1 FPResult
// Moddle point value
// Middle point value
Middle rangesync.KeyBytes
}

Expand Down Expand Up @@ -104,15 +104,15 @@ func (ac *aggContext) prefixBelowY(p prefix) bool {

// fingerprintAtOrAfterX verifies that the specified fingerprint, which should be derived
// from a single key, is at or after x bound of the interval.
func (ac *aggContext) fingreprintAtOrAfterX(fp rangesync.Fingerprint) bool {
func (ac *aggContext) fingerprintAtOrAfterX(fp rangesync.Fingerprint) bool {
k := make(rangesync.KeyBytes, len(ac.x))
copy(k, fp[:])
return k.Compare(ac.x) >= 0
}

// fingerprintBelowY verifies that the specified fingerprint, which should be derived from a
// single key, is below y bound of the interval.
func (ac *aggContext) fingreprintBelowY(fp rangesync.Fingerprint) bool {
func (ac *aggContext) fingerprintBelowY(fp rangesync.Fingerprint) bool {
k := make(rangesync.KeyBytes, len(ac.x))
copy(k, fp[:])
k[:FingerprintSize].Inc() // 1 after max key derived from the fingerprint
Expand All @@ -128,7 +128,7 @@ func (ac *aggContext) nodeAtOrAfterX(idx nodeIndex, p prefix) bool {
if v != nil {
return v.Compare(ac.x) >= 0
}
return ac.fingreprintAtOrAfterX(fp)
return ac.fingerprintAtOrAfterX(fp)
}
return ac.prefixAtOrAfterX(p)
}
Expand All @@ -141,7 +141,7 @@ func (ac *aggContext) nodeBelowY(idx nodeIndex, p prefix) bool {
if v != nil {
return v.Compare(ac.y) < 0
}
return ac.fingreprintBelowY(fp)
return ac.fingerprintBelowY(fp)
}
return ac.prefixBelowY(p)
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (ac *aggContext) switchToSecondPart() {
}

// maybeIncludeNode returns tries to include the full contents of the specified node in
// the aggregation and returns if it succeeded, based on the remaining limit and the numer
// the aggregation and returns if it succeeded, based on the remaining limit and the number
// of items in the node.
// It also handles "easy split" happening at the node.
func (ac *aggContext) maybeIncludeNode(idx nodeIndex, p prefix) bool {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (ac *aggContext) maybeIncludeNode(idx nodeIndex, p prefix) bool {
// We're doing a split and this node is exactly at the limit, or it was
// above the limit but first part was still empty, so store count and
// fingerprint for the first part which includes the current node and zero
// out cound and figerprint for the second part
// out count and fingerprint for the second part
ac.switchToSecondPart()
}
return true
Expand Down Expand Up @@ -271,14 +271,14 @@ var _ sqlstore.IDStore = &FPTree{}

// NewFPTreeWithValues creates an FPTree which also stores the items themselves and does
// not make use of a backing IDStore.
// sizeHint specifies the approximage expected number of items.
// sizeHint specifies the approximate expected number of items.
// keyLen specifies the number of bytes in keys used.
func NewFPTreeWithValues(sizeHint, keyLen int) *FPTree {
return NewFPTree(sizeHint, nil, keyLen, 0)
}

// NewFPTree creates an FPTree of limited depth backed by an IDStore.
// sizeHint specifies the approximage expected number of items.
// sizeHint specifies the approximate expected number of items.
// keyLen specifies the number of bytes in keys used.
func NewFPTree(sizeHint int, idStore sqlstore.IDStore, keyLen, maxDepth int) *FPTree {
var np nodePool
Expand Down Expand Up @@ -329,7 +329,7 @@ func (ft *FPTree) traverse(idx nodeIndex, yield func(rangesync.KeyBytes) bool) (
return ft.traverse(l, yield) && ft.traverse(r, yield)
}

// travereFrom traverses the subtree rooted in idx in order and calls the given function for
// traverseFrom traverses the subtree rooted in idx in order and calls the given function for
// each item starting from the given key.
func (ft *FPTree) traverseFrom(
idx nodeIndex,
Expand Down Expand Up @@ -905,7 +905,7 @@ func (ft *FPTree) aggregateRight(
}
}

// aggregateXX aggregtes intervals of form [x, x) which denotes the whole set.
// aggregateXX aggregates intervals of form [x, x) which denotes the whole set.
func (ft *FPTree) aggregateXX(ac *aggContext) (err error) {
// [x, x) interval which denotes the whole set unless
// the limit is specified, in which case we need to start aggregating
Expand Down Expand Up @@ -1081,14 +1081,18 @@ func (ft *FPTree) nextFromPrefix(ac *aggContext, p prefix) (rangesync.KeyBytes,
return id.Clone(), nil
}

// FingerprintInteval performs a range fingerprint query with specified bounds and limit.
// FingerprintInterval performs a range fingerprint query with specified bounds and limit.
func (ft *FPTree) FingerprintInterval(x, y rangesync.KeyBytes, limit int) (fpr FPResult, err error) {
ft.np.lockRead()
defer ft.np.unlockRead()
return ft.fingerprintInterval(x, y, limit)
return ft.fingerprintInterval(x, y, limit, false)
}

func (ft *FPTree) fingerprintInterval(x, y rangesync.KeyBytes, limit int) (fpr FPResult, err error) {
func (ft *FPTree) fingerprintInterval(
x, y rangesync.KeyBytes,
limit int,
needNext bool,
) (fpr FPResult, err error) {
ft.enter("fingerprintInterval: x %s y %s limit %d", x, y, limit)
defer func() {
ft.leave(fpr.FP, fpr.Count, fpr.IType, fpr.Items, fpr.Next, err)
Expand All @@ -1110,16 +1114,23 @@ func (ft *FPTree) fingerprintInterval(x, y rangesync.KeyBytes, limit int) (fpr F

if ac.items.Seq != nil {
ft.log("fingerprintInterval: items %v", ac.items)
fpr.Items = ac.items.Limit(int(ac.count))
fpr.Items = ac.items
} else {
fpr.Items = ft.from(x, 1).Limit(int(ac.count))
fpr.Items = ft.from(x, 1)
ft.log("fingerprintInterval: start from x: %v", fpr.Items)
}

if ac.next != nil {
switch {
case !needNext:
// The next item is only needed for splitting in case if easy split is not
// feasible, and it's better to avoid getting it as that may incur
// database access
fpr.Items = fpr.Items.Limit(int(fpr.Count))
return fpr, nil
case ac.next != nil:
ft.log("fingerprintInterval: next %s", ac.next)
fpr.Next = ac.next
} else if (fpr.IType == 0 && limit < 0) || fpr.Count == 0 {
case (fpr.IType == 0 && limit < 0) || fpr.Count == 0:
next, err := fpr.Items.First()
if err != nil {
return FPResult{}, err
Expand All @@ -1128,10 +1139,10 @@ func (ft *FPTree) fingerprintInterval(x, y rangesync.KeyBytes, limit int) (fpr F
fpr.Next = next.Clone()
}
ft.log("fingerprintInterval: next at start %s", fpr.Next)
} else if ac.lastPrefix != nil {
case ac.lastPrefix != nil:
fpr.Next, err = ft.nextFromPrefix(&ac, *ac.lastPrefix)
ft.log("fingerprintInterval: next at lastPrefix %s -> %s", *ac.lastPrefix, fpr.Next)
} else {
default:
next, err := ft.from(y, 1).First()
if err != nil {
return FPResult{}, err
Expand All @@ -1140,6 +1151,8 @@ func (ft *FPTree) fingerprintInterval(x, y rangesync.KeyBytes, limit int) (fpr F
ft.log("fingerprintInterval: next at y: %s", fpr.Next)
}

// We apply limit after we have retrieved the next item
fpr.Items = fpr.Items.Limit(int(fpr.Count))
return fpr, nil
}

Expand Down Expand Up @@ -1219,7 +1232,7 @@ func (ft *FPTree) Split(x, y rangesync.KeyBytes, limit int) (sr SplitResult, err
return SplitResult{}, err
}

fpr0, err := ft.fingerprintInterval(x, y, limit)
fpr0, err := ft.fingerprintInterval(x, y, limit, true)
if err != nil {
return SplitResult{}, err
}
Expand All @@ -1228,7 +1241,7 @@ func (ft *FPTree) Split(x, y rangesync.KeyBytes, limit int) (sr SplitResult, err
return SplitResult{}, errors.New("can't split empty range")
}

fpr1, err := ft.fingerprintInterval(fpr0.Next, y, -1)
fpr1, err := ft.fingerprintInterval(fpr0.Next, y, -1, false)
if err != nil {
return SplitResult{}, err
}
Expand Down
38 changes: 23 additions & 15 deletions sync2/fptree/fptree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func testFPTree(t *testing.T, makeFPTrees mkFPTreesFunc) {
count: 0,
itype: 0,
startIdx: -1,
endIdx: -1,
endIdx: 0,
},
{
xIdx: 0,
Expand Down Expand Up @@ -194,7 +194,7 @@ func testFPTree(t *testing.T, makeFPTrees mkFPTreesFunc) {
count: 0,
itype: -1,
startIdx: -1,
endIdx: -1,
endIdx: 0,
},
{
xIdx: 1,
Expand Down Expand Up @@ -234,7 +234,7 @@ func testFPTree(t *testing.T, makeFPTrees mkFPTreesFunc) {
count: 0,
itype: 1,
startIdx: -1,
endIdx: -1,
endIdx: 2,
},
{
xIdx: 3,
Expand Down Expand Up @@ -274,7 +274,7 @@ func testFPTree(t *testing.T, makeFPTrees mkFPTreesFunc) {
count: 0,
itype: -1,
startIdx: -1,
endIdx: -1,
endIdx: 0,
},
},
},
Expand Down Expand Up @@ -581,7 +581,7 @@ func testFPTree(t *testing.T, makeFPTrees mkFPTreesFunc) {
name = fmt.Sprintf("%d-%d_%d", rtc.xIdx, rtc.yIdx, rtc.limit)
}
t.Run(name, func(t *testing.T) {
fpr, err := ft.FingerprintInterval(x, y, rtc.limit)
fpr, err := ft.FingerprintInternal(x, y, rtc.limit, true)
require.NoError(t, err)
assert.Equal(t, rtc.fp, fpr.FP.String(), "fp")
assert.Equal(t, rtc.count, fpr.Count, "count")
Expand Down Expand Up @@ -963,25 +963,29 @@ func dumbFP(hs hashList, x, y rangesync.KeyBytes, limit int) fpResultWithBounds
case -1:
p := hs.findGTE(x)
pY := hs.findGTE(y)
fpr.start = hs.keyAt(p)
for {
if p >= pY || limit == 0 {
fpr.next = hs.keyAt(p)
break
}
if fpr.start == nil {
fpr.start = hs.keyAt(p)
}
fpr.fp.Update(hs.keyAt(p))
limit--
fpr.count++
p++
}
case 1:
p := hs.findGTE(x)
fpr.start = hs.keyAt(p)
for {
if p >= len(hs) || limit == 0 {
fpr.next = hs.keyAt(p)
break
}
if fpr.start == nil {
fpr.start = hs.keyAt(p)
}
fpr.fp.Update(hs.keyAt(p))
limit--
fpr.count++
Expand All @@ -997,6 +1001,9 @@ func dumbFP(hs hashList, x, y rangesync.KeyBytes, limit int) fpResultWithBounds
fpr.next = hs.keyAt(p)
break
}
if fpr.start == nil {
fpr.start = hs.keyAt(p)
}
fpr.fp.Update(hs.keyAt(p))
limit--
fpr.count++
Expand All @@ -1005,13 +1012,15 @@ func dumbFP(hs hashList, x, y rangesync.KeyBytes, limit int) fpResultWithBounds
default:
pX := hs.findGTE(x)
p := pX
fpr.start = hs.keyAt(p)
fpr.next = fpr.start
fpr.next = hs.keyAt(p)
for {
if limit == 0 {
fpr.next = hs.keyAt(p)
break
}
if fpr.start == nil {
fpr.start = hs.keyAt(p)
}
fpr.fp.Update(hs.keyAt(p))
limit--
fpr.count++
Expand All @@ -1026,14 +1035,15 @@ func dumbFP(hs hashList, x, y rangesync.KeyBytes, limit int) fpResultWithBounds

func verifyInterval(t *testing.T, hs hashList, ft *fptree.FPTree, x, y rangesync.KeyBytes, limit int) fptree.FPResult {
expFPR := dumbFP(hs, x, y, limit)
fpr, err := ft.FingerprintInterval(x, y, limit)
fpr, err := ft.FingerprintInternal(x, y, limit, true)
require.NoError(t, err)
require.Equal(t, expFPR, toFPResultWithBounds(t, fpr),
"x=%s y=%s limit=%d", x.String(), y.String(), limit)

require.Equal(t, expFPR, toFPResultWithBounds(t, fpr),
fprNoNext, err := ft.FingerprintInterval(x, y, limit)
require.NoError(t, err)
expFPR.next = nil
require.Equal(t, expFPR, toFPResultWithBounds(t, fprNoNext),
"x=%s y=%s limit=%d", x.String(), y.String(), limit)

return fpr
}

Expand Down Expand Up @@ -1154,8 +1164,6 @@ func verifyEasySplit(
}
a := firstKey(t, fpr.Items)
require.NoError(t, err)
b := fpr.Next
require.NotNil(t, b)

m := fpr.Count / 2
sr, err := ft.EasySplit(x, y, int(m))
Expand Down

0 comments on commit ae9a9b7

Please sign in to comment.