Back to Repositories

Testing Event Router Pipeline Management in Fluentd

This test suite validates the EventRouter functionality in Fluentd, focusing on event routing, filtering, and error handling mechanisms. It ensures proper event stream processing and pipeline management through comprehensive unit tests.

Test Coverage Overview

The test suite provides extensive coverage of Fluentd’s event routing system, including:
  • Match cache functionality and expiration
  • Pipeline configuration and event processing
  • Filter chain optimization
  • Error handling scenarios
  • Default collector behavior
Key integration points include filter plugins, output handling, and event stream transformations.

Implementation Analysis

The testing approach uses Ruby’s Test::Unit framework with modular test cases. The suite:
  • Implements sub_test_case blocks for logical grouping
  • Uses RR mocking framework for behavior verification
  • Employs setup/teardown hooks for test isolation
  • Implements both single and multi-event stream testing

Technical Details

Testing infrastructure includes:
  • Test::Unit as the primary testing framework
  • RR for mocking and stubbing
  • Custom test plugin classes
  • Event stream simulation utilities
  • Pipeline configuration helpers

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices:
  • Comprehensive setup and teardown management
  • Isolated test cases with clear purpose
  • Edge case coverage for cache expiration
  • Thorough validation of filter chain behavior
  • Proper error handling verification

fluent/fluentd

test/test_event_router.rb

            
require_relative 'helper'
require 'fluent/event_router'
require_relative 'test_plugin_classes'

