Skip to content

Commit

Permalink
blockmanager: Added sideload functionality to blockmanager
Browse files Browse the repository at this point in the history
Signed-off-by: Maureen Ononiwu <[email protected]>
  • Loading branch information
Chinwendu20 committed Aug 19, 2023
1 parent edde089 commit 056751d
Showing 1 changed file with 240 additions and 16 deletions.
256 changes: 240 additions & 16 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"bytes"
"container/list"
"fmt"
"github.com/lightninglabs/neutrino/sideload"
"io"
"math"
"math/big"
"sync"
Expand Down Expand Up @@ -45,6 +47,10 @@ const (
maxCFCheckptsPerQuery = wire.MaxCFHeadersPerMsg / wire.CFCheckptInterval
)

var (
MaxHeaderWrite = 2000
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash chainhash.Hash

Expand Down Expand Up @@ -72,6 +78,14 @@ type donePeerMsg struct {
peer *ServerPeer
}

type SideLoadOpt struct {
Verify bool
Path string
Enabled bool
SourceType string
startHeight int
}

// blockManagerCfg holds options and dependencies needed by the blockManager
// during operation.
type blockManagerCfg struct {
Expand Down Expand Up @@ -109,6 +123,9 @@ type blockManagerCfg struct {
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}, peerQuit chan<- struct{}),
options ...QueryOption)

// sideLoad is the config used to enable a non p2p fetching of headers to improve sync speed.
sideLoad SideLoadOpt
}

// blockManager provides a concurrency safe block manager for handling all
Expand Down Expand Up @@ -206,6 +223,9 @@ type blockManager struct { // nolint:maligned
minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor
blocksPerRetarget int32 // target timespan / target time per block

//sideLoadReader is the reader used for a non p2p fetching of headers.
sideLoadReader sideload.Reader
}

// newBlockManager returns a new bitcoin block manager. Use Start to begin
Expand All @@ -214,7 +234,6 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
targetTimespan := int64(cfg.ChainParams.TargetTimespan / time.Second)
targetTimePerBlock := int64(cfg.ChainParams.TargetTimePerBlock / time.Second)
adjustmentFactor := cfg.ChainParams.RetargetAdjustmentFactor

bm := blockManager{
cfg: cfg,
peerChan: make(chan interface{}, MaxPeers*3),
Expand Down Expand Up @@ -279,6 +298,27 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
}
bm.filterHeaderTipHash = fh.BlockHash()

// If sideload is enabled initialize reader and assign to blockmanager.
if bm.cfg.sideLoad.Enabled {

reader, err := sideload.NewReader(&sideload.ReaderConfig{

SourceType: bm.cfg.sideLoad.SourceType,
Path: bm.cfg.sideLoad.Path,
})

log.Infof("Side loading enabled")

if err == nil {
bm.sideLoadReader = reader
} else {

bm.cfg.sideLoad.Enabled = false
log.Warnf("Side loading disabled: %v", err)
}

}

return &bm, nil
}

