Back to Repositories

Testing Plugin Parameter Compatibility System in Fluentd

This test suite validates the compatibility parameters functionality in Fluentd’s plugin system, focusing on parameter conversion and configuration handling for input, output, parser, and formatter plugins. It ensures proper handling of legacy parameters and their conversion to modern plugin configurations.

Test Coverage Overview

The test suite provides comprehensive coverage of Fluentd’s plugin parameter compatibility system.

Key areas tested include:
  • Output plugin buffer configuration with various chunk key settings
  • Input plugin parameter conversion for extract and parser functionality
  • Time format handling and injection across different plugin types
  • Parameter conversion for formatter and parser plugins

Implementation Analysis

The testing approach uses a series of dummy plugin implementations to verify parameter conversion behavior. It leverages Test::Unit’s structure, grouping related tests logically with sub_test_case blocks.

Key patterns include:
  • Setup/teardown hooks for proper plugin lifecycle management
  • Mock plugin classes inheriting from Fluent::Plugin base classes
  • Configuration validation through helper methods

Technical Details

Testing infrastructure includes:
  • Test::Unit framework for test organization
  • Fluent::Test setup for plugin testing environment
  • Custom helper methods for parameter conversion
  • Platform-specific newline handling
  • Time parsing and formatting validation

Best Practices Demonstrated

The test suite exemplifies several testing best practices in the Fluentd ecosystem.

Notable practices include:
  • Proper plugin lifecycle management in setup/teardown
  • Comprehensive edge case coverage for different plugin types
  • Isolation of test cases using sub_test_case blocks
  • Thorough validation of configuration parameters

fluent/fluentd

test/plugin_helper/test_compat_parameters.rb

            
require_relative '../helper'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/time'

require 'time'

