Back to Repositories

Testing SRV Service Discovery Implementation in Fluentd

This test suite validates the SRV Service Discovery functionality in Fluentd, focusing on DNS record handling and service configuration. It ensures proper service discovery implementation through comprehensive testing of DNS resolution, service updates, and event handling mechanisms.

Test Coverage Overview

The test suite covers essential SRV service discovery functionality including:

  • DNS record priority ordering and resolution
  • Hostname resolution with configurable DNS lookup
  • Custom DNS server configuration and port handling
  • RFC2782 compliance for target formatting
  • Service update event handling and queue management

Implementation Analysis

The testing approach utilizes Test::Unit with FlexMock for dependency isolation. It implements sub-test cases for configuration and runtime behavior verification, with particular attention to service state changes and event propagation.

The test structure follows a modular pattern with separate test cases for configuration and operational aspects.

Technical Details

Key technical components include:

  • Test::Unit framework integration
  • FlexMock for DNS resolver mocking
  • Fiber-based timer event simulation
  • Custom test helper modules for event control
  • Service discovery event queue validation

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Comprehensive setup and teardown handling
  • Isolated test cases with clear boundaries
  • Thorough error condition coverage
  • Mock object usage for external dependencies
  • Clear test case organization and naming

fluent/fluentd

test/plugin/test_sd_srv.rb

            
require_relative '../helper'
require 'fluent/plugin/sd_srv'
require 'fileutils'
require 'flexmock/test_unit'
require 'json'

