diff --git a/pkg/koyeb/logs.go b/pkg/koyeb/logs.go index 6f1f3e32..40b27cf2 100644 --- a/pkg/koyeb/logs.go +++ b/pkg/koyeb/logs.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/koyeb/koyeb-cli/pkg/koyeb/errors" "github.com/koyeb/koyeb-cli/pkg/koyeb/renderer" + log "github.com/sirupsen/logrus" ) type LogsAPIClient struct { @@ -49,8 +50,6 @@ type WatchLogsQuery struct { deploymentId string instanceId string since time.Time - conn *websocket.Conn - ticker *time.Ticker full bool // Whether to display full IDs } @@ -122,6 +121,35 @@ func (query *WatchLogsQuery) ParseTime(date string) time.Time { return parsed } +func (query *WatchLogsQuery) reconnect(isFirstconnection bool) (*WebsocketPingConnection, error) { + conn, _, err := websocket.DefaultDialer.Dial(query.client.url.String(), query.client.header) + if err != nil { + if isFirstconnection { + return nil, &errors.CLIError{ + What: "Error while fetching the logs", + Why: "unable to create the websocket connection", + Additional: []string{ + "It usually happens because the API URL in your configuration is incorrect", + }, + Orig: err, + Solution: "Fix the error and try again", + } + } + return nil, &errors.CLIError{ + What: "Error while fetching the logs", + Why: "we failed to reconnect to the websocket connection", + Additional: []string{ + "The websocket to the logs API was closed and we couldn't reconnect.", + "If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new", + }, + Orig: err, + Solution: "Try again in a few seconds", + } + } + ret := NewWebsocketPingConnection(conn) + return &ret, nil +} + func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) { queryParams := url.Values{} if query.logType != "" { @@ -141,79 +169,117 @@ func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) { } query.client.url.RawQuery = queryParams.Encode() - conn, _, err := websocket.DefaultDialer.Dial(query.client.url.String(), 
query.client.header) + conn, err := query.reconnect(true) if err != nil { - return nil, &errors.CLIError{ - What: "Error while fetching the logs", - Why: "unable to create the websocket connection", - Additional: []string{ - "It usually happens because the API URL in your configuration is incorrect", - }, - Orig: err, - Solution: "Fix the error and try again", - } + return nil, err } - query.conn = conn - // Read logs from the websocket connection logs := make(chan WatchLogsEntry) + go func() { + var lastLogReceived *LogLine + for { msg := LogLine{} - err := conn.ReadJSON(&msg) + err := conn.Conn.ReadJSON(&msg) + if err != nil { - logs <- WatchLogsEntry{Err: &errors.CLIError{ - What: "Error while fetching the logs", - Why: "unable to read the logs from the websocket connection", - Additional: []string{ - "Unfortunately, we couldn't read the logs from the websocket connection", - "If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new", - }, - Orig: err, - Solution: "Try again in a few seconds", - }} - } else { - logs <- WatchLogsEntry{ - Stream: msg.Result.Labels.Stream, - Msg: msg.Result.Msg, - Date: query.ParseTime(msg.Result.CreatedAt), - Labels: msg.Result.Labels, + // Stop sending ping messages to the websocket connection + conn.Stop() + + // Normal closure, close the channel and return + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + close(logs) + return + } + + // Abnormal closure, try to reconnect. 
Add a delay to avoid flooding the API with reconnections + delay := 10 * time.Second + log.Debugf("Error while fetching the logs: %v, reconnecting in %s...", err, delay) + time.Sleep(delay) + + // Update the querystring to set the ?start parameter to the + // date of the last log received, to avoid receiving the same + // logs again + if lastLogReceived != nil && lastLogReceived.Result.CreatedAt != "" { + queryParams.Del("start") + queryParams.Add("start", lastLogReceived.Result.CreatedAt) + query.client.url.RawQuery = queryParams.Encode() + } + + conn, err = query.reconnect(false) + if err == nil { + log.Debugf("Reconnection successful") + continue } + // Reconnection failed, return the error to the caller and close the channel + log.Debugf("Unable to reconnect") + logs <- WatchLogsEntry{Err: err} + close(logs) + return + } + + // Sometimes, for example when passing a future date in --since, the + // first log message is empty. + if msg.Result.CreatedAt == "" { + continue + } + + // If the last log received is the same as the current one, ignore + // it. This can happen when there is a connection error: we + // reconnect and set the ?start parameter to the last log received, + // which is then sent again. + if lastLogReceived != nil && msg == *lastLogReceived { + continue + } + + lastLogReceived = &msg + logs <- WatchLogsEntry{ + Stream: msg.Result.Labels.Stream, + Msg: msg.Result.Msg, + Date: query.ParseTime(msg.Result.CreatedAt), + Labels: msg.Result.Labels, } } }() - // Consume the logs channel, forward them to the caller. Also send a ping every 10 seconds to keep the connection alive. - ret := make(chan WatchLogsEntry) - query.ticker = time.NewTicker(10 * time.Second) + return logs, nil +} + +// WebsocketPingConnection is a wrapper around a websocket connection that sends +// a ping message every few seconds. The Stop() method should be called to stop +// sending ping messages. 
+type WebsocketPingConnection struct { + Conn *websocket.Conn + stopChan chan (struct{}) +} + +func NewWebsocketPingConnection(conn *websocket.Conn) WebsocketPingConnection { + ret := WebsocketPingConnection{Conn: conn, stopChan: make(chan struct{})} + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { select { - case line := <-logs: - ret <- line - if line.Err != nil { - close(ret) - return - } - case tick := <-query.ticker.C: + case tick := <-ticker.C: err := conn.WriteMessage(websocket.PingMessage, []byte(tick.String())) if err != nil { - ret <- WatchLogsEntry{Err: err} - close(ret) + log.Debugf("Unable to send a ping message to the websocket connection: %v", err) return } + case <-ret.stopChan: + close(ret.stopChan) + return } } }() - return ret, nil + return ret } -func (query *WatchLogsQuery) Close() { - if query.conn != nil { - query.conn.Close() - } - if query.ticker != nil { - query.ticker.Stop() - } +// Stop stops sending ping messages to the websocket connection. +func (conn *WebsocketPingConnection) Stop() { + conn.stopChan <- struct{}{} } // PrintAll prints all the logs returned by WatchLogsQuery.Execute(). It returns @@ -224,7 +290,6 @@ func (query *WatchLogsQuery) PrintAll() error { if err != nil { return err } - defer query.Close() for log := range logs { if log.Err != nil { return log.Err