Skip to content

Commit

Permalink
logs: reconnect when connection is lost
Browse files Browse the repository at this point in the history
  • Loading branch information
brmzkw committed Jul 30, 2024
1 parent a851316 commit a655478
Showing 1 changed file with 117 additions and 52 deletions.
169 changes: 117 additions & 52 deletions pkg/koyeb/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gorilla/websocket"
"github.com/koyeb/koyeb-cli/pkg/koyeb/errors"
"github.com/koyeb/koyeb-cli/pkg/koyeb/renderer"
log "github.com/sirupsen/logrus"
)

type LogsAPIClient struct {
Expand Down Expand Up @@ -49,8 +50,6 @@ type WatchLogsQuery struct {
deploymentId string
instanceId string
since time.Time
conn *websocket.Conn
ticker *time.Ticker
full bool // Whether to display full IDs
}

Expand Down Expand Up @@ -122,6 +121,35 @@ func (query *WatchLogsQuery) ParseTime(date string) time.Time {
return parsed
}

// reconnect dials the logs websocket endpoint using the URL and headers stored
// on the query's client and, on success, returns the connection wrapped in a
// WebsocketPingConnection.
//
// isFirstconnection only selects the wording of the error returned when the
// dial fails: true reports that the connection could not be created, false
// reports that an existing connection was lost and could not be re-established.
func (query *WatchLogsQuery) reconnect(isFirstconnection bool) (*WebsocketPingConnection, error) {
	endpoint := query.client.url.String()
	wsConn, _, dialErr := websocket.DefaultDialer.Dial(endpoint, query.client.header)
	if dialErr == nil {
		pinger := NewWebsocketPingConnection(wsConn)
		return &pinger, nil
	}

	// Start from the "reconnection failed" report and specialise it below when
	// this was the very first connection attempt.
	failure := &errors.CLIError{
		What: "Error while fetching the logs",
		Why:  "we failed to reconnect to the websocket connection",
		Additional: []string{
			"The websocket to the logs API was closed and we couldn't reconnect.",
			"If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new",
		},
		Orig:     dialErr,
		Solution: "Try again in a few seconds",
	}
	if isFirstconnection {
		failure.Why = "unable to create the websocket connection"
		failure.Additional = []string{
			"It usually happens because the API URL in your configuration is incorrect",
		}
		failure.Solution = "Fix the error and try again"
	}
	return nil, failure
}

func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) {
queryParams := url.Values{}
if query.logType != "" {
Expand All @@ -141,79 +169,117 @@ func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) {
}
query.client.url.RawQuery = queryParams.Encode()

conn, _, err := websocket.DefaultDialer.Dial(query.client.url.String(), query.client.header)
conn, err := query.reconnect(true)
if err != nil {
return nil, &errors.CLIError{
What: "Error while fetching the logs",
Why: "unable to create the websocket connection",
Additional: []string{
"It usually happens because the API URL in your configuration is incorrect",
},
Orig: err,
Solution: "Fix the error and try again",
}
return nil, err
}
query.conn = conn

// Read logs from the websocket connection
logs := make(chan WatchLogsEntry)

go func() {
var lastLogReceived *LogLine

for {
msg := LogLine{}
err := conn.ReadJSON(&msg)
err := conn.Conn.ReadJSON(&msg)

if err != nil {
logs <- WatchLogsEntry{Err: &errors.CLIError{
What: "Error while fetching the logs",
Why: "unable to read the logs from the websocket connection",
Additional: []string{
"Unfortunately, we couldn't read the logs from the websocket connection",
"If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new",
},
Orig: err,
Solution: "Try again in a few seconds",
}}
} else {
logs <- WatchLogsEntry{
Stream: msg.Result.Labels.Stream,
Msg: msg.Result.Msg,
Date: query.ParseTime(msg.Result.CreatedAt),
Labels: msg.Result.Labels,
// Stop sending ping messages to the websocket connection
conn.Stop()

// Normal closure, close the channel and return
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
close(logs)
return
}

// Abnormal closure, try to reconnect. Add a delay to avoid flooding the API with reconnections
delay := 10 * time.Second
log.Debugf("Error while fetching the logs: %v, reconnecting in %s...", err, delay)
time.Sleep(delay)

// Update the querystring to set the ?start parameter to the
// date of the last log received, to avoid receiving the same
// logs again
if lastLogReceived != nil && lastLogReceived.Result.CreatedAt != "" {
queryParams.Del("start")
queryParams.Add("start", lastLogReceived.Result.CreatedAt)
query.client.url.RawQuery = queryParams.Encode()
}

conn, err = query.reconnect(false)
if err == nil {
log.Debugf("Reconnection successful")
continue
}
// Reconnection failed, return the error to the caller and close the channel
log.Debugf("Unable to reconnect")
logs <- WatchLogsEntry{Err: err}
close(logs)
return
}

// Sometimes, for example when passing a future date in --since, the
// first log message is empty.
if msg.Result.CreatedAt == "" {
continue
}

// If the last log received is the same as the current one, ignore
// it. This can happen when there is a connection error: we
// reconnect and set the ?start parameter to the last log received,
// which is then sent again.
if lastLogReceived != nil && msg == *lastLogReceived {
continue
}

lastLogReceived = &msg
logs <- WatchLogsEntry{
Stream: msg.Result.Labels.Stream,
Msg: msg.Result.Msg,
Date: query.ParseTime(msg.Result.CreatedAt),
Labels: msg.Result.Labels,
}
}
}()
// Consume the logs channel, forward them to the caller. Also send a ping every 10 seconds to keep the connection alive.
ret := make(chan WatchLogsEntry)
query.ticker = time.NewTicker(10 * time.Second)
return logs, nil
}

