Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: sharding tests update #1060

Merged
merged 107 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
0e0ff46
test: node with static sharding and multiple pubSubTopics
romanzac Mar 19, 2024
bbf7487
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 19, 2024
77468d6
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 20, 2024
abec5b0
test: remove Unsubscribe() calls
romanzac Mar 20, 2024
a8689eb
fix: add content topics to TestStaticShardingMultipleTopics
romanzac Mar 20, 2024
a3ec282
fix: add IsSubscribed() test
romanzac Mar 20, 2024
3fe24f6
fix: switch to use GetSubscription instead of GetSubscriptionWithPubs…
romanzac Mar 21, 2024
c0e44b6
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 21, 2024
77f1f59
fix: switch back to use GetSubscriptionWithPubsubTopic
romanzac Mar 21, 2024
6495860
fix: validate with messages
romanzac Mar 23, 2024
ff01699
fix: extend deadline for context
romanzac Mar 23, 2024
95f3ff0
fix: use select for timeout wait
romanzac Mar 23, 2024
d26f178
fix: refactor repeated code into waitForTimeout
romanzac Mar 23, 2024
265909a
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 26, 2024
54edf8b
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 26, 2024
5a76773
Merge branch 'master' into chore-sharding-tests-update
romanzac Mar 26, 2024
dc4ed27
test: Waku store with static sharding
romanzac Mar 27, 2024
4266513
fix: add WithClusterID to see its effect
romanzac Mar 28, 2024
b9e700f
test: static sharding for lightPush
romanzac Apr 1, 2024
c15768b
Merge branch 'master' into chore-sharding-tests-update
romanzac Apr 1, 2024
2c66e91
test: static sharding for Filter
romanzac Apr 2, 2024
4e89a8f
fix: reuse tests.WaitForTimeout
romanzac Apr 2, 2024
def82f2
fix: reuse tests.WaitForMsg
romanzac Apr 2, 2024
3b571ba
fix: forgot to add utils.go
romanzac Apr 2, 2024
c80456d
test: static sharding for Relay
romanzac Apr 2, 2024
873caa8
test: static sharding limits
romanzac Apr 3, 2024
502ff9d
fix: remove named shards
romanzac Apr 4, 2024
72be483
fix: change subscription to every 10ms
romanzac Apr 4, 2024
9693bed
fix: add ENR shard info comparison
romanzac Apr 8, 2024
b6fc2cb
fix: remove expected shardIDs array
romanzac Apr 8, 2024
5de4bfa
Merge branch 'master' into chore-sharding-tests-update
romanzac Apr 8, 2024
8efec1e
fix: reset subscriptions back to 1 per 100ms
romanzac Apr 8, 2024
1cd52ee
fix: add message publish to test
romanzac Apr 10, 2024
891d9a9
fix: add missing error check
romanzac Apr 10, 2024
d27c0ed
fix: missed assigment for relay2
romanzac Apr 10, 2024
e1210c7
fix: add IsSubscribed check
romanzac Apr 10, 2024
b562771
test: log peerstore content before subscribing to topics
romanzac Apr 11, 2024
8d7e31b
test: reduce number of topics to one
romanzac Apr 15, 2024
99e3f33
fix: add timeout param to WaitForTimeout and WaitForMsg
romanzac Apr 22, 2024
adaace3
Merge branch 'master' into chore-sharding-tests-update
romanzac Apr 22, 2024
590e4ff
fix: revert test to desired form and mark it flaky
romanzac Apr 23, 2024
7404bb4
fix: keep same cluster ID for topics
romanzac May 2, 2024
12a8923
fix: start s2 store with different sharded topic
romanzac May 2, 2024
73a90e0
fix: change node2 to lightpush receiver
romanzac May 2, 2024
fa01d17
fix: returned node2 to support lightpush and relay
romanzac May 2, 2024
093ea36
fix: added comment and improved readability
romanzac May 2, 2024
dd960ef
fix: align clusterID with valid pubsub topics
romanzac May 3, 2024
2d0af55
chore: fix test to use same shard
chaitanyaprem May 3, 2024
77f6917
test: ci test
romanzac May 3, 2024
6a72e6d
test: ci test - try without mac os
romanzac May 3, 2024
1afb587
test: copy go mod from PR1091
romanzac May 3, 2024
3a1a7e6
test: copy go sum from PR1091
romanzac May 3, 2024
a05971b
Revert "test: copy go sum from PR1091"
romanzac May 3, 2024
0741101
Revert "test: copy go mod from PR1091"
romanzac May 3, 2024
8968e59
test: update go mod
romanzac May 3, 2024
a047dd8
test: delete ci_test
romanzac May 3, 2024
01803de
Revert "test: update go mod"
romanzac May 3, 2024
28c2a27
feat: storeV3 client (#1028)
richard-ramos May 3, 2024
268b53c
test: node with static sharding and multiple pubSubTopics
romanzac Mar 19, 2024
561d00a
test: remove Unsubscribe() calls
romanzac Mar 20, 2024
f1ecedb
fix: add content topics to TestStaticShardingMultipleTopics
romanzac Mar 20, 2024
6b1646d
fix: add IsSubscribed() test
romanzac Mar 20, 2024
a48ae3b
fix: switch to use GetSubscription instead of GetSubscriptionWithPubs…
romanzac Mar 21, 2024
157717c
fix: switch back to use GetSubscriptionWithPubsubTopic
romanzac Mar 21, 2024
b7e6ecf
fix: validate with messages
romanzac Mar 23, 2024
a3f4be7
fix: extend deadline for context
romanzac Mar 23, 2024
e056bda
fix: use select for timeout wait
romanzac Mar 23, 2024
548870b
fix: refactor repeated code into waitForTimeout
romanzac Mar 23, 2024
b71305b
test: Waku store with static sharding
romanzac Mar 27, 2024
061f9f8
fix: add WithClusterID to see its effect
romanzac Mar 28, 2024
2b5592e
test: static sharding for lightPush
romanzac Apr 1, 2024
2e77516
test: static sharding for Filter
romanzac Apr 2, 2024
658311d
fix: reuse tests.WaitForTimeout
romanzac Apr 2, 2024
70ff74e
fix: reuse tests.WaitForMsg
romanzac Apr 2, 2024
e8f9dca
fix: forgot to add utils.go
romanzac Apr 2, 2024
2904f56
test: static sharding for Relay
romanzac Apr 2, 2024
e764fcb
test: static sharding limits
romanzac Apr 3, 2024
5357773
fix: remove named shards
romanzac Apr 4, 2024
64e27d3
fix: change subscription to every 10ms
romanzac Apr 4, 2024
134efd7
fix: add ENR shard info comparison
romanzac Apr 8, 2024
fd5cf3e
fix: remove expected shardIDs array
romanzac Apr 8, 2024
b2e9b96
fix: reset subscriptions back to 1 per 100ms
romanzac Apr 8, 2024
63f2957
fix: add message publish to test
romanzac Apr 10, 2024
ce6ff45
fix: add missing error check
romanzac Apr 10, 2024
daf9ac0
fix: missed assigment for relay2
romanzac Apr 10, 2024
aa4a3f5
fix: add IsSubscribed check
romanzac Apr 10, 2024
efbcfaf
test: log peerstore content before subscribing to topics
romanzac Apr 11, 2024
a812491
test: reduce number of topics to one
romanzac Apr 15, 2024
5f84d25
fix: add timeout param to WaitForTimeout and WaitForMsg
romanzac Apr 22, 2024
1c80ade
fix: revert test to desired form and mark it flaky
romanzac Apr 23, 2024
6778ecb
fix: keep same cluster ID for topics
romanzac May 2, 2024
3ff86db
fix: start s2 store with different sharded topic
romanzac May 2, 2024
401e8aa
fix: change node2 to lightpush receiver
romanzac May 2, 2024
281e6cf
fix: returned node2 to support lightpush and relay
romanzac May 2, 2024
4b12401
fix: added comment and improved readability
romanzac May 2, 2024
c7883d8
fix: align clusterID with valid pubsub topics
romanzac May 3, 2024
9086397
chore: fix test to use same shard
chaitanyaprem May 3, 2024
1b7d246
test: ci test
romanzac May 3, 2024
1d263a8
test: ci test - try without mac os
romanzac May 3, 2024
8a8e660
test: copy go mod from PR1091
romanzac May 3, 2024
ce0d316
test: copy go sum from PR1091
romanzac May 3, 2024
cd3f1f9
Revert "test: copy go sum from PR1091"
romanzac May 3, 2024
3aef4a7
Revert "test: copy go mod from PR1091"
romanzac May 3, 2024
3b2c0c6
test: update go mod
romanzac May 3, 2024
b0491b0
test: delete ci_test
romanzac May 3, 2024
cbc3c4a
Revert "test: update go mod"
romanzac May 3, 2024
a369fa4
Merge remote-tracking branch 'origin/chore-sharding-tests-update' int…
romanzac May 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/waku-org/go-waku/waku/v2/protocol"
"io"
"math"
"math/big"
"net"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"

gcrypto "github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -385,3 +388,36 @@ func GenerateRandomSQLInsert(maxLength int) (string, error) {

return query, nil
}

