diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index a890169c8..d80e00733 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -203,7 +203,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. _, err := w.Write([]byte(err.Error())) r.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 4256530e0..36b4d9356 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "time" @@ -63,7 +64,7 @@ func TestRelaySubscription(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) @@ -86,7 +87,7 @@ func TestRelaySubscription(t *testing.T) { // Test deletion rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) @@ -119,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) @@ -159,3 +160,147 @@ func TestRelayGetV1Messages(t *testing.T) { require.Equal(t, http.StatusNotFound, rr.Code) } + +func TestPostAutoV1Message(t *testing.T) { + router := chi.NewRouter() + + _ = makeRelayService(t, router) + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "/toychat/1/huilong/proto", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) +} + +func TestRelayAutoSubUnsub(t *testing.T) { + router := chi.NewRouter() + + r := makeRelayService(t, router) + + go r.Start(context.Background()) + defer r.Stop() + + // Wait for node to start + time.Sleep(500 * time.Millisecond) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Test publishing messages after subscription + now := utils.GetUnixEpoch() + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage(cTopic1, now+1)) + require.NoError(t, err) + + // Wait for the messages to be processed + time.Sleep(5 * time.Millisecond) + + // Test deletion + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + cTopics = append(cTopics, "test") + topicsJSONBytes, err = json.Marshal(cTopics) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + +} + +func TestRelayGetV1AutoMessages(t *testing.T) { + router := chi.NewRouter() + router1 := chi.NewRouter() + + serviceA := makeRelayService(t, router) + go serviceA.Start(context.Background()) + defer serviceA.Stop() + + serviceB := makeRelayService(t, router1) + go serviceB.Start(context.Background()) + defer serviceB.Stop() + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: cTopic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJsonBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + var messages []*pb.WakuMessage + err = json.Unmarshal(rr.Body.Bytes(), &messages) + require.NoError(t, err) + require.Len(t, messages, 1) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) + +} diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 992dc40e3..4994706b0 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -3,12 +3,10 @@ package rpc import ( "fmt" "net/http" - "sync" "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -19,11 +17,7 @@ type RelayService struct { log *zap.Logger - messages map[string][]*pb.WakuMessage cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService } // RelayMessageArgs represents the requests used for posting messages @@ -53,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R node: node, cacheCapacity: cacheCapacity, log: log.Named("relay"), - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - // Start starts the RelayService func (r *RelayService) Start() { - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = make([]*pb.WakuMessage, 0) - } - r.messagesMutex.Unlock() - - r.runner.Start() } // Stop stops the RelayService func (r *RelayService) Stop() { - r.runner.Stop() } // PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method @@ -199,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r if topic == "" { topic = relay.DefaultWakuTopic } - var sub *relay.Subscription - subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err } - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = make([]*pb.WakuMessage, 0) - r.messagesMutex.Unlock() + } *reply = true @@ -225,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) return err } - - delete(r.messages, topic) } *reply = true @@ -235,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, // GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - if _, ok := r.messages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "") + if err != nil { + return err } - - for i := range r.messages[args.Topic] { - *reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i])) + select { + case msg := <-sub.Ch: + *reply = append(*reply, ProtoToRPC(msg.Message())) + default: + break } - - r.messages[args.Topic] = make([]*pb.WakuMessage, 0) - return nil }