Back to Repositories

Testing File Service Discovery Configuration Management in Fluentd

This test suite validates the File Service Discovery functionality in Fluentd, focusing on configuration loading and service updates monitoring. It ensures proper handling of YAML, JSON config files and service state changes.

Test Coverage Overview

The test suite provides comprehensive coverage of the File Service Discovery plugin’s core functionality.

  • Configuration file loading (YAML, JSON formats)
  • Service state change detection
  • Error handling for invalid configs
  • Event handling for service updates

Implementation Analysis

The testing approach utilizes Ruby’s Test::Unit framework with extensive mocking and fiber-based event simulation. The implementation employs sub-test cases to organize related test scenarios and uses helper modules for controlled event testing.

  • Fiber-based event loop testing
  • Mock-based verification
  • Temporary file manipulation

Technical Details

  • Test::Unit as primary testing framework
  • FileUtils for temporary file management
  • JSON module for configuration parsing
  • Custom TestStatEventHelperWrapper for event simulation
  • Fiber-based asynchronous testing

Best Practices Demonstrated

The test suite exemplifies several testing best practices including proper setup/teardown handling, isolated test cases, and comprehensive error scenario coverage.

  • Thorough setup and cleanup
  • Isolated test environments
  • Comprehensive edge case testing
  • Clear test case organization

fluent/fluentd

test/plugin/test_sd_file.rb

            
require_relative '../helper'
require 'fluent/plugin/sd_file'
require 'fileutils'
require 'json'

class FileServiceDiscoveryTest < ::Test::Unit::TestCase
  setup do
    @dir = File.expand_path('data/sd_file', __dir__)
    FileUtils.mkdir_p(File.join(@dir, 'tmp'))
  end

  teardown do
    FileUtils.rm_r(File.join(@dir, 'tmp'))
  end

  sub_test_case 'configure' do
    test 'load yml' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1]
    end

    test 'load yaml' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yaml') }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1]
    end

    test 'load json' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.json') }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1]
    end

    test 'regard as yaml if ext is not givened' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config') }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1]
    end

    test 'raise an error if config has error' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      e = assert_raise Fluent::ConfigError do
        sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'invalid_config.yaml') }))
      end
      assert_match(/path=/, e.message)
    end

    test 'raise an error if config file does not exist' do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      e = assert_raise Fluent::ConfigError do
        sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'invalid_not_found.json') }))
      end
      assert_match(/not found/, e.message)
    end
  end

  sub_test_case '#start' do
    module TestStatEventHelperWrapper
      # easy to control statsevent
      def event_loop_attach(watcher)
        unless watcher.is_a?(Fluent::Plugin::FileServiceDiscovery::StatWatcher)
          super
          return
        end

        @test_stat_event_helper_wrapper_watchers ||= []
        @test_stat_event_helper_wrapper_watchers << watcher

        @test_stat_event_helper_wrapper_context = Fiber.new do
          loop do
            @test_stat_event_helper_wrapper_watchers.each do |w|
              w.on_change('old', 'new')
            end

            if Fiber.yield == :finish
              break
            end
          end
        end
        resume
      end

      def resume
        @test_stat_event_helper_wrapper_context.resume(:resume)
      end

      def shutdown
        super

        if @test_stat_event_helper_wrapper_context
          @test_stat_event_helper_wrapper_context.resume(:finish)
        end
      end
    end

    def create_tmp_config(path, body)
      File.write(File.join(@dir, 'tmp', path), body)
    end

    setup do
      sdf = Fluent::Plugin::FileServiceDiscovery.new
      @sd_file = sdf
    end

    teardown do
      if @sd_file
        @sd_file.stop unless @sd_file.stopped?
        @sd_file.before_shutdown unless @sd_file.before_shutdown?
        @sd_file.shutdown unless @sd_file.shutdown?
        @sd_file.after_shutdown unless @sd_file.after_shutdown?
        @sd_file.close unless @sd_file.closed?
        @sd_file.terminate unless @sd_file.terminated?
      end
    end

    test 'Skip if file is not updated' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('config.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') }))
      queue = []
      mock.proxy(@sd_file).refresh_file(queue).twice

      @sd_file.start(queue)
      assert_empty queue

      @sd_file.resume
      assert_empty queue
    end

    test 'Skip if file is invalid contents' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('config.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') }))

      queue = []
      @sd_file.start(queue)

      mock.proxy(@sd_file).refresh_file(queue).once
      create_tmp_config('test.json', 'invalid contents')
      @sd_file.resume

      assert_empty queue
    end

    test 'Skip if error is occurred' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('config.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') }))
      queue = []

      FileUtils.rm_r(File.join(@dir, 'tmp', 'config.json'))
      mock.proxy(@sd_file).refresh_file(queue).twice

      @sd_file.start(queue)
      assert_empty queue

      @sd_file.resume
      assert_empty queue
    end

    test 'if service is updated, service_in and service_out event happen' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') }))

      queue = []
      @sd_file.start(queue)
      create_tmp_config('test.json', JSON.generate([{ port: 1234, host: '127.0.0.1' }]))
      @sd_file.resume

      assert_equal 2, queue.size
      join = queue.shift
      drain = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type
      assert_equal 1234, join.service.port
      assert_equal '127.0.0.1', join.service.host

      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type
      assert_equal 1233, drain.service.port
      assert_equal '127.0.0.1', drain.service.host
    end

    test 'if service is deleted, service_out event happens' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }, { port: 1234, host: '127.0.0.2' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') }))

      queue = []
      @sd_file.start(queue)
      create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.resume

      assert_equal 1, queue.size
      drain = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type
      assert_equal 1234, drain.service.port
      assert_equal '127.0.0.2', drain.service.host
    end

    test 'if new service is added, service_in event happens' do
      @sd_file.extend(TestStatEventHelperWrapper)

      create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }]))
      @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') }))

      queue = []
      @sd_file.start(queue)
      create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }, { port: 1234, host: '127.0.0.2' }]))
      @sd_file.resume

      assert_equal 1, queue.size
      join = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type
      assert_equal 1234, join.service.port
      assert_equal '127.0.0.2', join.service.host
    end
  end
end