func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
log := utils.Logger()
go func() {
defer wg.Done()
select {
case env := <-ch:
msg := env.Message()
log.Info("Received ", zap.String("msg", msg.String()))
case <-time.After(timeout):
require.Fail(t, "Message timeout")
}
}()
wg.Wait()
}

func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
go func() {
defer wg.Done()
select {
case _, ok := <-ch:
require.False(t, ok, "should not retrieve message")
case <-time.After(timeout):
// All good
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()

wg.Wait()
}
221 changes: 221 additions & 0 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package node
import (
"bytes"
"context"
"fmt"
"math/big"
"math/rand"
"net"
"os"
"sync"
"testing"
"time"

wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -98,6 +103,7 @@ func TestUpAndDown(t *testing.T) {
WithWakuRelay(),
WithDiscoveryV5(0, bootnodes, true),
)

require.NoError(t, err)

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -318,3 +324,218 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.Len(t, result.Messages, 1)
require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
}

func TestStaticShardingMultipleTopics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testClusterID := uint16(20)

// Node1 with Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
WithClusterID(testClusterID),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()

pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(0))
pubSubTopic1Str := pubSubTopic1.String()
contentTopic1 := "/test/2/my-app/sharded"

pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(10))
pubSubTopic2Str := pubSubTopic2.String()
contentTopic2 := "/test/3/my-app/sharded"

