Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update relay REST and RPC API's and fix unit tests #866

Merged
merged 11 commits into from
Nov 7, 2023
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ lint-full:
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m

test-with-race:
${GOBIN} test -race -timeout 300s ./waku/...
${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/...

test:
${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE}
${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}

Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
var peerIds string
ind := 0
for _, entry := range result.Errors() {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
peerIds += entry.PeerID.String()
}
peerIds += entry.PeerID.String()
ind++
}
if peerIds != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) {
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/lightpush_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}
}
122 changes: 26 additions & 96 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package rest

import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strings"
"sync"

"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
Expand All @@ -26,16 +22,11 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages"

// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
cancel context.CancelFunc
node *node.WakuNode

log *zap.Logger

messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex

runner *runnerService
}

// NewRelayService returns an instance of RelayService
Expand All @@ -44,11 +35,8 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
messages: make(map[string][]*pb.WakuMessage),
}

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
Expand All @@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
return s
}

func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}

// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}

r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{}
}
r.messagesMutex.Unlock()

r.runner.Start(ctx)
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}

func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
Expand All @@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
}
defer req.Body.Close()

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
delete(r.messages, topic)
}
}

Expand All @@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
defer req.Body.Close()

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
r.messagesMutex.Unlock()
continue
}
successCnt++
}

// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}

writeErrOrResponse(w, err, true)
Expand All @@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
if topic == "" {
return
}

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[topic]; !ok {
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte("not subscribed to topic"))
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}

response := r.messages[topic]

r.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response)
}

Expand All @@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}

if !r.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}

if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
Expand Down Expand Up @@ -266,21 +209,8 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
}

func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}

cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)
Expand Down
48 changes: 18 additions & 30 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

Expand All @@ -34,7 +34,6 @@ func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()

_ = makeRelayService(t, router)

msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Expand All @@ -54,10 +53,7 @@ func TestPostV1Message(t *testing.T) {
func TestRelaySubscription(t *testing.T) {
router := chi.NewRouter()

d := makeRelayService(t, router)

go d.Start(context.Background())
defer d.Stop()
r := makeRelayService(t, router)

// Wait for node to start
time.Sleep(500 * time.Millisecond)
Expand All @@ -74,41 +70,35 @@ func TestRelaySubscription(t *testing.T) {

// Test max messages in subscription
now := utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test"))

// Wait for the messages to be processed
time.Sleep(500 * time.Millisecond)

require.Len(t, d.messages["test"], 3)

d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test"))
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test"))
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test"))
require.NoError(t, err)

// Should only have 3 messages
require.Len(t, d.messages["test"], 3)
// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)

// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
require.Len(t, d.messages["test"], 0)

}

func TestRelayGetV1Messages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()

serviceA := makeRelayService(t, router)
go serviceA.Start(context.Background())
defer serviceA.Stop()
serviceB := makeRelayService(t, router)
go serviceB.Start(context.Background())
defer serviceB.Stop()

serviceB := makeRelayService(t, router1)

hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))
require.NoError(t, err)
Expand Down Expand Up @@ -165,9 +155,7 @@ func TestRelayGetV1Messages(t *testing.T) {

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)

err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 0)
}
6 changes: 0 additions & 6 deletions cmd/waku/server/rest/waku_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool

if node.Relay() != nil {
relayService := NewRelayService(node, mux, relayCacheCapacity, log)
server.RegisterOnShutdown(func() {
relayService.Stop()
})
wrpc.relayService = relayService
}

Expand All @@ -75,9 +72,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

if r.node.Relay() != nil {
go r.relayService.Start(ctx)
}
if r.node.FilterLightnode() != nil {
go r.filterService.Start(ctx)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/server/rpc/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) {

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcast := relay.NewBroadcaster(10)
relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
Expand Down
Loading