diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 665d577bd..f561624a8 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -26,6 +26,7 @@ import ( // filterSubscriptions is the map of filter subscription IDs to subscriptions const filterSubBatchSize = 90 +const initNetworkConnType = 255 type appFilterMap map[string]filterConfig @@ -43,6 +44,7 @@ type FilterManager struct { filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} waitingToSubQueue chan filterConfig envProcessor EnevelopeProcessor + networkConnType byte } type SubDetails struct { @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. mgr.params = new(subscribeParameters) @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() { } } -// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch -// once batchlimit is hit, all filters are subscribed to and new batch is created. +// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batch-limit is hit, all filters are subscribed to and new batch is created. // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { @@ -182,37 +185,62 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } +} + +func (mgr *FilterManager) resubscribeAllSubscriptions() { + mgr.Lock() + mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions))) + for _, asub := range mgr.filterSubscriptions { + asub.sub.cleanup() + } + mgr.Unlock() + + mgr.Lock() + for filterID, config := range mgr.filterConfigs { + mgr.SubscribeFilter(filterID, config.contentFilter) + } + mgr.Unlock() + +} + // OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa // Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, // if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. -func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) { subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if mgr.networkConnType != initNetworkConnType && //checking for initialization condition + mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters + // resubscribe all existing filters + mgr.resubscribeAllSubscriptions() + } if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } - } - } + mgr.checkAndProcessQueue(pubsubTopic) mgr.Unlock() } - + mgr.networkConnType = connType mgr.onlineChecker.SetOnline(newStatus) } diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d408..8a720ea66 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() { // Mock peers going down s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID()) - fm.OnConnectionStatusChange("", false) + fm.OnConnectionStatusChange("", false, 0) time.Sleep(2 * time.Second) - fm.OnConnectionStatusChange("", true) + fm.OnConnectionStatusChange("", true, 0) s.ConnectToFullNode(s.LightNode, s.FullNode) time.Sleep(3 * time.Second) diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 927ffb9c9..97ea39a13 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -41,7 +41,7 @@ type MissingMessageVerifier struct { storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker - criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex C chan *protocol.Envelope @@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes messageTracker: messageTracker, logger: logger.Named("missing-msg-verifier"), params: params, - criteriaInterest: make(map[string]criteriaInterest), + criteriaInterest: make(map[string]*criteriaInterest), C: make(chan *protocol.Envelope, 1000), } } @@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt currMessageVerificationRequest.cancel() } - m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest + m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest } func (m *MissingMessageVerifier) setRunning(running bool) { @@ -121,6 +121,15 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.ctx = ctx m.cancel = cancelFunc + // updating context for existing criteria + m.criteriaInterestMu.Lock() + for _, value := range m.criteriaInterest { + ctx, cancel := context.WithCancel(m.ctx) + value.ctx = ctx + value.cancel = cancel + } + m.criteriaInterestMu.Unlock() + go func() { defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) @@ -134,7 +143,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.criteriaInterestMu.RLock() critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest)) for _, value := range m.criteriaInterest { - critIntList = append(critIntList, value) + critIntList = append(critIntList, *value) } m.criteriaInterestMu.RUnlock() for _, interest := range critIntList { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 8fbcd91c1..c1e762e9b 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } if !wf.subscriptions.IsSubscribedTo(peerID) { - logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) + logger.Warn("received message push from unknown peer") wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us if err := stream.Reset(); err != nil {