From cf2b1b3a91abc2552b1374368a4ec6b4fa61a607 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 3 Sep 2024 22:45:09 +0800 Subject: [PATCH 1/6] impl sized file dao --- blockchain/filedao/sized.go | 324 ++++++++++++++++++++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 blockchain/filedao/sized.go diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go new file mode 100644 index 0000000000..9cc3d93a56 --- /dev/null +++ b/blockchain/filedao/sized.go @@ -0,0 +1,324 @@ +package filedao + +import ( + "context" + "os" + "sync" + + "github.com/holiman/billy" + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +type ( + blockstore []byte + sizedDao struct { + size uint64 + dataDir string + + store billy.Database + + tip uint64 + base uint64 + heightToHash map[uint64]hash.Hash256 + hashToHeight map[hash.Hash256]uint64 + heightToID map[uint64]uint64 + lock sync.RWMutex + + deser *block.Deserializer + } +) + +func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (FileDAO, error) { + return &sizedDao{ + size: size, + dataDir: dataDir, + heightToHash: make(map[uint64]hash.Hash256), + hashToHeight: make(map[hash.Hash256]uint64), + heightToID: make(map[uint64]uint64), + deser: deser, + }, nil +} + +func (sd *sizedDao) Start(ctx context.Context) error { + dir := sd.dataDir + if err := os.MkdirAll(dir, 0700); err != nil { + return errors.Wrap(err, "failed to create blob store directory") + } + + var fails []uint64 + index := func(id uint64, size uint32, blob []byte) { + blk, err := blockstore(blob).Block(sd.deser) + if err != nil { + fails = append(fails, id) + log.L().Warn("Failed to decode block", zap.Error(err)) + return + } + h := blk.HashBlock() + height := blk.Height() + sd.hashToHeight[h] = height + sd.heightToHash[height] = h + sd.heightToID[height] = id + if height > sd.tip || sd.tip == 0 { + sd.tip = height + } + if height < sd.base || sd.base == 0 { + sd.base = height + } + } + + store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index) + if err != nil { + return errors.Wrap(err, "failed to open blob store") + } + sd.store = store + if len(fails) > 0 { + return errors.Errorf("failed to decode blocks %v", fails) + } + return nil +} + +func (sd *sizedDao) Stop(ctx context.Context) error { + return sd.store.Close() +} + +func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { + if blk.Height() != sd.tip+1 { + return ErrInvalidTipHeight + } + data, err := serializeBlock(blk) + if err != nil { + return err + } + + sd.lock.Lock() + defer sd.lock.Unlock() + if blk.Height() != sd.tip+1 { + return ErrInvalidTipHeight + } + id, err := sd.store.Put(data) + if err != nil { + return err + } + sd.tip++ + hash := blk.HashBlock() + sd.heightToHash[sd.tip] = hash + sd.hashToHeight[hash] = sd.tip + sd.heightToID[sd.tip] = id + + if sd.tip-sd.base > sd.size { + sd.drop() + } + return nil +} + +func (sd *sizedDao) Height() (uint64, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + return sd.tip, nil +} + +func (sd *sizedDao) GetBlockHash(height uint64) (hash.Hash256, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + h, ok := sd.heightToHash[height] + if !ok { + return hash.ZeroHash256, db.ErrNotExist + } + return h, nil +} + +func (sd *sizedDao) GetBlockHeight(h hash.Hash256) (uint64, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return 0, db.ErrNotExist + } + return height, nil +} + +func (sd *sizedDao) GetBlock(h hash.Hash256) (*block.Block, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return nil, db.ErrNotExist + } + return sd.getBlock(height) +} + +func (sd *sizedDao) GetBlockByHeight(height uint64) (*block.Block, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + return sd.getBlock(height) +} + +func (sd *sizedDao) GetReceipts(height uint64) ([]*action.Receipt, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return blk.Receipts, nil +} + +func (sd *sizedDao) ContainsTransactionLog() bool { + return true +} + +func (sd *sizedDao) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + id, ok := sd.heightToID[height] + if !ok { + return nil, db.ErrNotExist + } + data, err := sd.store.Get(id) + if err != nil { + return nil, err + } + return blockstore(data).TransactionLogs() +} + +func (sd *sizedDao) DeleteTipBlock() error { + panic("not supported") +} + +func (sd *sizedDao) Header(h hash.Hash256) (*block.Header, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return nil, db.ErrNotExist + } + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Header, nil +} + +func (sd *sizedDao) HeaderByHeight(height uint64) (*block.Header, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Header, nil +} + +func (sd *sizedDao) FooterByHeight(height uint64) (*block.Footer, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Footer, nil +} + +func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { + id, ok := sd.heightToID[height] + if !ok { + return nil, db.ErrNotExist + } + data, err := sd.store.Get(id) + if err != nil { + return nil, err + } + return blockstore(data).Block(sd.deser) +} + +func (sd *sizedDao) drop() { + id := sd.heightToID[sd.base] + if err := sd.store.Delete(id); err != nil { + log.L().Error("Failed to delete block", zap.Error(err)) + return + } + hash := sd.heightToHash[sd.base] + delete(sd.heightToHash, sd.base) + delete(sd.heightToID, sd.base) + delete(sd.hashToHeight, hash) + sd.base++ +} + +func serializeBlock(blk *block.Block) (blockstore, error) { + data := make(blockstore, 0) + s := &block.Store{ + Block: blk, + Receipts: blk.Receipts, + } + tmp, err := s.Serialize() + if err != nil { + return nil, err + } + data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) + data = append(data, tmp...) + txLog := blk.TransactionLog() + if txLog != nil { + tmp = txLog.Serialize() + data = append(data, tmp...) + } + return data, nil +} + +func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { + size, err := s.blockSize() + if err != nil { + return nil, err + } + if uint64(len(s)) < size+8 { + return nil, errors.New("blockstore is too short") + } + bs, err := deser.DeserializeBlockStore(s[8 : size+8]) + if err != nil { + return nil, err + } + bs.Block.Receipts = bs.Receipts + return bs.Block, nil +} + +func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { + size, err := s.blockSize() + if err != nil { + return nil, err + } + if uint64(len(s)) < size+8 { + return nil, errors.New("blockstore is too short") + } else if uint64(len(s)) == size+8 { + return nil, nil + } + + return block.DeserializeSystemLogPb(s[size+8:]) +} + +func (s blockstore) blockSize() (uint64, error) { + if len(s) < 8 { + return 0, errors.New("blockstore is too short") + } + return byteutil.BytesToUint64BigEndian(s[:8]), nil +} + +func newSlotter() func() (uint32, bool) { + // TODO: set emptySize and delta according to the actual block size + emptySize := uint32(1024) + delta := uint32(2048) + slotsize := uint32(emptySize) // empty block + slotsize -= uint32(delta) // underflows, it's ok, will overflow back in the first return + + return func() (size uint32, done bool) { + slotsize += delta + return slotsize, false + } +} From 0700bcc5e131252dc4cc902d6ab5add23dc8bbce Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 12:23:57 +0800 Subject: [PATCH 2/6] blockstore --- blockchain/filedao/blockstore.go | 102 ++++++++++++++++++++++++++ blockchain/filedao/sized.go | 120 ++++++++++++------------------- 2 files changed, 149 insertions(+), 73 deletions(-) create mode 100644 blockchain/filedao/blockstore.go diff --git a/blockchain/filedao/blockstore.go b/blockchain/filedao/blockstore.go new file mode 100644 index 0000000000..b280c833cd --- /dev/null +++ b/blockchain/filedao/blockstore.go @@ -0,0 +1,102 @@ +package filedao + +import ( + "fmt" + + "github.com/iotexproject/iotex-proto/golang/iotextypes" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +const ( + blockstoreDefaultVersion = byte(0) + blockstoreHeaderSize = 8 +) + +type ( + // blockstore is a byte slice that contains a block, its receipts and transaction logs + // the first 8 bytes is the size of the block + // the next n bytes is the serialized block and receipts, n is the size of the block + // the rest bytes is the serialized transaction logs + blockstore []byte +) + +var ( + errInvalidBlockstore = fmt.Errorf("invalid blockstore") +) + +func convertToBlockStore(blk *block.Block) (blockstore, error) { + data := make(blockstore, 0) + s := &block.Store{ + Block: blk, + Receipts: blk.Receipts, + } + tmp, err := s.Serialize() + if err != nil { + return nil, err + } + data = append(data, blockstoreDefaultVersion) + data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) + data = append(data, tmp...) + txLog := blk.TransactionLog() + if txLog != nil { + tmp = txLog.Serialize() + data = append(data, tmp...) + } + return data, nil +} + +func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { + size := s.blockSize() + bs, err := deser.DeserializeBlockStore(s[blockstoreHeaderSize : size+blockstoreHeaderSize]) + if err != nil { + return nil, err + } + bs.Block.Receipts = bs.Receipts + return bs.Block, nil +} + +func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { + size := s.blockSize() + if uint64(len(s)) == size+blockstoreHeaderSize { + return nil, nil + } + return block.DeserializeSystemLogPb(s[size+blockstoreHeaderSize:]) +} + +func (s blockstore) Serialize() []byte { + return append([]byte{blockstoreDefaultVersion}, s...) +} + +func (s *blockstore) Deserialize(data []byte) error { + if len(data) == 0 { + return errInvalidBlockstore + } + switch data[0] { + case blockstoreDefaultVersion: + bs := blockstore(data[1:]) + if err := bs.Validate(); err != nil { + return err + } + *s = bs + return nil + default: + return errInvalidBlockstore + } +} + +func (s blockstore) blockSize() uint64 { + return byteutil.BytesToUint64BigEndian(s[:blockstoreHeaderSize]) +} + +func (s blockstore) Validate() error { + if len(s) < blockstoreHeaderSize { + return errInvalidBlockstore + } + blkSize := s.blockSize() + if uint64(len(s)) < blkSize+blockstoreHeaderSize { + return errInvalidBlockstore + } + return nil +} diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index 9cc3d93a56..9590cf9018 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -15,12 +15,10 @@ import ( "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/db" "github.com/iotexproject/iotex-core/pkg/log" - "github.com/iotexproject/iotex-core/pkg/util/byteutil" ) type ( - blockstore []byte - sizedDao struct { + sizedDao struct { size uint64 dataDir string @@ -49,6 +47,8 @@ func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (Fi } func (sd *sizedDao) Start(ctx context.Context) error { + sd.lock.Lock() + defer sd.lock.Unlock() dir := sd.dataDir if err := os.MkdirAll(dir, 0700); err != nil { return errors.Wrap(err, "failed to create blob store directory") @@ -56,7 +56,14 @@ func (sd *sizedDao) Start(ctx context.Context) error { var fails []uint64 index := func(id uint64, size uint32, blob []byte) { - blk, err := blockstore(blob).Block(sd.deser) + bs := new(blockstore) + err := bs.Deserialize(blob) + if err != nil { + fails = append(fails, id) + log.L().Warn("Failed to decode block store", zap.Error(err)) + return + } + blk, err := bs.Block(sd.deser) if err != nil { fails = append(fails, id) log.L().Warn("Failed to decode block", zap.Error(err)) @@ -87,6 +94,8 @@ func (sd *sizedDao) Start(ctx context.Context) error { } func (sd *sizedDao) Stop(ctx context.Context) error { + sd.lock.Lock() + defer sd.lock.Unlock() return sd.store.Close() } @@ -94,7 +103,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight } - data, err := serializeBlock(blk) + bs, err := convertToBlockStore(blk) if err != nil { return err } @@ -104,7 +113,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight } - id, err := sd.store.Put(data) + id, err := sd.store.Put(bs.Serialize()) if err != nil { return err } @@ -187,7 +196,12 @@ func (sd *sizedDao) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, if err != nil { return nil, err } - return blockstore(data).TransactionLogs() + bs := new(blockstore) + err = bs.Deserialize(data) + if err != nil { + return nil, err + } + return bs.TransactionLogs() } func (sd *sizedDao) DeleteTipBlock() error { @@ -237,7 +251,12 @@ func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { if err != nil { return nil, err } - return blockstore(data).Block(sd.deser) + bs := new(blockstore) + err = bs.Deserialize(data) + if err != nil { + return nil, err + } + return bs.Block(sd.deser) } func (sd *sizedDao) drop() { @@ -253,72 +272,27 @@ func (sd *sizedDao) drop() { sd.base++ } -func serializeBlock(blk *block.Block) (blockstore, error) { - data := make(blockstore, 0) - s := &block.Store{ - Block: blk, - Receipts: blk.Receipts, - } - tmp, err := s.Serialize() - if err != nil { - return nil, err - } - data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) - data = append(data, tmp...) - txLog := blk.TransactionLog() - if txLog != nil { - tmp = txLog.Serialize() - data = append(data, tmp...) - } - return data, nil -} - -func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { - size, err := s.blockSize() - if err != nil { - return nil, err - } - if uint64(len(s)) < size+8 { - return nil, errors.New("blockstore is too short") - } - bs, err := deser.DeserializeBlockStore(s[8 : size+8]) - if err != nil { - return nil, err - } - bs.Block.Receipts = bs.Receipts - return bs.Block, nil -} - -func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { - size, err := s.blockSize() - if err != nil { - return nil, err - } - if uint64(len(s)) < size+8 { - return nil, errors.New("blockstore is too short") - } else if uint64(len(s)) == size+8 { - return nil, nil - } - - return block.DeserializeSystemLogPb(s[size+8:]) -} - -func (s blockstore) blockSize() (uint64, error) { - if len(s) < 8 { - return 0, errors.New("blockstore is too short") - } - return byteutil.BytesToUint64BigEndian(s[:8]), nil -} - func newSlotter() func() (uint32, bool) { - // TODO: set emptySize and delta according to the actual block size - emptySize := uint32(1024) - delta := uint32(2048) - slotsize := uint32(emptySize) // empty block - slotsize -= uint32(delta) // underflows, it's ok, will overflow back in the first return - + sizeList := []uint32{ + 1024 * 4, // empty block + 1024 * 8, // 2 execution + 1024 * 16, + 1024 * 128, // 250 transfer + 1024 * 512, + 1024 * 1024, + 1024 * 1024 * 4, // 5000 transfer + 1024 * 1024 * 8, + 1024 * 1024 * 16, + 1024 * 1024 * 128, + 1024 * 1024 * 512, + 1024 * 1024 * 1024, // max block size + } + i := -1 return func() (size uint32, done bool) { - slotsize += delta - return slotsize, false + i++ + if i >= len(sizeList)-1 { + return sizeList[i], true + } + return sizeList[i], true } } From 44874e29d9106a8c2a9a8c7000cfd344d423fbaf Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 21:52:19 +0800 Subject: [PATCH 3/6] fix --- blockchain/filedao/blockstore.go | 1 - blockchain/filedao/sized.go | 51 +++++++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/blockchain/filedao/blockstore.go b/blockchain/filedao/blockstore.go index b280c833cd..491fdace00 100644 --- a/blockchain/filedao/blockstore.go +++ b/blockchain/filedao/blockstore.go @@ -36,7 +36,6 @@ func convertToBlockStore(blk *block.Block) (blockstore, error) { if err != nil { return nil, err } - data = append(data, blockstoreDefaultVersion) data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) data = append(data, tmp...) txLog := blk.TransactionLog() diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index 9590cf9018..c3218c90ae 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -29,21 +29,25 @@ type ( heightToHash map[uint64]hash.Hash256 hashToHeight map[hash.Hash256]uint64 heightToID map[uint64]uint64 + dropCh chan uint64 lock sync.RWMutex + wg sync.WaitGroup deser *block.Deserializer } ) func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (FileDAO, error) { - return &sizedDao{ + sd := &sizedDao{ size: size, dataDir: dataDir, heightToHash: make(map[uint64]hash.Hash256), hashToHeight: make(map[hash.Hash256]uint64), heightToID: make(map[uint64]uint64), deser: deser, - }, nil + dropCh: make(chan uint64, size), + } + return sd, nil } func (sd *sizedDao) Start(ctx context.Context) error { @@ -54,7 +58,9 @@ func (sd *sizedDao) Start(ctx context.Context) error { return errors.Wrap(err, "failed to create blob store directory") } - var fails []uint64 + var ( + fails []uint64 + ) index := func(id uint64, size uint32, blob []byte) { bs := new(blockstore) err := bs.Deserialize(blob) @@ -90,15 +96,47 @@ func (sd *sizedDao) Start(ctx context.Context) error { if len(fails) > 0 { return errors.Errorf("failed to decode blocks %v", fails) } + // block continous check + for i := sd.base; i <= sd.tip; i++ { + if i == 0 { + continue + } + if _, ok := sd.heightToID[i]; !ok { + return errors.Errorf("missing block %d", i) + } + } + // start drop routine + go func() { + sd.wg.Add(1) + defer sd.wg.Done() + for id := range sd.dropCh { + if err := sd.store.Delete(id); err != nil { + log.L().Error("Failed to delete block", zap.Error(err)) + } + } + }() return nil } func (sd *sizedDao) Stop(ctx context.Context) error { sd.lock.Lock() defer sd.lock.Unlock() + close(sd.dropCh) + sd.wg.Wait() return sd.store.Close() } +func (sd *sizedDao) SetStart(height uint64) error { + sd.lock.Lock() + defer sd.lock.Unlock() + if len(sd.hashToHeight) > 0 || len(sd.heightToHash) > 0 || len(sd.heightToID) > 0 { + return errors.New("cannot set start height after start") + } + sd.base = height - 1 + sd.tip = height - 1 + return nil +} + func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight @@ -123,7 +161,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { sd.hashToHeight[hash] = sd.tip sd.heightToID[sd.tip] = id - if sd.tip-sd.base > sd.size { + if sd.tip-sd.base >= sd.size { sd.drop() } return nil @@ -261,10 +299,7 @@ func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { func (sd *sizedDao) drop() { id := sd.heightToID[sd.base] - if err := sd.store.Delete(id); err != nil { - log.L().Error("Failed to delete block", zap.Error(err)) - return - } + sd.dropCh <- id hash := sd.heightToHash[sd.base] delete(sd.heightToHash, sd.base) delete(sd.heightToID, sd.base) From a9748dd304d7927f1b18779f58fef675a6d37346 Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 21:52:48 +0800 Subject: [PATCH 4/6] test --- blockchain/filedao/blockstore_test.go | 40 ++++++++++++++++++ blockchain/filedao/sized_test.go | 59 +++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 blockchain/filedao/blockstore_test.go create mode 100644 blockchain/filedao/sized_test.go diff --git a/blockchain/filedao/blockstore_test.go b/blockchain/filedao/blockstore_test.go new file mode 100644 index 0000000000..f5ce3470ee --- /dev/null +++ b/blockchain/filedao/blockstore_test.go @@ -0,0 +1,40 @@ +package filedao + +import ( + "testing" + + "github.com/iotexproject/go-pkgs/hash" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/blockchain/block" +) + +func TestBlockStore(t *testing.T) { + r := require.New(t) + builder := block.NewTestingBuilder() + blk := createTestingBlock(builder, 1, hash.ZeroHash256) + bs, err := convertToBlockStore(blk) + r.NoError(err) + data := bs.Serialize() + dbs := new(blockstore) + r.NoError(dbs.Deserialize(data)) + r.Equal(bs[:], (*dbs)[:], "serialized block store should be equal to deserialized block store") + // check deserialized block + deser := block.NewDeserializer(0) + dBlk, err := dbs.Block(deser) + r.NoError(err) + dTxLogs, err := dbs.TransactionLogs() + r.NoError(err) + r.NoError(fillTransactionLog(dBlk.Receipts, dTxLogs.Logs)) + r.Equal(blk.Header, dBlk.Header) + r.Equal(blk.Body, dBlk.Body) + r.Equal(blk.Footer, dBlk.Footer) + r.Equal(len(blk.Receipts), len(dBlk.Receipts)) + for i := range blk.Receipts { + r.Equal(blk.Receipts[i].Hash(), dBlk.Receipts[i].Hash()) + r.Equal(len(blk.Receipts[i].TransactionLogs()), len(dBlk.Receipts[i].TransactionLogs())) + for j := range blk.Receipts[i].TransactionLogs() { + r.Equal(blk.Receipts[i].TransactionLogs()[j], dBlk.Receipts[i].TransactionLogs()[j]) + } + } +} diff --git a/blockchain/filedao/sized_test.go b/blockchain/filedao/sized_test.go new file mode 100644 index 0000000000..f40b3113ae --- /dev/null +++ b/blockchain/filedao/sized_test.go @@ -0,0 +1,59 @@ +package filedao + +import ( + "context" + "crypto/tls" + "testing" + + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-proto/golang/iotexapi" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/blockchain/block" +) + +func TestBlockSize(t *testing.T) { + r := require.New(t) + conn, err := grpc.NewClient("api.mainnet.iotex.one:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}))) + r.NoError(err) + defer conn.Close() + + cli := iotexapi.NewAPIServiceClient(conn) + for _, h := range []uint64{29276276} { + resp, err := cli.GetRawBlocks(context.Background(), &iotexapi.GetRawBlocksRequest{ + StartHeight: h, + Count: 1, + WithReceipts: true, + WithTransactionLogs: true, + }) + r.NoError(err) + r.Len(resp.Blocks, 1) + deserializer := block.NewDeserializer(4689) + blk, err := deserializer.FromBlockProto(resp.Blocks[0].Block) + r.NoError(err) + receipts := make([]*action.Receipt, 0, len(resp.Blocks[0].Receipts)) + for _, receiptpb := range resp.Blocks[0].Receipts { + receipt := &action.Receipt{} + receipt.ConvertFromReceiptPb(receiptpb) + receipts = append(receipts, receipt) + } + blk.Receipts = receipts + data, err := convertToBlockStore(blk) + r.NoError(err) + t.Logf("block %d size= %d", h, len(data)) + } +} + +func TestSizedDao(t *testing.T) { + r := require.New(t) + dao, err := NewSizedFileDao(10, t.TempDir(), block.NewDeserializer(4089)) + r.NoError(err) + + ctx := context.Background() + r.NoError(dao.Start(ctx)) + r.NoError(testCommitBlocks(t, dao, 1, 100, hash.ZeroHash256)) + r.NoError(dao.Stop(ctx)) +} From 022b1abd6578f2ea0092468703515d28bee4d262 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 6 Sep 2024 08:25:57 +0800 Subject: [PATCH 5/6] fix --- blockchain/filedao/sized.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index c3218c90ae..9c87ffd20b 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -1,8 +1,11 @@ package filedao import ( + "bytes" "context" + "math/big" "os" + "slices" "sync" "github.com/holiman/billy" @@ -331,3 +334,37 @@ func newSlotter() func() (uint32, bool) { return sizeList[i], true } } + +func fillTransactionLog(receipts []*action.Receipt, txLogs []*iotextypes.TransactionLog) error { + for _, l := range txLogs { + idx := slices.IndexFunc(receipts, func(r *action.Receipt) bool { + return bytes.Equal(r.ActionHash[:], l.ActionHash) + }) + if idx < 0 { + return errors.Errorf("missing receipt for log %x", l.ActionHash) + } + txLogs := make([]*action.TransactionLog, len(l.GetTransactions())) + for j, tx := range l.GetTransactions() { + txlog, err := convertToTxLog(tx) + if err != nil { + return err + } + txLogs[j] = txlog + } + receipts[idx].AddTransactionLogs(txLogs...) + } + return nil +} + +func convertToTxLog(tx *iotextypes.TransactionLog_Transaction) (*action.TransactionLog, error) { + amount, ok := big.NewInt(0).SetString(tx.Amount, 10) + if !ok { + return nil, errors.Errorf("failed to parse amount %s", tx.Amount) + } + return &action.TransactionLog{ + Type: tx.Type, + Amount: amount, + Sender: tx.Sender, + Recipient: tx.Recipient, + }, nil +} From ef2972ac3ae099add49bd28ebfbd9e89ff2a282e Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 6 Sep 2024 08:41:47 +0800 Subject: [PATCH 6/6] merge dao --- blockchain/filedao/merge.go | 85 ++++++++++++++++++++++++++++++++ blockchain/filedao/merge_test.go | 40 +++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 blockchain/filedao/merge.go create mode 100644 blockchain/filedao/merge_test.go diff --git a/blockchain/filedao/merge.go b/blockchain/filedao/merge.go new file mode 100644 index 0000000000..cd9a187811 --- /dev/null +++ b/blockchain/filedao/merge.go @@ -0,0 +1,85 @@ +package filedao + +import ( + "context" + + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/pkg/log" +) + +type ( + // mergeDao wraps a file dao, and merge the blocks from the original dao on start + mergeDao struct { + FileDAO + originalDao FileDAO + mergeMax uint64 + } +) + +// NewMergeDao creates a new merge dao +func NewMergeDao(original FileDAO, new FileDAO, mergeMax uint64) FileDAO { + return &mergeDao{ + FileDAO: new, + originalDao: original, + mergeMax: mergeMax, + } +} + +func (md *mergeDao) Start(ctx context.Context) error { + if err := md.FileDAO.Start(ctx); err != nil { + return err + } + if err := md.originalDao.Start(context.Background()); err != nil { + log.L().Error("failed to start original dao", zap.Error(err)) + return nil + } + defer md.originalDao.Stop(ctx) + + height, err := md.FileDAO.Height() + if err != nil { + return err + } + orgTip, err := md.originalDao.Height() + if err != nil { + return err + } + // doing nothing if the height is larger than the original dao + if height >= orgTip { + return nil + } + // init the blocks from the original dao + start := uint64(1) + if md.mergeMax > 0 && orgTip > (md.mergeMax+1) { + start = orgTip - md.mergeMax + if d, ok := md.FileDAO.(interface{ SetStart(uint64) error }); ok { + if err = d.SetStart(start); err != nil { + return err + } + } + } + for i := start; i <= orgTip; i++ { + blk, err := md.originalDao.GetBlockByHeight(i) + if err != nil { + return err + } + receipts, err := md.originalDao.GetReceipts(i) + if err != nil { + return err + } + blk.Receipts = receipts + if md.originalDao.ContainsTransactionLog() { + logs, err := md.originalDao.TransactionLogs(i) + if err != nil { + return err + } + if err = fillTransactionLog(receipts, logs.Logs); err != nil { + return err + } + } + if err := md.PutBlock(ctx, blk); err != nil { + return err + } + } + return nil +} diff --git a/blockchain/filedao/merge_test.go b/blockchain/filedao/merge_test.go new file mode 100644 index 0000000000..a61c4918ec --- /dev/null +++ b/blockchain/filedao/merge_test.go @@ -0,0 +1,40 @@ +package filedao + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/iotexproject/go-pkgs/hash" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db" +) + +func TestMergeDao(t *testing.T) { + r := require.New(t) + cfg := db.DefaultConfig + cfg.DbPath = t.TempDir() + "/filedao" + deser := block.NewDeserializer(4689) + fdao, err := NewFileDAO(cfg, deser) + r.NoError(err) + ctx := context.Background() + r.NoError(fdao.Start(ctx)) + r.NoError(testCommitBlocks(t, fdao, 1, 100, hash.ZeroHash256)) + r.NoError(fdao.Stop(ctx)) + + fdao, err = NewFileDAO(cfg, deser) + r.NoError(err) + sdao, err := NewSizedFileDao(10, t.TempDir(), deser) + r.NoError(err) + mdao := NewMergeDao(fdao, sdao, 10) + r.NoError(mdao.Start(ctx)) + _, err = mdao.GetBlockByHeight(1) + r.ErrorIs(err, db.ErrNotExist) + _, err = mdao.GetBlockByHeight(90) + r.ErrorIs(err, db.ErrNotExist) + _, err = mdao.GetBlockByHeight(91) + r.NoError(err) + r.NoError(mdao.Stop(ctx)) +}