diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 0769a98..b9f70e9 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -7,6 +7,8 @@ permissions: env: CONSOLE_OUTPUT: XTerm + TRACES_BACKEND: traces/backend/test + METRICS_BACKEND: metrics/backend/test jobs: test: diff --git a/async-pool.gemspec b/async-pool.gemspec index 93fc1a0..c2640b0 100644 --- a/async-pool.gemspec +++ b/async-pool.gemspec @@ -26,4 +26,6 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 3.1" spec.add_dependency "async", ">= 1.25" + spec.add_dependency "traces" + spec.add_dependency "metrics" end diff --git a/lib/async/pool/controller.rb b/lib/async/pool/controller.rb index 0ced584..21fe3fc 100644 --- a/lib/async/pool/controller.rb +++ b/lib/async/pool/controller.rb @@ -11,6 +11,9 @@ require 'async/notification' require 'async/semaphore' +require 'traces' +require 'metrics' + module Async module Pool # A resource pool controller. @@ -154,7 +157,9 @@ def release(resource) retire(resource) unless processed end - private def drain + def drain + Console.debug(self, "Draining pool...", size: @resources.size) + # Enumerate all existing resources and retire them: while resource = acquire_existing_resource retire(resource) @@ -262,6 +267,7 @@ def availability_summary def reuse(resource) Console.debug(self) {"Reuse #{resource}"} + usage = @resources[resource] if usage.nil? || usage.zero? @@ -286,11 +292,7 @@ def wait_for_resource @notification.wait end - Console.debug(self) {"Wait for resource -> #{resource}"} - - # if resource.concurrency > 1 - # @notification.signal - # end + # Be careful not to context switch or fail here. return resource end @@ -318,12 +320,16 @@ def create_resource def available_resource resource = nil + Console.debug(self, "Acquiring concurrency guard...", blocking: @guard.blocking?) + @guard.acquire do + Console.debug(self, "Acquired concurrency guard.") + resource = acquire_or_create_resource end return resource - rescue Exception + rescue Exception => error reuse(resource) if resource raise end @@ -375,6 +381,50 @@ def acquire_or_create_resource return create_resource end end + + Traces::Provider(self) do + def create_resource(...) + attributes = { + concurrency: @guard.limit, + size: @resources.size, + limit: @limit, + } + + Traces.trace('async.pool.create', attributes: attributes) {super} + end + + def drain(...) + attributes = { + size: @resources.size, + } + + Traces.trace('async.pool.drain', attributes: attributes) {super} + end + end + + Metrics::Provider(self) do + ACQUIRE_COUNT = Metrics.metric('async.pool.acquire', :counter, description: 'Number of times a resource was invoked.') + RELEASE_COUNT = Metrics.metric('async.pool.release', :counter, description: 'Number of times a resource was released.') + RETIRE_COUNT = Metrics.metric('async.pool.retire', :counter, description: 'Number of times a resource was retired.') + + def acquire(...) + ACQUIRE_COUNT.emit(1) + + super + end + + def release(...) + super.tap do + RELEASE_COUNT.emit(1) + end + end + + def retire(...) + super.tap do + RETIRE_COUNT.emit(1) + end + end + end end end end diff --git a/test/async/pool/failure.rb b/test/async/pool/failure.rb new file mode 100644 index 0000000..9f7268a --- /dev/null +++ b/test/async/pool/failure.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2021-2024, by Samuel Williams. + +require 'async/pool/controller' +require 'async/pool/resource' +require 'async/queue' + +require 'sus/fixtures/async/reactor_context' + +describe Async::Pool::Controller do + include Sus::Fixtures::Async::ReactorContext + + let(:resources) {Async::Queue.new} + + let(:constructor) do + lambda do + resource = resources.dequeue + + if resource.is_a?(Exception) + raise resource + end + + resource + end + end + + let(:pool) {subject.new(constructor)} + + with "a constructor that fails" do + it "robustly creates new resources" do + resource1 = Async::Pool::Resource.new + resource2 = Async::Pool::Resource.new + + resources.enqueue(RuntimeError.new("Failed to connect")) + resources.enqueue(resource1) + resources.enqueue(RuntimeError.new("Failed to connect")) + resources.enqueue(resource2) + + expect{pool.acquire}.to raise_exception(RuntimeError) + expect(pool.acquire).to be == resource1 + expect{pool.acquire}.to raise_exception(RuntimeError) + expect(pool.acquire).to be == resource2 + ensure + pool.release(resource1) + pool.release(resource2) + end + end +end