Back to Repositories

Testing Buffered Output Retry Mechanisms in Fluentd

This test suite validates retry behavior and buffered output functionality in Fluentd plugins, focusing on error handling, retry strategies and buffer queue management.

Test Coverage Overview

Comprehensive testing of Fluentd’s buffered output retry mechanisms, including exponential backoff and periodic retry strategies. Covers retry timeouts, maximum retry attempts, and buffer queue management under failure conditions.

  • Retry strategy validation for failed writes
  • Buffer queue size limits and chunk management
  • Timeout and retry count threshold handling
  • Delayed commit functionality

Implementation Analysis

The test suite implements multiple test cases using Minitest framework to validate Fluentd’s output plugin retry behavior. It uses mock objects and time manipulation to simulate failures and verify retry logic.

  • DummyOutput classes to simulate plugin behavior
  • Timecop for controlled time progression
  • Wait conditions to validate async operations
  • Error injection for retry triggering

Technical Details

Key technical components include:

  • Minitest for test framework
  • Timecop for time manipulation
  • Mock plugin implementations
  • Buffer configuration options
  • Retry strategy implementations
  • Thread management for async operations

Best Practices Demonstrated

The test suite showcases several testing best practices:

  • Comprehensive edge case coverage
  • Controlled environment setup
  • Proper test isolation
  • Clear test case organization
  • Thorough retry scenario validation
  • Effective async operation testing

fluent/fluentd

test/plugin/test_output_as_buffered_retries.rb

            
require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/event'

require 'json'
require 'time'
require 'timeout'
require 'timecop'

module FluentPluginOutputAsBufferedRetryTest
  class DummyBareOutput < Fluent::Plugin::Output
    def register(name, &block)
      instance_variable_set("@#{name}", block)
    end
  end
  class DummySyncOutput < DummyBareOutput
    def initialize
      super
      @process = nil
    end
    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end
  end
  class DummyFullFeatureOutput < DummyBareOutput
    def initialize
      super
      @prefer_buffered_processing = nil
      @prefer_delayed_commit = nil
      @process = nil
      @format = nil
      @write = nil
      @try_write = nil
    end
    def prefer_buffered_processing
      @prefer_buffered_processing ? @prefer_buffered_processing.call : false
    end
    def prefer_delayed_commit
      @prefer_delayed_commit ? @prefer_delayed_commit.call : false
    end
    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end
    def format(tag, time, record)
      @format ? @format.call(tag, time, record) : [tag, time, record].to_json
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
    def try_write(chunk)
      @try_write ? @try_write.call(chunk) : nil
    end
  end
  class DummyFullFeatureOutput2 < DummyFullFeatureOutput
    def prefer_buffered_processing; true; end
    def prefer_delayed_commit; super; end
    def format(tag, time, record); super; end
    def write(chunk); super; end
    def try_write(chunk); super; end
  end
end

