implementing the sliding window functionality for the block download
samay-kothari committed Jul 12, 2022
1 parent 0a6cfad commit c201dc8
Showing 3 changed files with 207 additions and 2 deletions.
15 changes: 15 additions & 0 deletions blockchain/chain.go
@@ -1403,6 +1403,21 @@ func (b *BlockChain) BlockHeightByHash(hash *chainhash.Hash) (int32, error) {
return node.height, nil
}

// NodeHeightByHash returns the height of the block node with the given hash.
// Unlike BlockHeightByHash, it does not check whether the node is on the main
// chain, so it also returns the height of nodes that are only known through
// header validation.
//
// This function is safe for concurrent access.
func (b *BlockChain) NodeHeightByHash(hash *chainhash.Hash) (int32, error) {
node := b.index.LookupNode(hash)
if node == nil {
str := fmt.Sprintf("block %s is not known", hash)
return 0, errNotInMainChain(str)
}

return node.height, nil
}
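
A minimal caller sketch (hypothetical; chain, headerHash, and log are illustrative names, not from this commit): during headers-first sync a block beyond the validated tip is known only through header validation, so BlockHeightByHash would report it as not being in the main chain while NodeHeightByHash still returns its height from the block index.

	height, err := chain.NodeHeightByHash(&headerHash)
	if err != nil {
		// The hash is not known at all, not even as a validated header.
		return err
	}
	log.Infof("header %v is at height %d", headerHash, height)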

// BlockHashByHeight returns the hash of the block at the given height in the
// main chain.
//
96 changes: 96 additions & 0 deletions blockchain/chainquery.go
@@ -0,0 +1,96 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2018-2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package blockchain

import "github.com/utreexo/utreexod/chaincfg/chainhash"

// PutNextNeededBlocks populates the provided slice with hashes for the next
// blocks after the current best chain tip that are needed to make progress
// towards the current best known header, skipping any blocks that already
// have their data available.
//
// The provided slice will be populated with either as many hashes as it can
// hold per its length or as many hashes as it takes to reach the best header,
// whichever is smaller.
//
// It returns a subslice of the provided one with its bounds adjusted to the
// number of entries populated.
//
// This function is safe for concurrent access.
func (b *BlockChain) PutNextNeededBlocks(out []chainhash.Hash) []chainhash.Hash {
// Nothing to do when no results are requested.
maxResults := len(out)
if maxResults == 0 {
return out[:0]
}

b.index.RLock()
defer b.index.RUnlock()

// Populate the provided slice by making use of a sliding window. Note that
// the needed block hashes are populated in forwards order while it is
// necessary to walk the block index backwards to determine them. Further,
// an unknown number of blocks may already have their data and need to be
// skipped, so it's not possible to determine the precise height after the
// fork point to start iterating from. Using a sliding window efficiently
// handles these conditions without needing additional allocations.
//
// The strategy is to initially determine the common ancestor between the
// current best chain tip and the current best known header as the starting
// fork point and move the fork point forward by the window size after
// populating the output slice with all relevant nodes in the window until
// either there are no more results or the desired number of results have
// been populated.
const windowSize = 32
var outputIdx int
var window [windowSize]chainhash.Hash
bestHeader := b.index.bestHeader
fork := b.bestChain.FindFork(bestHeader)
for outputIdx < maxResults && fork != nil && fork != bestHeader {
// Determine the final descendant block on the branch that leads to the
// best known header in this window by clamping the number of
// descendants to consider to the window size.
endNode := bestHeader
numBlocksToConsider := endNode.height - fork.height
if numBlocksToConsider > windowSize {
endNode = endNode.Ancestor(fork.height + windowSize)
}

// Populate the blocks in this window from back to front by walking
// backwards from the final block in the window to the first one,
// excluding any blocks that already have their data available.
windowIdx := windowSize
for node := endNode; node != nil && node != fork; node = node.parent {
if node.status.HaveData() {
continue
}

windowIdx--
window[windowIdx] = node.hash
}

// Populate the outputs with as many from the back of the window as
// possible (since the window might not have been fully populated due to
// skipped blocks) and move the output index forward to match.
outputIdx += copy(out[outputIdx:], window[windowIdx:])

// Move the fork point forward to the final block of the window.
fork = endNode
}

return out[:outputIdx]
}
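
A minimal usage sketch, assuming a *BlockChain value named chain (illustrative): the caller owns a fixed backing array, and the returned slice aliases that array with its bounds adjusted to the populated entries, so repeated calls need no extra allocations.

	var buf [32]chainhash.Hash
	needed := chain.PutNextNeededBlocks(buf[:])
	for i := range needed {
		fmt.Printf("need block %v\n", needed[i])
	}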

// BestHeader returns the hash and height of the header with the most
// cumulative work that is NOT known to be invalid.
func (b *BlockChain) BestHeader() (chainhash.Hash, int32) {
b.index.RLock()
header := b.index.bestHeader
b.index.RUnlock()
return header.hash, header.height
}
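
To make the sliding-window walk in PutNextNeededBlocks concrete, here is a self-contained toy sketch (all names hypothetical, integers standing in for block nodes): needed entries are discovered while walking backwards, so the window fills from its end and only the populated tail is copied forward into the output.

	package main

	import "fmt"

	func main() {
		const windowSize = 4
		var window [windowSize]int

		// Pretend heights 5 through 8 lead to the best header and
		// height 6 already has its block data.
		haveData := map[int]bool{6: true}

		windowIdx := windowSize
		for h := 8; h >= 5; h-- { // walk backwards, like node -> node.parent
			if haveData[h] {
				continue // skip blocks whose data is already available
			}
			windowIdx--
			window[windowIdx] = h
		}

		// Copy the populated tail of the window forward into the output.
		out := make([]int, 0, windowSize)
		out = append(out, window[windowIdx:]...)
		fmt.Println(out) // prints [5 7 8]
	}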
98 changes: 96 additions & 2 deletions netsync/manager.go
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2018-2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

@@ -28,6 +29,10 @@ const (
// more.
minInFlightBlocks = 10

// maxInFlightBlocks is the maximum number of blocks to allow in the sync
// peer request queue.
maxInFlightBlocks = 16

// maxRejectedTxns is the maximum number of rejected transactions
// hashes to store in memory.
maxRejectedTxns = 1000
@@ -148,6 +153,8 @@ type headerNode struct {
// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
*peerpkg.Peer

syncCandidate bool
requestQueue []*wire.InvVect
requestedTxns map[chainhash.Hash]struct{}
@@ -204,6 +211,28 @@ type SyncManager struct {
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint

// The following fields are used to track the list of the next blocks to
// download in the branch leading up to the best known header.
//
// nextBlocksHeader is the hash of the best known header when the list was
// last updated.
//
// nextBlocksBuf houses an overall list of blocks needed (up to the size of
// the array) regardless of whether or not they have been requested and
// provides what is effectively a reusable lookahead buffer. Note that
// since it is a fixed size and acts as a backing array, not all entries
// will necessarily refer to valid data, especially once the chain is
// synced.
//
// nextNeededBlocks subslices into nextBlocksBuf such that it provides an
// upper bound on the entries of the backing array that are valid and also
// acts as a list of needed blocks that are not already known to be in
// flight.
nextBlocksHeader chainhash.Hash
nextBlocksBuf [1024]chainhash.Hash
nextNeededBlocks []chainhash.Hash

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
}
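
A compressed sketch of how these three fields cooperate (the authoritative flow is in maybeUpdateNextNeededBlocks and fetchNextBlocks below; bestHeader stands for the hash returned by chain.BestHeader()): the backing array lives for the lifetime of the manager, each refresh re-derives the valid subslice, and requesting a block consumes hashes from the front.

	// Refresh: the returned slice aliases nextBlocksBuf and bounds its
	// valid region.
	sm.nextNeededBlocks = sm.chain.PutNextNeededBlocks(sm.nextBlocksBuf[:])
	sm.nextBlocksHeader = bestHeader

	// Consume: pop the next needed hash from the front.
	hash := &sm.nextNeededBlocks[0]
	sm.nextNeededBlocks = sm.nextNeededBlocks[1:]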
@@ -842,7 +871,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
sm.fetchNextBlocks(peer, sm.peerStates[peer])
}
return
}
@@ -883,6 +912,71 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// maybeUpdateNextNeededBlocks potentially updates the list of the next blocks
// to download in the branch leading up to the best known header.
func (sm *SyncManager) maybeUpdateNextNeededBlocks() {
// Update the list if the best known header changed since the last time it
// was updated or it is not empty, is getting short, and does not already
// end at the best known header.
chain := sm.chain
bestHeader, _ := chain.BestHeader()
numNeeded := len(sm.nextNeededBlocks)
needsUpdate := sm.nextBlocksHeader != bestHeader || (numNeeded > 0 &&
numNeeded < minInFlightBlocks &&
sm.nextNeededBlocks[numNeeded-1] != bestHeader)
if needsUpdate {
sm.nextNeededBlocks = chain.PutNextNeededBlocks(sm.nextBlocksBuf[:])
sm.nextBlocksHeader = bestHeader
}
}

// fetchNextBlocks creates and sends a request to the provided peer for the next
// blocks to be downloaded based on the current headers.
func (sm *SyncManager) fetchNextBlocks(peer *peerpkg.Peer, peerSync *peerSyncState) {
// Nothing to do if the target maximum number of blocks to request from the
// peer at the same time is already in flight.
numInFlight := len(peerSync.requestedBlocks)
if numInFlight >= maxInFlightBlocks {
return
}

// Potentially update the list of the next blocks to download in the branch
// leading up to the best known header.
sm.maybeUpdateNextNeededBlocks()

// Build and send a getdata request for the needed blocks.
numNeeded := len(sm.nextNeededBlocks)
if numNeeded == 0 {
return
}
maxNeeded := maxInFlightBlocks - numInFlight
if numNeeded > maxNeeded {
numNeeded = maxNeeded
}
gdmsg := wire.NewMsgGetDataSizeHint(uint(numNeeded))
for i := 0; i < numNeeded && len(gdmsg.InvList) < wire.MaxInvPerMsg; i++ {
// The block is either going to be skipped because it has already been
// requested or it will be requested, but in either case, the block is
// no longer needed for future iterations.
hash := &sm.nextNeededBlocks[0]
sm.nextNeededBlocks = sm.nextNeededBlocks[1:]

// Skip blocks that have already been requested. The needed blocks
// might have been updated above, thereby potentially repopulating some
// blocks that are still in flight.
if _, ok := sm.requestedBlocks[*hash]; ok {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, hash)
sm.requestedBlocks[*hash] = struct{}{}
peerSync.requestedBlocks[*hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
if len(gdmsg.InvList) > 0 {
peer.QueueMessage(gdmsg, nil)
}
}

// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchHeaderBlocks() {
@@ -1052,7 +1146,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Infof("Received %v block headers: Fetching blocks",
sm.headerList.Len())
sm.progressLogger.SetLastLogTime(time.Now())
sm.fetchHeaderBlocks()
sm.fetchNextBlocks(peer, sm.peerStates[peer])
return
}

