feat: use CommonService in peerConnector #737

Merged · 7 commits · Sep 19, 2023
137 changes: 58 additions & 79 deletions waku/v2/peermanager/peer_connector.go
@@ -7,6 +7,7 @@ import (
"errors"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/host"
@@ -17,8 +18,6 @@ import (
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"

"sync/atomic"

"go.uber.org/zap"

lru "github.com/hashicorp/golang-lru"
@@ -27,22 +26,17 @@ import (
// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex

cache *lru.TwoQueueCache
host host.Host
pm *PeerManager
cancel context.CancelFunc

paused atomic.Bool

wg sync.WaitGroup
dialTimeout time.Duration
dialCh chan peer.AddrInfo
mux sync.Mutex
cache *lru.TwoQueueCache
host host.Host
pm *PeerManager

paused atomic.Bool
dialTimeout time.Duration
*CommonDiscoveryService
subscriptions []<-chan PeerData

backoff backoff.BackoffFactory
mux sync.Mutex
logger *zap.Logger
}
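
The embedded CommonDiscoveryService is not shown in this diff. The sketch below is only an approximation of the shared lifecycle helper this patch relies on, with method names inferred from the call sites in this file (Start, Stop, Context, WaitGroup, GetListeningChan, PushToChan, ErrOnNotRunning); the real implementation lives elsewhere in go-waku and may differ.

// Approximate shape of the shared lifecycle helper assumed by this patch.
// PeerData is the existing peermanager type; everything else here is inferred, not copied.
type CommonDiscoveryService struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	pushCh chan PeerData
}

func NewCommonDiscoveryService() *CommonDiscoveryService {
	return &CommonDiscoveryService{pushCh: make(chan PeerData)}
}

// Start stores a cancellable context, then runs the service-specific start function.
func (s *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancel != nil {
		return errors.New("already started")
	}
	s.ctx, s.cancel = context.WithCancel(ctx)
	return fn()
}

// Stop cancels the context, waits for tracked goroutines, then runs the cleanup callback.
func (s *CommonDiscoveryService) Stop(cleanup func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancel == nil {
		return
	}
	s.cancel()
	s.wg.Wait()
	s.cancel = nil
	cleanup()
}

func (s *CommonDiscoveryService) Context() context.Context          { return s.ctx }
func (s *CommonDiscoveryService) WaitGroup() *sync.WaitGroup        { return &s.wg }
func (s *CommonDiscoveryService) GetListeningChan() <-chan PeerData { return s.pushCh }

// PushToChan hands a discovered peer to whichever goroutine reads GetListeningChan.
func (s *CommonDiscoveryService) PushToChan(data PeerData) bool {
	select {
	case s.pushCh <- data:
		return true
	case <-s.ctx.Done():
		return false
	}
}

// ErrOnNotRunning reports an error while the service has not been started yet.
func (s *CommonDiscoveryService) ErrOnNotRunning() error {
	if s.cancel == nil {
		return errors.New("service is not running")
	}
	return nil
}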

@@ -69,12 +63,12 @@ func NewPeerConnectionStrategy(pm *PeerManager,
}
//
pc := &PeerConnectionStrategy{
cache: cache,
wg: sync.WaitGroup{},
dialTimeout: dialTimeout,
pm: pm,
backoff: getBackOff(),
logger: logger.Named("discovery-connector"),
cache: cache,
dialTimeout: dialTimeout,
CommonDiscoveryService: NewCommonDiscoveryService(),
pm: pm,
backoff: getBackOff(),
logger: logger.Named("discovery-connector"),
}
pm.SetPeerConnector(pc)
return pc, nil
@@ -87,36 +81,40 @@ type connCacheData struct {

// Subscribe receives channels on which discovered peers should be pushed
func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) {
if c.cancel != nil {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.consumeSubscription(ctx, ch)
}()
} else {
// if not running yet, store the subscription and return
if err := c.ErrOnNotRunning(); err != nil {
c.mux.Lock()
c.subscriptions = append(c.subscriptions, ch)
c.mux.Unlock()
return
}
// if running, start a goroutine to consume the subscription
c.WaitGroup().Add(1)
go func() {
defer c.WaitGroup().Done()
c.consumeSubscription(ch)
}()
}
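
Usage sketch (hypothetical caller, not part of this patch): a discovery source can hand its channel to Subscribe before or after Start; subscriptions registered before Start are parked in c.subscriptions and picked up by consumeSubscriptions once start() runs.

// peerCh and discoveredPeers are illustrative names, not from this PR.
peerCh := make(chan PeerData)
pc.Subscribe(ctx, peerCh) // safe before Start: stored until the connector starts
go func() {
	for _, p := range discoveredPeers { // discoveredPeers: []PeerData from some discovery mechanism, assumed
		peerCh <- p // after pc.Start(ctx), each peer is added to the store and queued for dialing
	}
	close(peerCh) // closing the channel ends the consuming goroutine
}()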

func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
for {
// check for shutdown first so the loop can return even while the peerConnector is paused
select {
case <-ctx.Done():
case <-c.Context().Done():
return
default:
}
//
if !c.isPaused() {
select {
case <-ctx.Done():
case <-c.Context().Done():
return
case p, ok := <-ch:
if !ok {
return
}
c.pm.AddDiscoveredPeer(p)
c.publishWork(ctx, p.AddrInfo)
c.PushToChan(p)
case <-time.After(1 * time.Second):
// this timeout keeps the goroutine from blocking on ch indefinitely so the paused state can be re-checked
break
@@ -135,48 +133,36 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
}

ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.dialCh = make(chan peer.AddrInfo)
return c.CommonDiscoveryService.Start(ctx, c.start)

c.wg.Add(2)
go c.shouldDialPeers(ctx)
go c.dialPeers(ctx)
}
func (c *PeerConnectionStrategy) start() error {
c.WaitGroup().Add(2)
go c.shouldDialPeers()
go c.dialPeers()

c.consumeSubscriptions(ctx)
c.consumeSubscriptions()

return nil
}

// Stop terminates the peer-connector
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
}

c.cancel()
c.cancel = nil
c.wg.Wait()

close(c.dialCh)
c.CommonDiscoveryService.Stop(func() {})
}
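
Lifecycle sketch (illustrative, not from this patch): Start and Stop now only wrap the embedded service, so double-start and double-stop handling is expected to come from CommonDiscoveryService rather than from local cancel/WaitGroup fields.

if err := pc.Start(ctx); err != nil {
	// presumably an "already started"-style error on a second call, as the old code returned
	logger.Error("starting peer connector", zap.Error(err)) // logger: assumed in scope
	return
}
// ... on shutdown:
pc.Stop() // expected to cancel the shared context and wait for the dial/consume goroutines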

func (c *PeerConnectionStrategy) isPaused() bool {
return c.paused.Load()
}

func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
defer c.wg.Done()
func (c *PeerConnectionStrategy) shouldDialPeers() {
defer c.WaitGroup().Done()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
case <-c.Context().Done():
return
case <-ticker.C:
_, outRelayPeers := c.pm.getRelayPeers()
@@ -186,25 +172,17 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
}

// Subscribe might be called before the peerConnector has started; those subscriptions are stored in the subscriptions slice and consumed once the connector starts.
func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
c.wg.Add(1)
c.WaitGroup().Add(1)
go func(s <-chan PeerData) {
defer c.wg.Done()
c.consumeSubscription(ctx, s)
defer c.WaitGroup().Done()
c.consumeSubscription(s)
}(subs)
}
c.subscriptions = nil
}

func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:
case <-ctx.Done():
return
}
}

const maxActiveDials = 5

// c.cache is thread safe
@@ -230,8 +208,8 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
return true
}

func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
func (c *PeerConnectionStrategy) dialPeers() {
defer c.WaitGroup().Done()

maxGoRoutines := c.pm.OutRelayPeersTarget
if maxGoRoutines > maxActiveDials {
@@ -242,30 +220,31 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {

for {
select {
case pi, ok := <-c.dialCh:
case pd, ok := <-c.GetListeningChan():
if !ok {
return
}
addrInfo := pd.AddrInfo

if pi.ID == c.host.ID() || pi.ID == "" ||
c.host.Network().Connectedness(pi.ID) == network.Connected {
if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
c.host.Network().Connectedness(addrInfo.ID) == network.Connected {
continue
}

if c.canDialPeer(pi) {
if c.canDialPeer(addrInfo) {
sem <- struct{}{}
c.wg.Add(1)
go c.dialPeer(ctx, pi, sem)
c.WaitGroup().Add(1)
go c.dialPeer(addrInfo, sem)
}
case <-ctx.Done():
case <-c.Context().Done():
return
}
}
}
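
The dial loop above bounds concurrency with a counting-semaphore channel; the pattern in isolation looks roughly like this (illustrative only — in the real code the slot is released inside dialPeer, whose body is not fully shown in this diff):

sem := make(chan struct{}, maxGoRoutines) // capacity = min(OutRelayPeersTarget, maxActiveDials)
for _, pi := range peersToDial {          // peersToDial: []peer.AddrInfo, assumed
	sem <- struct{}{} // blocks once maxGoRoutines dials are in flight
	wg.Add(1)
	go func(pi peer.AddrInfo) {
		defer wg.Done()
		defer func() { <-sem }() // free the slot when the dial finishes
		_ = h.Connect(ctx, pi)   // h: the libp2p host, assumed in scope
	}(pi)
}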

func (c *PeerConnectionStrategy) dialPeer(ctx context.Context, pi peer.AddrInfo, sem chan struct{}) {
defer c.wg.Done()
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
defer c.WaitGroup().Done()
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
16 changes: 11 additions & 5 deletions waku/v2/peermanager/peer_manager.go
@@ -172,13 +172,19 @@ func (pm *PeerManager) connectToRelayPeers() {
} //Else: Should we raise some sort of unhealthy event??
}

func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData {
return PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: host.Peerstore().Addrs(peerID),
},
}
}
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerInfo := peer.AddrInfo{
ID: peerID,
Addrs: pm.host.Peerstore().Addrs(peerID),
}
pm.peerConnector.publishWork(pm.ctx, peerInfo)
peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
pm.peerConnector.PushToChan(peerData)
}
}
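
For reference, the change in peer_manager.go feeds the same channel that dialPeers consumes: a known peer ID is wrapped into PeerData and pushed to the connector.

// Illustrative, using only names that appear in this diff:
pd := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
pm.peerConnector.PushToChan(pd) // surfaces via GetListeningChan() in dialPeers, subject to the backoff cache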
