From ceb8b02462ad1ed3fd44b04a184f5b4d1c2f03cd Mon Sep 17 00:00:00 2001 From: deanlee Date: Tue, 10 Dec 2024 23:14:16 +0800 Subject: [PATCH] use sem --- SConscript | 4 ++-- msgq/msgq.cc | 33 +++++++++++++++++++-------------- msgq/msgq.h | 3 +++ 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/SConscript b/SConscript index 147eb304..b723fea0 100644 --- a/SConscript +++ b/SConscript @@ -15,7 +15,7 @@ msgq_objects = env.SharedObject([ 'msgq/msgq.cc', ]) msgq = env.Library('msgq', msgq_objects) -msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common]) +msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", 'pthread', common]) # Build Vision IPC vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc'] @@ -40,7 +40,7 @@ envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir. LIBS=vipc_libs, FRAMEWORKS=vipc_frameworks) if GetOption('extras'): - env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common]) + env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common, 'pthread']) env.Program(f'{visionipc_dir.abspath}/test_runner', [f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'], LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks) diff --git a/msgq/msgq.cc b/msgq/msgq.cc index 5ce25a3b..601964ad 100644 --- a/msgq/msgq.cc +++ b/msgq/msgq.cc @@ -25,10 +25,6 @@ #include "msgq/msgq.h" -void sigusr2_handler(int signal) { - assert(signal == SIGUSR2); -} - uint64_t msgq_get_uid(void){ std::random_device rd("/dev/urandom"); std::uniform_int_distribution distribution(0, std::numeric_limits::max()); @@ -85,7 +81,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes - std::signal(SIGUSR2, sigusr2_handler); std::string full_path = "/dev/shm/"; const char* prefix = std::getenv("OPENPILOT_PREFIX"); @@ -158,13 +153,22 @@ void msgq_init_publisher(msgq_queue_t * q) { q->write_uid_local = uid; } -static void thread_signal(uint32_t tid) { - #ifndef SYS_tkill - // TODO: this won't work for multithreaded programs - kill(tid, SIGUSR2); - #else - syscall(SYS_tkill, tid, SIGUSR2); - #endif +sem_t *get_sem(msgq_queue_t *q, uint32_t tid) { + sem_t *sem = nullptr; + auto it = q->semapores.find(tid); + if (it != q->semapores.end()) { + sem = it->second; + } else { + char name[256]; + snprintf(name, sizeof(name), "opmsgq-%d", tid); + sem = sem_open(name, O_CREAT, 0644, 0); + q->semapores[tid] = sem; + } + return sem; +} + +static void thread_signal(msgq_queue_t *q, uint32_t tid) { + sem_post(get_sem(q, tid)); } void msgq_init_subscriber(msgq_queue_t * q) { @@ -190,7 +194,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { *q->read_uids[i] = 0; // Wake up reader in case they are in a poll - thread_signal(old_uid & 0xFFFFFFFF); + thread_signal(q, old_uid);// & 0xFFFFFFFF); } continue; @@ -295,7 +299,8 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ // Notify readers for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - thread_signal(reader_uid & 0xFFFFFFFF); + // thread_signal(reader_uid & 0xFFFFFFFF); + thread_signal(q, reader_uid); } return msg->size; diff --git a/msgq/msgq.h b/msgq/msgq.h index 94e18494..29e0170f 100644 --- a/msgq/msgq.h +++ b/msgq/msgq.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024) #define NUM_READERS 15 @@ -38,6 +40,7 @@ struct msgq_queue_t { bool read_conflate; std::string endpoint; + std::unordered_map semapores; }; struct msgq_msg_t {