# Tests for the compat_parameters plugin helper.
#
# Each test builds a flat configuration element from legacy-style parameter
# names (e.g. 'num_threads', 'buffer_chunk_limit', 'time_slice_format'),
# configures a dummy plugin that invokes the helper before calling `super`,
# and then asserts on the resulting buffer / inject / formatter / parser /
# extract settings.
class CompatParameterTest < Test::Unit::TestCase
  setup do
    Fluent::Test.setup
    # Plugin instance under test. Every test assigns it so that teardown can
    # drive it through the rest of its lifecycle.
    @i = nil
    # Line terminator expected at the end of formatted output on this platform.
    @default_newline = if Fluent.windows?
                         "\r\n"
                       else
                         "\n"
                       end
  end

  teardown do
    if @i
      # Walk the shutdown phases in order, skipping any phase the plugin has
      # already reported as reached.
      @i.stop unless @i.stopped?
      @i.before_shutdown unless @i.before_shutdown?
      @i.shutdown unless @i.shutdown?
      @i.after_shutdown unless @i.after_shutdown?
      @i.close unless @i.closed?
      @i.terminate unless @i.terminated?
    end
  end

  # Input plugin that converts legacy extract/parser parameters and exposes
  # the parser it creates on start.
  class DummyI0 < Fluent::Plugin::Input
    helpers :compat_parameters, :parser, :extract
    attr_reader :parser

    def configure(conf)
      # The conversion has to run before super so that the base class sees the
      # converted configuration.
      compat_parameters_convert(conf, :extract, :parser)
      super
    end

    def start
      super
      @parser = parser_create
    end

    # Parses +input_data+ with the configured parser and returns an array of
    # [tag, time, record] triples. The tag is taken from the record via the
    # extract helper, falling back to 'dummy_tag' when that yields nothing.
    def produce_events(input_data)
      emit_events = [] # tag, time, record
      @parser.parse(input_data) do |time, record|
        tag = extract_tag_from_record(record) || 'dummy_tag'
        emit_events << [tag, time, record]
      end
      emit_events
    end
  end

  # Output plugin whose buffer conversion uses an empty default chunk key.
  class DummyO0 < Fluent::Plugin::Output
    helpers :compat_parameters
    def configure(conf)
      compat_parameters_buffer(conf, default_chunk_key: '')
      super
    end
    def write(chunk); end # dummy
  end

  # Output plugin whose buffer conversion defaults the chunk key to 'time'.
  class DummyO1 < Fluent::Plugin::Output
    helpers :compat_parameters
    def configure(conf)
      compat_parameters_buffer(conf, default_chunk_key: 'time')
      super
    end
    def write(chunk); end # dummy
  end

  # Output plugin with default chunk key 'time'.
  # NOTE(review): this class is identical to DummyO1; it is kept as a separate
  # class because the tests below reference it by name.
  class DummyO2 < Fluent::Plugin::Output
    helpers :compat_parameters
    def configure(conf)
      compat_parameters_buffer(conf, default_chunk_key: 'time')
      super
    end
    def write(chunk); end # dummy
  end

  # Output plugin whose buffer conversion defaults the chunk key to 'tag'.
  class DummyO3 < Fluent::Plugin::Output
    helpers :compat_parameters
    def configure(conf)
      compat_parameters_buffer(conf, default_chunk_key: 'tag')
      super
    end
    def write(chunk); end # dummy
  end

  # Output plugin that converts buffer, inject and formatter parameters in one
  # call (default chunk key 'tag') and exposes the formatter created on start.
  class DummyO4 < Fluent::Plugin::Output
    helpers :compat_parameters, :inject, :formatter
    attr_reader :f

    def configure(conf)
      compat_parameters_convert(conf, :buffer, :inject, :formatter, default_chunk_key: 'tag')
      super
    end

    def start
      super
      @f = formatter_create
    end

    def write(chunk); end # dummy
  end

  sub_test_case 'output plugins which does not have default chunk key' do
    test 'plugin helper converts parameters into plugin configuration parameters' do
      hash = {
        'num_threads' => 8,
        'flush_interval' => '10s',
        'buffer_chunk_limit' => '8m',
        'buffer_queue_limit' => '1024',
        'flush_at_shutdown' => 'yes',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO0.new
      @i.configure(conf)

      assert_equal 'memory', @i.buffer_config[:@type]
      assert_equal [], @i.buffer_config.chunk_keys
      assert_equal 8, @i.buffer_config.flush_thread_count
      assert_equal 10, @i.buffer_config.flush_interval
      assert_equal :default, @i.buffer_config.flush_mode
      assert @i.buffer_config.flush_at_shutdown

      assert_equal 8*1024*1024, @i.buffer.chunk_limit_size
      assert_equal 1024, @i.buffer.queue_limit_length
    end
  end

  sub_test_case 'output plugins which has default chunk key: time' do
    test 'plugin helper converts parameters into plugin configuration parameters' do
      hash = {
        'buffer_type' => 'file',
        'buffer_path' => '/tmp/mybuffer',
        'disable_retry_limit' => 'yes',
        'max_retry_wait' => '1h',
        'buffer_queue_full_action' => 'block',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO1.new
      @i.configure(conf)

      assert_equal 'file', @i.buffer_config[:@type]
      assert_equal 24*60*60, @i.buffer_config.timekey
      assert @i.buffer_config.retry_forever
      assert_equal 60*60, @i.buffer_config.retry_max_interval
      assert_equal :block, @i.buffer_config.overflow_action
      assert_equal :default, @i.buffer_config.flush_mode

      assert !@i.chunk_key_tag
      assert_equal [], @i.chunk_keys

      assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
    end
  end

  # NOTE(review): this group shares its name with the first sub_test_case, yet
  # DummyO2 is configured with default_chunk_key 'time' and the test drives
  # the time_slice_* parameters. The name looks like a copy-paste leftover;
  # confirm before renaming, since test names may be used for filtering.
  sub_test_case 'output plugins which does not have default chunk key' do
    test 'plugin helper converts parameters into plugin configuration parameters' do
      hash = {
        'buffer_type' => 'file',
        'buffer_path' => '/tmp/mybuffer',
        'time_slice_format' => '%Y%m%d%H',
        'time_slice_wait' => '10',
        'retry_limit' => '1024',
        'buffer_queue_full_action' => 'drop_oldest_chunk',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO2.new
      @i.configure(conf)

      assert_equal 'file', @i.buffer_config[:@type]
      # An hourly time_slice_format is expected to yield a one-hour timekey.
      assert_equal 60*60, @i.buffer_config.timekey
      assert_equal 10, @i.buffer_config.timekey_wait
      assert_equal 1024, @i.buffer_config.retry_max_times
      assert_equal :drop_oldest_chunk, @i.buffer_config.overflow_action

      assert @i.chunk_key_time
      assert !@i.chunk_key_tag
      assert_equal [], @i.chunk_keys

      assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
    end
  end

  sub_test_case 'output plugins which has default chunk key: tag' do
    test 'plugin helper converts parameters into plugin configuration parameters' do
      hash = {
        'buffer_type' => 'memory',
        'num_threads' => '10',
        'flush_interval' => '10s',
        'try_flush_interval' => '0.1',
        'queued_chunk_flush_interval' => '0.5',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO3.new
      @i.configure(conf)

      assert_equal 'memory', @i.buffer_config[:@type]
      assert_equal 10, @i.buffer_config.flush_thread_count
      assert_equal 10, @i.buffer_config.flush_interval
      assert_equal 0.1, @i.buffer_config.flush_thread_interval
      assert_equal 0.5, @i.buffer_config.flush_thread_burst_interval

      assert !@i.chunk_key_time
      assert @i.chunk_key_tag
      assert_equal [], @i.chunk_keys
    end
  end

  sub_test_case 'output plugins which has default chunk key: tag, and enables inject and formatter' do
    test 'plugin helper converts parameters into plugin configuration parameters for all of buffer, inject and formatter' do
      hash = {
        'buffer_type' => 'file',
        'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__),
        'num_threads' => '10',
        'format' => 'ltsv',
        'delimiter' => ',',
        'label_delimiter' => '%',
        'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime
        'include_tag_key' => 'yes', # default tag_key 'tag'
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO4.new
      @i.configure(conf)
      @i.start
      @i.after_start

      assert_equal 'file', @i.buffer_config[:@type]
      assert_equal 10, @i.buffer_config.flush_thread_count
      formatter = @i.f
      assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter }
      assert_equal ',', @i.f.delimiter
      assert_equal '%', @i.f.label_delimiter

      assert !@i.chunk_key_time
      assert @i.chunk_key_tag
      assert_equal [], @i.chunk_keys

      t = event_time('2016-06-24 16:05:01') # localtime
      iso8601str = Time.at(t.to_i).iso8601
      # The injected record is expected to carry the tag and an ISO 8601 time
      # string, joined with the custom delimiters configured above.
      formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1}))
      assert_equal "value%1,tag%tag.test,time%#{iso8601str}#{@default_newline}", formatted
    end

    # NOTE(review): despite its name, this test is a line-for-line copy of the
    # previous one. It still asserts an ISO 8601 time string, so unix-time
    # injection is not exercised.
    # TODO: configure epoch-style time injection here and assert an integer.
    test 'plugin helper setups time injecting as unix time (integer from epoch)' do
      hash = {
        'buffer_type' => 'file',
        'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__),
        'num_threads' => '10',
        'format' => 'ltsv',
        'delimiter' => ',',
        'label_delimiter' => '%',
        'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime
        'include_tag_key' => 'yes', # default tag_key 'tag'
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyO4.new
      @i.configure(conf)
      @i.start
      @i.after_start

      assert_equal 'file', @i.buffer_config[:@type]
      assert_equal 10, @i.buffer_config.flush_thread_count
      formatter = @i.f
      assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter }
      assert_equal ',', @i.f.delimiter
      assert_equal '%', @i.f.label_delimiter

      assert !@i.chunk_key_time
      assert @i.chunk_key_tag
      assert_equal [], @i.chunk_keys

      t = event_time('2016-06-24 16:05:01') # localtime
      iso8601str = Time.at(t.to_i).iso8601
      formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1}))
      assert_equal "value%1,tag%tag.test,time%#{iso8601str}#{@default_newline}", formatted
    end
  end

  sub_test_case 'input plugins' do
    test 'plugin helper converts parameters into plugin configuration parameters for extract and parser' do
      hash = {
        'format' => 'ltsv',
        'delimiter' => ',',
        'label_delimiter' => '%',
        'tag_key' => 't2',
        'time_key' => 't',
        'time_format' => '%Y-%m-%d.%H:%M:%S.%N',
        'utc' => 'yes',
        'types' => 'A integer|B string|C bool',
        'types_delimiter' => '|',
        'types_label_delimiter' => ' ',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyI0.new
      @i.configure(conf)
      @i.start
      @i.after_start

      parser = @i.parser
      assert{ parser.is_a? Fluent::Plugin::LabeledTSVParser }
      assert_equal ',', parser.delimiter
      assert_equal '%', parser.label_delimiter

      events = @i.produce_events("A%1,B%x,C%true,t2%mytag,t%2016-10-20.03:50:11.987654321")
      assert_equal 1, events.size

      tag, time, record = events.first
      assert_equal 'mytag', tag
      assert_equal_event_time event_time("2016-10-20 03:50:11.987654321 +0000"), time
      # The tag and time fields are expected to be removed from the record,
      # leaving only the three typed fields.
      assert_equal 3, record.keys.size
      assert_equal ['A','B','C'], record.keys.sort
      assert_equal 1, record['A']
      assert_equal 'x', record['B']
      assert_equal true, record['C']
    end

    # NOTE(review): this test only checks the parser class and delimiters. No
    # event is parsed, so the 'time_type' => 'float' handling is not verified.
    test 'plugin helper converts parameters into plugin configuration parameters for extract and parser, using numeric time' do
      hash = {
        'format' => 'ltsv',
        'delimiter' => ',',
        'label_delimiter' => '%',
        'tag_key' => 't2',
        'time_key' => 't',
        'time_type' => 'float',
        'localtime' => 'yes',
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyI0.new
      @i.configure(conf)
      @i.start
      @i.after_start

      parser = @i.parser
      assert{ parser.is_a? Fluent::Plugin::LabeledTSVParser }
      assert_equal ',', parser.delimiter
      assert_equal '%', parser.label_delimiter
    end

    # Placeholder: has no assertions yet, so it passes without checking anything.
    test 'plugin helper setups time extraction as unix time (integer from epoch)' do
      # TODO:
    end
  end

  sub_test_case 'parser plugins' do
    test 'syslog parser parameters' do
      hash = {
        'format' => 'syslog',
        'message_format' => 'rfc5424',
        'with_priority' => 'true',
        'rfc5424_time_format' => '%Y'
      }
      conf = config_element('ROOT', '', hash)
      @i = DummyI0.new
      @i.configure(conf)
      @i.start
      @i.after_start

      parser = @i.parser
      assert_kind_of(Fluent::Plugin::SyslogParser, parser)
      assert_equal :rfc5424, parser.message_format
      assert_equal true, parser.with_priority
      assert_equal '%Y', parser.rfc5424_time_format
    end
  end
end