Skip to content

bloom-solutions/message_bus_client_worker

Repository files navigation

MessageBusClientWorker

Subscribe to MessageBus using Sidekiq workers. This gem was borne out of the noisy logs and difficult in debugging using message_bus-client. This gem aims to:

  • allow sane debugging. Do not have thread-related code all over the code to the MessageBus channel. Rely on Sidekiq
  • keep subscriptions to 1 per channel. With current options, every web server process would start a thread that listened to the MessageBus channels.
  • do not unnecessarily add noise when starting the console like rails console
  • recover from downtime by keeping track of the last message it processed per channel

This gem does not promise that the message are processed only once. You must put safeguards in place if it is imperative that messages are not processed more than once.

Installation

Add this line to your application's Gemfile:

gem 'message_bus_client_worker'

And then execute:

$ bundle

Or install it yourself as:

$ gem install message_bus_client_worker

Usage

Configure the gem in an initializer.

MessageBusClientWorker.configure do |c|
  c.subscriptions = {
    # Format is
    # "https://domain.com" => {
    #   headers: {
    #     "Some header" => "only if you want",
    #   },
    #   channels: {
    #     "/a_channel" => {
    #       processor: "ProcessorClass",
    #       message_id: 0,
    #     }
    #   },
    # }
    "https://etc.com" => {
      headers: {
        "Authorization" => "Bearer #{ENV["MYTOKEN"]}",
      },
      channels: {
        "/exchange_rates" => {
          processor: "ProcessExchangeRate",
          message_id: 0,
        },
        "/messages" => { processor: "ProcessMessage" },
      }
    },
    "https://someotherdomain.com" => {
      channels: {
        "/errors" => { processor: "ProcessError" },
      }
    },
  }
end

Processor

The processor should look like this:

class ProcessMessage
  def self.call(data, payload, headers)
    # ...
  end
end

http://chat.samsaffron.com's /message channel returns JSON like this:

[
  {"global_id":1478,"message_id":3,"channel":"/message","data":{"data":"hey","name":"joe"}},
  {"global_id":1479,"message_id":4,"channel":"/message","data":{"data":"what's up","name":"joe"}},
  ...
]

The processor you define will receive call for every element in the JSON. In ProcessMessage processor as seen above:

  • data would be the value of the "data" key as a Ruby hash, i.e. {"data" => "hey","name" => "joe"}
  • payload is the whole item as a Ruby hash, i.e. {"global_id"=>1479, "message_id"=>4, "channel"=>"/message", "data"=>{"data"=>"what's up", "name"=>"joe"}}

If you don't care to see the whole payload, you can do the following:

class ProcessMessage
  def self.call(data, _)
    # ...
  end
end

message_id

message_id defines where the worker should start reading from.

  • you do not want to use "-1" if you're short polling because no messages will be read and MessageBusClientWorker will not record the last message_id seen.
  • this is only used when nothing has been read before. If you put 0, but the last message that MessageBusClientWorker pulled had a message_id of 23, then this gem will continue where it left off, and not read from 0
  • defaults to "-1"

Staying subscribed

To keep the subscription alive if the worker dies, the app restarts, or the worker falls back to short-polling or long-polling without streaming, use a gem like sidekiq-cron and enqueue the MessageBusClientWorker::EnqueuingWorker:

message_bus_client_worker:
cron: "*/10 * * * * *"
class: "MessageBusClientWorker::EnqueuingWorker"

Note: will probably want to change the poll interval of Sidekiq if you will choose a granularity smaller than 30 seconds.

Sidekiq.options[:poll_interval] = 10

How it Works

Every time MessageBusClientWorker::EnqueuingWorker is enqueued, EnqueuingWorker attempts to enqueue a MessageBusClientWorker::SubscriptionWorker per channel that is found in MessageBusClientWorker.configuration.subcriptions.

SubscriptionWorker will open a connection to the server, and try the following (not all have been implemented):

  • long-poll with streaming, or if streaming is not supported by the server...
  • long-poll, or if long-polling is not supported by the server...
  • short-poll

Development

Copy the config and edit (if necessary)

cp spec/config.yml{.sample,}
docker-compose run gem bundle exec rspec spec

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/bloom-solutions/message_bus_client_worker. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.

License

The gem is available as open source under the terms of the MIT License.

Code of Conduct

Everyone interacting in the MessageBusClientWorker project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.

About

Subscribe to MessageBus using Sidekiq workers

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •