Skip to content

Commit

Permalink
use yugabyte adapter for getting the lock connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 25, 2024
1 parent f29a8b8 commit e1f6f21
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 245 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,6 @@ jobs:
ruby-version: "${{ matrix.ruby-version }}"
- name: Run specs
run: |
bundle exec rspec
bundle exec rspec
- name: Run Specs With YUAGBUTE_QUE_WORKER_ENABLED Enabled
run: YUAGBUTE_QUE_WORKER_ENABLED=true bundle exec rspec
56 changes: 28 additions & 28 deletions lib/que/adapters/yugabyte.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,34 @@ def checkout_activerecord_adapter(&block)
YugabyteRecord.connection_pool.with_connection(&block)
end

# def establish_lock_database_connection
# Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection
# end
def checkout_lock_database_connection
# when multiple threads are running we need to make sure
# the acquiring and releasing of advisory locks is done by the
# same connection
Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout
end

# def lock_database_connection
# # connection = @lock_database_connection[Thread.current.name]
# # return connection unless connection.nil?
# # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection
# @lock_database_connection ||= LockDatabaseRecord.connection
# end
def lock_database_connection
Thread.current[:db_connection]
end

def setup_lock_database_connection
::LockDatabaseRecord.connection
def release_lock_database_connection
LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection])
end

# def execute(command, params=[])
# if command == :lock_job
# queue, cursor, lock_database_connection = params
# lock_job_with_lock_database(queue, cursor, lock_database_connection)
# elsif command == :unlock_job
# job_id, lock_database_connection = params
# unlock_job(job_id, lock_database_connection)
# else
# super(command, params)
# end
# end
def execute(command, params=[])
if command == :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
elsif command == :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super(command, params)
end
end

def lock_job_with_lock_database(queue, cursor, lock_database_connection)
def lock_job_with_lock_database(queue, cursor)
query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
Expand All @@ -50,24 +50,24 @@ def lock_job_with_lock_database(queue, cursor, lock_database_connection)
result = Que.execute(query)
return result if result.empty?

if locked?(result.first['job_id'], lock_database_connection)
if locked?(result.first['job_id'])
return result
end

# continue the recursion to fetch the next available job
lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection)
lock_job_with_lock_database(queue, result.first['job_id'])
end

def cleanup!
YugabyteRecord.connection_pool.release_connection
LockDatabaseRecord.connection_pool.release_connection
end

def locked?(job_id, lock_database_connection)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
def locked?(job_id)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock')
end

def unlock_job(job_id, lock_database_connection)
def unlock_job(job_id)
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
Expand Down
43 changes: 3 additions & 40 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue
@queue_expires_at = {}
@secondary_queues = secondary_queues
@consolidated_queues = Array.wrap(queue).concat(secondary_queues)
@using_lock_database = ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)

if @using_lock_database
@lock_database_connection = LockDatabaseRecord.connection
end

# Create a bucket that has 100% capacity, so even when we don't apply a limit we
# have a valid bucket that we can use everywhere
@leaky_bucket = LeakyBucket.new(window: window || 1.0, budget: budget || 1.0)
Expand Down Expand Up @@ -126,11 +120,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
if @using_lock_database
@lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})")
else
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
end
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down Expand Up @@ -158,40 +148,13 @@ def exists?(job)
end

def lock_job_query(queue, cursor)
if @using_lock_database
lock_job_with_lock_database(queue, cursor)
else
Que.execute(:lock_job, [queue, cursor]).first
end
end

def lock_job_with_lock_database(queue, cursor)
query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql

result = Que.execute(query).first
return result if result.nil?

return result if locked?(result['job_id'])

# continue the recursion to fetch the next available job
lock_job_with_lock_database(queue, result['job_id'])
end

def locked?(job_id)
@lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
Que.execute(:lock_job, [queue, cursor]).first
end

def handle_expired_cursors!
@consolidated_queues.each do |queue|
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)


reset_cursor_for!(queue) if queue_cursor_expires_at <= monotonic_now
reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
}
}
# rubocop:enable Style/MutableConstant
end
35 changes: 19 additions & 16 deletions lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,27 +142,30 @@ def initialize(

def work_loop
return if @stop

Que.adapter.checkout_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
@tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do
loop do
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
loop do
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_worked
nil # immediately find a new job to work
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)

if @stop
Que.adapter.release_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
break
end
when :job_worked
nil # immediately find a new job to work
end

break if @stop
end
end
ensure
@stopped = true
end
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def wait_for_jobs_to_be_worked(timeout: 10)

expect(QueJob.count).to eq(1)

with_workers(5) { wait_for_jobs_to_be_worked }
with_workers(4) { wait_for_jobs_to_be_worked }

expect(QueJob.count).to eq(0)
expect(FakeJob.log.count).to eq(1)
Expand All @@ -112,7 +112,7 @@ def wait_for_jobs_to_be_worked(timeout: 10)

expect(QueJob.count).to eq(3)

with_workers(5) { wait_for_jobs_to_be_worked }
with_workers(4) { wait_for_jobs_to_be_worked }

expect(QueJob.count).to eq(0)
expect(User.count).to eq(3)
Expand Down
154 changes: 0 additions & 154 deletions spec/integration/integration_with_lock_database_spec.rb

This file was deleted.

Loading

0 comments on commit e1f6f21

Please sign in to comment.