Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt_proxy: Execute retry_connect in main thread to prevent updating @_retrying race condition #22

Closed

Conversation

cosmo0920
Copy link
Contributor

timer_execute will execute registered method in main thread.
Prevous implementation will causes unhandled retrying with the following error sequence:

  1. send_packet(MQTT::Packet::Subscribe) in mqtt_proxy.rb (running on @connection_thread)
  2. When https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L554, and then keep_alive! method failed in client.rb (running on @read_thread)
  3. IOError occurred in https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L556 in mqtt_proxy.rb (running on @connection_thread)
  4. Executed #retry_connect in mqtt_proxy.rb (running on @connection_thread)
  5. Before reached in https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L93, @retry_interval second(s) passes in mqtt_proxy.rb (running on @connection_thread)
  6. During 5. execution, https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L152 is executed in main thread
  7. Then, retry is disabled.

In short, @connection_thread in mqtt_proxy.rb and main thread will
causes updating @_retrying race condition.

To prevent this race condition, we should use timer_execute for
#retry_connect to execute retrying connection operation on main thread
not executing it on @connection_thread.
This race condition will not happen when every running retrying operation
runs on main thread.

Signed-off-by: Hiroshi Hatake [email protected]

timer_execute will execute registered method in main thread.
Prevous implementation will causes unhandled retrying with the following error sequence:

1. send_packet(MQTT::Packet::Subscribe) in mqtt_proxy.rb (running on `@connection_thread`)
2. When https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L554, and then keep_alive! method failed in client.rb (running on `@read_thread`)
3. IOError occurred in https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L556 in mqtt_proxy.rb (running on `@connection_thread`)
4. Executed #retry_connect in mqtt_proxy.rb (running on `@connection_thread`)
5. Before reached in https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L93, `@retry_interval` second(s) passes in mqtt_proxy.rb (running on `@connection_thread`)
6. During 5. execution, https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L152 is executed in main thread
7. Then, retry is disabled.

In short, `@connection_thread` in mqtt_proxy.rb and main thread will
causes updating `@_retrying` race condition.

To prevent this race condition, we should use `timer_execute` for
`#retry_connect` to execute retrying connection operation on main thread
not executing it on `@connection_thread`.
This race condition will not happen when every running retrying operation
runs on main thread.

Signed-off-by: Hiroshi Hatake <[email protected]>
@cosmo0920
Copy link
Contributor Author

@toyokazu Any thoughts?

@toyokazu
Copy link
Owner

