From 91056956900a66c28642e5b19999e62d4db8813e Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Mon, 28 Aug 2017 11:58:51 -0700 Subject: [PATCH 1/5] Limit size of HeronTupleSet. --- .../src/cpp/config/heron-internals-config-reader.cpp | 5 +++++ .../src/cpp/config/heron-internals-config-reader.h | 3 +++ .../src/cpp/config/heron-internals-config-vars.cpp | 2 ++ .../src/cpp/config/heron-internals-config-vars.h | 3 +++ .../config/src/yaml/conf/aurora/heron_internals.yaml | 3 +++ .../src/yaml/conf/examples/heron_internals.yaml | 3 +++ .../src/yaml/conf/kubernetes/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/local/heron_internals.yaml | 3 +++ .../src/yaml/conf/localzk/heron_internals.yaml | 3 +++ .../src/yaml/conf/marathon/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/mesos/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/slurm/heron_internals.yaml | 3 +++ .../src/yaml/conf/test/test_heron_internals.yaml | 3 +++ heron/config/src/yaml/conf/yarn/heron_internals.yaml | 3 +++ heron/stmgr/src/cpp/manager/stmgr-server.cpp | 12 +++++++++++- heron/stmgr/src/cpp/manager/stmgr-server.h | 1 + 16 files changed, 55 insertions(+), 1 deletion(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index d7840482e2d..b06395b3184 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -205,6 +205,11 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolMaxMessageNumber() return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>(); } +sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() { + return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES] + .as<int>(); +} + sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() { return 
config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>(); } diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index 37bed019e99..93c3ef50f4d 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -156,6 +156,9 @@ class HeronInternalsConfigReader : public YamlFileReader { // The max number of messages in the memory pool for each message type sp_int32 GetHeronStreammgrMempoolMaxMessageNumber(); + // The max byte size of HeronTupleSet message in stream manager + sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes(); + // Get the Nbucket value, for efficient acknowledgement sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index addcadfbcae..069a400d76b 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB = "heron.streammgr.cache.drain.size.mb"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER = "heron.streammgr.mempool.max.message.number"; +const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES = + "heron.streammgr.herontupleset.message.max.bytes"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS = "heron.streammgr.xormgr.rotatingmap.nbuckets"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index f711a9ba673..49422de5124 100644 --- 
a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,6 +140,9 @@ class HeronInternalsConfigVars { // The max number of messages in the memory pool for each message type static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER; + // The max byte size of HeronTupleSet message in stream manager + static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES; + // For efficient acknowledgement static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS; diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client 
heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml index b592fbc5fb2..4bb96361a08 100644 --- a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml +++ b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index 56fb659901b..b96d535a811 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 
+# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index 1c79d344e6a..62ceae3f332 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ 
b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index 7537a96623c..08241fb1323 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -50,6 +50,9 @@ heron.streammgr.cache.drain.size.mb: 100 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # For efficient acknowledgement heron.streammgr.xormgr.rotatingmap.nbuckets: 3 diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp 
b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index 027fee384df..e66ef94af78 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -128,6 +128,8 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, sp_uint64 drain_threshold_bytes = config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrStatefulBufferSizeMb() * 1_MB; + sp_uint32 max_herontupleset_size_in_bytes = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes(); stateful_gateway_ = new CheckpointGateway(drain_threshold_bytes, neighbour_calculator_, metrics_manager_client_, std::bind(&StMgrServer::DrainTupleSet, this, std::placeholders::_1, std::placeholders::_2), @@ -431,7 +433,15 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn, ->incr_by(_message->control().fails_size()); } stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message); - __global_protobuf_pool_release__(_message); + auto message_size = _message->ByteSize(); + if (message_size >= max_herontupleset_size_in_bytes) { + LOG(WARNING) << "HeronTupleSet message has size " << message_size << + " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." 
<< + " Release to memory allocator rather than memory pool."; + delete _message; + } else { + __global_protobuf_pool_release__(_message); + } } void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h index 0faa08f6329..d91b15a8611 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.h +++ b/heron/stmgr/src/cpp/manager/stmgr-server.h @@ -201,6 +201,7 @@ class StMgrServer : public Server { heron::common::TimeSpentMetric* back_pressure_metric_initiated_; bool spouts_under_back_pressure_; + sp_uint32 max_herontupleset_size_in_bytes; // Stateful processing related member variables NeighbourCalculator* neighbour_calculator_; From 0ce41d561676581ebf321368fffcc921254cf1fa Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Mon, 28 Aug 2017 14:28:12 -0700 Subject: [PATCH 2/5] Really assign. --- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index e66ef94af78..a283f14f7e0 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -120,7 +120,8 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT, back_pressure_metric_initiated_); spouts_under_back_pressure_ = false; - + max_herontupleset_size_in_bytes = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes(); // Update queue related metrics every 10 seconds CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) { this->UpdateQueueMetrics(status); @@ -128,8 +129,6 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, sp_uint64 drain_threshold_bytes = 
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrStatefulBufferSizeMb() * 1_MB; - sp_uint32 max_herontupleset_size_in_bytes = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes(); stateful_gateway_ = new CheckpointGateway(drain_threshold_bytes, neighbour_calculator_, metrics_manager_client_, std::bind(&StMgrServer::DrainTupleSet, this, std::placeholders::_1, std::placeholders::_2), From 459f7f58c3574dfa59fa9d1f91d69c6ea0e64fc6 Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Mon, 28 Aug 2017 15:49:54 -0700 Subject: [PATCH 3/5] Use SpaceUsed. --- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index a283f14f7e0..e042d67d585 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -432,7 +432,7 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn, ->incr_by(_message->control().fails_size()); } stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message); - auto message_size = _message->ByteSize(); + auto message_size = _message->SpaceUsed(); if (message_size >= max_herontupleset_size_in_bytes) { LOG(WARNING) << "HeronTupleSet message has size " << message_size << " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." << From 4558b1d3c33308a852a414d2547b5e139bebf7b0 Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Tue, 29 Aug 2017 12:05:08 -0700 Subject: [PATCH 4/5] Call SpaceUsed less often. 
--- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 18 ++++++++++++------ heron/stmgr/src/cpp/manager/stmgr-server.h | 1 + 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index e042d67d585..58062ad07f3 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -122,6 +122,7 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, spouts_under_back_pressure_ = false; max_herontupleset_size_in_bytes = config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes(); + space_check_counter = 0; // Update queue related metrics every 10 seconds CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) { this->UpdateQueueMetrics(status); @@ -432,12 +433,17 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn, ->incr_by(_message->control().fails_size()); } stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message); - auto message_size = _message->SpaceUsed(); - if (message_size >= max_herontupleset_size_in_bytes) { - LOG(WARNING) << "HeronTupleSet message has size " << message_size << - " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." << - " Release to memory allocator rather than memory pool."; - delete _message; + space_check_counter = (space_check_counter + 1) % 4096; + if (space_check_counter == 0) { + auto message_size = _message->SpaceUsed(); + if (message_size >= max_herontupleset_size_in_bytes) { + LOG(WARNING) << "HeronTupleSet message has size " << message_size << + " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." 
<< + " Release to memory allocator rather than memory pool."; + delete _message; + } else { + __global_protobuf_pool_release__(_message); + } } else { __global_protobuf_pool_release__(_message); } diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h index d91b15a8611..0ad470790d4 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.h +++ b/heron/stmgr/src/cpp/manager/stmgr-server.h @@ -202,6 +202,7 @@ class StMgrServer : public Server { bool spouts_under_back_pressure_; sp_uint32 max_herontupleset_size_in_bytes; + sp_uint32 space_check_counter; // Stateful processing related member variables NeighbourCalculator* neighbour_calculator_; From c7ec49c3ff8802915951cc41374b5373b2b03c43 Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Tue, 29 Aug 2017 16:02:26 -0700 Subject: [PATCH 5/5] Check every 1024 times. --- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index 58062ad07f3..1923e6b0b5d 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -433,7 +433,7 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn, ->incr_by(_message->control().fails_size()); } stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message); - space_check_counter = (space_check_counter + 1) % 4096; + space_check_counter = (space_check_counter + 1) % 1024; if (space_check_counter == 0) { auto message_size = _message->SpaceUsed(); if (message_size >= max_herontupleset_size_in_bytes) {