Testing Buffer Chunk Implementation in Fluentd
This test suite validates the functionality of Fluentd’s buffer chunk implementation, focusing on core buffer operations, lifecycle management, and data compression features. The tests ensure proper handling of data chunks in the logging pipeline.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/plugin/test_buffer_chunk.rb
require_relative '../helper'
require 'fluent/plugin/buffer/chunk'
class BufferChunkTest < Test::Unit::TestCase
sub_test_case 'blank buffer chunk' do
test 'has generated unique id, given metadata, created_at and modified_at' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
assert{ chunk.unique_id.bytesize == 16 }
assert{ chunk.metadata.object_id == meta.object_id }
assert{ chunk.created_at.is_a? Time }
assert{ chunk.modified_at.is_a? Time }
assert chunk.unstaged?
assert !chunk.staged?
assert !chunk.queued?
assert !chunk.closed?
end
test 'has many methods for chunks, but not implemented' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
assert chunk.respond_to?(:append)
assert chunk.respond_to?(:concat)
assert chunk.respond_to?(:commit)
assert chunk.respond_to?(:rollback)
assert chunk.respond_to?(:bytesize)
assert chunk.respond_to?(:size)
assert chunk.respond_to?(:length)
assert chunk.respond_to?(:empty?)
assert chunk.respond_to?(:read)
assert chunk.respond_to?(:open)
assert chunk.respond_to?(:write_to)
assert_raise(NotImplementedError){ chunk.append([]) }
assert_raise(NotImplementedError){ chunk.concat(nil, 0) }
assert_raise(NotImplementedError){ chunk.commit }
assert_raise(NotImplementedError){ chunk.rollback }
assert_raise(NotImplementedError){ chunk.bytesize }
assert_raise(NotImplementedError){ chunk.size }
assert_raise(NotImplementedError){ chunk.length }
assert_raise(NotImplementedError){ chunk.empty? }
assert_raise(NotImplementedError){ chunk.read }
assert_raise(NotImplementedError){ chunk.open(){} }
assert_raise(NotImplementedError){ chunk.write_to(nil) }
assert !chunk.respond_to?(:msgpack_each)
end
test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
assert !chunk.respond_to?(:each)
assert !chunk.respond_to?(:msgpack_each)
chunk.extend Fluent::ChunkMessagePackEventStreamer
assert chunk.respond_to?(:each)
assert chunk.respond_to?(:msgpack_each)
end
test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
chunk.extend Fluent::ChunkMessagePackEventStreamer
unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) }
assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) }
end
test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
assert_raise(ArgumentError){ chunk.read(compressed: :gzip) }
assert_raise(ArgumentError){ chunk.open(compressed: :gzip){} }
assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :gzip) }
assert_raise(ArgumentError){ chunk.append(nil, compress: :gzip) }
end
end
class TestChunk < Fluent::Plugin::Buffer::Chunk
attr_accessor :data
def initialize(meta)
super
@data = ''
end
def size
@data.size
end
def open(**kwargs)
require 'stringio'
io = StringIO.new(@data)
yield io
end
end
sub_test_case 'minimum chunk implements #size and #open' do
test 'chunk lifecycle' do
c = TestChunk.new(Object.new)
assert c.unstaged?
assert !c.staged?
assert !c.queued?
assert !c.closed?
assert c.writable?
c.staged!
assert !c.unstaged?
assert c.staged?
assert !c.queued?
assert !c.closed?
assert c.writable?
c.enqueued!
assert !c.unstaged?
assert !c.staged?
assert c.queued?
assert !c.closed?
assert !c.writable?
c.close
assert !c.unstaged?
assert !c.staged?
assert !c.queued?
assert c.closed?
assert !c.writable?
end
test 'chunk can be unstaged' do
c = TestChunk.new(Object.new)
assert c.unstaged?
assert !c.staged?
assert !c.queued?
assert !c.closed?
assert c.writable?
c.staged!
assert !c.unstaged?
assert c.staged?
assert !c.queued?
assert !c.closed?
assert c.writable?
c.unstaged!
assert c.unstaged?
assert !c.staged?
assert !c.queued?
assert !c.closed?
assert c.writable?
c.enqueued!
assert !c.unstaged?
assert !c.staged?
assert c.queued?
assert !c.closed?
assert !c.writable?
c.close
assert !c.unstaged?
assert !c.staged?
assert !c.queued?
assert c.closed?
assert !c.writable?
end
test 'can respond to #empty? correctly' do
c = TestChunk.new(Object.new)
assert_equal 0, c.size
assert c.empty?
end
test 'can write its contents to io object' do
c = TestChunk.new(Object.new)
c.data << "my data\nyour data\n"
io = StringIO.new
c.write_to(io)
assert "my data\nyour data\n", io.to_s
end
test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do
require 'msgpack'
c = TestChunk.new(Object.new)
c.extend Fluent::ChunkMessagePackEventStreamer
c.data << MessagePack.pack(['my data', 1])
c.data << MessagePack.pack(['your data', 2])
ary = []
c.msgpack_each do |obj|
ary << obj
end
assert_equal ['my data', 1], ary[0]
assert_equal ['your data', 2], ary[1]
end
end
sub_test_case 'when compress is gzip' do
test 'create decompressable chunk' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta, compress: :gzip)
assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
end
end
end