From 1d6ac0f81814957a6f181608a7dc994b3989f892 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 6 Nov 2023 19:25:28 +0530 Subject: [PATCH 01/10] update relay REST API's to remove duplicate message cache, fix relay tests and admin test --- cmd/waku/server/rest/relay.go | 106 +++++++-------------------- cmd/waku/server/rest/relay_test.go | 45 +++++------- cmd/waku/server/rest/waku_rest.go | 3 - cmd/waku/server/rpc/admin_test.go | 3 +- cmd/waku/server/rpc/relay.go | 5 -- cmd/waku/server/rpc/relay_test.go | 3 +- waku/v2/protocol/relay/waku_relay.go | 24 +++++- 7 files changed, 73 insertions(+), 116 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 968ed338f..6132df2c9 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -3,11 +3,8 @@ package rest import ( "context" "encoding/json" - "errors" "net/http" - "net/url" "strings" - "sync" "github.com/go-chi/chi/v5" "github.com/waku-org/go-waku/cmd/waku/server" @@ -31,11 +28,8 @@ type RelayService struct { log *zap.Logger - messages map[string][]*pb.WakuMessage cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService + ctx context.Context } // NewRelayService returns an instance of RelayService @@ -44,10 +38,9 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za node: node, log: log.Named("relay"), cacheCapacity: cacheCapacity, - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) + //s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) @@ -65,36 +58,11 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - // Start starts the RelayService func (r *RelayService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel - - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = []*pb.WakuMessage{} - } - r.messagesMutex.Unlock() - - r.runner.Start(ctx) + r.ctx = ctx } // Stop stops the RelayService @@ -114,16 +82,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re } defer req.Body.Close() - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - var err error for _, topic := range topics { err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) - } else { - delete(r.messages, topic) } } @@ -140,26 +103,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ defer req.Body.Close() var err error - var sub *relay.Subscription - var subs []*relay.Subscription + var successCnt int var topicToSubscribe string for _, topic := range topics { if topic == "" { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic)) topicToSubscribe = relay.DefaultWakuTopic } else { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic)) topicToSubscribe = topic } + _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe)) + if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) - } else { - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = []*pb.WakuMessage{} - r.messagesMutex.Unlock() + continue } + successCnt++ + } + + // on partial subscribe failure + if successCnt > 0 && err != nil { + r.log.Error("partial subscribe failed", zap.Error(err)) + // on partial failure + writeResponse(w, err, http.StatusOK) + return } writeErrOrResponse(w, err, true) @@ -170,20 +136,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { if topic == "" { return } - - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[topic]; !ok { + //TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well. + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "") + if err != nil { w.WriteHeader(http.StatusNotFound) - _, err := w.Write([]byte("not subscribed to topic")) + _, err = w.Write([]byte("not subscribed to topic")) r.log.Error("writing response", zap.Error(err)) return } + var response []*pb.WakuMessage + select { + case msg := <-sub.Ch: + response = append(response, msg.Message()) + default: + break + } - response := r.messages[topic] - - r.messages[topic] = []*pb.WakuMessage{} writeErrOrResponse(w, nil, response) } @@ -205,11 +173,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { topic = relay.DefaultWakuTopic } - if !r.node.Relay().IsSubscribed(topic) { - writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil) - return - } - if err := server.AppendRLNProof(r.node, message); err != nil { writeErrOrResponse(w, err, nil) return @@ -266,21 +229,8 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. } func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { - cTopic := chi.URLParam(req, "contentTopic") - if cTopic == "" { - w.WriteHeader(http.StatusBadRequest) - _, err := w.Write([]byte("contentTopic is required")) - r.log.Error("writing response", zap.Error(err)) - return - } - cTopic, err := url.QueryUnescape(cTopic) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - _, err = w.Write([]byte("invalid contentTopic format")) - r.log.Error("writing response", zap.Error(err)) - return - } + cTopic := topicFromPath(w, req, "contentTopic", r.log) sub, err := r.node.Relay().GetSubscription(cTopic) if err != nil { w.WriteHeader(http.StatusNotFound) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 3fe994b90..470b9cbb7 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -15,8 +15,8 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -34,7 +34,6 @@ func TestPostV1Message(t *testing.T) { router := chi.NewRouter() _ = makeRelayService(t, router) - msg := &pb.WakuMessage{ Payload: []byte{1, 2, 3}, ContentTopic: "abc", @@ -54,10 +53,10 @@ func TestPostV1Message(t *testing.T) { func TestRelaySubscription(t *testing.T) { router := chi.NewRouter() - d := makeRelayService(t, router) + r := makeRelayService(t, router) - go d.Start(context.Background()) - defer d.Stop() + go r.Start(context.Background()) + defer r.Stop() // Wait for node to start time.Sleep(500 * time.Millisecond) @@ -74,21 +73,19 @@ func TestRelaySubscription(t *testing.T) { // Test max messages in subscription now := utils.GetUnixEpoch() - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test")) - - // Wait for the messages to be processed - time.Sleep(500 * time.Millisecond) - - require.Len(t, d.messages["test"], 3) - - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test")) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test")) + require.NoError(t, err) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test")) + require.NoError(t, err) - time.Sleep(500 * time.Millisecond) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test")) + require.NoError(t, err) - // Should only have 3 messages - require.Len(t, d.messages["test"], 3) + // Wait for the messages to be processed + time.Sleep(5 * time.Millisecond) // Test deletion rr = httptest.NewRecorder() @@ -96,17 +93,17 @@ func TestRelaySubscription(t *testing.T) { router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) - require.Len(t, d.messages["test"], 0) - } func TestRelayGetV1Messages(t *testing.T) { router := chi.NewRouter() + router1 := chi.NewRouter() serviceA := makeRelayService(t, router) go serviceA.Start(context.Background()) defer serviceA.Stop() - serviceB := makeRelayService(t, router) + + serviceB := makeRelayService(t, router1) go serviceB.Start(context.Background()) defer serviceB.Stop() @@ -165,9 +162,7 @@ func TestRelayGetV1Messages(t *testing.T) { rr = httptest.NewRecorder() req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{})) - router.ServeHTTP(rr, req) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) - err = json.Unmarshal(rr.Body.Bytes(), &messages) - require.NoError(t, err) - require.Len(t, messages, 0) } diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 98c5437c9..f6efbfe85 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -75,9 +75,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - if r.node.Relay() != nil { - go r.relayService.Start(ctx) - } if r.node.FilterLightnode() != nil { go r.filterService.Start(ctx) } diff --git a/cmd/waku/server/rpc/admin_test.go b/cmd/waku/server/rpc/admin_test.go index 41966e6f7..a17979e11 100644 --- a/cmd/waku/server/rpc/admin_test.go +++ b/cmd/waku/server/rpc/admin_test.go @@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcast := relay.NewBroadcaster(10) + relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 5f4310468..992dc40e3 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,7 +1,6 @@ package rpc import ( - "errors" "fmt" "net/http" "sync" @@ -105,10 +104,6 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, topic = args.Topic } - if !r.node.Relay().IsSubscribed(topic) { - return errors.New("not subscribed to pubsubTopic") - } - msg := args.Message.toProto() if err = server.AppendRLNProof(r.node, msg); err != nil { diff --git a/cmd/waku/server/rpc/relay_test.go b/cmd/waku/server/rpc/relay_test.go index 61427faf3..cd9a408ff 100644 --- a/cmd/waku/server/rpc/relay_test.go +++ b/cmd/waku/server/rpc/relay_test.go @@ -111,7 +111,8 @@ func TestRelayGetV1Messages(t *testing.T) { &RelayMessageArgs{ Topic: "test", Message: ProtoToRPC(&pb.WakuMessage{ - Payload: []byte("test"), + Payload: []byte("test"), + ContentTopic: "testContentTopic", }), }, &reply, diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index f55eba1e3..8c186370d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -279,13 +279,31 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . return hash, nil } +// GetSubscriptionWithPubsubTopic fetches subscription matching pubsub and contentTopic +func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) { + var contentFilter waku_proto.ContentFilter + if contentTopic != "" { + contentFilter = waku_proto.NewContentFilter(pubsubTopic, contentTopic) + } else { + contentFilter = waku_proto.NewContentFilter(pubsubTopic) + } + cSubs := w.contentSubs[pubsubTopic] + for _, sub := range cSubs { + if sub.contentFilter.Equals(contentFilter) { + return sub, nil + } + } + return nil, errors.New("no subscription found for content topic") +} + +// GetSubscription fetches subscription matching a contentTopic(via autosharding) func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { - pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) if err != nil { return nil, err } - contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic) - cSubs := w.contentSubs[pubSubTopic] + contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic) + cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { return sub, nil From cb0abeeedbdb975b9619453c3686507c5ba2fafc Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 6 Nov 2023 19:29:01 +0530 Subject: [PATCH 02/10] chore: enable REST and RPC unit tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e68348b17..71b8ef85a 100644 --- a/Makefile +++ b/Makefile @@ -80,7 +80,7 @@ test-with-race: ${GOBIN} test -race -timeout 300s ./waku/... test: - ${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... + ${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE} ${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV} From 9d6643de3517445fce6b672aec7be8ef87d1b990 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 11:31:39 +0530 Subject: [PATCH 03/10] update lightpush rest api to match yaml --- cmd/waku/server/rest/lightpush_rest.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index 8500df4a5..e33f4b7a2 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req _, err = w.Write([]byte(err.Error())) serv.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } From 0f6c93c0e2e8cf6a248b39a9c04ab8d88f3eaf14 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 12:03:52 +0530 Subject: [PATCH 04/10] fix: filter rest unit test failures --- cmd/waku/server/rest/filter.go | 10 ++++++---- cmd/waku/server/rest/filter_test.go | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index d6719e031..02e1e6815 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul var peerIds string ind := 0 for _, entry := range result.Errors() { - s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) - if ind != 0 { - peerIds += ", " + if entry.Err != nil { + s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) + if ind != 0 { + peerIds += ", " + } + peerIds += entry.PeerID.String() } - peerIds += entry.PeerID.String() ind++ } if peerIds != "" { diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 52a5af3a0..09d873e2d 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) { router.ServeHTTP(rr, req) require.Equal(t, http.StatusNotFound, rr.Code) require.Equal(t, - fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), + fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), rr.Body.String(), ) } From 4d5ef72d62a8d2e836c47182483d3a15cdc07702 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 12:11:30 +0530 Subject: [PATCH 05/10] skipping legacy filter tests --- cmd/waku/server/rpc/filter_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index e6c7d698c..160afca7a 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -50,6 +50,7 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService { } func TestFilterSubscription(t *testing.T) { + t.Skip("skipping since it is legacy filter") port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -110,6 +111,8 @@ func TestFilterSubscription(t *testing.T) { } func TestFilterGetV1Messages(t *testing.T) { + t.Skip("skipping since it is legacy filter") + serviceA := makeFilterService(t, true) var reply SuccessReply From 1008d17484211f8df363739db25c4958e8584841 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 12:12:06 +0530 Subject: [PATCH 06/10] enable rpc and rest tests with race flag --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 71b8ef85a..8d64a429c 100644 --- a/Makefile +++ b/Makefile @@ -77,7 +77,7 @@ lint-full: @golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m test-with-race: - ${GOBIN} test -race -timeout 300s ./waku/... + ${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/... test: ${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... From 946359171dc2338545d072e4be6c3045ac446c18 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 13:18:15 +0530 Subject: [PATCH 07/10] chroe: remove start/stop for relay rest --- cmd/waku/server/rest/relay.go | 22 +--------------------- cmd/waku/server/rest/relay_test.go | 7 ------- cmd/waku/server/rest/waku_rest.go | 3 --- 3 files changed, 1 insertion(+), 31 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 6132df2c9..da5f2d79c 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -1,7 +1,6 @@ package rest import ( - "context" "encoding/json" "net/http" "strings" @@ -23,13 +22,11 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages" // RelayService represents the REST service for WakuRelay type RelayService struct { - node *node.WakuNode - cancel context.CancelFunc + node *node.WakuNode log *zap.Logger cacheCapacity int - ctx context.Context } // NewRelayService returns an instance of RelayService @@ -40,8 +37,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za cacheCapacity: cacheCapacity, } - //s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) m.Get(routeRelayV1Messages, s.getV1Messages) @@ -58,21 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za return s } -// Start starts the RelayService -func (r *RelayService) Start(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel - r.ctx = ctx -} - -// Stop stops the RelayService -func (r *RelayService) Stop() { - if r.cancel == nil { - return - } - r.cancel() -} - func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) { var topics []string decoder := json.NewDecoder(req.Body) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 470b9cbb7..4256530e0 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -55,9 +55,6 @@ func TestRelaySubscription(t *testing.T) { r := makeRelayService(t, router) - go r.Start(context.Background()) - defer r.Stop() - // Wait for node to start time.Sleep(500 * time.Millisecond) @@ -100,12 +97,8 @@ func TestRelayGetV1Messages(t *testing.T) { router1 := chi.NewRouter() serviceA := makeRelayService(t, router) - go serviceA.Start(context.Background()) - defer serviceA.Stop() serviceB := makeRelayService(t, router1) - go serviceB.Start(context.Background()) - defer serviceB.Stop() hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) require.NoError(t, err) diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index f6efbfe85..1b5b5204f 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -51,9 +51,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool if node.Relay() != nil { relayService := NewRelayService(node, mux, relayCacheCapacity, log) - server.RegisterOnShutdown(func() { - relayService.Stop() - }) wrpc.relayService = relayService } From b154233003c86a0e29629d68fb0944bcf41d6558 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 18:15:16 +0530 Subject: [PATCH 08/10] chore: refactor rest and provide cacheSize as config option for relay subscribe (#869) --- cmd/waku/node.go | 9 ++++++++- cmd/waku/server/rest/relay.go | 8 ++++---- cmd/waku/server/rest/waku_rest.go | 21 +++++++++++++++------ cmd/waku/server/rest/waku_rest_test.go | 2 +- waku/v2/protocol/relay/config.go | 8 ++++++++ waku/v2/protocol/relay/waku_relay.go | 5 ++++- 6 files changed, 40 insertions(+), 13 deletions(-) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 2129a1e56..56e3f83a3 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error { var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) - restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger) + restConfig := rest.RestConfig{Address: options.RESTServer.Address, + Port: uint(options.RESTServer.Port), + EnablePProf: options.PProf, + EnableAdmin: options.RESTServer.Admin, + RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity), + FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)} + + restServer = rest.NewWakuRest(wakuNode, restConfig, logger) restServer.Start(ctx, &wg) } diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index da5f2d79c..a890169c8 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -26,11 +26,11 @@ type RelayService struct { log *zap.Logger - cacheCapacity int + cacheCapacity uint } // NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { +func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService { s := &RelayService{ node: node, log: log.Named("relay"), @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } else { topicToSubscribe = topic } - _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe)) + _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) @@ -193,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. defer req.Body.Close() var err error - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) } diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 1b5b5204f..3126f37eb 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -22,7 +22,16 @@ type WakuRest struct { filterService *FilterService } -func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest { +type RestConfig struct { + Address string + Port uint + EnablePProf bool + EnableAdmin bool + RelayCacheCapacity uint + FilterCacheCapacity uint +} + +func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest { wrpc := new(WakuRest) wrpc.log = log.Named("rest") @@ -30,7 +39,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool mux.Use(middleware.Logger) mux.Use(middleware.NoCache) - if enablePProf { + if config.EnablePProf { mux.Mount("/debug", middleware.Profiler()) } @@ -39,7 +48,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewStoreService(node, mux) _ = NewLightpushService(node, mux, log) - listenAddr := fmt.Sprintf("%s:%d", address, port) + listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port) server := &http.Server{ Addr: listenAddr, @@ -50,16 +59,16 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool wrpc.server = server if node.Relay() != nil { - relayService := NewRelayService(node, mux, relayCacheCapacity, log) + relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log) wrpc.relayService = relayService } - if enableAdmin { + if config.EnableAdmin { _ = NewAdminService(node, mux, wrpc.log) } if node.FilterLightnode() != nil { - filterService := NewFilterService(node, mux, filterCacheCapacity, log) + filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log) server.RegisterOnShutdown(func() { filterService.Stop() }) diff --git a/cmd/waku/server/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go index d27810b4e..0d62c22e0 100644 --- a/cmd/waku/server/rest/waku_rest_test.go +++ b/cmd/waku/server/rest/waku_rest_test.go @@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) { n, err := node.New(options) require.NoError(t, err) - rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger()) + rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/waku/v2/protocol/relay/config.go b/waku/v2/protocol/relay/config.go index d343edd58..16799863e 100644 --- a/waku/v2/protocol/relay/config.go +++ b/waku/v2/protocol/relay/config.go @@ -15,6 +15,7 @@ var DefaultRelaySubscriptionBufferSize int = 1024 type RelaySubscribeParameters struct { dontConsume bool + cacheSize uint } type RelaySubscribeOption func(*RelaySubscribeParameters) error @@ -28,6 +29,13 @@ func WithoutConsumer() RelaySubscribeOption { } } +func WithCacheSize(size uint) RelaySubscribeOption { + return func(params *RelaySubscribeParameters) error { + params.cacheSize = size + return nil + } +} + func msgIDFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 8c186370d..c195ae070 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -349,6 +349,9 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont return nil, err } } + if params.cacheSize <= 0 { + params.cacheSize = uint(DefaultRelaySubscriptionBufferSize) + } for pubSubTopic, cTopics := range pubSubTopicMap { w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics)) @@ -365,7 +368,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont } } - subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize), + subscription := w.bcaster.Register(cFilter, WithBufferSize(int(params.cacheSize)), WithConsumerOption(params.dontConsume)) // Create Content subscription From d0df6959e5363141a2bce1523cb292e851610cb2 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 18:21:19 +0530 Subject: [PATCH 09/10] Chore/relay rpc api update (#867) * chore: update relay RPC to not use local message cache * chore: add unit tests for autosharding relay REST API, fix success response (#868) --- cmd/waku/server/rest/relay.go | 2 +- cmd/waku/server/rest/relay_test.go | 151 ++++++++++++++++++++++++++++- cmd/waku/server/rpc/relay.go | 64 ++---------- 3 files changed, 159 insertions(+), 58 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index a890169c8..d80e00733 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -203,7 +203,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. _, err := w.Write([]byte(err.Error())) r.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 4256530e0..36b4d9356 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "time" @@ -63,7 +64,7 @@ func TestRelaySubscription(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) @@ -86,7 +87,7 @@ func TestRelaySubscription(t *testing.T) { // Test deletion rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) @@ -119,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) @@ -159,3 +160,147 @@ func TestRelayGetV1Messages(t *testing.T) { require.Equal(t, http.StatusNotFound, rr.Code) } + +func TestPostAutoV1Message(t *testing.T) { + router := chi.NewRouter() + + _ = makeRelayService(t, router) + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "/toychat/1/huilong/proto", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) +} + +func TestRelayAutoSubUnsub(t *testing.T) { + router := chi.NewRouter() + + r := makeRelayService(t, router) + + go r.Start(context.Background()) + defer r.Stop() + + // Wait for node to start + time.Sleep(500 * time.Millisecond) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Test publishing messages after subscription + now := utils.GetUnixEpoch() + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage(cTopic1, now+1)) + require.NoError(t, err) + + // Wait for the messages to be processed + time.Sleep(5 * time.Millisecond) + + // Test deletion + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + cTopics = append(cTopics, "test") + topicsJSONBytes, err = json.Marshal(cTopics) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + +} + +func TestRelayGetV1AutoMessages(t *testing.T) { + router := chi.NewRouter() + router1 := chi.NewRouter() + + serviceA := makeRelayService(t, router) + go serviceA.Start(context.Background()) + defer serviceA.Stop() + + serviceB := makeRelayService(t, router1) + go serviceB.Start(context.Background()) + defer serviceB.Stop() + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: cTopic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJsonBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + var messages []*pb.WakuMessage + err = json.Unmarshal(rr.Body.Bytes(), &messages) + require.NoError(t, err) + require.Len(t, messages, 1) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) + +} diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 992dc40e3..4994706b0 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -3,12 +3,10 @@ package rpc import ( "fmt" "net/http" - "sync" "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -19,11 +17,7 @@ type RelayService struct { log *zap.Logger - messages map[string][]*pb.WakuMessage cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService } // RelayMessageArgs represents the requests used for posting messages @@ -53,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R node: node, cacheCapacity: cacheCapacity, log: log.Named("relay"), - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - // Start starts the RelayService func (r *RelayService) Start() { - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = make([]*pb.WakuMessage, 0) - } - r.messagesMutex.Unlock() - - r.runner.Start() } // Stop stops the RelayService func (r *RelayService) Stop() { - r.runner.Stop() } // PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method @@ -199,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r if topic == "" { topic = relay.DefaultWakuTopic } - var sub *relay.Subscription - subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err } - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = make([]*pb.WakuMessage, 0) - r.messagesMutex.Unlock() + } *reply = true @@ -225,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) return err } - - delete(r.messages, topic) } *reply = true @@ -235,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, // GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - if _, ok := r.messages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "") + if err != nil { + return err } - - for i := range r.messages[args.Topic] { - *reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i])) + select { + case msg := <-sub.Ch: + *reply = append(*reply, ProtoToRPC(msg.Message())) + default: + break } - - r.messages[args.Topic] = make([]*pb.WakuMessage, 0) - return nil } From bc718566af04555edd6b9a46bf971d7b0a64975a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 7 Nov 2023 18:26:42 +0530 Subject: [PATCH 10/10] chore: fix test code --- cmd/waku/server/rest/relay_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 36b4d9356..7f10c9307 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -185,9 +185,6 @@ func TestRelayAutoSubUnsub(t *testing.T) { r := makeRelayService(t, router) - go r.Start(context.Background()) - defer r.Stop() - // Wait for node to start time.Sleep(500 * time.Millisecond) @@ -235,12 +232,8 @@ func TestRelayGetV1AutoMessages(t *testing.T) { router1 := chi.NewRouter() serviceA := makeRelayService(t, router) - go serviceA.Start(context.Background()) - defer serviceA.Stop() serviceB := makeRelayService(t, router1) - go serviceB.Start(context.Background()) - defer serviceB.Stop() hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) require.NoError(t, err)