Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: filter network change #1270

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// filterSubscriptions is the map of filter subscription IDs to subscriptions

const filterSubBatchSize = 90
const initNetworkConnType = 255

type appFilterMap map[string]filterConfig

Expand All @@ -43,6 +44,7 @@ type FilterManager struct {
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
envProcessor EnevelopeProcessor
networkConnType byte
}

type SubDetails struct {
Expand Down Expand Up @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType

//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
Expand Down Expand Up @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
}
}

// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batchlimit is hit, all filters are subscribed to and new batch is created.
// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batch-limit is hit, all filters are subscribed to and new batch is created.
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created

func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
Expand Down Expand Up @@ -182,37 +185,57 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}

func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
}

func (mgr *FilterManager) closeAllSubscriptions() {
mgr.Lock()
mgr.logger.Debug("closing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions)))
for _, asub := range mgr.filterSubscriptions {
for _, sub := range asub.sub.subs {
sub.SetClosing()
}
}
mgr.Unlock()
}

// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa
// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic,
// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online.
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
subs := mgr.node.Subscriptions()
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if mgr.networkConnType != initNetworkConnType && //checking for initialization condition
mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters
// resubscribe all existing filters
mgr.closeAllSubscriptions()
}
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
mgr.checkAndProcessQueue(pubsubTopic)
mgr.Unlock()
}

mgr.networkConnType = connType
mgr.onlineChecker.SetOnline(newStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() {
// Mock peers going down
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())

fm.OnConnectionStatusChange("", false)
fm.OnConnectionStatusChange("", false, 0)
time.Sleep(2 * time.Second)
fm.OnConnectionStatusChange("", true)
fm.OnConnectionStatusChange("", true, 0)
s.ConnectToFullNode(s.LightNode, s.FullNode)
time.Sleep(3 * time.Second)

Expand Down
17 changes: 13 additions & 4 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type MissingMessageVerifier struct {
storenodeRequestor common.StorenodeRequestor
messageTracker MessageTracker

criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex

C chan *protocol.Envelope
Expand All @@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
criteriaInterest: make(map[string]criteriaInterest),
criteriaInterest: make(map[string]*criteriaInterest),
C: make(chan *protocol.Envelope, 1000),
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
currMessageVerificationRequest.cancel()
}

m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest
}

func (m *MissingMessageVerifier) setRunning(running bool) {
Expand All @@ -121,6 +121,15 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.ctx = ctx
m.cancel = cancelFunc

// updating context for existing criteria
m.criteriaInterestMu.Lock()
for _, value := range m.criteriaInterest {
ctx, cancel := context.WithCancel(m.ctx)
value.ctx = ctx
value.cancel = cancel
}
m.criteriaInterestMu.Unlock()

go func() {
defer utils.LogOnPanic()
t := time.NewTicker(m.params.interval)
Expand All @@ -134,7 +143,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.criteriaInterestMu.RLock()
critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
for _, value := range m.criteriaInterest {
critIntList = append(critIntList, value)
critIntList = append(critIntList, *value)
}
m.criteriaInterestMu.RUnlock()
for _, interest := range critIntList {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
logger.Warn("received message push from unknown peer")
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
if err := stream.Reset(); err != nil {
Expand Down
Loading