Testing Buffered Output Backup Functionality in Fluentd

This test suite validates the backup functionality of Fluentd's buffered output plugin, focusing on error handling and secondary output scenarios. It verifies that chunks whose write fails with an unrecoverable error are backed up to disk (or intentionally discarded when backup is disabled), and that this behavior holds across different secondary-output configurations.

Test Coverage Overview

The test suite covers the scenarios in which a buffered output must back up a chunk it cannot flush; the backup path these tests assert on is sketched after the list below.

Key areas tested include:
  • Unrecoverable error handling with no secondary output
  • Backup behavior with same-type secondary outputs
  • Error handling with different-type secondary outputs
  • Async secondary output scenarios
  • Directory creation and permission handling
  • Chunk backup disable functionality
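When a chunk hits an unrecoverable error and cannot be handed to a usable secondary, the plugin dumps it under the configured root_dir. A minimal sketch of the path convention the assertions below rely on (TMP_DIR is the test's root_dir, id is the plugin's @id, and chunk_id is captured inside the injected #write):

# Backed-up chunks land under <root_dir>/backup/<worker dir>/<plugin @id>/<chunk id>.log
target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
assert_true File.exist?(target)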

Implementation Analysis

The testing approach uses Ruby's Test::Unit framework with lightweight dummy output plugins standing in for real ones; the core error-injection pattern is sketched after the list below.

Key implementation patterns include:
  • Custom dummy output classes for different scenarios
  • Time manipulation using Timecop
  • Controlled flush operations
  • Error injection for various failure cases
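Failure paths are driven by registering a #write callback that raises, then advancing frozen time so the flush thread picks the chunk up. A condensed sketch of the pattern used throughout the suite (error_class stands for whichever exception a given case injects; the plugin is assumed to have been started and configured already):

# Inject a failing #write into the dummy output, then force a flush.
@i.register(:write) do |chunk|
  chunk_id = chunk.unique_id
  raise error_class, "yay, your #write must fail"
end

Timecop.freeze(Time.parse('2016-04-13 18:33:30 -0700'))
@i.emit_events("test.tag.1", dummy_event_stream())
@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4) { Thread.pass until @i.write_count > 0 }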

Technical Details

Testing tools and configuration:
  • Test::Unit (test-unit) as the testing framework
  • Timecop for time manipulation
  • Mock output plugins (DummyOutput, DummyOutputForSecondary, DummyAsyncOutputForSecondary), registered as shown in the sketch after this list
  • Temporary directory handling for backup testing
  • Custom event stream generation
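The dummy plugins are registered under test-only names so a <secondary> section can refer to them by @type; the primary output is then configured with a buffer and that secondary. A sketch of this wiring, condensed from the setup block and test bodies below:

# Register a dummy plugin type, then point the primary's <secondary> at it.
Fluent::Plugin.register_output('backup_output2', DummyOutputForSecondary)

secconf = config_element('secondary', '', {'@type' => 'backup_output2'})
@i.configure(config_element('ROOT', '', {'@id' => 'backup_test'},
                            [config_element('buffer', 'tag', {'flush_interval' => 1}), secconf]))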

Best Practices Demonstrated

The suite demonstrates several solid Ruby testing practices; the setup and teardown that enforce isolation are excerpted after the list below.

Notable aspects include:
  • Thorough setup and teardown procedures
  • Comprehensive error case coverage
  • Platform-specific considerations (Windows vs Unix)
  • Clean test isolation
  • Effective use of helper methods and utilities
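Isolation comes from wiping the temporary directory before each case and walking the plugin through its full shutdown sequence afterwards, then unfreezing time. Setup and teardown, excerpted from the file below (the plugin registrations in setup are omitted here):

setup do
  @i = create_output
  FileUtils.rm_rf(TMP_DIR)
  FileUtils.mkdir_p(TMP_DIR)
end

teardown do
  if @i
    @i.stop unless @i.stopped?
    @i.before_shutdown unless @i.before_shutdown?
    @i.shutdown unless @i.shutdown?
    @i.after_shutdown unless @i.after_shutdown?
    @i.close unless @i.closed?
    @i.terminate unless @i.terminated?
  end
  Timecop.return
end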

fluent/fluentd

test/plugin/test_output_as_buffered_backup.rb

require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/event'
require 'fluent/error'

require 'json'
require 'time'
require 'timeout'
require 'timecop'


class BufferedOutputBackupTest < Test::Unit::TestCase
  class BareOutput < Fluent::Plugin::Output
    # Tests call register(:write) { ... } (or :format, :try_write, etc.) to
    # inject behavior into the dummy outputs below.
    def register(name, &block)
      instance_variable_set("@#{name}", block)
    end
  end
  class DummyOutput < BareOutput
    def initialize
      super
      @process = nil
      @format = nil
      @write = nil
      @try_write = nil
    end
    def prefer_buffered_processing
      true
    end
    def prefer_delayed_commit
      false
    end
    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end
    def format(tag, time, record)
      [tag, time.to_i, record].to_json + "\n"
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
    def try_write(chunk)
      @try_write ? @try_write.call(chunk) : nil
    end
  end
  class DummyOutputForSecondary < BareOutput
    def initialize
      super
      @process = nil
      @format = nil
      @write = nil
      @try_write = nil
    end
    def prefer_buffered_processing
      true
    end
    def prefer_delayed_commit
      false
    end
    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end
    def format(tag, time, record)
      [tag, time.to_i, record].to_json + "\n"
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
    def try_write(chunk)
      @try_write ? @try_write.call(chunk) : nil
    end
  end
  class DummyAsyncOutputForSecondary < BareOutput
    def initialize
      super
      @process = nil
      @format = nil
      @write = nil
      @try_write = nil
    end
    def prefer_buffered_processing
      true
    end
    def prefer_delayed_commit
      true
    end
    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end
    def format(tag, time, record)
      [tag, time.to_i, record].to_json + "\n"
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
    def try_write(chunk)
      @try_write ? @try_write.call(chunk) : nil
    end
  end

  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/bu#{ENV['TEST_ENV_NUMBER']}")

  def create_output
    DummyOutput.new
  end
  def create_metadata(timekey: nil, tag: nil, variables: nil)
    Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
  end
  # Run the block under a timeout; on expiry, dump the plugin's captured logs
  # to STDERR before re-raising so the failure is diagnosable.
  def waiting(seconds)
    begin
      Timeout.timeout(seconds) do
        yield
      end
    rescue Timeout::Error
      STDERR.print(*@i.log.out.logs)
      raise
    end
  end

  def dummy_event_stream
    Fluent::ArrayEventStream.new([
      [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ],
      [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ],
      [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ],
    ])
  end

  setup do
    @i = create_output
    FileUtils.rm_rf(TMP_DIR)
    FileUtils.mkdir_p(TMP_DIR)

    Fluent::Plugin.register_output('backup_output', DummyOutput)
    Fluent::Plugin.register_output('backup_output2', DummyOutputForSecondary)
    Fluent::Plugin.register_output('backup_async_output', DummyAsyncOutputForSecondary)
  end

  teardown do
    if @i
      @i.stop unless @i.stopped?
      @i.before_shutdown unless @i.before_shutdown?
      @i.shutdown unless @i.shutdown?
      @i.after_shutdown unless @i.after_shutdown?
      @i.close unless @i.closed?
      @i.terminate unless @i.terminated?
    end
    Timecop.return
  end

  sub_test_case 'buffered output for broken chunks' do
    # Start the plugin, emit the dummy event stream under frozen time, then
    # wake the flush thread and wait until #write has been attempted.
    def flush_chunks
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze(now)
      @i.emit_events("test.tag.1", dummy_event_stream())
      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze(now)

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4) { Thread.pass until @i.write_count > 0 }

      assert { @i.write_count > 0 }
      Timecop.freeze(now)
      @i.flush_thread_wakeup
    end

    def wait_flush(target_file)
      waiting(5) {
        target_dir = File.join(File.dirname(target_file), "*")
        while Dir.glob(target_dir).size.zero?
          # Avoid losing globbed entries on Windows in a busy loop
          sleep 0.1 if Fluent.windows?
        end
      }
    end

    data('unrecoverable error' => Fluent::UnrecoverableError,
         'type error' => TypeError,
         'argument error' => ArgumentError,
         'no method error' => NoMethodError,
         'msgpack unpack error' => MessagePack::UnpackError,
         'encoding error' => EncodingError)
    test 'backup chunk without secondary' do |error_class|
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise error_class, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        wait_flush(target)
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and no secondary") } }
      end
    end

    test 'backup chunk with same type secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_same_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_output'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        wait_flush(target)
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary type is same as primary") } }
      end
    end

    test 'create directory' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_same_secondary'
        hash = { 'flush_interval' => 1, 'flush_thread_burst_interval' => 0.1 }
        chunk_id = nil
        secconf = config_element('secondary', '', { '@type' => 'backup_output' })
        @i.configure(config_element('ROOT', '', { '@id' => id }, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        target_dir = File.dirname(target)
        wait_flush(target)

        assert_path_exist(target_dir)
        assert_equal '755', File.stat(target_dir).mode.to_s(8)[-3, 3]
      end
    end

    test 'create directory with specific mode' do
      omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?

      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR, 'dir_permission' => '744') do
        id = 'backup_test_with_same_secondary'
        hash = { 'flush_interval' => 1, 'flush_thread_burst_interval' => 0.1 }
        chunk_id = nil
        secconf = config_element('secondary', '', { '@type' => 'backup_output' })
        @i.configure(config_element('ROOT', '', { '@id' => id }, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        target_dir = File.dirname(target)
        wait_flush(target)

        assert_path_exist(target_dir)
        assert_equal '744', File.stat(target_dir).mode.to_s(8)[-3, 3]
      end
    end

    test 'backup chunk with different type secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_diff_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_output2'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }
        @i.secondary.register(:write) { |chunk|
          raise Fluent::UnrecoverableError, "yay, your secondary #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        wait_flush(target)
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary. Skip retry and flush chunk to secondary") } }
        assert { logs.any? { |l| l.include?("got an error in secondary for unrecoverable error") } }
      end
    end

    test 'backup chunk with async secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_diff_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_async_output'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        wait_flush(target)
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } }
      end
    end

    test 'chunk is thrown away when disable_chunk_backup is true' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
          'disable_chunk_backup' => true
        }
        chunk_id = nil
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id;
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        assert_false File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("disable_chunk_backup is true") } }
      end
    end
  end
end