Skip to content

Commit

Permalink
Merge branch 'master' into fix/rest-api-errors
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Nov 24, 2023
2 parents 56ad0bd + b59a498 commit d88a83a
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,19 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
if !ok { // Joins topic if node hasn't joined yet
err := w.pubsub.RegisterTopicValidator(topic, w.topicValidator(topic))
if err != nil {
w.log.Error("failed to register topic validator", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}

newTopic, err := w.pubsub.Join(string(topic))
if err != nil {
w.log.Error("failed to join pubsubTopic", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}

err = newTopic.SetScoreParams(w.topicParams)
if err != nil {
w.log.Error("failed to set score params", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}

Expand All @@ -199,11 +202,13 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscriptionDetails, error) {
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()
w.log.Info("subscribing to underlying pubsubTopic", zap.String("pubsubTopic", topic))

result, ok := w.topics[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
w.log.Error("failed to upsert topic", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -333,6 +338,7 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
w.log.Error("failed to derive pubsubTopic", zap.Error(err), zap.String("contentTopic", contentTopic))
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
Expand Down Expand Up @@ -374,6 +380,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
for _, opt := range optList {
err := opt(params)
if err != nil {
w.log.Error("failed to apply option", zap.Error(err))
return nil, err
}
}
Expand All @@ -392,6 +399,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
_, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic)
if err != nil {
//TODO: Handle partial errors.
w.log.Error("failed to subscribe to pubsubTopic", zap.Error(err), zap.String("pubsubTopic", cFilter.PubsubTopic))
return nil, err
}
}
Expand Down Expand Up @@ -428,6 +436,8 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co

pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
w.log.Error("failed to derive pubsubTopic from contentFilter", zap.String("pubsubTopic", contentFilter.PubsubTopic),
zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
return err
}

Expand All @@ -439,6 +449,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
pubsubUnsubscribe := false
sub, ok := w.topics[pubSubTopic]
if !ok {
w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic))
return errors.New("not subscribed to topic")
}

Expand Down Expand Up @@ -478,7 +489,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error {

pubSubTopic := topicData.subscription.Topic()
w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic))
w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic))

topicData.subscription.Cancel()
topicData.topicEventHandler.Cancel()
Expand All @@ -487,11 +498,18 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio

err := topicData.topic.Close()
if err != nil {
w.log.Error("failed to close the pubsubTopic", zap.String("topic", pubSubTopic))
return err
}

w.RemoveTopicValidator(pubSubTopic)

err = w.pubsub.UnregisterTopicValidator(pubSubTopic)
if err != nil {
w.log.Error("failed to unregister topic validator", zap.String("topic", pubSubTopic))
return err
}

delete(w.topics, pubSubTopic)

return w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})
Expand Down

0 comments on commit d88a83a

Please sign in to comment.