Skip to content

Commit

Permalink
logs: reconnect when connection is lost
Browse files Browse the repository at this point in the history
This is not fully working, as the logs will be repeated when the
connection is re-established.
  • Loading branch information
brmzkw committed Jul 30, 2024
1 parent a851316 commit 1fc655d
Showing 1 changed file with 93 additions and 52 deletions.
145 changes: 93 additions & 52 deletions pkg/koyeb/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gorilla/websocket"
"github.com/koyeb/koyeb-cli/pkg/koyeb/errors"
"github.com/koyeb/koyeb-cli/pkg/koyeb/renderer"
log "github.com/sirupsen/logrus"
)

type LogsAPIClient struct {
Expand Down Expand Up @@ -49,8 +50,6 @@ type WatchLogsQuery struct {
deploymentId string
instanceId string
since time.Time
conn *websocket.Conn
ticker *time.Ticker
full bool // Whether to display full IDs
}

Expand Down Expand Up @@ -122,6 +121,35 @@ func (query *WatchLogsQuery) ParseTime(date string) time.Time {
return parsed
}

// reconnect dials the logs websocket endpoint using the URL and headers stored
// on the query's client and wraps the resulting connection in a
// WebsocketPingConnection.
//
// isFirstconnection only affects the error returned when the dial fails: the
// initial connection reports a probable configuration problem, while later
// attempts report a failed reconnection.
func (query *WatchLogsQuery) reconnect(isFirstconnection bool) (*WebsocketPingConnection, error) {
	endpoint := query.client.url.String()
	ws, _, dialErr := websocket.DefaultDialer.Dial(endpoint, query.client.header)
	if dialErr == nil {
		pinger := NewWebsocketPingConnection(ws)
		return &pinger, nil
	}

	// Both failure modes share the headline and the original error; only the
	// explanation and the suggested remedy differ.
	failure := &errors.CLIError{
		What: "Error while fetching the logs",
		Orig: dialErr,
	}
	if isFirstconnection {
		failure.Why = "unable to create the websocket connection"
		failure.Additional = []string{
			"It usually happens because the API URL in your configuration is incorrect",
		}
		failure.Solution = "Fix the error and try again"
	} else {
		failure.Why = "we failed to reconnect to the websocket connection"
		failure.Additional = []string{
			"The websocket to the logs API was closed and we couldn't reconnect.",
			"If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new",
		}
		failure.Solution = "Try again in a few seconds"
	}
	return nil, failure
}

func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) {
queryParams := url.Values{}
if query.logType != "" {
Expand All @@ -141,79 +169,93 @@ func (query *WatchLogsQuery) Execute() (chan WatchLogsEntry, error) {
}
query.client.url.RawQuery = queryParams.Encode()

conn, _, err := websocket.DefaultDialer.Dial(query.client.url.String(), query.client.header)
conn, err := query.reconnect(true)
if err != nil {
return nil, &errors.CLIError{
What: "Error while fetching the logs",
Why: "unable to create the websocket connection",
Additional: []string{
"It usually happens because the API URL in your configuration is incorrect",
},
Orig: err,
Solution: "Fix the error and try again",
}
return nil, err
}
query.conn = conn

// Read logs from the websocket connection
logs := make(chan WatchLogsEntry)

go func() {
for {
msg := LogLine{}
err := conn.ReadJSON(&msg)
err := conn.Conn.ReadJSON(&msg)

if err != nil {
logs <- WatchLogsEntry{Err: &errors.CLIError{
What: "Error while fetching the logs",
Why: "unable to read the logs from the websocket connection",
Additional: []string{
"Unfortunately, we couldn't read the logs from the websocket connection",
"If the problem persists, please create an issue on https://github.com/koyeb/koyeb-cli/issues/new",
},
Orig: err,
Solution: "Try again in a few seconds",
}}
} else {
logs <- WatchLogsEntry{
Stream: msg.Result.Labels.Stream,
Msg: msg.Result.Msg,
Date: query.ParseTime(msg.Result.CreatedAt),
Labels: msg.Result.Labels,
// Stop sending ping messages to the websocket connection
conn.Stop()

// Normal closure, close the channel and return
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
close(logs)
return
}
// Abnormal closure, try to reconnect. Add a delay to avoid flooding the API with reconnections
delay := 10 * time.Second
log.Debugf("Error while fetching the logs: %v, reconnecting in %s...", err, delay)
time.Sleep(delay)

conn, err = query.reconnect(false)
if err == nil {
log.Debugf("Reconnection successful")
continue
}
// Reconnection failed, return the error to the caller and close the channel
log.Debugf("Unable to reconnect")
logs <- WatchLogsEntry{Err: err}
close(logs)
return
}
// Sometimes, for example when passing a future date in --since, the first log message is empty. Ignore it.
if msg.Result.CreatedAt == "" {
continue
}
logs <- WatchLogsEntry{
Stream: msg.Result.Labels.Stream,
Msg: msg.Result.Msg,
Date: query.ParseTime(msg.Result.CreatedAt),
Labels: msg.Result.Labels,
}
}
}()
// Consume the logs channel, forward them to the caller. Also send a ping every 10 seconds to keep the connection alive.
ret := make(chan WatchLogsEntry)
query.ticker = time.NewTicker(10 * time.Second)
return logs, nil
}

// WebsocketPingConnection pairs a websocket connection with a background
// goroutine that periodically sends ping messages on it. Call Stop() once the
// connection is no longer needed so the pinging goroutine can terminate.
type WebsocketPingConnection struct {
	// Conn is the wrapped websocket connection; callers read from it directly.
	Conn *websocket.Conn
	// stopChan carries the stop request from Stop() to the pinging goroutine.
	stopChan chan struct{}
}

func NewWebsocketPingConnection(conn *websocket.Conn) WebsocketPingConnection {
ret := WebsocketPingConnection{Conn: conn, stopChan: make(chan struct{})}

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case line := <-logs:
ret <- line
if line.Err != nil {
close(ret)
return
}
case tick := <-query.ticker.C:
case tick := <-ticker.C:
err := conn.WriteMessage(websocket.PingMessage, []byte(tick.String()))
if err != nil {
ret <- WatchLogsEntry{Err: err}
close(ret)
log.Debugf("Unable to send a ping message to the websocket connection: %v", err)
return
}
case <-ret.stopChan:
close(ret.stopChan)
return
}
}
}()
return ret, nil
return ret
}

