Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blockbeat #9315

Merged
merged 92 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
6c2e8b9
sweep: add new state `TxFatal` for erroneous sweepings
yyforyongyu Apr 30, 2024
5f64280
sweep: add new error `ErrZeroFeeRateDelta`
yyforyongyu Oct 25, 2024
d0c7fd8
sweep: add new interface method `Immediate`
yyforyongyu Oct 25, 2024
2479dc7
sweep: handle inputs locally instead of relying on the tx
yyforyongyu Oct 25, 2024
77ff2c0
sweep: add `handleInitialBroadcast` to handle initial broadcast
yyforyongyu Oct 25, 2024
719ca5b
sweep: remove redundant error from `Broadcast`
yyforyongyu Apr 30, 2024
ba23896
sweep: add method `handleBumpEventError` and fix `markInputFailed`
yyforyongyu Apr 30, 2024
afc08c6
sweep: add method `isMature` on `SweeperInput`
yyforyongyu Apr 30, 2024
7545bbf
sweep: make sure defaultDeadline is derived from the mature height
yyforyongyu Apr 30, 2024
f0c4e6d
sweep: remove redundant loopvar assign
yyforyongyu Oct 25, 2024
78ce757
sweep: break `initialBroadcast` into two steps
yyforyongyu Nov 7, 2024
30ee450
sweep: make sure nil tx is handled
yyforyongyu Nov 7, 2024
060ff01
chainio: introduce `chainio` to handle block synchronization
yyforyongyu Jun 27, 2024
01ac713
chainio: implement `Blockbeat`
yyforyongyu Jun 27, 2024
a1eb87e
chainio: add helper methods to dispatch beats
yyforyongyu Oct 31, 2024
4b83d87
chainio: add `BlockbeatDispatcher` to dispatch blockbeats
yyforyongyu Jun 27, 2024
b5a3a27
chainio: add partial implementation of `Consumer` interface
yyforyongyu Oct 17, 2024
801fd6b
multi: implement `Consumer` on subsystems
yyforyongyu Oct 29, 2024
e113f39
sweep: remove block subscription in `UtxoSweeper` and `TxPublisher`
yyforyongyu Jun 4, 2024
3ac6752
sweep: remove redundant notifications during shutdown
yyforyongyu Nov 18, 2024
5f9d473
contractcourt: remove `waitForHeight` in resolvers
yyforyongyu Jun 4, 2024
045f843
contractcourt: remove block subscription in chain arbitrator
yyforyongyu Oct 29, 2024
7129553
contractcourt: remove block subscription in channel arbitrator
yyforyongyu Oct 29, 2024
e2e59bd
contractcourt: remove the `immediate` param used in `Resolve`
yyforyongyu Jun 4, 2024
8023530
contractcourt: start channel arbitrator with blockbeat
yyforyongyu Oct 29, 2024
545cea0
multi: start consumers with a starting blockbeat
yyforyongyu Oct 29, 2024
16a8b62
lnd: add new method `startLowLevelServices`
yyforyongyu Oct 17, 2024
8fc9154
lnd: start `blockbeatDispatcher` and register consumers
yyforyongyu Oct 17, 2024
45b243c
contractcourt: fix linter `funlen`
yyforyongyu Oct 29, 2024
1d53e7d
multi: improve loggings
yyforyongyu May 22, 2024
0bab6b3
chainio: use `errgroup` to limit num of goroutines
yyforyongyu Nov 19, 2024
1f2cfc6
contractcourt: add verbose logging in resolvers
yyforyongyu Jun 20, 2024
10e5a43
contractcourt: add spend path helpers in timeout/success resolver
yyforyongyu Nov 13, 2024
fb499bc
contractcourt: add sweep senders in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
c92d7f0
contractcourt: add resolver handlers in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
cb18940
contractcourt: remove redundant return value in `claimCleanUp`
yyforyongyu Nov 14, 2024
bfc95b8
contractcourt: add sweep senders in `htlcTimeoutResolver`
yyforyongyu Nov 14, 2024
7083302
contractcourt: add methods to checkpoint states
yyforyongyu Jul 16, 2024
730b605
contractcourt: add resolve handlers in `htlcTimeoutResolver`
yyforyongyu Jul 16, 2024
a987634
contractcourt: add `Launch` method to anchor/breach resolver
yyforyongyu Jun 24, 2024
913f5d4
contractcourt: add `Launch` method to commit resolver
yyforyongyu Jun 20, 2024
cf105e6
contractcourt: add `Launch` method to htlc success resolver
yyforyongyu Jul 15, 2024
71aec7b
contractcourt: add `Launch` method to htlc timeout resolver
yyforyongyu Jul 16, 2024
025d787
invoices: exit early when the subscriber chan is nil
yyforyongyu Nov 17, 2024
ef98c52
contractcourt: add `Launch` method to incoming contest resolver
yyforyongyu Nov 17, 2024
4772229
contractcourt: add `Launch` method to outgoing contest resolver
yyforyongyu Jun 20, 2024
4f5ccb8
contractcourt: fix concurrent access to `resolved`
yyforyongyu Jul 10, 2024
d2e81a1
contractcourt: fix concurrent access to `launched`
yyforyongyu Jul 11, 2024
819c15f
contractcourt: break `launchResolvers` into two steps
yyforyongyu Jun 25, 2024
63aa5aa
contractcourt: offer outgoing htlc one block earlier before its expiry
yyforyongyu Nov 25, 2024
07cb3ae
contractcourt: implement `Consumer` on `chainWatcher`
yyforyongyu Jun 20, 2024
c1a9390
contractcourt: register spend notification during init
yyforyongyu Nov 16, 2024
4e30598
contractcourt: add method `handleCommitSpend`
yyforyongyu Nov 16, 2024
8237598
contractcourt: handle blockbeat in `chainWatcher`
yyforyongyu Nov 16, 2024
3822c23
contractcourt: notify blockbeat for `chainWatcher`
yyforyongyu Nov 16, 2024
4d76566
contractcourt: use close height instead of best height
yyforyongyu Dec 3, 2024
6eb9bb1
multi: add new method `ChainArbitrator.RedispatchBlockbeat`
yyforyongyu Nov 13, 2024
c5b3033
contractcourt: add close event handlers in `ChannelArbitrator`
yyforyongyu Nov 16, 2024
a6d3a0f
contractcourt: process channel close event on new beat
yyforyongyu Nov 13, 2024
ea7d6a5
contractcourt: register conf notification once and cancel when confirmed
yyforyongyu Nov 22, 2024
cc60d2b
chainntnfs: skip dispatched conf details
yyforyongyu Nov 25, 2024
bd88948
docs: add release notes for `blockbeat` series
yyforyongyu Nov 25, 2024
fecd5ac
contractcourt: make sure `launchResolvers` is called on new blockbeat
yyforyongyu Dec 18, 2024
4806b2f
multi: optimize loggings around changes from `blockbeat`
yyforyongyu Oct 17, 2024
40ac04a
lntest+itest: fix `testSweepCPFPAnchorOutgoingTimeout`
yyforyongyu Oct 24, 2024
cacf222
itest: fix `testSweepCPFPAnchorIncomingTimeout`
yyforyongyu Oct 24, 2024
d260a87
itest: fix `testSweepHTLCs`
yyforyongyu Oct 24, 2024
1aeea8a
itest: fix `testSweepCommitOutputAndAnchor`
yyforyongyu Oct 24, 2024
0778009
itest: fix `testBumpForceCloseFee`
yyforyongyu Oct 17, 2024
e45005b
itest: fix `testPaymentSucceededHTLCRemoteSwept`
yyforyongyu Oct 24, 2024
9ab9cd5
lntest+itest: start flattening the multi-hop tests
yyforyongyu Oct 17, 2024
bc31979
itest: simplify and flatten `testMultiHopReceiverChainClaim`
yyforyongyu Oct 18, 2024
bef17f1
lntest+itest: flatten `testMultiHopLocalForceCloseOnChainHtlcTimeout`
yyforyongyu Oct 18, 2024
d7b2025
lntest+itest: flatten `testMultiHopRemoteForceCloseOnChainHtlcTimeout`
yyforyongyu Oct 19, 2024
8dd73a0
itest: flatten `testMultiHopHtlcLocalChainClaim`
yyforyongyu Oct 21, 2024
52e6fb1
itest: flatten `testMultiHopHtlcRemoteChainClaim`
yyforyongyu Oct 22, 2024
f95e64f
itest: flatten `testMultiHopHtlcAggregation`
yyforyongyu Oct 23, 2024
34951a6
itest: flatten `testHtlcTimeoutResolverExtractPreimageLocal`
yyforyongyu Oct 23, 2024
2f02567
itest: flatten `testHtlcTimeoutResolverExtractPreimageRemote`
yyforyongyu Oct 23, 2024
2adb356
itest: rename file to reflect the tests
yyforyongyu Oct 23, 2024
e1d5bbf
itest: remove unnecessary force close
yyforyongyu Oct 23, 2024
2494f1e
itest: remove redundant block mining in `testFailingChannel`
yyforyongyu Oct 24, 2024
22b9350
itest: remove redunant block mining in `testChannelFundingWithUnstabl…
yyforyongyu Oct 24, 2024
a55408d
itest: remove redudant block in `testPsbtChanFundingWithUnstableUtxos`
yyforyongyu Oct 24, 2024
9d4a60d
itest: remove redundant blocks in channel backup tests
yyforyongyu Oct 24, 2024
b7feeba
itest+lntest: fix channel force close test
yyforyongyu Jun 29, 2024
9c0c373
itest: flatten and fix `testWatchtower`
yyforyongyu Oct 25, 2024
425877e
itest: remove redundant block in multiple tests
yyforyongyu Oct 25, 2024
36a87ad
itest: assert payment status after sending
yyforyongyu Oct 24, 2024
c4a6abb
lntest+itest: remove the usage of `ht.AssertActiveHtlcs`
yyforyongyu Nov 5, 2024
a1bd894
lntest+itest: export `DeriveFundingShim`
yyforyongyu Dec 20, 2024
ecd82a3
contractcourt: include custom records on replayed htlc
yyforyongyu Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions chainio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Chainio

`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.

