Skip to content

Commit

Permalink
Remodel threading to use a global thread pool so it never block on th…
Browse files Browse the repository at this point in the history
…e main thread. Bump emscripten to 3.1.65
  • Loading branch information
msqr1 committed Aug 27, 2024
1 parent 53262fe commit c4ff62f
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 81 deletions.
2 changes: 1 addition & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
## Shared interface
| Function/Object | Description |
|---|---|
| ```delete()``` | Delete this object (call C++ destructor), see [why](https://emscripten.org/docs/getting_started/FAQ.html#what-does-exiting-the-runtime-mean-why-don-t-atexit-s-run) this is neccessary. |
| ```delete()``` | Delete this object (call C++ destructor), see [why](https://emscripten.org/docs/getting_started/FAQ.html#what-does-exiting-the-runtime-mean-why-don-t-atexit-s-run) this is neccessary. For recognizers, make sure they finished recognizing before deleting them |

## ```Module``` object
| Function/Object | Description |
Expand Down
2 changes: 1 addition & 1 deletion Examples/Vosklet.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Examples/fromMic.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].0/examples/Vosklet.min.js" async defer></script>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].1/examples/Vosklet.min.js" async defer></script>
<script>
async function start() {
// Make sure sample rate matches that in the training data
Expand Down
2 changes: 1 addition & 1 deletion Examples/fromWav.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].0/examples/Vosklet.min.js" async defer></script>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].1/examples/Vosklet.min.js" async defer></script>
<script>
async function start() {
// Make sure sample rate matches that in the training data
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].0/examples/Vosklet.min.js" async defer></script>
<script src="https://cdn.jsdelivr.net/gh/msqr1/[email protected].1/examples/Vosklet.min.js" async defer></script>
<script>
async function start() {
// Make sure sample rate matches that in the training data
Expand Down
2 changes: 1 addition & 1 deletion Vosklet.js

Large diffs are not rendered by default.

17 changes: 6 additions & 11 deletions src/CommonModel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

CommonModel::CommonModel(int index, bool normalMdl, std::string storepath, std::string id, int tarStart, int tarSize) :
normalMdl{normalMdl}, index{index},
t{&CommonModel::extractAndLoad, this, reinterpret_cast<unsigned char*>(tarStart), tarSize},
storepath{std::move(storepath)},
id{std::move(id)} {}
id{std::move(id)}
{
globalPool.exec([this, tarStart, tarSize]{
extractAndLoad(reinterpret_cast<unsigned char*>(tarStart), tarSize);
});
}
void CommonModel::extractAndLoad(unsigned char* tar, int tarSize) {
int res{untar(tar, tarSize, storepath)};
free(tar);
Expand All @@ -30,20 +34,11 @@ void CommonModel::extractAndLoad(unsigned char* tar, int tarSize) {
if(normalMdl ? std::get<VoskModel*>(mdl) != nullptr : std::get<VoskSpkModel*>(mdl) != nullptr) fireEv(index, "0");
else fireEv(index, "Unable to load model for recognition");
fs::remove_all(storepath);

// Wait for potential recognizer usage
thrdUsed.wait(false, std::memory_order_relaxed);
if(recognizerMain) recognizerMain();
}
int CommonModel::findWord(std::string word) {
return vosk_model_find_word(std::get<VoskModel*>(mdl), word.c_str());
}
CommonModel::~CommonModel() {
if(normalMdl) vosk_model_free(std::get<VoskModel*>(mdl));
else vosk_spk_model_free(std::get<VoskSpkModel*>(mdl));
if(t.joinable()) {
thrdUsed.store(true, std::memory_order_relaxed);
thrdUsed.notify_one();
t.join();
}
}
5 changes: 0 additions & 5 deletions src/CommonModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@
#include "Util.h"

#include <vosk_api.h>
extern void free(void*);
struct CommonModel {
bool normalMdl;
std::atomic_bool thrdUsed{};
int index;
std::thread t;
std::function<void()> recognizerMain;
std::string storepath;
std::string id;
std::variant<VoskModel*, VoskSpkModel*> mdl;

void extractAndLoad(unsigned char* tarStart, int tarSize);
int findWord(std::string word);
CommonModel(int index, bool normalMdl, std::string storepath, std::string id, int tarStart, int tarSize);
Expand Down
63 changes: 24 additions & 39 deletions src/Recognizer.cc
Original file line number Diff line number Diff line change
@@ -1,65 +1,50 @@
#include "Recognizer.h"

#include "emscripten/atomic.h"
Recognizer::Recognizer(int index, float sampleRate, CommonModel* model) :
index{index},
t{std::move(model->t)},
rec{vosk_recognizer_new(std::get<VoskModel*>(model->mdl), sampleRate)} {
rec{vosk_recognizer_new(std::get<VoskModel*>(model->mdl), sampleRate)}
{
finishConstruction(model);
}
Recognizer::Recognizer(int index, float sampleRate, CommonModel* model, CommonModel* spkModel) :
index{index},
t{std::move(model->t)},
rec{vosk_recognizer_new_spk(std::get<VoskModel*>(model->mdl), sampleRate, std::get<VoskSpkModel*>(spkModel->mdl))} {
finishConstruction(model, spkModel);
}
Recognizer::Recognizer(int index, float sampleRate, CommonModel* model, const std::string& grm, int) :
index{index},
t{std::move(model->t)},
rec{vosk_recognizer_new_grm(std::get<VoskModel*>(model->mdl), sampleRate, grm.c_str())} {
finishConstruction(model);
}
Recognizer::~Recognizer() {
vosk_recognizer_free(rec);
if(t.joinable()) {
done = true;
blocker.store(true, std::memory_order_relaxed);
blocker.notify_one();
t.join();
done = true;
emscripten_atomic_store_u32(&haveData, true);
emscripten_atomic_notify(&haveData, 1);
while(!dataQ.empty()) {
free(dataQ.front().data);
dataQ.pop();
}
vosk_recognizer_free(rec);
}
void Recognizer::finishConstruction(CommonModel* model, CommonModel* spkModel) {
if(rec == nullptr) {
fireEv(index, "Unable to initialize recognizer");
return;
}
if(!model->thrdUsed.load(std::memory_order_relaxed)) {
model->recognizerMain = [this]{main();};
model->thrdUsed.store(true, std::memory_order_relaxed);
model->thrdUsed.notify_one();
return;
}
if(spkModel != nullptr && !spkModel->thrdUsed.load(std::memory_order_relaxed)) {
spkModel->recognizerMain = [this]{main();};
spkModel->thrdUsed.store(true, std::memory_order_relaxed);
spkModel->thrdUsed.notify_one();
return;
}
t = std::thread{&Recognizer::main, this};
if(rec == nullptr) fireEv(index, "Unable to initialize recognizer");
else globalPool.exec([this]{main();});
}
void Recognizer::main() {
fireEv(index, "0");
while(!done) {
blocker.wait(false, std::memory_order_relaxed);
blocker.store(false, std::memory_order_relaxed);
while(!dataQ.empty()) {
if(dataQ.empty()) {
emscripten_atomic_store_u32(&haveData, false);
emscripten_atomic_wait_u32(&haveData, false, -1);
}
else {
AudioData& next = dataQ.front();
// If done we free the rest of the data by skipping this block
switch(vosk_recognizer_accept_waveform_f(rec, next.data, next.len)) {
case 0: [[likely]]
fireEv(index, vosk_recognizer_partial_result(rec), "partialResult");
break;
case 1: [[unlikely]]
fireEv(index, vosk_recognizer_result(rec), "result");
case 0: [[likely]]
fireEv(index, vosk_recognizer_partial_result(rec), "partialResult");
break;
case 1: [[unlikely]]
fireEv(index, vosk_recognizer_result(rec), "result");
}
free(next.data);
dataQ.pop();
Expand All @@ -68,8 +53,8 @@ void Recognizer::main() {
}
void Recognizer::pushData(int start, int len) {
dataQ.emplace(start, len);
blocker.store(true, std::memory_order_relaxed);
blocker.notify_one();
emscripten_atomic_store_u32(&haveData, true);
emscripten_atomic_notify(&haveData, 1);
}
void Recognizer::reset() {
vosk_recognizer_reset(rec);
Expand Down
8 changes: 1 addition & 7 deletions src/Recognizer.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
#pragma once
#include "CommonModel.h"

#include <queue>

#include <emscripten/console.h>

// Prevent naming conflicts with Vosk's Recognizer class
#define Recognizer Recognizer_

struct Recognizer {
int haveData{};
bool done{};
std::atomic_bool blocker{};
int index;
std::thread t;
VoskRecognizer* rec;
std::queue<AudioData> dataQ;
Recognizer(int index, float sampleRate, CommonModel* model);
Expand Down
41 changes: 40 additions & 1 deletion src/Util.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "Util.h"

#include <emscripten/em_asm.h>
ThreadPool globalPool;
void fireEv(int index, const char* content, const char* type) {
MAIN_THREAD_EM_ASM({
objs[$0].dispatchEvent(new CustomEvent($2 === 0 ? "0" : UTF8ToString($2), { "detail" : UTF8ToString($1) }));
Expand Down Expand Up @@ -44,4 +45,42 @@ int untar(unsigned char* tar, int tarSize, const std::string& storepath) {
}
}
return Successful;
}
void Thread::startup(ThreadPool* pool) {
while(!pool->done) {
// Wait until unlocked
emscripten_atomic_wait_u32(&pool->qLock, true, -1);
if(pool->done) break;
// If there is no task then everyone has to wait until there is more
if(pool->taskQ.empty()) {
emscripten_atomic_store_u32(&pool->qLock, true);
continue;
}
// If this locks, the returned (loaded) value will be false, and we move on
if(emscripten_atomic_cas_u32(&pool->qLock, false, true)) continue;
fn = pool->taskQ.front();
pool->taskQ.pop();
// Unlock
emscripten_atomic_store_u32(&pool->qLock, false);
emscripten_atomic_notify(&pool->qLock, 1);
fn();
}
}
ThreadPool::ThreadPool() {
for(Thread& thrd : threads) {
thrd.handle = std::thread{&Thread::startup, &thrd, this};
}
}
ThreadPool::~ThreadPool() {
done = true;
emscripten_atomic_store_u32(&qLock, false);
emscripten_atomic_notify(&qLock, -1);
for(Thread& thrd : threads) {
thrd.handle.detach();
}
}
void ThreadPool::exec(std::function<void()> fn) {
taskQ.emplace(fn);
emscripten_atomic_store_u32(&qLock, false);
emscripten_atomic_notify(&qLock, 1);
}
26 changes: 24 additions & 2 deletions src/Util.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include <thread>
#include <filesystem>
#include <functional>
#include <variant>
#include <fstream>
#include <emscripten/em_asm.h>
#include <emscripten/atomic.h>
#include <emscripten/console.h>

namespace fs = std::filesystem;

struct AudioData {
float* data;
int len;
Expand All @@ -21,5 +22,26 @@ enum UntarStatus {
FailedWrite,
FailedClose
};
struct ThreadPool;
struct Thread {
std::thread handle;
std::function<void()> fn;
void startup(ThreadPool* pool);
};
struct ThreadPool {
bool qLock{true}; // True is locked, false is unlocked
bool done{};
std::queue<std::function<void()>> taskQ;
#ifndef MAX_THREADS
#define MAX_THREADS 1
#endif
std::array<Thread, MAX_THREADS> threads;
#undef MAX_THREADS
ThreadPool();
~ThreadPool();
void exec(std::function<void()> fn);
};

extern ThreadPool globalPool;
void fireEv(int index, const char* content, const char* type = nullptr);
int untar(unsigned char* tar, int tarSize, const std::string& storepath);
6 changes: 3 additions & 3 deletions src/make
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ if [ $EMSDK = ../emsdk ] && [ ! -d $EMSDK ]; then
echo "Installing emsdk + Emscripten..."
git clone --depth=1 https://github.com/emscripten-core/emsdk.git ../emsdk &&
cd ../emsdk &&
./emsdk install 3.1.64 &&
./emsdk activate 3.1.64
./emsdk install 3.1.65 &&
./emsdk activate 3.1.65
fi
. $(realpath $EMSDK)/emsdk_env.sh &&
export PATH=:$PATH:$(realpath $EMSDK)/upstream/bin &&
Expand Down Expand Up @@ -73,7 +73,7 @@ if [ ! -d $VOSK ]; then
fi

cd $SRC &&
em++ Util.cc CommonModel.cc Recognizer.cc Bindings.cc -O3 -sWASMFS -sWASM_BIGINT -sSINGLE_FILE -sMODULARIZE -sEMBIND_STD_STRING_IS_UTF8 -sPTHREAD_POOL_DELAY_LOAD -sALLOW_MEMORY_GROWTH -sTEXTDECODER=2 -sPTHREAD_POOL_SIZE_STRICT=2 -sINITIAL_MEMORY=$INITIAL_MEMORY -sPTHREAD_POOL_SIZE=$MAX_THREADS -sPOLYFILL=0 -sEXIT_RUNTIME=0 -sINVOKE_RUN=0 -sSUPPORT_LONGJMP=0 -sEXPORTED_FUNCTIONS=_malloc -sEXPORT_NAME=loadVosklet -sMALLOC=emmalloc -sEXPORTED_RUNTIME_METHODS=UTF8ToString,stringToUTF8OnStack -sENVIRONMENT=web,worker -I. -I"$VOSK"/src -L$KALDI/src -l:online2/kaldi-online2.a -l:decoder/kaldi-decoder.a -l:ivector/kaldi-ivector.a -l:gmm/kaldi-gmm.a -l:tree/kaldi-tree.a -l:feat/kaldi-feat.a -l:cudamatrix/kaldi-cudamatrix.a -l:lat/kaldi-lat.a -l:lm/kaldi-lm.a -l:rnnlm/kaldi-rnnlm.a -l:hmm/kaldi-hmm.a -l:nnet3/kaldi-nnet3.a -l:transform/kaldi-transform.a -l:matrix/kaldi-matrix.a -l:fstext/kaldi-fstext.a -l:util/kaldi-util.a -l:base/kaldi-base.a -L"$OPENFST"/lib -l:libfst.a -l:libfstngram.a -L"$CLAPACK_WASM" -l:CBLAS/lib/cblas.a -l:CLAPACK-3.2.1/lapack.a -l:CLAPACK-3.2.1/libcblaswr.a -l:f2c_BLAS-3.8.0/blas.a -l:libf2c/libf2c.a -L$VOSK/src -l:vosk.a -lembind -pthread -flto -msimd128 -mreference-types -mnontrapping-fptoint -mextended-const -msign-ext -mmutable-globals --pre-js Wrapper.js -o ../Vosklet.js &&
em++ Util.cc CommonModel.cc Recognizer.cc Bindings.cc -O3 -Wno-pthreads-mem-growth -DEMSCRIPTEN_HAS_UNBOUND_TYPE_NAMES=0 -fno-rtti -DMAX_THREADS=$MAX_THREADS -sWASMFS -sWASM_BIGINT -sSINGLE_FILE -sMODULARIZE -sEMBIND_STD_STRING_IS_UTF8 -sPTHREAD_POOL_DELAY_LOAD -sTEXTDECODER=2 -sPTHREAD_POOL_SIZE_STRICT=2 -sINITIAL_MEMORY=$INITIAL_MEMORY -sALLOW_MEMORY_GROWTH -sPTHREAD_POOL_SIZE=$MAX_THREADS -sPOLYFILL=0 -sEXIT_RUNTIME=0 -sINVOKE_RUN=0 -sSUPPORT_LONGJMP=0 -sALLOW_BLOCKING_ON_MAIN_THREAD=0 -sEXPORTED_FUNCTIONS=_malloc -sEXPORT_NAME=loadVosklet -sMALLOC=emmalloc -sEXPORTED_RUNTIME_METHODS=UTF8ToString,stringToUTF8OnStack -sENVIRONMENT=web,worker -I. -I$VOSK/src -L$KALDI/src -l:online2/kaldi-online2.a -l:decoder/kaldi-decoder.a -l:ivector/kaldi-ivector.a -l:gmm/kaldi-gmm.a -l:tree/kaldi-tree.a -l:feat/kaldi-feat.a -l:cudamatrix/kaldi-cudamatrix.a -l:lat/kaldi-lat.a -l:lm/kaldi-lm.a -l:rnnlm/kaldi-rnnlm.a -l:hmm/kaldi-hmm.a -l:nnet3/kaldi-nnet3.a -l:transform/kaldi-transform.a -l:matrix/kaldi-matrix.a -l:fstext/kaldi-fstext.a -l:util/kaldi-util.a -l:base/kaldi-base.a -L$OPENFST/lib -l:libfst.a -l:libfstngram.a -L$CLAPACK_WASM -l:CBLAS/lib/cblas.a -l:CLAPACK-3.2.1/lapack.a -l:CLAPACK-3.2.1/libcblaswr.a -l:f2c_BLAS-3.8.0/blas.a -l:libf2c/libf2c.a -L$VOSK/src -l:vosk.a -lembind -pthread -flto -msimd128 -mreference-types -mnontrapping-fptoint -mextended-const -msign-ext -mmutable-globals --pre-js Wrapper.js -o ../Vosklet.js &&
cd .. &&
rm -f Vosklet.worker.js

Expand Down
Loading

0 comments on commit c4ff62f

Please sign in to comment.