class SrvServiceDiscoveryTest < ::Test::Unit::TestCase
  SRV_RECORD1 = Resolv::DNS::Resource::IN::SRV.new(1, 10, 8081, 'service1.example.com')
  SRV_RECORD2 = Resolv::DNS::Resource::IN::SRV.new(2, 20, 8082, 'service2.example.com')

  sub_test_case 'configure' do
    test 'set services ordered by priority' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock(Resolv::DNS).new { flexmock('dns_resolver', getresources: [SRV_RECORD2, SRV_RECORD1], getaddress: '127.0.0.1') }

      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'reuturn host name without revolving name when dns_lookup is flase' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock(Resolv::DNS).new { flexmock('dns_resolver', getresources: [SRV_RECORD1, SRV_RECORD2], getaddress: '127.0.0.1') }

      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_lookup' => false }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, 'service1.example.com', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, 'service2.example.com', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'pass a value as :nameserver to Resolve::DNS when dns_server_host is given' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock(Resolv::DNS).new(nameserver: '8.8.8.8') { flexmock('dns_resolver', getresources: [SRV_RECORD1, SRV_RECORD2], getaddress: '127.0.0.1') }

      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_server_host' => '8.8.8.8' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'pass a value as :nameserver_port to Resolve::DNS when dns_server_host has port' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock(Resolv::DNS).new(nameserver_port: [['8.8.8.8', 8080]]) { flexmock('dns_resolver', getresources: [SRV_RECORD1, SRV_RECORD2], getaddress: '127.0.0.1') }

      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_server_host' => '8.8.8.8:8080' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'target follows RFC2782' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources).with("_service1._tcp.example.com", Resolv::DNS::Resource::IN::SRV)
               .and_return([SRV_RECORD1, SRV_RECORD2])
               .mock

      mock(Resolv::DNS).new { mock }
      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'can change protocol' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources).with("_service1._udp.example.com", Resolv::DNS::Resource::IN::SRV)
               .and_return([SRV_RECORD1, SRV_RECORD2])
               .mock

      mock(Resolv::DNS).new { mock }
      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'proto' => 'udp' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, '', '', nil), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, '', '', nil), sdf.services[1]
    end

    test 'can set password, username, password' do
      sdf = Fluent::Plugin::SrvServiceDiscovery.new
      mock(Resolv::DNS).new { flexmock('dns_resolver', getresources: [SRV_RECORD2, SRV_RECORD1], getaddress: '127.0.0.1') }

      sdf.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'shared_key' => 'key', 'username' => 'user', 'password' => 'pass' }))
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8081, 'service1.example.com', 10, false, 'user', 'pass', 'key'), sdf.services[0]
      assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:srv, '127.0.0.1', 8082, 'service2.example.com', 20, false, 'user', 'pass', 'key'), sdf.services[1]
    end
  end

  sub_test_case '#start' do
    module TestTimerEventHelperWrapper
      # easy to control statsevent
      def timer_execute(_name, _interval, &block)
        @test_timer_event_helper_wrapper_context = Fiber.new do
          loop do
            block.call

            if Fiber.yield == :finish
              break
            end
          end
        end

        resume
      end

      def resume
        @test_timer_event_helper_wrapper_context.resume(:resume)
      end

      def shutdown
        super

        if @test_timer_event_helper_wrapper_context
          @test_timer_event_helper_wrapper_context.resume(:finish)
        end

      end
    end

    setup do
      sds = Fluent::Plugin::SrvServiceDiscovery.new
      @sd_srv = sds
    end

    teardown do
      if @sd_srv
        @sd_srv.stop unless @sd_srv.stopped?
        @sd_srv.before_shutdown unless @sd_srv.before_shutdown?
        @sd_srv.shutdown unless @sd_srv.shutdown?
        @sd_srv.after_shutdown unless @sd_srv.after_shutdown?
        @sd_srv.close unless @sd_srv.closed?
        @sd_srv.terminate unless @sd_srv.terminated?
      end
    end

    test 'Skip if srv record is not updated' do
      @sd_srv.extend(TestTimerEventHelperWrapper)
      mock(Resolv::DNS).new { flexmock('dns_resolver', getresources: [SRV_RECORD2, SRV_RECORD1], getaddress: '127.0.0.1') }
      @sd_srv.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }))
      queue = []

      @sd_srv.start(queue)
      assert_empty queue

      @sd_srv.resume
      assert_empty queue
    end

    test 'Skip if DNS resolver raise an error' do
      @sd_srv.extend(TestTimerEventHelperWrapper)
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources)
               .and_return([SRV_RECORD1, SRV_RECORD2])
               .and_return { raise 'some error' } # for start
               .and_return { raise 'some error' } # for resume
               .mock

      mock(Resolv::DNS).new { mock }
      @sd_srv.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }))
      queue = []

      @sd_srv.start(queue)
      assert_empty queue

      @sd_srv.resume
      assert_empty queue
    end

    test 'if service is updated, service_in and service_out event happen' do
      @sd_srv.extend(TestTimerEventHelperWrapper)
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources)
               .and_return([SRV_RECORD1])
               .and_return([SRV_RECORD2])
               .mock

      mock(Resolv::DNS).new { mock }
      @sd_srv.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_lookup' => false }))
      queue = []

      @sd_srv.start(queue)
      join = queue.shift
      drain = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type
      assert_equal 8082, join.service.port
      assert_equal 'service2.example.com', join.service.host

      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type
      assert_equal 8081, drain.service.port
      assert_equal 'service1.example.com', drain.service.host
    end

    test 'if service is deleted, service_out event happens' do
      @sd_srv.extend(TestTimerEventHelperWrapper)
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources)
               .and_return([SRV_RECORD1, SRV_RECORD2])
               .and_return([SRV_RECORD2])
               .mock

      mock(Resolv::DNS).new { mock }
      @sd_srv.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_lookup' => false }))
      queue = []

      @sd_srv.start(queue)

      assert_equal 1, queue.size
      drain = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type
      assert_equal 8081, drain.service.port
      assert_equal 'service1.example.com', drain.service.host
    end

    test 'if new service is added, service_in event happens' do
      @sd_srv.extend(TestTimerEventHelperWrapper)
      mock = flexmock('dns_resolver', getaddress: '127.0.0.1')
               .should_receive(:getresources)
               .and_return([SRV_RECORD2])
               .and_return([SRV_RECORD1, SRV_RECORD2])
               .mock

      mock(Resolv::DNS).new { mock }
      @sd_srv.configure(config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com', 'dns_lookup' => false }))
      queue = []

      @sd_srv.start(queue)

      assert_equal 1, queue.size
      join = queue.shift
      assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type
      assert_equal 8081, join.service.port
      assert_equal 'service1.example.com', join.service.host
    end
  end
end