Back to Repositories

Testing Channel Communication Primitives in concurrent-ruby

This test suite validates the Channel implementation in Concurrent Ruby, focusing on asynchronous communication and message passing between threads. The tests cover channel capacity management, push/pop operations, and complex message exchange patterns.

Test Coverage Overview

The test suite provides comprehensive coverage of Channel functionality in Concurrent Ruby:

  • Channel capacity and size management
  • Synchronous and asynchronous push/pop operations
  • Pattern matching for message selection
  • Channel selection and multi-channel operations
  • Thread synchronization and blocking behavior

Implementation Analysis

The testing approach employs RSpec’s behavior-driven development framework to validate Channel operations. Tests utilize thread manipulation, timing controls, and operation combinations to verify both successful scenarios and edge cases.

Key patterns include concurrent operation verification, timeout handling, and promise-based asynchronous testing.

Technical Details

Testing infrastructure includes:

  • RSpec testing framework
  • Concurrent Ruby’s Channel implementation
  • Thread manipulation helpers
  • Custom matchers for channel operations
  • Timing-sensitive test scenarios

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through comprehensive coverage of synchronization primitives. Notable practices include:

  • Isolation of concurrent operations
  • Explicit state verification
  • Timeout handling for blocking operations
  • Integration testing with promises
  • Systematic edge case coverage

ruby-concurrency/concurrent-ruby

spec/concurrent/edge/channel_spec.rb

            
require 'concurrent/edge/channel'

