Back to Repositories

Validating Buffered Output Secondary Plugin Behavior in Fluentd

This test suite validates the secondary output plugin functionality in Fluentd’s buffered output system, focusing on retry mechanisms and failover behavior.

Test Coverage Overview

The test suite provides comprehensive coverage of Fluentd’s secondary output plugin system, including buffer handling, retry logic, and failover scenarios. Key functionality tested includes buffer queue management, retry timeout thresholds, and delayed commit processing. Edge cases cover failed write attempts and secondary plugin transitions. Integration points include buffer chunk handling and plugin lifecycle management.

Implementation Analysis

The testing approach uses the test-unit framework (Test::Unit::TestCase) with mock output plugins to simulate primary and secondary output behaviors. The implementation validates both periodic and exponential backoff retry patterns, using Timecop to manipulate time. Key patterns include:

  • Buffer chunk processing and commit validation
  • Retry state management and timeout handling
  • Secondary plugin activation thresholds
  • Delayed commit synchronization

Technical Details

Testing tools and libraries:

  • test-unit (Test::Unit) as the test framework
  • Timecop for time manipulation
  • Mock output plugins (DummyBareOutput, DummySyncOutput, DummyFullFeatureOutput, DummyFullFeatureOutput2)
  • JSON for data formatting
  • Timeout module for execution control

Best Practices Demonstrated

The test suite demonstrates several testing best practices for complex output plugin systems. Notable practices include proper setup/teardown handling, comprehensive error scenario coverage, and thorough validation of plugin lifecycle states. The code organization follows a clear pattern of setup, execution and verification phases with proper isolation between test cases.

fluent/fluentd

test/plugin/test_output_as_buffered_secondary.rb

            
require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/event'

require 'json'
require 'time'
require 'timeout'
require 'timecop'

# Dummy output plugins used as primary/secondary test doubles. Behavior is
# injected per test through #register, which stores a block in the instance
# variable named after the hook; each hook method then calls that block.
module FluentPluginOutputAsBufferedSecondaryTest
  # Base double: provides only the #register injection point.
  class DummyBareOutput < Fluent::Plugin::Output
    # Stores +block+ (or nil when no block is given) in the "@<name>" ivar.
    def register(name, &block)
      hook_ivar = "@#{name}"
      instance_variable_set(hook_ivar, block)
    end
  end

  # Double that only defines #process, delegating to the :process hook.
  class DummySyncOutput < DummyBareOutput
    def initialize
      super
      @process = nil
    end

    # Returns the hook's result, or nil when no :process hook is registered.
    def process(tag, es)
      return nil unless @process

      @process.call(tag, es)
    end
  end

  # Double that defines every output entry point. Each method delegates to the
  # hook of the same name and falls back to a fixed default when it is unset.
  class DummyFullFeatureOutput < DummyBareOutput
    def initialize
      super
      @prefer_buffered_processing = @prefer_delayed_commit = nil
      @process = @format = @write = @try_write = nil
    end

    # Defaults to false without a hook.
    def prefer_buffered_processing
      return false unless @prefer_buffered_processing

      @prefer_buffered_processing.call
    end

    # Defaults to false without a hook.
    def prefer_delayed_commit
      return false unless @prefer_delayed_commit

      @prefer_delayed_commit.call
    end

    def process(tag, es)
      return nil unless @process

      @process.call(tag, es)
    end

    # Defaults to the JSON encoding of [tag, time, record] without a hook.
    def format(tag, time, record)
      return @format.call(tag, time, record) if @format

      [tag, time, record].to_json
    end

    def write(chunk)
      return nil unless @write

      @write.call(chunk)
    end

    def try_write(chunk)
      return nil unless @try_write

      @try_write.call(chunk)
    end
  end

  # Variant that always prefers buffered processing and does not redefine
  # #process. The remaining methods only call super.
  # NOTE(review): presumably they are redefined so that they are owned by this
  # class itself; confirm against how Fluent::Plugin::Output detects features.
  class DummyFullFeatureOutput2 < DummyFullFeatureOutput
    def prefer_buffered_processing
      true
    end

    def prefer_delayed_commit
      super
    end

    def format(tag, time, record)
      super
    end

    def write(chunk)
      super
    end

    def try_write(chunk)
      super
    end
  end
