Skip to content

Commit

Permalink
fix: dont start missing message verified if already started
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Dec 3, 2024
1 parent 69c9d82 commit 692b862
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ type MissingMessageVerifier struct {
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex

C <-chan *protocol.Envelope
C chan *protocol.Envelope

timesource timesource.Timesource
logger *zap.Logger
timesource timesource.Timesource
logger *zap.Logger
isRunning bool
runningMutex sync.RWMutex
}

// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
Expand All @@ -64,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
criteriaInterest: make(map[string]criteriaInterest),
C: make(chan *protocol.Envelope, 1000),
}
}

Expand Down Expand Up @@ -99,14 +103,20 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
}

func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.runningMutex.RLock()
if m.isRunning { //make sure verifier only runs once.
m.runningMutex.RUnlock()
return
}
m.runningMutex.RUnlock()

ctx, cancelFunc := context.WithCancel(ctx)
m.ctx = ctx
m.cancel = cancelFunc
m.criteriaInterest = make(map[string]criteriaInterest)

c := make(chan *protocol.Envelope, 1000)
m.C = c

m.runningMutex.Lock()
m.isRunning = true
m.runningMutex.Unlock()
go func() {
defer utils.LogOnPanic()
t := time.NewTicker(m.params.interval)
Expand All @@ -131,7 +141,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanic()
m.fetchHistory(c, interest)
m.fetchHistory(m.C, interest)
<-semaphore
}(interest)
}
Expand All @@ -146,6 +156,9 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {

func (m *MissingMessageVerifier) Stop() {
m.cancel()
m.runningMutex.Lock()
defer m.runningMutex.Unlock()
m.isRunning = false
}

func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) {
Expand Down

0 comments on commit 692b862

Please sign in to comment.