Skip to content

Commit

Permalink
chore: limit the maximum number of message hashes to request per query
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 9, 2024
1 parent 3eab289 commit 8f0f156
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
70 changes: 44 additions & 26 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const maxContentTopicsPerRequest = 10
const maxMsgHashesPerRequest = 50

// MessageTracker should keep track of messages it has seen before and
// provide a way to determine whether a message exists or not. This
Expand Down Expand Up @@ -247,38 +248,55 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
return nil
}

result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100))
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))
wg := sync.WaitGroup{}
// Split into batches
for i := 0; i < len(missingHashes); i += maxMsgHashesPerRequest {
j := i + maxMsgHashesPerRequest
if j > len(missingHashes) {
j = len(missingHashes)
}
return err
}

for !result.IsComplete() {
for _, mkv := range result.Messages() {
select {
case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
default:
m.logger.Warn("subscriber is too slow!")
}
}
wg.Add(1)
go func(messageHashes []pb.MessageHash) {
defer wg.Wait()

result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
if err = result.Next(ctx); err != nil {
return nil, err
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))
}
return
}
return result, nil
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))

for !result.IsComplete() {
for _, mkv := range result.Messages() {
select {
case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
default:
m.logger.Warn("subscriber is too slow!")
}
}

result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}
return result, nil
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))
}
return
}
}
return err
}

}(missingHashes[i:j])
}

wg.Wait()

return nil
}
8 changes: 4 additions & 4 deletions waku/v2/api/publish/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.uber.org/zap"
)

const DefaultMaxHashQueryLength = 100
const DefaultMaxHashQueryLength = 50
const DefaultHashQueryInterval = 3 * time.Second
const DefaultMessageSentPeriod = 3 // in seconds
const DefaultMessageExpiredPerid = 10 // in seconds
Expand Down Expand Up @@ -209,7 +209,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
}

m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))

result, err := m.store.QueryByHash(ctx, messageHashes, opts...)
if err != nil {
Expand Down Expand Up @@ -241,8 +241,8 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
}
}

m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))
m.logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes))
m.logger.Debug("missed message hashes", zap.Stringers("missedHashes", missedHashes))

return append(ackHashes, missedHashes...)
}

0 comments on commit 8f0f156

Please sign in to comment.