Back to Repositories

Testing Event Stream Processing and Serialization in Fluentd

This test suite validates the event handling and stream processing functionality in Fluentd, focusing on different event stream implementations and their behaviors. It covers essential operations like event serialization, compression, and stream manipulation across multiple event stream types.

Test Coverage Overview

The test suite comprehensively covers five event stream implementations: OneEventStream, ArrayEventStream, MultiEventStream, MessagePackEventStream, and CompressedMessagePackEventStream. Key functionality tested includes:

  • Event stream initialization and manipulation
  • Stream compression and serialization
  • Deep copy operations and record duplication
  • Stream slicing and iteration capabilities
  • MessagePack encoding/decoding operations

Implementation Analysis

The testing approach uses Ruby’s Test::Unit framework with modular test organization through nested test classes. Each event stream implementation is tested using shared behaviors via Ruby modules (DeepCopyAssertion, Compressable), ensuring consistent verification across different stream types.

The tests employ fixture setup through setup methods and utilize helper assertions for complex validations.

Technical Details

Testing tools and configuration:

  • Test::Unit as the primary testing framework
  • Fluent::MessagePackFactory for message serialization
  • Custom assertion modules for deep copy validation
  • Event time handling utilities
  • Compression utilities via Compressable module

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Modular test organization with shared behavior modules
  • Comprehensive edge case coverage
  • Consistent per-class setup patterns (the file defines no teardown methods)
  • Thorough validation of object state and behavior
  • Efficient test data management through setup methods

fluent/fluentd

test/test_event.rb

            
require_relative 'helper'
require 'fluent/test'
require 'fluent/event'
require 'fluent/plugin/compressable'