# Unit tests for Fluent::EventRouter and its nested MatchCache and Pipeline
# classes: match caching, pipeline emission through filters and outputs,
# fallback to the default collector, filter-chain optimization and emit error
# handling.
class EventRouterTest < ::Test::Unit::TestCase
  include Fluent
  include FluentTest

  # Drop every memoized fixture so each test builds fresh plugin instances.
  teardown do
    @output = nil
    @filter = nil
    @compat_filter = nil
    @error_output = nil
    @emit_handler = nil
    @default_collector = nil
  end

  # Memoized test output plugin; tests read what it received via `events`.
  def output
    @output ||= FluentTestOutput.new
  end

  # Memoized test filter plugin.
  def filter
    @filter ||= FluentTestFilter.new
  end

  # Memoized compat-API test filter plugin.
  def compat_filter
    @compat_filter ||= FluentCompatTestFilter.new
  end

  # Memoized output used by the emit-error tests (they expect a RuntimeError
  # to reach the emit handler when it is routed to).
  def error_output
    @error_output ||= FluentTestErrorOutput.new
  end

  # Memoized emit error handler handed to the router under test.
  def emit_handler
    @emit_handler ||= TestEmitErrorHandler.new
  end

  # Memoized collector the router falls back to when no output rule matches.
  def default_collector
    @default_collector ||= FluentTestOutput.new
  end

  # Build a single-event stream holding `record` stamped with `time`.
  def event(record, time = Engine.now)
    OneEventStream.new(time, record)
  end

  # Number of events produced by `events` when no count is given.
  DEFAULT_EVENT_NUM = 5

  # Build a MultiEventStream of `num` events whose records are
  # {'key' => "value<i>"} for i in 0...num.
  def events(num = DEFAULT_EVENT_NUM)
    es = MultiEventStream.new
    num.times { |i|
      es.add(Engine.now, 'key' => "value#{i}")
    }
    es
  end

  sub_test_case EventRouter::MatchCache do
    setup do
      @match_cache = EventRouter::MatchCache.new
    end

    test "call block when non-cached key" do
      assert_raise(RuntimeError.new('Test!')) {
        @match_cache.get('test') { raise 'Test!' }
      }
    end

    test "don't call block when cached key" do
      @match_cache.get('test') { "I'm cached" }
      assert_nothing_raised {
        @match_cache.get('test') { raise 'Test!' }
      }
      assert_equal "I'm cached", @match_cache.get('test') { raise 'Test!' }
    end

    test "call block when keys are expired" do
      cache_size = EventRouter::MatchCache::MATCH_CACHE_SIZE
      # Fill the cache to capacity.
      cache_size.times { |i|
        @match_cache.get("test#{i}") { "I'm cached #{i}" }
      }
      # Every key is still cached, so no block may run.
      assert_nothing_raised {
        cache_size.times { |i|
          @match_cache.get("test#{i}") { raise "Why called?" }
        }
      }
      # expire old keys
      cache_size.times { |i|
        @match_cache.get("new_test#{i}") { "I'm young #{i}" }
      }
      # All original keys must have been evicted: the block runs once per key.
      num_called = 0
      cache_size.times { |i|
        @match_cache.get("test#{i}") { num_called += 1 }
      }
      assert_equal cache_size, num_called
    end
  end

  sub_test_case EventRouter::Pipeline do
    setup do
      @pipeline = EventRouter::Pipeline.new
      @es = event('key' => 'value')
    end

    test 'set one output' do
      @pipeline.set_output(output)
      @pipeline.emit_events('test', @es)
      assert_equal 1, output.events.size
      assert_equal 'value', output.events['test'].first['key']
    end

    sub_test_case 'with filter' do
      setup do
        @pipeline.set_output(output)
      end

      data('Filter plugin' => 'filter',
           'Compat::Filter plugin' => 'compat_filter')
      test 'set one filter' do |filter_type|
        @pipeline.add_filter(filter_type == 'filter' ? filter : compat_filter)
        @pipeline.emit_events('test', @es)
        assert_equal 1, output.events.size
        assert_equal 'value', output.events['test'].first['key']
        assert_equal 0, output.events['test'].first['__test__']
      end

      data('Filter plugin' => 'filter',
           'Compat::Filter plugin' => 'compat_filter')
      test 'set one filter with multi events' do |filter_type|
        @pipeline.add_filter(filter_type == 'filter' ? filter : compat_filter)
        @pipeline.emit_events('test', events)
        assert_equal 1, output.events.size
        # `events` emits DEFAULT_EVENT_NUM records; keep the expectation tied to it.
        assert_equal DEFAULT_EVENT_NUM, output.events['test'].size
        DEFAULT_EVENT_NUM.times { |i|
          assert_equal "value#{i}", output.events['test'][i]['key']
          assert_equal i, output.events['test'][i]['__test__']
        }
      end
    end
  end

  sub_test_case EventRouter do
    teardown do
      @event_router = nil
    end

    # Memoized router under test, wired to the shared default collector and
    # emit error handler fixtures.
    def event_router
      @event_router ||= EventRouter.new(default_collector, emit_handler)
    end

    sub_test_case 'default collector' do
      test 'call default collector when no output' do
        assert_rr do
          mock(default_collector).emit_events('test', is_a(OneEventStream))
          event_router.emit('test', Engine.now, 'k' => 'v')
        end
      end

      test "call default collector when only filter" do
        event_router.add_rule('test', filter)
        assert_rr do
          # After apply Filter, EventStream becomes MultiEventStream by default
          mock(default_collector).emit_events('test', is_a(MultiEventStream))
          event_router.emit('test', Engine.now, 'k' => 'v')
        end
        assert_equal 1, filter.num
      end

      test "call default collector when no matched with output" do
        event_router.add_rule('test', output)
        assert_rr do
          mock(default_collector).emit_events('dummy', is_a(OneEventStream))
          event_router.emit('dummy', Engine.now, 'k' => 'v')
        end
      end

      test "don't call default collector when tag matched" do
        event_router.add_rule('test', output)
        assert_rr do
          mock(default_collector).emit_events('test', is_a(OneEventStream)).never
          event_router.emit('test', Engine.now, 'k' => 'v')
        end
        # check emit handler doesn't catch rr error
        assert_empty emit_handler.events
      end
    end

    sub_test_case 'filter' do
      test 'filter should be called when tag matched' do
        # Local `filter` intentionally shadows the fixture helper: this test
        # needs a subclass that defines its own #filter_stream to mock.
        filter = Class.new(FluentTestFilter) {
          def filter_stream(_tag, es); end
        }.new

        event_router.add_rule('test', filter)

        assert_rr do
          mock(filter).filter_stream('test', is_a(OneEventStream)) { events }
          event_router.emit('test', Engine.now, 'k' => 'v')
        end
      end

      test 'filter should not be called when tag mismatched' do
        event_router.add_rule('test', filter)

        assert_rr do
          mock(filter).filter_stream('test', is_a(OneEventStream)).never
          event_router.emit('foo', Engine.now, 'k' => 'v')
        end
      end

      test 'filter changes records' do
        event_router.add_rule('test', filter)
        event_router.add_rule('test', output)
        event_router.emit('test', Engine.now, 'k' => 'v')

        assert_equal 1, filter.num
        assert_equal 1, output.events['test'].size
        assert_equal 0, output.events['test'].first['__test__']
        assert_equal 'v', output.events['test'].first['k']
      end

      test 'filter can be chained' do
        other_filter = FluentTestFilter.new('__hoge__')
        event_router.add_rule('test', filter)
        event_router.add_rule('test', other_filter)
        event_router.add_rule('test', output)
        event_router.emit('test', Engine.now, 'k' => 'v')

        assert_equal 1, filter.num
        assert_equal 1, other_filter.num
        assert_equal 1, output.events['test'].size
        assert_equal 0, output.events['test'].first['__test__']
        assert_equal 0, output.events['test'].first['__hoge__']
        assert_equal 'v', output.events['test'].first['k']
      end
    end

    sub_test_case 'optimized filter' do
      setup do
        @record = { 'k' => 'v' }
        @now = Engine.now
      end

      test 'call optimized filter when the filter plugin implements #filter without #filter_stream' do
        event_router.add_rule('test', filter)

        assert_rr do
          mock(filter).filter('test', @now, @record) { @record }
          event_router.emit('test', @now, @record)
        end
      end

      test 'call optimized filter when the filter plugin implements #filter_with_time without #filter_stream' do
        filter = Class.new(FluentTestFilter) {
          undef_method :filter
          def filter_with_time(tag, time, record); end
        }.new

        event_router.add_rule('test', filter)

        assert_rr do
          # Return the emitted time; no `time` local exists in this block, so
          # referencing one would raise NameError inside the mocked call.
          mock(filter).filter_with_time('test', @now, @record) { [@now, @record] }
          event_router.emit('test', @now, @record)
        end
      end

      test "don't call optimized filter when filter plugins implement #filter_stream" do
        # Placeholder #filter_stream takes (tag, es), matching the mocked call below.
        filter = Class.new(FluentTestFilter) {
          undef_method :filter
          def filter_stream(_tag, es); end
        }.new

        event_router.add_rule('test', filter)

        assert_rr do
          mock(filter).filter_stream('test', is_a(OneEventStream)) { OneEventStream.new(@now, @record) }
          event_router.emit('test', @now, @record)
        end
      end

      test 'call optimized filter when filter plugins have #filter_with_time instead of #filter' do
        filter_with_time = Class.new(FluentTestFilter) {
          undef_method :filter
          def filter_with_time(tag, time, record); end
        }.new

        event_router.add_rule('test', filter_with_time)
        event_router.add_rule('test', filter)

        assert_rr do
          # The first filter shifts the time by one; the second must receive
          # the shifted time.
          mock(filter_with_time).filter_with_time('test', @now, @record) { [@now + 1, @record] }
          mock(filter).filter('test', @now + 1, @record) { @record }
          event_router.emit('test', @now, @record)
        end
      end

      test "don't call optimized filter even if just a filter of some filters implements #filter_stream method" do
        filter_stream = Class.new(FluentTestFilter) {
          def filter_stream(_tag, es); end
        }.new

        filter_with_time = Class.new(FluentTestFilter) {
          undef_method :filter
          def filter_with_time(tag, time, record); end
        }.new

        filters = [filter_stream, filter_with_time, filter]
        filters.each { |f| event_router.add_rule('test', f) }

        e = OneEventStream.new(@now, @record)
        assert_rr do
          mock($log).info("disable filter chain optimization because #{[filter_stream].map(&:class)} uses `#filter_stream` method.")
          # With optimization disabled, every filter is driven through #filter_stream.
          mock(filter_stream).filter_stream('test', is_a(OneEventStream)) { e }
          mock(filter).filter_stream('test', is_a(OneEventStream)) { e }
          mock(filter_with_time).filter_stream('test', is_a(OneEventStream)) { e }
          event_router.emit('test', @now, @record)
        end
      end
    end

    sub_test_case 'emit_error_handler' do
      test 'call handle_emits_error when emit failed' do
        event_router.add_rule('test', error_output)

        # NOTE(review): this first emit runs against the un-mocked handler;
        # its purpose is not evident from this file - confirm before removing.
        event_router.emit('test', Engine.now, 'k' => 'v')
        assert_rr do
          mock(emit_handler).handle_emits_error('test', is_a(OneEventStream), is_a(RuntimeError))
          event_router.emit('test', Engine.now, 'k' => 'v')
        end
      end

      test 'can pass records modified by filters to handle_emits_error' do
        filter = Class.new(FluentTestFilter) {
          def filter_stream(_tag, es); end
        }.new
        event_router.add_rule('test', filter)
        event_router.add_rule('test', error_output)

        time = Engine.now
        modified_es = OneEventStream.new(time, 'modified_label' => 'modified_value')

        assert_rr do
          # The handler must receive the stream returned by the filter, not
          # the originally emitted record.
          stub(filter).filter_stream { modified_es }
          mock(emit_handler).handle_emits_error('test', modified_es, is_a(RuntimeError))
          event_router.emit('test', time, 'pre_label' => 'pre_value')
        end
      end
    end
  end
end