From 9854507d6a03fd0bfb3d3e7cef55fc9ea88284f0 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 28 Nov 2024 18:17:18 -0800 Subject: [PATCH] feat: option to emit event to bus upon message sent --- waku/v2/api/publish/message_sender.go | 27 ++++++++++++++ waku/v2/api/publish/message_sender_test.go | 43 ++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index c457589e7..62dcb4af7 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -6,6 +6,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -53,6 +55,12 @@ type MessageSender struct { messageSentCheck ISentCheck rateLimiter *PublishRateLimiter logger *zap.Logger + evtMessageSent event.Emitter +} + +type MessageSent struct { + Size uint32 // Size of payload in bytes + Timestamp int64 } type Request struct { @@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess return ms } +func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender { + evtMessageSent, err := host.EventBus().Emitter(new(MessageSent)) + if err != nil { + ms.logger.Error("failed to create message sent emitter", zap.Error(err)) + } + ms.evtMessageSent = evtMessageSent + return ms +} + func (ms *MessageSender) Send(req *Request) error { logger := ms.logger.With( zap.Stringer("envelopeHash", req.envelope.Hash()), @@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error { ) } + if ms.evtMessageSent != nil { + err := ms.evtMessageSent.Emit(MessageSent{ + Size: uint32(len(req.envelope.Message().Payload)), + Timestamp: req.envelope.Message().GetTimestamp(), + }) + if err != nil { + logger.Error("failed to emit message sent event", zap.Error(err)) + } + } + return nil } diff --git a/waku/v2/api/publish/message_sender_test.go b/waku/v2/api/publish/message_sender_test.go index 2aabf0110..1409a844e 100644 --- a/waku/v2/api/publish/message_sender_test.go +++ b/waku/v2/api/publish/message_sender_test.go @@ -3,6 +3,7 @@ package publish import ( "context" "crypto/rand" + "sync" "testing" "time" @@ -129,3 +130,45 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) { return host, relay } + +func TestMessageSentEmitter(t *testing.T) { + host, relayNode := createRelayNode(t) + err := relayNode.Start(context.Background()) + require.Nil(t, err) + defer relayNode.Stop() + + _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) + require.Nil(t, err) + publisher := NewDefaultPublisher(nil, relayNode) + sender, err := NewMessageSender(Relay, publisher, utils.Logger()) + require.Nil(t, err) + + check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)} + sender.WithMessageSentCheck(check) + sender.WithMessageSentEmitter(host) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: utils.GetUnixEpoch(), + ContentTopic: "test-content-topic", + } + envelope := protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "test-pubsub-topic") + req := NewRequest(context.TODO(), envelope) + + wg := sync.WaitGroup{} + wg.Add(1) + sub, err := host.EventBus().Subscribe(new(MessageSent)) + require.Nil(t, err) + go func() { + for msgSentEvt := range sub.Out() { + msgSent := msgSentEvt.(MessageSent) + require.Equal(t, uint32(len(msg.Payload)), msgSent.Size) + require.Equal(t, msg.Timestamp, msgSent.Timestamp) + wg.Done() + } + }() + + err = sender.Send(req) + require.Nil(t, err) + go wg.Wait() +}