Skip to content

Commit

Permalink
fix: handle partial errors durin subscribe and return failed content-…
Browse files Browse the repository at this point in the history
…topic details
  • Loading branch information
chaitanyaprem committed Sep 19, 2023
1 parent f053b8f commit 6457022
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
14 changes: 10 additions & 4 deletions library/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -884,15 +884,21 @@ Creates a subscription to a filter full node matching a content filter..

A status code. Refer to the [`Status codes`](#status-codes) section for possible values.

If the function is executed succesfully, `onOkCb` will receive the subscription details.
If the function is executed succesfully, `onOkCb` will receive the following subscription details along with any partial errors.

For example:

```json
{
"peerID": "....",
"pubsubTopic": "...",
"contentTopics": [...]
"subscriptions" : [
{
"ID": "<subscriptionID>",
"peerID": "....",
"pubsubTopic": "...",
"contentTopics": [...]
}
],
"error" : "subscriptions failed for contentTopics:<topicA>,.." // Empty if all subscriptions are succesful
}
```

Expand Down
2 changes: 1 addition & 1 deletion library/c/api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import "github.com/waku-org/go-waku/library"
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription
// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures
//
//export waku_filter_subscribe
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int {
Expand Down
13 changes: 10 additions & 3 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
}, nil
}

type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitEmpty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
cf, err := toContentFilter(filterJSON)
Expand Down Expand Up @@ -61,7 +66,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}

subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil {
if err != nil && subscriptions == nil {
return "", err
}

Expand All @@ -72,8 +77,10 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}
}(subscriptionDetails)
}

return marshalJSON(subscriptions)
var subResult subscribeResult
subResult.Subscriptions = subscriptions
subResult.Error = err.Error()
return marshalJSON(subResult)
}

// FilterPing is used to determine if a peer has an active subscription
Expand Down
15 changes: 12 additions & 3 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"net/http"
"strings"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -256,6 +257,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
// Subscribe setups a subscription to receive messages that match a specific content filter
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
Expand Down Expand Up @@ -291,6 +293,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
if err != nil {
return nil, err
}
failedContentTopics := []string{}
subscriptions := make([]*SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
Expand All @@ -299,12 +302,18 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
return nil, err
wf.log.Error("Failed to subscribe for conentTopics ",
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter.Topic, cFilter.ContentTopics))
}
return subscriptions, nil

if len(failedContentTopics) > 0 {
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return subscriptions, nil
}
}

// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
Expand Down

0 comments on commit 6457022

Please sign in to comment.