require.Equal(t, testClusterID, wakuNode1.ClusterID())

r := wakuNode1.Relay()

subs1, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic1Str, contentTopic1))
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

subs2, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic2Str, contentTopic2))
require.NoError(t, err)

require.NotEqual(t, subs1[0].ID, subs2[0].ID)

require.True(t, r.IsSubscribed(pubSubTopic1Str))
require.True(t, r.IsSubscribed(pubSubTopic2Str))

s1, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic1Str, contentTopic1)
require.NoError(t, err)
s2, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic2Str, contentTopic2)
require.NoError(t, err)
require.Equal(t, s1.ID, subs1[0].ID)
require.Equal(t, s2.ID, subs2[0].ID)

// Wait for subscriptions
time.Sleep(1 * time.Second)

// Send message to subscribed topic
msg := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

_, err = r.Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic1Str))
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

var wg sync.WaitGroup
wg.Add(1)
// Message msg could be retrieved
go func() {
defer wg.Done()
env, ok := <-subs1[0].Ch
require.True(t, ok, "no message retrieved")
require.Equal(t, msg.Timestamp, env.Message().Timestamp)
}()

wg.Wait()

// Send another message to non-subscribed pubsub topic, but subscribed content topic
msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2")
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321))
pubSubTopic3Str := pubSubTopic3.String()
_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str))
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

