Skip to content

Commit

Permalink
fix useless for select
Browse files Browse the repository at this point in the history
  • Loading branch information
cpoile committed Mar 22, 2024
1 parent e57bae6 commit 1c37d35
Showing 1 changed file with 115 additions and 118 deletions.
233 changes: 115 additions & 118 deletions cmd/transcriber/call/live_captions.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,146 +117,143 @@ func (t *Transcriber) processLiveCaptionsForTrack(ctx trackContext, pktPayloadsC
// - Send the transcription to the plugin to be redistributed to clients.
// - finish and wait for next `tick`

for {
select {
case <-ticker.C:
// empty the waiting pktPayloadsCh
window, err = readTrackPktPayloads(window)
if err != nil {
// exit on close
return
}

// track how long we were waiting until consuming the next batch of audio data, as a measure
// of the pressure on the transcription process
newAudioLenMs := (len(window) - prevWindowLen) / trackOutAudioSamplesPerMs
for range ticker.C {
// empty the waiting pktPayloadsCh
window, err = readTrackPktPayloads(window)
if err != nil {
// exit on close
return
}

// If we don't have enough samples, ignore the window.
if len(window) < vadWindowSizeInSamples {
continue
}
// track how long we were waiting until consuming the next batch of audio data, as a measure
// of the pressure on the transcription process
newAudioLenMs := (len(window) - prevWindowLen) / trackOutAudioSamplesPerMs

// If there hasn't been any new pcm added, don't re-transcribe.
if len(window) == prevWindowLen {
// And clear the window if we haven't had new data (window is stale, don't re-transcribe)
if time.Since(prevAudioAt) > removeWindowAfterSilence {
window = window[:0]
prevWindowLen = 0
prevTranscribedPos = 0
}
continue
}
// If we don't have enough samples, ignore the window.
if len(window) < vadWindowSizeInSamples {
continue
}

// Pressure valve:
// If the transcriber machine is (even briefly) overloaded, you can get into a kind of death spiral
// where too much audio has been buffered in toBeTranscribed, and there's no way the transcriber
// can finish it all in time, and it will never be able to recover. This happens especially when
// number of calls * threads per call > numCPUs. We need to be able to relieve the pressure.
if len(window) >= windowPressureLimitSamples {
// If there hasn't been any new pcm added, don't re-transcribe.
if len(window) == prevWindowLen {
// And clear the window if we haven't had new data (window is stale, don't re-transcribe)
if time.Since(prevAudioAt) > removeWindowAfterSilence {
window = window[:0]
prevWindowLen = 0
prevTranscribedPos = 0
if err := t.client.SendWs(wsEvMetric, public.MetricMsg{
SessionID: ctx.sessionID,
MetricName: public.MetricLiveCaptionsWindowDropped,
}, false); err != nil {
slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricLiveCaptionsWindowDropped",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
continue
}
continue
}

prevAudioAt = time.Now()
prevWindowLen = len(window)

vadSegments, err := sd.Detect(window)
if err != nil {
slog.Error("processLiveCaptionsForTrack: vad failed", slog.String("err", err.Error()))
continue
}
if err := sd.Reset(); err != nil {
slog.Error("failed to reset speech detector",
// Pressure valve:
// If the transcriber machine is (even briefly) overloaded, you can get into a kind of death spiral
// where too much audio has been buffered in toBeTranscribed, and there's no way the transcriber
// can finish it all in time, and it will never be able to recover. This happens especially when
// number of calls * threads per call > numCPUs. We need to be able to relieve the pressure.
if len(window) >= windowPressureLimitSamples {
window = window[:0]
prevWindowLen = 0
prevTranscribedPos = 0
if err := t.client.SendWs(wsEvMetric, public.MetricMsg{
SessionID: ctx.sessionID,
MetricName: public.MetricLiveCaptionsWindowDropped,
}, false); err != nil {
slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricLiveCaptionsWindowDropped",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
continue
}

if len(vadSegments) == 0 {
continue
}
prevAudioAt = time.Now()
prevWindowLen = len(window)

// Prepare the vad segments and the audio for transcription.
segments := convertToSegmentSamples(vadSegments, len(window))
segments = removeShortSpeeches(segments)
cleaned := cleanAudio(window, segments)
vadSegments, err := sd.Detect(window)
if err != nil {
slog.Error("processLiveCaptionsForTrack: vad failed", slog.String("err", err.Error()))
continue
}
if err := sd.Reset(); err != nil {
slog.Error("failed to reset speech detector",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}

// Before sending off data to be transcribed, check if new data is silence.
// If it is silence, don't send it off.
newDataIsSilence, windowFinished := checkSilence(segments, prevTranscribedPos)
if windowFinished {
window = window[:0]
prevTranscribedPos = 0
prevWindowLen = 0
continue
}
if newDataIsSilence {
continue
}
if len(vadSegments) == 0 {
continue
}

// Prepare the vad segments and the audio for transcription.
segments := convertToSegmentSamples(vadSegments, len(window))
segments = removeShortSpeeches(segments)
cleaned := cleanAudio(window, segments)

// Before sending off data to be transcribed, check if new data is silence.
// If it is silence, don't send it off.
newDataIsSilence, windowFinished := checkSilence(segments, prevTranscribedPos)
if windowFinished {
window = window[:0]
prevTranscribedPos = 0
prevWindowLen = 0
continue
}
if newDataIsSilence {
continue
}

// Track our new position and send off data for transcription.
prevTranscribedPos = len(cleaned)
transcribedCh := make(chan string)
pkg := captionPackage{
pcm: cleaned,
retCh: transcribedCh,
// Track our new position and send off data for transcription.
prevTranscribedPos = len(cleaned)
transcribedCh := make(chan string)
pkg := captionPackage{
pcm: cleaned,
retCh: transcribedCh,
}
select {
case t.transcriberQueueCh <- pkg:
break
default:
if err := t.client.SendWs(wsEvMetric, public.MetricMsg{
SessionID: ctx.sessionID,
MetricName: public.MetricLiveCaptionsTranscriberBufFull,
}, false); err != nil {
slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricTranscriberBufFull",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
close(transcribedCh)
}

// While audio is being transcribed, we need to cut down the window if it's > maxWindowSize.
window, prevTranscribedPos = cutWindowToSize(ctx.trackID, window, segments, prevTranscribedPos)
prevWindowLen = len(window)

// Use a for loop and a select so that we can drop ticks waiting for the transcriber.
waitForTranscription:
for {
select {
case t.transcriberQueueCh <- pkg:
break
default:
if err := t.client.SendWs(wsEvMetric, public.MetricMsg{
SessionID: ctx.sessionID,
MetricName: public.MetricLiveCaptionsTranscriberBufFull,
case <-ticker.C:
slog.Debug("processLiveCaptionsForTrack: dropped a tick waiting for the transcriber",
slog.String("trackID", ctx.trackID))
case text := <-transcribedCh:
if len(text) == 0 {
// Either transcribedCh was closed above (captionQueueCh full), or audio transcription failed.
// Note: this appears to happen when the transcriber fails to decode a block of audio.
// Usually the probability returned for the language is very low, which makes sense.
slog.Debug("processLiveCaptionsForTrack: received empty text, ignoring.")
break waitForTranscription
}
if err := t.client.SendWs(wsEvCaption, public.CaptionMsg{
SessionID: ctx.sessionID,
UserID: ctx.user.Id,
Text: text,
NewAudioLenMs: float64(newAudioLenMs),
}, false); err != nil {
slog.Error("processLiveCaptionsForTrack: error sending wsEvMetric MetricTranscriberBufFull",
slog.Error("processLiveCaptionsForTrack: error sending ws captions",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}
close(transcribedCh)
}

// While audio is being transcribed, we need to cut down the window if it's > maxWindowSize.
window, prevTranscribedPos = cutWindowToSize(ctx.trackID, window, segments, prevTranscribedPos)
prevWindowLen = len(window)

// Use a for loop and a select so that we can drop ticks waiting for the transcriber.
waitForTranscription:
for {
select {
case <-ticker.C:
slog.Debug("processLiveCaptionsForTrack: dropped a tick waiting for the transcriber",
slog.String("trackID", ctx.trackID))
case text := <-transcribedCh:
if len(text) == 0 {
// Either transcribedCh was closed above (captionQueueCh full), or audio transcription failed.
// Note: this appears to happen when the transcriber fails to decode a block of audio.
// Usually the probability returned for the language is very low, which makes sense.
slog.Debug("processLiveCaptionsForTrack: received empty text, ignoring.")
break waitForTranscription
}
if err := t.client.SendWs(wsEvCaption, public.CaptionMsg{
SessionID: ctx.sessionID,
UserID: ctx.user.Id,
Text: text,
NewAudioLenMs: float64(newAudioLenMs),
}, false); err != nil {
slog.Error("processLiveCaptionsForTrack: error sending ws captions",
slog.String("err", err.Error()),
slog.String("trackID", ctx.trackID))
}

break waitForTranscription
}
break waitForTranscription
}
}
}
Expand Down

0 comments on commit 1c37d35

Please sign in to comment.