refactor: use peerInfo instead of peerID #1269

Draft · wants to merge 1 commit into base: master
4 changes: 2 additions & 2 deletions cmd/waku/node.go
@@ -386,7 +386,7 @@ func Execute(options NodeOptions) error {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

- peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static,
+ peerID, err := wakuNode.AddPeer([]multiaddr.Multiaddr{*options.PeerExchange.Node}, wakupeerstore.Static,
pubSubTopicMapKeys, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
@@ -481,7 +481,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) error {
for _, addr := range addresses {
- _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...)
+ _, err := wakuNode.AddPeer([]multiaddr.Multiaddr{addr}, wakupeerstore.Static, pubSubTopics, protocols...)
if err != nil {
return fmt.Errorf("could not add static peer: %w", err)
}
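
Reviewer note: WakuNode.AddPeer now takes a slice of multiaddrs, so the two call sites above simply wrap their single address in a one-element slice; each static peer is still added with its own AddPeer call, and the returned peer.ID is unchanged. A minimal sketch of the new call shape, using only identifiers already present in this file (the address is assumed to carry the peer's /p2p/ component):

    // addr is a single multiaddr.Multiaddr for one static peer.
    peerID, err := wakuNode.AddPeer([]multiaddr.Multiaddr{addr}, wakupeerstore.Static, pubSubTopics, protocols...)
    if err != nil {
        return fmt.Errorf("could not add static peer: %w", err)
    }
    logger.Info("added static peer", zap.Stringer("peerID", peerID))
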
3 changes: 2 additions & 1 deletion cmd/waku/server/rest/admin.go
@@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
@@ -117,7 +118,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {
protos = append(protos, protocol.ID(proto))
}

- id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...)
+ id, err := a.node.AddPeer([]multiaddr.Multiaddr{addr}, peerstore.Static, topics, protos...)
if err != nil {
a.log.Error("failed to add peer", zap.Error(err))
writeErrOrResponse(w, err, nil)
2 changes: 1 addition & 1 deletion library/node.go
@@ -352,7 +352,7 @@ func AddPeer(instance *WakuInstance, address string, protocolID string) (string,
return "", err
}

- peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
+ peerID, err := instance.node.AddPeer([]multiaddr.Multiaddr{ma}, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
if err != nil {
return "", err
}
2 changes: 1 addition & 1 deletion waku/v2/api/common/storenode_requestor.go
@@ -8,5 +8,5 @@ import (
)

type StorenodeRequestor interface {
- Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
+ Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error)
}
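
Reviewer note: the requestor interface now carries a full peer.AddrInfo instead of a bare peer.ID, so an implementation can reach the storenode by address even when it is not already in the peerstore. One way a caller might build that AddrInfo from a full multiaddr is libp2p's peer.AddrInfoFromP2pAddr — a sketch, not code from this PR; storenodeAddr, requestor, and storeQueryRequest are assumed to exist in the caller:

    // storenodeAddr is a multiaddr.Multiaddr that ends in a /p2p/<peer-id> component.
    info, err := peer.AddrInfoFromP2pAddr(storenodeAddr) // splits the address into ID and Addrs
    if err != nil {
        return err
    }
    result, err := requestor.Query(ctx, *info, storeQueryRequest)
    if err != nil {
        return err
    }
    _ = result
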
34 changes: 34 additions & 0 deletions waku/v2/api/history/cycle.go
@@ -98,6 +98,12 @@ func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) {
func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error {
// Handle pinned storenodes
m.logger.Info("disconnecting storenode")

+ if m.storenodeConfigProvider == nil {
+ m.logger.Debug("storenodeConfigProvider not yet setup")
+ return nil
+ }

pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
if err != nil {
m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
@@ -252,6 +258,11 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}
}

+ if m.storenodeConfigProvider == nil {
+ m.logger.Debug("storenodeConfigProvider not yet setup")
+ return nil
+ }

pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
if err != nil {
m.logger.Error("Could not obtain the pinned storenode", zap.Error(err))
@@ -338,6 +349,29 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
return m.activeStorenode
}

+ func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo {
+ m.RLock()
+ defer m.RUnlock()
+
+ if m.storenodeConfigProvider == nil {
+ m.logger.Debug("storenodeConfigProvider not yet setup")
+ return peer.AddrInfo{}
+ }
+
+ storeNodes, err := m.storenodeConfigProvider.Storenodes()
+ if err != nil {
+ return peer.AddrInfo{}
+ }
+
+ for _, p := range storeNodes {
+ if p.ID == m.activeStorenode {
+ return p
+ }
+ }
+
+ return peer.AddrInfo{}
+ }

func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
return m.storenodeStatus(peerID) == connected
}
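
Reviewer note: GetActiveStorenodePeerInfo maps the active storenode's peer.ID back to the AddrInfo published by the storenodeConfigProvider, and returns an empty AddrInfo when the provider is missing, errors, or has no matching entry. Callers are expected to treat an empty ID as "no storenode available", which is how message_check.go further down consumes it; a brief usage sketch (m.storenodeCycle is the *StorenodeCycle held by the caller, as in MessageSentCheck):

    info := m.storenodeCycle.GetActiveStorenodePeerInfo()
    if info.ID == "" {
        // No active storenode, or the config provider is not set up yet.
        return
    }
    // info.Addrs can now be handed to address-based request options such as store.WithPeerAddr(info.Addrs...).
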
20 changes: 10 additions & 10 deletions waku/v2/api/history/history.go
@@ -37,7 +37,7 @@ type HistoryRetriever struct {

type HistoryProcessor interface {
OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
- OnRequestFailed(requestID []byte, peerID peer.ID, err error)
+ OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error)
}

func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
@@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo
func (hr *HistoryRetriever) Query(
ctx context.Context,
criteria store.FilterCriteria,
- storenodeID peer.ID,
+ storenode peer.AddrInfo,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
@@ -178,7 +178,7 @@ loop:
newCriteria.TimeStart = timeStart
newCriteria.TimeEnd = timeEnd

- cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
+ cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
queryCancel()

if err != nil {
@@ -241,7 +241,7 @@ loop:

func (hr *HistoryRetriever) createMessagesRequest(
ctx context.Context,
- peerID peer.ID,
+ peerInfo peer.AddrInfo,
criteria store.FilterCriteria,
cursor []byte,
limit uint64,
@@ -257,7 +257,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
})

go func() {
- storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
+ storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes)
resultCh <- struct {
storeCursor []byte
envelopesCount int
@@ -273,7 +273,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
}
} else {
go func() {
- _, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
+ _, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false)
if err != nil {
logger.Error("failed to request store messages", zap.Error(err))
}
@@ -283,9 +283,9 @@ func (hr *HistoryRetriever) createMessagesRequest(
return
}

- func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
+ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
requestID := protocol.GenerateRequestID()
- logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
+ logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID))

logger.Debug("store.query",
logging.Timep("startTime", criteria.TimeStart),
@@ -307,12 +307,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
}

queryStart := time.Now()
- result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
+ result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest)
queryDuration := time.Since(queryStart)
if err != nil {
logger.Error("error querying storenode", zap.Error(err))

- hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
+ hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err)

return nil, 0, err
}
8 changes: 4 additions & 4 deletions waku/v2/api/history/history_test.go
@@ -70,7 +70,7 @@ func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelop
return nil
}

- func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
+ func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) {
}

func newMockHistoryProcessor() *mockHistoryProcessor {
Expand All @@ -92,7 +92,7 @@ func getInitialResponseKey(contentTopics []string) string {
return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

- func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
+ func (t *mockStore) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
result := &mockResult{}
if len(storeQueryRequest.GetPaginationCursor()) == 0 {
initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics())
@@ -218,7 +218,7 @@ func TestSuccessBatchExecution(t *testing.T) {
ContentFilter: protocol.NewContentFilter("test", topics...),
}

- err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
+ err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
require.NoError(t, err)
}

@@ -246,6 +246,6 @@ func TestFailedBatchExecution(t *testing.T) {
ContentFilter: protocol.NewContentFilter("test", topics...),
}

- err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
+ err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
require.Error(t, err)
}
4 changes: 2 additions & 2 deletions waku/v2/api/missing/criteria_interest.go
@@ -10,7 +10,7 @@ import (
)

type criteriaInterest struct {
- peerID peer.ID
+ peerInfo peer.AddrInfo
contentFilter protocol.ContentFilter
lastChecked time.Time

@@ -19,7 +19,7 @@ type criteriaInterest struct {
}

func (c criteriaInterest) equals(other criteriaInterest) bool {
- if c.peerID != other.peerID {
+ if c.peerInfo.ID != other.peerInfo.ID {
return false
}

8 changes: 4 additions & 4 deletions waku/v2/api/missing/default_requestor.go
@@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct {
store *store.WakuStore
}

- func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
- return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
+ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
+ return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
}

- func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
- return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
+ func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
+ return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest)
}
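
Reviewer note: switching GetMessagesByHash from store.WithPeer(peerID) to store.WithPeerAddr(peerInfo.Addrs...) means the request is now addressed by multiaddr, so it relies on the supplied AddrInfo actually carrying addresses. A possible guard, shown only as a hedged sketch and not part of this diff, would fall back to the ID-based option when Addrs is empty:

    func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
        if len(peerInfo.Addrs) == 0 {
            // Fallback sketch: the peer may already be known to the peerstore by ID only.
            return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerInfo.ID), store.WithPaging(false, pageSize))
        }
        return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
    }
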
12 changes: 6 additions & 6 deletions waku/v2/api/missing/missing_messages.go
@@ -71,13 +71,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
}
}

- func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) {
+ func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) {
m.criteriaInterestMu.Lock()
defer m.criteriaInterestMu.Unlock()

ctx, cancel := context.WithCancel(m.ctx)
criteriaInterest := criteriaInterest{
- peerID: peerID,
+ peerInfo: peerInfo,
contentFilter: contentFilter,
lastChecked: m.timesource.Now().Add(-m.params.delay),
ctx: ctx,
@@ -190,7 +190,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
}

m.logger.Error("could not fetch history",
zap.Stringer("peerID", interest.peerID),
zap.Stringer("peerID", interest.peerInfo.ID),
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
zap.Strings("contentTopics", contentTopics))
continue
@@ -233,7 +233,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
contentTopics := interest.contentFilter.ContentTopics.ToList()

logger := m.logger.With(
zap.Stringer("peerID", interest.peerID),
zap.Stringer("peerID", interest.peerInfo.ID),
zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
logging.Epoch("from", interest.lastChecked),
@@ -252,7 +252,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,

return m.storenodeRequestor.Query(
ctx,
- interest.peerID,
+ interest.peerInfo,
storeQueryRequest,
)
}, logger, "retrieving history to check for missing messages")
@@ -335,7 +335,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
}

- return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
+ return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest)
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
4 changes: 2 additions & 2 deletions waku/v2/api/publish/default_verifier.go
@@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct {
store *store.WakuStore
}

- func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
+ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
var opts []store.RequestOption
opts = append(opts, store.WithRequestID(requestID))
- opts = append(opts, store.WithPeer(peerID))
+ opts = append(opts, store.WithPeerAddr(peerID.Addrs...))
opts = append(opts, store.WithPaging(false, pageSize))
opts = append(opts, store.IncludeData(false))

10 changes: 5 additions & 5 deletions waku/v2/api/publish/message_check.go
@@ -33,7 +33,7 @@ type ISentCheck interface {

type StorenodeMessageVerifier interface {
// MessagesExist returns a list of the messages it found from a list of message hashes
- MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
+ MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

// MessageSentCheck tracks the outgoing messages and check against store node
@@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() {
}

func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
- selectedPeer := m.storenodeCycle.GetActiveStorenode()
- if selectedPeer == "" {
+ selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo()
+ if selectedPeer.ID == "" {
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
return []common.Hash{}
}
@@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
}

m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes))

queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
defer cancel()
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
if err != nil {
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err))
return []common.Hash{}
}

2 changes: 1 addition & 1 deletion waku/v2/node/keepalive_test.go
@@ -79,7 +79,7 @@ func TestPeriodicKeepAlive(t *testing.T) {

node2MAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String())
require.NoError(t, err)
- _, err = wakuNode.AddPeer(node2MAddr, wps.Static, []string{"waku/rs/1/1"})
+ _, err = wakuNode.AddPeer([]multiaddr.Multiaddr{node2MAddr}, wps.Static, []string{"waku/rs/1/1"})
require.NoError(t, err)

time.Sleep(time.Second * 2)
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2.go
@@ -703,8 +703,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro

// AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
- func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
- pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
+ func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
+ pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...)
if err != nil {
return "", err
}
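
Reviewer note: with the slice-based signature a caller can register every known address of a peer in a single AddPeer call; the updated tests below do exactly that by passing ListenAddresses() instead of ListenAddresses()[0]. A minimal sketch (node construction elided; all addresses are assumed to belong to the same peer, since a single peer.ID is returned):

    // Register all of wakuNode1's listen addresses with wakuNode2 under one peer entry.
    peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses(), peerstore.Static,
        []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
    if err != nil {
        return err
    }
    _ = peerID
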
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2_test.go
@@ -268,7 +268,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode2.Stop()

- peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
+ peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)

subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
@@ -317,7 +317,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode3.Stop()

- _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
+ _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
require.NoError(t, err)
time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter