Skip to content

Commit

Permalink
MB-61421: Prevent deletion of segments scheduled for copy (#2032) (#2046
Browse files Browse the repository at this point in the history
)

- Use a modified index reader, CopyReader, to mark segments in the
Scorch root for online copy/backup operations. This prevents their
deletion by the asynchronous cleanup routine during the copy/backup
process, thereby mitigating the race condition between the
merger/persistor and the copy/backup routine.

Backports: #2032

---------

Co-authored-by: Rahul Rampure <[email protected]>
  • Loading branch information
abhinavdangeti and CascadingRadium authored Jun 25, 2024
1 parent 33acfd6 commit fdaed7b
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x]
go-version: [1.16.x, 1.17.x, 1.18.x]
platform: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.13
require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/bits-and-blooms/bitset v1.2.0
github.com/blevesearch/bleve_index_api v1.0.2
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/blevesearch/go-porterstemmer v1.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/bleve_index_api v1.0.2 h1:rO736FwEPMVY1mGi7d4n7CgBB3+tB7uYN7QTjR+Ij+s=
github.com/blevesearch/bleve_index_api v1.0.2/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5 h1:AG1xQCQKv8dqODzsCc5v1bnxOumAcLeHDHKPiVTGhqE=
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121 h1:5uNzC0Mn/8aCGbSJA6T8ZCjrKW8MKsZKQYBDowmeV/g=
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121/go.mod h1:8z6udmXe8Ek8uuX4qOIWKb50vY/OQ1SG+XhL5FrcHOU=
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:kDy+zgJFJJoJYBvdfBSiZYBbdsUL0XcjHYWezpQBGPA=
Expand Down
6 changes: 3 additions & 3 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
}
}

var filenames []string
newSegmentPaths := make(map[uint64]string)
filenames := make([]string, 0, len(snapshot.segment))
newSegmentPaths := make(map[uint64]string, len(snapshot.segment))

// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
Expand Down Expand Up @@ -982,7 +982,7 @@ func (s *Scorch) removeOldZapFiles() error {
for _, finfo := range currFileInfos {
fname := finfo.Name()
if filepath.Ext(fname) == ".zap" {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] && (s.copyScheduled[fname] <= 0) {
err := os.Remove(s.path + string(os.PathSeparator) + fname)
if err != nil {
log.Printf("got err removing file: %s, err: %v", fname, err)
Expand Down
40 changes: 40 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -50,13 +51,20 @@ type Scorch struct {
unsafeBatch bool

rootLock sync.RWMutex

root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
persistedCallbacks []index.BatchCallback
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.

// keeps track of segments scheduled for online copy/backup operation. Each segment's filename maps to
// the count of copy schedules. Segments with non-zero counts are protected from removal by the cleanup
// operation. Counts decrement upon successful copy, allowing removal of segments with zero or absent counts.
// must be accessed within the rootLock as it is accessed by the asynchronous cleanup routine.
copyScheduled map[string]int

numSnapshotsToKeep int
closeCh chan struct{}
introductions chan *segmentIntroduction
Expand Down Expand Up @@ -110,6 +118,7 @@ func NewScorch(storeName string,
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
copyScheduled: map[string]int{},
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
Expand Down Expand Up @@ -715,3 +724,34 @@ func parseToInteger(i interface{}) (int, error) {
return 0, fmt.Errorf("expects int or float64 value")
}
}

// CopyReader returns a low-level accessor for index data, ensuring persisted segments
// remain on disk for backup, preventing race conditions with the persister/merger cleanup.
// Close the reader after backup to allow segment removal by the persister/merger.
func (s *Scorch) CopyReader() index.CopyReader {
s.rootLock.Lock()
rv := s.root
if rv != nil {
rv.AddRef()
var fileName string
// schedule a backup for all the segments from the root. Note that the
// both the unpersisted and persisted segments are scheduled for backup.
// because during the backup, the unpersisted segments may get persisted and
// hence we need to protect both the unpersisted and persisted segments from removal
// by the cleanup routine during the online backup
for _, seg := range rv.segment {
if perSeg, ok := seg.segment.(segment.PersistedSegment); ok {
// segment is persisted
fileName = filepath.Base(perSeg.Path())
} else {
// segment is not persisted
// the name of the segment file that is generated if the
// the segment is persisted in the future.
fileName = zapFileName(seg.id)
}
rv.parent.copyScheduled[fileName]++
}
}
s.rootLock.Unlock()
return rv
}
23 changes: 23 additions & 0 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,3 +843,26 @@ func (i *IndexSnapshot) GetSpatialAnalyzerPlugin(typ string) (
}
return rv, nil
}

func (is *IndexSnapshot) CloseCopyReader() error {
// first unmark the segments that were marked for backup by this index snapshot
is.parent.rootLock.Lock()
for _, seg := range is.segment {
var fileName string
if perSeg, ok := seg.segment.(segment.PersistedSegment); ok {
// segment is persisted
fileName = filepath.Base(perSeg.Path())
} else {
// segment is not persisted
// the name of the segment file that is generated if the
// the segment is persisted in the future.
fileName = zapFileName(seg.id)
}
if is.parent.copyScheduled[fileName]--; is.parent.copyScheduled[fileName] <= 0 {
delete(is.parent.copyScheduled, fileName)
}
}
is.parent.rootLock.Unlock()
// close the index snapshot normally
return is.Close()
}
3 changes: 3 additions & 0 deletions index/upsidedown/store/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !darwin || !arm64
// +build !darwin !arm64

package boltdb

import (
Expand Down
21 changes: 11 additions & 10 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,22 +921,23 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) {
return ErrorIndexClosed
}

indexReader, err := i.i.Reader()
if err != nil {
return err
copyIndex, ok := i.i.(index.CopyIndex)
if !ok {
return fmt.Errorf("index implementation does not support copy reader")
}

copyReader := copyIndex.CopyReader()
if copyReader == nil {
return fmt.Errorf("index's copyReader is nil")
}

defer func() {
if cerr := indexReader.Close(); err == nil && cerr != nil {
if cerr := copyReader.CloseCopyReader(); err == nil && cerr != nil {
err = cerr
}
}()

irc, ok := indexReader.(IndexCopyable)
if !ok {
return fmt.Errorf("index implementation does not support copy")
}

err = irc.CopyTo(d)
err = copyReader.CopyTo(d)
if err != nil {
return fmt.Errorf("error copying index metadata: %v", err)
}
Expand Down

0 comments on commit fdaed7b

Please sign in to comment.