Testing Event Stream Processing and Serialization in Fluentd
This test suite validates the event handling and stream processing functionality in Fluentd, focusing on different event stream implementations and their behaviors. It covers essential operations like event serialization, compression, and stream manipulation across multiple event stream types.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/test_event.rb
require_relative 'helper'
require 'fluent/test'
require 'fluent/event'
require 'fluent/plugin/compressable'
module EventTest
# Shared assertion for checking that a copied event stream does not hand out
# the very same record objects as the stream it was copied from.
module DeepCopyAssertion
  # Walks both streams with +each+ (es1 first, then es2), collecting the
  # record of every yielded (time, record) pair. Asserts that both streams
  # yield the same number of records and that records at the same position
  # have different object ids.
  def assert_duplicated_records(es1, es2)
    originals, copies = [es1, es2].map do |stream|
      [].tap { |records| stream.each { |_, record| records << record } }
    end

    assert_equal originals.size, copies.size
    originals.zip(copies) do |original, copy|
      assert_not_equal original.object_id, copy.object_id
    end
  end
end
# Exercises OneEventStream, built in +setup+ from one time and one record.
#
# Assertions that live inside an iteration block are followed by a counter
# check, so a stream that yields nothing fails instead of passing without
# running a single assertion.
class OneEventStreamTest < ::Test::Unit::TestCase
  include Fluent
  include DeepCopyAssertion
  include Fluent::Plugin::Compressable

  def setup
    @time = event_time()
    @record = {'k' => 'v', 'n' => 1}
    @es = OneEventStream.new(@time, @record)
  end

  test 'empty?' do
    assert_false @es.empty?
  end

  test 'size' do
    assert_equal 1, @es.size
  end

  test 'repeatable?' do
    assert_true @es.repeatable?
  end

  test 'dup' do
    dupped = @es.dup
    assert_kind_of OneEventStream, dupped
    assert_not_equal @es.object_id, dupped.object_id
    assert_duplicated_records @es, dupped
  end

  test 'slice' do
    # A slice starting past the only event, or of zero length, is empty.
    assert_equal 0, @es.slice(1, 1).size
    assert_equal 0, @es.slice(0, 0).size

    sliced = @es.slice(0, 1)
    assert_kind_of EventStream, sliced
    assert_equal 1, sliced.size

    yielded = 0
    sliced.each do |time, record|
      assert_equal @time, time
      assert_equal @record, record
      yielded += 1
    end
    assert_equal 1, yielded
  end

  test 'each' do
    yielded = 0
    @es.each { |time, record|
      assert_equal @time, time
      assert_equal @record, record
      yielded += 1
    }
    assert_equal 1, yielded
  end

  test 'to_msgpack_stream' do
    stream = @es.to_msgpack_stream
    unpacked = 0
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @time, time
      assert_equal @record, record
      unpacked += 1
    }
    assert_equal 1, unpacked
  end

  test 'to_msgpack_stream with time_int argument' do
    stream = @es.to_msgpack_stream(time_int: true)
    unpacked = 0
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      # With time_int: true the unpacked time is expected to equal @time.to_i.
      assert_equal @time.to_i, time
      assert_equal @record, record
      unpacked += 1
    }
    assert_equal 1, unpacked
  end

  test 'to_compressed_msgpack_stream' do
    stream = @es.to_compressed_msgpack_stream
    unpacked = 0
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(decompress(stream)) { |time, record|
      assert_equal @time, time
      assert_equal @record, record
      unpacked += 1
    }
    assert_equal 1, unpacked
  end

  test 'to_compressed_msgpack_stream with time_int argument' do
    stream = @es.to_compressed_msgpack_stream(time_int: true)
    unpacked = 0
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(decompress(stream)) { |time, record|
      assert_equal @time.to_i, time
      assert_equal @record, record
      unpacked += 1
    }
    assert_equal 1, unpacked
  end