The main components include:

- `Blockbeat`: An interface that provides information about the block.

- `Consumer`: An interface that specifies how subsystems handle the blockbeat.

- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.

Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.


### Register a Consumer

Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.

To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```

To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```

### Implement the `Consumer` Interface

Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.

To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.

```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

chainio.BeatConsumer

...
```

We should also remember to initialize this `BeatConsumer`,

```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```

Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `tp.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.

```go
for {
select {
case beat := <-tp.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.

// Notify we've processed the block.
tp.NotifyBlockProcessed(beat, nil)

...
```

### Existing Queues

Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.

```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher

note left of bb: 0. received block x,<br>dispatching...

note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```
54 changes: 54 additions & 0 deletions chainio/blockbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package chainio

import (
"fmt"

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch

// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}

// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)

// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}

// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
b.log = clog.WithPrefix(logPrefix)

return b
}

// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}

// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}
28 changes: 28 additions & 0 deletions chainio/blockbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package chainio

import (
"errors"
"testing"

"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)

var errDummy = errors.New("dummy error")

// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()

// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}

// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)

// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}
113 changes: 113 additions & 0 deletions chainio/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package chainio

// BeatConsumer defines a supplementary component that should be used by
// subsystems which implement the `Consumer` interface. It partially implements
// the `Consumer` interface by providing the method `ProcessBlock` such that
// subsystems don't need to re-implement it.
//
// While inheritance is not commonly used in Go, subsystems embedding this
// struct cannot pass the interface check for `Consumer` because the `Name`
// method is not implemented, which gives us a "mortise and tenon" structure.
// In addition to reducing code duplication, this design allows `ProcessBlock`
// to work on the concrete type `Beat` to access its internal states.
type BeatConsumer struct {
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
// received block contains the best known height and the txns confirmed
// in this block.
BlockbeatChan chan Blockbeat

// name is the name of the consumer which embeds the BlockConsumer.
name string

// quit is a channel that closes when the BlockConsumer is shutting
// down.
//
// NOTE: this quit channel should be mounted to the same quit channel
// used by the subsystem.
quit chan struct{}

// errChan is a buffered chan that receives an error returned from
// processing this block.
errChan chan error
}

// NewBeatConsumer creates a new BlockConsumer.
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
// Refuse to start `lnd` if the quit channel is not initialized. We
// treat this case as if we are facing a nil pointer dereference, as
// there's no point to return an error here, which will cause the node
// to fail to be started anyway.
if quit == nil {
panic("quit channel is nil")
}

b := BeatConsumer{
BlockbeatChan: make(chan Blockbeat),
name: name,
errChan: make(chan error, 1),
quit: quit,
}

return b
}

// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
// channel. It will send it to the subsystem's BlockbeatChan, and block until
// the processed result is received from the subsystem. The subsystem must call
// `NotifyBlockProcessed` after it has finished processing the block.
//
// NOTE: part of the `chainio.Consumer` interface.
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
// Update the current height.
beat.logger().Tracef("set current height for [%s]", b.name)

select {
// Send the beat to the blockbeat channel. It's expected that the
// consumer will read from this channel and process the block. Once
// processed, it should return the error or nil to the beat.Err chan.
case b.BlockbeatChan <- beat:
beat.logger().Tracef("Sent blockbeat to [%s]", b.name)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before sending "+
"beat", b.name)

return nil
}

// Check the consumer's err chan. We expect the consumer to call
// `beat.NotifyBlockProcessed` to send the error back here.
select {
case err := <-b.errChan:
beat.logger().Tracef("[%s] processed beat: err=%v", b.name, err)

return err

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown", b.name)
}

return nil
}

// NotifyBlockProcessed signals that the block has been processed. It takes the
// blockbeat being processed and an error resulted from processing it. This
// error is then sent back to the consumer's err chan to unblock
// `ProcessBlock`.
//
// NOTE: This method must be called by the subsystem after it has finished
// processing the block.
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
// Update the current height.
beat.logger().Tracef("[%s]: notifying beat processed", b.name)

select {
case b.errChan <- err:
beat.logger().Tracef("[%s]: notified beat processed, err=%v",
b.name, err)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before notifying "+
"beat processed", b.name)
}
}
Loading
Loading