mqtt_proxy: Execute retry_connect in main thread to prevent updating @_retrying race condition #22
`timer_execute` will execute the registered method in the main thread.

The previous implementation causes unhandled retrying with the following error sequence:

1. `send_packet(MQTT::Packet::Subscribe)` runs in mqtt_proxy.rb (running on `@connection_thread`).
2. Execution reaches https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L554, and then the `keep_alive!` method fails in client.rb (running on `@read_thread`).
3. An IOError occurs at https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L556, in mqtt_proxy.rb (running on `@connection_thread`).
4. `#retry_connect` is executed in mqtt_proxy.rb (running on `@connection_thread`).
5. Before execution reaches https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L93, `@retry_interval` second(s) pass in mqtt_proxy.rb (running on `@connection_thread`).
6. During step 5, https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L152 is executed in the main thread.
7. Then, retry is disabled.

In short, `@connection_thread` in mqtt_proxy.rb and the main thread race to update `@_retrying`.

To prevent this race condition, we should use `timer_execute` for `#retry_connect` so that the retrying connection operation runs on the main thread instead of on `@connection_thread`. This race condition will not happen when every retrying operation runs on the main thread.

Signed-off-by: Hiroshi Hatake <[email protected]>
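The idea of running every update on one thread can be modeled outside Fluentd. The following is a minimal sketch in plain Ruby, not the plugin's code: `RetryState`, `run_on_owner`, `request_retry`, and `finish_retry` are hypothetical names, and a `Queue` drained by a single owner thread stands in for deferring work to one thread. Because every read and write of the flag happens on the owner thread, a check-then-set sequence cannot be interleaved with an update from another thread.

```ruby
# Hypothetical model: all updates to a retry flag are funneled through one
# owner thread, so check-then-set sequences never interleave.
class RetryState
  attr_reader :retrying, :log

  def initialize
    @retrying = false
    @log = []
    @tasks = Queue.new
    # The owner thread is the only thread that touches @retrying and @log.
    @owner = Thread.new do
      while (task = @tasks.pop) != :stop
        task.call
      end
    end
  end

  # Schedule a block to run on the owner thread (callable from any thread).
  def run_on_owner(&block)
    @tasks << block
  end

  # Request a retry; the request is ignored if one is already in progress.
  def request_retry(source)
    run_on_owner do
      next if @retrying
      @retrying = true
      @log << [:retry_started, source]
    end
  end

  # Mark the retry as finished (for example after a successful connect).
  def finish_retry
    run_on_owner do
      @retrying = false
      @log << [:retry_finished]
    end
  end

  # Process the remaining tasks and stop the owner thread.
  def shutdown
    @tasks << :stop
    @owner.join
  end
end

state = RetryState.new
# Two threads request a retry at the same time; only one request wins.
requesters = [:connection_thread, :other_thread].map do |name|
  Thread.new { state.request_retry(name) }
end
requesters.each(&:join)
state.finish_retry
state.shutdown
```

Whichever requester enqueues first starts the retry; the second request sees the flag already set and does nothing.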
@toyokazu Any thoughts?
Thank you for pointing this out, and sorry for the late reply.

The point is that the plugin must manage the thread @read_thread created by ruby-mqtt. In my understanding, by using timer_execute, a new thread is created at the specified interval from the main thread, which does not fit the purpose of the plugin. In order to keep monitoring the status of the thread, it is better to create a proxy thread at the start phase and keep it until the end of the life of the plugin. As you indicate, a thread created by the main thread itself cannot rescue MQTT::ProtocolException, but a child thread of that thread can rescue the exception.

Based on that, I modified the plugin and committed the change to the develop/0.5.x branch. What do you think? If you like, I’d like to merge it into the master branch. I could not reproduce the race condition you described, so I don’t know whether the modification satisfies your requirements. If you have steps to reproduce the condition, please share them.

Furthermore, the previous implementation has an issue when it is used through proxy services like SSH port forwarding, because a proxy service can accept a new connection and then return an error packet. This causes reconnection at short intervals while the MQTT server is down, and it also prevents reconnection after the server starts up. I could not find a way to detect such a situation from the exception alone, so I added a new :max_retry_freq option to detect it. Please also comment on that function.

Best Regards
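To make the idea behind such an option concrete, here is a minimal sketch of one way to detect overly frequent reconnections. It is not the plugin's implementation: `RetryFrequencyMonitor`, `max_retries`, and `window` are hypothetical names, and the actual semantics of `:max_retry_freq` may differ. The sketch records a timestamp per retry and reports when more than `max_retries` retries fall within the last `window` seconds.

```ruby
# Hypothetical sliding-window counter for reconnection attempts.
class RetryFrequencyMonitor
  def initialize(max_retries:, window:)
    @max_retries = max_retries  # allowed retries per window
    @window = window            # window length in seconds
    @timestamps = []
  end

  # Record a retry at time `now` (in seconds) and return true when the
  # number of retries inside the window exceeds the allowed maximum.
  def record(now)
    @timestamps << now
    # Drop timestamps that have fallen out of the window.
    @timestamps.reject! { |t| now - t > @window }
    @timestamps.size > @max_retries
  end
end

monitor = RetryFrequencyMonitor.new(max_retries: 3, window: 10.0)
# Four retries within one second: the fourth exceeds the limit.
results = [0.0, 0.2, 0.5, 0.9].map { |t| monitor.record(t) }
# A retry long after the burst starts from a clean window.
later = monitor.record(60.0)
```

When the monitor reports true, a caller could, for example, wait longer before the next attempt instead of reconnecting immediately.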
timer_execute doesn't create a new thread, just defers a task on the main thread.
No new thread is created. Fluentd's plugin helper creates a thread when users use the thread plugin helper.
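I used the following script to generate a flood of messages:

```ruby
require 'mqtt'

# Connection settings for a local MQTT broker.
opts = {
  host: '127.0.0.1',
  port: 1883,
  client_id: "test-client",
  clean_session: true,
  keep_alive: 15
}
client = MQTT::Client.new(opts)
client.connect
while true
  start = Time.now
  # Publish a batch of 35000 messages with a 4000-byte payload each.
  35000.times do
    client.publish('test', "a" * 4000)
  end
  now = Time.now
  # start - now is negative here, so this sleeps 1 second plus the batch duration.
  diff = start - now
  sleep(1 - diff)
end
```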
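If the intent of the loop is to start one batch per second, the pause has to be the remainder of the second after the batch finishes. A minimal sketch of that calculation follows; `remaining_pause` is a hypothetical helper name, and the MQTT calls are left out so that the arithmetic can be checked in isolation.

```ruby
# Hypothetical helper: how long to pause so that batches start `period`
# seconds apart. Returns 0.0 when the batch already took longer than that.
def remaining_pause(started_at, finished_at, period = 1.0)
  elapsed = finished_at - started_at  # Time - Time yields seconds as a Float
  [period - elapsed, 0.0].max
end

start = Time.at(100.0)
fast_batch = remaining_pause(start, start + 0.25)  # batch took 0.25 s
slow_batch = remaining_pause(start, start + 1.5)   # batch took 1.5 s
```

In the loop, `sleep(remaining_pause(start, Time.now))` would take the place of the `sleep(1 - diff)` line; clamping at zero also avoids passing a negative interval to `sleep`.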
I tried the develop/0.5.x branch with a modified ruby-mqtt to which the following patches are applied:
I also applied a patch to this plugin that treats a publish response as a keep-alive response (in a high-load environment, the keep-alive response sometimes causes MQTT::ProtocolException due to a timeout):

```diff
diff --git a/lib/fluent/plugin/mqtt_proxy.rb b/lib/fluent/plugin/mqtt_proxy.rb
index de6817d..7d162e3 100644
--- a/lib/fluent/plugin/mqtt_proxy.rb
+++ b/lib/fluent/plugin/mqtt_proxy.rb
@@ -74,6 +74,7 @@ module Fluent::Plugin
         opts[:cert_file] = @security.tls.cert_file
         opts[:key_file] = @security.tls.key_file
       end
+      opts[:assume_publish_response_as_pingresp] = true
       init_retry_interval
       @retry_sequence = []
```

Then, the develop/0.5.x branch works nicely. Thanks! 😄
Thank you for your advice! In the fluent-plugin-mqtt-io case, it might be better to create a thread that keeps the ruby-mqtt read thread, and it is also preferable for that thread to be maintained by the main thread. So, instead of using timer_execute, I’d like to use thread_create() here.

I also misunderstood your pull request. I did not have time to trace the details of the previous implementation, but with fluentd 1.11.5, exceptions are correctly rescued by the thread generated by thread_create(). So, I removed the unnecessary thread “in_mqtt/out_mqtt_monitor” and left the “in_mqtt/out_mqtt_proxy”, “in_mqtt_get” and “out_mqtt_dummy” threads.

The current implementation still has several issues, e.g. the call graph becomes large (connect is called many times…) in a heavy reconnection environment. The memory should be cleared by using the “max_retry_interval” parameter and periodic rebooting.

Thank you again for providing a benchmark script. Using ruby-mqtt with patches (128, 130), the revised version seems to keep the connection under the requests from your benchmark script. So, I’d like to merge it in a few days. If you find issues, please share them here. When patch 128 is accepted by ruby-mqtt, please also send a pull request to add the parameter to fluent-plugin-mqtt-io.

By the way, applying patch 128 of ruby-mqtt seems to cause the following error. The ruby-mqtt developers might ask you to investigate it before applying the patch.
Just FYI. After merging to the master branch, I’d like to close this pull request. Thanks. Best Regards
I just merged the updates to the master branch and pushed a new gem to rubygems.org. Thank you for your contributions!