Skip to content

Commit

Permalink
[MM-54143] Add structured logging (#2)
Browse files Browse the repository at this point in the history
* Add structured logging

* Bump Golang in CI
  • Loading branch information
streamer45 authored Oct 12, 2023
1 parent ca6cd3d commit cb2b277
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 59 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:

strategy:
matrix:
go-version: [1.20.x]
go-version: [1.21.x]

runs-on: ubuntu-latest

Expand All @@ -38,7 +38,7 @@ jobs:

strategy:
matrix:
go-version: [1.20.x]
go-version: [1.21.x]

runs-on: ubuntu-latest

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ DOCKER_REGISTRY_REPO ?= mattermost/${APP_NAME}-daily
DOCKER_USER ?= user
DOCKER_PASSWORD ?= password
## Docker Images
DOCKER_IMAGE_GO += "golang:${GO_VERSION}@sha256:dd9ad81920b63c7f9f18823d888d5fdcc7e7516086fd16654d07bc437f0e2427"
DOCKER_IMAGE_GOLINT += "golangci/golangci-lint:v1.52.2@sha256:5fa6a92ab28ca3421c88d2b6cd794c9759d05a999aceca73053d014aad41b9d3"
DOCKER_IMAGE_GO += "golang:${GO_VERSION}@sha256:27b021393d0e0dfffc6cd6cca5e7836ac59f5ac98724c5d6b3b0a82199d275c5"
DOCKER_IMAGE_GOLINT += "golangci/golangci-lint:v1.54.2@sha256:abe731fe6bb335a30eab303a41dd5c2b630bb174372a4da08e3d42eab5324127"
DOCKER_IMAGE_DOCKERLINT += "hadolint/hadolint:v2.9.2@sha256:d355bd7df747a0f124f3b5e7b21e9dafd0cb19732a276f901f0fdee243ec1f3b"
DOCKER_IMAGE_COSIGN += "bitnami/cosign:1.8.0@sha256:8c2c61c546258fffff18b47bb82a65af6142007306b737129a7bd5429d53629a"
DOCKER_IMAGE_GH_CLI += "registry.internal.mattermost.com/images/build-ci:3.16.0@sha256:f6a229a9ababef3c483f237805ee4c3dbfb63f5de4fbbf58f4c4b6ed8fcd34b6"
Expand Down
70 changes: 42 additions & 28 deletions cmd/transcriber/call/tracks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -59,11 +59,11 @@ func (t *Transcriber) handleTrack(ctx any) error {
return fmt.Errorf("failed to parse track ID: %w", err)
}
if trackType != client.TrackTypeVoice {
log.Printf("ignoring non voice track")
slog.Debug("ignoring non voice track", slog.String("trackID", trackID))
return nil
}
if mt := track.Codec().MimeType; mt != webrtc.MimeTypeOpus {
log.Printf("ignoring unsupported mimetype %s for track %s", mt, trackID)
slog.Warn("ignoring unsupported mimetype for track", slog.String("mimeType", mt), slog.String("trackID", trackID))
return nil
}

Expand All @@ -89,21 +89,24 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri
filename: filepath.Join(getDataDir(), fmt.Sprintf("%s_%s.ogg", user.Id, sessionID)),
}

log.Printf("processing voice track of %s", user.Username)
log.Printf("start reading loop for track %s", ctx.trackID)
slog.Debug("processing voice track",
slog.String("username", user.Username),
slog.String("sessionID", sessionID),
slog.String("trackID", ctx.trackID))
slog.Debug("start reading loop for track", slog.String("trackID", ctx.trackID))
defer func() {
log.Printf("exiting reading loop for track %s", ctx.trackID)
slog.Debug("exiting reading loop for track", slog.String("trackID", ctx.trackID))
select {
case t.trackCtxs <- ctx:
default:
log.Printf("failed to enqueue track context: %+v", ctx)
slog.Error("failed to enqueue track context", slog.Any("ctx", ctx))
}
t.liveTracksWg.Done()
}()

