Testing Plugin Parameter Compatibility System in Fluentd
This test suite validates the compatibility parameters functionality in Fluentd’s plugin system, focusing on parameter conversion and configuration handling for input, output, parser, and formatter plugins. It ensures proper handling of legacy parameters and their conversion to modern plugin configurations.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/plugin_helper/test_compat_parameters.rb
require_relative '../helper'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/time'
require 'time'
class CompatParameterTest < Test::Unit::TestCase
# Prepare the Fluentd test environment and reset per-test state.
setup do
  Fluent::Test.setup
  # No plugin instance yet; teardown only acts once a test assigns @i.
  @i = nil
  # Line terminator used in expected formatter output: CRLF on Windows, LF otherwise.
  @default_newline = Fluent.windows? ? "\r\n" : "\n"
end
# Walk the plugin under test (if a test created one) through every shutdown
# phase it has not completed yet, in lifecycle order.
teardown do
  plugin = @i
  if plugin
    # Each pair is [phase method, predicate reporting that the phase already ran].
    [
      [:stop, :stopped?],
      [:before_shutdown, :before_shutdown?],
      [:shutdown, :shutdown?],
      [:after_shutdown, :after_shutdown?],
      [:close, :closed?],
      [:terminate, :terminated?],
    ].each do |phase, already_done|
      plugin.public_send(phase) unless plugin.public_send(already_done)
    end
  end
end
# Input plugin fixture: converts legacy parameters for the extract and parser
# helpers, then parses raw text into [tag, time, record] triples.
class DummyI0 < Fluent::Plugin::Input
  helpers :compat_parameters, :parser, :extract

  attr_reader :parser

  # Convert legacy parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_convert(conf, :extract, :parser)
    super
  end

  # Create the parser after the plugin itself has started.
  def start
    super
    @parser = parser_create
  end

  # Parse +input_data+ and return an Array of [tag, time, record] entries.
  # The tag falls back to 'dummy_tag' when none is extracted from the record.
  def produce_events(input_data)
    [].tap do |collected| # entries are [tag, time, record]
      @parser.parse(input_data) do |time, record|
        collected << [extract_tag_from_record(record) || 'dummy_tag', time, record]
      end
    end
  end
end
# Output plugin fixture that converts legacy buffer parameters with an empty
# default chunk key.
class DummyO0 < Fluent::Plugin::Output
  helpers :compat_parameters

  # Convert legacy buffer parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_buffer(conf, default_chunk_key: '')
    super
  end

  # Intentionally empty (dummy).
  def write(chunk)
  end
end
# Output plugin fixture that converts legacy buffer parameters with 'time' as
# the default chunk key.
class DummyO1 < Fluent::Plugin::Output
  helpers :compat_parameters

  # Convert legacy buffer parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_buffer(conf, default_chunk_key: 'time')
    super
  end

  # Intentionally empty (dummy).
  def write(chunk)
  end
end
# Output plugin fixture that converts legacy buffer parameters with 'time' as
# the default chunk key (same definition as DummyO1, kept as a separate class).
class DummyO2 < Fluent::Plugin::Output
  helpers :compat_parameters

  # Convert legacy buffer parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_buffer(conf, default_chunk_key: 'time')
    super
  end

  # Intentionally empty (dummy).
  def write(chunk)
  end
end
# Output plugin fixture that converts legacy buffer parameters with 'tag' as
# the default chunk key.
class DummyO3 < Fluent::Plugin::Output
  helpers :compat_parameters

  # Convert legacy buffer parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_buffer(conf, default_chunk_key: 'tag')
    super
  end

  # Intentionally empty (dummy).
  def write(chunk)
  end
end
# Output plugin fixture that converts legacy buffer, inject and formatter
# parameters in one call, with 'tag' as the default chunk key.
class DummyO4 < Fluent::Plugin::Output
  helpers :compat_parameters, :inject, :formatter

  # Formatter instance created in #start.
  attr_reader :f

  # Convert legacy parameters in +conf+ before the regular configuration runs.
  def configure(conf)
    compat_parameters_convert(conf, :buffer, :inject, :formatter, default_chunk_key: 'tag')
    super
  end

  # Create the formatter after the plugin itself has started.
  def start
    super
    @f = formatter_create
  end

  # Intentionally empty (dummy).
  def write(chunk)
  end
