diff --git a/waku/v2/api/publish/message_queue.go b/waku/v2/api/publish/message_queue.go index fbd79df88..ad153ff9c 100644 --- a/waku/v2/api/publish/message_queue.go +++ b/waku/v2/api/publish/message_queue.go @@ -103,12 +103,6 @@ func (m *MessageQueue) Start(ctx context.Context) { m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} case <-ctx.Done(): - if m.usePriorityQueue { - close(m.throttledPrioritySendQueue) - close(m.envelopeAvailableOnPriorityQueueSignal) - } else { - close(m.toSendChan) - } return } } @@ -116,27 +110,43 @@ func (m *MessageQueue) Start(ctx context.Context) { // Push an envelope into the message queue. The priority is optional, and will be ignored // if the message queue does not use a priority queue -func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) { +func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error { if m.usePriorityQueue { msgPriority := NormalPriority if len(priority) != 0 { msgPriority = priority[0] } - m.throttledPrioritySendQueue <- &envelopePriority{ + pEnvelope := &envelopePriority{ envelope: envelope, priority: msgPriority, } + + select { + case m.throttledPrioritySendQueue <- pEnvelope: + // Do nothing + case <-ctx.Done(): + return ctx.Err() + } } else { - m.toSendChan <- envelope + select { + case m.toSendChan <- envelope: + // Do nothing + case <-ctx.Done(): + return ctx.Err() + } } + + return nil } // Pop will return a channel on which a message can be retrieved from the message queue -func (m *MessageQueue) Pop() <-chan *protocol.Envelope { +func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { ch := make(chan *protocol.Envelope) go func() { + defer close(ch) + select { case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: if ok { @@ -147,9 +157,11 @@ func (m *MessageQueue) Pop() <-chan *protocol.Envelope { if ok { ch <- envelope } + + case <-ctx.Done(): + return } - close(ch) }() return ch diff --git a/waku/v2/api/publish/message_queue_test.go b/waku/v2/api/publish/message_queue_test.go index 15761c57c..e7b8e21aa 100644 --- a/waku/v2/api/publish/message_queue_test.go +++ b/waku/v2/api/publish/message_queue_test.go @@ -17,25 +17,30 @@ func TestFifoQueue(t *testing.T) { queue := NewMessageQueue(10, false) go queue.Start(ctx) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A")) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B")) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C")) + err := queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A")) + require.NoError(t, err) - envelope, ok := <-queue.Pop() + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B")) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C")) + require.NoError(t, err) + + envelope, ok := <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "A", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "B", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "C", envelope.PubsubTopic()) cancel() - _, ok = <-queue.Pop() + _, ok = <-queue.Pop(ctx) require.False(t, ok) } @@ -45,47 +50,60 @@ func TestPriorityQueue(t *testing.T) { queue := NewMessageQueue(10, true) go queue.Start(ctx) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority) - queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority) + err := queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority) + require.NoError(t, err) + + err = queue.Push(ctx, protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority) + require.NoError(t, err) time.Sleep(2 * time.Second) - envelope, ok := <-queue.Pop() + envelope, ok := <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "C", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "E", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "D", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "G", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "A", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "B", envelope.PubsubTopic()) - envelope, ok = <-queue.Pop() + envelope, ok = <-queue.Pop(ctx) require.True(t, ok) require.Equal(t, "F", envelope.PubsubTopic()) cancel() - _, ok = <-queue.Pop() + _, ok = <-queue.Pop(ctx) require.False(t, ok) }