Back to Repositories

Testing Copy Output Plugin Implementation in Fluentd

This test suite validates the functionality of Fluentd’s copy output plugin, which enables event stream replication across multiple output destinations. The tests ensure proper configuration, event handling, and error management capabilities of the copy mechanism.

Test Coverage Overview

The test suite provides comprehensive coverage of the copy output plugin’s core features.

  • Configuration validation for multiple store outputs
  • Event stream handling with different copy modes (no_copy, shallow, marshal, deep)
  • Error handling with ignore_error and ignore_if_prev_success options
  • Event stream processing across different types (OneEventStream, ArrayEventStream, MultiEventStream)

Implementation Analysis

The testing approach utilizes Ruby’s Test::Unit framework with FlexMock for mocking capabilities. Tests follow a structured pattern of setup, execution, and verification, with particular attention to event stream manipulation and copy semantics.

Key implementation patterns include:
  • Driver-based test configuration
  • Event stream generation and manipulation
  • Custom method overrides for error testing
  • Flexible configuration testing

Technical Details

Testing infrastructure includes:
  • Fluent::Test::Driver::MultiOutput for output testing
  • FlexMock for test doubles and expectations
  • Custom test output plugins (TestOutput, Test2Output)
  • Event time handling utilities
  • Multiple event stream implementations

Best Practices Demonstrated

The test suite exemplifies several testing best practices in Ruby and Fluentd plugin development.

  • Comprehensive configuration testing
  • Isolation of test cases
  • Proper setup and teardown procedures
  • Edge case handling for different event stream types
  • Systematic error scenario testing

fluent/fluentd

test/plugin/test_out_copy.rb

            
require_relative '../helper'
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
require 'fluent/event'
require 'flexmock/test_unit'

