Back to Repositories

Testing SourceOnlyBufferAgent Configuration and Management in Fluentd

This test suite validates the SourceOnlyBufferAgent functionality in Fluentd, focusing on buffer configuration, cleanup operations, and error handling mechanisms. The tests ensure proper initialization, worker handling, and buffer management across different scenarios.

Test Coverage Overview

The test suite provides comprehensive coverage of the SourceOnlyBufferAgent component:

  • Buffer configuration validation with default and custom settings
  • Multi-worker scenario testing
  • Buffer cleanup operations verification
  • Error handling and event routing validation
  • Flush behavior testing with various configurations

Implementation Analysis

The testing approach employs Minitest framework with extensive use of mocking and stubbing to isolate component behavior. The implementation validates buffer configurations, directory management, and event routing through systematic test cases that cover both standard operations and edge cases.

  • Stub-based system configuration testing
  • Mock-based filesystem operations verification
  • Event router interaction validation

Technical Details

Key technical components include:

  • Test::Unit framework integration
  • ServerEngine::DaemonLogger for logging verification
  • Fluent::SystemConfig for configuration management
  • Mock/Stub utilities for isolation testing
  • FileUtils for buffer directory management

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Systematic sub-test case organization
  • Comprehensive setup and teardown management
  • Platform-specific test handling (Windows omission)
  • Thorough error scenario coverage
  • Clear test case naming and organization

fluent/fluentd

test/test_source_only_buffer_agent.rb

            
require_relative 'helper'

class SourceOnlyBufferAgentTest < ::Test::Unit::TestCase
  def log
    logger = ServerEngine::DaemonLogger.new(
      Fluent::Test::DummyLogDevice.new,
      { log_level: ServerEngine::DaemonLogger::INFO }
    )
    Fluent::Log.new(logger)
  end

  def setup
    omit "Not supported on Windows" if Fluent.windows?
    @log = log
  end

  sub_test_case "#configure" do
    test "default" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      assert_equal(
        {
          "num of filter plugins" => 0,
          "num of output plugins" => 1,
          "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path),
          "actual_buffer_dir" => agent.instance_variable_get(:@default_buffer_path),
          "EventRouter of BufferOutput" => root_agent.event_router.object_id,
          "flush_thread_count" => 0,
          "flush_at_shutdown" => false,
        },
        {
          "num of filter plugins" => agent.filters.size,
          "num of output plugins" => agent.outputs.size,
          "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir),
          "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir),
          "EventRouter of BufferOutput" => agent.outputs[0].router.object_id,
          "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count,
          "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown,
        }
      )

      assert do
        @log.out.logs.any? { |log| log.include? "the emitted data will be stored in the buffer files" }
      end
    end

    test "flush: true" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure(flush: true)

      assert_equal(
        {
          "num of filter plugins" => 0,
          "num of output plugins" => 1,
          "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path),
          "actual_buffer_dir" => agent.instance_variable_get(:@default_buffer_path),
          "EventRouter of BufferOutput" => root_agent.event_router.object_id,
          "flush_thread_count" => 1,
          "flush_at_shutdown" => true,
        },
        {
          "num of filter plugins" => agent.filters.size,
          "num of output plugins" => agent.outputs.size,
          "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir),
          "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir),
          "EventRouter of BufferOutput" => agent.outputs[0].router.object_id,
          "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count,
          "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown,
        }
      )

      assert do
        not @log.out.logs.any? { |log| log.include? "the emitted data will be stored in the buffer files" }
      end
    end

    test "multiple workers" do
      system_config = Fluent::SystemConfig.new(config_element("system", "", {"workers" => 2}))
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      assert_equal(
        {
          "num of filter plugins" => 0,
          "num of output plugins" => 1,
          "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path),
          "actual_buffer_dir" => "#{agent.instance_variable_get(:@default_buffer_path)}/worker0",
          "EventRouter of BufferOutput" => root_agent.event_router.object_id,
          "flush_thread_count" => 0,
          "flush_at_shutdown" => false,
        },
        {
          "num of filter plugins" => agent.filters.size,
          "num of output plugins" => agent.outputs.size,
          "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir),
          "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir),
          "EventRouter of BufferOutput" => agent.outputs[0].router.object_id,
          "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count,
          "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown,
        }
      )
    end

    test "full setting with flush:true" do
      system_config = Fluent::SystemConfig.new(config_element("system", "", {}, [
        config_element("source_only_buffer", "", {
          "flush_thread_count" => 4,
          "overflow_action" => :throw_exception,
          "path" => "tmp_buffer_path",
          "flush_interval" => 1,
          "chunk_limit_size" => 100,
          "total_limit_size" => 1000,
          "compress" => :gzip,
        })
      ]))
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure(flush: true)

      assert_equal(
        {
          "num of filter plugins" => 0,
          "num of output plugins" => 1,
          "base_buffer_dir" => "tmp_buffer_path",
          "actual_buffer_dir" => "tmp_buffer_path",
          "EventRouter of BufferOutput" => root_agent.event_router.object_id,
          "flush_thread_count" => 4,
          "flush_at_shutdown" => true,
          "overflow_action" => :throw_exception,
          "flush_interval" => 1,
          "chunk_limit_size" => 100,
          "total_limit_size" => 1000,
          "compress" => :gzip,
        },
        {
          "num of filter plugins" => agent.filters.size,
          "num of output plugins" => agent.outputs.size,
          "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir),
          "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir),
          "EventRouter of BufferOutput" => agent.outputs[0].router.object_id,
          "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count,
          "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown,
          "overflow_action" => agent.outputs[0].buffer_config.overflow_action,
          "flush_interval" => agent.outputs[0].buffer_config.flush_interval,
          "chunk_limit_size" => agent.outputs[0].buffer.chunk_limit_size,
          "total_limit_size" => agent.outputs[0].buffer.total_limit_size,
          "compress" => agent.outputs[0].buffer.compress,
        }
      )
    end
  end

  sub_test_case "#cleanup" do
    test "do not remove the buffer if it is not empty" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      stub(Dir).empty?(agent.instance_variable_get(:@actual_buffer_dir)) { false }
      mock(FileUtils).remove_dir.never

      agent.cleanup

      assert do
        @log.out.logs.any? { |log| log.include? "some buffer files remain in" }
      end
    end

    test "remove the buffer if it is empty" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      stub(Dir).empty?(agent.instance_variable_get(:@actual_buffer_dir)) { true }
      mock(FileUtils).remove_dir(agent.instance_variable_get(:@base_buffer_dir)).times(1)

      agent.cleanup

      assert do
        not @log.out.logs.any? { |log| log.include? "some buffer files remain in" }
      end
    end
  end

  sub_test_case "error" do
    test "#emit_error_event" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      agent.event_router.emit_error_event("tag", 0, "hello", Exception.new)

      assert do
        @log.out.logs.any? { |log| log.include? "SourceOnlyBufferAgent: dump an error event" }
      end
    end

    test "#handle_emits_error" do
      system_config = Fluent::SystemConfig.new
      root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config)
      stub(Fluent::Engine).root_agent { root_agent }
      stub(Fluent::Engine).system_config { system_config }
      root_agent.configure(config_element)

      agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config)
      agent.configure

      stub(agent.outputs[0]).emit_events { raise "test error" }

      agent.event_router.emit("foo", 0, "hello")

      assert do
        @log.out.logs.any? { |log| log.include? "SourceOnlyBufferAgent: emit transaction failed" }
      end
    end
  end
end