netsync: adding sliding block window implementation
Implement the maybeUpdateNextNeededBlocks function, which populates the sliding window with the next blocks whose headers have already been validated, using the chain's PutNextNeededBlocks function.
The fetchNextBlocks function creates and sends a request to the provided peer for the next blocks to be downloaded based on the current headers.
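For context, the sliding window works as follows: block requests are topped back up toward maxInFlightBlocks whenever the number of blocks in flight to the peer drops below minInFlightBlocks. The standalone Go sketch below only illustrates that arithmetic; the helper name blocksToRequest is invented here and is not part of the change.

package main

import "fmt"

// Illustrative only: mirrors the minInFlightBlocks/maxInFlightBlocks
// constants added in this commit.
const (
	minInFlightBlocks = 10
	maxInFlightBlocks = 16
)

// blocksToRequest returns how many additional block requests to issue so the
// in-flight count climbs back toward the maximum window size once it has
// fallen below the minimum, capped by how many blocks are actually needed.
func blocksToRequest(inFlight, needed int) int {
	if inFlight >= minInFlightBlocks {
		return 0 // window is still full enough; wait for responses first
	}
	n := maxInFlightBlocks - inFlight
	if n > needed {
		n = needed
	}
	return n
}

func main() {
	fmt.Println(blocksToRequest(9, 100)) // 7: refill the window up to 16
	fmt.Println(blocksToRequest(12, 4))  // 0: still at or above the minimum
}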
samay-kothari committed Aug 23, 2022
1 parent b4f83b3 commit 5a00fd9
Showing 1 changed file with 96 additions and 2 deletions.
98 changes: 96 additions & 2 deletions netsync/manager.go
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2018-2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

@@ -28,6 +29,10 @@ const (
// more.
minInFlightBlocks = 10

// maxInFlightBlocks is the maximum number of blocks to allow in the sync
// peer request queue.
maxInFlightBlocks = 16

// maxRejectedTxns is the maximum number of rejected transactions
// hashes to store in memory.
maxRejectedTxns = 1000
@@ -148,6 +153,8 @@ type headerNode struct {
// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
*peerpkg.Peer

syncCandidate bool
requestQueue []*wire.InvVect
requestedTxns map[chainhash.Hash]struct{}
@@ -204,6 +211,28 @@ type SyncManager struct {
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint

// The following fields are used to track the list of the next blocks to
// download in the branch leading up to the best known header.
//
// nextBlocksHeader is the hash of the best known header when the list was
// last updated.
//
// nextBlocksBuf houses an overall list of blocks needed (up to the size of
// the array) regardless of whether or not they have been requested and
// provides what is effectively a reusable lookahead buffer. Note that
// since it is a fixed size and acts as a backing array, not all entries
// will necessarily refer to valid data, especially once the chain is
// synced. nextNeededBlocks slices into the valid part of the array.
//
// nextNeededBlocks subslices into nextBlocksBuf such that it provides an
// upper bound on the entries of the backing array that are valid and also
// acts as a list of needed blocks that are not already known to be in
// flight.
nextBlocksHeader chainhash.Hash
nextBlocksBuf [1024]chainhash.Hash
nextNeededBlocks []chainhash.Hash

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
}
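To make the backing array and subslice relationship described above concrete, here is a small self-contained sketch of the same lookahead pattern using plain integers in place of block hashes. The lookahead type and its methods are invented for illustration and are not part of this change.

package main

import "fmt"

// lookahead demonstrates the reusable-buffer pattern used by nextBlocksBuf
// and nextNeededBlocks: a fixed-size backing array plus a slice that marks
// the portion currently holding valid, still-needed entries.
type lookahead struct {
	buf    [8]int // backing array (cf. nextBlocksBuf)
	needed []int  // valid window into buf (cf. nextNeededBlocks)
}

// refill copies fresh entries into the backing array and resets the valid
// window, loosely analogous to repopulating the list from the chain.
func (l *lookahead) refill(src []int) {
	n := copy(l.buf[:], src)
	l.needed = l.buf[:n]
}

// take consumes up to max entries from the front of the valid window, the way
// fetchNextBlocks advances nextNeededBlocks as it queues block requests.
func (l *lookahead) take(max int) []int {
	if max > len(l.needed) {
		max = len(l.needed)
	}
	out := l.needed[:max]
	l.needed = l.needed[max:]
	return out
}

func main() {
	var l lookahead
	l.refill([]int{1, 2, 3, 4, 5})
	fmt.Println(l.take(3)) // [1 2 3]
	fmt.Println(l.take(3)) // [4 5]
	fmt.Println(l.take(3)) // []
}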
@@ -842,7 +871,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
-sm.fetchHeaderBlocks()
+sm.fetchNextBlocks(peer, sm.peerStates[peer])
}
return
}
@@ -883,6 +912,71 @@ }
}
}

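// maybeUpdateNextNeededBlocks potentially updates the list of the next blocks
// to download in the branch leading up to the best known header.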
func (sm *SyncManager) maybeUpdateNextNeededBlocks() {
// Update the list if the best known header changed since the last time it
// was updated or it is not empty, is getting short, and does not already
// end at the best known header.
chain := sm.chain
bestHeader, _ := chain.BestHeader()
numNeeded := len(sm.nextNeededBlocks)
needsUpdate := sm.nextBlocksHeader != bestHeader || (numNeeded > 0 &&
numNeeded < minInFlightBlocks &&
sm.nextNeededBlocks[numNeeded-1] != bestHeader)
if needsUpdate {
sm.nextNeededBlocks = chain.PutNextNeededBlocks(sm.nextBlocksBuf[:])
sm.nextBlocksHeader = bestHeader
}
}

// fetchNextBlocks creates and sends a request to the provided peer for the next
// blocks to be downloaded based on the current headers.
func (m *SyncManager) fetchNextBlocks(peer *peerpkg.Peer, peerSync *peerSyncState) {
// Nothing to do if the target maximum number of blocks to request from the
// peer at the same time are already in flight.
numInFlight := len(peerSync.requestedBlocks)
if numInFlight >= maxInFlightBlocks {
return
}

// Potentially update the list of the next blocks to download in the branch
// leading up to the best known header.
m.maybeUpdateNextNeededBlocks()

// Build and send a getdata request for the needed blocks.
numNeeded := len(m.nextNeededBlocks)
if numNeeded == 0 {
return
}
maxNeeded := maxInFlightBlocks - numInFlight
if numNeeded > maxNeeded {
numNeeded = maxNeeded
}
gdmsg := wire.NewMsgGetDataSizeHint(uint(numNeeded))
for i := 0; i < numNeeded && len(gdmsg.InvList) < wire.MaxInvPerMsg; i++ {
// The block is either going to be skipped because it has already been
// requested or it will be requested, but in either case, the block is
// no longer needed for future iterations.
hash := &m.nextNeededBlocks[0]
m.nextNeededBlocks = m.nextNeededBlocks[1:]

// Skip blocks that have already been requested. The needed blocks
// might have been updated above thereby potentially repopulating some
// blocks that are still in flight.
if _, ok := m.requestedBlocks[*hash]; ok {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, hash)
m.requestedBlocks[*hash] = struct{}{}
peerSync.requestedBlocks[*hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
if len(gdmsg.InvList) > 0 {
peer.QueueMessage(gdmsg, nil)
}
}

// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchHeaderBlocks() {
@@ -1051,7 +1145,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Infof("Received %v block headers: Fetching blocks",
sm.headerList.Len())
sm.progressLogger.SetLastLogTime(time.Now())
-sm.fetchHeaderBlocks()
+sm.fetchNextBlocks(peer, sm.peerStates[peer])
return
}

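Note that the diff calls chain.PutNextNeededBlocks but does not include its implementation. Based solely on how it is used above, its assumed contract is: fill the provided backing slice with the hashes of the next blocks that still need to be downloaded, in order, and return the populated prefix. The toy sketch below illustrates that assumption; the toyChain type and its fields are hypothetical and do not reflect the blockchain package's actual code.

package main

import "fmt"

// toyChain is a stand-in for the chain instance; headerBranch holds the
// hashes (modeled as strings) of validated headers past the current tip, and
// have records which of those blocks have already been downloaded.
type toyChain struct {
	headerBranch []string
	have         map[string]bool
}

// PutNextNeededBlocks illustrates the assumed contract: fill the provided
// backing slice with the hashes of the next blocks that still need to be
// downloaded, in order, and return the populated prefix of that slice.
func (c *toyChain) PutNextNeededBlocks(out []string) []string {
	n := 0
	for _, hash := range c.headerBranch {
		if n == len(out) {
			break
		}
		if c.have[hash] {
			continue
		}
		out[n] = hash
		n++
	}
	return out[:n]
}

func main() {
	c := &toyChain{
		headerBranch: []string{"a", "b", "c", "d"},
		have:         map[string]bool{"b": true},
	}
	var buf [3]string
	fmt.Println(c.PutNextNeededBlocks(buf[:])) // [a c d]
}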