module EventTest
  # Shared assertion mixed into the event stream test cases to check that
  # duplicating a stream also duplicates the records it yields.
  module DeepCopyAssertion
    # Asserts that +es1+ and +es2+ yield the same number of records and that
    # the records found at the same position are distinct objects (their
    # object_ids differ).
    #
    # Both arguments only need to respond to +each+, yielding a
    # (time, record) pair per event. +es1+ is fully iterated before +es2+.
    def assert_duplicated_records(es1, es2)
      originals, copies = [es1, es2].map do |stream|
        [].tap { |records| stream.each { |_, record| records << record } }
      end

      assert_equal originals.size, copies.size
      originals.zip(copies) do |original, copy|
        assert_not_equal original.object_id, copy.object_id
      end
    end
  end

  # Tests for OneEventStream, built in +setup+ from a single time/record pair.
  #
  # Every test that checks events inside a block also counts how many times
  # the block was yielded to, so it cannot pass vacuously when the stream
  # yields nothing.
  class OneEventStreamTest < ::Test::Unit::TestCase
    include Fluent
    include DeepCopyAssertion
    include Fluent::Plugin::Compressable

    # Builds the stream under test from one event time and one record.
    def setup
      @time = event_time()
      @record = {'k' => 'v', 'n' => 1}
      @es = OneEventStream.new(@time, @record)
    end

    test 'empty?' do
      assert_false @es.empty?
    end

    test 'size' do
      assert_equal 1, @es.size
    end

    test 'repeatable?' do
      assert_true @es.repeatable?
    end

    test 'dup' do
      dupped = @es.dup
      assert_kind_of OneEventStream, dupped
      assert_not_equal @es.object_id, dupped.object_id
      assert_duplicated_records @es, dupped
    end

    test 'slice' do
      # Slices that start past the only event, or have zero length, are empty.
      assert_equal 0, @es.slice(1, 1).size
      assert_equal 0, @es.slice(0, 0).size

      sliced = @es.slice(0, 1)
      assert_kind_of EventStream, sliced
      assert_equal 1, sliced.size

      yielded = 0
      sliced.each do |time, record|
        assert_equal @time, time
        assert_equal @record, record
        yielded += 1
      end
      assert_equal 1, yielded
    end

    test 'each' do
      yielded = 0
      @es.each { |time, record|
        assert_equal @time, time
        assert_equal @record, record
        yielded += 1
      }
      assert_equal 1, yielded
    end

    test 'to_msgpack_stream' do
      yielded = 0
      stream = @es.to_msgpack_stream
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @time, time
        assert_equal @record, record
        yielded += 1
      }
      assert_equal 1, yielded
    end

    test 'to_msgpack_stream with time_int argument' do
      yielded = 0
      stream = @es.to_msgpack_stream(time_int: true)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        # With time_int: true the unpacked time is compared as an integer.
        assert_equal @time.to_i, time
        assert_equal @record, record
        yielded += 1
      }
      assert_equal 1, yielded
    end

    test 'to_compressed_msgpack_stream' do
      yielded = 0
      stream = @es.to_compressed_msgpack_stream
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(decompress(stream)) { |time, record|
        assert_equal @time, time
        assert_equal @record, record
        yielded += 1
      }
      assert_equal 1, yielded
    end

    test 'to_compressed_msgpack_stream with time_int argument' do
      yielded = 0
      stream = @es.to_compressed_msgpack_stream(time_int: true)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(decompress(stream)) { |time, record|
        assert_equal @time.to_i, time
        assert_equal @record, record
        yielded += 1
      }
      assert_equal 1, yielded
    end
  end

  # Tests for ArrayEventStream, built in +setup+ from two time/record pairs.
  #
  # Iteration-based tests assert the number of yielded events after the block,
  # so a stream that yields nothing fails instead of silently passing.
  class ArrayEventStreamTest < ::Test::Unit::TestCase
    include Fluent
    include DeepCopyAssertion
    include Fluent::Plugin::Compressable

    # Builds a two-event stream whose times are one second apart.
    def setup
      time = Engine.now
      @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
      @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
      @es = ArrayEventStream.new(@times.zip(@records))
    end

    test 'repeatable?' do
      assert_true @es.repeatable?
    end

    test 'dup' do
      dupped = @es.dup
      assert_kind_of ArrayEventStream, dupped
      assert_not_equal @es.object_id, dupped.object_id
      assert_duplicated_records @es, dupped
    end

    test 'empty?' do
      assert_not_empty @es
      assert_true ArrayEventStream.new([]).empty?
    end

    test 'size' do
      assert_equal 2, @es.size
      assert_equal 0, ArrayEventStream.new([]).size
    end

    test 'slice' do
      # A one-element slice starting at index 1 holds only the second event.
      sliced = @es.slice(1,1)
      assert_kind_of EventStream, sliced
      assert_equal 1, sliced.size

      yielded = 0
      sliced.each do |time, record|
        assert_equal @times[1], time
        assert_equal 'v2', record['k']
        assert_equal 2, record['n']
        yielded += 1
      end
      assert_equal 1, yielded

      # A slice covering the whole stream yields every event in order.
      sliced = @es.slice(0,2)
      assert_kind_of EventStream, sliced
      assert_equal 2, sliced.size

      counter = 0
      sliced.each do |time, record|
        assert_equal @times[counter], time
        assert_equal @records[counter]['k'], record['k']
        assert_equal @records[counter]['n'], record['n']
        counter += 1
      end
      assert_equal 2, counter
    end

    test 'each' do
      i = 0
      @es.each { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_msgpack_stream' do
      i = 0
      stream = @es.to_msgpack_stream
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream' do
      i = 0
      compressed_stream = @es.to_compressed_msgpack_stream
      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream with time_int argument' do
      i = 0
      compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        # With time_int: true the unpacked time is compared as an integer.
        assert_equal @times[i].to_i, time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end
  end

  # Tests for MultiEventStream, populated in +setup+ by adding two
  # time/record pairs one at a time.
  #
  # Iteration-based tests assert the number of yielded events after the block,
  # so a stream that yields nothing fails instead of silently passing.
  class MultiEventStreamTest < ::Test::Unit::TestCase
    include Fluent
    include DeepCopyAssertion
    include Fluent::Plugin::Compressable

    # Builds a two-event stream via MultiEventStream#add.
    def setup
      time = Engine.now
      @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
      @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
      @es = MultiEventStream.new
      @times.zip(@records).each { |_time, record|
        @es.add(_time, record)
      }
    end

    test 'repeatable?' do
      assert_true @es.repeatable?
    end

    test 'dup' do
      dupped = @es.dup
      assert_kind_of MultiEventStream, dupped
      assert_not_equal @es.object_id, dupped.object_id
      assert_duplicated_records @es, dupped
    end

    test 'empty?' do
      assert_not_empty @es
      assert_true MultiEventStream.new.empty?
    end

    test 'size' do
      assert_equal 2, @es.size
      assert_equal 0, MultiEventStream.new.size
    end

    test 'slice' do
      # A one-element slice starting at index 1 holds only the second event.
      sliced = @es.slice(1,1)
      assert_kind_of EventStream, sliced
      assert_equal 1, sliced.size

      yielded = 0
      sliced.each do |time, record|
        assert_equal @times[1], time
        assert_equal 'v2', record['k']
        assert_equal 2, record['n']
        yielded += 1
      end
      assert_equal 1, yielded

      # A slice covering the whole stream yields every event in order.
      sliced = @es.slice(0,2)
      assert_kind_of EventStream, sliced
      assert_equal 2, sliced.size

      counter = 0
      sliced.each do |time, record|
        assert_equal @times[counter], time
        assert_equal @records[counter]['k'], record['k']
        assert_equal @records[counter]['n'], record['n']
        counter += 1
      end
      assert_equal 2, counter
    end

    test 'each' do
      i = 0
      @es.each { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_msgpack_stream' do
      i = 0
      stream = @es.to_msgpack_stream
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream' do
      i = 0
      compressed_stream = @es.to_compressed_msgpack_stream
      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream with time_int argument' do
      i = 0
      compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        # With time_int: true the unpacked time is compared as an integer.
        assert_equal @times[i].to_i, time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end
  end

  # Tests for MessagePackEventStream, built in +setup+ from two
  # [time, record] pairs written through a MessagePack packer.
  #
  # Iteration-based tests assert the number of yielded events after the block,
  # so a stream that yields nothing fails instead of silently passing.
  class MessagePackEventStreamTest < ::Test::Unit::TestCase
    include Fluent
    include DeepCopyAssertion
    include Fluent::Plugin::Compressable

    # Packs two events into one msgpack string and wraps it in the stream.
    def setup
      pk = Fluent::MessagePackFactory.msgpack_packer
      time = Engine.now
      @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
      @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
      @times.zip(@records).each { |_time, record|
        pk.write([_time, record])
      }
      @es = MessagePackEventStream.new(pk.to_s)
    end

    test 'dup' do
      dupped = @es.dup
      assert_kind_of MessagePackEventStream, dupped
      assert_not_equal @es.object_id, dupped.object_id
      assert_duplicated_records @es, dupped

      # After iteration of events (done in assert_duplicated_records),
      # duplicated event stream still has unpacked objects and correct size
      dupped = @es.dup
      assert_equal 2, dupped.instance_eval{ @size }
    end

    test 'empty?' do
      assert_false @es.empty?
      assert_true MessagePackEventStream.new('', 0).empty?
    end

    test 'size' do
      assert_equal 2, @es.size
      assert_equal 0, MessagePackEventStream.new('').size
    end

    test 'repeatable?' do
      assert_true @es.repeatable?
    end

    test 'slice' do
      # A one-element slice starting at index 1 holds only the second event.
      sliced = @es.slice(1,1)
      assert_kind_of EventStream, sliced
      assert_equal 1, sliced.size

      yielded = 0
      sliced.each do |time, record|
        assert_equal @times[1], time
        assert_equal 'v2', record['k']
        assert_equal 2, record['n']
        yielded += 1
      end
      assert_equal 1, yielded

      # A slice covering the whole stream yields every event in order.
      sliced = @es.slice(0,2)
      assert_kind_of EventStream, sliced
      assert_equal 2, sliced.size

      counter = 0
      sliced.each do |time, record|
        assert_equal @times[counter], time
        assert_equal @records[counter]['k'], record['k']
        assert_equal @records[counter]['n'], record['n']
        counter += 1
      end
      assert_equal 2, counter
    end

    test 'each' do
      i = 0
      @es.each { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_msgpack_stream' do
      i = 0
      stream = @es.to_msgpack_stream
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream' do
      i = 0
      compressed_stream = @es.to_compressed_msgpack_stream
      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    # `any?` represents an Enumerable method which calls `each` internally
    test 'size_after_any' do
      @es.any?

      assert_equal 2, @es.size
    end

    # `any?` represents an Enumerable method which calls `each` internally
    test 'each_after_any' do
      @es.any?

      count = 0
      @es.each { |_time, _record| count += 1 }
      assert_equal 2, count
    end
  end

  # Tests for CompressedMessagePackEventStream, built in +setup+ from two
  # individually compressed msgpack-encoded [time, record] pairs.
  #
  # Iteration-based tests assert the number of yielded events after the block,
  # so a stream that yields nothing fails instead of silently passing.
  class CompressedMessagePackEventStreamTest < ::Test::Unit::TestCase
    include Fluent
    include DeepCopyAssertion
    include Fluent::Plugin::Compressable

    # Prepares two parallel strings: @packed_record holds the plain msgpack
    # bytes of both events, @entries holds the same events compressed one by
    # one. The stream under test is created from the compressed form.
    def setup
      time = Engine.now
      @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
      @records = [{ 'k' => 'v1', 'n' => 1 }, { 'k' => 'v2', 'n' => 2 }]
      @packed_record = ''
      @entries = ''
      @times.zip(@records).each do |_time, record|
        v = [_time, record].to_msgpack
        @packed_record += v
        @entries += compress(v)
      end
      @es = CompressedMessagePackEventStream.new(@entries)
    end

    # Asserts that the stream's internal @data equals the compressed entries
    # before the given block runs and the uncompressed msgpack bytes after it,
    # i.e. that the block's operation caused the stream to decompress its data.
    def ensure_data_is_decompressed
      assert_equal @entries, @es.instance_variable_get(:@data)
      yield
      assert_equal @packed_record, @es.instance_variable_get(:@data)
    end

    test 'dup' do
      dupped = @es.dup
      assert_kind_of CompressedMessagePackEventStream, dupped
      assert_not_equal @es.object_id, dupped.object_id
      assert_duplicated_records @es, dupped

      # After iteration of events (done in assert_duplicated_records),
      # duplicated event stream still has unpacked objects and correct size
      dupped = @es.dup
      assert_equal 2, dupped.instance_eval{ @size }
    end

    test 'repeatable?' do
      assert_true @es.repeatable?
    end

    test 'size' do
      assert_equal 0, CompressedMessagePackEventStream.new('').size
      ensure_data_is_decompressed { assert_equal 2, @es.size  }
    end

    test 'each' do
      i = 0
      ensure_data_is_decompressed do
        @es.each do |time, record|
          assert_equal @times[i], time
          assert_equal @records[i], record
          i += 1
        end
      end
      assert_equal @records.size, i
    end

    test 'slice' do
      # A one-element slice starting at index 1 holds only the second event.
      sliced = nil
      ensure_data_is_decompressed { sliced = @es.slice(1,1) }
      assert_kind_of EventStream, sliced
      assert_equal 1, sliced.size

      yielded = 0
      sliced.each do |time, record|
        assert_equal @times[1], time
        assert_equal 'v2', record['k']
        assert_equal 2, record['n']
        yielded += 1
      end
      assert_equal 1, yielded

      # A slice covering the whole stream yields every event in order.
      sliced = @es.slice(0,2)
      assert_kind_of EventStream, sliced
      assert_equal 2, sliced.size

      counter = 0
      sliced.each do |time, record|
        assert_equal @times[counter], time
        assert_equal @records[counter]['k'], record['k']
        assert_equal @records[counter]['n'], record['n']
        counter += 1
      end
      assert_equal 2, counter
    end

    test 'to_msgpack_stream' do
      i = 0
      stream = nil
      ensure_data_is_decompressed { stream = @es.to_msgpack_stream }

      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end

    test 'to_compressed_msgpack_stream' do
      i = 0
      # Do not call ensure_decompressed!
      # The internal data must still be the compressed entries both before
      # and after asking for the compressed stream.
      assert_equal @entries, @es.instance_variable_get(:@data)
      compressed_stream = @es.to_compressed_msgpack_stream
      assert_equal @entries, @es.instance_variable_get(:@data)

      stream = decompress(compressed_stream)
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      }
      assert_equal @records.size, i
    end
  end
end