Back to Repositories

Testing Forward Output Acknowledgment Handler in Fluentd

This test suite validates the acknowledgment handling functionality in Fluentd’s forward output plugin. It focuses on verifying proper handling of chunk acknowledgments, timeout scenarios, and thread safety in the AckHandler component.

Test Coverage Overview

The test suite provides comprehensive coverage of the AckHandler class functionality in Fluentd’s forward output plugin.

  • Tests chunk ID matching and validation scenarios
  • Handles various acknowledgment states including success, unmatched, and failure cases
  • Verifies timeout handling and error conditions
  • Tests thread-safety aspects of socket operations

Implementation Analysis

The testing approach uses the Test::Unit framework with flexmock for mocking network interactions.

Key implementation patterns include:
  • Data-driven testing using the ‘data’ helper
  • IO pipe simulation for network communication
  • Mock objects for network nodes and socket operations
  • Thread synchronization testing

Technical Details

  • Testing Framework: Test::Unit with Flexmock integration
  • Key Dependencies: MessagePack, Base64 encoding
  • Test Infrastructure: IO pipes for simulating network communication
  • Configuration: Customizable timeout and read length settings

Best Practices Demonstrated

The test suite exemplifies robust testing practices for network-related components.

  • Proper resource cleanup with ensure blocks
  • Comprehensive error scenario coverage
  • Thread-safety validation
  • Isolated test cases with clear assertions
  • Mock objects for external dependencies

fluent/fluentd

test/plugin/out_forward/test_ack_handler.rb

            
require_relative '../../helper'
require 'fluent/test/driver/output'
require 'flexmock/test_unit'

require 'fluent/plugin/out_forward'
require 'fluent/plugin/out_forward/ack_handler'

# Tests for Fluent::Plugin::ForwardOutput::AckHandler.
#
# Each test registers an ack for CHUNK_ID, enqueues the read end of an IO pipe
# as the "socket", and inspects what AckHandler#collect_response yields.
class AckHandlerTest < Test::Unit::TestCase
  # Shorthand for the result statuses asserted throughout this suite.
  Result = Fluent::Plugin::ForwardOutput::AckHandler::Result
  # Chunk ID registered with the handler in every test.
  CHUNK_ID = 'chunk_id 111'

  data(
    'chunk_id is matched' => [MessagePack.pack({ 'ack' => Base64.encode64(CHUNK_ID) }), Result::SUCCESS],
    'chunk_id is not matched' => [MessagePack.pack({ 'ack' => 'unmatched' }), Result::CHUNKID_UNMATCHED],
    'chunk_id is empty' => ['', Result::FAILED],
  )
  test 'returns chunk_id, node, sock and result status' do |args|
    received, state = args
    ack_handler = build_ack_handler
    node = build_node
    ack = ack_handler.create_ack(CHUNK_ID, node)

    with_pipe do |r, w|
      w.write(CHUNK_ID)
      mock(r).recv(anything) { |_| received } # IO does not have recv
      ack.enqueue(r)

      cid, yielded_node, sock, result = collect_single_response(ack_handler)

      assert_equal CHUNK_ID, cid
      assert_equal node, yielded_node
      assert_equal r, sock
      assert_equal state, result
    end
  end

  test 'returns nil if raise an error' do
    ack_handler = build_ack_handler
    ack = ack_handler.create_ack(CHUNK_ID, build_node)

    with_pipe do |r, w|
      w.write(CHUNK_ID)
      mock(r).recv(anything) { |_| raise 'unexpected error' } # IO does not have recv
      ack.enqueue(r)

      cid, yielded_node, sock, result = collect_single_response(ack_handler)

      assert_nil cid
      assert_nil yielded_node
      assert_nil sock
      assert_equal Result::FAILED, result
    end
  end

  test 'when ack is expired' do
    # timeout: 0 makes the ack count as expired as soon as it is collected.
    ack_handler = build_ack_handler(timeout: 0)
    node = build_node
    ack = ack_handler.create_ack(CHUNK_ID, node)

    with_pipe do |r, w|
      w.write(CHUNK_ID)
      # An expired ack must be reported without reading from the socket.
      mock(r).recv(anything).never
      ack.enqueue(r)

      cid, yielded_node, sock, result = collect_single_response(ack_handler)

      assert_equal CHUNK_ID, cid
      assert_equal node, yielded_node
      assert_equal r, sock
      assert_equal Result::FAILED, result
    end
  end

  # ForwardOutput uses AckHandler in multiple threads, so we need to assume this case.
  # If exclusive control for this case is implemented, this test may not be necessary.
  test 'raises no error when another thread closes a socket' do
    ack_handler = build_ack_handler
    ack = ack_handler.create_ack(CHUNK_ID, build_node)

    with_pipe do |r, w|
      w.write(CHUNK_ID)
      # IO does not have recv, so define one directly on this pipe end.
      def r.recv(_arg)
        sleep(1) # To ensure that multiple threads select the socket before closing.
        raise IOError, 'stream closed in another thread' if closed?
        MessagePack.pack({ 'ack' => Base64.encode64(CHUNK_ID) })
      end
      ack.enqueue(r)

      threads = Array.new(2) do
        Thread.new do
          ack_handler.collect_response(1) do |_cid, _node, sock, _result|
            sock&.close
          end
        end
      end

      # Thread#join(limit) returns nil on timeout, so every thread must finish in time.
      assert_true threads.map { |t| t.join(10) }.all?

      error_logs = $log.out.logs.select { |log| log.include?('[error]') }
      # Double quotes so the logs are joined with a real newline in the failure message.
      assert_empty(error_logs, error_logs.join("\n"))
    end
  end

  private

  # Builds the AckHandler under test; only the timeout varies between tests.
  def build_ack_handler(timeout: 10)
    Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: timeout, log: $log, read_length: 100)
  end

  # Builds a stand-in node; host and port are stubbed for log output.
  def build_node
    flexmock('node', host: '127.0.0.1', port: '1000')
  end

  # Yields both ends of a fresh IO pipe and closes them afterwards.
  # Closing is best-effort because a test may already have closed an end.
  def with_pipe
    r, w = IO.pipe
    yield r, w
  ensure
    r.close rescue nil
    w.close rescue nil
  end

  # Runs one collect_response pass and returns what it yielded as
  # [chunk_id, node, sock, result] (all nil if the block was never called).
  #
  # The block passed to collect_response is rescued by ack_handler, so
  # assertions must be made by the caller on the returned values, not inside it.
  def collect_single_response(ack_handler)
    captured = [nil, nil, nil, nil]
    ack_handler.collect_response(1) do |cid, node, sock, result|
      captured = [cid, node, sock, result]
    end
    captured
  end
end