Skip to content

Commit

Permalink
Chore/relay rpc api update (#867)
Browse files Browse the repository at this point in the history
* chore: update relay RPC to not use local message cache

* chore: add unit tests for autosharding relay REST API, fix success response (#868)
  • Loading branch information
chaitanyaprem authored Nov 7, 2023
1 parent 8fd4d0f commit d0df695
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}

}
Expand Down
151 changes: 148 additions & 3 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -63,7 +64,7 @@ func TestRelaySubscription(t *testing.T) {
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
Expand All @@ -86,7 +87,7 @@ func TestRelaySubscription(t *testing.T) {

// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) {
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)

Expand Down Expand Up @@ -159,3 +160,147 @@ func TestRelayGetV1Messages(t *testing.T) {
require.Equal(t, http.StatusNotFound, rr.Code)

}

func TestPostAutoV1Message(t *testing.T) {
router := chi.NewRouter()

_ = makeRelayService(t, router)
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
}

func TestRelayAutoSubUnsub(t *testing.T) {
router := chi.NewRouter()

r := makeRelayService(t, router)

go r.Start(context.Background())
defer r.Stop()

// Wait for node to start
time.Sleep(500 * time.Millisecond)

cTopic1 := "/toychat/1/huilong/proto"

cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())

// Test publishing messages after subscription
now := utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage(cTopic1, now+1))
require.NoError(t, err)

// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)

// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())

cTopics = append(cTopics, "test")
topicsJSONBytes, err = json.Marshal(cTopics)
require.NoError(t, err)

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)

}

func TestRelayGetV1AutoMessages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()

serviceA := makeRelayService(t, router)
go serviceA.Start(context.Background())
defer serviceA.Stop()

serviceB := makeRelayService(t, router1)
go serviceB.Start(context.Background())
defer serviceB.Stop()

hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))
require.NoError(t, err)

var addr multiaddr.Multiaddr
for _, a := range serviceB.node.Host().Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
require.NoError(t, err)

// Wait for the dial to complete
time.Sleep(1 * time.Second)

cTopic1 := "/toychat/1/huilong/proto"

cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())

// Wait for the subscription to be started
time.Sleep(1 * time.Second)

msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: cTopic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
require.NoError(t, err)

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)

// Wait for the message to be received
time.Sleep(1 * time.Second)

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)

var messages []*pb.WakuMessage
err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 1)

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)

}
64 changes: 10 additions & 54 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package rpc
import (
"fmt"
"net/http"
"sync"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
Expand All @@ -19,11 +17,7 @@ type RelayService struct {

log *zap.Logger

messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex

runner *runnerService
}

// RelayMessageArgs represents the requests used for posting messages
Expand Down Expand Up @@ -53,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R
node: node,
cacheCapacity: cacheCapacity,
log: log.Named("relay"),
messages: make(map[string][]*pb.WakuMessage),
}

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

return s
}

func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}

// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}

r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start() {
r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = make([]*pb.WakuMessage, 0)
}
r.messagesMutex.Unlock()

r.runner.Start()
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
r.runner.Stop()
}

// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method
Expand Down Expand Up @@ -199,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
if topic == "" {
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
_, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()

}

*reply = true
Expand All @@ -225,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err
}

delete(r.messages, topic)
}

*reply = true
Expand All @@ -235,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,

// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[args.Topic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.Topic)
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "")
if err != nil {
return err
}

for i := range r.messages[args.Topic] {
*reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i]))
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}

r.messages[args.Topic] = make([]*pb.WakuMessage, 0)

return nil
}

0 comments on commit d0df695

Please sign in to comment.