end
# DummyO0 (empty default chunk key): legacy flat buffer parameters must show
# up in the buffer section and on the buffer instance.
sub_test_case 'output plugins which does not have default chunk key' do
  test 'plugin helper converts parameters into plugin configuration parameters' do
    legacy_params = {
      'num_threads' => 8,
      'flush_interval' => '10s',
      'buffer_chunk_limit' => '8m',
      'buffer_queue_limit' => '1024',
      'flush_at_shutdown' => 'yes',
    }
    element = config_element('ROOT', '', legacy_params)
    @i = DummyO0.new
    @i.configure(element)

    # Values expected in the converted buffer section.
    buffer_conf = @i.buffer_config
    assert_equal 'memory', buffer_conf[:@type]
    assert_equal [], buffer_conf.chunk_keys
    assert_equal 8, buffer_conf.flush_thread_count
    assert_equal 10, buffer_conf.flush_interval
    assert_equal :default, buffer_conf.flush_mode
    assert buffer_conf.flush_at_shutdown

    # Size limits are checked on the buffer instance itself.
    buffer = @i.buffer
    assert_equal 8*1024*1024, buffer.chunk_limit_size
    assert_equal 1024, buffer.queue_limit_length
  end
end
# DummyO1 (default chunk key 'time'): legacy file-buffer and retry parameters
# must be converted into the buffer section.
sub_test_case 'output plugins which has default chunk key: time' do
  test 'plugin helper converts parameters into plugin configuration parameters' do
    # Legacy flat parameters, given without any <buffer> section.
    hash = {
      'buffer_type' => 'file',
      'buffer_path' => '/tmp/mybuffer',
      'disable_retry_limit' => 'yes',
      'max_retry_wait' => '1h',
      'buffer_queue_full_action' => 'block',
    }
    conf = config_element('ROOT', '', hash)
    @i = DummyO1.new
    @i.configure(conf)

    assert_equal 'file', @i.buffer_config[:@type]
    # No time_slice_format is given, so a one-day timekey is expected.
    assert_equal 24*60*60, @i.buffer_config.timekey
    assert @i.buffer_config.retry_forever
    assert_equal 60*60, @i.buffer_config.retry_max_interval
    assert_equal :block, @i.buffer_config.overflow_action
    assert_equal :default, @i.buffer_config.flush_mode

    # The tag must not be used as a chunk key, and no other keys are expected.
    assert !@i.chunk_key_tag
    assert_equal [], @i.chunk_keys

    assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
  end
end
# DummyO2 (default chunk key 'time'): legacy time_slice_* and retry parameters
# must be converted into the buffer section.
# NOTE(review): this label repeats the one used for the DummyO0 case although
# DummyO2 declares default_chunk_key 'time'; the label is left unchanged here.
sub_test_case 'output plugins which does not have default chunk key' do
  test 'plugin helper converts parameters into plugin configuration parameters' do
    # Legacy flat parameters, given without any <buffer> section.
    hash = {
      'buffer_type' => 'file',
      'buffer_path' => '/tmp/mybuffer',
      'time_slice_format' => '%Y%m%d%H',
      'time_slice_wait' => '10',
      'retry_limit' => '1024',
      'buffer_queue_full_action' => 'drop_oldest_chunk',
    }
    conf = config_element('ROOT', '', hash)
    @i = DummyO2.new
    @i.configure(conf)

    assert_equal 'file', @i.buffer_config[:@type]
    # An hourly time_slice_format is expected to yield a one-hour timekey.
    assert_equal 60*60, @i.buffer_config.timekey
    assert_equal 10, @i.buffer_config.timekey_wait
    assert_equal 1024, @i.buffer_config.retry_max_times
    assert_equal :drop_oldest_chunk, @i.buffer_config.overflow_action

    # Chunks are keyed by time only: not by tag, and by no other keys.
    assert @i.chunk_key_time
    assert !@i.chunk_key_tag
    assert_equal [], @i.chunk_keys

    assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
  end