// No message could be retrieved
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

// Send another message to subscribed pubsub topic, but not subscribed content topic - mix it up
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3")

_, err = r.Publish(ctx, msg3, relay.WithPubSubTopic(pubSubTopic1Str))
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

// No message could be retrieved
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

}

func TestStaticShardingLimits(t *testing.T) {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

log := utils.Logger()

if os.Getenv("RUN_FLAKY_TESTS") != "true" {

log.Info("Skipping", zap.String("test", t.Name()),
zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true"))
t.SkipNow()
}

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

testClusterID := uint16(21)

// Node1 with Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
require.NoError(t, err)
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithClusterID(testClusterID),
WithDiscoveryV5(uint(discv5UDPPort1), nil, true),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()

// Node2 with Relay
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
require.NoError(t, err)
wakuNode2, err := New(
WithHostAddress(hostAddr2),
WithWakuRelay(),
WithClusterID(testClusterID),
WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true),
)
require.NoError(t, err)
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()

err = wakuNode1.DiscV5().Start(ctx)
require.NoError(t, err)
err = wakuNode2.DiscV5().Start(ctx)
require.NoError(t, err)

// Wait for discovery
time.Sleep(3 * time.Second)

contentTopic1 := "/test/2/my-app/sharded"

r1 := wakuNode1.Relay()
r2 := wakuNode2.Relay()

var shardedPubSubTopics []string

// Subscribe topics related to static sharding
for i := 0; i < 1024; i++ {
shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i))
_, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
}

// Let ENR updates to finish
time.Sleep(3 * time.Second)

// Subscribe topics related to static sharding
for i := 0; i < 1024; i++ {
_, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
}

// Let ENR updates to finish
time.Sleep(3 * time.Second)

// Check ENR value after 1024 subscriptions
shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record())
require.NoError(t, err)
require.Equal(t, testClusterID, shardsENR.ClusterID)
require.Equal(t, 1, len(shardsENR.ShardIDs))

// Prepare message
msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

// Select shard to publish
randomShard := rand.Intn(1024)

// Check both nodes are subscribed
require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard]))
require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard]))

time.Sleep(1 * time.Second)

// Publish on node1
_, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard]))
require.NoError(t, err)

time.Sleep(1 * time.Second)

s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1)
require.NoError(t, err)

var wg sync.WaitGroup

// Retrieve on node2
tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch)

}
64 changes: 64 additions & 0 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,67 @@ func (s *FilterTestSuite) BeforeTest(suiteName, testName string) {
func (s *FilterTestSuite) AfterTest(suiteName, testName string) {
s.log.Info("Finished executing ", zap.String("testName", testName))
}

func (s *FilterTestSuite) TestStaticSharding() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel

// Gen pubsub topic "/waku/2/rs/100/100"
s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String()

// Pubsub topics for neg. test cases
testTopics := []string{
"/waku/2/rs/100/1024",
"/waku/2/rs/100/101",
}
s.testContentTopic = "/test/10/my-filter-app/proto"

// Prepare new nodes
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)

// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)

s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())

// Test positive case for static shard pubsub topic - message gets received
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)

}, s.subDetails[0].C)

// Test two negative cases for static shard pubsub topic - message times out
s.waitForTimeout(func() {
_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0]))
s.Require().NoError(err)

}, s.subDetails[0].C)

s.waitForTimeout(func() {
_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1]))
s.Require().NoError(err)

}, s.subDetails[0].C)

// Cleanup
_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic,
ContentTopics: protocol.NewContentTopicSet(s.testContentTopic),
})
s.Require().NoError(err)

_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
Loading
Loading