diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 157f33e..a67f953 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: - go-version: [1.20.x] + go-version: [1.21.x] runs-on: ubuntu-latest @@ -38,7 +38,7 @@ jobs: strategy: matrix: - go-version: [1.20.x] + go-version: [1.21.x] runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index 75289f0..57f4293 100644 --- a/Makefile +++ b/Makefile @@ -54,8 +54,8 @@ DOCKER_REGISTRY_REPO ?= mattermost/${APP_NAME}-daily DOCKER_USER ?= user DOCKER_PASSWORD ?= password ## Docker Images -DOCKER_IMAGE_GO += "golang:${GO_VERSION}@sha256:dd9ad81920b63c7f9f18823d888d5fdcc7e7516086fd16654d07bc437f0e2427" -DOCKER_IMAGE_GOLINT += "golangci/golangci-lint:v1.52.2@sha256:5fa6a92ab28ca3421c88d2b6cd794c9759d05a999aceca73053d014aad41b9d3" +DOCKER_IMAGE_GO += "golang:${GO_VERSION}@sha256:27b021393d0e0dfffc6cd6cca5e7836ac59f5ac98724c5d6b3b0a82199d275c5" +DOCKER_IMAGE_GOLINT += "golangci/golangci-lint:v1.54.2@sha256:abe731fe6bb335a30eab303a41dd5c2b630bb174372a4da08e3d42eab5324127" DOCKER_IMAGE_DOCKERLINT += "hadolint/hadolint:v2.9.2@sha256:d355bd7df747a0f124f3b5e7b21e9dafd0cb19732a276f901f0fdee243ec1f3b" DOCKER_IMAGE_COSIGN += "bitnami/cosign:1.8.0@sha256:8c2c61c546258fffff18b47bb82a65af6142007306b737129a7bd5429d53629a" DOCKER_IMAGE_GH_CLI += "registry.internal.mattermost.com/images/build-ci:3.16.0@sha256:f6a229a9ababef3c483f237805ee4c3dbfb63f5de4fbbf58f4c4b6ed8fcd34b6" diff --git a/cmd/transcriber/call/tracks.go b/cmd/transcriber/call/tracks.go index 66720ea..a44d2ae 100644 --- a/cmd/transcriber/call/tracks.go +++ b/cmd/transcriber/call/tracks.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" "io" - "log" + "log/slog" "os" "path/filepath" "time" @@ -59,11 +59,11 @@ func (t *Transcriber) handleTrack(ctx any) error { return fmt.Errorf("failed to parse track ID: %w", err) } if trackType != client.TrackTypeVoice { - log.Printf("ignoring non voice track") + slog.Debug("ignoring non voice track", slog.String("trackID", trackID)) return nil } if mt := track.Codec().MimeType; mt != webrtc.MimeTypeOpus { - log.Printf("ignoring unsupported mimetype %s for track %s", mt, trackID) + slog.Warn("ignoring unsupported mimetype for track", slog.String("mimeType", mt), slog.String("trackID", trackID)) return nil } @@ -89,21 +89,24 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri filename: filepath.Join(getDataDir(), fmt.Sprintf("%s_%s.ogg", user.Id, sessionID)), } - log.Printf("processing voice track of %s", user.Username) - log.Printf("start reading loop for track %s", ctx.trackID) + slog.Debug("processing voice track", + slog.String("username", user.Username), + slog.String("sessionID", sessionID), + slog.String("trackID", ctx.trackID)) + slog.Debug("start reading loop for track", slog.String("trackID", ctx.trackID)) defer func() { - log.Printf("exiting reading loop for track %s", ctx.trackID) + slog.Debug("exiting reading loop for track", slog.String("trackID", ctx.trackID)) select { case t.trackCtxs <- ctx: default: - log.Printf("failed to enqueue track context: %+v", ctx) + slog.Error("failed to enqueue track context", slog.Any("ctx", ctx)) } t.liveTracksWg.Done() }() oggWriter, err := ogg.NewWriter(ctx.filename, trackInAudioRate, trackAudioChannels) if err != nil { - log.Printf("failed to created ogg writer: %s", err) + slog.Error("failed to created ogg writer", slog.String("err", err.Error()), slog.String("trackID", ctx.trackID)) return } defer oggWriter.Close() @@ -114,7 +117,9 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri pkt, _, readErr := track.ReadRTP() if readErr != nil { if !errors.Is(readErr, io.EOF) { - log.Printf("failed to read RTP packet for track %s", ctx.trackID) + slog.Error("failed to read RTP packet for track", + slog.String("err", readErr.Error()), + slog.String("trackID", ctx.trackID)) } return } @@ -127,7 +132,9 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri var gap uint64 if ctx.startTS == 0 { ctx.startTS = time.Since(*t.startTime.Load()).Milliseconds() - log.Printf("start offset for track is %v", time.Duration(ctx.startTS)*time.Millisecond) + slog.Debug("start offset for track", + slog.Duration("offset", time.Duration(ctx.startTS)*time.Millisecond), + slog.String("trackID", ctx.trackID)) } else if receiveGap := time.Since(prevArrivalTime); receiveGap > audioGapThreshold { // If the last received audio packet was more than a audioGapThreshold // ago we may need to fix the RTP timestamp as some clients (e.g. Firefox) will @@ -137,9 +144,6 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri // potentially achieve more accurate synchronization. rtpGap := time.Duration((pkt.Timestamp-prevRTPTimestamp)/trackInAudioSamplesPerMs) * time.Millisecond - log.Printf("Arrival timestamp gap is %v", receiveGap) - log.Printf("RTP timestamp gap is %v", rtpGap) - if (rtpGap - receiveGap).Abs() > audioGapThreshold { // If the difference between the timestamps reported in RTP packets and // the measured time since the last received packet is greater than @@ -148,7 +152,7 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri // that we can easily keep track of separate voice sequences (e.g. caused by // muting/unmuting). gap = uint64((receiveGap.Milliseconds() / trackAudioFrameSizeMs) * trackInFrameSize) - log.Printf("fixing audio timestamp by %d", gap) + slog.Debug("fixing audio timestamp", slog.Uint64("gap", gap), slog.String("trackID", ctx.trackID)) } } @@ -156,29 +160,31 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri prevRTPTimestamp = pkt.Timestamp if err := oggWriter.WriteRTP(pkt, gap); err != nil { - log.Printf("failed to write RTP packet: %s", err) + slog.Error("failed to write RTP packet", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) } } } // handleClose will kick off post-processing of saved voice tracks. func (t *Transcriber) handleClose() error { - log.Printf("handleClose") + slog.Debug("handleClose") t.liveTracksWg.Wait() close(t.trackCtxs) - log.Printf("live tracks processing done, starting post processing") + slog.Debug("live tracks processing done, starting post processing") start := time.Now() var samplesDur time.Duration var tr transcribe.Transcription for ctx := range t.trackCtxs { - log.Printf("post processing track %s", ctx.trackID) + slog.Debug("post processing track", slog.String("trackID", ctx.trackID)) trackTr, dur, err := t.transcribeTrack(ctx) if err != nil { - log.Printf("failed to transcribe track %q: %s", ctx.trackID, err) + slog.Error("failed to transcribe track", slog.String("trackID", ctx.trackID), slog.String("err", err.Error())) continue } @@ -194,8 +200,8 @@ func (t *Transcriber) handleClose() error { } dur := time.Since(start) - log.Printf("transcription process completed for all tracks: transcribed %v of audio in %v, %0.2fx", - samplesDur, dur, samplesDur.Seconds()/dur.Seconds()) + slog.Debug(fmt.Sprintf("transcription process completed for all tracks: transcribed %v of audio in %v, %0.2fx", + samplesDur, dur, samplesDur.Seconds()/dur.Seconds())) f, err := os.OpenFile(filepath.Join(getDataDir(), fmt.Sprintf("%s-%s.vtt", t.cfg.CallID, time.Now().UTC().Format("2006-01-02-15_04_05"))), os.O_RDWR|os.O_CREATE, 0600) @@ -212,7 +218,7 @@ func (t *Transcriber) handleClose() error { return fmt.Errorf("failed to publish transcription: %w", err) } - log.Printf("transcription published successfully") + slog.Debug("transcription published successfully") return nil } @@ -246,11 +252,13 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) { } defer func() { if err := opusDec.Destroy(); err != nil { - log.Printf("failed to destroy decoder: %s", err) + slog.Error("failed to destroy decoder", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) } }() - log.Printf("decoding track %s", ctx.trackID) + slog.Debug("decoding track", slog.String("trackID", ctx.trackID)) pcmBuf := make([]float32, trackOutFrameSize) // TODO: consider pre-calculating track duration to minimize memory waste. @@ -263,7 +271,9 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) { if errors.Is(err, io.EOF) { break } - log.Printf("failed to parse off page: %s", err) + slog.Error("failed to parse ogg page", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) continue } @@ -273,7 +283,7 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) { } if hdr.GranulePosition > prevGP+uint64(trackInFrameSize) { - log.Printf("%v gap in audio samples", time.Duration((hdr.GranulePosition-prevGP)/trackInAudioSamplesPerMs)*time.Millisecond) + slog.Debug("gap in audio samples", slog.Duration("gap", time.Duration((hdr.GranulePosition-prevGP)/trackInAudioSamplesPerMs)*time.Millisecond)) samples = append(samples, trackTimedSamples{ startTS: int64(hdr.GranulePosition) / trackInAudioSamplesPerMs, }) @@ -282,7 +292,9 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) { n, err := opusDec.Decode(data, pcmBuf) if err != nil { - log.Printf("failed to decode audio data: %s", err) + slog.Error("failed to decode audio data", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) } samples[len(samples)-1].pcm = append(samples[len(samples)-1].pcm, pcmBuf[:n]...) @@ -312,7 +324,9 @@ func (t *Transcriber) transcribeTrack(ctx trackContext) (transcribe.TrackTranscr for _, ts := range samples { segments, err := transcriber.Transcribe(ts.pcm) if err != nil { - log.Printf("failed to transcribe audio samples: %s", err) + slog.Error("failed to transcribe audio samples", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) continue } diff --git a/cmd/transcriber/call/transcriber.go b/cmd/transcriber/call/transcriber.go index fd85903..1662cad 100644 --- a/cmd/transcriber/call/transcriber.go +++ b/cmd/transcriber/call/transcriber.go @@ -3,7 +3,7 @@ package call import ( "context" "fmt" - "log" + "log/slog" "sync" "sync/atomic" "time" @@ -65,7 +65,7 @@ func (t *Transcriber) Start(ctx context.Context) error { var connectOnce sync.Once connectedCh := make(chan struct{}) t.client.On(client.RTCConnectEvent, func(_ any) error { - log.Printf("transcoder RTC client connected") + slog.Debug("transcoder RTC client connected") connectOnce.Do(func() { close(connectedCh) @@ -85,7 +85,7 @@ func (t *Transcriber) Start(ctx context.Context) error { startedCh := make(chan struct{}) t.client.On(client.WSCallRecordingState, func(ctx any) error { if recState, ok := ctx.(client.CallJobState); ok && recState.StartAt > 0 { - log.Printf("received call recording state: %+v", recState) + slog.Debug("received call recording state", slog.Any("recState", recState)) // Note: recState.StartAt is the absolute timestamp of when the recording // started to process but could come from a different instance and @@ -95,7 +95,7 @@ func (t *Transcriber) Start(ctx context.Context) error { startOnce.Do(func() { // We are coupling transcribing with recording. This means that we // won't start unless a recording is on going. - log.Printf("updating startAt to be in sync with recording, startAt=%d", recState.StartAt) + slog.Debug("updating startAt to be in sync with recording", slog.Int64("startAt", recState.StartAt)) t.startTime.Store(newTimeP(time.UnixMilli(recState.StartAt))) close(startedCh) }) @@ -127,7 +127,7 @@ func (t *Transcriber) Start(ctx context.Context) error { func (t *Transcriber) Stop(ctx context.Context) error { if err := t.client.Close(); err != nil { - log.Printf("failed to close client on stop: %s", err) + slog.Error("failed to close client on stop", slog.String("err", err.Error())) } select { diff --git a/cmd/transcriber/call/utils.go b/cmd/transcriber/call/utils.go index e1791e8..c3ed2ae 100644 --- a/cmd/transcriber/call/utils.go +++ b/cmd/transcriber/call/utils.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "log" + "log/slog" "net/http" "os" "path/filepath" @@ -78,7 +78,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { for i := 0; i < maxUploadRetryAttempts; i++ { if i > 0 { - log.Printf("publishTranscription failed, reattempting in %v", uploadRetryAttemptWaitTime) + slog.Error("publishTranscription failed", slog.Duration("reattempt_time", uploadRetryAttemptWaitTime)) time.Sleep(uploadRetryAttemptWaitTime) } @@ -86,14 +86,14 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { defer cancelCtx() resp, err := t.apiClient.DoAPIRequestBytes(ctx, http.MethodPost, apiURL+"/uploads", payload, "") if err != nil { - log.Printf("failed to create upload (%d): %s", resp.StatusCode, err) + slog.Error("failed to create upload", slog.String("err", err.Error())) continue } defer resp.Body.Close() cancelCtx() if err := json.NewDecoder(resp.Body).Decode(&us); err != nil { - log.Printf("failed to decode response body: %s", err) + slog.Error("failed to decode response body", slog.String("err", err.Error())) continue } @@ -101,7 +101,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { defer cancelCtx() resp, err = t.apiClient.DoAPIRequestReader(ctx, http.MethodPost, apiURL+"/uploads/"+us.Id, f, nil) if err != nil { - log.Printf("failed to upload data (%d): %s", resp.StatusCode, err) + slog.Error("failed to upload data", slog.String("err", err.Error())) continue } defer resp.Body.Close() @@ -109,7 +109,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { var fi model.FileInfo if err := json.NewDecoder(resp.Body).Decode(&fi); err != nil { - log.Printf("failed to decode response body: %s", err) + slog.Error("failed to decode response body", slog.String("err", err.Error())) continue } @@ -119,7 +119,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { "thread_id": t.cfg.ThreadID, }) if err != nil { - log.Printf("failed to encode payload: %s", err) + slog.Error("failed to encode payload", slog.String("err", err.Error())) continue } @@ -128,7 +128,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) { defer cancelCtx() resp, err = t.apiClient.DoAPIRequestBytes(ctx, http.MethodPost, url, payload, "") if err != nil { - log.Printf("failed to post transcription (%d): %s", resp.StatusCode, err) + slog.Error("failed to post transcription", slog.String("err", err.Error())) continue } defer resp.Body.Close() diff --git a/cmd/transcriber/main.go b/cmd/transcriber/main.go index 868ec79..77e8520 100644 --- a/cmd/transcriber/main.go +++ b/cmd/transcriber/main.go @@ -3,9 +3,11 @@ package main import ( "context" "fmt" - "log" + "log/slog" "os" "os/signal" + "path/filepath" + "runtime" "syscall" "time" @@ -17,38 +19,65 @@ const ( startTimeout = 30 * time.Second ) +func slogReplaceAttr(_ []string, a slog.Attr) slog.Attr { + if a.Key == slog.SourceKey { + source := a.Value.Any().(*slog.Source) + if source.File == "" { + // Log from a dependency (e.g. rtcd client). + if pc, file, line, ok := runtime.Caller(7); ok { + if f := runtime.FuncForPC(pc); f != nil { + source.File = filepath.Base(filepath.Dir(file)) + "/" + filepath.Base(file) + source.Line = line + } + } + } else { + source.File = filepath.Base(source.File) + } + } + return a +} + func main() { - log.SetFlags(log.LstdFlags | log.Lmicroseconds) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + ReplaceAttr: slogReplaceAttr, + })) + slog.SetDefault(logger) pid := os.Getpid() if err := os.WriteFile("/tmp/transcriber.pid", []byte(fmt.Sprintf("%d", pid)), 0666); err != nil { - log.Fatalf("failed to write pid file: %s", err) + slog.Error("failed to write pid file", slog.String("err", err.Error())) + os.Exit(1) } cfg, err := config.LoadFromEnv() if err != nil { - log.Fatalf("failed to load config: %s", err) + slog.Error("failed to load config", slog.String("err", err.Error())) + os.Exit(1) } cfg.SetDefaults() transcriber, err := call.NewTranscriber(cfg) if err != nil { - log.Fatalf("failed to create call transcriber: %s", err) + slog.Error("failed to create call transcriber", slog.String("err", err.Error())) + os.Exit(1) } - log.Printf("starting transcriber") + slog.Info("starting transcriber") ctx, cancel := context.WithTimeout(context.Background(), startTimeout) defer cancel() if err := transcriber.Start(ctx); err != nil { if err := transcriber.ReportJobFailure(err.Error()); err != nil { - log.Printf("failed to report job failure: %s", err) + slog.Error("failed to report job failure", slog.String("err", err.Error())) } - log.Fatalf("failed to start transcriber: %s", err) + slog.Error("failed to start transcriber", slog.String("err", err.Error())) + os.Exit(1) } - log.Printf("transcriber has started") + slog.Info("transcriber has started") sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -56,14 +85,16 @@ func main() { select { case <-transcriber.Done(): if err := transcriber.Err(); err != nil { - log.Fatalf("transcriber failed: %s", err) + slog.Error("transcriber failed", slog.String("err", err.Error())) + os.Exit(1) } case <-sig: - log.Printf("received SIGTERM, stopping transcriber") + slog.Info("received SIGTERM, stopping transcriber") if err := transcriber.Stop(context.Background()); err != nil { - log.Fatalf("failed to stop transcriber: %s", err) + slog.Error("failed to stop transcriber", slog.String("err", err.Error())) + os.Exit(1) } } - log.Printf("transcriber has finished, exiting") + slog.Info("transcriber has finished, exiting") } diff --git a/go.mod b/go.mod index afa554f..d54e1db 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/mattermost/calls-transcriber -go 1.20 +go 1.21.3 require ( github.com/mattermost/mattermost-plugin-calls/server/public v0.0.0-20231009181626-5ffdff167863 diff --git a/go.sum b/go.sum index 14f40ff..066ca56 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -174,6 +175,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -388,6 +390,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=