end
# DummyO3 (default chunk key 'tag'): legacy flush/thread parameters must be
# converted into the buffer section.
sub_test_case 'output plugins which has default chunk key: tag' do
  test 'plugin helper converts parameters into plugin configuration parameters' do
    # Legacy flat parameters, given without any <buffer> section.
    hash = {
      'buffer_type' => 'memory',
      'num_threads' => '10',
      'flush_interval' => '10s',
      'try_flush_interval' => '0.1',
      'queued_chunk_flush_interval' => '0.5',
    }
    conf = config_element('ROOT', '', hash)
    @i = DummyO3.new
    @i.configure(conf)

    assert_equal 'memory', @i.buffer_config[:@type]
    assert_equal 10, @i.buffer_config.flush_thread_count
    assert_equal 10, @i.buffer_config.flush_interval
    assert_equal 0.1, @i.buffer_config.flush_thread_interval
    assert_equal 0.5, @i.buffer_config.flush_thread_burst_interval

    # Chunks are keyed by tag only: not by time, and by no other keys.
    assert !@i.chunk_key_time
    assert @i.chunk_key_tag
    assert_equal [], @i.chunk_keys
  end
end
# DummyO4 (default chunk key 'tag'): a single conversion call must produce the
# buffer, inject and formatter configuration from legacy flat parameters.
sub_test_case 'output plugins which has default chunk key: tag, and enables inject and formatter' do
  test 'plugin helper converts parameters into plugin configuration parameters for all of buffer, inject and formatter' do
    hash = {
      'buffer_type' => 'file',
      'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__),
      'num_threads' => '10',
      'format' => 'ltsv',
      'delimiter' => ',',
      'label_delimiter' => '%',
      'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime
      'include_tag_key' => 'yes', # default tag_key 'tag'
    }
    conf = config_element('ROOT', '', hash)
    @i = DummyO4.new
    @i.configure(conf)
    # The formatter is only created in #start, so the plugin must be started.
    @i.start
    @i.after_start

    assert_equal 'file', @i.buffer_config[:@type]
    assert_equal 10, @i.buffer_config.flush_thread_count

    formatter = @i.f
    assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter }
    assert_equal ',', @i.f.delimiter
    assert_equal '%', @i.f.label_delimiter

    # Chunks are keyed by tag only: not by time, and by no other keys.
    assert !@i.chunk_key_time
    assert @i.chunk_key_tag
    assert_equal [], @i.chunk_keys

    t = event_time('2016-06-24 16:05:01') # localtime
    iso8601str = Time.at(t.to_i).iso8601
    # Inject tag/time into the record, then format it with the custom delimiters.
    formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1}))
    assert_equal "value%1,tag%tag.test,time%#{iso8601str}#{@default_newline}", formatted
  end

  # NOTE(review): the parameters and assertions below are identical to the
  # preceding test and still expect an ISO 8601 time string; nothing here
  # requests unix-time injection. Confirm the intended configuration.
  test 'plugin helper setups time injecting as unix time (integer from epoch)' do
    hash = {
      'buffer_type' => 'file',
      'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__),
      'num_threads' => '10',
      'format' => 'ltsv',
      'delimiter' => ',',
      'label_delimiter' => '%',
      'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime
      'include_tag_key' => 'yes', # default tag_key 'tag'
    }
    conf = config_element('ROOT', '', hash)
    @i = DummyO4.new
    @i.configure(conf)
    # The formatter is only created in #start, so the plugin must be started.
    @i.start
    @i.after_start

    assert_equal 'file', @i.buffer_config[:@type]
    assert_equal 10, @i.buffer_config.flush_thread_count

    formatter = @i.f
    assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter }
    assert_equal ',', @i.f.delimiter
    assert_equal '%', @i.f.label_delimiter

    # Chunks are keyed by tag only: not by time, and by no other keys.
    assert !@i.chunk_key_time
    assert @i.chunk_key_tag
    assert_equal [], @i.chunk_keys

    t = event_time('2016-06-24 16:05:01') # localtime
    iso8601str = Time.at(t.to_i).iso8601
    formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1}))
    assert_equal "value%1,tag%tag.test,time%#{iso8601str}#{@default_newline}", formatted
  end
