Testing Output Processing Infrastructure in Fluentd
This file implements test-driver infrastructure for Fluentd’s output functionality, providing specialized test drivers for the different output types: standard, buffered, and time-sliced outputs. The drivers enable thorough validation of how Fluentd output plugins emit, format, and write data.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
lib/fluent/test/output_test.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/engine'
require 'fluent/event'
require 'fluent/test/input_test'
module Fluent
module Test
# Minimal chain object for tests: it does nothing except count how many
# times #next has been invoked.
class TestOutputChain
  # Number of times #next has been called so far.
  attr_reader :called

  def initialize
    @called = 0
  end

  # Registers one more invocation and returns the updated count.
  def next
    @called = @called.succ
  end
end
# Test driver for outputs: each emitted record is wrapped in a
# single-event stream and passed straight to the plugin instance.
class OutputTestDriver < InputTestDriver
  # Tag passed along with every emitted event stream.
  attr_accessor :tag

  # klass and the block are forwarded unchanged to InputTestDriver#initialize;
  # tag is the tag used by #emit (defaults to 'test').
  def initialize(klass, tag='test', &block)
    super(klass, &block)
    @tag = tag
  end

  # Wraps (time, record) in a OneEventStream and hands it to the instance's
  # #emit_events under the current tag. Returns whatever #emit_events returns.
  # time defaults to EventTime.now, evaluated at call time.
  def emit(record, time=EventTime.now)
    @instance.emit_events(@tag, OneEventStream.new(time, record))
  end
end
# Test driver for buffered outputs. Records are queued with #emit; #run then
# formats all of them through the instance's #format_stream, puts the result
# into a single staged chunk and passes that chunk to the instance's #write.
class BufferedOutputTestDriver < InputTestDriver
  # Tag used when formatting the queued events (and, when the instance has
  # @chunk_key_tag set, for the chunk metadata).
  attr_accessor :tag

  # klass and the block are forwarded unchanged to InputTestDriver#initialize;
  # tag defaults to 'test'.
  def initialize(klass, tag='test', &block)
    super(klass, &block)
    @entries = []
    @expected_buffer = nil
    @tag = tag
    # Give the plugin instance a singleton #buffer reader for its @buffer so
    # that #run can generate chunks from it.
    def @instance.buffer
      @buffer
    end
  end

  # Queues a [time, record] pair for the next #run. Returns self so calls can
  # be chained. time defaults to EventTime.now, evaluated at call time.
  def emit(record, time=EventTime.now)
    @entries << [time, record]
    self
  end

  # Appends str to the formatted output that #run will assert against.
  # Returns the accumulated expectation string.
  def expect_format(str)
    # ''.dup instead of a bare literal: the accumulator is mutated with <<,
    # which raises FrozenError when string literals are frozen.
    (@expected_buffer ||= ''.dup) << str
  end

  # Runs the driver (num_waits is forwarded to InputTestDriver#run). Inside
  # the block given to super it calls the optional caller block, formats the
  # queued entries, checks them against the expectation (if any was set via
  # #expect_format) and writes one chunk.
  #
  # Returns the value returned by the instance's #write; nil if the block
  # passed to super was never executed.
  def run(num_waits = 10, &block)
    result = nil
    super(num_waits) {
      block.call if block

      es = ArrayEventStream.new(@entries)
      buffer = @instance.format_stream(@tag, es)

      if @expected_buffer
        assert_equal(@expected_buffer, buffer)
      end

      # The chunk metadata carries the tag only when the instance has
      # @chunk_key_tag set; otherwise all three metadata fields are nil.
      chunk_tag = @instance.instance_eval{ @chunk_key_tag } ? @tag : nil
      chunk = @instance.buffer.generate_chunk(@instance.metadata(chunk_tag, nil, nil)).staged!
      begin
        # Filling the chunk happens inside the begin so that the chunk is
        # purged even when concat itself raises, not only when write does.
        chunk.concat(buffer, es.size)
        result = @instance.write(chunk)
      ensure
        chunk.purge
      end
    }
    result
  end
end
# Test driver for time-sliced outputs. Records are queued with #emit; #run
# formats each one individually with the instance's #format, groups the
# formatted lines by the metadata the instance computes for them, and writes
# one chunk per metadata value.
class TimeSlicedOutputTestDriver < InputTestDriver
  # Tag used for both metadata computation and formatting.
  attr_accessor :tag

  # klass and the block are forwarded unchanged to InputTestDriver#initialize;
  # tag defaults to 'test'.
  def initialize(klass, tag='test', &block)
    super(klass, &block)
    @entries = []
    @expected_buffer = nil
    @tag = tag
  end

  # Queues a [time, record] pair for the next #run. Returns self so calls can
  # be chained. time defaults to EventTime.now, evaluated at call time.
  def emit(record, time=EventTime.now)
    @entries << [time, record]
    self
  end

  # Appends str to the formatted output that #run will assert against.
  # Returns the accumulated expectation string.
  def expect_format(str)
    # ''.dup instead of a bare literal: the accumulator is mutated with <<,
    # which raises FrozenError when string literals are frozen.
    (@expected_buffer ||= ''.dup) << str
  end

  # Runs the driver via InputTestDriver#run. Inside the block given to super
  # it calls the optional caller block, formats the queued entries, checks
  # the concatenated output against the expectation (if any was set via
  # #expect_format) and writes one chunk per distinct metadata.
  #
  # Returns an array with the value returned by the instance's #write for
  # each chunk, in the order the metadata values were first seen.
  #
  # NOTE(review): unlike BufferedOutputTestDriver, nothing here defines
  # @instance.buffer; this presumably relies on the plugin class providing
  # a #buffer reader - confirm.
  def run(&block)
    result = []
    super {
      block.call if block

      # Unfrozen copy: the buffer is built up in place with <<.
      buffer = ''.dup
      lines = {}
      # v0.12 TimeSlicedOutput doesn't call #format_stream
      @entries.each do |time, record|
        meta = @instance.metadata(@tag, time, record)
        line = @instance.format(@tag, time, record)
        buffer << line
        (lines[meta] ||= []) << line
      end

      if @expected_buffer
        assert_equal(@expected_buffer, buffer)
      end

      lines.each do |meta, chunk_lines|
        chunk = @instance.buffer.generate_chunk(meta).staged!
        begin
          # Filling the chunk happens inside the begin so that the chunk is
          # purged even when append itself raises, not only when write does.
          chunk.append(chunk_lines)
          result.push(@instance.write(chunk))
        ensure
          chunk.purge
        end
      end
    }
    result
  end
end
end
end