diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go new file mode 100644 index 000000000..7e5428da9 --- /dev/null +++ b/waku/v2/api/filter/filter_manager.go @@ -0,0 +1,237 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + + "go.uber.org/zap" + "golang.org/x/exp/maps" + + "github.com/waku-org/go-waku/waku/v2/onlinechecker" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" +) + +// Methods on FilterManager just aggregate filters from application and subscribe to them +// +// startFilterSubLoop runs a loop where-in it waits for an interval to batch subscriptions +// +// runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes +// +// filterConfigs is the map of filer IDs to filter configs +// filterSubscriptions is the map of filter subscription IDs to subscriptions + +const filterSubBatchSize = 90 + +type appFilterMap map[string]filterConfig + +type FilterManager struct { + sync.Mutex + ctx context.Context + minPeersPerFilter int + onlineChecker *onlinechecker.DefaultOnlineChecker + filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details + onNewEnvelopes func(env *protocol.Envelope) error + logger *zap.Logger + node *filter.WakuFilterLightNode + filterSubBatchDuration time.Duration + incompleteFilterBatch map[string]filterConfig + filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} + waitingToSubQueue chan filterConfig +} + +type SubDetails struct { + cancel func() + sub *Sub +} + +type filterConfig struct { + ID string + contentFilter protocol.ContentFilter +} + +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, onNewEnvelopes func(env *protocol.Envelope) error, node *filter.WakuFilterLightNode) *FilterManager { + // This fn is being mocked in test + mgr := new(FilterManager) + mgr.ctx = ctx + mgr.logger = logger + mgr.minPeersPerFilter = minPeersPerFilter + mgr.onNewEnvelopes = onNewEnvelopes + mgr.filterSubscriptions = make(map[string]SubDetails) + mgr.node = node + mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) + mgr.node.SetOnlineChecker(mgr.onlineChecker) + mgr.filterSubBatchDuration = 5 * time.Second + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.filterConfigs = make(appFilterMap) + mgr.waitingToSubQueue = make(chan filterConfig, 100) + go mgr.startFilterSubLoop() + return mgr +} + +func (mgr *FilterManager) startFilterSubLoop() { + ticker := time.NewTicker(mgr.filterSubBatchDuration) + defer ticker.Stop() + for { + select { + case <-mgr.ctx.Done(): + return + case <-ticker.C: + // TODO: Optimization, handle case where 1st addFilter happens just before ticker expires. + if mgr.onlineChecker.IsOnline() { + mgr.Lock() + for _, af := range mgr.incompleteFilterBatch { + mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)), + zap.Stringer("agg-content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.Unlock() + } + } + } +} + +/* +addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +once batchlimit is hit, all filters are subscribed to and new batch is created. +if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created +*/ +func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { + mgr.logger.Debug("adding filter", zap.String("filter-id", filterID)) + + mgr.Lock() + defer mgr.Unlock() + + afilter, ok := mgr.incompleteFilterBatch[cf.PubsubTopic] + if !ok { + // no existing batch for pubsubTopic + mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", cf.PubsubTopic)) + afilter = filterConfig{uuid.NewString(), cf} + mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } else { + mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic)) + if len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics) > filterSubBatchSize { + // filter batch limit is hit + if mgr.onlineChecker.IsOnline() { + // node is online, go ahead and subscribe the batch + mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics))) + go mgr.subscribeAndRunLoop(afilter) + } else { + mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics))) + // queue existing batch as node is not online + mgr.waitingToSubQueue <- afilter + } + afilter = filterConfig{uuid.NewString(), cf} + mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf)) + mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } else { + // add to existing batch as batch limit not reached + for _, ct := range maps.Keys(cf.ContentTopics) { + afilter.contentFilter.ContentTopics[ct] = struct{}{} + } + mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics))) + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } + } +} + +func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { + ctx, cancel := context.WithCancel(mgr.ctx) + config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) + mgr.Lock() + mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} + mgr.Unlock() + if err == nil { + mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) + mgr.runFilterSubscriptionLoop(sub) + } else { + mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) + } +} + +func (mgr *FilterManager) NetworkChange() { + mgr.node.PingPeers() // ping all peers to check if subscriptions are alive +} + +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { + subs := mgr.node.Subscriptions() + mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), + zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online + mgr.NetworkChange() + mgr.logger.Debug("switching from offline to online") + mgr.Lock() + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + // TODO: Can this cause issues? + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } + mgr.Unlock() + } + + mgr.onlineChecker.SetOnline(newStatus) +} + +func (mgr *FilterManager) UnsubscribeFilter(filterID string) { + mgr.Lock() + defer mgr.Unlock() + mgr.logger.Debug("removing filter", zap.String("filter-id", filterID)) + filterConfig, ok := mgr.filterConfigs[filterID] + if !ok { + mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID)) + return + } + af, ok := mgr.filterSubscriptions[filterConfig.ID] + if ok { + delete(mgr.filterConfigs, filterID) + for ct := range filterConfig.contentFilter.ContentTopics { + delete(af.sub.ContentFilter.ContentTopics, ct) + } + if len(af.sub.ContentFilter.ContentTopics) == 0 { + af.cancel() + } else { + go af.sub.Unsubscribe(filterConfig.contentFilter) + } + } else { + mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID)) + } +} + +func (mgr *FilterManager) runFilterSubscriptionLoop(sub *Sub) { + for { + select { + case <-mgr.ctx.Done(): + mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter)) + return + case env, ok := <-sub.DataCh: + if ok { + err := (mgr.onNewEnvelopes)(env) + if err != nil { + mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err)) + } + } else { + mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter)) + return + } + } + } +}