Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Limit size of HeronTupleSet. #2253

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolMaxMessageNumber()
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES]
.as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>();
}
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class HeronInternalsConfigReader : public YamlFileReader {
// The max number of messages in the memory pool for each message type
sp_int32 GetHeronStreammgrMempoolMaxMessageNumber();

// The max byte size of HeronTupleSet message in stream manager
sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes();

// Get the Nbucket value, for efficient acknowledgement
sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets();

Expand Down
2 changes: 2 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB =
"heron.streammgr.cache.drain.size.mb";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER =
"heron.streammgr.mempool.max.message.number";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES =
"heron.streammgr.herontupleset.message.max.bytes";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS =
"heron.streammgr.xormgr.rotatingmap.nbuckets";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS =
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class HeronInternalsConfigVars {
// The max number of messages in the memory pool for each message type
static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER;

// The max byte size of HeronTupleSet message in stream manager
static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES;

// For efficient acknowledgement
static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS;

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/aurora/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/examples/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/local/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/localzk/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/marathon/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/mesos/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/slurm/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/test/test_heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ heron.streammgr.cache.drain.size.mb: 100
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# For efficient acknowledgement
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/yarn/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
19 changes: 17 additions & 2 deletions heron/stmgr/src/cpp/manager/stmgr-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options,
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
back_pressure_metric_initiated_);
spouts_under_back_pressure_ = false;

max_herontupleset_size_in_bytes =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes();
space_check_counter = 0;
// Update queue related metrics every 10 seconds
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateQueueMetrics(status);
Expand Down Expand Up @@ -431,7 +433,20 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn,
->incr_by(_message->control().fails_size());
}
stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message);
__global_protobuf_pool_release__(_message);
space_check_counter = (space_check_counter + 1) % 1024;
if (space_check_counter == 0) {
auto message_size = _message->SpaceUsed();
if (message_size >= max_herontupleset_size_in_bytes) {
LOG(WARNING) << "HeronTupleSet message has size " << message_size <<
" bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." <<
" Release to memory allocator rather than memory pool.";
delete _message;
} else {
__global_protobuf_pool_release__(_message);
}
} else {
__global_protobuf_pool_release__(_message);
}
}

void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
Expand Down
2 changes: 2 additions & 0 deletions heron/stmgr/src/cpp/manager/stmgr-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class StMgrServer : public Server {
heron::common::TimeSpentMetric* back_pressure_metric_initiated_;

bool spouts_under_back_pressure_;
sp_uint32 max_herontupleset_size_in_bytes;
sp_uint32 space_check_counter;

// Stateful processing related member variables
NeighbourCalculator* neighbour_calculator_;
Expand Down