diff --git a/.github/.env b/.github/.env deleted file mode 100644 index 8789b588c..000000000 --- a/.github/.env +++ /dev/null @@ -1 +0,0 @@ -go_version=1.20 diff --git a/.github/docker-compose/nwaku.yml b/.github/docker-compose/nwaku.yml new file mode 100644 index 000000000..b8371066a --- /dev/null +++ b/.github/docker-compose/nwaku.yml @@ -0,0 +1,6 @@ +services: + nwaku: + image: "harbor.status.im/wakuorg/nwaku:latest" + command: ["--relay", "--store", "--nodekey=1122334455667788990011223344556677889900112233445566778899001122"] + ports: + - "60000" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2254a822e..690b3da9b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,13 +52,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - - uses: xom9ikk/dotenv@v2 - with: - path: ".github/" - - run: | - echo "go_version=${{ env.GO_VERSION }}" >> $GITHUB_OUTPUT - - run: | VERSION=$(cat ./VERSION) echo "waku_version=$VERSION" >> $GITHUB_OUTPUT @@ -73,9 +66,9 @@ jobs: uses: actions/checkout@v3 - name: Install Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: ${{ needs.env.outputs.go_version }} + go-version-file: 'go.mod' cache: false - name: Execute golangci-lint @@ -112,9 +105,9 @@ jobs: key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }} - name: Install Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: ${{ needs.env.outputs.go_version }} + go-version-file: 'go.mod' cache: false - name: Build binary @@ -154,9 +147,9 @@ jobs: key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }} - name: Install Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: ${{ needs.env.outputs.go_version }} + go-version-file: 'go.mod' cache: false - name: "Run tests" @@ -166,3 +159,11 @@ jobs: run: | docker compose -f .github/docker-compose/ganache.yml up -d make test-onchain${{ matrix.tests == 'test-with-race' && '-with-race' || '' }} + + - name: "Run storev3 tests" + run: | + docker compose -f .github/docker-compose/nwaku.yml up -d + NWAKU_HOST=$(docker-compose -f .github/docker-compose/nwaku.yml port nwaku 60000) + NWAKU_PORT=$(echo $NWAKU_HOST | cut -d ":" -f 2) + sleep 5 + make test-storev3 TEST_STOREV3_NODE="/ip4/127.0.0.1/tcp/${NWAKU_PORT}/p2p/16Uiu2HAmMGhfSTUzKbsjMWxc6T1X4wiTWSF1bEWSLjAukCm7KiHV" diff --git a/Makefile b/Makefile index daf97d1e8..df2f096e6 100644 --- a/Makefile +++ b/Makefile @@ -219,3 +219,7 @@ test-postgres: test-postgres-with-race: ${GOCMD} test -race -p 1 -v -count 1 -tags="${PG_BUILD_TAGS}" github.com/waku-org/go-waku/waku/persistence/... + +TEST_STOREV3_NODE ?= +test-storev3: + TEST_STOREV3_NODE=${TEST_STOREV3_NODE} ${GOCMD} test -p 1 -v -count 1 -tags="${BUILD_TAGS} include_storev3_tests" github.com/waku-org/go-waku/waku/v2/protocol/store/... 
\ No newline at end of file diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 4a5a352bd..976d10242 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -46,10 +46,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" humanize "github.com/dustin/go-humanize" @@ -336,7 +336,7 @@ func Execute(options NodeOptions) error { //For now assuming that static peers added support/listen on all topics specified via commandLine. staticPeers := map[protocol.ID][]multiaddr.Multiaddr{ - store.StoreID_v20beta4: options.Store.Nodes, + legacy_store.StoreID_v20beta4: options.Store.Nodes, lightpush.LightPushID_v20beta1: options.LightPush.Nodes, rendezvous.RendezvousID: options.Rendezvous.Nodes, filter.FilterSubscribeID_v20beta1: options.Filter.Nodes, diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index 723acabfd..f1435eb81 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -12,8 +12,8 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" ) type StoreService struct { @@ -55,9 +55,9 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { return s } -func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) { - query := &store.Query{} - var options []store.HistoryRequestOption +func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.HistoryRequestOption, error) { + query := &legacy_store.Query{} + var options []legacy_store.HistoryRequestOption var err error peerAddrStr := r.URL.Query().Get("peerAddr") var m multiaddr.Multiaddr @@ -66,12 +66,12 @@ func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption if err != nil { return nil, nil, err } - options = append(options, store.WithPeerAddr(m)) + options = append(options, legacy_store.WithPeerAddr(m)) } else { // The user didn't specify a peer address and self-node is configured as a store node. // In this case we assume that the user is willing to retrieve the messages stored by // the local/self store node. 
- options = append(options, store.WithLocalQuery()) + options = append(options, legacy_store.WithLocalQuery()) } query.PubsubTopic = r.URL.Query().Get("pubsubTopic") @@ -131,14 +131,14 @@ func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption cursor.PubsubTopic = query.PubsubTopic - options = append(options, store.WithCursor(cursor)) + options = append(options, legacy_store.WithCursor(cursor)) } pageSizeStr := r.URL.Query().Get("pageSize") ascendingStr := r.URL.Query().Get("ascending") if ascendingStr != "" || pageSizeStr != "" { ascending := true - pageSize := uint64(store.DefaultPageSize) + pageSize := uint64(legacy_store.DefaultPageSize) if ascendingStr != "" { ascending, err = strconv.ParseBool(ascendingStr) if err != nil { @@ -151,12 +151,12 @@ func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption if err != nil { return nil, nil, err } - if pageSize > store.MaxPageSize { - pageSize = store.MaxPageSize + if pageSize > legacy_store.MaxPageSize { + pageSize = legacy_store.MaxPageSize } } - options = append(options, store.WithPaging(ascending, pageSize)) + options = append(options, legacy_store.WithPaging(ascending, pageSize)) } return query, options, nil @@ -166,7 +166,7 @@ func writeStoreError(w http.ResponseWriter, code int, err error) { writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code) } -func toStoreResponse(result *store.Result) StoreResponse { +func toStoreResponse(result *legacy_store.Result) StoreResponse { response := StoreResponse{} cursor := result.Cursor() @@ -202,7 +202,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - result, err := d.node.Store().Query(ctx, *query, options...) + result, err := d.node.LegacyStore().Query(ctx, *query, options...) 
if err != nil { writeStoreError(w, http.StatusInternalServerError, err) return diff --git a/cmd/waku/server/utils.go b/cmd/waku/server/utils.go index 865d09009..1c12c8ae1 100644 --- a/cmd/waku/server/utils.go +++ b/cmd/waku/server/utils.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -16,7 +17,8 @@ func IsWakuProtocol(protocol protocol.ID) bool { protocol == filter.FilterSubscribeID_v20beta1 || protocol == relay.WakuRelayID_v200 || protocol == lightpush.LightPushID_v20beta1 || - protocol == store.StoreID_v20beta4 + protocol == legacy_store.StoreID_v20beta4 || + protocol == store.StoreQueryID_v300 } type Base64URLByte []byte diff --git a/examples/basic-relay/main.go b/examples/basic-relay/main.go index 86aef53d7..4536701ca 100644 --- a/examples/basic-relay/main.go +++ b/examples/basic-relay/main.go @@ -191,7 +191,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms if err != nil { log.Error("Error sending a message", zap.Error(err)) } - log.Info("Published msg,", zap.String("data", string(msg.Payload)), logging.HexBytes("hash", hash)) + log.Info("Published msg,", zap.String("data", string(msg.Payload)), logging.HexBytes("hash", hash.Bytes())) } func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) { diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 6a65cea4d..48c39d4fd 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -17,11 +17,11 @@ import ( "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" "google.golang.org/protobuf/proto" ) @@ -368,10 +368,10 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { return } - var storeOpt store.HistoryRequestOption + var storeOpt legacy_store.HistoryRequestOption if c.options.Store.Node == nil { c.ui.InfoMessage("No store node configured. 
Choosing one at random...") - storeOpt = store.WithAutomaticPeerSelection() + storeOpt = legacy_store.WithAutomaticPeerSelection() } else { peerID, err := (*c.options.Store.Node).ValueForProtocol(multiaddr.P_P2P) if err != nil { @@ -383,7 +383,7 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { c.ui.ErrorMessage(err) return } - storeOpt = store.WithPeer(pID) + storeOpt = legacy_store.WithPeer(pID) c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", peerID)) } @@ -391,14 +391,14 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second) defer cancel() - q := store.Query{ + q := legacy_store.Query{ ContentTopics: []string{options.ContentTopic}, } - response, err := c.node.Store().Query(tCtx, q, - store.WithAutomaticRequestID(), + response, err := c.node.LegacyStore().Query(tCtx, q, + legacy_store.WithAutomaticRequestID(), storeOpt, - store.WithPaging(false, 100)) + legacy_store.WithPaging(false, 100)) if err != nil { c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err)) diff --git a/examples/chat2/exec.go b/examples/chat2/exec.go index 266b03151..2882960c6 100644 --- a/examples/chat2/exec.go +++ b/examples/chat2/exec.go @@ -12,9 +12,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" ) func execute(options Options) { @@ -77,7 +77,7 @@ func execute(options Options) { return } - err = addPeer(wakuNode, options.Store.Node, options.Relay.Topics.Value(), store.StoreID_v20beta4) + err = addPeer(wakuNode, options.Store.Node, options.Relay.Topics.Value(), legacy_store.StoreID_v20beta4) if err != nil { fmt.Println(err.Error()) return diff --git a/library/lightpush.go b/library/lightpush.go index 1714b2d69..e1e353d2b 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -6,7 +6,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" ) @@ -42,7 +41,8 @@ func lightpushPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic s } hash, err := instance.node.Lightpush().Publish(ctx, msg, lpOptions...) 
- return hexutil.Encode(hash), err + + return hash.String(), err } // LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol diff --git a/library/node.go b/library/node.go index d8b787986..f1e49b2c7 100644 --- a/library/node.go +++ b/library/node.go @@ -16,7 +16,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -444,7 +443,7 @@ type subscriptionMsg struct { func toSubscriptionMessage(msg *protocol.Envelope) *subscriptionMsg { return &subscriptionMsg{ - MessageID: hexutil.Encode(msg.Hash()), + MessageID: msg.Hash().String(), PubsubTopic: msg.PubsubTopic(), Message: msg.Message(), } diff --git a/library/relay.go b/library/relay.go index 452b56bc8..666904e5f 100644 --- a/library/relay.go +++ b/library/relay.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -40,7 +39,8 @@ func relayPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic strin } hash, err := instance.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic)) - return hexutil.Encode(hash), err + + return hash.String(), err } // RelayPublish publishes a message using waku relay and returns the message ID diff --git a/library/store.go b/library/store.go index 59ad2703d..aaac40cf7 100644 --- a/library/store.go +++ b/library/store.go @@ -4,15 +4,15 @@ import ( "C" "encoding/json" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) import ( "context" "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" ) type storePagingOptions struct { @@ -35,10 +35,10 @@ type storeMessagesReply struct { Error string `json:"error,omitempty"` } -func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) { - res, err := instance.node.Store().Query( +func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []legacy_store.HistoryRequestOption) (string, error) { + res, err := instance.node.LegacyStore().Query( ctx, - store.Query{ + legacy_store.Query{ PubsubTopic: args.Topic, ContentTopics: args.ContentTopics, StartTime: args.StartTime, @@ -75,10 +75,10 @@ func StoreQuery(instance *WakuInstance, queryJSON string, peerID string, ms int) return "", err } - options := []store.HistoryRequestOption{ - store.WithAutomaticRequestID(), - store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), - store.WithCursor(args.PagingOptions.Cursor), + options := []legacy_store.HistoryRequestOption{ + legacy_store.WithAutomaticRequestID(), + legacy_store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), + legacy_store.WithCursor(args.PagingOptions.Cursor), } if peerID != "" { @@ -86,9 +86,9 @@ func StoreQuery(instance *WakuInstance, queryJSON string, peerID string, ms int) if err != nil { return "", err } - options = append(options, store.WithPeer(p)) + options = append(options, legacy_store.WithPeer(p)) } else { - options = append(options, 
store.WithAutomaticPeerSelection()) + options = append(options, legacy_store.WithAutomaticPeerSelection()) } var ctx context.Context @@ -116,11 +116,11 @@ func StoreLocalQuery(instance *WakuInstance, queryJSON string) (string, error) { return "", err } - options := []store.HistoryRequestOption{ - store.WithAutomaticRequestID(), - store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), - store.WithCursor(args.PagingOptions.Cursor), - store.WithLocalQuery(), + options := []legacy_store.HistoryRequestOption{ + legacy_store.WithAutomaticRequestID(), + legacy_store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), + legacy_store.WithCursor(args.PagingOptions.Cursor), + legacy_store.WithLocalQuery(), } return queryResponse(instance.ctx, instance, args, options) diff --git a/logging/logging.go b/logging/logging.go index fb0295ae3..7e872eefc 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -15,7 +15,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" + wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -128,6 +129,10 @@ func (bytes hexBytes) String() string { return hexutil.Encode(bytes) } +func Hash(hash wpb.MessageHash) zap.Field { + return zap.Stringer("hash", hash) +} + // ENode creates a field for ENR node. func ENode(key string, node *enode.Node) zap.Field { return zap.Stringer(key, node) diff --git a/tests/utils.go b/tests/utils.go index 10c99be63..8b9dec223 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/waku-org/go-waku/waku/v2/protocol" "io" "math" "math/big" @@ -16,7 +17,9 @@ import ( "net/url" "strconv" "strings" + "sync" "testing" + "time" "unicode/utf8" gcrypto "github.com/ethereum/go-ethereum/crypto" @@ -385,3 +388,36 @@ func GenerateRandomSQLInsert(maxLength int) (string, error) { return query, nil } + +func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + log := utils.Logger() + go func() { + defer wg.Done() + select { + case env := <-ch: + msg := env.Message() + log.Info("Received ", zap.String("msg", msg.String())) + case <-time.After(timeout): + require.Fail(t, "Message timeout") + } + }() + wg.Wait() +} + +func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + go func() { + defer wg.Done() + select { + case _, ok := <-ch: + require.False(t, ok, "should not retrieve message") + case <-time.After(timeout): + // All good + case <-ctx.Done(): + require.Fail(t, "test exceeded allocated time") + } + }() + + wg.Wait() +} diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 574d1877d..10540c7ce 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -11,8 +11,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -317,8 +317,10 @@ func (d *DBStore) Put(env *protocol.Envelope) error { 
storedAt = env.Index().ReceiverTime } + hash := env.Hash() + start := time.Now() - _, err = stmt.Exec(env.Index().Digest, env.Hash(), storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) + _, err = stmt.Exec(env.Index().Digest, hash[:], storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) if err != nil { return err } diff --git a/waku/persistence/utils/store_test.go b/waku/persistence/utils/store_test.go index fee14f65a..743f1fefa 100644 --- a/waku/persistence/utils/store_test.go +++ b/waku/persistence/utils/store_test.go @@ -17,7 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/postgres" "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5ef808c95..323958e58 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -36,6 +36,7 @@ import ( wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -59,7 +60,7 @@ type Peer struct { PubsubTopics []string `json:"pubsubTopics"` } -type storeFactory func(w *WakuNode) store.Store +type storeFactory func(w *WakuNode) legacy_store.Store type byte32 = [32]byte @@ -100,7 +101,8 @@ type WakuNode struct { metadata Service filterFullNode ReceptorService filterLightNode Service - store ReceptorService + legacyStore ReceptorService + store *store.WakuStore rlnRelay RLNRelay wakuFlag enr.WakuEnrBitfield @@ -125,8 +127,8 @@ type WakuNode struct { peermanager *peermanager.PeerManager } -func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) +func defaultStoreFactory(w *WakuNode) legacy_store.Store { + return legacy_store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -291,6 +293,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + if params.storeFactory != nil { w.storeFactory = params.storeFactory } else { @@ -422,8 +426,8 @@ func (w *WakuNode) Start(ctx context.Context) error { w.registerAndMonitorReachability(ctx) } - w.store = w.storeFactory(w) - w.store.SetHost(host) + w.legacyStore = w.storeFactory(w) + w.legacyStore.SetHost(host) if w.opts.enableStore { sub := w.bcaster.RegisterForAll() err := w.startStore(ctx, sub) @@ -433,6 +437,8 @@ func (w *WakuNode) Start(ctx context.Context) error { w.log.Info("Subscribing store to broadcaster") } + w.store.SetHost(host) + 
w.lightPush.SetHost(host) if w.opts.enableLightPush { if err := w.lightPush.Start(ctx); err != nil { @@ -498,7 +504,7 @@ func (w *WakuNode) Stop() { w.relay.Stop() w.lightPush.Stop() - w.store.Stop() + w.legacyStore.Stop() w.filterFullNode.Stop() w.filterLightNode.Stop() @@ -583,9 +589,14 @@ func (w *WakuNode) Relay() *relay.WakuRelay { return nil } +// LegacyStore is used to access any operation related to Waku Store protocol +func (w *WakuNode) LegacyStore() legacy_store.Store { + return w.legacyStore.(legacy_store.Store) +} + // Store is used to access any operation related to Waku Store protocol -func (w *WakuNode) Store() store.Store { - return w.store.(store.Store) +func (w *WakuNode) Store() *store.WakuStore { + return w.store } // FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature @@ -667,7 +678,7 @@ func (w *WakuNode) mountDiscV5() error { } func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) error { - err := w.store.Start(ctx, sub) + err := w.legacyStore.Start(ctx, sub) if err != nil { w.log.Error("starting store", zap.Error(err)) return err diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 484cb3090..d4ca453c9 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -3,12 +3,17 @@ package node import ( "bytes" "context" + "fmt" "math/big" + "math/rand" "net" + "os" "sync" "testing" "time" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/prometheus/client_golang/prometheus" @@ -20,9 +25,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -98,6 +104,7 @@ func TestUpAndDown(t *testing.T) { WithWakuRelay(), WithDiscoveryV5(0, bootnodes, true), ) + require.NoError(t, err) for i := 0; i < 5; i++ { @@ -309,12 +316,227 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode3.Stop() - _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4) + _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4) require.NoError(t, err) time.Sleep(2 * time.Second) // NODE2 should have returned the message received via filter - result, err := wakuNode3.Store().Query(ctx, store.Query{}) + result, err := wakuNode3.LegacyStore().Query(ctx, legacy_store.Query{}) require.NoError(t, err) require.Len(t, result.Messages, 1) require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp) } + +func TestStaticShardingMultipleTopics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testClusterID := uint16(20) + + // Node1 with Relay + hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + wakuNode1, err := New( + WithHostAddress(hostAddr1), + WithWakuRelay(), + WithClusterID(testClusterID), + ) + require.NoError(t, err) + err = wakuNode1.Start(ctx) + require.NoError(t, 
err) + defer wakuNode1.Stop() + + pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(0)) + pubSubTopic1Str := pubSubTopic1.String() + contentTopic1 := "/test/2/my-app/sharded" + + pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(10)) + pubSubTopic2Str := pubSubTopic2.String() + contentTopic2 := "/test/3/my-app/sharded" + + require.Equal(t, testClusterID, wakuNode1.ClusterID()) + + r := wakuNode1.Relay() + + subs1, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic1Str, contentTopic1)) + require.NoError(t, err) + + subs2, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic2Str, contentTopic2)) + require.NoError(t, err) + + require.NotEqual(t, subs1[0].ID, subs2[0].ID) + + require.True(t, r.IsSubscribed(pubSubTopic1Str)) + require.True(t, r.IsSubscribed(pubSubTopic2Str)) + + s1, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic1Str, contentTopic1) + require.NoError(t, err) + s2, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic2Str, contentTopic2) + require.NoError(t, err) + require.Equal(t, s1.ID, subs1[0].ID) + require.Equal(t, s2.ID, subs2[0].ID) + + // Wait for subscriptions + time.Sleep(1 * time.Second) + + // Send message to subscribed topic + msg := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message") + + _, err = r.Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic1Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + var wg sync.WaitGroup + wg.Add(1) + // Message msg could be retrieved + go func() { + defer wg.Done() + env, ok := <-subs1[0].Ch + require.True(t, ok, "no message retrieved") + require.Equal(t, msg.Timestamp, env.Message().Timestamp) + }() + + wg.Wait() + + // Send another message to non-subscribed pubsub topic, but subscribed content topic + msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2") + pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321)) + pubSubTopic3Str := pubSubTopic3.String() + _, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // No message could be retrieved + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch) + + // Send another message to subscribed pubsub topic, but not subscribed content topic - mix it up + msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3") + + _, err = r.Publish(ctx, msg3, relay.WithPubSubTopic(pubSubTopic1Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // No message could be retrieved + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch) + +} + +func TestStaticShardingLimits(t *testing.T) { + + log := utils.Logger() + + if os.Getenv("RUN_FLAKY_TESTS") != "true" { + + log.Info("Skipping", zap.String("test", t.Name()), + zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true")) + t.SkipNow() + } + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + testClusterID := uint16(21) + + // Node1 with Relay + hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3) + require.NoError(t, err) + wakuNode1, err := New( + WithHostAddress(hostAddr1), + WithWakuRelay(), + WithClusterID(testClusterID), + WithDiscoveryV5(uint(discv5UDPPort1), nil, true), + ) + require.NoError(t, err) + err = wakuNode1.Start(ctx) + require.NoError(t, err) + 
defer wakuNode1.Stop() + + // Node2 with Relay + hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3) + require.NoError(t, err) + wakuNode2, err := New( + WithHostAddress(hostAddr2), + WithWakuRelay(), + WithClusterID(testClusterID), + WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true), + ) + require.NoError(t, err) + err = wakuNode2.Start(ctx) + require.NoError(t, err) + defer wakuNode2.Stop() + + err = wakuNode1.DiscV5().Start(ctx) + require.NoError(t, err) + err = wakuNode2.DiscV5().Start(ctx) + require.NoError(t, err) + + // Wait for discovery + time.Sleep(3 * time.Second) + + contentTopic1 := "/test/2/my-app/sharded" + + r1 := wakuNode1.Relay() + r2 := wakuNode2.Relay() + + var shardedPubSubTopics []string + + // Subscribe to topics related to static sharding + for i := 0; i < 1024; i++ { + shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i)) + _, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1)) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + } + + // Wait for ENR updates to finish + time.Sleep(3 * time.Second) + + // Subscribe to topics related to static sharding + for i := 0; i < 1024; i++ { + _, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1)) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + } + + // Wait for ENR updates to finish + time.Sleep(3 * time.Second) + + // Check ENR value after 1024 subscriptions + shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record()) + require.NoError(t, err) + require.Equal(t, testClusterID, shardsENR.ClusterID) + require.Equal(t, 1, len(shardsENR.ShardIDs)) + + // Prepare message + msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message") + + // Select shard to publish + randomShard := rand.Intn(1024) + + // Check both nodes are subscribed + require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard])) + require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard])) + + time.Sleep(1 * time.Second) + + // Publish on node1 + _, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard])) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Retrieve on node2 + tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch) + +} diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 9372d4561..60608f779 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -28,8 +28,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -83,7 +83,7 @@ type WakuNodeParameters struct { maxMsgSizeBytes int enableStore bool - messageProvider store.MessageProvider + messageProvider legacy_store.MessageProvider enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -448,7 +448,7 @@ func WithWakuStoreFactory(factory storeFactory)
WakuNodeOption { // WithMessageProvider is a WakuNodeOption that sets the MessageProvider // used to store and retrieve persisted messages -func WithMessageProvider(s store.MessageProvider) WakuNodeOption { +func WithMessageProvider(s legacy_store.MessageProvider) WakuNodeOption { return func(params *WakuNodeParameters) error { if s == nil { return errors.New("message provider can't be nil") diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index eb4793d6b..751c71582 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" r "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" @@ -17,7 +18,6 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/v2/protocol/store" ) func handleSpam(msg *pb.WakuMessage, topic string) error { @@ -43,8 +43,8 @@ func TestWakuOptions(t *testing.T) { addr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4000/ws") require.NoError(t, err) - storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, prometheus.DefaultRegisterer, w.log) + storeFactory := func(w *WakuNode) legacy_store.Store { + return legacy_store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, prometheus.DefaultRegisterer, w.log) } options := []WakuNodeOption{ @@ -88,8 +88,8 @@ func TestWakuRLNOptions(t *testing.T) { addr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4000/ws") require.NoError(t, err) - storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, prometheus.DefaultRegisterer, w.log) + storeFactory := func(w *WakuNode) legacy_store.Store { + return legacy_store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, prometheus.DefaultRegisterer, w.log) } index := r.MembershipIndex(5) diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 41dd27697..a49a7b9bd 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -167,7 +167,6 @@ func TestPeerSelection(t *testing.T) { require.NoError(t, err) peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) - fmt.Println("peerIDs", peerIDs) require.Equal(t, 2, peerIDs.Len()) require.NoError(t, err) diff --git a/waku/v2/protocol/envelope.go b/waku/v2/protocol/envelope.go index 6fd4edbc9..5ca5a77fa 100644 --- a/waku/v2/protocol/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -2,8 +2,8 @@ package protocol import ( "github.com/waku-org/go-waku/waku/v2/hash" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) // Envelope contains information about the pubsub topic of a WakuMessage @@ -11,7 +11,7 @@ import ( // protobuffer type Envelope struct { msg *wpb.WakuMessage - hash []byte + hash wpb.MessageHash index *pb.Index } @@ -44,7 +44,7 @@ func (e *Envelope) PubsubTopic() string { } // Hash returns a 32 byte hash calculated from the WakuMessage bytes -func (e *Envelope) 
Hash() []byte { +func (e *Envelope) Hash() wpb.MessageHash { return e.hash } diff --git a/waku/v2/protocol/envelope_test.go b/waku/v2/protocol/envelope_test.go index b95426b43..84a963dfc 100644 --- a/waku/v2/protocol/envelope_test.go +++ b/waku/v2/protocol/envelope_test.go @@ -24,7 +24,7 @@ func TestEnvelope(t *testing.T) { require.Equal( t, - []byte{0x91, 0x0, 0xe4, 0xa5, 0xcf, 0xf7, 0x19, 0x27, 0x49, 0x81, 0x66, 0xb3, 0xdf, 0xc7, 0xa6, 0x31, 0xf0, 0x87, 0xc7, 0x29, 0xb4, 0x28, 0x83, 0xb9, 0x5c, 0x31, 0x25, 0x33, 0x3, 0xc9, 0x7, 0x95}, + pb.ToMessageHash([]byte{0x91, 0x0, 0xe4, 0xa5, 0xcf, 0xf7, 0x19, 0x27, 0x49, 0x81, 0x66, 0xb3, 0xdf, 0xc7, 0xa6, 0x31, 0xf0, 0x87, 0xc7, 0x29, 0xb4, 0x28, 0x83, 0xb9, 0x5c, 0x31, 0x25, 0x33, 0x3, 0xc9, 0x7, 0x95}), hash, ) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 451ef51c8..cad12dc6f 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -277,11 +277,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, if filterSubscribeResponse.StatusCode != http.StatusOK { wf.metrics.RecordError(errorResponse) - errMessage := "" - if filterSubscribeResponse.StatusDesc != nil { - errMessage = *filterSubscribeResponse.StatusDesc - } - err := NewFilterError(int(filterSubscribeResponse.StatusCode), errMessage) + err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.GetStatusDesc()) return &err } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index e5dac7182..be5a0e2cf 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -543,3 +543,67 @@ func (s *FilterTestSuite) BeforeTest(suiteName, testName string) { func (s *FilterTestSuite) AfterTest(suiteName, testName string) { s.log.Info("Finished executing ", zap.String("testName", testName)) } + +func (s *FilterTestSuite) TestStaticSharding() { + log := utils.Logger() + s.log = log + s.wg = &sync.WaitGroup{} + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + // Gen pubsub topic "/waku/2/rs/100/100" + s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String() + + // Pubsub topics for neg. 
test cases + testTopics := []string{ + "/waku/2/rs/100/1024", + "/waku/2/rs/100/101", + } + s.testContentTopic = "/test/10/my-filter-app/proto" + + // Prepare new nodes + s.lightNode = s.makeWakuFilterLightNode(true, true) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + + // Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) + + // Test positive case for static shard pubsub topic - message gets received + s.waitForMsg(func() { + _, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic)) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Test two negative cases for static shard pubsub topic - message times out + s.waitForTimeout(func() { + _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0])) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + s.waitForTimeout(func() { + _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1])) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Cleanup + _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ + PubsubTopic: s.testTopic, + ContentTopics: protocol.NewContentTopicSet(s.testContentTopic), + }) + s.Require().NoError(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) +} diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 29f38e12d..aa18839b4 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -222,7 +222,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { handle := func(envelope *protocol.Envelope) error { msg := envelope.Message() pubsubTopic := envelope.PubsubTopic() - logger := utils.MessagesLogger("filter").With(logging.HexBytes("hash", envelope.Hash()), + logger := utils.MessagesLogger("filter").With(logging.Hash(envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), ) diff --git a/waku/v2/protocol/store/metrics.go b/waku/v2/protocol/legacy_store/metrics.go similarity index 98% rename from waku/v2/protocol/store/metrics.go rename to waku/v2/protocol/legacy_store/metrics.go index d9083ec66..99ff87a06 100644 --- a/waku/v2/protocol/store/metrics.go +++ b/waku/v2/protocol/legacy_store/metrics.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "github.com/libp2p/go-libp2p/p2p/metricshelper" diff --git a/waku/v2/protocol/legacy_store/pb/generate.go b/waku/v2/protocol/legacy_store/pb/generate.go new file mode 100644 index 000000000..eb2537ebd --- /dev/null +++ b/waku/v2/protocol/legacy_store/pb/generate.go @@ -0,0 +1,3 @@ +package pb + +//go:generate protoc -I./../../waku-proto/waku/store/v2beta4//. -I./../../waku-proto/ --go_opt=paths=source_relative --go_opt=Mstore.proto=github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb --go_opt=Mwaku/message/v1/message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. 
./../../waku-proto/waku/store/v2beta4/store.proto diff --git a/waku/v2/protocol/store/pb/store.pb.go b/waku/v2/protocol/legacy_store/pb/store.pb.go similarity index 100% rename from waku/v2/protocol/store/pb/store.pb.go rename to waku/v2/protocol/legacy_store/pb/store.pb.go diff --git a/waku/v2/protocol/legacy_store/pb/validation.go b/waku/v2/protocol/legacy_store/pb/validation.go new file mode 100644 index 000000000..740b58086 --- /dev/null +++ b/waku/v2/protocol/legacy_store/pb/validation.go @@ -0,0 +1,68 @@ +package pb + +import ( + "errors" +) + +// MaxContentFilters is the maximum number of allowed content filters in a query +const MaxContentFilters = 10 + +var ( + errMissingRequestID = errors.New("missing RequestId field") + errMissingQuery = errors.New("missing Query field") + errRequestIDMismatch = errors.New("requestID in response does not match request") + errMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed") + errEmptyContentTopics = errors.New("one or more content topics specified is empty") +) + +func (x *HistoryQuery) Validate() error { + if len(x.ContentFilters) > MaxContentFilters { + return errMaxContentFilters + } + + for _, m := range x.ContentFilters { + if m.ContentTopic == "" { + return errEmptyContentTopics + } + } + + return nil +} + +func (x *HistoryRPC) ValidateQuery() error { + if x.RequestId == "" { + return errMissingRequestID + } + + if x.Query == nil { + return errMissingQuery + } + + return x.Query.Validate() +} + +func (x *HistoryResponse) Validate() error { + for _, m := range x.Messages { + if err := m.Validate(); err != nil { + return err + } + } + + return nil +} + +func (x *HistoryRPC) ValidateResponse(requestID string) error { + if x.RequestId == "" { + return errMissingRequestID + } + + if x.RequestId != requestID { + return errRequestIDMismatch + } + + if x.Response != nil { + return x.Response.Validate() + } + + return nil +} diff --git a/waku/v2/protocol/store/pb/validation_test.go b/waku/v2/protocol/legacy_store/pb/validation_test.go similarity index 100% rename from waku/v2/protocol/store/pb/validation_test.go rename to waku/v2/protocol/legacy_store/pb/validation_test.go diff --git a/waku/v2/protocol/store/utils_test.go b/waku/v2/protocol/legacy_store/utils_test.go similarity index 96% rename from waku/v2/protocol/store/utils_test.go rename to waku/v2/protocol/legacy_store/utils_test.go index 6fd883c82..3b689a037 100644 --- a/waku/v2/protocol/store/utils_test.go +++ b/waku/v2/protocol/legacy_store/utils_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "database/sql" diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/legacy_store/waku_resume_test.go similarity index 99% rename from waku/v2/protocol/store/waku_resume_test.go rename to waku/v2/protocol/legacy_store/waku_resume_test.go index 52175cbc4..aeecc6dfd 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/legacy_store/waku_resume_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "context" diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go similarity index 99% rename from waku/v2/protocol/store/waku_store_client.go rename to waku/v2/protocol/legacy_store/waku_store_client.go index f1612a1c6..b02cd92e3 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "context" @@ -15,8 +15,8 @@ import 
( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) type Query struct { diff --git a/waku/v2/protocol/store/waku_store_client_test.go b/waku/v2/protocol/legacy_store/waku_store_client_test.go similarity index 99% rename from waku/v2/protocol/store/waku_store_client_test.go rename to waku/v2/protocol/legacy_store/waku_store_client_test.go index fc44bde9f..c947a64dc 100644 --- a/waku/v2/protocol/store/waku_store_client_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_client_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "context" diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/legacy_store/waku_store_common.go similarity index 99% rename from waku/v2/protocol/store/waku_store_common.go rename to waku/v2/protocol/legacy_store/waku_store_common.go index 929e58e1f..f1edbbc25 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/legacy_store/waku_store_common.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "context" diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/legacy_store/waku_store_pagination_test.go similarity index 99% rename from waku/v2/protocol/store/waku_store_pagination_test.go rename to waku/v2/protocol/legacy_store/waku_store_pagination_test.go index a9ca7374a..a6cee3c08 100644 --- a/waku/v2/protocol/store/waku_store_pagination_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_pagination_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "testing" @@ -6,8 +6,8 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" "google.golang.org/protobuf/proto" ) diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/legacy_store/waku_store_persistence_test.go similarity index 98% rename from waku/v2/protocol/store/waku_store_persistence_test.go rename to waku/v2/protocol/legacy_store/waku_store_persistence_test.go index 022d47b41..2c5d674e6 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_persistence_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "testing" diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/legacy_store/waku_store_protocol.go similarity index 99% rename from waku/v2/protocol/store/waku_store_protocol.go rename to waku/v2/protocol/legacy_store/waku_store_protocol.go index e40abe7ef..16eabe8f0 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/legacy_store/waku_store_protocol.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "context" @@ -17,9 +17,9 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - 
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" ) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/legacy_store/waku_store_protocol_test.go similarity index 83% rename from waku/v2/protocol/store/waku_store_protocol_test.go rename to waku/v2/protocol/legacy_store/waku_store_protocol_test.go index 10ada3dec..e240541ac 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_protocol_test.go @@ -1,13 +1,14 @@ -package store +package legacy_store import ( "context" "database/sql" - "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/persistence/sqlite" "testing" "time" + "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/sqlite" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" @@ -425,3 +426,86 @@ func TestWakuStoreStart(t *testing.T) { defer s2.Stop() } + +func TestWakuStoreWithStaticSharding(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s1.SetHost(host1) + + // Prepare pubsub topics for static sharding + pubSubTopics := protocol.ShardsToTopics(20, []int{1, 2, 3, 4}) + + // Prepare test messages + now := *utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage("hello", proto.Int64(now)) + + nowPlusOne := proto.Int64(now + 1) + msg2 := tests.CreateWakuMessage("/test/2/my-app/sharded", nowPlusOne) + + nowPlusTwo := proto.Int64(now + 2) + msg3 := tests.CreateWakuMessage("/test/2/my-app/sharded", nowPlusTwo) + + // Subscribe to pubSubtopics and start store1 + host1 with them + sub := SimulateSubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubSubTopics[0]), + protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubSubTopics[1]), + protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubSubTopics[2]), + }) + err = s1.Start(ctx, sub) + require.NoError(t, err) + defer s1.Stop() + + host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) + require.NoError(t, err) + + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s2.SetHost(host2) + + // Subscribe to different pubSubTopics[3] at store2 + host2 + sub1 := relay.NewSubscription(protocol.NewContentFilter(pubSubTopics[3])) + + err = s2.Start(ctx, sub1) + require.NoError(t, err) + defer s2.Stop() + + q1 := Query{ + PubsubTopic: pubSubTopics[0], + } + + fn1 := func(msg *pb.WakuMessage) (bool, error) { + return msg.ContentTopic == "hello", nil + } + + // Find msg1 on the second host2+s2 + foundMsg, err := s2.Find(ctx, q1, fn1, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) + require.NoError(t, err) + require.NotNil(t, foundMsg) + require.Equal(t, "hello", foundMsg.ContentTopic) + + q2 := Query{ + PubsubTopic: pubSubTopics[1], + } + + // Find msg2 on the second host2+s2; No other messages (msg3) should be found + result, err := 
s2.Query(ctx, q2, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) + require.NoError(t, err) + + for i, m := range result.Messages { + if i == 0 { + require.Equal(t, "/test/2/my-app/sharded", m.ContentTopic) + require.Equal(t, nowPlusOne, m.Timestamp) + } else { + require.Fail(t, "Unexpected message found") + } + } + +} diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/legacy_store/waku_store_query_test.go similarity index 99% rename from waku/v2/protocol/store/waku_store_query_test.go rename to waku/v2/protocol/legacy_store/waku_store_query_test.go index 2d775953b..881ee79af 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_query_test.go @@ -1,4 +1,4 @@ -package store +package legacy_store import ( "testing" @@ -10,7 +10,7 @@ import ( wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "google.golang.org/protobuf/proto" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index fe65ed047..f2b98bb37 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -313,14 +313,14 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe // Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the // contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the // `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) ([]byte, error) { +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) (wpb.MessageHash, error) { if message == nil { - return nil, errors.New("message can't be null") + return wpb.MessageHash{}, errors.New("message can't be null") } params, err := wakuLP.handleOpts(ctx, message, opts...) 
if err != nil { - return nil, err + return wpb.MessageHash{}, err } req := new(pb.PushRequest) req.Message = message @@ -333,12 +333,12 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa response, err := wakuLP.request(ctx, req, params) if err != nil { logger.Error("could not publish message", zap.Error(err)) - return nil, err + return wpb.MessageHash{}, err } if response.IsSuccess { hash := message.Hash(params.pubsubTopic) - utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash)) + utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:])) return hash, nil } @@ -347,5 +347,5 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa errMsg = *response.Info } - return nil, errors.New(errMsg) + return wpb.MessageHash{}, errors.New(errMsg) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 08c134eee..189c21c82 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -7,14 +7,12 @@ import ( "testing" "time" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "go.uber.org/zap" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -42,22 +40,6 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su return relay, sub[0], host } -func waitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) { - wg.Add(1) - log := utils.Logger() - go func() { - defer wg.Done() - select { - case env := <-ch: - msg := env.Message() - log.Info("Received ", zap.String("msg", msg.String())) - case <-time.After(2 * time.Second): - require.Fail(t, "Message timeout") - } - }() - wg.Wait() -} - // Node1: Relay // Node2: Relay+Lightpush // Client that will lightpush a message @@ -305,7 +287,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { require.NoError(t, err) // Wait for the nominal case message at node1 - waitForMsg(t, &wg, sub1.Ch) + tests.WaitForMsg(t, 2*time.Second, &wg, sub1.Ch) // Test error case with nil message _, err = client.Publish(ctx, nil, lpOptions...) 
@@ -332,3 +314,70 @@ func TestWakuLightPushCornerCases(t *testing.T) { // Test situation when cancel func is nil lightPushNode2.cancel = nil } + +func TestWakuLightPushWithStaticSharding(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Prepare pubsub topic for static sharding + pubSubTopic := protocol.NewStaticShardingPubsubTopic(uint16(25), uint16(0)).String() + testContentTopic := "/test/10/my-lp-app/proto" + + // Node topology: clientNode (lightpush client) <-> node2(relay+lightpush server) <-> node3(relay) + // ClientNode + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client.SetHost(clientHost) + + // Node2 + node2, sub2, host2 := makeWakuRelay(t, pubSubTopic) + defer node2.Stop() + defer sub2.Unsubscribe() + + lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2.SetHost(host2) + err = lightPushNode2.Start(ctx) + require.NoError(t, err) + defer lightPushNode2.Stop() + + // Node3 + node3, sub3, host3 := makeWakuRelay(t, pubSubTopic) + defer node3.Stop() + defer sub3.Unsubscribe() + + // Add path clientNode (lightpush client) -> node2(relay+lightpush server) + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) + require.NoError(t, err) + + // Add path node2(relay+lightpush server) -> node3(relay) + host2.Peerstore().AddAddr(host3.ID(), tests.GetHostAddress(host3), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host3.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host3.ID())) + require.NoError(t, err) + + // Create messages + msg := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()) + + // Wait for the mesh connection to happen between nodes + time.Sleep(2 * time.Second) + + var wg sync.WaitGroup + + // Check that msg publish has led to message delivery for existing topic + _, err = client.Publish(ctx, msg, WithPubSubTopic(pubSubTopic), WithPeer(host2.ID())) + require.NoError(t, err) + tests.WaitForMsg(t, 2*time.Second, &wg, sub3.Ch) + + // Check that msg2 publish finished without message delivery for unconfigured topic + _, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID())) + require.NoError(t, err) + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub3.Ch) +} diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go index 70de2e0d5..6c306e2b6 100644 --- a/waku/v2/protocol/pb/utils.go +++ b/waku/v2/protocol/pb/utils.go @@ -9,9 +9,28 @@ import ( "go.uber.org/zap/zapcore" ) +// MessageHash represents a unique identifier for a message within a pubsub topic +type MessageHash [32]byte + +func (h MessageHash) String() string { + return hexutil.Encode(h[:]) +} + +func (h MessageHash) Bytes() []byte { + return h[:] +} + +// ToMessageHash converts a byte slice into a MessageHash +func ToMessageHash(b []byte) MessageHash { + var result MessageHash + copy(result[:], b) + return result +} + // Hash calculates the hash of a waku message -func (msg *WakuMessage) Hash(pubsubTopic string) []byte { - return
hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp())) +func (msg *WakuMessage) Hash(pubsubTopic string) MessageHash { + hash := hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp())) + return ToMessageHash(hash) } func toBytes(i int64) []byte { @@ -22,7 +41,7 @@ func toBytes(i int64) []byte { func (msg *WakuMessage) LogFields(pubsubTopic string) []zapcore.Field { return []zapcore.Field{ - zap.String("hash", hexutil.Encode(msg.Hash(pubsubTopic))), + zap.Stringer("hash", msg.Hash(pubsubTopic)), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", msg.ContentTopic), zap.Int64("timestamp", msg.GetTimestamp()), diff --git a/waku/v2/protocol/pb/utils_test.go b/waku/v2/protocol/pb/utils_test.go index f15421db2..133c0f6ee 100644 --- a/waku/v2/protocol/pb/utils_test.go +++ b/waku/v2/protocol/pb/utils_test.go @@ -1,7 +1,6 @@ package pb import ( - "encoding/hex" "testing" "github.com/stretchr/testify/require" @@ -24,7 +23,7 @@ func TestEnvelopeHash(t *testing.T) { expected := []byte{0xb6, 0x59, 0x60, 0x7f, 0x2a, 0xae, 0x18, 0x84, 0x8d, 0xca, 0xa7, 0xd5, 0x1c, 0xb3, 0x7e, 0x6c, 0xc6, 0xfc, 0x33, 0x40, 0x2c, 0x70, 0x4f, 0xf0, 0xc0, 0x16, 0x33, 0x7d, 0x83, 0xad, 0x61, 0x50} result := msg.Hash("test") - require.Equal(t, expected, result) + require.Equal(t, ToMessageHash(expected), result) } func TestEmptyMeta(t *testing.T) { @@ -39,7 +38,7 @@ func TestEmptyMeta(t *testing.T) { messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "f0183c2e370e473ff471bbe1028d0d8a940949c02f3007a1ccd21fed356852a0", hex.EncodeToString(messageHash)) + require.Equal(t, "0xf0183c2e370e473ff471bbe1028d0d8a940949c02f3007a1ccd21fed356852a0", messageHash.String()) } func Test13ByteMeta(t *testing.T) { @@ -53,7 +52,7 @@ func Test13ByteMeta(t *testing.T) { messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "f673cd2c9c973d685b52ca74c2559e001733a3a31a49ffc7b6e8713decba5a55", hex.EncodeToString(messageHash)) + require.Equal(t, "0xf673cd2c9c973d685b52ca74c2559e001733a3a31a49ffc7b6e8713decba5a55", messageHash.String()) } func TestZeroLenPayload(t *testing.T) { @@ -67,7 +66,7 @@ func TestZeroLenPayload(t *testing.T) { messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash)) + require.Equal(t, "0x978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", messageHash.String()) } func TestHashWithTimestamp(t *testing.T) { @@ -79,11 +78,11 @@ func TestHashWithTimestamp(t *testing.T) { msg.Version = proto.Uint32(1) messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "58e2fc032a82c4adeb967a8b87086d0d6fb304912f120d4404e6236add8f1f56", hex.EncodeToString(messageHash)) + require.Equal(t, "0x58e2fc032a82c4adeb967a8b87086d0d6fb304912f120d4404e6236add8f1f56", messageHash.String()) msg.Timestamp = proto.Int64(123456789123456789) messageHash = msg.Hash(pubsubTopic) - require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash)) + require.Equal(t, "0x978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", messageHash.String()) } func TestIntToBytes(t *testing.T) { diff --git a/waku/v2/protocol/relay/metrics.go b/waku/v2/protocol/relay/metrics.go index b642c4970..4a10a0a9a 100644 --- a/waku/v2/protocol/relay/metrics.go +++ b/waku/v2/protocol/relay/metrics.go @@ -61,7 +61,7 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) { 
messageSize.Observe(payloadSizeInKb) pubsubTopic := envelope.PubsubTopic() messages.WithLabelValues(pubsubTopic).Inc() - m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes)) + m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.Hash(envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes)) }() } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 06fd2c281..d2ec331bf 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -255,19 +255,19 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscripti // Publish is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic // specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should // be provided -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) { +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) (pb.MessageHash, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { - return nil, errors.New("PubSub hasn't been set") + return pb.MessageHash{}, errors.New("PubSub hasn't been set") } if message == nil { - return nil, errors.New("message can't be null") + return pb.MessageHash{}, errors.New("message can't be null") } err := message.Validate() if err != nil { - return nil, err + return pb.MessageHash{}, err } params := new(publishParameters) @@ -278,12 +278,12 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . if params.pubsubTopic == "" { params.pubsubTopic, err = waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) if err != nil { - return nil, err + return pb.MessageHash{}, err } } if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { - return nil, errors.New("not enough peers to publish") + return pb.MessageHash{}, errors.New("not enough peers to publish") } w.topicsMutex.Lock() @@ -291,26 +291,26 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . 
pubSubTopic, err := w.upsertTopic(params.pubsubTopic) if err != nil { - return nil, err + return pb.MessageHash{}, err } out, err := proto.Marshal(message) if err != nil { - return nil, err + return pb.MessageHash{}, err } if len(out) > w.relayParams.maxMsgSizeBytes { - return nil, errors.New("message size exceeds gossipsub max message size") + return pb.MessageHash{}, errors.New("message size exceeds gossipsub max message size") } err = pubSubTopic.Publish(ctx, out) if err != nil { - return nil, err + return pb.MessageHash{}, err } hash := message.Hash(params.pubsubTopic) - w.logMessages.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexBytes("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) + w.logMessages.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.Hash(hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) return hash, nil } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 94e9992ad..307d3ee3f 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -56,7 +56,7 @@ func TestWakuRelay(t *testing.T) { go func() { defer cancel() env := <-subs[0].Ch - t.Log("received msg", logging.HexBytes("hash", env.Hash())) + t.Log("received msg", logging.Hash(env.Hash())) }() msg := &pb.WakuMessage{ @@ -355,3 +355,77 @@ func TestInvalidMessagePublish(t *testing.T) { ctxCancel() } + +func TestWakuRelayStaticSharding(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Follow spec /waku/2/rs// + testTopic := "/waku/2/rs/64/0" + testContentTopic := "/test/10/my-relay" + + // Host1 + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host1, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster1 := NewBroadcaster(10) + relay1 := NewWakuRelay(bcaster1, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay1.SetHost(host1) + err = relay1.Start(context.Background()) + require.NoError(t, err) + + err = bcaster1.Start(context.Background()) + require.NoError(t, err) + defer relay1.Stop() + + // Host2 + port, err = tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host2, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster2 := NewBroadcaster(10) + relay2 := NewWakuRelay(bcaster2, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay2.SetHost(host2) + err = relay2.Start(context.Background()) + require.NoError(t, err) + + err = bcaster2.Start(context.Background()) + require.NoError(t, err) + defer relay2.Stop() + + // Connect nodes + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), WakuRelayID_v200) + require.NoError(t, err) + + // Wait for the mesh connection to happen between node1 and node2 + time.Sleep(2 * time.Second) + + // Subscribe to valid static shard topic on both hosts + subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + require.NoError(t, err) + + subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + require.NoError(t, err) + 
require.True(t, relay2.IsSubscribed(testTopic)) + require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0]) + + msg := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch(), "test_payload") + + // Test publish from host2 using autosharding -> should fail on topic format + _, err = relay2.Publish(ctx, msg) + require.Error(t, err) + + // Test publish from host2 using static sharding -> should succeed + _, err = relay2.Publish(ctx, msg, WithPubSubTopic(testTopic)) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Msg should get received on host1 + tests.WaitForMsg(t, 2*time.Second, &wg, subs1[0].Ch) + +} diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 39b718061..29d5d65ac 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -235,7 +235,7 @@ func (rlnRelay *WakuRLNRelay) Validator( hash := msg.Hash(topic) log := rlnRelay.log.With( - logging.HexBytes("hash", hash), + logging.HexBytes("hash", hash[:]), zap.String("pubsubTopic", topic), zap.String("contentTopic", msg.ContentTopic), ) diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go new file mode 100644 index 000000000..9eb3da40a --- /dev/null +++ b/waku/v2/protocol/store/client.go @@ -0,0 +1,290 @@ +package store + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "math" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-msgio/pbio" + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +// StoreQueryID_v300 is the Store protocol v3 identifier +const StoreQueryID_v300 = libp2pProtocol.ID("/vac/waku/store-query/3.0.0") +const StoreENRField = uint8(1 << 1) + +// MaxPageSize is the maximum number of waku messages to return per page +const MaxPageSize = 100 + +// DefaultPageSize is the default number of waku messages per page +const DefaultPageSize = 20 + +const ok = uint32(200) + +var ( + + // ErrNoPeersAvailable is returned when there are no store peers in the peer store + // that could be used to retrieve message history + ErrNoPeersAvailable = errors.New("no suitable remote peers") + ErrMustSelectPeer = errors.New("a peer ID or multiaddress is required when checking for message hashes") +) + +// StoreError represents an error code returned by a storenode +type StoreError struct { + Code int + Message string +} + +// NewStoreError creates a new instance of StoreError +func NewStoreError(code int, message string) StoreError { + return StoreError{ + Code: code, + Message: message, + } +} + +const errorStringFmt = "%d - %s" + +// Error returns a string with the error message +func (e *StoreError) Error() string { + return fmt.Sprintf(errorStringFmt, e.Code, e.Message) +} + +// WakuStore represents an instance of a store client +type WakuStore struct { + h host.Host + timesource timesource.Timesource + log *zap.Logger + pm *peermanager.PeerManager +} + +// NewWakuStore is used to instantiate a StoreV3 client +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log 
*zap.Logger) *WakuStore { + s := new(WakuStore) + s.log = log.Named("store-client") + s.timesource = timesource + s.pm = pm + + if pm != nil { + pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField) + } + + return s +} + +// SetHost sets the host to be able to mount or consume a protocol +func (s *WakuStore) SetHost(h host.Host) { + s.h = h +} + +// Request is used to send a store query. This function requires understanding how to prepare a store query, +// and most of the time you can use `Query`, `QueryByHash` and `Exists` instead, as they provide +// a simpler API +func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (*Result, error) { + params := new(Parameters) + + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + + filterCriteria, isFilterCriteria := criteria.(FilterCriteria) + + var pubsubTopics []string + if isFilterCriteria { + pubsubTopics = append(pubsubTopics, filterCriteria.PubsubTopic) + } + + // Add peer to peerstore. + if s.pm != nil && params.peerAddr != nil { + pData, err := s.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreQueryID_v300) + if err != nil { + return nil, err + } + s.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + + if s.pm != nil && params.selectedPeer == "" { + if isFilterCriteria { + selectedPeers, err := s.pm.SelectPeers( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreQueryID_v300, + PubsubTopics: []string{filterCriteria.PubsubTopic}, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } + params.selectedPeer = selectedPeers[0] + } else { + return nil, ErrMustSelectPeer + } + } + + if params.selectedPeer == "" { + return nil, ErrNoPeersAvailable + } + + pageLimit := params.pageLimit + if pageLimit == 0 { + pageLimit = DefaultPageSize + } else if pageLimit > uint64(MaxPageSize) { + pageLimit = MaxPageSize + } + + storeRequest := &pb.StoreQueryRequest{ + RequestId: hex.EncodeToString(params.requestID), + IncludeData: params.includeData, + PaginationForward: params.forward, + PaginationLimit: proto.Uint64(pageLimit), + } + + criteria.PopulateStoreRequest(storeRequest) + + if params.cursor != nil { + storeRequest.PaginationCursor = params.cursor + } + + err := storeRequest.Validate() + if err != nil { + return nil, err + } + + response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer) + if err != nil { + return nil, err + } + + result := &Result{ + store: s, + messages: response.Messages, + storeRequest: storeRequest, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, + } + + return result, nil +} + +// Query retrieves all the messages that match the given criteria. Use the options to indicate whether to return the messages themselves or not. +func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (*Result, error) { + return s.Request(ctx, criteria, opts...) +} + +// QueryByHash retrieves all the messages with the specified message hashes +func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (*Result, error) { + return s.Request(ctx, MessageHashCriteria{messageHashes}, opts...) +} + +// Exists is a utility function to determine if a message exists. For checking the presence of more than one message, use QueryByHash +// and pass the option IncludeData(false).
You will have to iterate the results and check whether the full list of messages contains +// the list of messages to verify +func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opts ...RequestOption) (bool, error) { + opts = append(opts, IncludeData(false)) + result, err := s.Request(ctx, MessageHashCriteria{MessageHashes: []wpb.MessageHash{messageHash}}, opts...) + if err != nil { + return false, err + } + + return len(result.messages) != 0, nil +} + +func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { + if r.IsComplete() { + return &Result{ + store: s, + started: true, + messages: []*pb.WakuMessageKeyValue{}, + cursor: nil, + storeRequest: r.storeRequest, + peerID: r.PeerID(), + }, nil + } + + storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) + storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) + storeRequest.PaginationCursor = r.Cursor() + + response, err := s.queryFrom(ctx, storeRequest, r.PeerID()) + if err != nil { + return nil, err + } + + result := &Result{ + started: true, + store: s, + messages: response.Messages, + storeRequest: storeRequest, + peerID: r.PeerID(), + cursor: response.PaginationCursor, + } + + return result, nil + +} + +func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) { + logger := s.log.With(logging.HostID("peer", selectedPeer)) + logger.Info("sending store request") + + stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) + if err != nil { + logger.Error("creating stream to peer", zap.Error(err)) + return nil, err + } + + writer := pbio.NewDelimitedWriter(stream) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) + + err = writer.WriteMsg(storeRequest) + if err != nil { + logger.Error("writing request", zap.Error(err)) + if err := stream.Reset(); err != nil { + s.log.Error("resetting connection", zap.Error(err)) + } + return nil, err + } + + storeResponse := &pb.StoreQueryResponse{RequestId: storeRequest.RequestId} + err = reader.ReadMsg(storeResponse) + if err != nil { + logger.Error("reading response", zap.Error(err)) + if err := stream.Reset(); err != nil { + s.log.Error("resetting connection", zap.Error(err)) + } + return nil, err + } + + stream.Close() + + if err := storeResponse.Validate(storeRequest.RequestId); err != nil { + return nil, err + } + + if storeResponse.GetStatusCode() != ok { + err := NewStoreError(int(storeResponse.GetStatusCode()), storeResponse.GetStatusDesc()) + return nil, &err + } + return storeResponse, nil +} diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go new file mode 100644 index 000000000..e02e904e6 --- /dev/null +++ b/waku/v2/protocol/store/client_test.go @@ -0,0 +1,224 @@ +//go:build include_storev3_tests +// +build include_storev3_tests + +package store + +import ( + "context" + "crypto/rand" + "os" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + "google.golang.org/protobuf/proto" +) + +func 
TestStoreClient(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + // Creating a relay instance for pushing messages to the store node + b := relay.NewBroadcaster(10) + require.NoError(t, b.Start(context.Background())) + wakuRelay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + wakuRelay.SetHost(host) + require.NoError(t, err) + err = wakuRelay.Start(context.Background()) + require.NoError(t, err) + + pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger()) + pm.SetHost(host) + err = pm.SubscribeToRelayEvtBus(wakuRelay.Events()) + require.NoError(t, err) + pm.Start(ctx) + + // Creating a storeV3 instance for all queries + wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger()) + wakuStore.SetHost(host) + + _, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(protocol.DefaultPubsubTopic{}.String()), relay.WithoutConsumer()) + require.NoError(t, err) + + // Obtain multiaddr from env + envStorenode := os.Getenv("TEST_STOREV3_NODE") + if envStorenode == "" { + envStorenode = "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmMGhfSTUzKbsjMWxc6T1X4wiTWSF1bEWSLjAukCm7KiHV" + } + storenode_multiaddr, err := multiaddr.NewMultiaddr(envStorenode) + require.NoError(t, err) + + storenode, err := peer.AddrInfoFromP2pAddr(storenode_multiaddr) + require.NoError(t, err) + + err = host.Connect(ctx, *storenode) + require.NoError(t, err) + + // Wait until mesh forms + time.Sleep(2 * time.Second) + + // Send messages + messages := []*pb.WakuMessage{} + startTime := utils.GetUnixEpoch(timesource.NewDefaultClock()) + for i := 0; i < 5; i++ { + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: "test", + Version: proto.Uint32(0), + Timestamp: utils.GetUnixEpoch(timesource.NewDefaultClock()), + } + _, err := wakuRelay.Publish(ctx, msg, relay.WithDefaultPubsubTopic()) + require.NoError(t, err) + + messages = append(messages, msg) + time.Sleep(20 * time.Millisecond) + } + endTime := utils.GetUnixEpoch(timesource.NewDefaultClock()) + + time.Sleep(1 * time.Second) + + // Check for message existence + exists, err := wakuStore.Exists(ctx, messages[0].Hash(relay.DefaultWakuTopic), WithPeer(storenode.ID)) + require.NoError(t, err) + require.True(t, exists) + + // Message should not exist + exists, err = wakuStore.Exists(ctx, pb.MessageHash{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2}, WithPeer(storenode.ID)) + require.NoError(t, err) + require.False(t, exists) + + // Query messages with forward pagination + response, err := wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2)) + require.NoError(t, err) + + // -- First page: + hasNext, err := response.Next(ctx) + require.NoError(t, err) + require.True(t, hasNext) + require.False(t, response.IsComplete()) + require.Len(t, response.messages, 2) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp()) + + // -- Second page: + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.True(t, hasNext) + require.False(t, 
response.IsComplete()) + require.Len(t, response.messages, 2) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp()) + require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp()) + + // -- Third page: + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.False(t, hasNext) + require.True(t, response.IsComplete()) + require.Len(t, response.messages, 1) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp()) + + // -- Trying to continue a completed cursor + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.False(t, hasNext) + require.True(t, response.IsComplete()) + require.Len(t, response.messages, 0) + + // Query messages with backward pagination + response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(false, 2)) + require.NoError(t, err) + + // -- First page: + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.True(t, hasNext) + require.False(t, response.IsComplete()) + require.Len(t, response.messages, 2) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp()) + require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp()) + + // -- Second page: + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.True(t, hasNext) + require.False(t, response.IsComplete()) + require.Len(t, response.messages, 2) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp()) + require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp()) + + // -- Third page: + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.False(t, hasNext) + require.True(t, response.IsComplete()) + require.Len(t, response.messages, 1) + require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + + // -- Trying to continue a completed cursor + hasNext, err = response.Next(ctx) + require.NoError(t, err) + require.False(t, hasNext) + require.True(t, response.IsComplete()) + require.Len(t, response.messages, 0) + + // No cursor should be returned if there are no messages that match the criteria + response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2)) + require.NoError(t, err) + require.Len(t, response.messages, 0) + require.Empty(t, response.Cursor()) + + // If the page size is larger than the number of existing messages, it should not return a cursor + response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100)) + require.NoError(t, err) + require.Len(t, response.messages, 5) + require.Empty(t, response.Cursor()) + + // Invalid cursors should fail + // TODO: nwaku does not support this feature yet + //_, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithCursor([]byte{1, 2, 3, 4, 5, 6})) + //require.Error(t, err) + + // Inexistent cursors should return an empty response + // TODO: nwaku does not support this feature yet + //response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: 
protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithCursor(make([]byte, 32))) // Requesting cursor 0x00...00 + //require.NoError(t, err) + //require.Len(t, response.messages, 0) + //require.Empty(t, response.Cursor()) + + // Handle temporal history query with an invalid time window + _, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: endTime, TimeEnd: startTime}) + require.NotNil(t, err) + + // Handle temporal history query with a zero-size time window + response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: startTime}) + require.NoError(t, err) + require.Len(t, response.messages, 0) + require.Empty(t, response.Cursor()) + + // Should not include data + response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(relay.DefaultWakuTopic)}}, IncludeData(false), WithPeer(storenode.ID)) + require.NoError(t, err) + require.Len(t, response.messages, 1) + require.Nil(t, response.messages[0].Message) + + response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test")}, IncludeData(false)) + require.NoError(t, err) + require.GreaterOrEqual(t, len(response.messages), 1) + require.Nil(t, response.messages[0].Message) +} diff --git a/waku/v2/protocol/store/criteria.go b/waku/v2/protocol/store/criteria.go new file mode 100644 index 000000000..f62de764c --- /dev/null +++ b/waku/v2/protocol/store/criteria.go @@ -0,0 +1,36 @@ +package store + +import ( + "github.com/waku-org/go-waku/waku/v2/protocol" + wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" +) + +type Criteria interface { + PopulateStoreRequest(request *pb.StoreQueryRequest) +} + +type FilterCriteria struct { + protocol.ContentFilter + TimeStart *int64 + TimeEnd *int64 +} + +func (f FilterCriteria) PopulateStoreRequest(request *pb.StoreQueryRequest) { + request.ContentTopics = f.ContentTopicsList() + request.PubsubTopic = proto.String(f.PubsubTopic) + request.TimeStart = f.TimeStart + request.TimeEnd = f.TimeEnd +} + +type MessageHashCriteria struct { + MessageHashes []wpb.MessageHash +} + +func (m MessageHashCriteria) PopulateStoreRequest(request *pb.StoreQueryRequest) { + request.MessageHashes = make([][]byte, len(m.MessageHashes)) + for i := range m.MessageHashes { + request.MessageHashes[i] = m.MessageHashes[i][:] + } +} diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go new file mode 100644 index 000000000..b38afd53a --- /dev/null +++ b/waku/v2/protocol/store/options.go @@ -0,0 +1,126 @@ +package store + +import ( + "errors" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type Parameters struct { + selectedPeer peer.ID + peerAddr multiaddr.Multiaddr + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + requestID []byte + cursor []byte + pageLimit uint64 + forward bool + includeData bool +} + +type RequestOption func(*Parameters) error + +// WithPeer is an option used to specify the peerID to request the message history. 
+// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeer(p peer.ID) RequestOption { + return func(params *Parameters) error { + params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("WithPeer and WithPeerAddr options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress to request the message history. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeer, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { + return func(params *Parameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("WithPeerAddr and WithPeer options are mutually exclusive") + } + return nil + } +} + +// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store +// to request the message history. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will choose a peer +// from the node peerstore +// Note: This option is available only with peerManager +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *Parameters) error { + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers + return nil + } +} + +// WithFastestPeerSelection is an option used to select a peer from the peer store +// with the lowest ping. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will choose a peer +// from the node peerstore +// Note: This option is available only with peerManager +func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *Parameters) error { + params.peerSelectionType = peermanager.LowestRTT + params.preferredPeers = fromThesePeers + return nil + } +} + +// WithRequestID is an option to set a specific request ID to be used when +// creating a store request +func WithRequestID(requestID []byte) RequestOption { + return func(params *Parameters) error { + params.requestID = requestID + return nil + } +} + +// WithAutomaticRequestID is an option to automatically generate a request ID +// when creating a store request +func WithAutomaticRequestID() RequestOption { + return func(params *Parameters) error { + params.requestID = protocol.GenerateRequestID() + return nil + } +} + +func WithCursor(cursor []byte) RequestOption { + return func(params *Parameters) error { + params.cursor = cursor + return nil + } +} + +// WithPaging is an option used to specify the order and maximum number of records to return +func WithPaging(forward bool, limit uint64) RequestOption { + return func(params *Parameters) error { + params.forward = forward + params.pageLimit = limit + return nil + } +} + +// IncludeData is an option used to indicate whether you want to return the message content or not +func IncludeData(v bool) RequestOption { + return func(params *Parameters) error { + params.includeData = v + return nil + } +} + +// DefaultOptions returns the default options to be used when querying a store node for results +func DefaultOptions() []RequestOption { + return []RequestOption{ + WithAutomaticRequestID(), + WithAutomaticPeerSelection(), + WithPaging(true, DefaultPageSize), + IncludeData(true), + } +} diff --git a/waku/v2/protocol/store/pb/generate.go b/waku/v2/protocol/store/pb/generate.go index 31462bdac..c7e957821 ---
a/waku/v2/protocol/store/pb/generate.go +++ b/waku/v2/protocol/store/pb/generate.go @@ -1,3 +1,3 @@ package pb -//go:generate protoc -I./../../waku-proto/waku/store/v2beta4//. -I./../../waku-proto/ --go_opt=paths=source_relative --go_opt=Mstore.proto=github.com/waku-org/go-waku/waku/v2/protocol/store/pb --go_opt=Mwaku/message/v1/message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./../../waku-proto/waku/store/v2beta4/store.proto +//go:generate protoc -I. -I./../../waku-proto/ --go_opt=paths=source_relative --go_opt=Mstorev3.proto=github.com/waku-org/go-waku/waku/v2/protocol/store/pb --go_opt=Mwaku/message/v1/message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./storev3.proto diff --git a/waku/v2/protocol/store/pb/hash.go b/waku/v2/protocol/store/pb/hash.go new file mode 100644 index 000000000..600b5cafb --- /dev/null +++ b/waku/v2/protocol/store/pb/hash.go @@ -0,0 +1,9 @@ +package pb + +import ( + pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +func (x *WakuMessageKeyValue) WakuMessageHash() pb.MessageHash { + return pb.ToMessageHash(x.MessageHash) +} diff --git a/waku/v2/protocol/store/pb/storev3.pb.go b/waku/v2/protocol/store/pb/storev3.pb.go new file mode 100644 index 000000000..10f90e0b1 --- /dev/null +++ b/waku/v2/protocol/store/pb/storev3.pb.go @@ -0,0 +1,445 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.21.12 +// source: storev3.proto + +// Protocol identifier: /vac/waku/store/3.0.0 + +package pb + +import ( + pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WakuMessageKeyValue struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MessageHash []byte `protobuf:"bytes,1,opt,name=message_hash,json=messageHash,proto3,oneof" json:"message_hash,omitempty"` // Globally unique key for a Waku Message + Message *pb.WakuMessage `protobuf:"bytes,2,opt,name=message,proto3,oneof" json:"message,omitempty"` // Full message content as value +} + +func (x *WakuMessageKeyValue) Reset() { + *x = WakuMessageKeyValue{} + if protoimpl.UnsafeEnabled { + mi := &file_storev3_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WakuMessageKeyValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WakuMessageKeyValue) ProtoMessage() {} + +func (x *WakuMessageKeyValue) ProtoReflect() protoreflect.Message { + mi := &file_storev3_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WakuMessageKeyValue.ProtoReflect.Descriptor instead. 
+func (*WakuMessageKeyValue) Descriptor() ([]byte, []int) { + return file_storev3_proto_rawDescGZIP(), []int{0} +} + +func (x *WakuMessageKeyValue) GetMessageHash() []byte { + if x != nil { + return x.MessageHash + } + return nil +} + +func (x *WakuMessageKeyValue) GetMessage() *pb.WakuMessage { + if x != nil { + return x.Message + } + return nil +} + +type StoreQueryRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + IncludeData bool `protobuf:"varint,2,opt,name=include_data,json=includeData,proto3" json:"include_data,omitempty"` // Response should include full message content + // Filter criteria for content-filtered queries + PubsubTopic *string `protobuf:"bytes,10,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` + ContentTopics []string `protobuf:"bytes,11,rep,name=content_topics,json=contentTopics,proto3" json:"content_topics,omitempty"` + TimeStart *int64 `protobuf:"zigzag64,12,opt,name=time_start,json=timeStart,proto3,oneof" json:"time_start,omitempty"` + TimeEnd *int64 `protobuf:"zigzag64,13,opt,name=time_end,json=timeEnd,proto3,oneof" json:"time_end,omitempty"` + // List of key criteria for lookup queries + MessageHashes [][]byte `protobuf:"bytes,20,rep,name=message_hashes,json=messageHashes,proto3" json:"message_hashes,omitempty"` // Message hashes (keys) to lookup + // Pagination info. 50 Reserved + PaginationCursor []byte `protobuf:"bytes,51,opt,name=pagination_cursor,json=paginationCursor,proto3,oneof" json:"pagination_cursor,omitempty"` // Message hash (key) from where to start query (exclusive) + PaginationForward bool `protobuf:"varint,52,opt,name=pagination_forward,json=paginationForward,proto3" json:"pagination_forward,omitempty"` + PaginationLimit *uint64 `protobuf:"varint,53,opt,name=pagination_limit,json=paginationLimit,proto3,oneof" json:"pagination_limit,omitempty"` +} + +func (x *StoreQueryRequest) Reset() { + *x = StoreQueryRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_storev3_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StoreQueryRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StoreQueryRequest) ProtoMessage() {} + +func (x *StoreQueryRequest) ProtoReflect() protoreflect.Message { + mi := &file_storev3_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StoreQueryRequest.ProtoReflect.Descriptor instead. 
+func (*StoreQueryRequest) Descriptor() ([]byte, []int) { + return file_storev3_proto_rawDescGZIP(), []int{1} +} + +func (x *StoreQueryRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *StoreQueryRequest) GetIncludeData() bool { + if x != nil { + return x.IncludeData + } + return false +} + +func (x *StoreQueryRequest) GetPubsubTopic() string { + if x != nil && x.PubsubTopic != nil { + return *x.PubsubTopic + } + return "" +} + +func (x *StoreQueryRequest) GetContentTopics() []string { + if x != nil { + return x.ContentTopics + } + return nil +} + +func (x *StoreQueryRequest) GetTimeStart() int64 { + if x != nil && x.TimeStart != nil { + return *x.TimeStart + } + return 0 +} + +func (x *StoreQueryRequest) GetTimeEnd() int64 { + if x != nil && x.TimeEnd != nil { + return *x.TimeEnd + } + return 0 +} + +func (x *StoreQueryRequest) GetMessageHashes() [][]byte { + if x != nil { + return x.MessageHashes + } + return nil +} + +func (x *StoreQueryRequest) GetPaginationCursor() []byte { + if x != nil { + return x.PaginationCursor + } + return nil +} + +func (x *StoreQueryRequest) GetPaginationForward() bool { + if x != nil { + return x.PaginationForward + } + return false +} + +func (x *StoreQueryRequest) GetPaginationLimit() uint64 { + if x != nil && x.PaginationLimit != nil { + return *x.PaginationLimit + } + return 0 +} + +type StoreQueryResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + StatusCode *uint32 `protobuf:"varint,10,opt,name=status_code,json=statusCode,proto3,oneof" json:"status_code,omitempty"` + StatusDesc *string `protobuf:"bytes,11,opt,name=status_desc,json=statusDesc,proto3,oneof" json:"status_desc,omitempty"` + Messages []*WakuMessageKeyValue `protobuf:"bytes,20,rep,name=messages,proto3" json:"messages,omitempty"` + PaginationCursor []byte `protobuf:"bytes,51,opt,name=pagination_cursor,json=paginationCursor,proto3,oneof" json:"pagination_cursor,omitempty"` +} + +func (x *StoreQueryResponse) Reset() { + *x = StoreQueryResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_storev3_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StoreQueryResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StoreQueryResponse) ProtoMessage() {} + +func (x *StoreQueryResponse) ProtoReflect() protoreflect.Message { + mi := &file_storev3_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StoreQueryResponse.ProtoReflect.Descriptor instead. 
+func (*StoreQueryResponse) Descriptor() ([]byte, []int) { + return file_storev3_proto_rawDescGZIP(), []int{2} +} + +func (x *StoreQueryResponse) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *StoreQueryResponse) GetStatusCode() uint32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *StoreQueryResponse) GetStatusDesc() string { + if x != nil && x.StatusDesc != nil { + return *x.StatusDesc + } + return "" +} + +func (x *StoreQueryResponse) GetMessages() []*WakuMessageKeyValue { + if x != nil { + return x.Messages + } + return nil +} + +func (x *StoreQueryResponse) GetPaginationCursor() []byte { + if x != nil { + return x.PaginationCursor + } + return nil +} + +var File_storev3_proto protoreflect.FileDescriptor + +var file_storev3_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x76, 0x33, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0d, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x1a, 0x1d, + 0x77, 0x61, 0x6b, 0x75, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2f, 0x76, 0x31, 0x2f, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x01, + 0x0a, 0x13, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4b, 0x65, 0x79, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x26, 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0b, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x61, 0x73, 0x68, 0x88, 0x01, 0x01, 0x12, 0x3b, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, + 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x01, 0x52, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x42, 0x0a, 0x0a, 0x08, 0x5f, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xf8, 0x03, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, + 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, + 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, + 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0b, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, + 0x26, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, + 0x0a, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, + 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x0c, 0x20, 0x01, + 0x28, 0x12, 0x48, 0x01, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x88, + 0x01, 0x01, 0x12, 0x1e, 0x0a, 0x08, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x0d, + 0x20, 0x01, 0x28, 0x12, 0x48, 0x02, 0x52, 0x07, 0x74, 
0x69, 0x6d, 0x65, 0x45, 0x6e, 0x64, 0x88, + 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, + 0x73, 0x68, 0x65, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0d, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x48, 0x61, 0x73, 0x68, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, + 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, + 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x12, 0x2d, 0x0a, 0x12, 0x70, + 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x18, 0x34, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, 0x2e, 0x0a, 0x10, 0x70, 0x61, + 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x35, + 0x20, 0x01, 0x28, 0x04, 0x48, 0x04, 0x52, 0x0f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, + 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, + 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, + 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x42, 0x13, 0x0a, + 0x11, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, + 0x69, 0x74, 0x22, 0xa7, 0x02, 0x0a, 0x12, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, + 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, + 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x88, 0x01, 0x01, 0x12, 0x24, + 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, + 0x63, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, + 0x02, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, + 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 
0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_storev3_proto_rawDescOnce sync.Once + file_storev3_proto_rawDescData = file_storev3_proto_rawDesc +) + +func file_storev3_proto_rawDescGZIP() []byte { + file_storev3_proto_rawDescOnce.Do(func() { + file_storev3_proto_rawDescData = protoimpl.X.CompressGZIP(file_storev3_proto_rawDescData) + }) + return file_storev3_proto_rawDescData +} + +var file_storev3_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_storev3_proto_goTypes = []interface{}{ + (*WakuMessageKeyValue)(nil), // 0: waku.store.v3.WakuMessageKeyValue + (*StoreQueryRequest)(nil), // 1: waku.store.v3.StoreQueryRequest + (*StoreQueryResponse)(nil), // 2: waku.store.v3.StoreQueryResponse + (*pb.WakuMessage)(nil), // 3: waku.message.v1.WakuMessage +} +var file_storev3_proto_depIdxs = []int32{ + 3, // 0: waku.store.v3.WakuMessageKeyValue.message:type_name -> waku.message.v1.WakuMessage + 0, // 1: waku.store.v3.StoreQueryResponse.messages:type_name -> waku.store.v3.WakuMessageKeyValue + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_storev3_proto_init() } +func file_storev3_proto_init() { + if File_storev3_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_storev3_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WakuMessageKeyValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storev3_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StoreQueryRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storev3_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StoreQueryResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_storev3_proto_msgTypes[0].OneofWrappers = []interface{}{} + file_storev3_proto_msgTypes[1].OneofWrappers = []interface{}{} + file_storev3_proto_msgTypes[2].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_storev3_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_storev3_proto_goTypes, + DependencyIndexes: file_storev3_proto_depIdxs, + MessageInfos: file_storev3_proto_msgTypes, + }.Build() + File_storev3_proto = out.File + file_storev3_proto_rawDesc = nil + file_storev3_proto_goTypes = nil + file_storev3_proto_depIdxs = nil +} diff --git a/waku/v2/protocol/store/pb/storev3.proto b/waku/v2/protocol/store/pb/storev3.proto new file mode 100644 index 000000000..c828a713f --- /dev/null +++ b/waku/v2/protocol/store/pb/storev3.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +// Protocol identifier: /vac/waku/store/3.0.0 +package waku.store.v3; + +import "waku/message/v1/message.proto"; + +message WakuMessageKeyValue { + optional bytes message_hash = 1; // Globally unique key for a Waku Message + optional waku.message.v1.WakuMessage message = 2; // Full message content as value +} + +message StoreQueryRequest { + string request_id 
= 1; + bool include_data = 2; // Response should include full message content + + // Filter criteria for content-filtered queries + optional string pubsub_topic = 10; + repeated string content_topics = 11; + optional sint64 time_start = 12; + optional sint64 time_end = 13; + + // List of key criteria for lookup queries + repeated bytes message_hashes = 20; // Message hashes (keys) to lookup + + // Pagination info. 50 Reserved + optional bytes pagination_cursor = 51; // Message hash (key) from where to start query (exclusive) + bool pagination_forward = 52; + optional uint64 pagination_limit = 53; +} + +message StoreQueryResponse { + string request_id = 1; + + optional uint32 status_code = 10; + optional string status_desc = 11; + + repeated WakuMessageKeyValue messages = 20; + + optional bytes pagination_cursor = 51; +} \ No newline at end of file diff --git a/waku/v2/protocol/store/pb/validation.go b/waku/v2/protocol/store/pb/validation.go index 740b58086..40bdfade6 100644 --- a/waku/v2/protocol/store/pb/validation.go +++ b/waku/v2/protocol/store/pb/validation.go @@ -4,44 +4,70 @@ import ( "errors" ) -// MaxContentFilters is the maximum number of allowed content filters in a query -const MaxContentFilters = 10 +// MaxContentTopics is the maximum number of allowed contenttopics in a query +const MaxContentTopics = 10 var ( - errMissingRequestID = errors.New("missing RequestId field") - errMissingQuery = errors.New("missing Query field") - errRequestIDMismatch = errors.New("requestID in response does not match request") - errMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed") - errEmptyContentTopics = errors.New("one or more content topics specified is empty") + errMissingRequestID = errors.New("missing RequestId field") + errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic") + errRequestIDMismatch = errors.New("requestID in response does not match request") + errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed") + errEmptyContentTopic = errors.New("one or more content topics specified is empty") + errMissingPubsubTopic = errors.New("missing PubsubTopic field") + errMissingContentTopics = errors.New("missing ContentTopics field") + errMissingStatusCode = errors.New("missing StatusCode field") + errInvalidTimeRange = errors.New("invalid time range") + errInvalidMessageHash = errors.New("invalid message hash") ) -func (x *HistoryQuery) Validate() error { - if len(x.ContentFilters) > MaxContentFilters { - return errMaxContentFilters +func (x *StoreQueryRequest) Validate() error { + if x.RequestId == "" { + return errMissingRequestID } - for _, m := range x.ContentFilters { - if m.ContentTopic == "" { - return errEmptyContentTopics + if len(x.MessageHashes) != 0 { + if len(x.ContentTopics) != 0 || x.GetPubsubTopic() != "" { + return errMessageHashOtherFields + } + + for _, x := range x.MessageHashes { + if len(x) != 32 { + return errInvalidMessageHash + } + } + } else { + if x.GetPubsubTopic() == "" { + return errMissingPubsubTopic + } + + if len(x.ContentTopics) == 0 { + return errMissingContentTopics + } else if len(x.ContentTopics) > MaxContentTopics { + return errMaxContentTopics + } else { + for _, m := range x.ContentTopics { + if m == "" { + return errEmptyContentTopic + } + } } - } + if x.GetTimeStart() > 0 && x.GetTimeEnd() > 0 && x.GetTimeStart() > x.GetTimeEnd() { + return errInvalidTimeRange + } + } return nil } -func (x *HistoryRPC) ValidateQuery() error { - if 
x.RequestId == "" { - return errMissingRequestID +func (x *StoreQueryResponse) Validate(requestID string) error { + if x.RequestId != "" && x.RequestId != requestID { + return errRequestIDMismatch } - if x.Query == nil { - return errMissingQuery + if x.StatusCode == nil { + return errMissingStatusCode } - return x.Query.Validate() -} - -func (x *HistoryResponse) Validate() error { for _, m := range x.Messages { if err := m.Validate(); err != nil { return err @@ -51,17 +77,13 @@ func (x *HistoryResponse) Validate() error { return nil } -func (x *HistoryRPC) ValidateResponse(requestID string) error { - if x.RequestId == "" { - return errMissingRequestID - } - - if x.RequestId != requestID { - return errRequestIDMismatch +func (x *WakuMessageKeyValue) Validate() error { + if len(x.MessageHash) != 32 { + return errInvalidMessageHash } - if x.Response != nil { - return x.Response.Validate() + if x.Message != nil { + return x.Message.Validate() } return nil diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go new file mode 100644 index 000000000..be67671ec --- /dev/null +++ b/waku/v2/protocol/store/result.go @@ -0,0 +1,68 @@ +package store + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +// Result represents a valid response from a store node +type Result struct { + started bool + messages []*pb.WakuMessageKeyValue + store *WakuStore + storeRequest *pb.StoreQueryRequest + cursor []byte + peerID peer.ID +} + +func (r *Result) Cursor() []byte { + return r.cursor +} + +func (r *Result) IsComplete() bool { + return r.cursor == nil +} + +func (r *Result) PeerID() peer.ID { + return r.peerID +} + +func (r *Result) Query() *pb.StoreQueryRequest { + return r.storeRequest +} + +func (r *Result) PubsubTopic() string { + return r.storeRequest.GetPubsubTopic() +} + +func (r *Result) Next(ctx context.Context) (bool, error) { + if !r.started { + r.started = true + return len(r.messages) != 0, nil + } + + if r.IsComplete() { + r.cursor = nil + r.messages = nil + return false, nil + } + + newResult, err := r.store.next(ctx, r) + if err != nil { + return false, err + } + + r.cursor = newResult.cursor + r.messages = newResult.messages + + return !r.IsComplete(), nil +} + +func (r *Result) Messages() []*pb.WakuMessageKeyValue { + if !r.started { + return nil + } + return r.messages +}
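
The new `storev3.proto` defines two mutually exclusive query modes, and the rewritten `StoreQueryRequest.Validate` enforces the split: a content-filtered query must carry a pubsub topic plus at least one (and at most ten) non-empty content topics with a consistent optional time range, while a hash-lookup query carries only 32-byte message hashes. As a rough illustration that is not part of the diff, the sketch below builds a content-filtered request with the generated `pb` types and runs it through the new validation; the request ID, topics, and timestamps are placeholder values.

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"

	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

func main() {
	// Content-filtered mode: PubsubTopic and ContentTopics are required,
	// MessageHashes must stay empty (the two modes are mutually exclusive).
	req := &pb.StoreQueryRequest{
		RequestId:     "example-request-id",              // placeholder
		IncludeData:   true,                               // ask for full message content, not just keys
		PubsubTopic:   proto.String("/waku/2/default-waku/proto"),
		ContentTopics: []string{"/example/1/chat/proto"},
		// Waku timestamps are Unix time in nanoseconds; start must not exceed end.
		TimeStart:         proto.Int64(1700000000000000000),
		TimeEnd:           proto.Int64(1700000600000000000),
		PaginationForward: true,
		PaginationLimit:   proto.Uint64(20),
	}

	if err := req.Validate(); err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Println("request is valid")
}
```

A hash-lookup request would instead set only `MessageHashes` (each exactly 32 bytes) and leave the topic filters empty, which is exactly what the new `errMessageHashOtherFields` / `errInvalidMessageHash` checks guard.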
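
The new `Result` type in `waku/v2/protocol/store/result.go` wraps cursor-based pagination: `Next` advances to the following page, `Messages` exposes the page fetched by the most recent `Next` call (and returns nil before the first call or once the result is exhausted), and `IsComplete` reports whether the cursor has been consumed. A minimal consumption sketch follows, assuming a `*store.Result` has already been obtained from some store query entry point that is not shown in this excerpt; the helper name and package are illustrative only.

```go
package example

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

// drainResult is a hypothetical helper that walks every page of a store v3
// query result and collects all returned key/value entries.
func drainResult(ctx context.Context, r *store.Result) ([]*pb.WakuMessageKeyValue, error) {
	var all []*pb.WakuMessageKeyValue
	for {
		hasMore, err := r.Next(ctx)
		if err != nil {
			return nil, err
		}
		// Messages() returns the page loaded by the preceding Next() call.
		all = append(all, r.Messages()...)
		if !hasMore {
			break
		}
	}
	return all, nil
}
```

The loop appends after every `Next` call rather than breaking first, so a final page that is delivered together with `hasMore == false` is still collected; callers must always advance the iterator before reading, since `Messages` deliberately returns nil until the first `Next`.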