func (query *WatchLogsQuery) Close() {
if query.conn != nil {
query.conn.Close()
}
if query.ticker != nil {
query.ticker.Stop()
}
// Stop stops sending ping messages to the websocket connection and closes the
// underlying connection.
//
// Stop never blocks. The pinging goroutine returns on its own when a ping
// write fails, so by the time Stop is called there may be nobody left to
// receive on stopChan; an unconditional send would then hang the caller
// forever. Stop is meant to be called once per connection, after which the
// connection must not be used anymore.
func (conn *WebsocketPingConnection) Stop() {
	// Close the socket first: this releases the connection and guarantees that
	// any in-flight or future ping write fails, so the pinging goroutine exits
	// even if it misses the signal below.
	if conn.Conn != nil {
		// The connection is being discarded; a close error is not actionable.
		_ = conn.Conn.Close()
	}

	// Wake the pinging goroutine if it is idle in its select. If it already
	// exited, or is busy writing a ping, skip the send instead of blocking.
	select {
	case conn.stopChan <- struct{}{}:
	default:
	}
}

// PrintAll prints all the logs returned by WatchLogsQuery.Execute(). It returns
Expand All @@ -224,7 +266,6 @@ func (query *WatchLogsQuery) PrintAll() error {
if err != nil {
return err
}
defer query.Close()
for log := range logs {
if log.Err != nil {
return log.Err
Expand Down

0 comments on commit 1fc655d

Please sign in to comment.