
feat: changes for optimizing filter ping #1102

Merged
merged 13 commits into from May 22, 2024

18 changes: 17 additions & 1 deletion tests/utils.go
@@ -9,7 +9,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/waku-org/go-waku/waku/v2/protocol"
"io"
"math"
"math/big"
@@ -22,6 +21,8 @@ import (
"time"
"unicode/utf8"

"github.com/waku-org/go-waku/waku/v2/protocol"

gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
@@ -48,6 +49,21 @@ func GetHostAddress(ha host.Host) multiaddr.Multiaddr {
return ha.Addrs()[0]
}

// GetAddr returns the host's full multiaddr with the peer ID appended.
func GetAddr(h host.Host) multiaddr.Multiaddr {
id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String()))
var selectedAddr multiaddr.Multiaddr
// For now, skip circuit relay addresses, as libp2p seems to return empty p2p-circuit addresses.
for _, addr := range h.Network().ListenAddresses() {
if strings.Contains(addr.String(), "p2p-circuit") {
continue
}
selectedAddr = addr
break
}
return selectedAddr.Encapsulate(id)
}
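
A minimal usage sketch of GetAddr in a test (hostA, hostB, and the helper name are assumptions for illustration, not part of this diff):

    // Sketch: dial hostB from hostA using the address built by GetAddr.
    func connectViaGetAddr(t *testing.T, hostA, hostB host.Host) {
        addr := GetAddr(hostB)
        info, err := peer.AddrInfoFromP2pAddr(addr)
        if err != nil {
            t.Fatal(err)
        }
        if err := hostA.Connect(context.Background(), *info); err != nil {
            t.Fatal(err)
        }
    }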

// FindFreePort returns an available port number
func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()
140 changes: 42 additions & 98 deletions waku/v2/api/filter.go
@@ -3,15 +3,13 @@ package api
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

const FilterPingTimeout = 5 * time.Second
@@ -39,6 +37,7 @@ type Sub struct {
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
closing chan string
}

// Subscribe
@@ -53,37 +52,40 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.log = log.Named("filter-api")
sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter))
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)

sub.closing = make(chan string, config.MaxPeers)
if err != nil {
return nil, err
}
sub.multiplex(subs)
go sub.healthCheckLoop()
go sub.waitOnSubClose()
return sub, nil
}
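
For context, a hedged usage sketch of this API (the setup of ctx, wf, the content filter, and the logger is assumed, and the exact parameter order may differ from this sketch):

    // Sketch: subscribe once, then consume envelopes; failed peers are now
    // replaced internally via the closing channel rather than by pings.
    func consume(ctx context.Context, wf *filter.WakuFilterLightNode,
        cf protocol.ContentFilter, log *zap.Logger) error {
        sub, err := Subscribe(ctx, wf, cf, FilterConfig{MaxPeers: 2}, log)
        if err != nil {
            return err // complete failure: caller decides whether to retry
        }
        defer sub.Unsubscribe()
        for env := range sub.DataCh {
            _ = env // process the received envelope
        }
        return nil
    }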

func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()

}

func (apiSub *Sub) healthCheckLoop() {
// Health checks
ticker := time.NewTicker(FilterPingTimeout)
defer ticker.Stop()
func (apiSub *Sub) waitOnSubClose() {
for {
select {
case <-apiSub.ctx.Done():
apiSub.log.Debug("healthCheckLoop: Done()")
apiSub.log.Debug("apiSub context: Done()")
apiSub.cleanup()
return
case <-ticker.C:
apiSub.log.Debug("healthCheckLoop: checkAliveness()")
topicCounts := apiSub.getTopicCounts()
apiSub.resubscribe(topicCounts)
case subId := <-apiSub.closing:
// Trigger the closing-and-resubscribe flow for this subscription.
apiSub.closeAndResubscribe(subId)
}
}
}

func (apiSub *Sub) closeAndResubscribe(subId string) {
apiSub.log.Debug("sub closeAndResubscribe", zap.String("subID", subId))

apiSub.subs[subId].Close()
failedPeer := apiSub.subs[subId].PeerID
delete(apiSub.subs, subId)
apiSub.resubscribe(failedPeer)
}

func (apiSub *Sub) cleanup() {
@@ -93,6 +95,7 @@ func (apiSub *Sub) cleanup() {
}()

for _, s := range apiSub.subs {
close(s.Closing)
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
// Logging at info level as this is part of cleanup
@@ -103,110 +106,45 @@

}

// Returns active sub counts for each pubsub topic
func (apiSub *Sub) getTopicCounts() map[string]int {
// Buffered chan for sub aliveness results
type CheckResult struct {
sub *subscription.SubscriptionDetails
alive bool
}
checkResults := make(chan CheckResult, len(apiSub.subs))
var wg sync.WaitGroup

// Run pings asynchronously
for _, s := range apiSub.subs {
wg.Add(1)
go func(sub *subscription.SubscriptionDetails) {
defer wg.Done()
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout)
defer cancelFunc()
err := apiSub.wf.IsSubscriptionAlive(ctx, sub)

apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil))
checkResults <- CheckResult{sub, err == nil}
}(s)
}

// Collect healthy topic counts
topicCounts := make(map[string]int)

topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
topicCounts[t] = 0
}
wg.Wait()
close(checkResults)
for s := range checkResults {
if !s.alive {
// Close inactive subs
s.sub.Close()
delete(apiSub.subs, s.sub.ID)
} else {
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}
}

return topicCounts
}

// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(topicCounts map[string]int) {

// Delete healthy topics
for t, cnt := range topicCounts {
if cnt == apiSub.Config.MaxPeers {
delete(topicCounts, t)
}
func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
// Re-subscribe asynchronously
existingSubCount := len(apiSub.subs)
apiSub.log.Debug("subscribing again", zap.Stringer("contentFilter", apiSub.ContentFilter), zap.Int("numPeers", apiSub.Config.MaxPeers-existingSubCount))
var peersToExclude peer.IDSlice
peersToExclude = append(peersToExclude, failedPeer)
for _, sub := range apiSub.subs {
peersToExclude = append(peersToExclude, sub.PeerID)
}

if len(topicCounts) == 0 {
// All topics healthy, return
subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...)
if err != nil {
return
}
var wg sync.WaitGroup
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.

// Re-subscribe asynchronously
newSubs := make(chan []*subscription.SubscriptionDetails)

for t, cnt := range topicCounts {
cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics}
wg.Add(1)
go func(count int) {
defer wg.Done()
subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count)
if err != nil {
return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.
newSubs <- subs
}(cnt)
}
wg.Wait()
close(newSubs)
apiSub.log.Debug("resubscribe(): before range newSubs")
for subs := range newSubs {
if subs != nil {
apiSub.multiplex(subs)
}
}
apiSub.log.Debug("checkAliveness(): close(newSubs)")
//close(newSubs)

apiSub.multiplex(subs)
}

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) {
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peersToExclude", zap.Stringer("peersToExclude", peersToExclude[0]))
options = append(options, filter.WithPeersToExclude(peersToExclude...))
}
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)

if err != nil {
if len(subs) > 0 {
// Partial failure; proceed for now, as we don't expect this to happen for specific topics.
// Rather, it can happen when subscription with one of the peers fails.
// This can then be handled automatically at resubscribe.
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err))
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err), zap.Int("successCount", len(subs)))
return subs, nil
}
// In case of complete subscription failure, application or user needs to handle and probably retry based on error
@@ -229,5 +167,11 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
<-subDetails.Closing
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))

apiSub.closing <- subDetails.ID
}(subDetails)
}
}
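
Taken together, this replaces the old ping-based health check: each SubscriptionDetails carries a Closing channel, the protocol layer signals on it when a peer-level subscription dies, and the loop in waitOnSubClose excludes the failed peer and resubscribes. A stripped-down sketch of the same pattern, with generic names rather than the actual go-waku types:

    // Sketch of the close-notification pattern used above.
    type subDetail struct {
        ID      string
        Closing chan struct{} // closed/signalled by the lower layer on failure
    }

    func watch(ctx context.Context, s *subDetail, closing chan<- string) {
        select {
        case <-s.Closing:
            closing <- s.ID // hand off to the resubscribe loop
        case <-ctx.Done():
        }
    }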
26 changes: 23 additions & 3 deletions waku/v2/api/filter_test.go
@@ -36,12 +36,13 @@ func (s *FilterApiTestSuite) TestSubscribe() {
// We have one full node already created in SetupTest(),
// create another one
fullNodeData2 := s.GetWakuFilterFullNode(s.TestTopic, true)
s.ConnectHosts(s.LightNodeHost, fullNodeData2.FullNodeHost)
s.ConnectToFullNode(s.LightNode, fullNodeData2.FullNode)
//s.ConnectHosts(s.FullNodeHost, fullNodeData2.FullNodeHost)
peers := []peer.ID{s.FullNodeHost.ID(), fullNodeData2.FullNodeHost.ID()}
s.Log.Info("FullNodeHost IDs:", zap.Any("peers", peers))
// Make sure IDs are different
s.Require().True(peers[0] != peers[1])
apiConfig := FilterConfig{MaxPeers: 2, Peers: peers}
//s.Require().True(peers[0] != peers[1])
apiConfig := FilterConfig{MaxPeers: 2}

s.Require().Equal(apiConfig.MaxPeers, 2)
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
@@ -68,7 +69,26 @@
}
s.Require().Equal(cnt, 1)

// Verify the health check: subscriptions to the stopped peer should be replaced
subs := s.LightNode.Subscriptions()
s.Require().Equal(2, len(subs))

s.Log.Info("stopping full node", zap.Stringer("id", fullNodeData2.FullNodeHost.ID()))
fullNodeData3 := s.GetWakuFilterFullNode(s.TestTopic, true)

s.ConnectToFullNode(s.LightNode, fullNodeData3.FullNode)

fullNodeData2.FullNode.Stop()
fullNodeData2.FullNodeHost.Close()
time.Sleep(2 * time.Second)
subs = s.LightNode.Subscriptions()

s.Require().Equal(2, len(subs))

for _, sub := range subs {
s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID)
}

apiSub.Unsubscribe()
for range apiSub.DataCh {
}
4 changes: 2 additions & 2 deletions waku/v2/peermanager/peer_manager.go
@@ -277,10 +277,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee

//Need to filter peers to check if they support relay
if inPeers.Len() != 0 {
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200)
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200)
}
if outPeers.Len() != 0 {
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200)
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200)
}
return
}
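
The extra nil argument reflects a widened FilterPeersByProto signature, presumably an optional list of peers to exclude, mirroring the WithPeersToExclude option used in filter.go; relay peer selection passes nil because it excludes nothing. A hedged sketch of what such a filter might look like (hypothetical names, and protocol.ID here is libp2p's core protocol identifier; the real signature may differ):

    // Sketch: keep only peers that support one of the given protocols,
    // skipping any peer in the exclude list.
    func filterByProto(h host.Host, in peer.IDSlice, exclude peer.IDSlice,
        protos ...protocol.ID) (peer.IDSlice, error) {
        excluded := make(map[peer.ID]struct{}, len(exclude))
        for _, p := range exclude {
            excluded[p] = struct{}{}
        }
        var out peer.IDSlice
        for _, p := range in {
            if _, skip := excluded[p]; skip {
                continue
            }
            supported, err := h.Peerstore().SupportsProtocols(p, protos...)
            if err != nil {
                return nil, err
            }
            if len(supported) > 0 {
                out = append(out, p)
            }
        }
        return out, nil
    }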