Back to Repositories

Testing ExecFilter Output Plugin Implementation in Fluentd

This test suite validates the ExecFilterOutput plugin functionality in Fluentd, focusing on command execution, data transformation, and process management. It verifies the plugin’s ability to handle input/output formatting, time parsing, and child process management for data filtering operations.

Test Coverage Overview

The test suite provides comprehensive coverage of ExecFilterOutput plugin functionality, including:

  • Configuration validation for different format types
  • Event emission with TSV format and time handling
  • Command execution with grep and sed
  • JSON format parsing and processing
  • Child process management and round-robin distribution
  • Process respawning and error handling

Implementation Analysis

The testing approach uses the test-unit framework (the test class inherits from Test::Unit::TestCase and uses its data/test DSL) with Fluent::Test::Driver::Output for plugin testing. It exercises both traditional (flat-parameter) and section-based configuration patterns, validating the plugin’s backward compatibility and modern features. The tests systematically verify data transformation, process management, and error handling scenarios.

Technical Details

  • Testing Framework: test-unit (Test::Unit::TestCase) with Fluent::Test
  • Key Components: ExecFilterOutput plugin, Output driver
  • Configuration Types: Traditional and modern section-based
  • Process Management: Multi-process handling with respawn capability
  • Data Formats: TSV, JSON with time formatting

Best Practices Demonstrated

The test suite exemplifies robust testing practices including:

  • Comprehensive configuration testing
  • Multiple format and parsing scenarios
  • Edge case handling for process management
  • Time format and timezone handling
  • Error recovery and process respawning validation

fluent/fluentd

test/plugin/test_out_exec_filter.rb

            
require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_exec_filter'
require 'fileutils'

