diff --git a/blockmanager.go b/blockmanager.go index 9c9e6f4c..98421f7d 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -6,6 +6,8 @@ import ( "bytes" "container/list" "fmt" + "github.com/lightninglabs/neutrino/sideload" + "io" "math" "math/big" "sync" @@ -45,6 +47,10 @@ const ( maxCFCheckptsPerQuery = wire.MaxCFHeadersPerMsg / wire.CFCheckptInterval ) +var ( + MaxHeaderWrite = 2000 +) + // zeroHash is the zero value hash (all zeros). It is defined as a convenience. var zeroHash chainhash.Hash @@ -72,6 +78,14 @@ type donePeerMsg struct { peer *ServerPeer } +type SideLoadOpt struct { + Verify bool + Path string + Enabled bool + SourceType string + startHeight int +} + // blockManagerCfg holds options and dependencies needed by the blockManager // during operation. type blockManagerCfg struct { @@ -109,6 +123,9 @@ type blockManagerCfg struct { checkResponse func(sp *ServerPeer, resp wire.Message, quit chan<- struct{}, peerQuit chan<- struct{}), options ...QueryOption) + + // sideLoad is the config used to enable a non p2p fetching of headers to improve sync speed. + sideLoad SideLoadOpt } // blockManager provides a concurrency safe block manager for handling all @@ -206,6 +223,9 @@ type blockManager struct { // nolint:maligned minRetargetTimespan int64 // target timespan / adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor blocksPerRetarget int32 // target timespan / target time per block + + //sideLoadReader is the reader used for a non p2p fetching of headers. + sideLoadReader sideload.Reader } // newBlockManager returns a new bitcoin block manager. Use Start to begin @@ -214,7 +234,6 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) { targetTimespan := int64(cfg.ChainParams.TargetTimespan / time.Second) targetTimePerBlock := int64(cfg.ChainParams.TargetTimePerBlock / time.Second) adjustmentFactor := cfg.ChainParams.RetargetAdjustmentFactor - bm := blockManager{ cfg: cfg, peerChan: make(chan interface{}, MaxPeers*3), @@ -279,6 +298,27 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) { } bm.filterHeaderTipHash = fh.BlockHash() + // If sideload is enabled initialize reader and assign to blockmanager. + if bm.cfg.sideLoad.Enabled { + + reader, err := sideload.NewReader(&sideload.ReaderConfig{ + + SourceType: bm.cfg.sideLoad.SourceType, + Path: bm.cfg.sideLoad.Path, + }) + + log.Infof("Side loading enabled") + + if err == nil { + bm.sideLoadReader = reader + } else { + + bm.cfg.sideLoad.Enabled = false + log.Warnf("Side loading disabled: %v", err) + } + + } + return &bm, nil } @@ -1981,6 +2021,15 @@ func checkCFCheckptSanity(cp map[string][]*chainhash.Hash, func (b *blockManager) blockHandler() { defer b.wg.Done() + // Attempt to sideLoad headers. If sideLoading is not enabled the function + // returns quickly. + b.sideLoadHeaders() + + select { + case <-b.quit: + return + default: + } candidatePeers := list.New() out: for { @@ -2410,16 +2459,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { node := headerlist.Node{Header: *blockHeader} prevNode := prevNodeEl prevHash := prevNode.Header.BlockHash() - if prevHash.IsEqual(&blockHeader.PrevBlock) { - prevNodeHeight := prevNode.Height - prevNodeHeader := prevNode.Header - err := b.checkHeaderSanity( - blockHeader, false, prevNodeHeight, - &prevNodeHeader, - ) + err, passVerification := b.verifyBlockHeader(blockHeader, *prevNode) + if passVerification { if err != nil { - log.Warnf("Header doesn't pass sanity check: "+ - "%s -- disconnecting peer", err) + log.Warnf("%v: "+ + " -- disconnecting peer", err) hmsg.peer.Disconnect() return } @@ -2544,9 +2588,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } err = b.checkHeaderSanity( - reorgHeader, true, + reorgHeader, b.reorgList, int32(prevNodeHeight), prevNodeHeader, ) + if err != nil { log.Warnf("Header doesn't pass sanity"+ " check: %s -- disconnecting "+ @@ -2754,6 +2799,7 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool { } lastHeader = blockHash + } return true @@ -2764,14 +2810,10 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool { // the contextual check and blockchain.CheckBlockHeaderSanity for context-less // checks. func (b *blockManager) checkHeaderSanity(blockHeader *wire.BlockHeader, - reorgAttempt bool, prevNodeHeight int32, + hList headerlist.Chain, prevNodeHeight int32, prevNodeHeader *wire.BlockHeader) error { // Create the lightHeaderCtx for the blockHeader's parent. - hList := b.headerList - if reorgAttempt { - hList = b.reorgList - } parentHeaderCtx := newLightHeaderCtx( prevNodeHeight, prevNodeHeader, b.cfg.BlockHeaders, hList, @@ -3043,4 +3085,186 @@ func (l *lightHeaderCtx) RelativeAncestorCtx( return newLightHeaderCtx( ancestorHeight, ancestor, l.store, l.headerList, ) + +} + +func (b *blockManager) sideLoadHeaders() { + + // If sideloading is not enabled on this chain + // return quickly. + if !b.cfg.sideLoad.Enabled { + return + } + + // If headers contained in the side load source are for a different chain network return + // immediately. + if b.sideLoadReader.HeadersChain() != b.cfg.ChainParams.Net { + + log.Error("headers from side load file are of network %v "+ + "and so incompatible with neutrino's current bitcoin network -- skipping side loading", b.sideLoadReader.HeadersChain()) + + return + } + log.Infof("Side loading headers from %v to %v", b.headerTip, b.sideLoadReader.EndHeight()) + + // Set headerTip to enable reader supply header, node needs + err := b.sideLoadReader.SetHeight(int64(b.headerTip)) + + if err != nil { + log.Errorf("error while setting height for sideload--- skipping sideloading: "+ + "%v", err) + + return + } + headerBatch := make([]headerfs.BlockHeader, 0) + for { + //Request header + header, headerErr := b.sideLoadReader.NextHeader() + // If any error occurs while fetching headers that does not indicate + // an end of file, return immediately. + if headerErr != nil && headerErr != io.EOF { + + log.Errorf("error while fetching headers -- skipping sideloading %v", err) + + return + } + + var ( + node *headerlist.Node + prevNode *headerlist.Node + ) + + // Update node height if header is not nil + if header != nil { + + // Ensure there is a previous header to compare against. + prevNodeEl := b.headerList.Back() + if prevNodeEl == nil { + log.Warnf("side load - Header list does not contain a previous" + + "element as expected -- exiting side load") + + return + } + + node = &headerlist.Node{Header: *header} + prevNode = prevNodeEl + node.Height = prevNode.Height + 1 + } + + if b.cfg.sideLoad.Verify && headerErr == nil { + err, passVerification := b.verifyBlockHeader(header, *prevNode) + if err != nil || !passVerification { + log.Debugf("Side Load- Did not pass verification at height %v"+ + "-- rolling back to last verified checkpoint and skipping sideload", node.Height) + + prevCheckpoint := b.findPreviousHeaderCheckpoint( + node.Height, + ) + + log.Infof("Rolling back to previous validated "+ + "checkpoint at height %d/hash %s", + prevCheckpoint.Height, + prevCheckpoint.Hash) + + err = b.rollBackToHeight(uint32(prevCheckpoint.Height)) + if err != nil { + panic(fmt.Sprintf("Rollback failed: %s", err)) + // Should we panic here? + } + tipHeader, height, err := b.cfg.BlockHeaders.ChainTip() + if err != nil { + return + } + b.headerList.ResetHeaderState(headerlist.Node{ + Height: int32(height), + Header: *tipHeader, + }) + return + } + + } + + // Verify checkpoint only if verification is enabled + if b.nextCheckpoint != nil && headerErr == nil && b.cfg.sideLoad.Verify && + node.Height == b.nextCheckpoint.Height { + + nodeHash := node.Header.BlockHash() + if nodeHash.IsEqual(b.nextCheckpoint.Hash) { + + log.Infof("Verified downloaded block "+ + "header against checkpoint at height "+ + "%d/hash %s", node.Height, nodeHash) + } else { + log.Warnf("Error at checkpoint while side loading headers, exiting"+ + "%d/hash %s", node.Height, nodeHash) + return + } + + } + + if header != nil { + headerBatch = append(headerBatch, headerfs.BlockHeader{ + BlockHeader: header, + Height: uint32(node.Height), + }) + + } + + // Write header if batch is greater than the MaxHeaderWrite or if we have reached the + // end of file. + if len(headerBatch) >= MaxHeaderWrite || headerErr == io.EOF { + err = b.cfg.BlockHeaders.WriteHeaders(headerBatch...) + if err != nil { + log.Warnf("Error writing headers in blockheader store") + return + } + + // Updated header tip so as to enable cfhandler to start fetching cfheaders + + if headerErr == io.EOF { + log.Infof("Successfully completed sideloading") + return + } + b.nextCheckpoint = b.findNextHeaderCheckpoint(node.Height) + b.newHeadersMtx.Lock() + b.headerTip = uint32(node.Height) + b.headerTipHash = node.Header.BlockHash() + b.newHeadersMtx.Unlock() + b.newHeadersSignal.Broadcast() + + b.blkHeaderProgressLogger.LogBlockHeight( + header.Timestamp, node.Height, + ) + + } + + b.headerList.PushBack(*node) + // Check if we are to quit now + select { + case <-b.quit: + return + default: + } + } + +} + +func (b *blockManager) verifyBlockHeader(blockHeader *wire.BlockHeader, prevNode headerlist.Node) (error, bool) { + prevNodeHeader := prevNode.Header + prevHash := prevNode.Header.BlockHash() + prevNodeHeight := prevNode.Height + + if prevHash.IsEqual(&blockHeader.PrevBlock) { + err := b.checkHeaderSanity(blockHeader, b.headerList, prevNodeHeight, &prevNodeHeader) + + if err != nil { + return fmt.Errorf("did not pass sanity check: %v", err), true + } + return nil, true + + } else { + log.Debugf("Not properly connected %v") + return nil, false + } + }