RSpec.describe 'Concurrent' do
  describe 'Promises::Channel', edge: true do
    specify "#capacity" do
      channel = Concurrent::Promises::Channel.new 2
      expect(channel.capacity).to be 2
    end

    specify "#to_s" do
      channel = Concurrent::Promises::Channel.new
      expect(channel.to_s).to match(/Channel.*unlimited/)
      channel = Concurrent::Promises::Channel.new 2
      expect(channel.to_s).to match(/Channel.*0.*2/)
      channel.push :value
      expect(channel.to_s).to match(/Channel.*1.*2/)
    end

    specify "#(try_)push(_op)" do
      channel = Concurrent::Promises::Channel.new 1

      expect(channel.size).to eq 0
      expect(channel.try_push(:v1)).to be_truthy
      expect(channel.size).to eq 1
      expect(channel.try_push(:v2)).to be_falsey
      expect(channel.size).to eq 1

      channel = Concurrent::Promises::Channel.new 1
      expect(channel.push(:v1)).to eq channel
      expect(channel.size).to eq 1
      thread = in_thread { channel.push :v2 }
      is_sleeping thread
      expect(channel.size).to eq 1
      channel.pop
      expect(channel.size).to eq 1
      expect(thread.value).to eq channel
      channel.pop
      expect(channel.size).to eq 0

      channel = Concurrent::Promises::Channel.new 1
      expect(channel.push(:v1)).to eq channel
      expect(channel.size).to eq 1
      thread = in_thread { channel.push :v2, 0.01 }
      is_sleeping thread
      expect(channel.size).to eq 1
      expect(thread.value).to eq false
      channel.pop
      expect(channel.size).to eq 0
      expect(channel.push(:v3, 0)).to eq true
      expect(channel.size).to eq 1
      thread = in_thread { channel.push :v2, 1 }
      is_sleeping thread
      channel.pop
      expect(channel.size).to eq 1
      expect(thread.value).to eq true

      channel = Concurrent::Promises::Channel.new 1
      expect(channel.push_op(:v1).value!).to eq channel
      expect(channel.size).to eq 1
      push_op = channel.push_op :v2
      expect(channel.size).to eq 1
      expect(push_op.pending?).to be_truthy
      channel.pop
      expect(channel.size).to eq 1
      expect(push_op.value!).to eq channel
      channel.pop
      expect(channel.size).to eq 0
    end

    specify "#(try_)pop(_op)" do
      channel = Concurrent::Promises::Channel.new 1
      channel.push :v1

      expect(channel.size).to eq 1
      expect(channel.try_pop).to eq :v1
      expect(channel.size).to eq 0
      expect(channel.try_pop).to eq nil
      expect(channel.size).to eq 0

      channel = Concurrent::Promises::Channel.new 1
      channel.push :v1
      expect(channel.pop).to eq :v1
      expect(channel.size).to eq 0
      thread = in_thread { channel.pop }
      is_sleeping thread
      expect(channel.size).to eq 0
      channel.push :v2
      expect(thread.value).to eq :v2
      expect(channel.size).to eq 0

      channel = Concurrent::Promises::Channel.new 1
      channel.push :v1
      expect(channel.pop).to eq :v1
      expect(channel.size).to eq 0
      thread = in_thread { channel.pop 0.01 }
      is_sleeping thread
      expect(channel.size).to eq 0
      expect(thread.value).to eq nil
      channel.push :v2
      expect(channel.size).to eq 1
      expect(channel.pop).to eq :v2
      expect(channel.size).to eq 0
      thread = in_thread { channel.pop 1 }
      is_sleeping thread
      channel.push :v3
      expect(channel.size).to eq 0
      expect(thread.value).to eq :v3
      channel.push :v4
      expect(channel.pop(0)).to eq :v4

      channel = Concurrent::Promises::Channel.new 1
      channel.push :v1
      expect(channel.pop_op.value!).to eq :v1
      expect(channel.size).to eq 0
      pop_op = channel.pop_op
      expect(channel.size).to eq 0
      expect(pop_op.pending?).to be_truthy
      channel.push :v2
      expect(channel.size).to eq 0
      expect(pop_op.value!).to eq :v2
    end

    specify "#(try_)pop(_op)_matching" do
      channel = Concurrent::Promises::Channel.new 2
      channel.push 'junk'
      channel.push :v1

      expect(channel.size).to eq 2
      expect(channel.try_pop_matching(Symbol)).to eq :v1
      expect(channel.size).to eq 1
      expect(channel.try_pop_matching(Symbol)).to eq nil
      expect(channel.size).to eq 1

      channel = Concurrent::Promises::Channel.new 2
      channel.push 'junk'
      channel.push :v1
      expect(channel.pop_matching(Symbol)).to eq :v1
      expect(channel.size).to eq 1
      thread = in_thread { channel.pop_matching(Symbol) }
      is_sleeping thread
      expect(channel.size).to eq 1
      channel.push 'junk'
      channel.pop
      channel.push :v2
      expect(thread.value).to eq :v2
      expect(channel.size).to eq 1

      channel = Concurrent::Promises::Channel.new 2
      channel.push 'junk'
      channel.push :v1
      expect(channel.pop_matching(Symbol)).to eq :v1
      expect(channel.size).to eq 1
      thread = in_thread { channel.pop_matching(Symbol, 0.01) }
      is_sleeping thread
      expect(channel.size).to eq 1
      expect(thread.value).to eq nil
      channel.push :v2
      expect(channel.size).to eq 2
      expect(channel.pop_matching(Symbol)).to eq :v2
      expect(channel.size).to eq 1
      thread = in_thread { channel.pop_matching(Symbol,1) }
      is_sleeping thread
      channel.push :v3
      expect(channel.size).to eq 1
      expect(thread.value).to eq :v3
      channel.push :v4
      expect(channel.pop_matching(Symbol,0)).to eq :v4

      channel = Concurrent::Promises::Channel.new 2
      channel.push 'junk'
      channel.push :v1
      expect(channel.pop_op_matching(Symbol).value!).to eq :v1
      expect(channel.size).to eq 1
      pop_op = channel.pop_op_matching(Symbol)
      expect(channel.size).to eq 1
      expect(pop_op.pending?).to be_truthy
      channel.push :v2
      expect(channel.size).to eq 1
      expect(pop_op.value!).to eq :v2
    end

    specify "#(try_)select(_op)" do
      channel1 = Concurrent::Promises::Channel.new 1
      channel2 = Concurrent::Promises::Channel.new 1

      expect(channel1.try_select(channel2)).to eq nil
      expect(Concurrent::Promises::Channel.try_select([channel1, channel2])).to eq nil
      channel1.push :v1
      expect(channel1.try_select(channel2)).to eq [channel1, :v1]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0

      channel1 = Concurrent::Promises::Channel.new 1
      channel2 = Concurrent::Promises::Channel.new 1
      channel1.push :v1
      expect(Concurrent::Promises::Channel.select([channel1, channel2])).to eq [channel1, :v1]
      channel1.push :v1
      expect(channel1.select(channel2)).to eq [channel1, :v1]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      thread = in_thread { channel1.select(channel2) }
      is_sleeping thread
      expect(channel1.size).to eq 0
      channel2.push :v2
      expect(thread.value).to eq [channel2, :v2]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0

      channel1 = Concurrent::Promises::Channel.new 1
      channel2 = Concurrent::Promises::Channel.new 1
      channel1.push :v1
      expect(channel1.select(channel2)).to eq [channel1, :v1]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      thread = in_thread { channel1.select(channel2, 0.01) }
      is_sleeping thread
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      expect(thread.value).to eq nil
      channel2.push :v2
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 1
      expect(channel2.select(channel1)).to eq [channel2, :v2]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0

      channel1 = Concurrent::Promises::Channel.new 1
      channel2 = Concurrent::Promises::Channel.new 1
      channel1.push :v1
      expect(channel1.select_op(channel2).value!).to eq [channel1, :v1]
      channel1.push :v1
      expect(Concurrent::Promises::Channel.select_op([channel1, channel2]).value!).to eq [channel1, :v1]
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      select_op = channel2.select_op(channel1)
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      expect(select_op.pending?).to be_truthy
      channel2.push :v2
      expect(channel1.size).to eq 0
      expect(channel2.size).to eq 0
      expect(select_op.value!).to eq [channel2, :v2]
    end

    def push_first(push_type, pop_type)
      channel = Concurrent::Promises::Channel.new 0
      message = Object.new

      case push_type
      when :push
        thread = in_thread { channel.push message }
        is_sleeping thread
      when :push_op
        push = channel.push_op message
        expect(push.pending?).to eq true
      else
        raise
      end

      expect(channel.size).to eq 0

      case pop_type
      when :try_pop
        expect(channel.try_pop).to eq message
      when :pop
        expect(channel.pop).to eq message
      when :pop_op
        expect(channel.pop_op.value!).to eq message
      else
        raise
      end

      expect(channel.size).to eq 0

      case push_type
      when :push
        expect(thread.value).to eq channel
      when :push_op
        expect(push.value!).to eq channel
      else
        raise
      end
    end

    def pop_first(pop_type, push_type)
      channel = Concurrent::Promises::Channel.new 0
      message = Object.new

      case pop_type
      when :pop
        thread = in_thread { channel.pop }
        is_sleeping thread
      when :pop_op
        pop = channel.pop_op
        expect(pop.pending?).to eq true
      else
        raise
      end

      expect(channel.size).to eq 0

      case push_type
      when :try_push
        expect(channel.try_push message).to eq true
      when :push
        expect(channel.push(message)).to eq channel
      when :push_op
        expect(channel.push_op(message).value!).to eq channel
      else
        raise
      end

      expect(channel.size).to eq 0

      case pop_type
      when :pop
        expect(thread.value).to eq message
      when :pop_op
        expect(pop.value!).to eq message
      else
        raise
      end
    end


    specify 'exchanging' do
      push_first :push, :try_pop
      push_first :push, :pop
      push_first :push, :pop_op
      push_first :push_op, :try_pop
      push_first :push_op, :pop
      push_first :push_op, :pop_op

      pop_first :pop, :try_push
      pop_first :pop, :push
      pop_first :pop, :push_op
      pop_first :pop_op, :try_push
      pop_first :pop_op, :push
      pop_first :pop_op, :push_op

      ch1       = Concurrent::Promises::Channel.new 0
      ch2       = Concurrent::Promises::Channel.new 0
      selection = ch1.select_op(ch2)
      expect(ch2.try_push(:v3)).to be_truthy
      expect(selection.value!).to eq [ch2, :v3]
    end

    specify 'integration' do
      ch1 = Concurrent::Promises::Channel.new
      ch2 = Concurrent::Promises::Channel.new
      ch3 = Concurrent::Promises::Channel.new

      add = -> *_ do
        (ch1.pop_op & ch2.pop_op).then do |a, b|
          if a == :done && b == :done
            :done
          else
            # do not add again until push is done
            ch3.push_op(a + b).then(&add)
          end
        end
      end

      ch1.push_op 1
      ch2.push_op 2
      ch1.push_op 'a'
      ch2.push_op 'b'
      ch1.push_op nil
      ch2.push_op true

      result = Concurrent::Promises.future(&add).run.result
      expect(result[0..1]).to eq [false, nil]
      expect(result[2]).to be_a_kind_of(NoMethodError)
      expect(ch3.pop_op.value!).to eq 3
      expect(ch3.pop_op.value!).to eq 'ab'

      ch1.push_op 1
      ch2.push_op 2
      ch1.push_op 'a'
      ch2.push_op 'b'
      ch1.push_op :done
      ch2.push_op :done

      expect(Concurrent::Promises.future(&add).run.result).to eq [true, :done, nil]
      expect(ch3.pop_op.value!).to eq 3
      expect(ch3.pop_op.value!).to eq 'ab'
    end
  end
end