// WebsocketPingConnection pairs a websocket connection with a background
// keep-alive: while it is active, a ping message is written to the connection
// every few seconds. Call Stop() to end the pings.
type WebsocketPingConnection struct {
	// Conn is the wrapped websocket connection.
	Conn *websocket.Conn
	// stopChan carries the signal that Stop() sends to the ping goroutine.
	stopChan chan struct{}
}

// NewWebsocketPingConnection wraps conn and starts a goroutine that writes a
// websocket ping message to it every 10 seconds.
//
// The goroutine exits when Stop() is called, or as soon as a ping cannot be
// written (that failure is only logged at debug level). The connection itself
// is not closed by this wrapper.
func NewWebsocketPingConnection(conn *websocket.Conn) WebsocketPingConnection {
	// The stop channel has a buffer of one so that a single Stop() call never
	// blocks, even when the ping goroutine has already exited after a failed
	// write and nobody is left to receive the signal.
	ret := WebsocketPingConnection{Conn: conn, stopChan: make(chan struct{}, 1)}

	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case tick := <-ticker.C:
				if err := conn.WriteMessage(websocket.PingMessage, []byte(tick.String())); err != nil {
					log.Debugf("Unable to send a ping message to the websocket connection: %v", err)
					return
				}
			case <-ret.stopChan:
				// The channel is deliberately left open: closing it here would
				// make any later Stop() panic with a send on a closed channel.
				return
			}
		}
	}()
	return ret
}

func (query *WatchLogsQuery) Close() {
if query.conn != nil {
query.conn.Close()
}
if query.ticker != nil {
query.ticker.Stop()
}
// Stop tells the background goroutine to stop sending ping messages to the
// websocket connection, by sending a signal on the stop channel.
func (c *WebsocketPingConnection) Stop() {
	c.stopChan <- struct{}{}
}

// PrintAll prints all the logs returned by WatchLogsQuery.Execute(). It returns
Expand All @@ -224,7 +290,6 @@ func (query *WatchLogsQuery) PrintAll() error {
if err != nil {
return err
}
defer query.Close()
for log := range logs {
if log.Err != nil {
return log.Err
Expand Down

0 comments on commit a655478

Please sign in to comment.