Skip to content

Commit

Permalink
use sem
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Dec 10, 2024
1 parent 434ed23 commit ceb8b02
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 16 deletions.
4 changes: 2 additions & 2 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ msgq_objects = env.SharedObject([
'msgq/msgq.cc',
])
msgq = env.Library('msgq', msgq_objects)
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", 'pthread', common])

# Build Vision IPC
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
Expand All @@ -40,7 +40,7 @@ envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.
LIBS=vipc_libs, FRAMEWORKS=vipc_frameworks)

if GetOption('extras'):
env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common])
env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common, 'pthread'])
env.Program(f'{visionipc_dir.abspath}/test_runner',
[f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'],
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)
Expand Down
33 changes: 19 additions & 14 deletions msgq/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@

#include "msgq/msgq.h"

void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}

uint64_t msgq_get_uid(void){
std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0, std::numeric_limits<uint32_t>::max());
Expand Down Expand Up @@ -85,7 +81,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);

std::string full_path = "/dev/shm/";
const char* prefix = std::getenv("OPENPILOT_PREFIX");
Expand Down Expand Up @@ -158,13 +153,22 @@ void msgq_init_publisher(msgq_queue_t * q) {
q->write_uid_local = uid;
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
sem_t *get_sem(msgq_queue_t *q, uint32_t tid) {
sem_t *sem = nullptr;
auto it = q->semapores.find(tid);
if (it != q->semapores.end()) {
sem = it->second;
} else {
char name[256];
snprintf(name, sizeof(name), "opmsgq-%d", tid);
sem = sem_open(name, O_CREAT, 0644, 0);
q->semapores[tid] = sem;
}
return sem;
}

static void thread_signal(msgq_queue_t *q, uint32_t tid) {
sem_post(get_sem(q, tid));
}

void msgq_init_subscriber(msgq_queue_t * q) {
Expand All @@ -190,7 +194,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
*q->read_uids[i] = 0;

// Wake up reader in case they are in a poll
thread_signal(old_uid & 0xFFFFFFFF);
thread_signal(q, old_uid);// & 0xFFFFFFFF);
}

continue;
Expand Down Expand Up @@ -295,7 +299,8 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Notify readers
for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i];
thread_signal(reader_uid & 0xFFFFFFFF);
// thread_signal(reader_uid & 0xFFFFFFFF);
thread_signal(q, reader_uid);
}

return msg->size;
Expand Down
3 changes: 3 additions & 0 deletions msgq/msgq.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <cstring>
#include <string>
#include <atomic>
#include <semaphore.h>
#include <unordered_map>

#define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024)
#define NUM_READERS 15
Expand Down Expand Up @@ -38,6 +40,7 @@ struct msgq_queue_t {

bool read_conflate;
std::string endpoint;
std::unordered_map<uint64_t, sem_t *> semapores;
};

struct msgq_msg_t {
Expand Down

0 comments on commit ceb8b02

Please sign in to comment.