chore: sharding tests update (#1060)
Co-authored-by: Prem Chaitanya Prathi <[email protected]>
Co-authored-by: richΛrd <[email protected]>
3 people authored May 4, 2024
1 parent 28c2a27 commit a453c02
Showing 6 changed files with 547 additions and 20 deletions.
36 changes: 36 additions & 0 deletions tests/utils.go
@@ -9,14 +9,17 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/waku-org/go-waku/waku/v2/protocol"
"io"
"math"
"math/big"
"net"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"

	gcrypto "github.com/ethereum/go-ethereum/crypto"
@@ -385,3 +388,36 @@ func GenerateRandomSQLInsert(maxLength int) (string, error) {

	return query, nil
}

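// WaitForMsg waits up to timeout for a single envelope to arrive on ch,
// logging the received message and failing the test if none arrives in time.
// It blocks the caller until the inner goroutine completes.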
func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
	wg.Add(1)
	log := utils.Logger()
	go func() {
		defer wg.Done()
		select {
		case env := <-ch:
			msg := env.Message()
			log.Info("Received ", zap.String("msg", msg.String()))
		case <-time.After(timeout):
			require.Fail(t, "Message timeout")
		}
	}()
	wg.Wait()
}

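// WaitForTimeout asserts that nothing arrives on ch within timeout: it fails
// the test if a receive on the open channel succeeds or if ctx expires first,
// and passes once the timeout elapses quietly (or ch is closed).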
func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case _, ok := <-ch:
			require.False(t, ok, "should not retrieve message")
		case <-time.After(timeout):
			// All good
		case <-ctx.Done():
			require.Fail(t, "test exceeded allocated time")
		}
	}()

	wg.Wait()
}
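A minimal usage sketch for the two helpers (not part of this commit); it assumes the import paths shown and the protocol.NewEnvelope(msg, receiverTime, pubsubTopic) constructor:

package example

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/waku-org/go-waku/tests"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func TestWaitHelpers(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan *protocol.Envelope, 1)

	// Hand-deliver one envelope; WaitForMsg should consume it within the timeout.
	msg := tests.CreateWakuMessage("/test/1/example/proto", utils.GetUnixEpoch())
	ch <- protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "/waku/2/rs/20/0")
	tests.WaitForMsg(t, 2*time.Second, &wg, ch)

	// The channel is now empty, so WaitForTimeout should pass after one quiet second.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, ch)
}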
221 changes: 221 additions & 0 deletions waku/v2/node/wakunode2_test.go
@@ -3,12 +3,17 @@ package node
import (
	"bytes"
	"context"
	"fmt"
	"math/big"
	"math/rand"
	"net"
	"os"
	"sync"
	"testing"
	"time"

	wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/prometheus/client_golang/prometheus"
@@ -99,6 +104,7 @@ func TestUpAndDown(t *testing.T) {
		WithWakuRelay(),
		WithDiscoveryV5(0, bootnodes, true),
	)

	require.NoError(t, err)

	for i := 0; i < 5; i++ {
@@ -319,3 +325,218 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
	require.Len(t, result.Messages, 1)
	require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
}

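// TestStaticShardingMultipleTopics verifies that a relay node subscribed to
// two static shards delivers a message only when both its pubsub topic and
// its content topic match an active subscription.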
func TestStaticShardingMultipleTopics(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	testClusterID := uint16(20)

	// Node1 with Relay
	hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
	require.NoError(t, err)
	wakuNode1, err := New(
		WithHostAddress(hostAddr1),
		WithWakuRelay(),
		WithClusterID(testClusterID),
	)
	require.NoError(t, err)
	err = wakuNode1.Start(ctx)
	require.NoError(t, err)
	defer wakuNode1.Stop()

	pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(0))
	pubSubTopic1Str := pubSubTopic1.String()
	contentTopic1 := "/test/2/my-app/sharded"

	pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(10))
	pubSubTopic2Str := pubSubTopic2.String()
	contentTopic2 := "/test/3/my-app/sharded"

	require.Equal(t, testClusterID, wakuNode1.ClusterID())

	r := wakuNode1.Relay()

	subs1, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic1Str, contentTopic1))
	require.NoError(t, err)

	subs2, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic2Str, contentTopic2))
	require.NoError(t, err)

	require.NotEqual(t, subs1[0].ID, subs2[0].ID)

	require.True(t, r.IsSubscribed(pubSubTopic1Str))
	require.True(t, r.IsSubscribed(pubSubTopic2Str))

	s1, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic1Str, contentTopic1)
	require.NoError(t, err)
	s2, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic2Str, contentTopic2)
	require.NoError(t, err)
	require.Equal(t, s1.ID, subs1[0].ID)
	require.Equal(t, s2.ID, subs2[0].ID)

	// Wait for subscriptions
	time.Sleep(1 * time.Second)

	// Send a message to a subscribed topic
	msg := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

	_, err = r.Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic1Str))
	require.NoError(t, err)

	time.Sleep(100 * time.Millisecond)

	var wg sync.WaitGroup
	wg.Add(1)
	// Message msg should be retrievable
	go func() {
		defer wg.Done()
		env, ok := <-subs1[0].Ch
		require.True(t, ok, "no message retrieved")
		require.Equal(t, msg.Timestamp, env.Message().Timestamp)
	}()

	wg.Wait()

	// Send another message to an unsubscribed pubsub topic, but a subscribed content topic
	msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2")
	pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321))
	pubSubTopic3Str := pubSubTopic3.String()
	_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str))
	require.NoError(t, err)

	time.Sleep(100 * time.Millisecond)

	// No message should be retrieved
	tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

	// Send another message to a subscribed pubsub topic, but an unsubscribed content topic
	msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3")

	_, err = r.Publish(ctx, msg3, relay.WithPubSubTopic(pubSubTopic1Str))
	require.NoError(t, err)

	time.Sleep(100 * time.Millisecond)

	// No message should be retrieved
	tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)
}

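// TestStaticShardingLimits subscribes two discv5-connected relay nodes to all
// 1024 shards of a cluster, checks the resulting ENR shard encoding, and
// verifies end-to-end delivery on a randomly chosen shard.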
func TestStaticShardingLimits(t *testing.T) {
	log := utils.Logger()

	if os.Getenv("RUN_FLAKY_TESTS") != "true" {
		log.Info("Skipping", zap.String("test", t.Name()),
			zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true"))
		t.SkipNow()
	}

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
	defer cancel()

	testClusterID := uint16(21)

	// Node1 with Relay
	hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
	require.NoError(t, err)
	discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
	require.NoError(t, err)
	wakuNode1, err := New(
		WithHostAddress(hostAddr1),
		WithWakuRelay(),
		WithClusterID(testClusterID),
		WithDiscoveryV5(uint(discv5UDPPort1), nil, true),
	)
	require.NoError(t, err)
	err = wakuNode1.Start(ctx)
	require.NoError(t, err)
	defer wakuNode1.Stop()

	// Node2 with Relay
	hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
	require.NoError(t, err)
	discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
	require.NoError(t, err)
	wakuNode2, err := New(
		WithHostAddress(hostAddr2),
		WithWakuRelay(),
		WithClusterID(testClusterID),
		WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true),
	)
	require.NoError(t, err)
	err = wakuNode2.Start(ctx)
	require.NoError(t, err)
	defer wakuNode2.Stop()

	err = wakuNode1.DiscV5().Start(ctx)
	require.NoError(t, err)
	err = wakuNode2.DiscV5().Start(ctx)
	require.NoError(t, err)

	// Wait for discovery
	time.Sleep(3 * time.Second)

	contentTopic1 := "/test/2/my-app/sharded"

	r1 := wakuNode1.Relay()
	r2 := wakuNode2.Relay()

	var shardedPubSubTopics []string

	// Subscribe node1 to all 1024 static sharding topics
	for i := 0; i < 1024; i++ {
		shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i))
		_, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
		require.NoError(t, err)
		time.Sleep(10 * time.Millisecond)
	}

	// Let ENR updates finish
	time.Sleep(3 * time.Second)

	// Subscribe node2 to the same static sharding topics
	for i := 0; i < 1024; i++ {
		_, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
		require.NoError(t, err)
		time.Sleep(10 * time.Millisecond)
	}

	// Let ENR updates finish
	time.Sleep(3 * time.Second)

	// Check the ENR value after 1024 subscriptions
	shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record())
	require.NoError(t, err)
	require.Equal(t, testClusterID, shardsENR.ClusterID)
	require.Equal(t, 1, len(shardsENR.ShardIDs))

	// Prepare a message
	msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

	// Select a shard to publish on
	randomShard := rand.Intn(1024)

	// Check that both nodes are subscribed
	require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard]))
	require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard]))

	time.Sleep(1 * time.Second)

	// Publish on node1
	_, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard]))
	require.NoError(t, err)

	time.Sleep(1 * time.Second)

	s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1)
	require.NoError(t, err)

	var wg sync.WaitGroup

	// Retrieve on node2
	tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch)
}
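The raw format string used above and the helper used in the earlier tests are two spellings of the same shard topic; a short sketch (not part of the commit) of the equivalence:

package main

import (
	"fmt"

	"github.com/waku-org/go-waku/waku/v2/protocol"
)

func main() {
	// Assumes the NewStaticShardingPubsubTopic helper used in the tests above.
	topic := protocol.NewStaticShardingPubsubTopic(uint16(21), uint16(7)).String()
	expected := fmt.Sprintf("/waku/2/rs/%d/%d", 21, 7)
	fmt.Println(topic == expected) // expected: true, both yield "/waku/2/rs/21/7"
}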
64 changes: 64 additions & 0 deletions waku/v2/protocol/filter/filter_test.go
@@ -543,3 +543,67 @@ func (s *FilterTestSuite) BeforeTest(suiteName, testName string) {
func (s *FilterTestSuite) AfterTest(suiteName, testName string) {
	s.log.Info("Finished executing ", zap.String("testName", testName))
}

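// TestStaticSharding checks filter subscriptions on a static sharding pubsub
// topic: a message published on the subscribed shard is delivered, while
// messages published on other shards of the same cluster time out.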
func (s *FilterTestSuite) TestStaticSharding() {
	log := utils.Logger()
	s.log = log
	s.wg = &sync.WaitGroup{}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
	s.ctx = ctx
	s.ctxCancel = cancel

	// Generate pubsub topic "/waku/2/rs/100/100"
	s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String()

	// Pubsub topics for the negative test cases
	testTopics := []string{
		"/waku/2/rs/100/1024",
		"/waku/2/rs/100/101",
	}
	s.testContentTopic = "/test/10/my-filter-app/proto"

	// Prepare new nodes
	s.lightNode = s.makeWakuFilterLightNode(true, true)
	s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)

	// Connect nodes
	s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
	err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
	s.Require().NoError(err)

	s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

	msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())
	msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())

	// Positive case for the static shard pubsub topic - the message is received
	s.waitForMsg(func() {
		_, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic))
		s.Require().NoError(err)
	}, s.subDetails[0].C)

	// Two negative cases for static shard pubsub topics - the message times out
	s.waitForTimeout(func() {
		_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0]))
		s.Require().NoError(err)
	}, s.subDetails[0].C)

	s.waitForTimeout(func() {
		_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1]))
		s.Require().NoError(err)
	}, s.subDetails[0].C)

	// Cleanup
	_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
		PubsubTopic:   s.testTopic,
		ContentTopics: protocol.NewContentTopicSet(s.testContentTopic),
	})
	s.Require().NoError(err)

	_, err = s.lightNode.UnsubscribeAll(s.ctx)
	s.Require().NoError(err)
}