Back to Repositories

Testing Concurrent Throttle Operations in concurrent-ruby

This test suite validates the Concurrent::Throttle implementation in the concurrent-ruby library, focusing on rate limiting and capacity management functionality. The tests verify throttle behavior for concurrent operations, resource acquisition, and executor integration.

Test Coverage Overview

The test suite provides comprehensive coverage of the Throttle class functionality:
  • Acquisition and release of resources with capacity limits
  • Timeout handling for resource acquisition
  • Thread synchronization and blocking behavior
  • Integration with different executor types
  • Capacity limit enforcement across concurrent operations

Implementation Analysis

The testing approach employs RSpec’s behavior-driven development patterns with concurrent programming specifics:
  • Thread-safe counter implementations using AtomicFixnum
  • Future-based asynchronous testing patterns
  • Explicit thread management for concurrency validation
  • Multiple concurrent operation scenarios using promises

Technical Details

Testing infrastructure includes:
  • RSpec test framework for behavior specification
  • Concurrent::Promises for async operation testing
  • Thread manipulation utilities
  • Custom executor proxies for IO operations
  • AtomicFixnum for thread-safe counting

Best Practices Demonstrated

The test suite exemplifies robust concurrent testing practices:
  • Isolation of concurrent operations for deterministic testing
  • Comprehensive edge case coverage including overflow scenarios
  • Platform-specific test skipping (TruffleRuby)
  • Resource cleanup and proper release patterns
  • Deterministic concurrency testing approaches

ruby-concurrency/concurrent-ruby

spec/concurrent/throttle_spec.rb

            
require 'thread'
require 'concurrent/edge/throttle'

RSpec.describe 'Concurrent' do
  describe 'Throttle' do
    specify 'acquiring' do
      skip('flaky on truffleruby') if Concurrent.on_truffleruby?

      throttle = Concurrent::Throttle.new 2
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 2

      expect(throttle.try_acquire).to be_truthy
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 1

      thread = in_thread { throttle.acquire; throttle.release; :ok }
      expect(thread.value).to eq :ok

      expect(throttle.try_acquire).to be_truthy
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 0

      thread1 = in_thread { throttle.acquire(2); throttle.release; :ok }
      thread2 = in_thread { throttle.acquire(0.01) { :ok } }
      thread3 = in_thread { throttle.acquire(2) { :ok } }
      is_sleeping thread1
      expect(thread2.value).to be_falsey
      is_sleeping thread3

      expect(throttle.try_acquire).to be_falsey
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq(0)
      expect(throttle.send(:capacity)).to eq(-3)

      throttle.release

      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 0
      expect(thread1.value).to eq :ok
      expect(thread3.value).to eq :ok
    end

    specify '#to_s' do
      throttle = Concurrent::Throttle.new 2
      expect(throttle.to_s).to match(/Throttle.*available 2 of 2/)
    end

    specify '#on' do
      throttle = Concurrent::Throttle.new 2
      io_proxy = throttle.on :io
      expect(throttle.on(:io)).to eq io_proxy

      expect(io_proxy.can_overflow?).to eq Concurrent.executor(:io).can_overflow?
      expect(io_proxy.serialized?).to eq Concurrent.executor(:io).serialized?

      # cache only one proxy
      fast_proxy = throttle.on :fast
      expect(throttle.on(:io)).to eq io_proxy
      expect(throttle.on(:fast)).not_to eq fast_proxy
    end

    specify 'capacity limited' do
      limit    = 4
      throttle = Concurrent::Throttle.new limit
      counter  = Concurrent::AtomicFixnum.new
      testing  = -> i do
        counter.increment
        sleep rand * 0.02 + 0.02
        # returns less then 3 since it's throttled
        v = counter.value
        counter.decrement
        v
      end

      result = Concurrent::Promises.zip(
          *20.times.map { |i| throttle.future(i, &testing) }
      ).value!
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s

      result = Array.new(20) do |i1|
        Thread.new(i1) { |i2| throttle.acquire { testing.call i2 } }
      end.map(&:value)
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s

      throttled_futures = 20.times.map do |i|
        Concurrent::Promises.
            fulfilled_future(i).
            then_on(throttle.on(:io), &testing)
      end

      result = Concurrent::Promises.zip(*throttled_futures).value!
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s
    end
  end
end