Skip to content

Commit

Permalink
data nodes are implied from FreeStorageSpace and not MasterFreeStorag…
Browse files Browse the repository at this point in the history
…eSpace
  • Loading branch information
akumar1214 committed Oct 31, 2024
1 parent 78e36ac commit 2af9a46
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module ElasticGraph

def get_cluster_health: () -> ::Hash[::String, untyped]
def get_node_os_stats: () -> ::Hash[::String, untyped]
def get_node_os_roles: () -> ::Hash[::String, untyped]
def get_flat_cluster_settings: () -> ::Hash[::String, untyped]
def put_persistent_cluster_settings: (::Hash[::Symbol | ::String, untyped]) -> void

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ def get_cluster_health
def get_node_os_stats
transform_errors { |c| c.nodes.stats(metric: "os").body }
end

def get_node_roles
transform_errors { |c| c.nodes.stats(metric: "roles").body }
end

def get_flat_cluster_settings
transform_errors { |c| c.cluster.get_settings(flat_settings: true).body }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ def define_stubs(stub, requested_stubs)
stub.get("/_cluster/health") { |env| response_for(body, env) }
in :get_node_os_stats
stub.get("/_nodes/stats/os") { |env| response_for(body, env) }
in :get_node_roles
stub.get("/_nodes/stats/roles") { |env| response_for(body, env) }
in :get_flat_cluster_settings
stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) }
in :put_persistent_cluster_settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,41 +103,20 @@ def get_max_cpu_utilization
end.max.to_f
end

def get_min_free_storage
metric_data_queries = get_data_node_ids_by_cluster_name.map(&:first).map do |cluster_name, node_id|
{
id: node_id,
metric_stat: {
metric: {
namespace: 'AWS/ES',
metric_name: 'FreeStorageSpace',
dimensions: [
{ name: 'DomainName', value: cluster_name },
{ name: 'NodeId', value: node_id }
]
},
period: 30, # seconds
stat: 'Minimum'
},
return_data: true
}
end

def get_min_free_storage
metric_response = @cloudwatch_client.get_metric_data({
start_time: ::Time.now - 900, # past 15 minutes
end_time: ::Time.now,
metric_data_queries: metric_data_queries
metric_data_queries: [
{
id: 'minFreeStorageAcrossNodes',
expression: 'SEARCH({AWS/ES,DomainName,NodeId} MetricName="FreeStorageSpace", "Minimum", 30)',
return_data: true
}
]
})

metric_response.metric_data_results.map { |result| result.values.first }.min / (1024 * 1024) # result is in bytes
end

def get_data_node_ids_by_cluster_name
@datastore_core.clients_by_name.flat_map do |name, client|
client.get_node_roles.map do |id, roles|
roles["roles"].include?("data") ? { name => id } : nil
end
end.compact
metric_response.metric_data_results.first.values.first / (1024 * 1024) # result is in bytes
end

def get_queue_attributes(queue_urls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ module ElasticGraph

def get_max_cpu_utilization: () -> ::Float
def get_min_free_storage: () -> ::Float
def get_data_node_ids_by_cluster_name: () -> ::Hash[::String, ::String]
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class IndexerAutoscalerLambda

it "resets the concurrency when free storage space drops below the minimum regardless of cpu" do
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1, minimum_free_storage - 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage - 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1),
sqs_client: sqs_client_with_number_of_messages(1),
Expand Down Expand Up @@ -218,14 +218,6 @@ def datastore_client_with_cpu_usage(percent, percent2 = percent)
}
}
}
},
get_node_roles: {
"node1" => {
"roles" => ["data"]
},
"node2" => {
"roles" => ["data"]
}
}
)
end
Expand All @@ -249,20 +241,15 @@ def lambda_client_with_concurrency(concurrency)
end
end

def cloudwatch_client_with_storage_metrics(free_storage, free_storage2 = free_storage)
def cloudwatch_client_with_storage_metrics(free_storage)
::Aws::CloudWatch::Client.new(stub_responses: true).tap do |cloudwatch_client|
cloudwatch_client.stub_responses(:get_metric_data, {
# return values are in bytes
metric_data_results: [
{
id: "node1",
id: "minFreeStorageAcrossNodes",
values: [(free_storage * 1024 * 1024).to_f],
timestamps: [::Time.parse("2024-10-30T12:00:00Z")]
},
{
id: "node2",
values: [(free_storage2 * 1024 * 1024).to_f],
timestamps: [::Time.parse("2024-10-30T12:00:00Z")]
}
]
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ def get_node_os_stats
transform_errors { |c| c.nodes.stats(metric: "os") }
end

def get_node_roles
transform_errors { |c| c.nodes.stats(metric: "roles") }
end

def get_flat_cluster_settings
transform_errors { |c| c.cluster.get_settings(flat_settings: true) }
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ def define_stubs(stub, requested_stubs)
stub.get("/_cluster/health") { |env| response_for(body, env) }
in :get_node_os_stats
stub.get("/_nodes/stats/os") { |env| response_for(body, env) }
in :get_node_roles
stub.get("/_nodes/stats/roles") { |env| response_for(body, env) }
in :get_flat_cluster_settings
stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) }
in :put_persistent_cluster_settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ module ElasticGraph
expect(client.get_node_os_stats).to eq "Node stats"
end

it "supports `get_node_roles`" do
client = build_client({get_node_roles: "Node roles"})

expect(client.get_node_roles).to eq "Node roles"
end

it "supports `get_flat_cluster_settings`" do
client = build_client({get_flat_cluster_settings: "Flat cluster settings!"})

Expand Down

0 comments on commit 2af9a46

Please sign in to comment.