Skip to content

Commit

Permalink
Change UnsubscribeWithSubscription so that it's single sub-specific
Browse files Browse the repository at this point in the history
Also merge FilterSubscribe and FilterUnsubscribe options/params
  • Loading branch information
Vitaliy Vlasov committed Sep 21, 2023
1 parent 003c90f commit 897a4d8
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 127 deletions.
104 changes: 57 additions & 47 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}

type WakuFilterPushResult struct {
Err error
PeerID peer.ID
Expand Down Expand Up @@ -149,7 +153,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
} else {
pubSubTopic = *messagePush.PubsubTopic
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) {
if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
Expand Down Expand Up @@ -304,20 +308,14 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
var cFilter ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter)
if existingSub != nil {
subscriptions = append(subscriptions, existingSub)
} else {
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe for conentTopics ",
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe for contentTopics ",
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
}

if len(failedContentTopics) > 0 {
Expand All @@ -335,15 +333,15 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
return nil, err
}

if !wf.subscriptions.Has(peerID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList()...) {
if !wf.subscriptions.Has(peerID, contentFilter) {
return nil, errors.New("subscription does not exist")
}

return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
}

func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters)
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
params := new(FilterSubscribeParameters)
params.log = wf.log
opts = append(DefaultUnsubscribeOptions(), opts...)
for _, opt := range opts {
Expand Down Expand Up @@ -418,21 +416,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID)
} else {
subscriptionDetailList[subscriptionDetailID] = subscriptionDetail
subscriptionDetail.closeC()
}
}

if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic)
} else {
wf.subscriptions.items[peerID].subsPerPubsubTopic[contentFilter.PubsubTopic] = subscriptionDetailList
}

}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -456,12 +451,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
if err != nil {
return nil, err
}

resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
for pTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
cFilter.PubsubTopic = pTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
cFilter := NewContentFilter(pTopic, cTopics...)
wf.log.Warn("cfilter", zap.Any("cf", cFilter))
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
Expand All @@ -487,21 +480,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
params.wg.Done()
}
}()

err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE,
cFilter)
if err != nil {
ferr, ok := err.(*FilterError)
if ok && ferr.Code == http.StatusNotFound {
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err))
} else {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
return
}
}
err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter)

if params.wg != nil {
resultChan <- WakuFilterPushResult{
Expand All @@ -521,20 +500,51 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
return resultChan, nil
}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
// UnsubscribeWithSubscription is used to close a particular subscription
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return nil, err
}

opts = append(opts, Peer(sub.PeerID))
opts = append(opts, WithPeer(sub.PeerID))

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This definition of opts is never used.

// Close this sub
sub.Close()

resultChan := make(chan WakuFilterPushResult, 1)
var err error
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair
err = wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: sub.PeerID, requestID: protocol.GenerateRequestID()}, sub.ContentFilter)
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: sub.PeerID,
}
}
close(resultChan)
return resultChan, err

}

func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error {
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
if err != nil {
ferr, ok := err.(*FilterError)
if ok && ferr.Code == http.StatusNotFound {
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
} else {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
}
}

return wf.Unsubscribe(ctx, sub.ContentFilter, opts...)
return err
}

func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -590,7 +600,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
}

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down
49 changes: 42 additions & 7 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

func TestFilterSuite(t *testing.T) {
Expand Down Expand Up @@ -110,7 +109,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
defer s.wg.Done()
select {
case env := <-ch:
s.Require().Equal(maps.Keys(s.contentFilter.ContentTopics)[0], env.Message().GetContentTopic())
s.Require().Equal(s.contentFilter.ContentTopicsList()[0], env.Message().GetContentTopic())
case <-time.After(5 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
Expand All @@ -128,8 +127,10 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
go func() {
defer s.wg.Done()
select {
case <-ch:
s.Require().Fail("should not receive another message")
case _, ok := <-ch:
if ok {
s.Require().Fail("should not receive another message")
}
case <-time.After(1 * time.Second):
// Timeout elapsed, all good
case <-s.ctx.Done():
Expand Down Expand Up @@ -216,10 +217,44 @@ func (s *FilterTestSuite) TestWakuFilter() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)

_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)

time.Sleep(1 * time.Second)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)

// Two new subscriptions with same [peer, contentFilter]
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Assert that we have 2 subscriptions now
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)

// Should be received on both subscriptions
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)

s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, secondSub[0].C)

// Unsubscribe from second sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
s.Require().NoError(err)

// Should still receive
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)

// Unsubscribe from first sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
s.Require().NoError(err)

s.Require().Equal(len(s.lightNode.Subscriptions()), 0)

// Should not receive after unsubscribe
s.waitForTimeout(func() {
Expand Down Expand Up @@ -441,7 +476,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)

_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)

time.Sleep(1 * time.Second)
Expand Down
57 changes: 17 additions & 40 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@ import (

type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
pm *peermanager.PeerManager
requestID []byte
log *zap.Logger
}

FilterUnsubscribeParameters struct {
// Subscribe-specific
host host.Host
pm *peermanager.PeerManager

// Unsubscribe-specific
unsubscribeAll bool
selectedPeer peer.ID
requestID []byte
log *zap.Logger
wg *sync.WaitGroup
}

Expand All @@ -37,8 +35,7 @@ type (

Option func(*FilterParameters)

FilterSubscribeOption func(*FilterSubscribeParameters)
FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
FilterSubscribeOption func(*FilterSubscribeParameters)
)

func WithTimeout(timeout time.Duration) Option {
Expand Down Expand Up @@ -89,7 +86,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi
}

// WithRequestID is an option to set a specific request ID to be used when
// creating a filter subscription
// creating/removing a filter subscription
func WithRequestID(requestID []byte) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.requestID = requestID
Expand All @@ -111,51 +108,31 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption {
}
}

func UnsubscribeAll() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
func UnsubscribeAll() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.unsubscribeAll = true
}
}

func Peer(p peer.ID) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.selectedPeer = p
}
}

// RequestID is an option to set a specific request ID to be used when
// removing a subscription from a filter node
func RequestID(requestID []byte) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.requestID = requestID
}
}

func AutomaticRequestID() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.requestID = protocol.GenerateRequestID()
}
}

// WithWaitGroup allos specigying a waitgroup to wait until all
// WithWaitGroup allows specifying a waitgroup to wait until all
// unsubscribe requests are complete before the function is complete
func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.wg = wg
}
}

// DontWait is used to fire and forget an unsubscription, and don't
// care about the results of it
func DontWait() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
func DontWait() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.wg = nil
}
}

func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
return []FilterUnsubscribeOption{
AutomaticRequestID(),
func DefaultUnsubscribeOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticRequestID(),
WithWaitGroup(&sync.WaitGroup{}),
}
}
Expand Down
Loading

0 comments on commit 897a4d8

Please sign in to comment.