Skip to content

Commit

Permalink
Add merge feature (#222)
Browse files Browse the repository at this point in the history
add merge operation of rosedb

the caller can use `Merge` function to reclaim the disk files of db.
  • Loading branch information
roseduan authored Jun 21, 2023
1 parent e2fb20b commit 3da5d07
Show file tree
Hide file tree
Showing 8 changed files with 648 additions and 18 deletions.
64 changes: 51 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
)

const (
fileLockName = "FLOCK"
fileLockName = "FLOCK"
dataFileNameSuffix = ".SEG"
hintFileNameSuffix = ".HINT"
mergeFinNameSuffix = ".MERGEFIN"
)

// DB represents a ROSEDB database instance.
Expand All @@ -33,12 +36,14 @@ const (
//
// So if your memory can almost hold all the keys, ROSEDB is the perfect storage engine for you.
type DB struct {
dataFiles *wal.WAL // data files are a sets of segment files in WAL.
index index.Indexer
options Options
fileLock *flock.Flock
mu sync.RWMutex
closed bool
dataFiles *wal.WAL // data files are a sets of segment files in WAL.
hintFile *wal.WAL // hint file is used to store the key and the position for fast startup.
index index.Indexer
options Options
fileLock *flock.Flock
mu sync.RWMutex
closed bool
mergeRunning uint32 // indicate if the database is merging
}

// Stat represents the statistics of the database.
Expand All @@ -64,7 +69,7 @@ func Open(options Options) (*DB, error) {
}

// create data directory if not exist
if _, err := os.Stat(options.DirPath); os.IsNotExist(err) {
if _, err := os.Stat(options.DirPath); err != nil {
if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {
return nil, err
}
Expand All @@ -80,13 +85,19 @@ func Open(options Options) (*DB, error) {
return nil, ErrDatabaseIsUsing
}

// load merge files if exists
if err = loadMergeFiles(options.DirPath); err != nil {
return nil, err
}

// open data files from WAL
walFiles, err := wal.Open(wal.Options{
DirPath: options.DirPath,
SegmentSize: options.SegmentSize,
BlockCache: options.BlockCache,
Sync: options.Sync,
BytesPerSync: options.BytesPerSync,
DirPath: options.DirPath,
SegmentSize: options.SegmentSize,
SementFileExt: dataFileNameSuffix,
BlockCache: options.BlockCache,
Sync: options.Sync,
BytesPerSync: options.BytesPerSync,
})
if err != nil {
return nil, err
Expand All @@ -100,6 +111,11 @@ func Open(options Options) (*DB, error) {
fileLock: fileLock,
}

// load index from hint file
if err = db.loadIndexFromHintFile(); err != nil {
return nil, err
}

// load index from data files
if err = db.loadIndexFromWAL(); err != nil {
return nil, err
Expand All @@ -119,6 +135,12 @@ func (db *DB) Close() error {
if err := db.dataFiles.Close(); err != nil {
return err
}
// close hint file if exists
if db.hintFile != nil {
if err := db.hintFile.Close(); err != nil {
return err
}
}
// release file lock
if err := db.fileLock.Unlock(); err != nil {
return err
Expand Down Expand Up @@ -226,10 +248,21 @@ func checkOptions(options Options) error {
// It will iterate over all the WAL files and read data
// from them to rebuild the index.
func (db *DB) loadIndexFromWAL() error {
mergeFinSegmentId, err := getMergeFinSegmentId(db.options.DirPath)
if err != nil {
return err
}
indexRecords := make(map[uint64][]*IndexRecord)
// get a reader for WAL
reader := db.dataFiles.NewReader()
for {
// if the current segment id is less than or equal to the mergeFinSegmentId,
// we can skip this segment because it has been merged,
// and we can load index from the hint file directly.
if reader.CurrentSegmentId() <= mergeFinSegmentId {
reader.SkipCurrentSegment()
}

chunk, position, err := reader.Next()
if err != nil {
if err == io.EOF {
Expand Down Expand Up @@ -257,6 +290,11 @@ func (db *DB) loadIndexFromWAL() error {
}
// delete indexRecords according to batchId after indexing
delete(indexRecords, uint64(batchId))
} else if record.Type == LogRecordNormal && record.BatchId == mergeFinishedBatchID {
// if the record is a normal record and the batch id is 0,
// it means that the record is involved in the merge operation.
// so put the record into index directly.
db.index.Put(record.Key, position)
} else {
// put the record into the temporary indexRecords
indexRecords[record.BatchId] = append(indexRecords[record.BatchId],
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ var (
ErrReadOnlyBatch = errors.New("the batch is read only")
ErrBatchCommitted = errors.New("the batch is committed")
ErrDBClosed = errors.New("the database is closed")
ErrMergeRunning = errors.New("the merge operation is running")
)
38 changes: 38 additions & 0 deletions examples/merge/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"github.com/rosedblabs/rosedb/v2"
"github.com/rosedblabs/rosedb/v2/utils"
)

// this file shows how to use the Merge feature of rosedb.
// Merge is used to merge the data files in the database.
// It is recommended to use it when the database is not busy.

// main walks through the Merge workflow end to end: it opens a database,
// writes a batch of keys, deletes half of them, and then calls Merge.
// Errors from Put, Delete, Merge and Close are deliberately discarded to keep
// the example short; only a failure to open the database aborts the program.
func main() {
	// total is the number of keys written; the first half is deleted again.
	const total = 100000

	// start from the default options and point them at a scratch directory
	opts := rosedb.DefaultOptions
	opts.DirPath = "/tmp/rosedb_merge"

	// open the database, aborting if that is not possible
	db, err := rosedb.Open(opts)
	if err != nil {
		panic(err)
	}
	// close the database when main returns
	defer func() {
		_ = db.Close()
	}()

	// populate the database with test keys carrying 128-byte random values
	for n := 0; n < total; n++ {
		key := []byte(utils.GetTestKey(n))
		_ = db.Put(key, utils.RandomValue(128))
	}

	// delete the first half of the keys so there is stale data to merge away
	for n := 0; n < total/2; n++ {
		key := []byte(utils.GetTestKey(n))
		_ = db.Delete(key)
	}

	// finally merge the data files:
	// all the invalid data will be removed, and the valid data will be merged into the new data files.
	_ = db.Merge()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90
github.com/rosedblabs/wal v1.0.0
github.com/rosedblabs/wal v1.0.1-0.20230618095314-83a5e84f0043
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90 h1:AeuxZLgpVnxCjR6mvRJiDQATgOeSme59HX/rWCBjjvw=
github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90/go.mod h1:Hk7adp95/ngEfetvapVWdgneuZb15mi9nH/keSH/KqI=
github.com/rosedblabs/wal v1.0.0 h1:XZqJJiu3nicO8zfOquMyug4LOwin0NatbV9lefZttAU=
github.com/rosedblabs/wal v1.0.0/go.mod h1:tYh0WapCkDQrID7PNsNHpsZDlkTczJVAFaTySmwaD7U=
github.com/rosedblabs/wal v1.0.1-0.20230618095314-83a5e84f0043 h1:Cl92HYZe5z7ktcouC+3rEgHszG++sMvkjIyZLgERIEQ=
github.com/rosedblabs/wal v1.0.1-0.20230618095314-83a5e84f0043/go.mod h1:tYh0WapCkDQrID7PNsNHpsZDlkTczJVAFaTySmwaD7U=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24 h1:6w3iSY8IIkp5OQtbYj8NeuKG1jS9d+kYaubXqsoOiQ8=
Expand Down
Loading

0 comments on commit 3da5d07

Please sign in to comment.