From 776b99c5bf6dd289e3afe29d48cd91c058de150a Mon Sep 17 00:00:00 2001 From: Glorfischi Date: Wed, 4 Jul 2018 15:51:30 +0200 Subject: [PATCH 1/3] Added signal handeling to the eventloop and added SIGTERM handler to the stream manager --- .../src/cpp/network/event_loop_impl.cpp | 49 +++++++++++++++++++ .../common/src/cpp/network/event_loop_impl.h | 9 ++++ heron/stmgr/src/cpp/server/stmgr-main.cpp | 9 +++- 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/heron/common/src/cpp/network/event_loop_impl.cpp b/heron/common/src/cpp/network/event_loop_impl.cpp index c37fc7ef0ad..a584108bf28 100644 --- a/heron/common/src/cpp/network/event_loop_impl.cpp +++ b/heron/common/src/cpp/network/event_loop_impl.cpp @@ -37,6 +37,12 @@ void EventLoopImpl::eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void el->handleWriteCallback(fd, event); } +// 'C' style callback for libevent on signal events +void EventLoopImpl::eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg) { + auto* el = reinterpret_cast(arg); + el->handleSignalCallback(sig, event); +} + // 'C' style callback for libevent on timer events void EventLoopImpl::eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg) { // TODO(vikasr): this needs to change to VCallback @@ -83,6 +89,39 @@ void EventLoopImpl::loop() { int EventLoopImpl::loopExit() { return event_base_loopbreak(mDispatcher); } +int EventLoopImpl::registerSignal(int sig, VCallback cb) { + // Create the appropriate structures and init them. + auto* event = new SS_RegisteredEvent(sig, false, std::move(cb), -1); + evsignal_set(event->event(), sig, &EventLoopImpl::eventLoopImplSignalCallback, this); + if (event_base_set(mDispatcher, event->event()) < 0) { + delete event; + throw heron::error::Error_Exception(errno); + } + + // Now add it to the list of signals monitored by the mDispatcher + if (event_add(event->event(), NULL) < 0) { + delete event; + throw heron::error::Error_Exception(errno); + } + mSignalEvents[sig] = event; + return 0; +} + +int EventLoopImpl::unRegisterSignal(int sig) { + if (mSignalEvents.find(sig) == mSignalEvents.end()) { + // This signal wasn't registed. Hence we can't unregister it. + return -1; + } + + // Delete the underlying event in libevent + if (event_del(mSignalEvents[sig]->event()) != 0) { + throw heron::error::Error_Exception(errno); + } + delete mSignalEvents[sig]; + mSignalEvents.erase(sig); + return 0; +} + int EventLoopImpl::registerForRead(int fd, VCallback cb, bool persistent) { return registerForRead(fd, std::move(cb), persistent, -1); } @@ -373,6 +412,16 @@ void EventLoopImpl::handleTimerCallback(sp_int16 event, sp_int64 timerId) { } } +void EventLoopImpl::handleSignalCallback(sp_int32 sig, sp_int16 event) { + if (mSignalEvents.find(sig) == mSignalEvents.end()) { + // This is possible when unRegisterSignal has been called before we handle this event + // Just ignore this event. + return; + } + auto* registeredEvent = mSignalEvents[sig]; + registeredEvent->get_callback()(mapStatusCode(event)); +} + EventLoop::Status EventLoopImpl::mapStatusCode(sp_int16 event) { switch (event) { case EV_READ: diff --git a/heron/common/src/cpp/network/event_loop_impl.h b/heron/common/src/cpp/network/event_loop_impl.h index c807ff39ce3..b85406e240c 100644 --- a/heron/common/src/cpp/network/event_loop_impl.h +++ b/heron/common/src/cpp/network/event_loop_impl.h @@ -44,6 +44,8 @@ class EventLoopImpl : public EventLoop { // Methods inherited from EventLoop. virtual void loop(); virtual sp_int32 loopExit(); + virtual sp_int32 registerSignal(sp_int32 sig, VCallback cb); + virtual sp_int32 unRegisterSignal(sp_int32 sig); virtual sp_int32 registerForRead(sp_int32 fd, VCallback cb, bool persistent, sp_int64 timeoutMicroSecs); virtual sp_int32 registerForRead(sp_int32 fd, VCallback cb, bool persistent); @@ -63,6 +65,7 @@ class EventLoopImpl : public EventLoop { // Static member functions to interact with C libevent API static void eventLoopImplReadCallback(sp_int32 fd, sp_int16 event, void* arg); static void eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void* arg); + static void eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg); static void eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg); private: @@ -81,6 +84,9 @@ class EventLoopImpl : public EventLoop { // libevent callback on timer events. void handleTimerCallback(sp_int16 event, sp_int64 timerid); + // libevent callback on signal events. + void handleSignalCallback(sp_int32 sig, sp_int16 event); + // The underlying dispatcher that we wrap around. struct event_base* mDispatcher; @@ -93,6 +99,9 @@ class EventLoopImpl : public EventLoop { // The registered timers. std::unordered_map*> mTimerEvents; + // The registered signals. + std::unordered_map*> mSignalEvents; + // The registered instant callbacks typedef std::list> OrderedCallbackList; OrderedCallbackList mListInstantCallbacks; diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp index 371be9db9cc..c6e36a68887 100644 --- a/heron/stmgr/src/cpp/server/stmgr-main.cpp +++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp @@ -48,6 +48,12 @@ DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr"); DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr"); DEFINE_string(metricscachemgr_mode, "disabled", "MetricsCacheMgr mode, default `disabled`"); +EventLoopImpl ss; + +void sigtermHandler(int signum) { + ss.loopExit(); +} + int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -56,8 +62,6 @@ int main(int argc, char* argv[]) { } std::vector instances = StrUtils::split(FLAGS_instance_ids, ","); - EventLoopImpl ss; - // Read heron internals config from local file // Create the heron-internals-config-reader to read the heron internals config heron::config::HeronInternalsConfigReader::Create(&ss, @@ -85,6 +89,7 @@ int main(int argc, char* argv[]) { FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id, high_watermark, low_watermark, FLAGS_metricscachemgr_mode); mgr.Init(); + ss.registerSignal(SIGTERM, &sigtermHandler); ss.loop(); return 0; } From 81c27e3ae8d17bf67e513d2baabae7dce69949da Mon Sep 17 00:00:00 2001 From: Glorfischi Date: Fri, 20 Jul 2018 13:30:53 +0200 Subject: [PATCH 2/3] Fixed possible bugs at the unregister functions, by making sure that the event is erased from the map before deleting it. --- heron/common/src/cpp/network/event_loop_impl.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/heron/common/src/cpp/network/event_loop_impl.cpp b/heron/common/src/cpp/network/event_loop_impl.cpp index a584108bf28..4e473c6928b 100644 --- a/heron/common/src/cpp/network/event_loop_impl.cpp +++ b/heron/common/src/cpp/network/event_loop_impl.cpp @@ -117,8 +117,9 @@ int EventLoopImpl::unRegisterSignal(int sig) { if (event_del(mSignalEvents[sig]->event()) != 0) { throw heron::error::Error_Exception(errno); } - delete mSignalEvents[sig]; + auto* event = mSignalEvents[sig]; mSignalEvents.erase(sig); + delete event; return 0; } @@ -175,8 +176,9 @@ int EventLoopImpl::unRegisterForRead(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - delete mReadEvents[fd]; + auto* event = mReadEvents[fd]; mReadEvents.erase(fd); + delete event; return 0; } @@ -234,8 +236,9 @@ int EventLoopImpl::unRegisterForWrite(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - delete mWriteEvents[fd]; + auto* event = mWriteEvents[fd]; mWriteEvents.erase(fd); + delete event; return 0; } @@ -290,8 +293,9 @@ sp_int32 EventLoopImpl::unRegisterTimer(sp_int64 timerId) { // cout << "event_del failed for timer " << timerId; throw heron::error::Error_Exception(errno); } - delete mTimerEvents[timerId]; + auto* event = mTimerEvents[timerId]; mTimerEvents.erase(timerId); + delete event; return 0; } From e38928cc7ef3b9ddf6d14caf2ca59c73753b9fc3 Mon Sep 17 00:00:00 2001 From: Glorfischi Date: Mon, 23 Jul 2018 12:00:32 +0200 Subject: [PATCH 3/3] Removed unnecessary * --- heron/common/src/cpp/network/event_loop_impl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/heron/common/src/cpp/network/event_loop_impl.cpp b/heron/common/src/cpp/network/event_loop_impl.cpp index 4e473c6928b..24859055958 100644 --- a/heron/common/src/cpp/network/event_loop_impl.cpp +++ b/heron/common/src/cpp/network/event_loop_impl.cpp @@ -117,7 +117,7 @@ int EventLoopImpl::unRegisterSignal(int sig) { if (event_del(mSignalEvents[sig]->event()) != 0) { throw heron::error::Error_Exception(errno); } - auto* event = mSignalEvents[sig]; + auto event = mSignalEvents[sig]; mSignalEvents.erase(sig); delete event; return 0; @@ -176,7 +176,7 @@ int EventLoopImpl::unRegisterForRead(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - auto* event = mReadEvents[fd]; + auto event = mReadEvents[fd]; mReadEvents.erase(fd); delete event; return 0; @@ -236,7 +236,7 @@ int EventLoopImpl::unRegisterForWrite(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - auto* event = mWriteEvents[fd]; + auto event = mWriteEvents[fd]; mWriteEvents.erase(fd); delete event; return 0; @@ -293,7 +293,7 @@ sp_int32 EventLoopImpl::unRegisterTimer(sp_int64 timerId) { // cout << "event_del failed for timer " << timerId; throw heron::error::Error_Exception(errno); } - auto* event = mTimerEvents[timerId]; + auto event = mTimerEvents[timerId]; mTimerEvents.erase(timerId); delete event; return 0;