end
# DummyI0: legacy flat parameters must configure both tag/time extraction and
# the parser.
sub_test_case 'input plugins' do
  test 'plugin helper converts parameters into plugin configuration parameters for extract and parser' do
    legacy_params = {
      'format' => 'ltsv',
      'delimiter' => ',',
      'label_delimiter' => '%',
      'tag_key' => 't2',
      'time_key' => 't',
      'time_format' => '%Y-%m-%d.%H:%M:%S.%N',
      'utc' => 'yes',
      'types' => 'A integer|B string|C bool',
      'types_delimiter' => '|',
      'types_label_delimiter' => ' ',
    }
    element = config_element('ROOT', '', legacy_params)
    @i = DummyI0.new
    @i.configure(element)
    # The parser is only created in #start, so the plugin must be started.
    @i.start
    @i.after_start

    ltsv_parser = @i.parser
    assert{ ltsv_parser.is_a? Fluent::Plugin::LabeledTSVParser }
    assert_equal ',', ltsv_parser.delimiter
    assert_equal '%', ltsv_parser.label_delimiter

    # One line carrying three typed fields plus the tag (t2) and time (t) fields.
    events = @i.produce_events("A%1,B%x,C%true,t2%mytag,t%2016-10-20.03:50:11.987654321")
    assert_equal 1, events.size

    emitted_tag, emitted_time, parsed = events.first
    assert_equal 'mytag', emitted_tag
    assert_equal_event_time event_time("2016-10-20 03:50:11.987654321 +0000"), emitted_time

    # Only the three data fields are expected to remain in the record.
    field_names = parsed.keys
    assert_equal 3, field_names.size
    assert_equal ['A','B','C'], field_names.sort
    assert_equal 1, parsed['A']
    assert_equal 'x', parsed['B']
    assert_equal true, parsed['C']
  end

  test 'plugin helper converts parameters into plugin configuration parameters for extract and parser, using numeric time' do
    legacy_params = {
      'format' => 'ltsv',
      'delimiter' => ',',
      'label_delimiter' => '%',
      'tag_key' => 't2',
      'time_key' => 't',
      'time_type' => 'float',
      'localtime' => 'yes',
    }
    element = config_element('ROOT', '', legacy_params)
    @i = DummyI0.new
    @i.configure(element)
    @i.start
    @i.after_start

    # Only the parser class and its delimiters are verified in this case.
    ltsv_parser = @i.parser
    assert{ ltsv_parser.is_a? Fluent::Plugin::LabeledTSVParser }
    assert_equal ',', ltsv_parser.delimiter
    assert_equal '%', ltsv_parser.label_delimiter
  end

  test 'plugin helper setups time extraction as unix time (integer from epoch)' do
    # TODO:
  end
end
# DummyI0 with the syslog format: syslog-specific legacy parameters must reach
# the created parser.
sub_test_case 'parser plugins' do
  test 'syslog parser parameters' do
    legacy_params = {
      'format' => 'syslog',
      'message_format' => 'rfc5424',
      'with_priority' => 'true',
      'rfc5424_time_format' => '%Y'
    }
    element = config_element('ROOT', '', legacy_params)
    @i = DummyI0.new
    @i.configure(element)
    # The parser is only created in #start, so the plugin must be started.
    @i.start
    @i.after_start

    syslog_parser = @i.parser
    assert_kind_of(Fluent::Plugin::SyslogParser, syslog_parser)
    assert_equal :rfc5424, syslog_parser.message_format
    assert_equal true, syslog_parser.with_priority
    assert_equal '%Y', syslog_parser.rfc5424_time_format
  end
end
end