class BufferedOutputRetryTest < Test::Unit::TestCase
  def create_output(type=:full)
    case type
    when :bare then FluentPluginOutputAsBufferedRetryTest::DummyBareOutput.new
    when :sync then FluentPluginOutputAsBufferedRetryTest::DummySyncOutput.new
    when :full then FluentPluginOutputAsBufferedRetryTest::DummyFullFeatureOutput.new
    else
      raise ArgumentError, "unknown type: #{type}"
    end
  end
  def create_metadata(timekey: nil, tag: nil, variables: nil)
    Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
  end
  def waiting(seconds)
    begin
      Timeout.timeout(seconds) do
        yield
      end
    rescue Timeout::Error
      STDERR.print(*@i.log.out.logs)
      raise
    end
  end
  def dummy_event_stream
    Fluent::ArrayEventStream.new([
      [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ],
      [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ],
      [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ],
    ])
  end
  def get_log_time(msg, logs)
    log_time = nil
    log = logs.find{|l| l.include?(msg) }
    if log && /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [-+]\d{4}) \[error\]/ =~ log
      log_time = Time.parse($1)
    end
    log_time
  end

  setup do
    @i = create_output
  end

  teardown do
    if @i
      @i.stop unless @i.stopped?
      @i.before_shutdown unless @i.before_shutdown?
      @i.shutdown unless @i.shutdown?
      @i.after_shutdown unless @i.after_shutdown?
      @i.close unless @i.closed?
      @i.terminate unless @i.terminated?
    end
    Timecop.return
  end

  sub_test_case 'buffered output for retries with exponential backoff' do
    test 'exponential backoff is default strategy for retries' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.start
      @i.after_start

      assert_equal :exponential_backoff, @i.buffer_config.retry_type
      assert_equal 1, @i.buffer_config.retry_wait
      assert_equal 2.0, @i.buffer_config.retry_exponential_backoff_base
      assert [email protected]_config.retry_randomize

      now = Time.parse('2016-04-13 18:17:00 -0700')
      Timecop.freeze( now )

      retry_state = @i.retry_state( @i.buffer_config.retry_randomize )
      retry_state.step
      assert_equal (1 * (2 ** 1)), (retry_state.next_time - now)
      retry_state.step
      assert_equal (1 * (2 ** 2)), (retry_state.next_time - now)
      retry_state.step
      assert_equal (1 * (2 ** 3)), (retry_state.next_time - now)
      retry_state.step
      assert_equal (1 * (2 ** 4)), (retry_state.next_time - now)
    end

    test 'does retries correctly when #write fails' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_max_interval' => 60 * 60,
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze( now )

      @i.enqueue_thread_wait

      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      now = @i.next_flush_time
      Timecop.freeze( now )
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 1 }

      assert{ @i.write_count > 1 }
      assert{ @i.num_errors > 1 }
    end

    test 'max retry interval is limited by retry_max_interval' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_max_interval' => 60,
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze( now )

      @i.enqueue_thread_wait

      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      10.times do
        now = @i.next_flush_time
        Timecop.freeze( now )
        @i.flush_thread_wakeup
        waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end
      # exponential backoff interval: 1 * 2 ** 10 == 1024
      # but it should be limited by retry_max_interval=60
      assert_equal 60, (@i.next_flush_time - now)
    end

    test 'output plugin give retries up by retry_timeout, and clear queue in buffer' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_timeout' => 3600,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      first_failure = @i.retry.start

      15.times do |i| # large enough
        now = @i.next_flush_time
        # p({i: i, now: now, diff: (now - Time.now)})
        # * if loop count is 12:
        # {:i=>0, :now=>2016-04-13 18:33:32 -0700, :diff=>1.0}
        # {:i=>1, :now=>2016-04-13 18:33:34 -0700, :diff=>2.0}
        # {:i=>2, :now=>2016-04-13 18:33:38 -0700, :diff=>4.0}
        # {:i=>3, :now=>2016-04-13 18:33:46 -0700, :diff=>8.0}
        # {:i=>4, :now=>2016-04-13 18:34:02 -0700, :diff=>16.0}
        # {:i=>5, :now=>2016-04-13 18:34:34 -0700, :diff=>32.0}
        # {:i=>6, :now=>2016-04-13 18:35:38 -0700, :diff=>64.0}
        # {:i=>7, :now=>2016-04-13 18:37:46 -0700, :diff=>128.0}
        # {:i=>8, :now=>2016-04-13 18:42:02 -0700, :diff=>256.0}
        # {:i=>9, :now=>2016-04-13 18:50:34 -0700, :diff=>512.0}
        # {:i=>10, :now=>2016-04-13 19:07:38 -0700, :diff=>1024.0}
        # {:i=>11, :now=>2016-04-13 19:33:31 -0700, :diff=>1553.0} # clear_queue!

        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        break if @i.buffer.queue.size == 0

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
      end
      assert{ now >= first_failure + 3600 }

      assert{ @i.buffer.stage.size == 0 }
      assert{ written_tags.all?{|t| t == 'test.tag.1' } }

      @i.emit_events("test.tag.3", dummy_event_stream())

      logs = @i.log.out.logs
      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }
    end

    test 'output plugin give retries up by retry_max_times, and clear queue in buffer' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_max_times' => 10,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      _first_failure = @i.retry.start

      chunks = @i.buffer.queue.dup

      20.times do |i| # large times enough
        now = @i.next_flush_time

        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        break if @i.buffer.queue.size == 0

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
      end
      assert{ @i.buffer.stage.size == 0 }
      assert{ written_tags.all?{|t| t == 'test.tag.1' } }

      @i.emit_events("test.tag.3", dummy_event_stream())

      logs = @i.log.out.logs
      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }

      assert{ @i.buffer.queue.size == 0 }
      assert{ @i.buffer.stage.size == 1 }
      assert{ chunks.all?{|c| c.empty? } }
    end

    test 'output plugin limits queued chunks via queued_chunks_limit_size' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_max_times' => 7,
        'queued_chunks_limit_size' => 2,
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing) { true }
      @i.register(:format) { |tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write) { |chunk| raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze(now)

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze(now)

      @i.emit_events("test.tag.2", dummy_event_stream())

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4) { Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert { @i.buffer.queue.size > 0 }
      assert { @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert { @i.write_count > 0 }
      assert { @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      20.times do |i| # large times enough
        now = @i.next_flush_time

        Timecop.freeze(now)
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4) { Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        @i.emit_events("test.tag.1", dummy_event_stream())
        assert { @i.buffer.queue.size <= 2 }
        assert { @i.buffer.stage.size == 1 } # all new data is stored into staged chunk

        break if @i.buffer.queue.size == 0

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end
    end
  end

  sub_test_case 'bufferd output for retries with periodical retry' do
    test 'periodical retries should retry to write in failing status per retry_wait' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :periodic,
        'retry_wait' => 3,
        'retry_randomize' => false,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze( now )

      @i.enqueue_thread_wait

      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      now = @i.next_flush_time
      Timecop.freeze( now )
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 1 }

      assert{ @i.write_count > 1 }
      assert{ @i.num_errors > 1 }
    end

    test 'output plugin give retries up by retry_timeout, and clear queue in buffer' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :periodic,
        'retry_wait' => 30,
        'retry_randomize' => false,
        'retry_timeout' => 120,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      first_failure = @i.retry.start

      3.times do |i|
        now = @i.next_flush_time

        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end

      assert{ @i.next_flush_time >= first_failure + 120 }

      assert{ @i.buffer.queue.size == 2 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
      assert{ @i.buffer.stage.size == 0 }

      assert{ written_tags.all?{|t| t == 'test.tag.1' } }

      chunks = @i.buffer.queue.dup

      @i.emit_events("test.tag.3", dummy_event_stream())

      now = @i.next_flush_time
      Timecop.freeze( now )
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

      logs = @i.log.out.logs

      target_time = Time.parse("2016-04-13 18:35:31 -0700")
      target_msg = "[error]: Hit limit for retries. dropping all chunks in the buffer queue."
      assert{ logs.any?{|l| l.include?(target_msg) } }

      log_time = get_log_time(target_msg, logs)
      assert_equal target_time.localtime, log_time.localtime

      assert{ @i.buffer.queue.size == 0 }
      assert{ @i.buffer.stage.size == 1 }
      assert{ chunks.all?{|c| c.empty? } }
    end

    test 'retry_max_times can limit maximum times for retries' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :periodic,
        'retry_wait' => 3,
        'retry_randomize' => false,
        'retry_max_times' => 10,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      _first_failure = @i.retry.start

      chunks = @i.buffer.queue.dup

      20.times do |i|
        now = @i.next_flush_time

        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        break if @i.buffer.queue.size == 0

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
      end
      assert{ @i.buffer.stage.size == 0 }
      assert{ written_tags.all?{|t| t == 'test.tag.1' } }


      @i.emit_events("test.tag.3", dummy_event_stream())

      logs = @i.log.out.logs
      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }

      assert{ @i.buffer.queue.size == 0 }
      assert{ @i.buffer.stage.size == 1 }
      assert{ chunks.all?{|c| c.empty? } }
    end

    test 'Do not retry when retry_max_times is 0' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :periodic,
        'retry_wait' => 1,
        'retry_randomize' => false,
        'retry_max_times' => 0,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal(0, @i.write_count)
      assert_equal(0, @i.num_errors)

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(2){ Thread.pass until @i.write_count == 1 && @i.num_errors == 1 }

      assert(@i.write_count == 1)
      assert(@i.num_errors == 1)
      assert(@i.log.out.logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") })
      assert(@i.buffer.queue.size == 0)
      assert(@i.buffer.stage.size == 1)
      assert(@i.buffer.queue.all?{|c| c.empty? })
    end
  end

  sub_test_case 'buffered output configured as retry_forever' do
    setup do
      Fluent::Plugin.register_output('output_retries_secondary_test', FluentPluginOutputAsBufferedRetryTest::DummyFullFeatureOutput2)
    end

    test 'warning logs are generated if secondary section is configured' do
      chunk_key = 'tag'
      hash = {
        'retry_forever' => true,
        'retry_randomize' => false,
      }
      i = create_output()
      i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash),config_element('secondary','', {'@type' => 'output_retries_secondary_test'})]))
      logs = i.log.out.logs
      assert { logs.any? { |l| l.include?("<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary") } }
    end

    test 'retry_timeout and retry_max_times will be ignored if retry_forever is true for exponential backoff' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :exponential_backoff,
        'retry_forever' => true,
        'retry_randomize' => false,
        'retry_timeout' => 3600,
        'retry_max_times' => 10,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      first_failure = @i.retry.start

      15.times do |i|
        now = @i.next_flush_time

        Timecop.freeze( now + 1 )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end

      assert{ @i.buffer.queue.size == 2 }
      assert{ @i.retry.steps > 10 }
      assert{ now > first_failure + 3600 }
    end

    test 'retry_timeout and retry_max_times will be ignored if retry_forever is true for periodical retries' do
      written_tags = []

      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_type' => :periodic,
        'retry_forever' => true,
        'retry_randomize' => false,
        'retry_wait' => 30,
        'retry_timeout' => 360,
        'retry_max_times' => 10,
        'queued_chunks_limit_size' => 100
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| written_tags << chunk.metadata.tag; raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      first_failure = @i.retry.start

      15.times do |i|
        now = @i.next_flush_time

        Timecop.freeze( now + 1 )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end

      assert{ @i.buffer.queue.size == 2 }
      assert{ @i.retry.steps > 10 }
      assert{ now > first_failure + 360 }
    end
  end

  sub_test_case 'buffered output with delayed commit' do
    test 'does retries correctly when #try_write fails' do
      chunk_key = 'tag'
      hash = {
        'flush_interval' => 1,
        'flush_thread_burst_interval' => 0.1,
        'retry_randomize' => false,
        'retry_max_interval' => 60 * 60,
      }
      @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:try_write){|chunk| raise "yay, your #write must fail" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze( now )

      @i.enqueue_thread_wait

      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 0 }
      waiting(4) do
        state = @i.instance_variable_get(:@output_flush_threads).first
        state.thread.status == 'sleep'
      end

      assert(@i.write_count > 0)
      assert(@i.num_errors > 0)

      now = @i.next_flush_time
      Timecop.freeze( now )
      @i.flush_thread_wakeup
      waiting(4){ Thread.pass until @i.write_count > 1 }
      waiting(4) do
        state = @i.instance_variable_get(:@output_flush_threads).first
        state.thread.status == 'sleep'
      end

      assert(@i.write_count > 1)
      assert(@i.num_errors > 1)
    end
  end
end