oggWriter, err := ogg.NewWriter(ctx.filename, trackInAudioRate, trackAudioChannels)
if err != nil {
log.Printf("failed to created ogg writer: %s", err)
slog.Error("failed to created ogg writer", slog.String("err", err.Error()), slog.String("trackID", ctx.trackID))
return
}
defer oggWriter.Close()
Expand All @@ -114,7 +117,9 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri
pkt, _, readErr := track.ReadRTP()
if readErr != nil {
if !errors.Is(readErr, io.EOF) {
log.Printf("failed to read RTP packet for track %s", ctx.trackID)
slog.Error("failed to read RTP packet for track",
slog.String("err", readErr.Error()),
slog.String("trackID", ctx.trackID))
}
return
}
Expand All @@ -127,7 +132,9 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri
var gap uint64
if ctx.startTS == 0 {
ctx.startTS = time.Since(*t.startTime.Load()).Milliseconds()
log.Printf("start offset for track is %v", time.Duration(ctx.startTS)*time.Millisecond)
slog.Debug("start offset for track",
slog.Duration("offset", time.Duration(ctx.startTS)*time.Millisecond),
slog.String("trackID", ctx.trackID))
} else if receiveGap := time.Since(prevArrivalTime); receiveGap > audioGapThreshold {
// If the last received audio packet was more than a audioGapThreshold
// ago we may need to fix the RTP timestamp as some clients (e.g. Firefox) will
Expand All @@ -137,9 +144,6 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri
// potentially achieve more accurate synchronization.
rtpGap := time.Duration((pkt.Timestamp-prevRTPTimestamp)/trackInAudioSamplesPerMs) * time.Millisecond

log.Printf("Arrival timestamp gap is %v", receiveGap)
log.Printf("RTP timestamp gap is %v", rtpGap)

if (rtpGap - receiveGap).Abs() > audioGapThreshold {
// If the difference between the timestamps reported in RTP packets and
// the measured time since the last received packet is greater than
Expand All @@ -148,37 +152,39 @@ func (t *Transcriber) processLiveTrack(track *webrtc.TrackRemote, sessionID stri
// that we can easily keep track of separate voice sequences (e.g. caused by
// muting/unmuting).
gap = uint64((receiveGap.Milliseconds() / trackAudioFrameSizeMs) * trackInFrameSize)
log.Printf("fixing audio timestamp by %d", gap)
slog.Debug("fixing audio timestamp", slog.Uint64("gap", gap), slog.String("trackID", ctx.trackID))
}
}

prevArrivalTime = time.Now()
prevRTPTimestamp = pkt.Timestamp

if err := oggWriter.WriteRTP(pkt, gap); err != nil {
log.Printf("failed to write RTP packet: %s", err)
slog.Error("failed to write RTP packet",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
}
}

// handleClose will kick off post-processing of saved voice tracks.
func (t *Transcriber) handleClose() error {
log.Printf("handleClose")
slog.Debug("handleClose")

t.liveTracksWg.Wait()
close(t.trackCtxs)

log.Printf("live tracks processing done, starting post processing")
slog.Debug("live tracks processing done, starting post processing")
start := time.Now()

var samplesDur time.Duration
var tr transcribe.Transcription
for ctx := range t.trackCtxs {
log.Printf("post processing track %s", ctx.trackID)
slog.Debug("post processing track", slog.String("trackID", ctx.trackID))

trackTr, dur, err := t.transcribeTrack(ctx)
if err != nil {
log.Printf("failed to transcribe track %q: %s", ctx.trackID, err)
slog.Error("failed to transcribe track", slog.String("trackID", ctx.trackID), slog.String("err", err.Error()))
continue
}

Expand All @@ -194,8 +200,8 @@ func (t *Transcriber) handleClose() error {
}

dur := time.Since(start)
log.Printf("transcription process completed for all tracks: transcribed %v of audio in %v, %0.2fx",
samplesDur, dur, samplesDur.Seconds()/dur.Seconds())
slog.Debug(fmt.Sprintf("transcription process completed for all tracks: transcribed %v of audio in %v, %0.2fx",
samplesDur, dur, samplesDur.Seconds()/dur.Seconds()))

f, err := os.OpenFile(filepath.Join(getDataDir(), fmt.Sprintf("%s-%s.vtt",
t.cfg.CallID, time.Now().UTC().Format("2006-01-02-15_04_05"))), os.O_RDWR|os.O_CREATE, 0600)
Expand All @@ -212,7 +218,7 @@ func (t *Transcriber) handleClose() error {
return fmt.Errorf("failed to publish transcription: %w", err)
}

log.Printf("transcription published successfully")
slog.Debug("transcription published successfully")

return nil
}
Expand Down Expand Up @@ -246,11 +252,13 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) {
}
defer func() {
if err := opusDec.Destroy(); err != nil {
log.Printf("failed to destroy decoder: %s", err)
slog.Error("failed to destroy decoder",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
}()

log.Printf("decoding track %s", ctx.trackID)
slog.Debug("decoding track", slog.String("trackID", ctx.trackID))

pcmBuf := make([]float32, trackOutFrameSize)
// TODO: consider pre-calculating track duration to minimize memory waste.
Expand All @@ -263,7 +271,9 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) {
if errors.Is(err, io.EOF) {
break
}
log.Printf("failed to parse off page: %s", err)
slog.Error("failed to parse ogg page",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
continue
}

Expand All @@ -273,7 +283,7 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) {
}

if hdr.GranulePosition > prevGP+uint64(trackInFrameSize) {
log.Printf("%v gap in audio samples", time.Duration((hdr.GranulePosition-prevGP)/trackInAudioSamplesPerMs)*time.Millisecond)
slog.Debug("gap in audio samples", slog.Duration("gap", time.Duration((hdr.GranulePosition-prevGP)/trackInAudioSamplesPerMs)*time.Millisecond))
samples = append(samples, trackTimedSamples{
startTS: int64(hdr.GranulePosition) / trackInAudioSamplesPerMs,
})
Expand All @@ -282,7 +292,9 @@ func (ctx trackContext) decodeAudio() ([]trackTimedSamples, error) {

n, err := opusDec.Decode(data, pcmBuf)
if err != nil {
log.Printf("failed to decode audio data: %s", err)
slog.Error("failed to decode audio data",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}

samples[len(samples)-1].pcm = append(samples[len(samples)-1].pcm, pcmBuf[:n]...)
Expand Down Expand Up @@ -312,7 +324,9 @@ func (t *Transcriber) transcribeTrack(ctx trackContext) (transcribe.TrackTranscr
for _, ts := range samples {
segments, err := transcriber.Transcribe(ts.pcm)
if err != nil {
log.Printf("failed to transcribe audio samples: %s", err)
slog.Error("failed to transcribe audio samples",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
continue
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/transcriber/call/transcriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package call
import (
"context"
"fmt"
"log"
"log/slog"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -65,7 +65,7 @@ func (t *Transcriber) Start(ctx context.Context) error {
var connectOnce sync.Once
connectedCh := make(chan struct{})
t.client.On(client.RTCConnectEvent, func(_ any) error {
log.Printf("transcoder RTC client connected")
slog.Debug("transcoder RTC client connected")

connectOnce.Do(func() {
close(connectedCh)
Expand All @@ -85,7 +85,7 @@ func (t *Transcriber) Start(ctx context.Context) error {
startedCh := make(chan struct{})
t.client.On(client.WSCallRecordingState, func(ctx any) error {
if recState, ok := ctx.(client.CallJobState); ok && recState.StartAt > 0 {
log.Printf("received call recording state: %+v", recState)
slog.Debug("received call recording state", slog.Any("recState", recState))

// Note: recState.StartAt is the absolute timestamp of when the recording
// started to process but could come from a different instance and
Expand All @@ -95,7 +95,7 @@ func (t *Transcriber) Start(ctx context.Context) error {
startOnce.Do(func() {
// We are coupling transcribing with recording. This means that we
// won't start unless a recording is on going.
log.Printf("updating startAt to be in sync with recording, startAt=%d", recState.StartAt)
slog.Debug("updating startAt to be in sync with recording", slog.Int64("startAt", recState.StartAt))
t.startTime.Store(newTimeP(time.UnixMilli(recState.StartAt)))
close(startedCh)
})
Expand Down Expand Up @@ -127,7 +127,7 @@ func (t *Transcriber) Start(ctx context.Context) error {

func (t *Transcriber) Stop(ctx context.Context) error {
if err := t.client.Close(); err != nil {
log.Printf("failed to close client on stop: %s", err)
slog.Error("failed to close client on stop", slog.String("err", err.Error()))
}

select {
Expand Down
16 changes: 8 additions & 8 deletions cmd/transcriber/call/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -78,38 +78,38 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) {

for i := 0; i < maxUploadRetryAttempts; i++ {
if i > 0 {
log.Printf("publishTranscription failed, reattempting in %v", uploadRetryAttemptWaitTime)
slog.Error("publishTranscription failed", slog.Duration("reattempt_time", uploadRetryAttemptWaitTime))
time.Sleep(uploadRetryAttemptWaitTime)
}

ctx, cancelCtx := context.WithTimeout(context.Background(), httpRequestTimeout)
defer cancelCtx()
resp, err := t.apiClient.DoAPIRequestBytes(ctx, http.MethodPost, apiURL+"/uploads", payload, "")
if err != nil {
log.Printf("failed to create upload (%d): %s", resp.StatusCode, err)
slog.Error("failed to create upload", slog.String("err", err.Error()))
continue
}
defer resp.Body.Close()
cancelCtx()

if err := json.NewDecoder(resp.Body).Decode(&us); err != nil {
log.Printf("failed to decode response body: %s", err)
slog.Error("failed to decode response body", slog.String("err", err.Error()))
continue
}

ctx, cancelCtx = context.WithTimeout(context.Background(), httpUploadTimeout)
defer cancelCtx()
resp, err = t.apiClient.DoAPIRequestReader(ctx, http.MethodPost, apiURL+"/uploads/"+us.Id, f, nil)
if err != nil {
log.Printf("failed to upload data (%d): %s", resp.StatusCode, err)
slog.Error("failed to upload data", slog.String("err", err.Error()))
continue
}
defer resp.Body.Close()
cancelCtx()

var fi model.FileInfo
if err := json.NewDecoder(resp.Body).Decode(&fi); err != nil {
log.Printf("failed to decode response body: %s", err)
slog.Error("failed to decode response body", slog.String("err", err.Error()))
continue
}

Expand All @@ -119,7 +119,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) {
"thread_id": t.cfg.ThreadID,
})
if err != nil {
log.Printf("failed to encode payload: %s", err)
slog.Error("failed to encode payload", slog.String("err", err.Error()))
continue
}

Expand All @@ -128,7 +128,7 @@ func (t *Transcriber) publishTranscription(f *os.File) (err error) {
defer cancelCtx()
resp, err = t.apiClient.DoAPIRequestBytes(ctx, http.MethodPost, url, payload, "")
if err != nil {
log.Printf("failed to post transcription (%d): %s", resp.StatusCode, err)
slog.Error("failed to post transcription", slog.String("err", err.Error()))
continue
}
defer resp.Body.Close()
Expand Down
Loading

0 comments on commit cb2b277

Please sign in to comment.