From 2f55b40043618869d3ff0e26060f452d5331722f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 1 Aug 2024 10:39:04 -0400 Subject: [PATCH] refactor: move missing messages logic from status-go to go-waku (#1174) --- logging/logging.go | 4 + waku/v2/api/missing/criteria_interest.go | 47 ++++ waku/v2/api/missing/missing_messages.go | 284 +++++++++++++++++++++++ waku/v2/api/missing/options.go | 39 ++++ 4 files changed, 374 insertions(+) create mode 100644 waku/v2/api/missing/criteria_interest.go create mode 100644 waku/v2/api/missing/missing_messages.go create mode 100644 waku/v2/api/missing/options.go diff --git a/logging/logging.go b/logging/logging.go index 19732d55f..d577a1c5e 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -74,6 +74,10 @@ func (t timestamp) String() string { return time.Unix(0, int64(t)).Format(time.RFC3339) } +func Epoch(key string, time time.Time) zap.Field { + return zap.String(key, fmt.Sprintf("%d", time.UnixNano())) +} + // History Query Filters type historyFilters []*pb.ContentFilter diff --git a/waku/v2/api/missing/criteria_interest.go b/waku/v2/api/missing/criteria_interest.go new file mode 100644 index 000000000..919b2fc91 --- /dev/null +++ b/waku/v2/api/missing/criteria_interest.go @@ -0,0 +1,47 @@ +package missing + +import ( + "context" + "slices" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type criteriaInterest struct { + peerID peer.ID + contentFilter protocol.ContentFilter + lastChecked time.Time + + ctx context.Context + cancel context.CancelFunc +} + +func (c criteriaInterest) equals(other criteriaInterest) bool { + if c.peerID != other.peerID { + return false + } + + if c.contentFilter.PubsubTopic != other.contentFilter.PubsubTopic { + return false + } + + contentTopics := c.contentFilter.ContentTopics.ToList() + otherContentTopics := other.contentFilter.ContentTopics.ToList() + + slices.Sort(contentTopics) + slices.Sort(otherContentTopics) + + if len(contentTopics) != len(otherContentTopics) { + return false + } + + for i, contentTopic := range contentTopics { + if contentTopic != otherContentTopics[i] { + return false + } + } + + return true +} diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go new file mode 100644 index 000000000..a50e60718 --- /dev/null +++ b/waku/v2/api/missing/missing_messages.go @@ -0,0 +1,284 @@ +package missing + +// test + +import ( + "context" + "encoding/hex" + "errors" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/timesource" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +const maxContentTopicsPerRequest = 10 + +// MessageTracker should keep track of messages it has seen before and +// provide a way to determine whether a message exists or not. This +// is application specific +type MessageTracker interface { + MessageExists(pb.MessageHash) (bool, error) +} + +// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria +type MissingMessageVerifier struct { + ctx context.Context + params missingMessageVerifierParams + + messageTracker MessageTracker + + criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + criteriaInterestMu sync.Mutex + + C <-chan *protocol.Envelope + + store *store.WakuStore + timesource timesource.Timesource + logger *zap.Logger +} + +// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier +func NewMissingMessageVerifier(store *store.WakuStore, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { + options = append(defaultMissingMessagesVerifierOptions, options...) + params := missingMessageVerifierParams{} + for _, opt := range options { + opt(¶ms) + } + + return &MissingMessageVerifier{ + store: store, + timesource: timesource, + messageTracker: messageTracker, + logger: logger.Named("missing-msg-verifier"), + params: params, + } +} + +func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) { + m.criteriaInterestMu.Lock() + defer m.criteriaInterestMu.Unlock() + + ctx, cancel := context.WithCancel(m.ctx) + criteriaInterest := criteriaInterest{ + peerID: peerID, + contentFilter: contentFilter, + lastChecked: m.timesource.Now().Add(-m.params.delay), + ctx: ctx, + cancel: cancel, + } + + currMessageVerificationRequest, ok := m.criteriaInterest[contentFilter.PubsubTopic] + + if ok && currMessageVerificationRequest.equals(criteriaInterest) { + return + } + + if ok { + // If there is an ongoing request, we cancel it before replacing it + // by the new list. This can be probably optimized further by tracking + // the last time a content topic was synced, but might not be necessary + // since cancelling an ongoing request would mean cancelling just a single + // page of results + currMessageVerificationRequest.cancel() + } + + m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest +} + +func (m *MissingMessageVerifier) Start(ctx context.Context) { + m.ctx = ctx + m.criteriaInterest = make(map[string]criteriaInterest) + + c := make(chan *protocol.Envelope, 1000) + m.C = c + + go func() { + t := time.NewTicker(m.params.interval) + defer t.Stop() + + var semaphore = make(chan struct{}, 5) + for { + select { + case <-t.C: + m.logger.Debug("checking for missing messages...") + m.criteriaInterestMu.Lock() + for _, interest := range m.criteriaInterest { + select { + case <-ctx.Done(): + return + default: + semaphore <- struct{}{} + go func(interest criteriaInterest) { + m.fetchHistory(c, interest) + <-semaphore + }(interest) + } + } + m.criteriaInterestMu.Unlock() + + case <-ctx.Done(): + return + } + } + }() +} + +func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) { + contentTopics := interest.contentFilter.ContentTopics.ToList() + for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest { + j := i + maxContentTopicsPerRequest + if j > len(contentTopics) { + j = len(contentTopics) + } + + now := m.timesource.Now() + err := m.fetchMessagesBatch(c, interest, i, j, now) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + m.logger.Error("could not fetch history", + zap.Stringer("peerID", interest.peerID), + zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), + zap.Strings("contentTopics", contentTopics)) + continue + } + + m.criteriaInterestMu.Lock() + c := m.criteriaInterest[interest.contentFilter.PubsubTopic] + if c.equals(interest) { + c.lastChecked = now + m.criteriaInterest[interest.contentFilter.PubsubTopic] = c + } + m.criteriaInterestMu.Unlock() + } +} + +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { + retry := true + count := 1 + for retry && count <= m.params.maxAttemptsToRetrieveHistory { + logger.Debug(logMsg, zap.Int("attempt", count)) + tCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + result, err := queryFunc(tCtx) + cancel() + if err != nil { + logger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) + select { + case <-m.ctx.Done(): + return nil, m.ctx.Err() + case <-time.After(2 * time.Second): + } + } else { + return result, nil + } + } + + return nil, errors.New("storenode not available") +} + +func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, interest criteriaInterest, batchFrom int, batchTo int, now time.Time) error { + contentTopics := interest.contentFilter.ContentTopics.ToList() + + logger := m.logger.With( + zap.Stringer("peerID", interest.peerID), + zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]), + zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), + logging.Epoch("from", interest.lastChecked), + logging.Epoch("to", now), + ) + + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + return m.store.Query(ctx, store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...), + TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), + }, store.WithPeer(interest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) + }, logger, "retrieving history to check for missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + var missingHashes []pb.MessageHash + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + hash := pb.ToMessageHash(mkv.MessageHash) + exists, err := m.messageTracker.MessageExists(hash) + if err != nil { + return err + } + + if exists { + continue + } + + missingHashes = append(missingHashes, hash) + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + if len(missingHashes) == 0 { + // Nothing to do here + return nil + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100)) + }, logger, "retrieving missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + select { + case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()): + default: + m.logger.Warn("subscriber is too slow!") + } + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + return nil +} diff --git a/waku/v2/api/missing/options.go b/waku/v2/api/missing/options.go new file mode 100644 index 000000000..b16abbc70 --- /dev/null +++ b/waku/v2/api/missing/options.go @@ -0,0 +1,39 @@ +package missing + +import "time" + +type missingMessageVerifierParams struct { + delay time.Duration + interval time.Duration + maxAttemptsToRetrieveHistory int +} + +// MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior +type MissingMessageVerifierOption func(*missingMessageVerifierParams) + +// WithVerificationInterval is an option used to setup the verification interval +func WithVerificationInterval(t time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.interval = t + } +} + +// WithDelay is an option used to indicate the delay to apply for verifying messages +func WithDelay(t time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.delay = t + } +} + +// WithMaxAttempts indicates how many times will the message verifier retry a failed storenode request +func WithMaxRetryAttempts(max int) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.maxAttemptsToRetrieveHistory = max + } +} + +var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ + WithVerificationInterval(time.Minute), + WithDelay(20 * time.Second), + WithMaxRetryAttempts(3), +}