diff --git a/cmd/transcriber/call/live_captions.go b/cmd/transcriber/call/live_captions.go index 9222203..7d3607d 100644 --- a/cmd/transcriber/call/live_captions.go +++ b/cmd/transcriber/call/live_captions.go @@ -117,146 +117,143 @@ func (t *Transcriber) processLiveCaptionsForTrack(ctx trackContext, pktPayloadsC // - Send the transcription to the plugin to be redistributed to clients. // - finish and wait for next `tick` - for { - select { - case <-ticker.C: - // empty the waiting pktPayloadsCh - window, err = readTrackPktPayloads(window) - if err != nil { - // exit on close - return - } - - // track how long we were waiting until consuming the next batch of audio data, as a measure - // of the pressure on the transcription process - newAudioLenMs := (len(window) - prevWindowLen) / trackOutAudioSamplesPerMs + for range ticker.C { + // empty the waiting pktPayloadsCh + window, err = readTrackPktPayloads(window) + if err != nil { + // exit on close + return + } - // If we don't have enough samples, ignore the window. - if len(window) < vadWindowSizeInSamples { - continue - } + // track how long we were waiting until consuming the next batch of audio data, as a measure + // of the pressure on the transcription process + newAudioLenMs := (len(window) - prevWindowLen) / trackOutAudioSamplesPerMs - // If there hasn't been any new pcm added, don't re-transcribe. - if len(window) == prevWindowLen { - // And clear the window if we haven't had new data (window is stale, don't re-transcribe) - if time.Since(prevAudioAt) > removeWindowAfterSilence { - window = window[:0] - prevWindowLen = 0 - prevTranscribedPos = 0 - } - continue - } + // If we don't have enough samples, ignore the window. + if len(window) < vadWindowSizeInSamples { + continue + } - // Pressure valve: - // If the transcriber machine is (even briefly) overloaded, you can get into a kind of death spiral - // where too much audio has been buffered in toBeTranscribed, and there's no way the transcriber - // can finish it all in time, and it will never be able to recover. This happens especially when - // number of calls * threads per call > numCPUs. We need to be able to relieve the pressure. - if len(window) >= windowPressureLimitSamples { + // If there hasn't been any new pcm added, don't re-transcribe. + if len(window) == prevWindowLen { + // And clear the window if we haven't had new data (window is stale, don't re-transcribe) + if time.Since(prevAudioAt) > removeWindowAfterSilence { window = window[:0] prevWindowLen = 0 prevTranscribedPos = 0 - if err := t.client.SendWs(wsEvMetric, public.MetricMsg{ - SessionID: ctx.sessionID, - MetricName: public.MetricLiveCaptionsWindowDropped, - }, false); err != nil { - slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricLiveCaptionsWindowDropped", - slog.String("err", err.Error()), - slog.String("trackID", ctx.trackID)) - } - continue } + continue + } - prevAudioAt = time.Now() - prevWindowLen = len(window) - - vadSegments, err := sd.Detect(window) - if err != nil { - slog.Error("processLiveCaptionsForTrack: vad failed", slog.String("err", err.Error())) - continue - } - if err := sd.Reset(); err != nil { - slog.Error("failed to reset speech detector", + // Pressure valve: + // If the transcriber machine is (even briefly) overloaded, you can get into a kind of death spiral + // where too much audio has been buffered in toBeTranscribed, and there's no way the transcriber + // can finish it all in time, and it will never be able to recover. This happens especially when + // number of calls * threads per call > numCPUs. We need to be able to relieve the pressure. + if len(window) >= windowPressureLimitSamples { + window = window[:0] + prevWindowLen = 0 + prevTranscribedPos = 0 + if err := t.client.SendWs(wsEvMetric, public.MetricMsg{ + SessionID: ctx.sessionID, + MetricName: public.MetricLiveCaptionsWindowDropped, + }, false); err != nil { + slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricLiveCaptionsWindowDropped", slog.String("err", err.Error()), slog.String("trackID", ctx.trackID)) } + continue + } - if len(vadSegments) == 0 { - continue - } + prevAudioAt = time.Now() + prevWindowLen = len(window) - // Prepare the vad segments and the audio for transcription. - segments := convertToSegmentSamples(vadSegments, len(window)) - segments = removeShortSpeeches(segments) - cleaned := cleanAudio(window, segments) + vadSegments, err := sd.Detect(window) + if err != nil { + slog.Error("processLiveCaptionsForTrack: vad failed", slog.String("err", err.Error())) + continue + } + if err := sd.Reset(); err != nil { + slog.Error("failed to reset speech detector", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) + } - // Before sending off data to be transcribed, check if new data is silence. - // If it is silence, don't send it off. - newDataIsSilence, windowFinished := checkSilence(segments, prevTranscribedPos) - if windowFinished { - window = window[:0] - prevTranscribedPos = 0 - prevWindowLen = 0 - continue - } - if newDataIsSilence { - continue - } + if len(vadSegments) == 0 { + continue + } + + // Prepare the vad segments and the audio for transcription. + segments := convertToSegmentSamples(vadSegments, len(window)) + segments = removeShortSpeeches(segments) + cleaned := cleanAudio(window, segments) + + // Before sending off data to be transcribed, check if new data is silence. + // If it is silence, don't send it off. + newDataIsSilence, windowFinished := checkSilence(segments, prevTranscribedPos) + if windowFinished { + window = window[:0] + prevTranscribedPos = 0 + prevWindowLen = 0 + continue + } + if newDataIsSilence { + continue + } - // Track our new position and send off data for transcription. - prevTranscribedPos = len(cleaned) - transcribedCh := make(chan string) - pkg := captionPackage{ - pcm: cleaned, - retCh: transcribedCh, + // Track our new position and send off data for transcription. + prevTranscribedPos = len(cleaned) + transcribedCh := make(chan string) + pkg := captionPackage{ + pcm: cleaned, + retCh: transcribedCh, + } + select { + case t.transcriberQueueCh <- pkg: + break + default: + if err := t.client.SendWs(wsEvMetric, public.MetricMsg{ + SessionID: ctx.sessionID, + MetricName: public.MetricLiveCaptionsTranscriberBufFull, + }, false); err != nil { + slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricTranscriberBufFull", + slog.String("err", err.Error()), + slog.String("trackID", ctx.trackID)) } + close(transcribedCh) + } + + // While audio is being transcribed, we need to cut down the window if it's > maxWindowSize. + window, prevTranscribedPos = cutWindowToSize(ctx.trackID, window, segments, prevTranscribedPos) + prevWindowLen = len(window) + + // Use a for loop and a select so that we can drop ticks waiting for the transcriber. + waitForTranscription: + for { select { - case t.transcriberQueueCh <- pkg: - break - default: - if err := t.client.SendWs(wsEvMetric, public.MetricMsg{ - SessionID: ctx.sessionID, - MetricName: public.MetricLiveCaptionsTranscriberBufFull, + case <-ticker.C: + slog.Debug("processLiveCaptionsForTrack: dropped a tick waiting for the transcriber", + slog.String("trackID", ctx.trackID)) + case text := <-transcribedCh: + if len(text) == 0 { + // Either transcribedCh was closed above (captionQueueCh full), or audio transcription failed. + // Note: this appears to happen when the transcriber fails to decode a block of audio. + // Usually the probability returned for the language is very low, which makes sense. + slog.Debug("processLiveCaptionsForTrack: received empty text, ignoring.") + break waitForTranscription + } + if err := t.client.SendWs(wsEvCaption, public.CaptionMsg{ + SessionID: ctx.sessionID, + UserID: ctx.user.Id, + Text: text, + NewAudioLenMs: float64(newAudioLenMs), }, false); err != nil { - slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricTranscriberBufFull", + slog.Error("processLiveCaptionsForTrack: error sending ws captions", slog.String("err", err.Error()), slog.String("trackID", ctx.trackID)) } - close(transcribedCh) - } - - // While audio is being transcribed, we need to cut down the window if it's > maxWindowSize. - window, prevTranscribedPos = cutWindowToSize(ctx.trackID, window, segments, prevTranscribedPos) - prevWindowLen = len(window) - - // Use a for loop and a select so that we can drop ticks waiting for the transcriber. - waitForTranscription: - for { - select { - case <-ticker.C: - slog.Debug("processLiveCaptionsForTrack: dropped a tick waiting for the transcriber", - slog.String("trackID", ctx.trackID)) - case text := <-transcribedCh: - if len(text) == 0 { - // Either transcribedCh was closed above (captionQueueCh full), or audio transcription failed. - // Note: this appears to happen when the transcriber fails to decode a block of audio. - // Usually the probability returned for the language is very low, which makes sense. - slog.Debug("processLiveCaptionsForTrack: received empty text, ignoring.") - break waitForTranscription - } - if err := t.client.SendWs(wsEvCaption, public.CaptionMsg{ - SessionID: ctx.sessionID, - UserID: ctx.user.Id, - Text: text, - NewAudioLenMs: float64(newAudioLenMs), - }, false); err != nil { - slog.Error("processLiveCaptionsForTrack: error sending ws captions", - slog.String("err", err.Error()), - slog.String("trackID", ctx.trackID)) - } - break waitForTranscription - } + break waitForTranscription } } }