diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index d7840482e2d..b06395b3184 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -205,6 +205,11 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolMaxMessageNumber() return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as(); } +sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() { + return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES] + .as(); +} + sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as(); } diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index 37bed019e99..93c3ef50f4d 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -156,6 +156,9 @@ class HeronInternalsConfigReader : public YamlFileReader { // The max number of messages in the memory pool for each message type sp_int32 GetHeronStreammgrMempoolMaxMessageNumber(); + // The max byte size of HeronTupleSet message in stream manager + sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes(); + // Get the Nbucket value, for efficient acknowledgement sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index addcadfbcae..069a400d76b 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB = "heron.streammgr.cache.drain.size.mb"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER = "heron.streammgr.mempool.max.message.number"; +const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES = + "heron.streammgr.herontupleset.message.max.bytes"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS = "heron.streammgr.xormgr.rotatingmap.nbuckets"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index f711a9ba673..49422de5124 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,6 +140,9 @@ class HeronInternalsConfigVars { // The max number of messages in the memory pool for each message type static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER; + // The max byte size of HeronTupleSet message in stream manager + static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES; + // For efficient acknowledgement static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS; diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index a1b77b15444..30f355d0774 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml index b592fbc5fb2..4bb96361a08 100644 --- a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml +++ b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index 56fb659901b..b96d535a811 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index 1c79d344e6a..62ceae3f332 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index 7537a96623c..08241fb1323 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -50,6 +50,9 @@ heron.streammgr.cache.drain.size.mb: 100 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # For efficient acknowledgement heron.streammgr.xormgr.rotatingmap.nbuckets: 3 diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index a1b77b15444..b56780d9bd8 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # The max number of messages in the memory pool for each message type heron.streammgr.mempool.max.message.number: 512 +# The max byte size of a HeronTupleSet message allowed in memory pool +heron.streammgr.herontupleset.message.max.bytes: 83886080 + # The max reconnect attempts to other stream managers for stream manager client heron.streammgr.client.reconnect.max.attempts: 300 diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index 027fee384df..1923e6b0b5d 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -120,7 +120,9 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT, back_pressure_metric_initiated_); spouts_under_back_pressure_ = false; - + max_herontupleset_size_in_bytes = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes(); + space_check_counter = 0; // Update queue related metrics every 10 seconds CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) { this->UpdateQueueMetrics(status); @@ -431,7 +433,20 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn, ->incr_by(_message->control().fails_size()); } stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message); - __global_protobuf_pool_release__(_message); + space_check_counter = (space_check_counter + 1) % 1024; + if (space_check_counter == 0) { + auto message_size = _message->SpaceUsed(); + if (message_size >= max_herontupleset_size_in_bytes) { + LOG(WARNING) << "HeronTupleSet message has size " << message_size << + " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." << + " Release to memory allocator rather than memory pool."; + delete _message; + } else { + __global_protobuf_pool_release__(_message); + } + } else { + __global_protobuf_pool_release__(_message); + } } void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h index 0faa08f6329..0ad470790d4 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.h +++ b/heron/stmgr/src/cpp/manager/stmgr-server.h @@ -201,6 +201,8 @@ class StMgrServer : public Server { heron::common::TimeSpentMetric* back_pressure_metric_initiated_; bool spouts_under_back_pressure_; + sp_uint32 max_herontupleset_size_in_bytes; + sp_uint32 space_check_counter; // Stateful processing related member variables NeighbourCalculator* neighbour_calculator_;