end

class BufferedOutputSecondaryTest < Test::Unit::TestCase
  # Builds a new dummy output of the requested flavor.
  #
  # type - :bare, :sync or :full (default), selecting DummyBareOutput,
  #        DummySyncOutput or DummyFullFeatureOutput respectively.
  #
  # Raises ArgumentError for any other value.
  def create_output(type=:full)
    class_name = {
      bare: 'DummyBareOutput',
      sync: 'DummySyncOutput',
      full: 'DummyFullFeatureOutput',
    }[type]
    raise ArgumentError, "unknown type: #{type}" unless class_name

    # The class is resolved only after the type has been validated.
    FluentPluginOutputAsBufferedSecondaryTest.const_get(class_name).new
  end
  # Builds a buffer Metadata value from the given timekey, tag and variables
  # (each defaults to nil), passed positionally in that order.
  def create_metadata(timekey: nil, tag: nil, variables: nil)
    metadata_class = Fluent::Plugin::Buffer::Metadata
    metadata_class.new(timekey, tag, variables)
  end
  # Runs the given block under a Timeout of +seconds+ and returns its value.
  # When the limit is hit, the log lines collected by @i are dumped to STDERR
  # and the Timeout::Error is re-raised so the test still fails.
  def waiting(seconds)
    Timeout.timeout(seconds) { yield }
  rescue Timeout::Error
    STDERR.print(*@i.log.out.logs)
    raise
  end
  # Returns an ArrayEventStream of three fixed events (messages data1..data3)
  # that differ only in their timestamp and "message" field.
  def dummy_event_stream
    entries = [
      ['2016-04-13 18:33:00', "data1"],
      ['2016-04-13 18:33:13', "data2"],
      ['2016-04-13 18:33:32', "data3"],
    ].map do |timestamp, message|
      [ event_time(timestamp), {"name" => "moris", "age" => 36, "message" => message} ]
    end
    Fluent::ArrayEventStream.new(entries)
  end

  # Each test starts with a fresh default (:full) dummy output in @i.
  setup { @i = create_output }

  # Walks @i through every remaining shutdown phase, in order, skipping the
  # phases it has already reached, then restores the real clock.
  teardown do
    if @i
      [
        [:stop, :stopped?],
        [:before_shutdown, :before_shutdown?],
        [:shutdown, :shutdown?],
        [:after_shutdown, :after_shutdown?],
        [:close, :closed?],
        [:terminate, :terminated?],
      ].each do |phase, reached|
        @i.public_send(phase) unless @i.public_send(reached)
      end
    end
    Timecop.return
  end

  sub_test_case 'secondary plugin feature for buffered output with periodical retry' do
    # Register both dummy plugin types under the names used by the
    # <secondary> sections in the tests of this group.
    setup do
      {
        'output_secondary_test' => FluentPluginOutputAsBufferedSecondaryTest::DummyFullFeatureOutput,
        'output_secondary_test2' => FluentPluginOutputAsBufferedSecondaryTest::DummyFullFeatureOutput2,
      }.each do |type_name, plugin_class|
        Fluent::Plugin.register_output(type_name, plugin_class)
      end
    end

    # A :sync output (which only defines #process) configured with a
    # <secondary> section must be rejected with ConfigError.
    test 'raises configuration error if primary does not support buffering' do
      sync_output = create_output(:sync)
      secondary_section = config_element('secondary','',{'@type'=>'output_secondary_test'})
      root = config_element('ROOT','',{},[secondary_section])
      assert_raise(Fluent::ConfigError) { sync_output.configure(root) }
    end

    # A <secondary> section that itself contains a <buffer> or a nested
    # <secondary> section must be rejected with ConfigError. Both cases are
    # run against the same output instance.
    test 'raises configuration error if <buffer>/<secondary> section is specified in <secondary> section' do
      buffer_section = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 30, 'retry_randomize' => false})
      forbidden_children = [config_element('buffer', 'time'), config_element('secondary', '')]
      output = create_output()
      forbidden_children.each do |child|
        secondary_section = config_element('secondary','',{'@type' => 'output_secondary_test'},[child])
        assert_raise(Fluent::ConfigError) do
          output.configure(config_element('ROOT','',{},[buffer_section, secondary_section]))
        end
      end
    end

    # With no @type in <secondary>, configuring must succeed without any log
    # output and the secondary must be an instance of the primary's type.
    test 'uses same plugin type with primary if @type is missing in secondary' do
      root = config_element('ROOT', '', {'@type' => 'output_secondary_test'}, [
        config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 30, 'retry_randomize' => false}),
        config_element('secondary','',{}),
      ])
      output = create_output()
      assert_nothing_raised { output.configure(root) }
      emitted_logs = output.log.out.logs
      assert{ emitted_logs.empty? }
      assert{ output.secondary.is_a? FluentPluginOutputAsBufferedSecondaryTest::DummyFullFeatureOutput }
    end

    # Configuring a secondary of another plugin type than the primary must
    # leave a warning about it in the primary's log.
    test 'warns if secondary plugin is different type from primary one' do
      buffer_section = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 30, 'retry_randomize' => false})
      secondary_section = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      output = create_output()
      output.configure(config_element('ROOT','',{},[buffer_section, secondary_section]))
      emitted_logs = output.log.out.logs
      assert{ emitted_logs.any?{|line| line.include?("Use different plugin for secondary. Check the plugin works with primary like secondary_file") } }
    end

    # Every lifecycle transition invoked on the primary must move the
    # secondary into the matching state: the secondary's predicate is false
    # before the call on the primary and true right after it.
    test 'secondary plugin lifecycle is kicked by primary' do
      buffer_section = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 30, 'retry_randomize' => false})
      secondary_section = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      primary = create_output()
      primary.configure(config_element('ROOT','',{},[buffer_section, secondary_section]))
      emitted_logs = primary.log.out.logs
      assert{ emitted_logs.any?{|line| line.include?("Use different plugin for secondary. Check the plugin works with primary like secondary_file") } }

      assert primary.secondary.configured?

      # [call made on the primary, predicate checked on the secondary], in lifecycle order
      [
        [:start, :started?],
        [:after_start, :after_started?],
        [:stop, :stopped?],
        [:before_shutdown, :before_shutdown?],
        [:shutdown, :shutdown?],
        [:after_shutdown, :after_shutdown?],
        [:close, :closed?],
        [:terminate, :terminated?],
      ].each do |transition, reached|
        assert !primary.secondary.public_send(reached)
        primary.public_send(transition)
        assert primary.secondary.public_send(reached)
      end
    end

    # Periodic retry with synchronous commit on both sides. The primary #write
    # always raises; the test moves the frozen clock past
    # retry_timeout * retry_secondary_threshold (60 * 0.8 seconds after the
    # first failure) and checks that the secondary #write receives the chunk.
    test 'primary plugin will emit event streams to secondary after retries for time of retry_timeout * retry_secondary_threshold' do
      written = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, non-delayed commit, and every #write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      # Secondary: non-delayed commit; records each JSON line of the chunk it is given.
      @i.secondary.register(:prefer_delayed_commit){ false }
      @i.secondary.register(:write){|chunk| chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing write has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8
      now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
      Timecop.freeze( now )

      # If the retry state has not switched to secondary yet, run one more
      # flush attempt, re-baseline the counters and advance the clock again.
      unless @i.retry.secondary?
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        # next step is on secondary
        now = first_failure + 60 * 0.8 + 10
        Timecop.freeze( now )
      end

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

      # This attempt is expected to succeed: one more write, no additional error.
      current_write_count = @i.write_count
      current_num_errors = @i.num_errors
      assert{ current_write_count > prev_write_count }
      assert{ current_num_errors == prev_num_errors }

      # The retry state is gone after the successful write.
      assert_nil @i.retry

      # Only the first three recorded lines (the test.tag.1 chunk) are checked.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]

      # Poll for the success log line before asserting on it.
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
      assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
    end

    # Same failover scenario as the plain periodic case, but with mixed commit
    # modes: the primary prefers delayed commit (its #try_write always raises)
    # while the secondary uses non-delayed commit through #write.
    test 'secondary can do non-delayed commit even if primary do delayed commit' do
      written = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, delayed commit, and every #try_write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:try_write){|chunk| raise "yay, your #write must fail" }
      # Secondary: non-delayed commit; records each JSON line of the chunk it is given.
      @i.secondary.register(:prefer_delayed_commit){ false }
      @i.secondary.register(:write){|chunk| chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing attempt has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8
      now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
      Timecop.freeze( now )

      # If the retry state has not switched to secondary yet, run one more
      # flush attempt, re-baseline the counters and advance the clock again.
      unless @i.retry.secondary?
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        # next step is on secondary
        now = first_failure + 60 * 0.8 + 10
        Timecop.freeze( now )
      end

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

      # This attempt is expected to succeed: one more write, no additional error.
      assert{ @i.write_count > prev_write_count }
      assert{ @i.num_errors == prev_num_errors }

      # With a non-delayed secondary the retry state is cleared right after the write.
      assert_nil @i.retry

      # Only the first three recorded lines (the test.tag.1 chunk) are checked.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]

      # Poll for the success log line before asserting on it.
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
      assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
    end

    # Both primary and secondary prefer delayed commit. The primary #try_write
    # always raises; the secondary #try_write keeps the chunk it receives, and
    # the test commits it explicitly through the secondary to finish the retry.
    test 'secondary plugin can do delayed commit if primary do it' do
      written = []
      chunks = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, delayed commit, and every #try_write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ true }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:try_write){|chunk| raise "yay, your #write must fail" }
      # Secondary: delayed commit; remembers the chunk object and records its JSON lines.
      @i.secondary.register(:prefer_delayed_commit){ true }
      @i.secondary.register(:try_write){|chunk| chunks << chunk; chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing attempt has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8
      now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
      Timecop.freeze( now )

      # If the retry state has not switched to secondary yet, run one more
      # flush attempt, re-baseline the counters and advance the clock again.
      unless @i.retry.secondary?
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        # next step is on secondary
        now = first_failure + 60 * 0.8 + 10
        Timecop.freeze( now )
      end

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

      # The secondary accepted the chunk without raising: one more write, no additional error.
      assert{ @i.write_count > prev_write_count }
      assert{ @i.num_errors == prev_num_errors }

      # The chunk has not been committed yet, so the retry state is still present.
      assert @i.retry

      # Only the first three recorded lines (the test.tag.1 chunk) are checked.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]

      # Before the commit the chunk is tracked as dequeued and still holds its data.
      assert{ @i.buffer.dequeued.size > 0 }
      assert{ chunks.size > 0 }
      assert{ !chunks.first.empty? }

      # Commit through the secondary, using the id of the chunk it was handed.
      @i.secondary.commit_write(chunks[0].unique_id)

      # After the commit the chunk is no longer tracked as dequeued and is empty.
      assert{ @i.buffer.dequeued[chunks[0].unique_id].nil? }
      assert{ chunks.first.empty? }

      # The commit also clears the retry state.
      assert_nil @i.retry

      # Poll for the success log line before asserting on it.
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
      assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
    end

    # Mixed commit modes: the primary uses non-delayed commit (its #write
    # always raises) while the secondary prefers delayed commit. The chunk
    # handed to the secondary #try_write is committed explicitly by the test.
    test 'secondary plugin can do delayed commit even if primary does not do it' do
      written = []
      chunks = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, non-delayed commit, and every #write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      # Secondary: delayed commit; remembers the chunk object and records its JSON lines.
      @i.secondary.register(:prefer_delayed_commit){ true }
      @i.secondary.register(:try_write){|chunk| chunks << chunk; chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing write has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8
      now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
      Timecop.freeze( now )

      # If the retry state has not switched to secondary yet, run one more
      # flush attempt, re-baseline the counters and advance the clock again.
      unless @i.retry.secondary?
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        # next step is on secondary
        now = first_failure + 60 * 0.8 + 10
        Timecop.freeze( now )
      end

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

      # The secondary accepted the chunk without raising: one more write, no additional error.
      assert{ @i.write_count > prev_write_count }
      assert{ @i.num_errors == prev_num_errors }

      # The chunk has not been committed yet, so the retry state is still present.
      assert @i.retry

      # Only the first three recorded lines (the test.tag.1 chunk) are checked.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]

      # Before the commit the chunk is tracked as dequeued and still holds its data.
      assert{ @i.buffer.dequeued.size > 0 }
      assert{ chunks.size > 0 }
      assert{ !chunks.first.empty? }

      # Commit through the secondary, using the id of the chunk it was handed.
      @i.secondary.commit_write(chunks[0].unique_id)

      # After the commit the chunk is no longer tracked as dequeued and is empty.
      assert{ @i.buffer.dequeued[chunks[0].unique_id].nil? }
      assert{ chunks.first.empty? }

      # The commit also clears the retry state.
      assert_nil @i.retry

      # Poll for the success log line before asserting on it.
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
      assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
    end

    # The secondary takes both chunks through #try_write (delayed commit) but
    # the test never commits them. With delayed_commit_timeout set to 2, the
    # test advances the clock until no chunk is tracked as dequeued any more
    # and expects one "timeout to commit" warning per chunk.
    test 'secondary plugin can do delayed commit even if primary does not do it, and non-committed chunks will be rollbacked by primary' do
      written = []
      chunks = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false, 'queued_chunks_limit_size' => 10})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, non-delayed commit, and every #write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      # Secondary: delayed commit via #try_write; its #write raises so that any
      # use of the non-delayed path would surface as an error.
      @i.secondary.register(:prefer_delayed_commit){ true }
      @i.secondary.register(:try_write){|chunk| chunks << chunk; chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.secondary.register(:write){|chunk| raise "don't use this" }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      # Both streams are emitted at the same frozen instant in this test.
      @i.emit_events("test.tag.1", dummy_event_stream())
      @i.emit_events("test.tag.2", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing write has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # Both chunks are queued, test.tag.1 first.
      assert{ @i.buffer.queue.size == 2 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8

      # Two flush triggers past the threshold, one second apart, then wait
      # until the secondary has been handed both chunks.
      now = first_failure + 60 * 0.8 + 1
      Timecop.freeze( now )
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      now = first_failure + 60 * 0.8 + 2
      Timecop.freeze( now )
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup

      waiting(4){ sleep 0.1 until chunks.size == 2 }

      # The secondary accepted the chunks without raising: more writes, no additional error.
      assert{ @i.write_count > prev_write_count }
      assert{ @i.num_errors == prev_num_errors }

      # Nothing has been committed, so the retry state is still present.
      assert @i.retry

      # All six recorded lines are checked: the test.tag.1 chunk first, then test.tag.2.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]
      assert_equal [ 'test.tag.2', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[3]
      assert_equal [ 'test.tag.2', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[4]
      assert_equal [ 'test.tag.2', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[5]

      # Both chunks are tracked as dequeued and still hold their data.
      assert{ @i.buffer.dequeued.size == 2 }
      assert{ chunks.size == 2 }
      assert{ !chunks[0].empty? }
      assert{ !chunks[1].empty? }

      # Advance the clock one second per iteration (capped at +6) and wake the
      # flush thread until no chunk is tracked as dequeued any more.
      30.times do |i| # large enough
        # In https://github.com/fluent/fluentd/blob/c90c024576b3d35f356a55fd33d1232947114a9a/lib/fluent/plugin_helper/retry_state.rb
        # @timeout_at is 2016-04-13 18:34:31, @next_time must be less than 2016-04-13 18:34:30
        #
        # first_failure + 60 * 0.8 + 2 # => 2016-04-13 18:34:21
        # @next_time is not added by 1, but by randomize(@retry_wait) https://github.com/fluent/fluentd/blob/c90c024576b3d35f356a55fd33d1232947114a9a/lib/fluent/plugin_helper/retry_state.rb#L196
        # current_time(=Time.now) + randomize(@retry_wait) < @timeout_at
        # (2016-04-13 18:34:21 + 6) + 3 < 2016-04-13 18:34:31
        # So, current_time must be at most 6
        now = first_failure + 60 * 0.8 + 2 + [i, 6].min
        Timecop.freeze( now )
        @i.flush_thread_wakeup

        break if @i.buffer.dequeued.size == 0
      end

      # The retry state survives the rollback; exactly two timeout warnings are
      # expected, matching the two uncommitted chunks.
      assert @i.retry
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.count{|l| l.include?("[warn]: failed to flush the buffer chunk, timeout to commit.") } == 2 }
      assert{ logs.count{|l| l.include?("[warn]: failed to flush the buffer chunk, timeout to commit.") } == 2 }
    end

    # Both primary and secondary #write always raise. After the clock is moved
    # past the secondary threshold, the failed attempt must schedule the next
    # flush exactly retry_wait (3 seconds) after the frozen current time.
    test 'retry_wait for secondary is same with one for primary' do
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, non-delayed commit, and every #write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      # Secondary: non-delayed commit, and every #write fails as well.
      @i.secondary.register(:prefer_delayed_commit){ false }
      @i.secondary.register(:write){|chunk| raise "your secondary is also useless." }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing write has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      # All later clock positions are expressed relative to the retry start time.
      first_failure = @i.retry.start

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8

      now = first_failure + 60 * 0.8 + 1

      Timecop.freeze( now )
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

      # This attempt fails too: both the write count and the error count grow.
      assert{ @i.write_count > prev_write_count }
      assert{ @i.num_errors > prev_num_errors }

      assert @i.retry

      # Time.now is frozen, so the difference is exact; 3 equals the configured retry_wait.
      assert_equal 3, (@i.next_flush_time - Time.now)

      # Poll for the secondary-failure log line before asserting on it.
      logs = @i.log.out.logs
      waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: failed to flush the buffer with secondary output.") } }
      assert{ logs.any?{|l| l.include?("[warn]: failed to flush the buffer with secondary output.") } }
    end
  end

  sub_test_case 'secondary plugin feature for buffered output with exponential backoff' do
    # Register both dummy plugin types under the names used by the
    # <secondary> sections in the tests of this group.
    setup do
      {
        'output_secondary_test' => FluentPluginOutputAsBufferedSecondaryTest::DummyFullFeatureOutput,
        'output_secondary_test2' => FluentPluginOutputAsBufferedSecondaryTest::DummyFullFeatureOutput2,
      }.each do |type_name, plugin_class|
        Fluent::Plugin.register_output(type_name, plugin_class)
      end
    end

    # Exponential backoff variant of the failover test. Instead of jumping to
    # a fixed offset, the test repeatedly moves the frozen clock to the
    # output's next flush time until the queue is empty, then checks that this
    # happened no earlier than 60 * 0.8 seconds after the first failure.
    test 'primary plugin will emit event streams to secondary after retries for time of retry_timeout * retry_secondary_threshold' do
      written = []
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :exponential_backoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      # Primary: buffered, non-delayed commit, and every #write fails.
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      # Secondary: non-delayed commit; records each JSON line of the chunk it is given.
      @i.secondary.register(:prefer_delayed_commit){ false }
      @i.secondary.register(:write){|chunk| chunk.read.split("\n").each{|line| written << JSON.parse(line) } }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Time is frozen from here on and only advances where the test says so.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been written or failed before the first flush.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger a flush and wait until the first failing write has been counted.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      # The failed chunk is still queued, with the first emitted tag at the head.
      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      prev_write_count = @i.write_count
      first_failure = @i.retry.start

      # Each iteration jumps to the next scheduled flush time and waits for one
      # more write attempt; the loop ends once the queue has been drained.
      # `now` is the outer local, so its last value is visible after the loop.
      20.times do |i| # large enough
        now = @i.next_flush_time
        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        assert{ @i.write_count > prev_write_count }

        break if @i.buffer.queue.size == 0

        prev_write_count = @i.write_count
      end

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8

      # The draining flush must not have happened before the secondary threshold.
      assert{ now >= first_failure + 60 * 0.8 }

      # The retry state is gone after the successful write.
      assert_nil @i.retry

      # Only the first three recorded lines (the test.tag.1 chunk) are checked.
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
      assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]

      assert(@i.log.out.logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") })
    end

    # Both the primary and the secondary #write always raise in this test.
    # It checks that, once retrying has switched over to the secondary, the
    # wait until the next flush is short again (<= 2s) instead of continuing
    # the primary's grown backoff, and that the queued chunks are dropped with
    # a "Hit limit for retries" error once retry_timeout (60s) has elapsed.
    test 'exponential backoff interval will be initialized when switched to secondary' do
      priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :exponential_backoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false})
      secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
      @i.configure(config_element('ROOT','',{},[priconf,secconf]))
      @i.register(:prefer_buffered_processing){ true }
      @i.register(:prefer_delayed_commit){ false }
      @i.register(:format){|tag,time,record| [tag,time.to_i,record].to_json + "\n" }
      # Every write, primary or secondary, fails so that retrying never succeeds.
      @i.register(:write){|chunk| raise "yay, your #write must fail" }
      @i.secondary.register(:prefer_delayed_commit){ false }
      @i.secondary.register(:write){|chunk| raise "your secondary is also useless." }
      @i.start
      @i.after_start

      @i.interrupt_flushes

      # Emit two streams one (frozen) second apart, under different tags.
      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.1", dummy_event_stream())

      now = Time.parse('2016-04-13 18:33:31 -0700')
      Timecop.freeze( now )

      @i.emit_events("test.tag.2", dummy_event_stream())

      # Nothing has been flushed yet.
      assert_equal 0, @i.write_count
      assert_equal 0, @i.num_errors

      # Trigger the first flush attempt and wait until it has failed.
      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

      assert{ @i.buffer.queue.size > 0 }
      assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

      assert{ @i.write_count > 0 }
      assert{ @i.num_errors > 0 }

      # Baselines used to detect that a new flush attempt has happened.
      prev_write_count = @i.write_count
      prev_num_errors = @i.num_errors

      first_failure = @i.retry.start

      # Phase 1: jump the frozen clock to each scheduled flush time until the
      # retry state reports that the secondary is in use.
      20.times do |i| # large enough
        now = @i.next_flush_time
        # p({i: i, now: now, diff: (now - Time.now)})
        # {:i=>0, :now=>2016-04-13 18:33:32 -0700, :diff=>1.0}
        # {:i=>1, :now=>2016-04-13 18:33:34 -0700, :diff=>2.0}
        # {:i=>2, :now=>2016-04-13 18:33:38 -0700, :diff=>4.0}
        # {:i=>3, :now=>2016-04-13 18:33:46 -0700, :diff=>8.0}
        # {:i=>4, :now=>2016-04-13 18:34:02 -0700, :diff=>16.0}
        # {:i=>5, :now=>2016-04-13 18:34:19 -0700, :diff=>17.0}
        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors

        break if @i.retry.secondary?

        assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
      end

      # retry_timeout == 60(sec), retry_secondary_threshold == 0.8

      assert{ now >= first_failure + 60 * 0.8 }
      assert @i.retry
      logs = @i.log.out.logs
      assert{ logs.any?{|l| l.include?("[warn]: failed to flush the buffer with secondary output.") } }

      # The backoff must have been re-initialized for the secondary.
      assert{ (@i.next_flush_time - Time.now) <= 2 } # <= retry_wait (1s) * base (2) ** 1

      # Phase 2: keep advancing to each scheduled flush time until the queue
      # has been emptied.
      20.times do |i| # large enough again
        now = @i.next_flush_time
        # p({i: i, now: now, diff: (now - Time.now)})
        # {:i=>0, :now=>2016-04-13 18:34:20 -0700, :diff=>1.0}
        # {:i=>1, :now=>2016-04-13 18:34:24 -0700, :diff=>4.0}
        # {:i=>2, :now=>2016-04-13 18:34:31 -0700, :diff=>7.0}

        Timecop.freeze( now )
        @i.enqueue_thread_wait
        @i.flush_thread_wakeup
        # The secondary write registered above always raises, so each attempt
        # here is expected to advance both counters; wait for both so the
        # num_errors assertion below does not race the flush thread.
        waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

        assert{ @i.write_count > prev_write_count }
        assert{ @i.num_errors > prev_num_errors }

        break if @i.buffer.queue.size == 0

        # Refresh the baselines so the next iteration waits for, and asserts
        # on, a new flush attempt. Without this the wait and both assertions
        # pass immediately from the second iteration on, because they compare
        # against counts taken before this loop started.
        prev_write_count = @i.write_count
        prev_num_errors = @i.num_errors
      end

      logs = @i.log.out.logs
      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }

      # The chunks may only be dropped once the full retry_timeout has elapsed.
      assert{ now >= first_failure + 60 }
    end
  end
end