Testing Socket Cache Management in Fluentd Forward Output Plugin
This test suite validates the socket cache functionality in Fluentd’s forward output plugin. It ensures proper handling of socket connections, lifecycle management, and resource cleanup for network communications.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/plugin/out_forward/test_socket_cache.rb
require_relative '../../helper'
require 'fluent/plugin/out_forward/socket_cache'
require 'timecop'
# Unit tests for Fluent::Plugin::ForwardOutput::SocketCache.
#
# Several tests look at the cache's internal collections (@available_sockets,
# @inflight_sockets, @inactive_sockets) through instance_variable_get, so they
# are coupled to those instance variable names.
class SocketCacheTest < Test::Unit::TestCase
  # Builds a cache under test. The first constructor argument is presumably the
  # expiry timeout in seconds (the tests use 10, or 0 for the "expired" case).
  def new_cache(timeout = 10)
    Fluent::Plugin::ForwardOutput::SocketCache.new(timeout, $log)
  end

  # Reads the cache's @available_sockets collection (indexed by key in these tests).
  def available_sockets(cache)
    cache.instance_variable_get(:@available_sockets)
  end

  # Reads the cache's @inflight_sockets collection.
  def inflight_sockets(cache)
    cache.instance_variable_get(:@inflight_sockets)
  end

  # Reads the cache's @inactive_sockets collection.
  def inactive_sockets(cache)
    cache.instance_variable_get(:@inactive_sockets)
  end

  sub_test_case 'checkout_or' do
    test 'when given key does not exist' do
      cache = new_cache
      # The mock requires #open to be called, i.e. the block must run.
      opener = mock!.open { 'socket' }.subject

      assert_equal('socket', cache.checkout_or('key') { opener.open })
    end

    test 'when given key exists' do
      cache = new_cache
      socket = 'socket'
      assert_equal(socket, cache.checkout_or('key') { socket })
      cache.checkin(socket)

      # After checkin the same socket must come back without running the block.
      opener = mock!.open.never.subject
      assert_equal(socket, cache.checkout_or('key') { opener.open })
    end

    test 'when given key exists but used by other' do
      cache = new_cache
      assert_equal('sock', cache.checkout_or('key') { 'sock' })

      # The first socket was never checked in, so a second one has to be opened.
      new_sock = 'new sock'
      opener = mock!.open { new_sock }.subject
      assert_equal(new_sock, cache.checkout_or('key') { opener.open })
    end

    test "when given key's value was expired" do
      cache = new_cache(0)
      assert_equal('sock', cache.checkout_or('key') { 'sock' })

      new_sock = 'new sock'
      opener = mock!.open { new_sock }.subject
      assert_equal(new_sock, cache.checkout_or('key') { opener.open })
    end

    test 'reuse same hash object after calling purge_obsolete_socks' do
      cache = new_cache
      cache.checkout_or('key') { 'socket' }
      cache.purge_obsolete_socks

      # Checking out again after a purge must not blow up with NoMethodError.
      assert_nothing_raised(NoMethodError) do
        cache.checkout_or('key') { 'new socket' }
      end
    end
  end

  sub_test_case 'checkin' do
    test 'when value exists' do
      cache = new_cache
      socket = 'socket'
      cache.checkout_or('key') { socket }
      cache.checkin(socket)

      # The socket is expected under its key in the available set and gone from inflight.
      assert_equal(socket, available_sockets(cache)['key'].first.sock)
      assert_equal(1, available_sockets(cache)['key'].size)
      assert_equal(0, inflight_sockets(cache).size)
    end

    test 'when value does not exist' do
      cache = new_cache
      cache.checkout_or('key') { 'sock' }
      cache.checkin('other sock')

      # Checking in an unknown socket is expected to leave both collections as they were.
      assert_equal(0, available_sockets(cache)['key'].size)
      assert_equal(1, inflight_sockets(cache).size)
    end
  end

  test 'revoke' do
    cache = new_cache
    socket = 'socket'
    cache.checkout_or('key') { socket }
    cache.revoke(socket)

    assert_equal(1, inactive_sockets(cache).size)
    assert_equal(0, inflight_sockets(cache).size)
    assert_equal(0, available_sockets(cache)['key'].size)

    # A revoked socket must not be handed out again; the block has to run.
    opener = mock!.open { 1 }.subject
    assert_equal(1, cache.checkout_or('key') { opener.open })
  end

  sub_test_case 'clear' do
    test 'when value is in available_sockets' do
      cache = new_cache
      # Each mock requires #close to be called on it.
      revoked = mock!.close { 'closed' }.subject
      returned = mock!.close { 'closed' }.subject
      in_use = mock!.close { 'closed' }.subject

      # Put one socket into each of the three internal collections.
      cache.checkout_or('key') { revoked }
      cache.revoke(revoked)
      cache.checkout_or('key') { returned }
      cache.checkin(returned)
      cache.checkout_or('key2') { in_use }

      assert_equal(1, inflight_sockets(cache).size)
      assert_equal(1, available_sockets(cache)['key'].size)
      assert_equal(1, inactive_sockets(cache).size)

      cache.clear

      assert_equal(0, inflight_sockets(cache).size)
      assert_equal(0, available_sockets(cache)['key'].size)
      assert_equal(0, inactive_sockets(cache).size)
    end
  end

  sub_test_case 'purge_obsolete_socks' do
    # Undo any Timecop.freeze performed by the tests in this group.
    def teardown
      Timecop.return
    end

    test 'delete key in inactive_socks' do
      cache = new_cache
      closable = mock!.close { 'closed' }.subject
      cache.checkout_or('key') { closable }
      cache.revoke(closable)
      assert_false(inactive_sockets(cache).empty?)

      cache.purge_obsolete_socks
      assert_true(inactive_sockets(cache).empty?)
    end

    test 'move key from available_sockets to inactive_sockets' do
      Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

      cache = new_cache
      stale = mock!.close { 'closed' }.subject
      fresh = mock!.close.never.subject
      stub(stale).inspect
      stub(fresh).inspect

      cache.checkout_or('key') { stale }
      cache.checkin(stale)

      # Move the clock 11 seconds ahead, past the 10 given to the constructor.
      Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
      cache.checkout_or('key') { fresh }

      assert_equal(1, inflight_sockets(cache).size)
      assert_equal(fresh, inflight_sockets(cache).values.first.sock)

      cache.purge_obsolete_socks

      # The inflight socket must be untouched by the purge.
      assert_equal(1, inflight_sockets(cache).size)
      assert_equal(fresh, inflight_sockets(cache).values.first.sock)
    end

    test 'should not purge just after checkin and purge after timeout' do
      Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

      cache = new_cache
      socket = mock!.close.never.subject
      stub(socket).inspect
      cache.checkout_or('key') { socket }

      # Check in 11 seconds after checkout; a purge right away must keep the entry.
      Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
      cache.checkin(socket)
      assert_equal(1, available_sockets(cache).size)
      cache.purge_obsolete_socks
      assert_equal(1, available_sockets(cache).size)

      # Another 11 seconds later the entry is expected to be purged.
      Timecop.freeze(Time.parse('2016-04-13 14:00:22 +0900'))
      assert_equal(1, available_sockets(cache).size)
      cache.purge_obsolete_socks
      assert_equal(0, available_sockets(cache).size)
    end
  end
end