From 684cda35f06d82dbee9c6e40963bab5cd3d33d3d Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 19 Dec 2024 15:54:36 +0530 Subject: [PATCH] fix: update criteria context on missing msg verifier start --- waku/v2/api/missing/missing_messages.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 927ffb9c9..97ea39a13 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -41,7 +41,7 @@ type MissingMessageVerifier struct { storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker - criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex C chan *protocol.Envelope @@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes messageTracker: messageTracker, logger: logger.Named("missing-msg-verifier"), params: params, - criteriaInterest: make(map[string]criteriaInterest), + criteriaInterest: make(map[string]*criteriaInterest), C: make(chan *protocol.Envelope, 1000), } } @@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt currMessageVerificationRequest.cancel() } - m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest + m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest } func (m *MissingMessageVerifier) setRunning(running bool) { @@ -121,6 +121,15 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.ctx = ctx m.cancel = cancelFunc + // updating context for existing criteria + m.criteriaInterestMu.Lock() + for _, value := range m.criteriaInterest { + ctx, cancel := context.WithCancel(m.ctx) + value.ctx = ctx + value.cancel = cancel + } + m.criteriaInterestMu.Unlock() + go func() { defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) @@ -134,7 +143,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.criteriaInterestMu.RLock() critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest)) for _, value := range m.criteriaInterest { - critIntList = append(critIntList, value) + critIntList = append(critIntList, *value) } m.criteriaInterestMu.RUnlock() for _, interest := range critIntList {