-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Access] ws controller error handling #6798
base: master
Are you sure you want to change the base?
[Access] ws controller error handling #6798
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6798 +/- ##
==========================================
+ Coverage 41.01% 41.06% +0.04%
==========================================
Files 2104 2104
Lines 185030 185076 +46
==========================================
+ Hits 75892 75997 +105
+ Misses 102767 102712 -55
+ Partials 6371 6367 -4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
…6642-ws-controller-error-handling
One thing I am not sure of is error codes. I introduced them to categorize and be able to compare actual/expected errors in unit tests. I need some advice on how to do it better. @peterargue |
} | ||
|
||
func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) { | ||
dp, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.communicationChannel) | ||
// register new provider | ||
provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I wrong, or we talk about using a writeResponse
instead of the channel in data providers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, we thought that the use of a callback here would prevent a possible write to a closed channel but it turned out it would not. So, it doesn't matter what approach to use
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this works as implemented. please add comments around the channel's lifecycle about how you know it's safe. particularly covering how and why the producers are not responsible for closing the channel.
// drain the channel as some providers may still send data to it after this routine shutdowns | ||
// so, in order to not run into deadlock there should be at least 1 reader on the channel | ||
go func() { | ||
for range c.multiplexedStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rather than draining in a second loop here, you can just refactor the existing loop to drop messages after the context is closed (instead of returning)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have to drain the channel in any case, not only when ctx is canceled
@Guitarheroua can you fix these conflicts so we can merge in master? |
…lachyn/6642-ws-controller-error-handling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
@@ -8,6 +8,8 @@ type ListSubscriptionsMessageRequest struct { | |||
// ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. | |||
// It contains a list of active subscriptions for the current WebSocket connection. | |||
type ListSubscriptionsMessageResponse struct { | |||
BaseMessageResponse | |||
Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` | |||
ClientMessageID string `json:"message_id"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should match with the other message types
ClientMessageID string `json:"message_id"` | |
ClientMessageID string `json:"message_id,omitempty"` |
Code int `json:"code"` | ||
Message string `json:"message"` | ||
Action string `json:"action,omitempty"` | ||
SubscriptionID string `json:"subscription_id,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given SubscriptionID
is also included in BaseMessageResponse
, is it also needed here?
// and avoid timeouts. | ||
// | ||
// Expected errors during normal operation: | ||
// - context.Canceled if the client disconnected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
context.Canceled
is never returned
err := c.conn.WriteControl(websocket.PingMessage, time.Now().Add(WriteWait)) | ||
if err != nil { | ||
if errors.Is(err, websocket.ErrCloseSent) { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this an error condition?
if !ok { | ||
return fmt.Errorf("communication channel closed, no error occurred") | ||
return fmt.Errorf("multiplexed stream closed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this an error condition?
} | ||
|
||
func (c *Controller) parseAndValidateMessage(message json.RawMessage) (models.BaseMessageRequest, interface{}, error) { | ||
func (c *Controller) parseAndValidateMessage(ctx context.Context, message json.RawMessage) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does more than parseAndValidateMessage
. how about renaming to handleMessage
?
// | ||
// No errors are expected during normal operation. All errors are considered benign. | ||
// Expected errors during normal operation: | ||
// - context.Canceled if the client disconnected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't look like this method returns context.Canceled
In this PR I identified and fixed all the places where we have to communicate with a client by sending an either OK or error response. I implemented new data flows (unsubscribe, list_subscription) and added tests for each. What's more, I unified and merged both code and tests with Ulyana's keepalive routine PR (mentioned below). Besides, I added a client ID to messages to see how the controller would look like and to avoid refactoring in the future.
As there's lots of work done, I'm pretty sure some scenarios might have bugs. So, it should be well reviewed. (maybe some cases were not covered in tests)
What's missing/to be done:
We need to document a WebSocket controller, its routines, error handling, and shutdown process. Maybe even a scenario of every action.
Because of the asynchronous nature of the controller, the tests become quite hard to understand. We need to write good docs for it. I'd also think of creating better abstractions (maybe wrapping mocks?) and refactoring them to reduce complexity (think of using https://pkg.go.dev/github.com/stretchr/testify/mock#Call.NotBefore)
Firstly, error code names should be better. Also, I don't know if we should use them. Maybe sentinel errors are a better approach.
Closes #6642 #6637 #6724
This PR depends on #6757 #6762