Skip to content

Commit

Permalink
Close rounds DBs as soon as they are not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
poszu committed Sep 5, 2023
1 parent 93a983f commit eddd78a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 102 deletions.
102 changes: 36 additions & 66 deletions registration/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"io/fs"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"go.uber.org/zap"

"github.com/spacemeshos/poet/config/round_config"
Expand Down Expand Up @@ -45,8 +43,7 @@ type Registration struct {
openRoundMutex sync.RWMutex
openRound *round

db *database
roundDbs map[uint]*leveldb.DB
db *database

powVerifiers powVerifiers
workerSvc WorkerService
Expand Down Expand Up @@ -92,7 +89,10 @@ func NewRegistration(
workerSvc WorkerService,
opts ...newRegistrationOptionFunc,
) (*Registration, error) {
options := newRegistrationOptions{}
options := newRegistrationOptions{
roundCfg: round_config.DefaultConfig(),
cfg: DefaultConfig(),
}
for _, opt := range opts {
opt(&options)
}
Expand All @@ -119,7 +119,6 @@ func NewRegistration(
privKey: options.privKey,
db: db,
workerSvc: workerSvc,
roundDbs: make(map[uint]*leveldb.DB),
}

if options.powVerifier == nil {
Expand All @@ -128,16 +127,13 @@ func NewRegistration(
r.powVerifiers = powVerifiers{current: options.powVerifier}
}

round, err := r.createRound(ctx)
epoch := r.roundCfg.OpenRoundId(r.genesis, time.Now())
round, err := newRound(epoch, r.dbdir, r.newRoundOpts()...)
if err != nil {
return nil, fmt.Errorf("creating new round: %w", err)
}
logging.FromContext(ctx).Info("opened round", zap.Uint("epoch", epoch), zap.Int("members", round.members))
r.openRound = round
logging.FromContext(ctx).Info(
"Opened round",
zap.Uint("epoch", r.openRound.epoch),
zap.Int("members", r.openRound.members),
)

return r, nil
}
Expand All @@ -147,12 +143,6 @@ func (r *Registration) Pubkey() ed25519.PublicKey {
}

func (r *Registration) Close() (res error) {
for epoch, db := range r.roundDbs {
if err := db.Close(); err != nil {
res = errors.Join(err, fmt.Errorf("closing round db for epoch %d: %w", epoch, err))
}
}

return errors.Join(res, r.db.Close(), r.openRound.Close())
}

Expand All @@ -163,18 +153,21 @@ func (r *Registration) closeRound(ctx context.Context) error {
if err != nil {
return fmt.Errorf("calculating membership root: %w", err)
}
logging.FromContext(ctx).
Info("closing round", zap.Uint("epoch", r.openRound.epoch), zap.Binary("root", root), zap.Int("members", r.openRound.members))
logging.FromContext(ctx).Info("closing round", zap.Uint("epoch", r.openRound.epoch), zap.Binary("root", root), zap.Int("members", r.openRound.members))

if err := r.openRound.Close(); err != nil {
logging.FromContext(ctx).Error("failed to close the open round", zap.Error(err))
}
if err := r.workerSvc.ExecuteRound(ctx, r.openRound.epoch, root); err != nil {
return fmt.Errorf("closing round for epoch %d: %w", r.openRound.epoch, err)
}
round, err := r.createRound(ctx)
epoch := r.roundCfg.OpenRoundId(r.genesis, time.Now())
round, err := newRound(epoch, r.dbdir, r.newRoundOpts()...)
if err != nil {
return fmt.Errorf("creating new round: %w", err)
}
if err := r.openRound.Close(); err != nil {
return fmt.Errorf("closing round: %w", err)
}
logging.FromContext(ctx).Info("opened round", zap.Uint("epoch", epoch), zap.Int("members", round.members))

r.openRound = round
return nil
}
Expand Down Expand Up @@ -202,6 +195,7 @@ func (r *Registration) Run(ctx context.Context) error {

proofs := r.workerSvc.RegisterForProofs(ctx)

// First re-execute the in-progress round if any
if err := r.recoverExecution(ctx); err != nil {
return fmt.Errorf("recovering execution: %w", err)
}
Expand All @@ -225,21 +219,21 @@ func (r *Registration) Run(ctx context.Context) error {
}

func (r *Registration) recoverExecution(ctx context.Context) error {
// First re-execute the in-progress round if any
_, executing := r.Info(ctx)
if executing == nil {
return nil
}
db, err := leveldb.OpenFile(r.roundDbPath(*executing), &opt.Options{ErrorIfMissing: true})

opts := append(r.newRoundOpts(), withFailfIfNotExists())
round, err := newRound(*executing, r.dbdir, opts...)
switch {
case os.IsNotExist(err):
// nothing to do
case errors.Is(err, fs.ErrNotExist):
return nil
case err != nil:
return err
}
defer round.Close()

round := newRound(*executing, db, r.newRoundOpts()...)
logging.FromContext(ctx).Info("found round in progress, scheduling it", zap.Uint("epoch", round.epoch))
root, err := round.calcMembershipRoot()
if err != nil {
Expand All @@ -255,22 +249,21 @@ func (r *Registration) onNewProof(ctx context.Context, proof shared.NIP) error {
logger := logging.FromContext(ctx).Named("on-proof").With(zap.Uint("epoch", proof.Epoch))
logger.Info("received new proof", zap.Uint64("leaves", proof.Leaves))

var (
db *leveldb.DB
ok bool
)
if db, ok = r.roundDbs[proof.Epoch]; !ok {
// try to reopen round db
var err error
db, err = leveldb.OpenFile(r.roundDbPath(proof.Epoch), &opt.Options{ErrorIfMissing: true})
if err != nil {
return fmt.Errorf("reopening round db: %w", err)
}
// Retrieve the list of round members for the round.
// This is temporary until we remove the list of members from the proof.
opts := append(r.newRoundOpts(), withFailfIfNotExists())
round, err := newRound(proof.Epoch, r.dbdir, opts...)
switch {
case errors.Is(err, fs.ErrNotExist):
return nil
case err != nil:
return err
}

round := newRound(proof.Epoch, db, r.newRoundOpts()...)

members := round.getMembers()
if err := round.Close(); err != nil {
logger.Error("failed to close round", zap.Error(err))
}
if err := r.db.SaveProof(ctx, proof, members); err != nil {
return fmt.Errorf("saving proof in DB: %w", err)
} else {
Expand All @@ -293,29 +286,6 @@ func (r *Registration) newRoundOpts() []newRoundOptionFunc {
}
}

func (r *Registration) roundDbPath(epoch uint) string {
return filepath.Join(r.dbdir, "rounds", epochToRoundId(epoch))
}

func (r *Registration) createRound(ctx context.Context) (*round, error) {
epoch := r.roundCfg.OpenRoundId(r.genesis, time.Now())
var db *leveldb.DB
var ok bool
if db, ok = r.roundDbs[epoch]; !ok {
var err error
db, err = leveldb.OpenFile(r.roundDbPath(epoch), nil)
if err != nil {
return nil, fmt.Errorf("opening round db: %w", err)
}
r.roundDbs[epoch] = db
}
round := newRound(epoch, db, r.newRoundOpts()...)

logging.FromContext(ctx).
Info("created new round", zap.Uint("epoch", epoch), zap.Time("start", r.roundCfg.RoundStart(r.genesis, epoch)))
return round, nil
}

func (r *Registration) Submit(
ctx context.Context,
challenge, nodeID []byte,
Expand Down
38 changes: 15 additions & 23 deletions registration/registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ import (
"github.com/spacemeshos/poet/shared"
)

// func TestNewServiceCannotSetNilVerifier(t *testing.T) {
// _, err := service.NewService(
// context.Background(),
// config.DefaultConfig().Service,
// t.TempDir(),
// t.TempDir(),
// service.WithPowVerifier(nil),
// )
// require.ErrorContains(t, err, "pow verifier cannot be nil")
// }

func TestSubmitIdempotence(t *testing.T) {
req := require.New(t)
genesis := time.Now().Add(time.Second)
Expand Down Expand Up @@ -160,39 +149,42 @@ func TestService_OpeningRounds(t *testing.T) {
// The challenge should be changed to the root of PoET proof Merkle tree
// of the previous round.
func TestService_PowChallengeRotation(t *testing.T) {
genesis := time.Now().Add(time.Second)
genesis := time.Now()

proofs := make(chan shared.NIP, 1)

workerSvc := mocks.NewMockWorkerService(gomock.NewController(t))
workerSvc.EXPECT().RegisterForProofs(gomock.Any()).Return(proofs)
workerSvc.EXPECT().ExecuteRound(gomock.Any(), gomock.Any(), 0).Return(nil).AnyTimes()
workerSvc.EXPECT().ExecuteRound(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, epoch uint, _ []byte) error {
proofs <- shared.NIP{
MerkleProof: shared.MerkleProof{
Root: []byte{1, 2, 3, 4},
},
Epoch: epoch,
}
return nil
}).AnyTimes()

reg, err := registration.NewRegistration(
context.Background(),
genesis,
t.TempDir(),
workerSvc,
registration.WithRoundConfig(round_config.Config{EpochDuration: 10 * time.Millisecond}),
)
require.NoError(t, err)

params0 := reg.PowParams()
require.NotEqual(t, []byte{1, 2, 3, 4}, params0.Challenge)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var eg errgroup.Group
eg.Go(func() error { return reg.Run(ctx) })

params0 := reg.PowParams()
require.NotEqual(t, []byte{1, 2, 3, 4}, params0.Challenge)

proofs <- shared.NIP{
MerkleProof: shared.MerkleProof{
Root: []byte{1, 2, 3, 4},
},
}

require.Eventually(t, func() bool {
return bytes.Equal([]byte{1, 2, 3, 4}, reg.PowParams().Challenge)
}, time.Second, time.Millisecond*10)
}, time.Second, time.Millisecond)

cancel()
require.NoError(t, eg.Wait())
Expand Down
20 changes: 17 additions & 3 deletions registration/round.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -68,6 +69,7 @@ type newRoundOptions struct {
submitFlushInterval time.Duration
maxMembers int
maxSubmitBatchSize int
failIfNotExists bool
}

type newRoundOptionFunc func(*newRoundOptions)
Expand All @@ -90,7 +92,13 @@ func withMaxSubmitBatchSize(size int) newRoundOptionFunc {
}
}

func newRound(epoch uint, db *leveldb.DB, options ...newRoundOptionFunc) *round {
func withFailfIfNotExists() newRoundOptionFunc {
return func(o *newRoundOptions) {
o.failIfNotExists = true
}
}

func newRound(epoch uint, dbdir string, options ...newRoundOptionFunc) (*round, error) {
id := strconv.FormatUint(uint64(epoch), 10)
opts := newRoundOptions{
submitFlushInterval: time.Microsecond,
Expand All @@ -101,6 +109,12 @@ func newRound(epoch uint, db *leveldb.DB, options ...newRoundOptionFunc) *round
opt(&opts)
}

dbdir = filepath.Join(dbdir, "rounds", epochToRoundId(epoch))
db, err := leveldb.OpenFile(dbdir, &opt.Options{ErrorIfMissing: opts.failIfNotExists})
if err != nil {
return nil, fmt.Errorf("failed to open round db: %w", err)
}

// Note: using the panicking version here because it panics
// only if the number of label values is not the same as the number of variable labels in Desc.
// There is only 1 label (round ID), that is passed, so it's safe to use.
Expand All @@ -118,7 +132,7 @@ func newRound(epoch uint, db *leveldb.DB, options ...newRoundOptionFunc) *round

membersCounter.Add(float64(r.members))

return r
return r, nil
}

// submit a challenge to the round under the given key.
Expand Down Expand Up @@ -247,5 +261,5 @@ func (r *round) getMembers() (members [][]byte) {

func (r *round) Close() error {
r.flushPendingSubmits()
return nil
return r.db.Close()
}
18 changes: 8 additions & 10 deletions registration/round_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
)

func genChallenge() ([]byte, error) {
Expand All @@ -31,10 +30,10 @@ func genChallenges(num int) ([][]byte, error) {

func newTestRound(t *testing.T, epoch uint, opts ...newRoundOptionFunc) *round {
t.Helper()
db, err := leveldb.OpenFile(t.TempDir(), nil)
r, err := newRound(epoch, t.TempDir(), opts...)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
return newRound(epoch, db, opts...)
t.Cleanup(func() { require.NoError(t, r.Close()) })
return r
}

// Test submitting many challenges.
Expand Down Expand Up @@ -134,22 +133,21 @@ func TestRound_Submit(t *testing.T) {
}

func TestRound_Reopen(t *testing.T) {
db, err := leveldb.OpenFile(t.TempDir(), nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

dbdir := t.TempDir()
// Arrange
challenge, err := genChallenge()
require.NoError(t, err)
{
round := newRound(0, db)
round, err := newRound(0, dbdir)
require.NoError(t, err)
_, err = round.submit(context.Background(), []byte("key"), challenge)
require.NoError(t, err)
require.NoError(t, round.Close())
}

// Act
recovered := newRound(0, db)
recovered, err := newRound(0, dbdir)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, recovered.Close()) })

// Verify
Expand Down

0 comments on commit eddd78a

Please sign in to comment.