-
Notifications
You must be signed in to change notification settings - Fork 594
Add signal handling to stream manager #2950
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,12 @@ void EventLoopImpl::eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void | |
el->handleWriteCallback(fd, event); | ||
} | ||
|
||
// 'C' style callback for libevent on signal events | ||
void EventLoopImpl::eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg) { | ||
auto* el = reinterpret_cast<EventLoopImpl*>(arg); | ||
el->handleSignalCallback(sig, event); | ||
} | ||
|
||
// 'C' style callback for libevent on timer events | ||
void EventLoopImpl::eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg) { | ||
// TODO(vikasr): this needs to change to VCallback | ||
|
@@ -83,6 +89,40 @@ void EventLoopImpl::loop() { | |
|
||
int EventLoopImpl::loopExit() { return event_base_loopbreak(mDispatcher); } | ||
|
||
int EventLoopImpl::registerSignal(int sig, VCallback<EventLoop::Status> cb) { | ||
// Create the appropriate structures and init them. | ||
auto* event = new SS_RegisteredEvent<sp_int32>(sig, false, std::move(cb), -1); | ||
evsignal_set(event->event(), sig, &EventLoopImpl::eventLoopImplSignalCallback, this); | ||
if (event_base_set(mDispatcher, event->event()) < 0) { | ||
delete event; | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
|
||
// Now add it to the list of signals monitored by the mDispatcher | ||
if (event_add(event->event(), NULL) < 0) { | ||
delete event; | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
mSignalEvents[sig] = event; | ||
return 0; | ||
} | ||
|
||
int EventLoopImpl::unRegisterSignal(int sig) { | ||
if (mSignalEvents.find(sig) == mSignalEvents.end()) { | ||
// This signal wasn't registed. Hence we can't unregister it. | ||
return -1; | ||
} | ||
|
||
// Delete the underlying event in libevent | ||
if (event_del(mSignalEvents[sig]->event()) != 0) { | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
auto* event = mSignalEvents[sig]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you need an auto* ? just auto event will be fine, isn't? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh yes I think you're right. That * is useless. |
||
mSignalEvents.erase(sig); | ||
delete event; | ||
return 0; | ||
} | ||
|
||
int EventLoopImpl::registerForRead(int fd, VCallback<EventLoop::Status> cb, bool persistent) { | ||
return registerForRead(fd, std::move(cb), persistent, -1); | ||
} | ||
|
@@ -136,8 +176,9 @@ int EventLoopImpl::unRegisterForRead(int fd) { | |
// cout << "event_del failed for fd " << fd; | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
delete mReadEvents[fd]; | ||
auto* event = mReadEvents[fd]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here and other places. |
||
mReadEvents.erase(fd); | ||
delete event; | ||
return 0; | ||
} | ||
|
||
|
@@ -195,8 +236,9 @@ int EventLoopImpl::unRegisterForWrite(int fd) { | |
// cout << "event_del failed for fd " << fd; | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
delete mWriteEvents[fd]; | ||
auto* event = mWriteEvents[fd]; | ||
mWriteEvents.erase(fd); | ||
delete event; | ||
return 0; | ||
} | ||
|
||
|
@@ -251,8 +293,9 @@ sp_int32 EventLoopImpl::unRegisterTimer(sp_int64 timerId) { | |
// cout << "event_del failed for timer " << timerId; | ||
throw heron::error::Error_Exception(errno); | ||
} | ||
delete mTimerEvents[timerId]; | ||
auto* event = mTimerEvents[timerId]; | ||
mTimerEvents.erase(timerId); | ||
delete event; | ||
return 0; | ||
} | ||
|
||
|
@@ -373,6 +416,16 @@ void EventLoopImpl::handleTimerCallback(sp_int16 event, sp_int64 timerId) { | |
} | ||
} | ||
|
||
void EventLoopImpl::handleSignalCallback(sp_int32 sig, sp_int16 event) { | ||
if (mSignalEvents.find(sig) == mSignalEvents.end()) { | ||
// This is possible when unRegisterSignal has been called before we handle this event | ||
// Just ignore this event. | ||
return; | ||
} | ||
auto* registeredEvent = mSignalEvents[sig]; | ||
registeredEvent->get_callback()(mapStatusCode(event)); | ||
} | ||
|
||
EventLoop::Status EventLoopImpl::mapStatusCode(sp_int16 event) { | ||
switch (event) { | ||
case EV_READ: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,12 @@ DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr"); | |
DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr"); | ||
DEFINE_string(metricscachemgr_mode, "disabled", "MetricsCacheMgr mode, default `disabled`"); | ||
|
||
EventLoopImpl ss; | ||
|
||
void sigtermHandler(int signum) { | ||
ss.loopExit(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a bit concerned that if loopExit() is safe to call here.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the sigtermHandler should run the in the same thread as any other callback function and loopExit() directly calls event_base_loopbreak() which I think should be safe to call. Here in the libevent book they call event_base_loopbreak() in a very similar way. But maybe someone with more experience with libevent could give some input on that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nwangtw - stmgr runs in a single thread. hence loopExit is fine in the context of sigterm handler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nwangtw - stmgr runs in a single thread. hence loopExit is fine in the context of sigterm handler. |
||
} | ||
|
||
int main(int argc, char* argv[]) { | ||
gflags::ParseCommandLineFlags(&argc, &argv, true); | ||
|
||
|
@@ -56,8 +62,6 @@ int main(int argc, char* argv[]) { | |
} | ||
std::vector<std::string> instances = StrUtils::split(FLAGS_instance_ids, ","); | ||
|
||
EventLoopImpl ss; | ||
|
||
// Read heron internals config from local file | ||
// Create the heron-internals-config-reader to read the heron internals config | ||
heron::config::HeronInternalsConfigReader::Create(&ss, | ||
|
@@ -85,6 +89,7 @@ int main(int argc, char* argv[]) { | |
FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id, | ||
high_watermark, low_watermark, FLAGS_metricscachemgr_mode); | ||
mgr.Init(); | ||
ss.registerSignal(SIGTERM, &sigtermHandler); | ||
ss.loop(); | ||
return 0; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is this function hooked up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is not actually used yet. I included it, because other events also have a unregister function and it might be useful sometime in the future.
I can also remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk. It is ok to leave it there to me. Let's see if @srkukarni has any input.