Testing Forward Output Acknowledgment Handler in Fluentd
This test suite validates the acknowledgment handling functionality in Fluentd’s forward output plugin. It focuses on verifying proper handling of chunk acknowledgments, timeout scenarios, and thread safety in the AckHandler component.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/plugin/out_forward/test_ack_handler.rb
require_relative '../../helper'
require 'fluent/test/driver/output'
require 'flexmock/test_unit'
require 'fluent/plugin/out_forward'
require 'fluent/plugin/out_forward/ack_handler'
class AckHandlerTest < Test::Unit::TestCase
data(
'chunk_id is matched' => [MessagePack.pack({ 'ack' => Base64.encode64('chunk_id 111') }), Fluent::Plugin::ForwardOutput::AckHandler::Result::SUCCESS],
'chunk_id is not matched' => [MessagePack.pack({ 'ack' => 'unmatched' }), Fluent::Plugin::ForwardOutput::AckHandler::Result::CHUNKID_UNMATCHED],
'chunk_id is empty' => ['', Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED],
)
test 'returns chunk_id, node, sock and result status' do |args|
receved, state = args
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: 10, log: $log, read_length: 100)
node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)
r, w = IO.pipe
begin
w.write(chunk_id)
mock(r).recv(anything) { |_| receved } # IO does not have recv
ack.enqueue(r)
a1 = a2 = a3 = a4 = nil
ack_handler.collect_response(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end
assert_equal chunk_id, a1
assert_equal node, a2
assert_equal r, a3
assert_equal state, a4
ensure
r.close rescue nil
w.close rescue nil
end
end
test 'returns nil if raise an error' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: 10, log: $log, read_length: 100)
node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)
r, w = IO.pipe
begin
w.write(chunk_id)
mock(r).recv(anything) { |_| raise 'unexpected error' } # IO does not have recv
ack.enqueue(r)
a1 = a2 = a3 = a4 = nil
ack_handler.collect_response(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end
assert_nil a1
assert_nil a2
assert_nil a3
assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4
ensure
r.close rescue nil
w.close rescue nil
end
end
test 'when ack is expired' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: 0, log: $log, read_length: 100)
node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)
r, w = IO.pipe
begin
w.write(chunk_id)
mock(r).recv(anything).never
ack.enqueue(r)
a1 = a2 = a3 = a4 = nil
ack_handler.collect_response(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end
assert_equal chunk_id, a1
assert_equal node, a2
assert_equal r, a3
assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4
ensure
r.close rescue nil
w.close rescue nil
end
end
# ForwardOutput uses AckHandler in multiple threads, so we need to assume this case.
# If exclusive control for this case is implemented, this test may not be necessary.
test 'raises no error when another thread closes a socket' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: 10, log: $log, read_length: 100)
node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)
r, w = IO.pipe
begin
w.write(chunk_id)
def r.recv(arg)
sleep(1) # To ensure that multiple threads select the socket before closing.
raise IOError, 'stream closed in another thread' if self.closed?
MessagePack.pack({ 'ack' => Base64.encode64('chunk_id 111') })
end
ack.enqueue(r)
threads = []
2.times do
threads << Thread.new do
ack_handler.collect_response(1) do |cid, n, s, ret|
s&.close
end
end
end
assert_true threads.map{ |t| t.join(10) }.all?
assert_false(
$log.out.logs.any? { |log| log.include?('[error]') },
$log.out.logs.select{ |log| log.include?('[error]') }.join('\n')
)
ensure
r.close rescue nil
w.close rescue nil
end
end
end