end
# Exercises ArrayEventStream, built in +setup+ from two (time, record) pairs.
#
# Every iteration that carries assertions also checks how many events it
# visited, so a stream that yields too few (or no) events cannot pass
# unnoticed.
class ArrayEventStreamTest < ::Test::Unit::TestCase
  include Fluent
  include DeepCopyAssertion
  include Fluent::Plugin::Compressable

  def setup
    time = Engine.now
    @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
    @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
    @es = ArrayEventStream.new(@times.zip(@records))
  end

  test 'repeatable?' do
    assert_true @es.repeatable?
  end

  test 'dup' do
    dupped = @es.dup
    assert_kind_of ArrayEventStream, dupped
    assert_not_equal @es.object_id, dupped.object_id
    assert_duplicated_records @es, dupped
  end

  test 'empty?' do
    assert_not_empty @es
    assert_true ArrayEventStream.new([]).empty?
  end

  test 'size' do
    assert_equal 2, @es.size
    assert_equal 0, ArrayEventStream.new([]).size
  end

  test 'slice' do
    # One event starting at index 1: only the second pair is expected.
    sliced = @es.slice(1,1)
    assert_kind_of EventStream, sliced
    assert_equal 1, sliced.size
    seen = 0
    sliced.each do |time, record|
      assert_equal @times[1], time
      assert_equal 'v2', record['k']
      assert_equal 2, record['n']
      seen += 1
    end
    assert_equal 1, seen

    # Two events starting at index 0: both pairs, in order.
    sliced = @es.slice(0,2)
    assert_kind_of EventStream, sliced
    assert_equal 2, sliced.size
    counter = 0
    sliced.each do |time, record|
      assert_equal @times[counter], time
      assert_equal @records[counter]['k'], record['k']
      assert_equal @records[counter]['n'], record['n']
      counter += 1
    end
    assert_equal 2, counter
  end

  test 'each' do
    i = 0
    @es.each { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_msgpack_stream' do
    i = 0
    stream = @es.to_msgpack_stream
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream' do
    i = 0
    compressed_stream = @es.to_compressed_msgpack_stream
    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream with time_int argument' do
    i = 0
    compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      # With time_int: true the unpacked time is expected to equal the
      # original time converted with to_i.
      assert_equal @times[i].to_i, time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end
end
# Exercises MultiEventStream, filled in +setup+ by calling +add+ once per
# (time, record) pair.
#
# Every iteration that carries assertions also checks how many events it
# visited, so a stream that yields too few (or no) events cannot pass
# unnoticed.
class MultiEventStreamTest < ::Test::Unit::TestCase
  include Fluent
  include DeepCopyAssertion
  include Fluent::Plugin::Compressable

  def setup
    time = Engine.now
    @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
    @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
    @es = MultiEventStream.new
    @times.zip(@records).each { |_time, record|
      @es.add(_time, record)
    }
  end

  test 'repeatable?' do
    assert_true @es.repeatable?
  end

  test 'dup' do
    dupped = @es.dup
    assert_kind_of MultiEventStream, dupped
    assert_not_equal @es.object_id, dupped.object_id
    assert_duplicated_records @es, dupped
  end

  test 'empty?' do
    assert_not_empty @es
    assert_true MultiEventStream.new.empty?
  end

  test 'size' do
    assert_equal 2, @es.size
    assert_equal 0, MultiEventStream.new.size
  end

  test 'slice' do
    # One event starting at index 1: only the second pair is expected.
    sliced = @es.slice(1,1)
    assert_kind_of EventStream, sliced
    assert_equal 1, sliced.size
    seen = 0
    sliced.each do |time, record|
      assert_equal @times[1], time
      assert_equal 'v2', record['k']
      assert_equal 2, record['n']
      seen += 1
    end
    assert_equal 1, seen

    # Two events starting at index 0: both pairs, in order.
    sliced = @es.slice(0,2)
    assert_kind_of EventStream, sliced
    assert_equal 2, sliced.size
    counter = 0
    sliced.each do |time, record|
      assert_equal @times[counter], time
      assert_equal @records[counter]['k'], record['k']
      assert_equal @records[counter]['n'], record['n']
      counter += 1
    end
    assert_equal 2, counter
  end

  test 'each' do
    i = 0
    @es.each { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_msgpack_stream' do
    i = 0
    stream = @es.to_msgpack_stream
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream' do
    i = 0
    compressed_stream = @es.to_compressed_msgpack_stream
    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream with time_int argument' do
    i = 0
    compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      # With time_int: true the unpacked time is expected to equal the
      # original time converted with to_i.
      assert_equal @times[i].to_i, time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end
end
# Exercises MessagePackEventStream, built in +setup+ from a msgpack-packed
# string holding two [time, record] entries.
#
# Every iteration that carries assertions also checks how many events it
# visited, so a stream that yields too few (or no) events cannot pass
# unnoticed.
class MessagePackEventStreamTest < ::Test::Unit::TestCase
  include Fluent
  include DeepCopyAssertion
  include Fluent::Plugin::Compressable

  def setup
    pk = Fluent::MessagePackFactory.msgpack_packer
    time = Engine.now
    @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
    @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}]
    @times.zip(@records).each { |_time, record|
      pk.write([_time, record])
    }
    @es = MessagePackEventStream.new(pk.to_s)
  end

  test 'dup' do
    dupped = @es.dup
    assert_kind_of MessagePackEventStream, dupped
    assert_not_equal @es.object_id, dupped.object_id
    assert_duplicated_records @es, dupped

    # After iteration of events (done in assert_duplicated_records),
    # duplicated event stream still has unpacked objects and correct size
    dupped = @es.dup
    assert_equal 2, dupped.instance_eval{ @size }
  end

  test 'empty?' do
    assert_false @es.empty?
    assert_true MessagePackEventStream.new('', 0).empty?
  end

  test 'size' do
    assert_equal 2, @es.size
    assert_equal 0, MessagePackEventStream.new('').size
  end

  test 'repeatable?' do
    assert_true @es.repeatable?
  end

  test 'slice' do
    # One event starting at index 1: only the second pair is expected.
    sliced = @es.slice(1,1)
    assert_kind_of EventStream, sliced
    assert_equal 1, sliced.size
    seen = 0
    sliced.each do |time, record|
      assert_equal @times[1], time
      assert_equal 'v2', record['k']
      assert_equal 2, record['n']
      seen += 1
    end
    assert_equal 1, seen

    # Two events starting at index 0: both pairs, in order.
    sliced = @es.slice(0,2)
    assert_kind_of EventStream, sliced
    assert_equal 2, sliced.size
    counter = 0
    sliced.each do |time, record|
      assert_equal @times[counter], time
      assert_equal @records[counter]['k'], record['k']
      assert_equal @records[counter]['n'], record['n']
      counter += 1
    end
    assert_equal 2, counter
  end

  test 'each' do
    i = 0
    @es.each { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_msgpack_stream' do
    i = 0
    stream = @es.to_msgpack_stream
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream' do
    i = 0
    compressed_stream = @es.to_compressed_msgpack_stream
    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  # `any?` represents an Enumerable method which calls `each` internally
  test 'size_after_any' do
    @es.any?
    assert_equal 2, @es.size
  end

  # `any?` represents an Enumerable method which calls `each` internally
  test 'each_after_any' do
    @es.any?
    count = 0
    @es.each { |_time, _record| count += 1 }
    assert_equal 2, count
  end
end
# Exercises CompressedMessagePackEventStream, built in +setup+ from the
# concatenation of two individually compressed msgpack entries.
#
# Every iteration that carries assertions also checks how many events it
# visited, so a stream that yields too few (or no) events cannot pass
# unnoticed.
class CompressedMessagePackEventStreamTest < ::Test::Unit::TestCase
  include Fluent
  include DeepCopyAssertion
  include Fluent::Plugin::Compressable

  def setup
    time = Engine.now
    @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)]
    @records = [{ 'k' => 'v1', 'n' => 1 }, { 'k' => 'v2', 'n' => 2 }]
    # @packed_record accumulates the plain msgpack bytes, @entries the
    # compressed form of each entry; both are compared against the stream's
    # @data in ensure_data_is_decompressed.
    @packed_record = ''
    @entries = ''
    @times.zip(@records).each do |_time, record|
      v = [_time, record].to_msgpack
      @packed_record += v
      @entries += compress(v)
    end
    @es = CompressedMessagePackEventStream.new(@entries)
  end

  # Asserts that the stream's @data equals the compressed entries before the
  # given block runs and equals the plain packed records after it.
  def ensure_data_is_decompressed
    assert_equal @entries, @es.instance_variable_get(:@data)
    yield
    assert_equal @packed_record, @es.instance_variable_get(:@data)
  end

  test 'dup' do
    dupped = @es.dup
    assert_kind_of CompressedMessagePackEventStream, dupped
    assert_not_equal @es.object_id, dupped.object_id
    assert_duplicated_records @es, dupped

    # After iteration of events (done in assert_duplicated_records),
    # duplicated event stream still has unpacked objects and correct size
    dupped = @es.dup
    assert_equal 2, dupped.instance_eval{ @size }
  end

  test 'repeatable?' do
    assert_true @es.repeatable?
  end

  test 'size' do
    assert_equal 0, CompressedMessagePackEventStream.new('').size
    ensure_data_is_decompressed { assert_equal 2, @es.size }
  end

  test 'each' do
    i = 0
    ensure_data_is_decompressed do
      @es.each do |time, record|
        assert_equal @times[i], time
        assert_equal @records[i], record
        i += 1
      end
    end
    assert_equal @records.size, i
  end

  test 'slice' do
    # One event starting at index 1: only the second pair is expected.
    sliced = nil
    ensure_data_is_decompressed { sliced = @es.slice(1,1) }
    assert_kind_of EventStream, sliced
    assert_equal 1, sliced.size
    seen = 0
    sliced.each do |time, record|
      assert_equal @times[1], time
      assert_equal 'v2', record['k']
      assert_equal 2, record['n']
      seen += 1
    end
    assert_equal 1, seen

    # Two events starting at index 0: both pairs, in order.
    sliced = @es.slice(0,2)
    assert_kind_of EventStream, sliced
    assert_equal 2, sliced.size
    counter = 0
    sliced.each do |time, record|
      assert_equal @times[counter], time
      assert_equal @records[counter]['k'], record['k']
      assert_equal @records[counter]['n'], record['n']
      counter += 1
    end
    assert_equal 2, counter
  end

  test 'to_msgpack_stream' do
    i = 0
    stream = nil
    ensure_data_is_decompressed { stream = @es.to_msgpack_stream }
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end

  test 'to_compressed_msgpack_stream' do
    i = 0
    # Do not call ensure_decompressed!
    # @data is expected to still hold the compressed entries both before and
    # after asking for the compressed stream.
    assert_equal @entries, @es.instance_variable_get(:@data)
    compressed_stream = @es.to_compressed_msgpack_stream
    assert_equal @entries, @es.instance_variable_get(:@data)

    stream = decompress(compressed_stream)
    Fluent::MessagePackFactory.msgpack_unpacker.feed_each(stream) { |time, record|
      assert_equal @times[i], time
      assert_equal @records[i], record
      i += 1
    }
    assert_equal @records.size, i
  end
end
end