Skip to content

Commit

Permalink
chainio: use errgroup to limit num of goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Dec 6, 2024
1 parent 5f93c89 commit c472db5
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions chainio/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
"golang.org/x/sync/errgroup"
)

// DefaultProcessBlockTimeout is the timeout value used when waiting for one
Expand Down Expand Up @@ -229,34 +230,30 @@ func DispatchSequential(b Blockbeat, consumers []Consumer) error {
// It requires the consumer to finish processing the block within the specified
// time, otherwise a timeout error is returned.
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
// errChans is a map of channels that will be used to receive errors
// returned from notifying the consumers.
errChans := make(map[string]chan error, len(consumers))
eg := &errgroup.Group{}

// Notify each queue in goroutines.
for _, c := range consumers {
// Create a signal chan.
errChan := make(chan error, 1)
errChans[c.Name()] = errChan

// Notify each consumer concurrently.
go func(c Consumer, beat Blockbeat) {
eg.Go(func() error {
// Send the beat to the consumer.
errChan <- notifyAndWait(
b, c, DefaultProcessBlockTimeout,
)
}(c, b)
}
err := notifyAndWait(b, c, DefaultProcessBlockTimeout)

// Exit early if there's no error.
if err == nil {
return nil
}

// Wait for all consumers in each queue to finish.
for name, errChan := range errChans {
err := <-errChan
if err != nil {
b.logger().Errorf("Consumer=%v failed to process "+
"block: %v", name, err)
"block: %v", c.Name(), err)

return err
}
})
}

// Wait for all consumers in each queue to finish.
if err := eg.Wait(); err != nil {
return err
}

return nil
Expand Down

0 comments on commit c472db5

Please sign in to comment.