Thank you for your indication and sorry for being late to reply.
As you indicated, the current implementation unnaturally complies with the fluentd thread management. When fluentd was updated to v1.0, I tried to use plugin helper APIs including thread_create and timer_execute but it failed. Current implementation keeps "somehow working" status ;(. It should be fixed by redesigning the procedure.

The point is that the plugin must manage the thread @read_thread created by ruby-mqtt. In my understanding, by using timer_execute, a new thread is created at the specified interval from the main thread and it will not fit the purpose of the plugin. In order to keep monitoring the status of the thread, it is better to create a proxy thread at the start phase and keep it until the end of the life of the plugin.

As you indicate, a thread created by the main thread itself cannot rescue MQTT::ProtocolException but a child thread of that thread can rescue the Exception. Based on that, I modified the plugin and committed to develop/0.5.x branch. How do you think? If you like, I’d like to merge it to the master branch.

I could not reproduce a race condition as you indicated, so I don’t know the modification can satisfy your requirements or not. If you have some description to reproduce the condition, please provide me.

Furthermore, the previous implementation has an issue if it uses proxy services like SSH port forwarding because a proxy service can accept a new connection and after that, it returns an error packet. It causes short-term reconnection when the MQTT server is down and it also prevents reconnection after the server starts up. I could not find a way to detect such a situation only from Exception and I newly added :max_retry_freq option for detecting it. Please also give your comment on that function.

Best Regards

@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Dec 17, 2020

In my understanding, by using timer_execute, a new thread is created at the specified interval from the main thread

timer_execute doesn't create a new thread, just defers a task on the main thread.

timer_execute: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/timer.rb#L33-L42
event_loop_attach: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/event_loop.rb#L38-L44

There is no creating a new thread.
Just register deferred context on the main thread.

Fluentd's plugin helper creates a thread when users use thread plugin helper.
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/thread.rb

I could not reproduce a race condition as you indicated, so I don’t know the modification can satisfy your requirements or not. If you have some description to reproduce the condition, please provide me.

I used the following flood of messages creation script:

require 'mqtt'

opts = {
  host: '127.0.0.1',
  port: 1883,
  client_id: "test-client",
  clean_session: true,
  keep_alive: 15
}
client = MQTT::Client.new(opts)
client.connect
while true
  start = Time.now
  35000.times do
    client.publish('test', "a" * 4000)
  end
  now = Time.now
  diff = start - now
  sleep(1 - diff)
end

@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Dec 17, 2020

It causes short-term reconnection when the MQTT server is down and it also prevents reconnection after the server starts up. I could not find a way to detect such a situation only from Exception and I newly added :max_retry_freq option for detecting it. Please also give your comment on that function.

I tried develop/0.5.x branch with modified ruby-mqtt which is applied the following patches:

And I also applied a patch to handle publish response as keep alive response in this plugin(in high load environment keep alive response sometimes causes MQTT::ProtocolException due to timeout):

diff --git a/lib/fluent/plugin/mqtt_proxy.rb b/lib/fluent/plugin/mqtt_proxy.rb
index de6817d..7d162e3 100644
--- a/lib/fluent/plugin/mqtt_proxy.rb
+++ b/lib/fluent/plugin/mqtt_proxy.rb
@@ -74,6 +74,7 @@ module Fluent::Plugin
         opts[:cert_file] = @security.tls.cert_file
         opts[:key_file] = @security.tls.key_file
       end
+      opts[:assume_publish_response_as_pingresp] = true
 
       init_retry_interval
       @retry_sequence = []

Then, develop/0.5.x branch works nicely. Thanks! 😄
And also non-patched version can handle reconnection w/o unexpected ending of retrying.

@toyokazu
Copy link
Owner

toyokazu commented Dec 18, 2020

Thank you for your advice!
Sorry, I couldn’t have time to trace the details of the fluentd main thread implementation, and your information is beneficial. I understand the procedure of timer_execute().

In the fluent-plugin-mqtt-io case, it might be better to create a thread for keeping the ruby-mqtt read thread and it is also preferred to be maintained by the main thread. So thus, instead of using timer_execute, I’d like to use thread_create() here.

I also misunderstood your pull request. I could not have time to trace the details in the previous implementation but with fluentd 1.11.5, Exceptions are correctly rescued by the thread generated by thread_execute(). So, I removed unnecessary thread “in_mqtt/out_mqtt_monitor”, and left “in_mqtt/out_mqtt_proxy”, “in_mqtt_get” and “out_mqtt_dummy” threads.

Current implementation still has several issues, e.g. the call graph becomes large (connect is called many times…) in the heavy reconnection environment. The memory should be cleared by using the “max_retry_interval” parameter and periodical rebooting.

Thank you again for providing a benchmark script. The revised version seems to keep connection under the requests from your benchmark script by using ruby-mqtt with patches (128, 130). So, I’d like to merge it in a few days. if you find issues, please share them here.

When patch 128 is accepted by ruby-mqtt, please also send a pull request to add a parameter of fluent-plugin-mqtt-io.

By the way, applying patch 128 of ruby-mqtt seems to cause the following error. ruby-mqtt developers might request to investigate it before applying the patch.

#<Thread:0x00007fe1c02ce2b0 /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:276 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
  6: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:278:in `block (2 levels) in connect'
  5: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:468:in `receive_packet'
  4: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:468:in `synchronize'
  3: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:472:in `block in receive_packet'
  2: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:498:in `handle_packet'
  1: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:498:in `synchronize'
/path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:499:in `block in handle_packet': undefined method `<<' for nil:NilClass (NoMethodError)

Just FYI.

After merging to the master branch, I’d like to close this pull request. Thanks.

Best Regards

@toyokazu
Copy link
Owner

I just merged the updates to the master branch and push a new gem to rubygems.org. Thank you for your contributions!

@toyokazu toyokazu closed this Dec 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants