
Testing Input Plugin Event Stream Processing in Fluentd

This file defines InputTestDriver, a test driver for Fluentd input plugins. It captures the event streams a plugin emits so that tests can check tags, times, and records, and it controls how long a run waits by means of expected emit counts, post conditions, and a timeout.

Test Coverage Overview

The driver supports the following checks on what an input plugin emits:

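  • Compares each emitted event against an expected [tag, time, record] triple registered with expect_emit
  • Records every emitted stream in emit_streams and event_streams
  • Stops waiting once expected_emits_length events have been counted
  • Gives up waiting after run_timeout seconds (default 5)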
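
The emits and records accessors in the driver flatten captured streams into per-event values. The following is a minimal standalone sketch of that flattening, not the driver itself. The sample tags, times, and records are made up for illustration, and flatten_emits and flatten_records are hypothetical helper names.

```ruby
# Hypothetical sample data shaped like the driver's @emit_streams:
# each entry is [tag, [[time, record], ...]].
emit_streams = [
  ["app.access", [[1_700_000_000, { "path" => "/" }],
                  [1_700_000_001, { "path" => "/health" }]]],
  ["app.error",  [[1_700_000_002, { "message" => "boom" }]]],
]

# Flatten to [tag, time, record] triples, as the emits accessor does.
def flatten_emits(emit_streams)
  emit_streams.flat_map do |tag, events|
    events.map { |time, record| [tag, time, record] }
  end
end

# Keep only the records, as the records accessor does.
def flatten_records(emit_streams)
  emit_streams.flat_map do |_tag, events|
    events.map { |_time, record| record }
  end
end

emits = flatten_emits(emit_streams)
records = flatten_records(emit_streams)
```

A test that only cares about record contents can assert on records, while a test that checks routing can assert on the full triples in emits.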

Implementation Analysis

The driver intercepts emit_stream calls on Engine and on the plugin's router, and records what the plugin emits so that the test can inspect it.

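  • EmitStreamWrapper module, prepended to the singleton classes of Engine and the plugin's router, redirects emit_stream to the driver
  • register_run_post_condition and register_run_breaking_condition control when a run stops
  • emit_streams holds [tag, events] pairs; event_streams holds the raw event stream objects
  • emits, events, and records accessors flatten the captured streams for per-event or bulk assertions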
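
The interception technique can be sketched in plain Ruby without Fluentd. In the sketch below, FakeRouter and CaptureWrapper are hypothetical names chosen for illustration; CaptureWrapper only mirrors the shape of EmitStreamWrapper and is not the real module.

```ruby
# Hypothetical stand-in for a router object; not a Fluentd class.
class FakeRouter
  def emit_stream(tag, es)
    raise "would forward #{tag} to real outputs"
  end
end

# Mirrors the shape of EmitStreamWrapper: the prepended method takes
# precedence over the object's own emit_stream and forwards the call
# to a configurable callable.
module CaptureWrapper
  def emit_stream_callee=(callable)
    @emit_stream_callee = callable
  end

  def emit_stream(tag, es)
    @emit_stream_callee.call(tag, es)
  end
end

router = FakeRouter.new
captured = []

# Prepend onto the singleton class so that only this instance is
# affected, and guard against prepending the module twice.
unless router.singleton_class.ancestors.include?(CaptureWrapper)
  router.singleton_class.prepend(CaptureWrapper)
end
router.emit_stream_callee = ->(tag, es) { captured << [tag, es.to_a] }

# The call now lands in the lambda instead of FakeRouter#emit_stream.
router.emit_stream("test.tag", [[0, { "k" => "v" }]])
```

Prepending to the singleton class rather than to the class keeps other instances untouched, and the ancestors check makes repeated runs safe.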

Technical Details

  • Built on Fluentd’s core testing framework (fluent/test/base)
  • Subclasses Fluent::Test::TestDriver
  • Calls assert_equal_event_time when an expected time is a Fluent::EventTime
  • Configurable run_timeout (default 5 seconds) plus post and breaking conditions
  • Polls the captured streams every 0.01 seconds while waiting for further emits
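
The wait logic combines post conditions (all must hold) with breaking conditions (any one ends the wait). Below is a minimal sketch of that polling pattern under simplified assumptions. wait_until is a hypothetical helper, and the producer thread merely stands in for a plugin that emits events over time.

```ruby
# Hypothetical helper, not part of Fluentd: poll until every post
# condition holds or any breaking condition fires.
def wait_until(post_conditions, breaking_conditions, interval: 0.01)
  loop do
    # An empty list of post conditions counts as satisfied, because
    # all? returns true for an empty array.
    return :satisfied if post_conditions.all?(&:call)
    return :broken if breaking_conditions.any?(&:call)
    sleep interval
  end
end

received = []
producer = Thread.new do
  3.times do |n|
    sleep 0.02
    received << n
  end
end

timeout = 5 # seconds, the same default as run_timeout
started_at = Time.now
post = [-> { received.length == 3 }]
breaking = [-> { Time.now >= started_at + timeout }]

result = wait_until(post, breaking)
producer.join
```

The breaking condition bounds the wait when the expected events never arrive, so a failing plugin produces a failed assertion rather than a hung test.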

Best Practices Demonstrated

The test implementation showcases robust testing practices for event-driven systems.

  • Clear separation of test setup and execution
  • Comprehensive event validation mechanisms
  • Flexible test condition management
  • Proper handling of asynchronous operations
  • Thorough error checking and timeout handling

fluent/fluentd

lib/fluent/test/input_test.rb

            
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/engine'
require 'fluent/time'
require 'fluent/test/base'

module Fluent
  module Test
    class InputTestDriver < TestDriver
      def initialize(klass, &block)
        super(klass, &block)
        @emit_streams = []
        @event_streams = []
        @expects = nil
        # for checking only the number of emitted records during run
        @expected_emits_length = nil
        @run_timeout = 5
        @run_post_conditions = []
      end

      def expect_emit(tag, time, record)
        (@expects ||= []) << [tag, time, record]
        self
      end

      def expected_emits
        @expects ||= []
      end

      attr_accessor :expected_emits_length
      attr_accessor :run_timeout
      attr_reader :emit_streams, :event_streams

      def emits
        all = []
        @emit_streams.each {|tag,events|
          events.each {|time,record|
            all << [tag, time, record]
          }
        }
        all
      end

      def events
        all = []
        @emit_streams.each {|tag,events|
          all.concat events
        }
        all
      end

      def records
        all = []
        @emit_streams.each {|tag,events|
          events.each {|time,record|
            all << record
          }
        }
        all
      end

      def register_run_post_condition(&block)
        if block
          @run_post_conditions << block
        end
      end

      def register_run_breaking_condition(&block)
        if block
          @run_breaking_conditions ||= []
          @run_breaking_conditions << block
        end
      end

      def run_should_stop?
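        # Should stop running if post conditions are not registered.
        # @run_post_conditions is initialized to [], so this nil guard only
        # matters if the variable is reset elsewhere.
        return true unless @run_post_conditions

        # Should stop running if all of the post conditions are true.
        # With no conditions registered, all? returns true for the empty array.
        return true if @run_post_conditions.all? {|proc| proc.call }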

        # Should stop running if any of the breaking conditions is true.
        # In this case, some post conditions may not be true.
        return true if @run_breaking_conditions && @run_breaking_conditions.any? {|proc| proc.call }

        false
      end

      module EmitStreamWrapper
        def emit_stream_callee=(method)
          @emit_stream_callee = method
        end
        def emit_stream(tag, es)
          @emit_stream_callee.call(tag, es)
        end
      end

      def run(num_waits = 10, &block)
        m = method(:emit_stream)
        unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
          Engine.singleton_class.prepend EmitStreamWrapper
        end
        Engine.emit_stream_callee = m
        unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
          instance.router.singleton_class.prepend EmitStreamWrapper
        end
        instance.router.emit_stream_callee = m

        super(num_waits) {
          block.call if block

          if @expected_emits_length || @expects || @run_post_conditions
            # counters for emits and emit_streams
            i, j = 0, 0

            # Events of expected length will be emitted at the end.
            max_length = @expected_emits_length
            max_length ||= @expects.length if @expects
            if max_length
              register_run_post_condition do
                i == max_length
              end
            end

            # Set running timeout to avoid infinite loop caused by some errors.
            started_at = Time.now
            register_run_breaking_condition do
              Time.now >= started_at + @run_timeout
            end

            until run_should_stop?
              if j >= @emit_streams.length
                sleep 0.01
                next
              end

              tag, events = @emit_streams[j]
              events.each do |time, record|
                if @expects
                  assert_equal(@expects[i], [tag, time, record])
                  assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
                end
                i += 1
              end
              j += 1
            end
            assert_equal(@expects.length, i) if @expects
          end
        }
        self
      end

      private
      def emit_stream(tag, es)
        @event_streams << es
        @emit_streams << [tag, es.to_a]
      end
    end
  end
end