Back to Repositories

Testing Concurrent Agent Implementation in concurrent-ruby

This test suite validates the Agent class functionality in the concurrent-ruby library, focusing on asynchronous state management and concurrency control. The tests verify initialization, action processing, error handling, and observation patterns essential for reliable concurrent operations.

Test Coverage Overview

Comprehensive test coverage for the Agent class implementation, including:
  • Agent initialization and configuration validation
  • Action processing and state management
  • Error handling modes and recovery mechanisms
  • Nested action execution and ordering
  • Validation and observation patterns

Implementation Analysis

The test suite employs RSpec to thoroughly validate the Agent’s concurrent behavior patterns:
  • Uses various executor types (ImmediateExecutor, SingleThreadExecutor) for action processing
  • Implements nested describe/context blocks for organized test grouping
  • Leverages shared examples for reusable test patterns
  • Utilizes CountDownLatch for synchronization testing

Technical Details

Key technical components and testing tools:
  • RSpec testing framework
  • Concurrent Ruby executors and primitives
  • Custom matcher and helper methods
  • Thread synchronization mechanisms
  • Observable pattern implementation

Best Practices Demonstrated

The test suite showcases several testing best practices:
  • Comprehensive edge case coverage
  • Isolated test contexts
  • Clear test descriptions and organization
  • Proper setup and teardown patterns
  • Effective use of RSpec’s let and subject patterns

ruby-concurrency/concurrent-ruby

spec/concurrent/agent_spec.rb

            
require 'concurrent/agent'
require 'concurrent/executor/single_thread_executor'
require 'concurrent/executor/immediate_executor'
require 'concurrent/executor/fixed_thread_pool'
require 'concurrent/atomic/count_down_latch'

require_relative 'concern/observable_shared'