Expand Down Expand Up @@ -1981,6 +2021,15 @@ func checkCFCheckptSanity(cp map[string][]*chainhash.Hash,
func (b *blockManager) blockHandler() {
defer b.wg.Done()

// Attempt to sideLoad headers. If sideLoading is not enabled the function
// returns quickly.
b.sideLoadHeaders()

select {
case <-b.quit:
return
default:
}
candidatePeers := list.New()
out:
for {
Expand Down Expand Up @@ -2410,16 +2459,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
node := headerlist.Node{Header: *blockHeader}
prevNode := prevNodeEl
prevHash := prevNode.Header.BlockHash()
if prevHash.IsEqual(&blockHeader.PrevBlock) {
prevNodeHeight := prevNode.Height
prevNodeHeader := prevNode.Header
err := b.checkHeaderSanity(
blockHeader, false, prevNodeHeight,
&prevNodeHeader,
)
err, passVerification := b.verifyBlockHeader(blockHeader, *prevNode)
if passVerification {
if err != nil {
log.Warnf("Header doesn't pass sanity check: "+
"%s -- disconnecting peer", err)
log.Warnf("%v: "+
" -- disconnecting peer", err)
hmsg.peer.Disconnect()
return
}
Expand Down Expand Up @@ -2544,9 +2588,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
}

err = b.checkHeaderSanity(
reorgHeader, true,
reorgHeader, b.reorgList,
int32(prevNodeHeight), prevNodeHeader,
)

if err != nil {
log.Warnf("Header doesn't pass sanity"+
" check: %s -- disconnecting "+
Expand Down Expand Up @@ -2754,6 +2799,7 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool {
}

lastHeader = blockHash

}

return true
Expand All @@ -2764,14 +2810,10 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool {
// the contextual check and blockchain.CheckBlockHeaderSanity for context-less
// checks.
func (b *blockManager) checkHeaderSanity(blockHeader *wire.BlockHeader,
reorgAttempt bool, prevNodeHeight int32,
hList headerlist.Chain, prevNodeHeight int32,
prevNodeHeader *wire.BlockHeader) error {

// Create the lightHeaderCtx for the blockHeader's parent.
hList := b.headerList
if reorgAttempt {
hList = b.reorgList
}

parentHeaderCtx := newLightHeaderCtx(
prevNodeHeight, prevNodeHeader, b.cfg.BlockHeaders, hList,
Expand Down Expand Up @@ -3043,4 +3085,186 @@ func (l *lightHeaderCtx) RelativeAncestorCtx(
return newLightHeaderCtx(
ancestorHeight, ancestor, l.store, l.headerList,
)

}

func (b *blockManager) sideLoadHeaders() {

// If sideloading is not enabled on this chain
// return quickly.
if !b.cfg.sideLoad.Enabled {
return
}

// If headers contained in the side load source are for a different chain network return
// immediately.
if b.sideLoadReader.HeadersChain() != b.cfg.ChainParams.Net {

log.Error("headers from side load file are of network %v "+
"and so incompatible with neutrino's current bitcoin network -- skipping side loading", b.sideLoadReader.HeadersChain())

return
}
log.Infof("Side loading headers from %v to %v", b.headerTip, b.sideLoadReader.EndHeight())

// Set headerTip to enable reader supply header, node needs
err := b.sideLoadReader.SetHeight(int64(b.headerTip))

if err != nil {
log.Errorf("error while setting height for sideload--- skipping sideloading: "+
"%v", err)

return
}
headerBatch := make([]headerfs.BlockHeader, 0)
for {
//Request header
header, headerErr := b.sideLoadReader.NextHeader()
// If any error occurs while fetching headers that does not indicate
// an end of file, return immediately.
if headerErr != nil && headerErr != io.EOF {

log.Errorf("error while fetching headers -- skipping sideloading %v", err)

return
}

var (
node *headerlist.Node
prevNode *headerlist.Node
)

// Update node height if header is not nil
if header != nil {

// Ensure there is a previous header to compare against.
prevNodeEl := b.headerList.Back()
if prevNodeEl == nil {
log.Warnf("side load - Header list does not contain a previous" +
"element as expected -- exiting side load")

return
}

node = &headerlist.Node{Header: *header}
prevNode = prevNodeEl
node.Height = prevNode.Height + 1
}

if b.cfg.sideLoad.Verify && headerErr == nil {
err, passVerification := b.verifyBlockHeader(header, *prevNode)
if err != nil || !passVerification {
log.Debugf("Side Load- Did not pass verification at height %v"+
"-- rolling back to last verified checkpoint and skipping sideload", node.Height)

prevCheckpoint := b.findPreviousHeaderCheckpoint(
node.Height,
)

log.Infof("Rolling back to previous validated "+
"checkpoint at height %d/hash %s",
prevCheckpoint.Height,
prevCheckpoint.Hash)

err = b.rollBackToHeight(uint32(prevCheckpoint.Height))
if err != nil {
panic(fmt.Sprintf("Rollback failed: %s", err))
// Should we panic here?
}
tipHeader, height, err := b.cfg.BlockHeaders.ChainTip()
if err != nil {
return
}
b.headerList.ResetHeaderState(headerlist.Node{
Height: int32(height),
Header: *tipHeader,
})
return
}

}

// Verify checkpoint only if verification is enabled
if b.nextCheckpoint != nil && headerErr == nil && b.cfg.sideLoad.Verify &&
node.Height == b.nextCheckpoint.Height {

nodeHash := node.Header.BlockHash()
if nodeHash.IsEqual(b.nextCheckpoint.Hash) {

log.Infof("Verified downloaded block "+
"header against checkpoint at height "+
"%d/hash %s", node.Height, nodeHash)
} else {
log.Warnf("Error at checkpoint while side loading headers, exiting"+
"%d/hash %s", node.Height, nodeHash)
return
}

}

if header != nil {
headerBatch = append(headerBatch, headerfs.BlockHeader{
BlockHeader: header,
Height: uint32(node.Height),
})

}

// Write header if batch is greater than the MaxHeaderWrite or if we have reached the
// end of file.
if len(headerBatch) >= MaxHeaderWrite || headerErr == io.EOF {
err = b.cfg.BlockHeaders.WriteHeaders(headerBatch...)
if err != nil {
log.Warnf("Error writing headers in blockheader store")
return
}

// Updated header tip so as to enable cfhandler to start fetching cfheaders

if headerErr == io.EOF {
log.Infof("Successfully completed sideloading")
return
}
b.nextCheckpoint = b.findNextHeaderCheckpoint(node.Height)
b.newHeadersMtx.Lock()
b.headerTip = uint32(node.Height)
b.headerTipHash = node.Header.BlockHash()
b.newHeadersMtx.Unlock()
b.newHeadersSignal.Broadcast()

b.blkHeaderProgressLogger.LogBlockHeight(
header.Timestamp, node.Height,
)

}

b.headerList.PushBack(*node)
// Check if we are to quit now
select {
case <-b.quit:
return
default:
}
}

}

func (b *blockManager) verifyBlockHeader(blockHeader *wire.BlockHeader, prevNode headerlist.Node) (error, bool) {
prevNodeHeader := prevNode.Header
prevHash := prevNode.Header.BlockHash()
prevNodeHeight := prevNode.Height

if prevHash.IsEqual(&blockHeader.PrevBlock) {
err := b.checkHeaderSanity(blockHeader, b.headerList, prevNodeHeight, &prevNodeHeader)

if err != nil {
return fmt.Errorf("did not pass sanity check: %v", err), true
}
return nil, true

} else {
log.Debugf("Not properly connected %v")
return nil, false
}

}

0 comments on commit 056751d

Please sign in to comment.