Skip to content

Commit

Permalink
Implement Chain Exchange protocol over pubsub
Browse files Browse the repository at this point in the history
Implement chain exchange protocol over pubsub as a mechanism to
propagate `ECChain` across the network with reasonable spam protection.

To protect against spam the mechanism employs two separate caches for
chains that are generally discovered across the network and the ones
explicitly looked up or broadcasted by the local node. Both caches are
capped LRU, where the LRU recent-ness is used as a way to prioritise
chains we cache while keeping the total memory footprint fixed. This
approach is not the most memory efficient but is simpler to implement
as the LRU encapsulates a lot of the complexity.

The code has a lot of TODOs as places to improve or question to the
reviewer. To action most of the TODOs further refactoring across the
code is needed which is intended to be actioned in separate commits.

The code path introduced here is not integrated into F3 host; future PRs
will iteratively integrate the mechanism across F3 host and other
places.

Part of #792
  • Loading branch information
masih committed Dec 19, 2024
1 parent d4f3a0c commit 58ce9f1
Show file tree
Hide file tree
Showing 7 changed files with 688 additions and 0 deletions.
135 changes: 135 additions & 0 deletions chainexchange/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions chainexchange/chainexchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package chainexchange

import (
"context"

"github.com/filecoin-project/go-f3/gpbft"
)

type Key []byte

type Keyer interface {
Key(gpbft.ECChain) Key
}

type Message struct {
Instance uint64
Chain gpbft.ECChain
}

type ChainExchange interface {
Keyer
Broadcast(context.Context, Message) error
GetChainByInstance(context.Context, uint64, Key) (gpbft.ECChain, bool)
RemoveChainsByInstance(context.Context, uint64) error
}

func (k Key) IsZero() bool { return len(k) == 0 }
134 changes: 134 additions & 0 deletions chainexchange/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package chainexchange

import (
"errors"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

type Option func(*options) error

type options struct {
topicName string
topicScoreParams *pubsub.TopicScoreParams
subscriptionBufferSize int
pubsub *pubsub.PubSub
progress gpbft.Progress
maxChainLength int
maxInstanceLookahead uint64
maxDiscoveredChainsPerInstance int
maxWantedChainsPerInstance int
}

func newOptions(o ...Option) (*options, error) {
opts := &options{
topicScoreParams: psutil.PubsubTopicScoreParams,
subscriptionBufferSize: 32,
maxChainLength: gpbft.ChainMaxLen,
maxInstanceLookahead: manifest.DefaultCommitteeLookback,
maxDiscoveredChainsPerInstance: 1000,
maxWantedChainsPerInstance: 1000,
}
for _, apply := range o {
if err := apply(opts); err != nil {
return nil, err
}
}
if opts.progress == nil {
return nil, errors.New("gpbft progress must be set")
}
if opts.pubsub == nil {
return nil, errors.New("pubsub must be set")
}
if opts.topicName == "" {
return nil, errors.New("topic name must be set")
}
return opts, nil
}

func WithTopicName(name string) Option {
return func(o *options) error {
if name == "" {
return errors.New("topic name cannot be empty")
}
o.topicName = name
return nil
}
}

func WithTopicScoreParams(params *pubsub.TopicScoreParams) Option {
return func(o *options) error {
o.topicScoreParams = params
return nil
}
}

func WithSubscriptionBufferSize(size int) Option {
return func(o *options) error {
if size < 1 {
return errors.New("subscription buffer size must be at least 1")
}
o.subscriptionBufferSize = size
return nil
}
}

func WithPubSub(pubsub *pubsub.PubSub) Option {
return func(o *options) error {
if pubsub == nil {
return errors.New("pubsub cannot be nil")
}
o.pubsub = pubsub
return nil
}
}

func WithProgress(progress gpbft.Progress) Option {
return func(o *options) error {
if progress == nil {
return errors.New("progress cannot be nil")
}
o.progress = progress
return nil
}
}

func WithMaxChainLength(length int) Option {
return func(o *options) error {
if length < 1 {
return errors.New("max chain length must be at least 1")
}
o.maxChainLength = length
return nil
}
}

func WithMaxInstanceLookahead(lookahead uint64) Option {
return func(o *options) error {
o.maxInstanceLookahead = lookahead
return nil
}
}

func WithMaxDiscoveredChainsPerInstance(max int) Option {
return func(o *options) error {
if max < 1 {
return errors.New("max discovered chains per instance must be at least 1")
}
o.maxDiscoveredChainsPerInstance = max
return nil
}
}

func WithMaxWantedChainsPerInstance(max int) Option {
return func(o *options) error {
if max < 1 {
return errors.New("max wanted chains per instance must be at least 1")
}
o.maxWantedChainsPerInstance = max
return nil
}
}
Loading

0 comments on commit 58ce9f1

Please sign in to comment.