Possible goroutine leaks in *Client.SendEvent() #47

Closed
greenhandatsjtu opened this issue Mar 22, 2021 · 2 comments

greenhandatsjtu commented Mar 22, 2021

Hi there, I recently found this SDK, and it's awesome! However, when I use *Client.SendEvent in the iotservice package, I run into a weird problem: 50 minutes after calling this function, the following log line appears:

2021/03/22 11:41:47 ERROR put token error: amqp: session closed

I found an issue explaining this: #38 (comment). As that comment explains, *Client.SendEvent ends up calling *Client.putTokenContinuously:

// putTokenContinuously in v0.7.0
func (c *Client) putTokenContinuously(ctx context.Context, conn *amqp.Client) error {
	const (
		tokenUpdateInterval = time.Hour

		// we need to update tokens before they expire to prevent disconnects
		// from azure, without interrupting the message flow
		tokenUpdateSpan = 10 * time.Minute
	)

	sess, err := conn.NewSession()
	if err != nil {
		return err
	}
	defer sess.Close(context.Background())

	if err := c.putToken(ctx, sess, tokenUpdateInterval); err != nil {
		return err
	}

	go func() {
		ticker := time.NewTimer(tokenUpdateInterval - tokenUpdateSpan)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := c.putToken(context.Background(), sess, tokenUpdateInterval); err != nil {
					c.logger.Errorf("put token error: %s", err)
					return
				}
				ticker.Reset(tokenUpdateInterval - tokenUpdateSpan)
				c.logger.Debugf("token updated")
			case <-c.done:
				return
			}
		}
	}()
	return nil
}

This function starts a new goroutine that calls putToken to refresh the session token every 50 minutes (tokenUpdateInterval - tokenUpdateSpan). However, defer sess.Close(context.Background()) closes the session as soon as putTokenContinuously returns, so the putToken call in the goroutine always fails and logs: put token error: amqp: session closed. I noticed that this problem has been fixed in a recent commit (not released yet), but I have another question:
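To make the ordering problem concrete, here is a minimal sketch of the same shape with the AMQP parts replaced by stand-ins. fakeSession, errSessionClosed, and startRefresher are hypothetical names used only for illustration, not part of the SDK, and a channel replaces the 50-minute timer so the effect is immediate.

```go
package main

import (
	"errors"
	"fmt"
)

// errSessionClosed is a hypothetical error standing in for "amqp: session closed".
var errSessionClosed = errors.New("session closed")

// fakeSession is a hypothetical stand-in for an AMQP session.
type fakeSession struct{ closed bool }

func (s *fakeSession) Close() { s.closed = true }

// putToken fails once the session has been closed.
func (s *fakeSession) putToken() error {
	if s.closed {
		return errSessionClosed
	}
	return nil
}

// startRefresher has the same shape as the v0.7.0 function: the deferred
// Close runs when startRefresher returns, while the goroutine only uses
// the session later, when trigger fires.
func startRefresher(trigger <-chan struct{}) <-chan error {
	sess := &fakeSession{}
	defer sess.Close()

	result := make(chan error, 1)
	go func() {
		<-trigger // stands in for the timer firing after 50 minutes
		result <- sess.putToken()
	}()
	return result
}

func main() {
	trigger := make(chan struct{})
	result := startRefresher(trigger) // session is already closed on return
	close(trigger)
	fmt.Println("refresh error:", <-result) // prints "refresh error: session closed"
}
```

The goroutine captures sess, but capturing a value does not keep it open: the deferred call belongs to the enclosing function, not to the goroutine.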

Each call to *Client.SendEvent() opens a new session and starts a goroutine that calls putToken every 50 minutes until the Client is closed. In other words, every event I send creates another goroutine that lives as long as the Client, even though all I need is to send one event. After that I no longer need the session, because the next call to *Client.SendEvent() opens a new one anyway. Since I have to call *Client.SendEvent() frequently, this can leak a large number of goroutines.
My proposed solution is that the *Client.newSession() called by *Client.SendEvent() should not call putTokenContinuously; it should just call putToken once and return the new session.
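A rough sketch of that idea, under heavy assumptions: the client and session types below are hypothetical stand-ins rather than the SDK's real types, and this is not necessarily how the fix on master works. Each send opens a session, puts the token once, sends, and closes the session, with no background goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// session is a hypothetical stand-in for an AMQP session.
type session struct {
	c          *client
	authorized bool
}

// client is a hypothetical stand-in for the SDK client. It is not safe
// for concurrent use; the counters exist only to make the effect visible.
type client struct {
	open int // sessions currently open
	sent int // events sent so far
}

// putToken authorizes the session once; there is no periodic refresh.
func (c *client) putToken(s *session) error {
	s.authorized = true
	return nil
}

// newSession opens a session and puts the token a single time,
// without starting any goroutine.
func (c *client) newSession() (*session, error) {
	s := &session{c: c}
	c.open++
	if err := c.putToken(s); err != nil {
		s.close()
		return nil, err
	}
	return s, nil
}

func (s *session) close() { s.c.open-- }

func (s *session) send(payload string) error {
	if !s.authorized {
		return errors.New("session not authorized")
	}
	s.c.sent++
	return nil
}

// sendEvent uses a short-lived session: closing it on return is correct
// here because nothing uses the session after the function returns.
func (c *client) sendEvent(payload string) error {
	s, err := c.newSession()
	if err != nil {
		return err
	}
	defer s.close()
	return s.send(payload)
}

func main() {
	c := &client{}
	for i := 0; i < 1000; i++ {
		if err := c.sendEvent("hello"); err != nil {
			fmt.Println("send failed:", err)
			return
		}
	}
	fmt.Println("sent:", c.sent, "open sessions:", c.open) // prints "sent: 1000 open sessions: 0"
}
```

This only works as long as a single send finishes well within the token lifetime. A long-lived session would still need some form of refresh.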

@amenzhinsky
Owner

Hi,

Can you try the latest master? It should fix the error.

@greenhandatsjtu
Author

That's it. Thank you!