module Concurrent

  RSpec.describe Agent do

    let!(:immediate) { Concurrent::ImmediateExecutor.new }
    let!(:executor) { Concurrent::SingleThreadExecutor.new }

    context 'initialization' do

      it 'sets the initial value' do
        subject = Agent.new(42)
        expect(subject.value).to eq 42
      end

      it 'sets the initial error to nil' do
        subject = Agent.new(42)
        expect(subject.error).to be nil
      end

      it 'sets the error mode when given a valid value' do
        subject = Agent.new(42, error_mode: :fail)
        expect(subject.error_mode).to eq :fail
      end

      it 'defaults the error mode to :continue when an error handler is given' do
        subject = Agent.new(42, error_handler: ->(value) { true })
        expect(subject.error_mode).to eq :continue
      end

      it 'defaults the error mode to :fail when no error handler is given' do
        subject = Agent.new(42)
        expect(subject.error_mode).to eq :fail
      end

      it 'raises an error when given an invalid error mode' do
        expect {
          Agent.new(42, error_mode: :bogus)
        }.to raise_error(ArgumentError)
      end

      it 'sets #failed? to false' do
        subject = Agent.new(42)
        expect(subject).to_not be_failed
        expect(subject).to_not be_stopped
      end
    end

    context 'action processing' do

      specify 'the given block will be passed the current value' do
        actual   = nil
        expected = 0
        subject  = Agent.new(expected)
        subject.send_via(immediate) { |value| actual = value }
        expect(actual).to eq expected
      end

      specify 'the given block will be passed any provided arguments' do
        actual   = nil
        expected = [1, 2, 3, 4]
        subject  = Agent.new(0)
        subject.send_via(immediate, *expected) { |_, *args| actual = args }
        expect(actual).to eq expected
      end

      specify 'the return value will be passed to the validator function' do
        actual    = nil
        expected  = 42
        validator = ->(new_value) { actual = new_value; true }
        subject   = Agent.new(0, validator: validator)
        subject.send_via(immediate) { expected }
        expect(actual).to eq expected
      end

      specify 'upon validation the new value will be set to the block return value' do
        expected  = 42
        validator = ->(_new_value) { true }
        subject   = Agent.new(0, validator: validator)
        subject.send_via(immediate) { expected }
        expect(subject.value).to eq expected
      end

      specify 'on success all observers will be notified' do
        observer_class = Class.new do
          def initialize(bucket)
            @bucket = bucket
          end

          def update(time, old_value, new_value)
            @bucket.concat([time, old_value, new_value])
          end
        end

        bucket  = []
        subject = Agent.new(0)
        subject.add_observer(observer_class.new(bucket))
        subject.send_via(immediate) { 42 }

        expect(bucket[0]).to be_a Time
        expect(bucket[1]).to eq 0
        expect(bucket[2]).to eq 42
      end

      specify 'any recursive action dispatches will run after the value has been updated' do
        subject = Agent.new(0)

        subject.send_via(executor, subject) do |v1, a1|
          expect(v1).to eq 0
          a1.send_via(executor, a1) do |v2, a2|
            expect(v2).to eq 1
            a1.send_via(executor, a2) do |v3, a3|
              expect(v3).to eq 2
              3
            end
            2
          end
          1
        end
        expect(subject.await_for(1)).to eq true
        expect(subject).to_not be_failed
      end

      specify 'when the action raises an error the value will not change' do
        expected = 0
        subject  = Agent.new(expected)
        subject.send_via(immediate) { raise StandardError }
        expect(subject.value).to eq expected
      end

      specify 'when the action raises an error the validator will not be called' do
        validator_called = false
        validator        = ->(new_value) { validator_called = true }
        subject          = Agent.new(0, validator: validator)
        subject.send_via(immediate) { raise StandareError }
        expect(validator_called).to be false
      end

      specify 'when validation returns false the value will not change' do
        expected  = 0
        validator = ->(new_value) { false }
        subject   = Agent.new(0, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.value).to eq expected
      end

      specify 'when validation raises an error the value will not change' do
        expected  = 0
        validator = ->(new_value) { raise StandareError }
        subject   = Agent.new(0, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.value).to eq expected
      end

      specify 'when the action raises an error the handler will be called' do
        error_handler_called = false
        error_handler        = ->(agent, exception) { error_handler_called = true }
        subject              = Agent.new(0, error_handler: error_handler)
        subject.send_via(immediate) { raise StandardError }
        expect(error_handler_called).to be true
      end

      specify 'when validation fails the handler will be called' do
        error_handler_called = false
        error_handler        = ->(agent, exception) { error_handler_called = true }
        validator            = ->(new_value) { false }
        subject              = Agent.new(0, error_handler: error_handler, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(error_handler_called).to be true
      end

      specify 'when validation raises an error the handler will be called' do
        error_handler_called = false
        error_handler        = ->(agent, exception) { error_handler_called = true }
        validator            = ->(new_value) { raise StandardError }
        subject              = Agent.new(0, error_handler: error_handler, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(error_handler_called).to be true
      end
    end

    context 'validation' do

      it 'sets the new value when the validator returns true' do
        expected  = 42
        validator = ->(new_value) { true }
        subject   = Agent.new(0, validator: validator)
        subject.send_via(immediate) { expected }
        expect(subject.value).to eq expected
      end

      it 'rejects the new value when the validator returns false' do
        expected  = 0
        validator = ->(new_value) { false }
        subject   = Agent.new(expected, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.value).to eq expected
      end

      it 'rejects the new value when the validator raises an error' do
        expected  = 0
        validator = ->(new_value) { raise StandardError }
        subject   = Agent.new(expected, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.value).to eq expected
      end

      it 'sets the error when the error mode is :fail and the validator returns false' do
        validator = ->(new_value) { false }
        subject   = Agent.new(0, error_mode: :fail, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.error).to be_a Agent::ValidationError
      end

      it 'sets the error when the error mode is :fail and the validator raises an error' do
        validator = ->(new_value) { raise expected }
        subject   = Agent.new(0, error_mode: :fail, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.error).to be_a Agent::ValidationError
      end

      it 'does not set an error when the error mode is :continue and the validator returns false' do
        validator = ->(new_value) { false }
        subject   = Agent.new(0, error_mode: :continue, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.error).to be nil
      end

      it 'does not set an error when the error mode is :continue and the validator raises an error' do
        validator = ->(new_value) { raise StandardError }
        subject   = Agent.new(0, error_mode: :continue, validator: validator)
        subject.send_via(immediate) { 42 }
        expect(subject.error).to be nil
      end

      it 'does not trigger observation when validation fails' do
        observer_class = Class.new do
          attr_reader :count

          def initialize
            @count = 0
          end

          def update(time, old_value, new_value)
            @count += 1
          end
        end

        observer = observer_class.new
        subject  = Agent.new(0, validator: ->(new_value) { false })
        subject.add_observer(observer)
        subject.send_via(immediate) { 42 }

        expect(observer.count).to eq 0
      end
    end

    context 'error handling' do

      specify 'the agent will be passed to the handler' do
        actual        = nil
        error_handler = ->(agent, error) { actual = agent }
        subject       = Agent.new(0, error_handler: error_handler)
        subject.send_via(immediate) { raise StandardError }
        expect(actual).to eq subject
      end

      specify 'the exception will be passed to the handler' do
        expected      = StandardError.new
        actual        = nil
        error_handler = ->(agent, error) { actual = error }
        subject       = Agent.new(0, error_handler: error_handler)
        subject.send_via(immediate) { raise expected }
        expect(actual).to eq expected
      end

      specify 'does not trigger observation' do
        observer_class = Class.new do
          attr_reader :count

          def initialize
            @count = 0
          end

          def update(time, old_value, new_value)
            @count += 1
          end
        end

        observer = observer_class.new
        subject  = Agent.new(0)
        subject.add_observer(observer)
        subject.send_via(immediate) { raise StandardError }

        expect(observer.count).to eq 0
      end
    end

    context 'error mode' do

      context ':continue' do

        it 'does not set an error when the validator returns false' do
          validator = ->(new_value) { false }
          subject   = Agent.new(0, error_mode: :continue, validator: validator)
          subject.send_via(immediate) { 42 }
          expect(subject.error).to be nil
        end

        it 'does not set an error when the validator raises an error' do
          validator = ->(new_value) { raise StandardError }
          subject   = Agent.new(0, error_mode: :continue, validator: validator)
          subject.send_via(immediate) { 42 }
          expect(subject.error).to be nil
        end

        it 'does not set an error when the action raises an error' do
          subject = Agent.new(0, error_mode: :continue)
          subject.send_via(immediate) { raise StandardError }
          expect(subject.error).to be nil
        end

        it 'does not block further action processing' do
          subject  = Agent.new(0, error_mode: :continue)
          subject.send_via(immediate) { raise StandardError }
          subject.send_via(immediate) { 42 }
          expect(subject.value).to eq 42
        end

        it 'sets #failed? to false' do
          subject = Agent.new(0, error_mode: :continue)
          subject.send_via(immediate) { raise StandardError }
          expect(subject).to_not be_failed
        end
      end

      context ':fail' do

        it 'sets the error when the validator returns false' do
          validator = ->(new_value) { false }
          subject   = Agent.new(0, error_mode: :fail, validator: validator)
          subject.send_via(immediate) { 42 }
          expect(subject.error).to be_a Agent::ValidationError
        end

        it 'sets the error when the validator raises an error' do
          validator = ->(new_value) { raise expected }
          subject   = Agent.new(0, error_mode: :fail, validator: validator)
          subject.send_via(immediate) { 42 }
          expect(subject.error).to be_a Agent::ValidationError
        end

        it 'sets the error when the action raises an error' do
          expected = StandardError.new
          subject  = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise expected }
          expect(subject.error).to eq expected
        end

        it 'blocks all further action processing until a restart' do
          latch    = Concurrent::CountDownLatch.new
          expected = 42

          subject = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise StandardError }
          subject.send_via(executor) { latch.count_down; expected }

          latch.wait(0.1)
          expect(subject.value).to eq 0

          subject.restart(42)
          latch.wait(0.1)
          expect(subject.await_for(1)).to eq true
          expect(subject.value).to eq expected
        end

        it 'sets #failed? to true' do
          subject = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise StandardError }
          expect(subject).to be_failed
        end
      end
    end

    context 'nested actions' do

      specify 'occur in the order they ar post' do
        actual   = []
        expected = [0, 1, 2, 3, 4]
        latch    = Concurrent::CountDownLatch.new
        subject  = Agent.new(0)

        subject.send_via(executor, subject) do |v1, a1|
          a1.send_via(executor, a1) do |v2, a2|
            a1.send_via(executor, a2) do |v3, a3|
              a1.send_via(executor, a3) do |v4, a4|
                a1.send_via(executor, a4) do |v5, a5|
                  actual << v5; latch.count_down
                end
                actual << v4; v4 + 1
              end
              actual << v3; v3 + 1
            end
            actual << v2; v2 + 1
          end
          actual << v1; v1 + 1
        end

        latch.wait(2)
        expect(subject.await_for(1)).to eq true
        expect(actual).to eq expected
      end

      specify 'work with immediate execution' do
        actual   = []
        expected = [0, 1, 2]
        subject  = Agent.new(0)

        subject.send_via(immediate) do |v1|
          subject.send_via(immediate) do |v2|
            subject.send_via(immediate) do |v3|
              actual << v3
            end
            actual << v2; v2 + 1
          end
          actual << v1; v1 + 1
        end

        expect(actual).to eq expected
      end
    end

    context 'posting' do

      context 'with #send' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send { nil }).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'returns false when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect(subject.send { nil }).to be false
          expect(subject.await_for(1)).to eq true
        end

        it 'posts to the global fast executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_fast_executor).and_call_original
          subject.send { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject.send { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end

      context 'with #send!' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send! { nil }).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'raises an error when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect {
            subject.send! { nil }
          }.to raise_error(Agent::Error)
        end

        it 'posts to the global fast executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_fast_executor).and_call_original
          subject.send! { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject.send! { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end

      context 'with #send_off' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send_off { nil }).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'returns false when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect(subject.send_off { nil }).to be false
        end

        it 'posts to the global io executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_io_executor).and_call_original
          subject.send_off { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject.send_off { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end

      context 'with #send_off!' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send_off! { nil }).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'raises an error when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect {
            subject.send_off! { nil }
          }.to raise_error(Agent::Error)
        end

        it 'posts to the global io executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_io_executor).and_call_original
          subject.send_off! { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject.send_off! { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end

      context 'with #send_via' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send_via(immediate) { nil }).to be true
        end

        it 'returns false when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect(subject.send_via(immediate) { nil }).to be false
        end

        it 'posts to the given executor' do
          expect(immediate).to receive(:post).with(any_args).and_call_original
          subject = Agent.new(0)
          subject.send_via(immediate) { nil }
        end
      end

      context 'with #send_via!' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.send_via!(immediate) { nil }).to be true
        end

        it 'raises an error when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect {
            subject.send_via!(immediate) { nil }
          }.to raise_error(Agent::Error)
        end

        it 'posts to the given executor' do
          expect(immediate).to receive(:post).with(any_args).and_call_original
          subject = Agent.new(0)
          subject.send_via!(immediate) { nil }
        end
      end

      context 'with #post' do

        it 'returns true when the job is post' do
          subject = Agent.new(0)
          expect(subject.post { nil }).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'returns false when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect(subject.post { nil }).to be false
        end

        it 'posts to the global io executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_io_executor).and_call_original
          subject.post { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject.post { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end

      context 'with #<<' do

        it 'returns self when the job is post' do
          subject = Agent.new(0)
          expect(subject << proc { nil }).to be subject
          expect(subject.await_for(1)).to eq true
        end

        it 'returns self when #failed?' do
          subject = Agent.new(0)
          allow(subject).to receive(:failed?).and_return(true)
          expect(subject << proc { nil }).to be subject
        end

        it 'posts to the global io executor' do
          subject = Agent.new(0)
          expect(subject).to receive(:enqueue_action_job).with(anything, anything, Concurrent.global_io_executor).and_call_original
          subject << proc { nil }
          expect(subject.await_for(1)).to eq true
        end

        it 'does not wait for the action to process' do
          job_done = false
          subject  = Agent.new(0)
          latch = CountDownLatch.new
          subject << proc { latch.wait; job_done = true }
          expect(job_done).to be false
          latch.count_down
          expect(subject.await_for(1)).to eq true
        end
      end
    end

    context '#restart' do

      context 'when #failed?' do

        it 'raises an error if the new value is not valid' do
          subject = Agent.new(0, error_mode: :fail, validator: ->(new_value) { false })
          subject.send_via(immediate) { raise StandardError }

          expect {
            subject.restart(0)
          }.to raise_error(Agent::Error)
        end

        it 'sets the new value' do
          subject = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise StandardError }

          subject.restart(42)
          expect(subject.value).to eq 42
        end

        it 'clears the error' do
          subject = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise StandardError }

          subject.restart(42)
          expect(subject.error).to be nil
        end

        it 'sets #failed? to true' do
          subject = Agent.new(0, error_mode: :fail)
          subject.send_via(immediate) { raise StandardError }

          subject.restart(42)
          expect(subject).to_not be_failed
        end

        it 'removes all actions from the queue when :clear_actions is true' do
          latch     = Concurrent::CountDownLatch.new
          end_latch = Concurrent::CountDownLatch.new
          subject   = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { latch.wait; raise StandardError }
          subject.send_via(executor) { end_latch.count_down }

          latch.count_down
          10.times { break if subject.failed?; sleep(0.1) }

          subject.restart(42, clear_actions: true)
          result = end_latch.wait(0.1)
          expect(result).to be false
          expect(subject.await_for(1)).to eq true
        end

        it 'does not clear the action queue when :clear_actions is false' do
          latch     = Concurrent::CountDownLatch.new
          end_latch = Concurrent::CountDownLatch.new
          subject   = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { latch.wait; raise StandardError }
          subject.send_via(executor) { end_latch.count_down }

          latch.count_down
          10.times { break if subject.failed?; sleep(0.1) }

          subject.restart(42, clear_actions: false)
          result = end_latch.wait(3)
          expect(result).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'does not clear the action queue when :clear_actions is not given' do
          latch     = Concurrent::CountDownLatch.new
          end_latch = Concurrent::CountDownLatch.new
          subject   = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { latch.wait; raise StandardError }
          subject.send_via(executor) { end_latch.count_down }

          latch.count_down
          10.times { break if subject.failed?; sleep(0.1) }

          subject.restart(42)
          result = end_latch.wait(3)
          expect(result).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'resumes action processing if actions are enqueued' do
          count        = 5
          latch        = Concurrent::CountDownLatch.new
          finish_latch = Concurrent::CountDownLatch.new(5)
          subject      = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { latch.wait; raise StandardError }
          count.times { subject.send_via(executor) { finish_latch.count_down } }

          queue = subject.instance_variable_get(:@queue)
          size  = queue.size
          expect(size).to be > 0

          latch.count_down
          10.times { break if subject.failed?; sleep(0.1) }

          subject.restart(42, clear_actions: false)
          expect(finish_latch.wait(5)).to be true
          expect(subject.await_for(1)).to eq true
        end

        it 'does not trigger observation' do
          observer_class = Class.new do
            attr_reader :count

            def initialize
              @count = 0
            end

            def update(time, old_value, new_value)
              @count += 1
            end
          end

          observer = observer_class.new
          subject  = Agent.new(0, error_mode: :fail)
          subject.add_observer(observer)
          subject.send_via(immediate) { raise StandardError }
          subject.restart(42)

          expect(observer.count).to eq 0
        end
      end

      context 'when not #failed?' do

        it 'raises an error' do
          subject = Agent.new(0)
          expect {
            subject.restart(0)
          }.to raise_error(Agent::Error)
        end
      end
    end

    context 'waiting' do

      context 'the await job' do

        it 'does not change the value' do
          expected = 42
          subject  = Agent.new(0)
          subject.send_via(executor) { sleep(0.1); expected }
          subject.await_for(1)
          expect(subject.value).to eq expected
        end

        it 'does not trigger the error mode' do
          subject = Agent.new(10)
          subject.send { |x| sleep(0.1); x + 1 }
          subject.await_for(1)

          expect(subject.value).to eq 11
          expect(subject).to_not be_failed
          expect(subject.error).to be nil
        end

        it 'does not trigger observers' do
          observer_class = Class.new do
            attr_reader :count

            def initialize
              @count = 0
            end

            def update(time, old_value, new_value)
              @count += 1
            end
          end

          observer = observer_class.new
          subject  = Agent.new(0)
          subject.add_observer(observer)
          subject.send_via(executor) { sleep(0.1); 42 }
          subject.await_for(1)

          expect(observer.count).to eq 1
        end

        it 'waits for nested actions' do
          bucket   = []
          latch    = Concurrent::CountDownLatch.new
          executor = Concurrent::FixedThreadPool.new(3)
          subject  = Agent.new(0)

          subject.send_via(executor) do
            subject.send_via(executor) do
              subject.send_via(executor) do
                bucket << 3
              end
              latch.count_down
              sleep(0.2)
              bucket << 2
            end
            bucket << 1
          end
          latch.wait

          subject.await_for(5)
          expect(bucket).to eq [1, 2, 3]

          executor.kill
          expect(executor.wait_for_termination(pool_termination_timeout)).to eq true
        end
      end

      context 'with #await' do

        it 'returns self when there are no pending actions' do
          subject = Agent.new(0)
          expect(subject.await).to eq subject
          expect(subject.await.value).to eq 0
        end

        it 'does not block on actions from other threads' do
          latch   = Concurrent::CountDownLatch.new
          finish   = Concurrent::CountDownLatch.new
          subject = Agent.new(0)
          in_thread do
            subject.send_via(executor) { finish.wait }
            latch.count_down
          end

          latch.wait(0.1)
          expect(subject.await_for(1)).to eq true
          finish.count_down
        end

        it 'blocks indefinitely' do
          start   = Concurrent.monotonic_time
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          expect(subject.await).to be_truthy
          expect(Concurrent.monotonic_time - start).to be > 0.5
        end

        it 'returns true when all prior actions have processed' do
          count    = 0
          expected = 5
          subject  = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          expected.times { subject.send_via(executor) { count += 1 } }
          subject.await
          expect(count).to eq expected
        end

        it 'blocks forever if restarted with :clear_actions true' do
          pending('the timing is nearly impossible'); fail
          subject = Agent.new(0, error_mode: :fail)

          t = in_thread do
            subject.send_via(executor) { sleep(0.1) }
            subject.send_via(executor) { raise StandardError }
            subject.send_via(executor) { nil }
            in_thread { subject.restart(42, clear_actions: true) }
            subject.await
          end

          thread_status = t.join(0.3)
          expect(thread_status).to be nil
        end
      end

      context 'with #await_for' do

        it 'returns true when there are no pending actions' do
          subject = Agent.new(0)
          expect(subject.await_for(1)).to be true
        end

        it 'does not block on actions from other threads' do
          latch   = Concurrent::CountDownLatch.new
          finish   = Concurrent::CountDownLatch.new
          subject = Agent.new(0)
          in_thread do
            subject.send_via(executor) { finish.wait }
            latch.count_down
          end

          latch.wait(0.1)
          expect(subject.await_for(0.1)).to be true
          finish.count_down
        end

        it 'returns true when all prior actions have processed' do
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          5.times { subject.send_via(executor) { nil } }
          expect(subject.await_for(10)).to be true
        end

        it 'returns false on timeout' do
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          5.times { subject.send_via(executor) { nil } }
          expect(subject.await_for(0.1)).to be false
          expect(subject.await_for(5)).to eq true
        end

        it 'returns false if restarted with :clear_actions true' do
          pending('the timing is nearly impossible'); fail
          subject = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { sleep(0.1) }
          subject.send_via(executor) { raise StandardError }
          subject.send_via(executor) { nil }

          in_thread { subject.restart(42, clear_actions: true) }
          ok = subject.await_for(0.2)

          expect(ok).to be false
        end
      end

      context 'with #await_for!' do

        it 'returns true when there are no pending actions' do
          subject = Agent.new(0)
          expect(subject.await_for!(1)).to be true
        end

        it 'does not block on actions from other threads' do
          latch   = Concurrent::CountDownLatch.new
          finish   = Concurrent::CountDownLatch.new
          subject = Agent.new(0)
          in_thread do
            subject.send_via(executor) { finish.wait }
            latch.count_down
          end

          latch.wait(0.1)
          expect(subject.await_for!(0.1)).to be true
          finish.count_down
        end

        it 'returns true when all prior actions have processed' do
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          5.times { subject.send_via(executor) { nil } }
          expect(subject.await_for!(10)).to be true
        end

        it 'raises an error on timeout' do
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          5.times { subject.send_via(executor) { nil } }
          expect {
            subject.await_for!(0.1)
          }.to raise_error(Concurrent::TimeoutError)
          expect(subject.await_for(5)).to eq true
        end

        it 'raises an error if restarted with :clear_actions true' do
          pending('the timing is nearly impossible'); fail
          subject = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { sleep(0.1) }
          subject.send_via(executor) { raise StandardError }
          subject.send_via(executor) { nil }

          in_thread { subject.restart(42, clear_actions: true) }

          expect {
            subject.await_for!(0.2)
          }.to raise_error(Concurrent::TimeoutError)
        end
      end

      context 'with #wait' do

        it 'returns true when there are no pending actions and timeout is nil' do
          subject = Agent.new(0)
          expect(subject.wait(nil)).to be true
        end

        it 'returns true when there are no pending actions and a timeout is given' do
          subject = Agent.new(0)
          expect(subject.wait(1)).to be true
        end

        it 'does not block on actions from other threads' do
          latch   = Concurrent::CountDownLatch.new
          finish   = Concurrent::CountDownLatch.new
          subject = Agent.new(0)
          in_thread do
            subject.send_via(executor) { finish.wait }
            latch.count_down
          end

          latch.wait(0.1)
          expect(subject.wait(0.1)).to be true
          finish.count_down
        end

        it 'blocks indefinitely when timeout is nil' do
          start   = Concurrent.monotonic_time
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          expect(subject.wait(nil)).to be true
          expect(Concurrent.monotonic_time - start).to be > 0.5
        end

        it 'blocks forever when timeout is nil and restarted with :clear_actions true' do
          pending('the timing is nearly impossible'); fail
          subject = Agent.new(0, error_mode: :fail)

          t = in_thread do
            subject.send_via(executor) { sleep(0.1) }
            subject.send_via(executor) { raise StandardError }
            subject.send_via(executor) { nil }
            in_thread { subject.restart(42, clear_actions: true) }
            subject.wait(nil)
          end

          expect(t.join(0.3)).to be nil
        end

        it 'returns true when all prior actions have processed' do
          count    = 0
          expected = 5
          subject  = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          expected.times { subject.send_via(executor) { count += 1 } }
          subject.wait(nil)
          expect(count).to eq expected
        end

        it 'returns false on timeout' do
          subject = Agent.new(0)
          subject.send_via(executor) { sleep(1) }
          5.times { subject.send_via(executor) { nil } }
          expect(subject.wait(0.1)).to be false
          expect(subject.wait(5)).to eq true
        end

        it 'returns false when timeout is given and restarted with :clear_actions true' do
          pending('the timing is nearly impossible'); fail
          subject = Agent.new(0, error_mode: :fail)

          subject.send_via(executor) { sleep(0.1) }
          subject.send_via(executor) { raise StandardError }
          subject.send_via(executor) { nil }

          in_thread { subject.restart(42, clear_actions: true) }
          ok = subject.wait(0.2)

          expect(ok).to be false
        end
      end

      context 'with .await' do

        it 'returns true when all prior actions on all agents have processed' do
          latch  = Concurrent::CountDownLatch.new
          agents = 3.times.collect { Agent.new(0) }
          agents.each { |agent| agent.send_via(executor, latch) { |_, l| l.wait(1) } }
          in_thread { latch.count_down }
          ok = Agent.await(*agents)
          expect(ok).to be true
        end
      end

      context 'with .await_for' do

        it 'returns true when there are no pending actions' do
          agents = 3.times.collect { Agent.new(0) }
          ok     = Agent.await_for(1, *agents)
          expect(ok).to be true
        end

        it 'returns true when all prior actions for all agents have processed' do
          latch  = Concurrent::CountDownLatch.new
          agents = 3.times.collect { Agent.new(0) }
          agents.each { |agent| agent.send_via(executor, latch) { |_, l| l.wait(1) } }
          in_thread { latch.count_down }
          ok = Agent.await_for(5, *agents)
          expect(ok).to be true
        end

        it 'returns false on timeout' do
          agents = 3.times.collect { Agent.new(0) }
          agents.each { |agent| agent.send_via(executor) { sleep(0.3) } }
          ok = Agent.await_for(0.1, *agents)
          expect(ok).to be false
          expect(Agent.await_for!(1, *agents)).to eq true
        end
      end

      context 'with await_for!' do

        it 'returns true when there are no pending actions' do
          agents = 3.times.collect { Agent.new(0) }
          ok     = Agent.await_for!(1, *agents)
          expect(ok).to be true
        end

        it 'returns true when all prior actions for all agents have processed' do
          latch  = Concurrent::CountDownLatch.new
          agents = 3.times.collect { Agent.new(0) }
          agents.each { |agent| agent.send_via(executor, latch) { |_, l| l.wait(1) } }
          in_thread { latch.count_down }
          ok = Agent.await_for!(5, *agents)
          expect(ok).to be true
        end

        it 'raises an exception on timeout' do
          agents = 3.times.collect { Agent.new(0) }
          agents.each { |agent| agent.send_via(executor) { sleep(0.3) } }
          expect {
            Agent.await_for!(0.1, *agents)
          }.to raise_error(Concurrent::TimeoutError)
          expect(Agent.await_for!(1, *agents)).to eq true
        end
      end
    end

    context :observable do

      subject { Agent.new(0) }

      def trigger_observable(observable)
        observable.send_via(immediate) { 42 }
      end

      it_behaves_like :observable
    end
  end
end