
Testing Filter Component Implementation in Fluentd

This file defines FilterTestDriver, a test driver for Fluentd filter plugins. A test queues events under a tag, runs the driver, and then inspects the filtered output, so it can check a filter's behavior on single events and on whole event streams.

Test Coverage Overview

The driver exercises a filter's stream-processing path: events are queued per tag and passed to the filter's filter_stream method. Key functionality includes:

  • Event emission and filtering with customizable tags
  • Stream-level filtering operations
  • Optional per-event timestamps, defaulting to EventTime.now
  • Multi-event stream handling

Implementation Analysis

FilterTestDriver subclasses TestDriver and lives in the Fluent::Test module. Its emit and emit_with_tag methods queue single events, each with an optional timestamp, while filter_stream and filter_stream_with_tag queue a whole event stream. The run method then passes each tag's stream to the filter instance's filter_stream method and collects the results in filtered.
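The queue-then-run flow can be sketched without Fluentd installed. Everything in the block below is a hypothetical stand-in written for illustration: SketchEventStream, SketchUppercaseFilter, and SketchFilterDriver are not Fluentd classes, and plain integers replace EventTime values. Only the method shapes (emit, emit_with_tag, run, filtered_as_array, and filter_stream(tag, es)) are taken from the source file.

```ruby
# Hypothetical stand-in for MultiEventStream: an ordered list of [time, record] pairs.
class SketchEventStream
  include Enumerable

  def initialize
    @entries = []
  end

  def add(time, record)
    @entries << [time, record]
  end

  def each
    @entries.each { |time, record| yield time, record }
  end
end

# Hypothetical filter exposing the filter_stream(tag, es) shape that the driver calls.
class SketchUppercaseFilter
  def filter_stream(tag, es)
    out = SketchEventStream.new
    es.each do |time, record|
      out.add(time, record.merge("message" => record["message"].to_s.upcase))
    end
    out
  end
end

# Simplified stand-in for the driver: queue events per tag, then filter them in run.
class SketchFilterDriver
  attr_reader :filtered
  attr_accessor :tag

  def initialize(instance, tag = "filter.test")
    @instance = instance
    @tag = tag
    @events = {}
    @filtered = SketchEventStream.new
  end

  def emit(record, time = Time.now.to_i)
    emit_with_tag(@tag, record, time)
  end

  def emit_with_tag(tag, record, time = Time.now.to_i)
    @events[tag] ||= SketchEventStream.new
    @events[tag].add(time, record)
  end

  def run
    yield if block_given?
    @events.each do |tag, es|
      @instance.filter_stream(tag, es).each { |time, record| @filtered.add(time, record) }
    end
    self
  end

  # As in the source, every row is labeled with the driver's own tag.
  def filtered_as_array
    @filtered.map { |time, record| [@tag, time, record] }
  end
end

driver = SketchFilterDriver.new(SketchUppercaseFilter.new)
driver.run do
  driver.emit({ "message" => "hello" }, 1)
  driver.emit_with_tag("other.tag", { "message" => "world" }, 2)
end
result = driver.filtered_as_array
```

Note that the event queued under "other.tag" still appears in result with the tag "filter.test", because filtered_as_array in the source builds each row from the driver's tag rather than from the tag the event was queued under.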

Technical Details

Testing tools and components include:

  • FilterTestDriver class extending TestDriver
  • MultiEventStream for managing multiple events
  • EventTime integration for timestamp handling
  • A default tag ('filter.test') with a tag accessor and per-call tag overrides
  • Aliased methods for backward compatibility
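One detail in the source is easy to miss: emit_with_tag appends to the stream queued for a tag, while filter_stream_with_tag assigns the stream and so replaces whatever was queued for that tag. The sketch below illustrates the two assignment styles with plain arrays and lambdas. The names events, append, and replace are hypothetical, and arrays stand in for MultiEventStream.

```ruby
events = {}

# emit_with_tag style: create the per-tag stream on first use, then append to it.
append = lambda do |tag, time, record|
  events[tag] ||= []
  events[tag] << [time, record]
end

# filter_stream_with_tag style: assign the whole stream, discarding anything queued.
replace = lambda { |tag, stream| events[tag] = stream }

append.call("filter.test", 1, { "n" => 1 })
append.call("filter.test", 2, { "n" => 2 })
queued_after_append = events["filter.test"].size

replace.call("filter.test", [[3, { "n" => 3 }]])
queued_after_replace = events["filter.test"].size
```

Under this reading, a test that mixes emit and filter_stream on the same tag should call filter_stream first, or the emitted events will be dropped before run sees them.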

Best Practices Demonstrated

The test implementation showcases several testing best practices including:

  • Clear separation of concerns between event handling and filtering logic
  • Flexible test configuration through initialization options
  • Consistent API design with aliased methods
  • A configurable num_waits argument to run, defaulting to 0 because most filters do not use threads
  • Clean event collection and processing patterns
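The aliased methods have one consequence worth knowing when subclassing the driver: Ruby's alias_method binds the new name to the method definition that exists at that moment, so overriding the original name in a subclass does not change what the alias calls. The sketch below uses hypothetical classes (SketchDriver and SketchSubDriver) to show this. It is a general Ruby behavior, not something specific to Fluentd.

```ruby
class SketchDriver
  def emit(record)
    "emit: #{record}"
  end
  # filter now refers to the emit definition above.
  alias_method :filter, :emit
end

class SketchSubDriver < SketchDriver
  # Overriding emit does not rebind the inherited filter alias.
  def emit(record)
    "overridden emit: #{record}"
  end
end

base_filter = SketchDriver.new.filter("r")
sub_emit    = SketchSubDriver.new.emit("r")
sub_filter  = SketchSubDriver.new.filter("r")
```

Here sub_filter still runs the base implementation, so a subclass that customizes emit would likely need to re-declare the alias if it wants filter to follow.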

fluent/fluentd

lib/fluent/test/filter_test.rb

            
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/test/base'
require 'fluent/event'

module Fluent
  module Test
    class FilterTestDriver < TestDriver
      def initialize(klass, tag = 'filter.test', &block)
        super(klass, &block)
        @tag = tag
        @events = {}
        @filtered = MultiEventStream.new
      end

      attr_reader :filtered
      attr_accessor :tag

      def emit(record, time = EventTime.now)
        emit_with_tag(@tag, record, time)
      end
      alias_method :filter, :emit

      def emit_with_tag(tag, record, time = EventTime.now)
        @events[tag] ||= MultiEventStream.new
        @events[tag].add(time, record)
      end
      alias_method :filter_with_tag, :emit_with_tag

      def filter_stream(es)
        filter_stream_with_tag(@tag, es)
      end

      def filter_stream_with_tag(tag, es)
        @events[tag] = es
      end

      def filtered_as_array
        all = []
        @filtered.each { |time, record|
          all << [@tag, time, record]
        }
        all
      end
      alias_method :emits, :filtered_as_array # emits is kept for consistency with other drivers

      # Most filters don't use threads, so the default is 0, which reduces test time.
      def run(num_waits = 0, &block)
        super(num_waits) {
          block.call if block

          @events.each { |tag, es|
            processed = @instance.filter_stream(tag, es)
            processed.each { |time, record|
              @filtered.add(time, record)
            }
          }
        }
        self
      end
    end
  end
end