Back to Repositories

Testing Buffer Chunk Implementation in Fluentd

This test suite validates the functionality of Fluentd’s buffer chunk implementation, focusing on core buffer operations, lifecycle management, and data compression features. The tests ensure proper handling of data chunks in the logging pipeline.

Test Coverage Overview

The test suite provides comprehensive coverage of buffer chunk functionality including:

  • Buffer chunk initialization and metadata handling
  • Chunk lifecycle states (unstaged, staged, queued, closed)
  • Data manipulation methods (append, concat, read, write)
  • MessagePack event streaming capabilities
  • Gzip compression support

Implementation Analysis

The testing approach utilizes Test::Unit framework with sub-test cases to organize related test scenarios. The implementation employs mock objects and custom test chunks to validate both basic and extended functionality. The tests verify method availability, state transitions, and proper error handling.

Technical Details

  • Testing Framework: Test::Unit
  • Key Dependencies: MessagePack, StringIO
  • Test Components: BufferChunkTest, TestChunk (custom implementation)
  • Features Tested: Chunk lifecycle, streaming, compression

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Isolation of test cases using sub_test_case blocks
  • Comprehensive state transition testing
  • Proper error handling verification
  • Extension mechanism testing (MessagePack streaming, compression)
  • Clear test organization and naming conventions

fluent/fluentd

test/plugin/test_buffer_chunk.rb

            
require_relative '../helper'
require 'fluent/plugin/buffer/chunk'

class BufferChunkTest < Test::Unit::TestCase
  sub_test_case 'blank buffer chunk' do
    test 'has generated unique id, given metadata, created_at and modified_at' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
      assert{ chunk.unique_id.bytesize == 16 }
      assert{ chunk.metadata.object_id == meta.object_id }
      assert{ chunk.created_at.is_a? Time }
      assert{ chunk.modified_at.is_a? Time }
      assert chunk.unstaged?
      assert !chunk.staged?
      assert !chunk.queued?
      assert !chunk.closed?
    end

    test 'has many methods for chunks, but not implemented' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

      assert chunk.respond_to?(:append)
      assert chunk.respond_to?(:concat)
      assert chunk.respond_to?(:commit)
      assert chunk.respond_to?(:rollback)
      assert chunk.respond_to?(:bytesize)
      assert chunk.respond_to?(:size)
      assert chunk.respond_to?(:length)
      assert chunk.respond_to?(:empty?)
      assert chunk.respond_to?(:read)
      assert chunk.respond_to?(:open)
      assert chunk.respond_to?(:write_to)
      assert_raise(NotImplementedError){ chunk.append([]) }
      assert_raise(NotImplementedError){ chunk.concat(nil, 0) }
      assert_raise(NotImplementedError){ chunk.commit }
      assert_raise(NotImplementedError){ chunk.rollback }
      assert_raise(NotImplementedError){ chunk.bytesize }
      assert_raise(NotImplementedError){ chunk.size }
      assert_raise(NotImplementedError){ chunk.length }
      assert_raise(NotImplementedError){ chunk.empty? }
      assert_raise(NotImplementedError){ chunk.read }
      assert_raise(NotImplementedError){ chunk.open(){} }
      assert_raise(NotImplementedError){ chunk.write_to(nil) }
      assert !chunk.respond_to?(:msgpack_each)
    end

    test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

      assert !chunk.respond_to?(:each)
      assert !chunk.respond_to?(:msgpack_each)

      chunk.extend Fluent::ChunkMessagePackEventStreamer
      assert chunk.respond_to?(:each)
      assert chunk.respond_to?(:msgpack_each)
    end

    test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
      chunk.extend Fluent::ChunkMessagePackEventStreamer

      unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker

      assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) }
      assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) }
    end

    test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

      assert_raise(ArgumentError){ chunk.read(compressed: :gzip) }
      assert_raise(ArgumentError){ chunk.open(compressed: :gzip){} }
      assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :gzip) }
      assert_raise(ArgumentError){ chunk.append(nil, compress: :gzip) }
    end
  end

  class TestChunk < Fluent::Plugin::Buffer::Chunk
    attr_accessor :data
    def initialize(meta)
      super
      @data = ''
    end
    def size
      @data.size
    end
    def open(**kwargs)
      require 'stringio'
      io = StringIO.new(@data)
      yield io
    end
  end

  sub_test_case 'minimum chunk implements #size and #open' do
    test 'chunk lifecycle' do
      c = TestChunk.new(Object.new)
      assert c.unstaged?
      assert !c.staged?
      assert !c.queued?
      assert !c.closed?
      assert c.writable?

      c.staged!

      assert !c.unstaged?
      assert c.staged?
      assert !c.queued?
      assert !c.closed?
      assert c.writable?

      c.enqueued!

      assert !c.unstaged?
      assert !c.staged?
      assert c.queued?
      assert !c.closed?
      assert !c.writable?

      c.close

      assert !c.unstaged?
      assert !c.staged?
      assert !c.queued?
      assert c.closed?
      assert !c.writable?
    end

    test 'chunk can be unstaged' do
      c = TestChunk.new(Object.new)
      assert c.unstaged?
      assert !c.staged?
      assert !c.queued?
      assert !c.closed?
      assert c.writable?

      c.staged!

      assert !c.unstaged?
      assert c.staged?
      assert !c.queued?
      assert !c.closed?
      assert c.writable?

      c.unstaged!

      assert c.unstaged?
      assert !c.staged?
      assert !c.queued?
      assert !c.closed?
      assert c.writable?

      c.enqueued!

      assert !c.unstaged?
      assert !c.staged?
      assert c.queued?
      assert !c.closed?
      assert !c.writable?

      c.close

      assert !c.unstaged?
      assert !c.staged?
      assert !c.queued?
      assert c.closed?
      assert !c.writable?
    end

    test 'can respond to #empty? correctly' do
      c = TestChunk.new(Object.new)
      assert_equal 0, c.size
      assert c.empty?
    end

    test 'can write its contents to io object' do
      c = TestChunk.new(Object.new)
      c.data << "my data\nyour data\n"
      io = StringIO.new
      c.write_to(io)
      assert "my data\nyour data\n", io.to_s
    end

    test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do
      require 'msgpack'
      c = TestChunk.new(Object.new)
      c.extend Fluent::ChunkMessagePackEventStreamer
      c.data << MessagePack.pack(['my data', 1])
      c.data << MessagePack.pack(['your data', 2])
      ary = []
      c.msgpack_each do |obj|
        ary << obj
      end
      assert_equal ['my data', 1], ary[0]
      assert_equal ['your data', 2], ary[1]
    end
  end

  sub_test_case 'when compress is gzip' do
    test 'create decompressable chunk' do
      meta = Object.new
      chunk = Fluent::Plugin::Buffer::Chunk.new(meta, compress: :gzip)
      assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
    end
  end
end