class CopyOutputTest < Test::Unit::TestCase
  include FlexMock::TestCase

  class << self
    def startup
      $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
      require 'fluent/plugin/out_test'
      require 'fluent/plugin/out_test2'
    end

    def shutdown
      $LOAD_PATH.shift
    end
  end

  def setup
    Fluent::Test.setup
  end

  CONFIG = %[
    <store>
      @type test
      name c0
    </store>
    <store>
      @type test2
      name c1
    </store>
    <store>
      @type test
      name c2
    </store>
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(conf)
  end

  def test_configure
    d = create_driver

    outputs = d.instance.outputs
    assert_equal 3, outputs.size
    assert_equal Fluent::Plugin::TestOutput, outputs[0].class
    assert_equal Fluent::Plugin::Test2Output, outputs[1].class
    assert_equal Fluent::Plugin::TestOutput, outputs[2].class
    assert_equal "c0", outputs[0].name
    assert_equal "c1", outputs[1].name
    assert_equal "c2", outputs[2].name
    assert_false d.instance.deep_copy
    assert_equal :no_copy, d.instance.copy_mode
  end

  ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
    <store ignore_if_prev_success ignore_error>
      @type test
      name c0
    </store>
    <store ignore_if_prev_success ignore_error>
      @type test
      name c1
    </store>
    <store ignore_if_prev_success>
      @type test
      name c2
    </store>
  ]
  def test_configure_with_errorneus_ignore_if_prev_success
    assert_raise(Fluent::ConfigError) do
      create_driver(ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG)
    end
  end

  ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
    @log_level info
    <store ignore_error>
      @type test
      name c0
    </store>
    <store ignore_error>
      @type test
      name c1
    </store>
    <store ignore_error>
      @type test
      name c2
    </store>
  ]
  def test_configure_all_ignore_errors_without_ignore_if_prev_success
    d = create_driver(ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG)
    expected = /ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified./
    matches = d.logs.grep(expected)
    assert_equal(1, matches.length, "Logs do not contain '#{expected}' '#{d.logs}'")
  end

  def test_configure_with_deep_copy_and_use_shallow_copy_mode
    d = create_driver(%[
      deep_copy true
      <store>
        @type test
        name c0
      </store>
    ])

    outputs = d.instance.outputs
    assert_equal 1, outputs.size
    assert_equal Fluent::Plugin::TestOutput, outputs[0].class
    assert_equal "c0", outputs[0].name
    assert_true d.instance.deep_copy
    assert_equal :shallow, d.instance.copy_mode
  end

  def test_feed_events
    d = create_driver

    assert !d.instance.outputs[0].has_router?
    assert_not_nil d.instance.outputs[1].router
    assert !d.instance.outputs[2].has_router?

    time = event_time("2011-01-02 13:14:15 UTC")
    d.run(default_tag: 'test') do
      d.feed(time, {"a" => 1})
      d.feed(time, {"a" => 2})
    end

    d.instance.outputs.each {|o|
      assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
    }
  end

  def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
    d = create_driver

    time = event_time("2011-01-02 13:14:15 UTC")
    source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ])
    es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream)

    d.run(default_tag: 'test') do
      d.feed(es)
    end

    d.instance.outputs.each { |o|
      assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
    }
  end

  def create_event_test_driver(copy_mode = 'no_copy')
    config = %[
      copy_mode #{copy_mode}
      <store>
        @type test
        name output1
      </store>
      <store>
        @type test
        name output2
      </store>
    ]

    d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config)
    d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
      es.each do |time, record|
        record['foo'] = 'bar'
      end
      super(tag, es)
    end
    d
  end

  time = event_time("2013-05-26 06:37:22 UTC")
  gen_multi_es = Proc.new {
    es = Fluent::MultiEventStream.new
    es.add(time, {"a" => 1, "nest" => {'k' => 'v'}})
    es.add(time, {"b" => 1, "nest" => {'k' => 'v'}})
    es
  }

  data(
    "OneEventStream without copy" => ['no_copy', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
    "OneEventStream with shallow" => ['shallow', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
    "OneEventStream with marshal" => ['marshal', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
    "OneEventStream with deep"    => ['deep',    Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
    "ArrayEventStream without copy" => ['no_copy', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
    "ArrayEventStream with shallow" => ['shallow', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
    "ArrayEventStream with marshal" => ['marshal', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
    "ArrayEventStream with deep"    => ['deep',    Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
    "MultiEventStream without copy" => ['no_copy', gen_multi_es.call],
    "MultiEventStream with shallow" => ['shallow', gen_multi_es.call],
    "MultiEventStream with marshal" => ['marshal', gen_multi_es.call],
    "MultiEventStream with deep"    => ['deep',    gen_multi_es.call],
  )
  def test_copy_mode_with_event_streams(data)
    copy_mode, es = data

    d = create_event_test_driver(copy_mode)
    d.run(default_tag: 'test') do
      d.feed(es)
    end

    events = d.instance.outputs.map(&:events)

    if copy_mode != 'no_copy'
      events[0].each_with_index do |entry0, i|
        record0 = entry0.last
        record1 = events[1][i].last

        assert_not_equal record0.object_id, record1.object_id
        assert_equal "bar", record0["foo"]
        assert !record1.has_key?("foo")
        if copy_mode == 'shallow'
          assert_equal record0['nest'].object_id, record1['nest'].object_id
        else
          assert_not_equal record0['nest'].object_id, record1['nest'].object_id
        end
      end
    else
      events[0].each_with_index do |entry0, i|
        record0 = entry0.last
        record1 = events[1][i].last

        assert_equal record0.object_id, record1.object_id
        assert_equal "bar", record0["foo"]
        assert_equal "bar", record1["foo"]
        assert_equal record0['nest'].object_id, record1['nest'].object_id
      end
    end
  end

  IGNORE_ERROR_CONFIG = %[
    <store ignore_error>
      @type test
      name c0
    </store>
    <store ignore_error>
      @type test
      name c1
    </store>
    <store>
      @type test
      name c2
    </store>
  ]

  def test_ignore_error
    d = create_driver(IGNORE_ERROR_CONFIG)

    # override to raise an error
    d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
      raise ArgumentError, 'Failed'
    end

    time = Time.parse("2011-01-02 13:14:15 UTC").to_i
    assert_nothing_raised do
      d.run(default_tag: 'test') do
        d.feed(time, {"a"=>1})
      end
    end
  end

  IGNORE_IF_PREV_SUCCESS_CONFIG = %[
    <store ignore_error>
      @type test
      name c0
    </store>
    <store ignore_if_prev_success ignore_error>
      @type test
      name c1
    </store>
    <store ignore_if_prev_success>
      @type test
      name c2
    </store>
  ]

  def test_ignore_if_prev_success
    d = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG)

    # override to raise an error
    d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
      raise ArgumentError, 'Failed'
    end

    # check ingore_if_prev_success functionality:
    # 1. output 2 is succeeded.
    # 2. output 3 is not called.
    flexstub(d.instance.outputs[1]) do |output|
      output.should_receive(:process).once
    end
    flexstub(d.instance.outputs[2]) do |output|
      output.should_receive(:process).never
    end

    time = Time.parse("2011-01-02 13:14:15 UTC").to_i
    assert_nothing_raised do
      d.run(default_tag: 'test') do
        d.feed(time, {"a"=>1})
      end
    end
  end

end