Back to Repositories

Testing Socket Cache Management in Fluentd Forward Output Plugin

This test suite validates the socket cache functionality in Fluentd’s forward output plugin. It ensures proper handling of socket connections, lifecycle management, and resource cleanup for network communications.

Test Coverage Overview

The test suite comprehensively covers socket cache operations including checkout, checkin, and revocation scenarios. Key functionality tested includes:

  • Socket creation and reuse patterns
  • Cache key management and expiration
  • Connection pooling behavior
  • Resource cleanup mechanisms

Implementation Analysis

Tests utilize Minitest’s Test::Unit framework with mock objects for socket operations. The implementation follows a systematic approach to verify cache behavior through isolated test cases and time-dependent scenarios using Timecop.

Testing patterns include state verification, mock expectations, and timeout simulations.

Technical Details

Key technical components include:

  • Test::Unit framework
  • Timecop for time manipulation
  • Mock objects for socket simulation
  • ForwardOutput plugin integration
  • Cache timeout configuration

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Isolated test cases with clear setup/teardown
  • Comprehensive edge case coverage
  • Proper resource cleanup verification
  • Time-dependent behavior testing
  • Mock object usage for external dependencies

fluent/fluentd

test/plugin/out_forward/test_socket_cache.rb

            
require_relative '../../helper'

require 'fluent/plugin/out_forward/socket_cache'
require 'timecop'

class SocketCacheTest < Test::Unit::TestCase
  sub_test_case 'checkout_or' do
    test 'when given key does not exist' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.open { 'socket' }.subject
      assert_equal('socket', c.checkout_or('key') { sock.open })
    end

    test 'when given key exists' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      socket = 'socket'
      assert_equal(socket, c.checkout_or('key') { socket })
      c.checkin(socket)

      sock = mock!.open.never.subject
      assert_equal(socket, c.checkout_or('key') { sock.open })
    end

    test 'when given key exists but used by other' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      assert_equal('sock', c.checkout_or('key') { 'sock' })

      new_sock = 'new sock'
      sock = mock!.open { new_sock }.subject
      assert_equal(new_sock, c.checkout_or('key') { sock.open })
    end

    test "when given key's value was expired" do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(0, $log)
      assert_equal('sock', c.checkout_or('key') { 'sock' })

      new_sock = 'new sock'
      sock = mock!.open { new_sock }.subject
      assert_equal(new_sock, c.checkout_or('key') { sock.open })
    end

    test 'reuse same hash object after calling purge_obsolete_socks' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      c.checkout_or('key') { 'socket' }
      c.purge_obsolete_socks

      assert_nothing_raised(NoMethodError) do
        c.checkout_or('key') { 'new socket' }
      end
    end
  end

  sub_test_case 'checkin' do
    test 'when value exists' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      socket = 'socket'
      c.checkout_or('key') { socket }
      c.checkin(socket)

      assert_equal(socket, c.instance_variable_get(:@available_sockets)['key'].first.sock)
      assert_equal(1, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
    end

    test 'when value does not exist' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      c.checkout_or('key') { 'sock' }
      c.checkin('other sock')

      assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
    end
  end

  test 'revoke' do
    c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
    socket = 'socket'
    c.checkout_or('key') { socket }
    c.revoke(socket)

    assert_equal(1, c.instance_variable_get(:@inactive_sockets).size)
    assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
    assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)

    sock = mock!.open { 1 }.subject
    assert_equal(1, c.checkout_or('key') { sock.open })
  end

  sub_test_case 'clear' do
    test 'when value is in available_sockets' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      m = mock!.close { 'closed' }.subject
      m2 = mock!.close { 'closed' }.subject
      m3 = mock!.close { 'closed' }.subject
      c.checkout_or('key') { m }
      c.revoke(m)
      c.checkout_or('key') { m2 }
      c.checkin(m2)
      c.checkout_or('key2') { m3 }

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(1, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(1, c.instance_variable_get(:@inactive_sockets).size)

      c.clear
      assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(0, c.instance_variable_get(:@inactive_sockets).size)
    end
  end

  sub_test_case 'purge_obsolete_socks' do
    def teardown
      Timecop.return
    end

    test 'delete key in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.close { 'closed' }.subject
      c.checkout_or('key') { sock }
      c.revoke(sock)
      assert_false(c.instance_variable_get(:@inactive_sockets).empty?)

      c.purge_obsolete_socks
      assert_true(c.instance_variable_get(:@inactive_sockets).empty?)
    end

    test 'move key from available_sockets to inactive_sockets' do
      Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.close { 'closed' }.subject
      sock2 = mock!.close.never.subject
      stub(sock).inspect
      stub(sock2).inspect

      c.checkout_or('key') { sock }
      c.checkin(sock)

      # wait timeout
      Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
      c.checkout_or('key') { sock2 }

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(sock2, c.instance_variable_get(:@inflight_sockets).values.first.sock)

      c.purge_obsolete_socks

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(sock2, c.instance_variable_get(:@inflight_sockets).values.first.sock)
    end

    test 'should not purge just after checkin and purge after timeout' do
      Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.close.never.subject
      stub(sock).inspect
      c.checkout_or('key') { sock }

      Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
      c.checkin(sock)

      assert_equal(1, c.instance_variable_get(:@available_sockets).size)
      c.purge_obsolete_socks
      assert_equal(1, c.instance_variable_get(:@available_sockets).size)

      Timecop.freeze(Time.parse('2016-04-13 14:00:22 +0900'))
      assert_equal(1, c.instance_variable_get(:@available_sockets).size)
      c.purge_obsolete_socks
      assert_equal(0, c.instance_variable_get(:@available_sockets).size)
    end
  end
end