class ExecFilterOutputTest < Test::Unit::TestCase
  # Per-test hook: delegates to Fluent::Test.setup so each test starts from
  # whatever state that helper prepares.
  def setup
    Fluent::Test.setup
  end

  # Section-style configuration: runs `cat` with 3 children. Records are
  # written with keys time_in/tag/k1 and read back as time_out/tag/k2; the
  # tag and a "%Y-%m-%d %H:%M:%S" string time are injected on the way in and
  # extracted on the way out.
  CONFIG = %[
    command cat
    num_children 3
    <inject>
      tag_key     tag
      time_key    time_in
      time_type   string
      time_format %Y-%m-%d %H:%M:%S
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2"]
    </parse>
    <extract>
      tag_key     tag
      time_key    time_out
      time_type   string
      time_format %Y-%m-%d %H:%M:%S
    </extract>
  ]

  # Flat-parameter ("traditional") counterpart of CONFIG; the tests below run
  # against both and expect identical results.
  CONFIG_COMPAT = %[
    command cat
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2
    tag_key tag
    in_time_key time_in
    out_time_key time_out
    time_format %Y-%m-%d %H:%M:%S
    localtime
    num_children 3
  ]

  # Wraps Fluent::Plugin::ExecFilterOutput in an output test driver and
  # applies +conf+ (a configuration string) to it.
  # Returns whatever the driver's #configure call returns.
  def create_driver(conf)
    driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::ExecFilterOutput)
    driver.configure(conf)
  end

  # Probed once, when the class body is evaluated: run a throwaway sed
  # substitution with `--unbuffered -l` (all output discarded) and record
  # whether the command exited successfully.
  SED_SUPPORT_UNBUFFERED_OPTION = begin
    system("echo xxx | sed --unbuffered -l -e 's/x/y/g' >#{IO::NULL} 2>&1")
    $?.success?
  end
  # Flag interpolated into the sed command lines of the configs below;
  # empty when the probe above failed.
  SED_UNBUFFERED_OPTION =
    if SED_SUPPORT_UNBUFFERED_OPTION
      '--unbuffered'
    else
      ''
    end

  # Both configuration styles must produce the same parsed plugin settings.
  data(
    'with sections' => CONFIG,
    'traditional' => CONFIG_COMPAT,
  )
  test 'configure' do |conf|
    plugin = create_driver(conf).instance

    assert_false plugin.parser.estimate_current_event

    # Key lists for the formatter (to the child) and the parser (from the child).
    assert_equal ["time_in","tag","k1"], plugin.formatter.keys
    assert_equal ["time_out","tag","k2"], plugin.parser.keys

    # Inject/extract settings, checked pairwise.
    assert_equal "tag", plugin.inject_config.tag_key
    assert_equal "tag", plugin.extract_config.tag_key
    assert_equal "time_in", plugin.inject_config.time_key
    assert_equal "time_out", plugin.extract_config.time_key
    assert_equal "%Y-%m-%d %H:%M:%S", plugin.inject_config.time_format
    assert_equal "%Y-%m-%d %H:%M:%S", plugin.extract_config.time_format
    assert_equal true, plugin.inject_config.localtime
    assert_equal 3, plugin.num_children

    # The command string must be kept verbatim, options included.
    sed_plugin = create_driver(%[
      command sed -l -e s/foo/bar/
      in_keys time,k1
      out_keys time,k2
      tag xxx
      time_key time
      num_children 3
    ]).instance
    assert_equal "sed -l -e s/foo/bar/", sed_plugin.command

    # Tag prefix parameters appended to either base configuration.
    prefix_conf = conf + %[
      remove_prefix before
      add_prefix after
    ]
    prefixed_plugin = create_driver(prefix_conf).instance
    assert_equal "before", prefixed_plugin.remove_prefix
    assert_equal "after" , prefixed_plugin.add_prefix
  end

  # Feeds two records through `cat` and checks both the TSV lines handed to
  # the child and the events parsed back from its output.
  data(
    'with sections' => CONFIG,
    'traditional' => CONFIG_COMPAT,
  )
  test 'emit events with TSV format' do |conf|
    driver = create_driver(conf)
    timestamp = event_time("2011-01-02 13:14:15")

    driver.run(default_tag: 'test', expect_emits: 2, timeout: 10) do
      # (disabled) sleep 0.1 until driver.instance.children && !driver.instance.children.empty? && driver.instance.children.all?{|c| c.finished == false }
      [1, 2].each { |value| driver.feed(timestamp, {"k1"=>value}) }
    end

    assert_equal "2011-01-02 13:14:15\ttest\t1\n", driver.formatted[0]
    assert_equal "2011-01-02 13:14:15\ttest\t2\n", driver.formatted[1]

    emitted = driver.events
    assert_equal 2, emitted.length
    # Values come back as strings because they round-trip through TSV text.
    %w[1 2].each_with_index do |value, index|
      assert_equal_event_time timestamp, emitted[index][1]
      assert_equal ["test", timestamp, {"k2"=>value}], emitted[index]
    end
  end

  # Section-style configuration with no time_format: time is injected and
  # parsed as unixtime, and emitted events get the fixed tag "xxx".
  CONFIG_WITHOUT_TIME_FORMAT = %[
    command cat
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time,k1
    </format>
    <parse>
      keys time,k2
      time_key time
      time_type unixtime
    </parse>
  ]
  # Flat-parameter counterpart of CONFIG_WITHOUT_TIME_FORMAT (no time_format given).
  CONFIG_WITHOUT_TIME_FORMAT_COMPAT = %[
    command cat
    in_keys time,k1
    out_keys time,k2
    tag xxx
    time_key time
    num_children 3
  ]

  # Without a time_format the time column is written as epoch seconds and
  # parsed back to the same event time; output events carry the tag "xxx".
  data(
    'with sections' => CONFIG_WITHOUT_TIME_FORMAT,
    'traditional' => CONFIG_WITHOUT_TIME_FORMAT_COMPAT,
  )
  test 'emit events without time format configuration' do |conf|
    driver = create_driver(conf)
    fed_at = event_time("2011-01-02 13:14:15 +0900")

    driver.run(default_tag: 'test', expect_emits: 2, timeout: 10) do
      driver.feed(fed_at, {"k1"=>1})
      driver.feed(fed_at, {"k1"=>2})
    end

    assert_equal "1293941655\t1\n", driver.formatted[0]
    assert_equal "1293941655\t2\n", driver.formatted[1]

    results = driver.events
    assert_equal 2, results.length

    first_event, second_event = results
    assert_equal_event_time fed_at, first_event[1]
    assert_equal ["xxx", fed_at, {"k2"=>"1"}], first_event
    assert_equal_event_time fed_at, second_event[1]
    assert_equal ["xxx", fed_at, {"k2"=>"2"}], second_event
  end

  # Section-style configuration whose filter command is
  # `grep --line-buffered -v poo`, i.e. lines containing "poo" are dropped.
  CONFIG_TO_DO_GREP = %[
    command grep --line-buffered -v poo
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time, val1
    </format>
    <parse>
      keys time, val2
      time_key time
      time_type unixtime
    </parse>
  ]
  # Flat-parameter counterpart of CONFIG_TO_DO_GREP.
  CONFIG_TO_DO_GREP_COMPAT = %[
    command grep --line-buffered -v poo
    in_keys time,val1
    out_keys time,val2
    tag xxx
    time_key time
    num_children 3
  ]

  # Two records go in, but `grep -v poo` suppresses the one containing "poo",
  # so exactly one event is expected back.
  data(
    'with sections' => CONFIG_TO_DO_GREP,
    'traditional' => CONFIG_TO_DO_GREP_COMPAT,
  )
  test 'emit events through grep command' do |conf|
    driver = create_driver(conf)
    fed_at = event_time("2011-01-02 13:14:15 +0900")

    driver.run(default_tag: 'test', expect_emits: 1, timeout: 10) do
      ["sed-ed value poo", "sed-ed value foo"].each do |text|
        driver.feed(fed_at, {"val1"=>text})
      end
    end

    # Both records are still formatted and written to the child.
    assert_equal "1293941655\tsed-ed value poo\n", driver.formatted[0]
    assert_equal "1293941655\tsed-ed value foo\n", driver.formatted[1]

    survivors = driver.events
    assert_equal 1, survivors.length

    survivor = survivors[0]
    assert_equal_event_time fed_at, survivor[1]
    assert_equal ["xxx", fed_at, {"val2"=>"sed-ed value foo"}], survivor
  end

  # Section-style configuration whose filter command is a sed substitution
  # (foo -> bar); SED_UNBUFFERED_OPTION is interpolated into the command line.
  CONFIG_TO_DO_SED = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time, val1
    </format>
    <parse>
      keys time, val2
      time_key time
      time_type unixtime
    </parse>
  ]
  # Flat-parameter counterpart of CONFIG_TO_DO_SED.
  CONFIG_TO_DO_SED_COMPAT = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    in_keys time,val1
    out_keys time,val2
    tag xxx
    time_key time
    num_children 3
  ]

  # sed rewrites "foo" to "bar"; the record without "foo" passes through
  # unchanged, so both records come back.
  data(
    'with sections' => CONFIG_TO_DO_SED,
    'traditional' => CONFIG_TO_DO_SED_COMPAT,
  )
  test 'emit events through sed command' do |conf|
    driver = create_driver(conf)
    fed_at = event_time("2011-01-02 13:14:15 +0900")

    driver.run(default_tag: 'test', expect_emits: 1, timeout: 10) do
      driver.feed(fed_at, {"val1"=>"sed-ed value poo"})
      driver.feed(fed_at, {"val1"=>"sed-ed value foo"})
    end

    assert_equal "1293941655\tsed-ed value poo\n", driver.formatted[0]
    assert_equal "1293941655\tsed-ed value foo\n", driver.formatted[1]

    results = driver.events
    assert_equal 2, results.length

    untouched, rewritten = results
    assert_equal_event_time fed_at, untouched[1]
    assert_equal ["xxx", fed_at, {"val2"=>"sed-ed value poo"}], untouched
    assert_equal_event_time fed_at, rewritten[1]
    assert_equal ["xxx", fed_at, {"val2"=>"sed-ed value bar"}], rewritten
  end

  # Section-style sed configuration that also rewrites tags: the "input"
  # prefix is removed before the tag is written to the child and the "output"
  # prefix is added to the tag read back.
  CONFIG_TO_DO_SED_WITH_TAG_MODIFY = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    num_children 3
    remove_prefix input
    add_prefix output
    <inject>
      tag_key tag
      time_key time
    </inject>
    <format>
      keys tag, time, val1
    </format>
    <parse>
      keys tag, time, val2
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  # Flat-parameter counterpart of CONFIG_TO_DO_SED_WITH_TAG_MODIFY.
  CONFIG_TO_DO_SED_WITH_TAG_MODIFY_COMPAT = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    in_keys tag,time,val1
    remove_prefix input
    out_keys tag,time,val2
    add_prefix output
    tag_key tag
    time_key time
    num_children 3
  ]

  # Records tagged "input.test" are written to the child as "test" and come
  # back tagged "output.test".
  data(
    'with sections' => CONFIG_TO_DO_SED_WITH_TAG_MODIFY,
    'traditional' => CONFIG_TO_DO_SED_WITH_TAG_MODIFY_COMPAT,
  )
  test 'emit events with add/remove tag prefix' do |conf|
    driver = create_driver(conf)
    fed_at = event_time("2011-01-02 13:14:15 +0900")

    driver.run(default_tag: 'input.test', expect_emits: 2, timeout: 10) do
      ["sed-ed value foo", "sed-ed value poo"].each do |text|
        driver.feed(fed_at, {"val1"=>text})
      end
    end

    assert_equal "test\t1293941655\tsed-ed value foo\n", driver.formatted[0]
    assert_equal "test\t1293941655\tsed-ed value poo\n", driver.formatted[1]

    results = driver.events
    assert_equal 2, results.length

    ["sed-ed value bar", "sed-ed value poo"].each_with_index do |text, index|
      assert_equal_event_time fed_at, results[index][1]
      assert_equal ["output.test", fed_at, {"val2"=>text}], results[index]
    end
  end

  # Section-style configuration: the "message" field is written to `cat` as
  # TSV and the child's output is parsed as JSON, with tag and time taken
  # from the parsed record's "tag" and "time" keys.
  CONFIG_JSON = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  # Flat-parameter counterpart of CONFIG_JSON.
  CONFIG_JSON_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    tag_key tag
  ]

  # The fed record carries a JSON document in "message"; after passing through
  # `cat` it is parsed, and its "tag"/"time" fields become the event's tag/time.
  data(
    'with sections' => CONFIG_JSON,
    'traditional' => CONFIG_JSON_COMPAT,
  )
  test 'using json format' do |conf|
    driver = create_driver(conf)
    stamp = event_time("2011-01-02 13:14:15 +0900")

    driver.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      # The plugin must have a router by the time the run block executes.
      assert{ driver.instance.router }
      payload = %[{"time":#{stamp},"tag":"t1","k1":"v1"}]
      driver.feed(stamp, {"message"=>payload})
    end

    assert_equal '{"time":1293941655,"tag":"t1","k1":"v1"}' + "\n", driver.formatted[0]

    parsed = driver.events
    assert_equal 1, parsed.length

    event = parsed[0]
    assert_equal_event_time stamp, event[1]
    assert_equal ["t1", stamp, {"k1"=>"v1"}], event
  end

  # Section-style JSON configuration used by the float-time test; its text is
  # the same as CONFIG_JSON.
  CONFIG_JSON_WITH_FLOAT_TIME = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  # Flat-parameter counterpart of CONFIG_JSON_WITH_FLOAT_TIME.
  CONFIG_JSON_WITH_FLOAT_TIME_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    tag_key tag
  ]

  # A fractional epoch time inside the JSON payload must be extracted as the
  # event time, sub-second part included. The record is fed with `time + 10`
  # so the assertion can only pass if the time really came from the payload.
  data(
    'with sections' => CONFIG_JSON_WITH_FLOAT_TIME,
    'traditional' => CONFIG_JSON_WITH_FLOAT_TIME_COMPAT,
  )
  test 'using json format with float time' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15.123 +0900")
    # Zero-pad the nanosecond part to 9 digits. Plain interpolation of
    # `time.nsec` only yields a correct decimal when nsec happens to have
    # 9 digits (as with .123); a fraction below 0.1s would be misrepresented.
    float_time = '%d.%09d' % [time.sec, time.nsec]

    d.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      d.feed(time + 10, {"message"=>%[{"time":#{float_time},"tag":"t1","k1":"v1"}]})
    end

    assert_equal '{"time":1293941655.123000000,"tag":"t1","k1":"v1"}' + "\n", d.formatted[0]

    events = d.events
    assert_equal 1, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["t1", time, {"k1"=>"v1"}], events[0]
  end

  # Section-style JSON configuration where the extracted time is a string in
  # "%d/%b/%Y %H:%M:%S.%N %z" format.
  CONFIG_JSON_WITH_TIME_FORMAT = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
      time_type string
      time_format %d/%b/%Y %H:%M:%S.%N %z
    </extract>
  ]
  # Flat-parameter counterpart of CONFIG_JSON_WITH_TIME_FORMAT.
  CONFIG_JSON_WITH_TIME_FORMAT_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    time_format %d/%b/%Y %H:%M:%S.%N %z
    tag_key tag
  ]

  # The JSON payload carries the time as a formatted string; it must be parsed
  # with the configured time_format. The record is fed with `+ 10` seconds so
  # the expected time can only originate from the payload.
  data(
    'with sections' => CONFIG_JSON_WITH_TIME_FORMAT,
    'traditional' => CONFIG_JSON_WITH_TIME_FORMAT_COMPAT,
  )
  test 'using json format with custom time format' do |conf|
    driver = create_driver(conf)
    time_text = "28/Feb/2013 12:00:00.123456789 +0900"
    expected_time = event_time(time_text, format: "%d/%b/%Y %H:%M:%S.%N %z")

    driver.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      payload = %[{"time":"#{time_text}","tag":"t1","k1":"v1"}]
      driver.feed(expected_time + 10, {"message"=>payload})
    end

    assert_equal '{"time":"28/Feb/2013 12:00:00.123456789 +0900","tag":"t1","k1":"v1"}' + "\n", driver.formatted[0]

    parsed = driver.events
    assert_equal 1, parsed.length

    event = parsed[0]
    assert_equal_event_time expected_time, event[1]
    assert_equal ["t1", expected_time, {"k1"=>"v1"}], event
  end

  # Section-style configuration with 2 children. The child is a Ruby one-liner
  # that echoes every input line with its own PID appended as an extra
  # tab-separated column, which is parsed back as "child_pid".
  CONFIG_ROUND_ROBIN = %[
    command ruby -e 'STDOUT.sync = true; STDIN.each_line{|line| puts line.chomp + "\t" + Process.pid.to_s }'
    num_children 2
    <inject>
      tag_key     tag
      time_key    time_in
      time_type   string
      time_format %Y-%m-%d %H:%M:%S
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2", "child_pid"]
    </parse>
    <extract>
      tag_key     tag
      time_key    time_out
      time_type   string
      time_format %Y-%m-%d %H:%M:%S
    </extract>
  ]
  # Flat-parameter counterpart of CONFIG_ROUND_ROBIN.
  CONFIG_ROUND_ROBIN_COMPAT = %[
    command ruby -e 'STDOUT.sync = true; STDIN.each_line{|line| puts line.chomp + "\t" + Process.pid.to_s }'
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2,child_pid
    tag_key tag
    in_time_key time_in
    out_time_key time_out
    time_format %Y-%m-%d %H:%M:%S
    localtime
    num_children 2
  ]

  # Four records are fed one per run while the two children stay alive; the
  # PIDs echoed back must alternate child A, B, A, B.
  data(
    'with sections' => CONFIG_ROUND_ROBIN,
    'traditional' => CONFIG_ROUND_ROBIN_COMPAT,
  )
  test 'using child processes by round robin' do |conf|
    d = create_driver(conf)
    time = event_time('2011-01-02 13:14:15')

    # Start the plugin on the first run only and never shut it down here, so
    # the same child processes serve all four records.
    4.times do |n|
      d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: n.zero?, shutdown: false){ d.feed(time, {"k1" => n}) }
    end

    assert_equal "2011-01-02 13:14:15\ttest\t0\n", d.formatted[0]
    assert_equal "2011-01-02 13:14:15\ttest\t1\n", d.formatted[1]
    assert_equal "2011-01-02 13:14:15\ttest\t2\n", d.formatted[2]
    assert_equal "2011-01-02 13:14:15\ttest\t3\n", d.formatted[3]

    events = d.events
    assert_equal 4, events.length

    # Distinct PIDs in order of first appearance.
    pid_list = events.map { |event| event[2]['child_pid'] }.uniq
    assert_equal 2, pid_list.size, "the number of pids should be same with number of child processes: #{pid_list.inspect}"

    assert_equal pid_list[0], events[0][2]['child_pid']
    assert_equal pid_list[1], events[1][2]['child_pid']
    assert_equal pid_list[0], events[2][2]['child_pid']
    assert_equal pid_list[1], events[3][2]['child_pid']

  ensure
    # d is nil when create_driver itself raised; skip the shutdown in that
    # case so the original error is not masked by a NoMethodError.
    d&.run(start: false, shutdown: true)
  end

  # Section-style configuration for the respawn test. Each child process exits
  # after 3 lines: the command reads one line, echoes it with its PID
  # appended, and does so exactly three times. Two children are used and
  # child_respawn is set to -1.
  CONFIG_RESPAWN = %[
    command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; proc.call; proc.call; proc.call'
    num_children 2
    child_respawn -1
    <inject>
      tag_key   tag
      time_key  time_in
      time_type unixtime
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2", "child_pid"]
    </parse>
    <extract>
      tag_key   tag
      time_key  time_out
      time_type unixtime
    </extract>
  ]

  # Flat-parameter counterpart of CONFIG_RESPAWN. The two "#"-prefixed lines
  # near the end are inside the string literal, so they are part of the
  # configuration text rather than Ruby comments.
  CONFIG_RESPAWN_COMPAT = %[
    command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; proc.call; proc.call; proc.call'
    num_children 2
    child_respawn -1
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2,child_pid
    tag_key tag
    in_time_key time_in
    out_time_key time_out
#    time_format %Y-%m-%d %H:%M:%S
#    localtime
  ]

  # Children exit after three lines each. Six records are fed in two batches
  # of three, and the test expects all six events back, exactly two distinct
  # child PIDs, and two "exit" plus two "respawn" log entries.
  data(
    'with sections' => CONFIG_RESPAWN,
    'traditional' => CONFIG_RESPAWN_COMPAT,
  )
  test 'emit events via child processes which exits sometimes' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15")
    countup = 0

    # Start the plugin without feeding anything and check both children exist.
    d.run(start: true, shutdown: false)
    assert_equal 2, d.instance.instance_eval{ @_child_process_processes.size }

    2.times do
      d.run(default_tag: 'test', expect_emits: 3, timeout: 3, force_flush_retry: true, start: false, shutdown: false) do
        3.times do
          d.feed(time, { "k1" => countup })
          countup += 1
        end
      end
    end

    events = d.events
    assert_equal 6, events.length

    # Distinct PIDs in order of first appearance.
    pid_list = events.map { |event| event[2]['child_pid'] }.uniq

    # the number of pids should be same with number of child processes
    assert_equal 2, pid_list.size
    logs = d.instance.log.out.logs
    assert_equal 2, logs.count { |l| l.include?('child process exits with error code') }
    assert_equal 2, logs.count { |l| l.include?('respawning child process') }

  ensure
    # d is nil when create_driver itself raised; skip the shutdown in that
    # case so the original error is not masked by a NoMethodError.
    d&.run(start: false, shutdown: true)
  end
end