diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index acc31c8..541cbf6 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,6 +3,18 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord + FindJobSecondsTotal = Prometheus::Client::Counter.new( + :que_find_job_seconds_total, + docstring: "Seconds spent finding a job", + labels: %i[queue], + ) + + FindJobHitTotal = Prometheus::Client::Counter.new( + :que_find_job_total, + docstring: "total number of job hits in acquiring a lock", + labels: %i[queue job_hit], + ) + def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool @lock_connection_pool = lock_connection_pool @@ -35,22 +47,20 @@ def execute(command, params = []) def lock_job_with_lock_database(queue, cursor) result = [] loop do - break_loop = false - Que.transaction do - result = Que.execute(:find_job_to_lock, [queue, cursor]) - - if result.empty? - break_loop = true - break + result = Que.transaction do + observe(nil, FindJobSecondsTotal, queue: queue) do + result = Que.execute(:find_job_to_lock, [queue, cursor]) end + return result if result.empty? + cursor = result.first["job_id"] - if pg_try_advisory_lock?(cursor) - break_loop = true - break - end + job_locked = pg_try_advisory_lock?(cursor) + + observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked }) + return result if job_locked end - break if break_loop + break if result end result @@ -74,6 +84,21 @@ def unlock_job(job_id) conn.execute("SELECT pg_advisory_unlock(#{job_id})") end end + + def observe(metric, metric_duration, labels = {}) + now = monotonic_now + yield if block_given? + ensure + metric&.increment(labels: labels) + metric_duration&.increment( + by: monotonic_now - now, + labels: labels, + ) + end + + def monotonic_now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end end end end diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index c639c78..e6beb61 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -28,3 +28,11 @@ def active_record_with_lock_adapter_connection lock_connection_pool: LockDatabaseRecord.connection_pool, ) end + +RSpec.configure do |config| + if ENV["ADAPTER"] == "ActiveRecordWithLock" + config.filter_run_including :active_record_with_lock + else + config.filter_run_excluding :active_record_with_lock + end +end diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index ffa67a8..37c9325 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -119,6 +119,20 @@ def wait_for_jobs_to_be_worked(timeout: 10) expect(User.count).to eq(3) expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie]) end + + it "increments the metrics", :active_record_with_lock do + CreateUser.enqueue("alice") + CreateUser.enqueue("bob") + CreateUser.enqueue("charlie") + expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). + with({ :labels => { :job_hit => false, :queue => "default" } }).at_least(:once).and_call_original + expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). + with({ :labels => { :job_hit => true, :queue => "default" } }). + exactly(3).times.and_call_original + expect(QueJob.count).to eq(3) + + with_workers(5) { wait_for_jobs_to_be_worked } + end end context "with jobs that exceed stop timeout" do