Skip to content

Commit

Permalink
Merge pull request fluent#4578 from Athishpranav2003/tail-throttling-…
Browse files Browse the repository at this point in the history
…metrics

Add throttling metrics
  • Loading branch information
ashie authored Aug 7, 2024
2 parents 5c03b1e + 18ef7e6 commit 5e75248
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 21 deletions.
13 changes: 9 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TailInput < Fluent::Plugin::Input
helpers :timer, :event_loop, :parser, :compat_parameters

RESERVED_CHARS = ['/', '*', '%'].freeze
MetricsInfo = Struct.new(:opened, :closed, :rotated)
MetricsInfo = Struct.new(:opened, :closed, :rotated, :throttled)

class WatcherSetupError < StandardError
def initialize(msg)
Expand Down Expand Up @@ -208,7 +208,8 @@ def configure(conf)
opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
@metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
throttling_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_throttled_total", help_text: "Total number of times throttling occurs per file when throttling enabled")
@metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics, throttling_metrics)
end

def configure_tag
Expand Down Expand Up @@ -793,6 +794,7 @@ def statistics
'opened_file_count' => @metrics.opened.get,
'closed_file_count' => @metrics.closed.get,
'rotated_file_count' => @metrics.rotated.get,
'throttled_log_count' => @metrics.throttled.get,
})
}
stats
Expand Down Expand Up @@ -1192,8 +1194,10 @@ def should_shutdown_now?
end

def handle_notify
return if limit_bytes_per_second_reached?
return if group_watcher&.limit_lines_reached?(@path)
if limit_bytes_per_second_reached? || group_watcher&.limit_lines_reached?(@path)
@metrics.throttled.inc
return
end

with_io do |io|
begin
Expand All @@ -1220,6 +1224,7 @@ def handle_notify

if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
@metrics.throttled.inc if group_watcher_limit || limit_bytes_per_second_reached?
read_more = false
break
end
Expand Down
4 changes: 3 additions & 1 deletion test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def setup
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
throttling_metrics = Fluent::Plugin::LocalMetrics.new
throttling_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics, throttling_metrics)
yield
end
end
Expand Down
16 changes: 0 additions & 16 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1878,13 +1878,6 @@ def test_z_refresh_watchers
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log)
@loop = Coolio::Loop.new
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

Timecop.freeze(2010, 1, 2, 3, 4, 5) do
Expand Down Expand Up @@ -2245,15 +2238,6 @@ def test_should_close_watcher_after_rotate_wait
config = common_follow_inode_config + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"})

d = create_driver(config, false)
d.instance.instance_eval do
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f|
f.puts "test1"
Expand Down

0 comments on commit 5e75248

Please sign in to comment.