Testing Buffered Output Compression Implementation in Fluentd

This test suite validates the compression functionality in Fluentd’s buffered output plugin implementation. It focuses on gzip compression of both standard and custom-formatted data streams, ensuring that buffered output is handled correctly across a range of buffer configurations.

Test Coverage Overview

The test suite comprehensively covers buffer compression functionality in Fluentd’s output plugin system. Key areas include:

  • Standard format data compression handling
  • Custom format data compression processing
  • File-based and memory-based buffer configurations
  • Compressed MessagePack event stream handling (see the sketch after this list)
  • Various buffer configuration combinations
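
The compressed-input case mirrors the regression covered by fluent/fluentd#4146: an event stream that arrives already gzip-compressed must be written as is, not decompressed and recompressed. A minimal sketch of how such a fixture is built from a plain event stream (condensed from the test file below; it assumes the same requires and the event_time test helper):

  es = Fluent::ArrayEventStream.new(
    [[event_time('2016-04-13 18:33:00'), { 'message' => 'data1' }]]
  )
  # Serialize the stream to a gzip-compressed msgpack payload, then wrap it so
  # the output plugin receives input that is already compressed.
  compressed_es = Fluent::CompressedMessagePackEventStream.new(es.to_compressed_msgpack_stream)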

Implementation Analysis

The tests use dummy output implementations to validate compression behavior, exercising both an async output and an async output with a custom format method to cover the two formatting scenarios. They rely on Fluent::Plugin::Compressable (which wraps Ruby’s zlib gzip support) and Fluentd’s plugin lifecycle to verify data integrity through the compression pipeline.
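
In outline, each test installs a :write hook on the dummy output, captures the chunk’s raw gzip payload, and compares what went in with what decompress returns. A condensed sketch of that verification path (it assumes the create_output, config_element, dummy_event_stream, and decompress helpers shown in the test file below; buffer_config is one of the buffer variants from the data tables):

  es = FluentPluginOutputAsBufferedCompressTest.dummy_event_stream
  @i = create_output(:async)
  @i.configure(config_element('ROOT', '', {}, [buffer_config]))
  @i.start
  @i.after_start

  captured = nil
  @i.register(:write) do |chunk|
    # Raw payload as stored in a memory chunk: still gzip-compressed.
    captured = chunk.instance_variable_get(:@chunk)
  end

  @i.emit_events('tag', es)
  # ... wait for the flush thread here, as the real tests do ...
  decoded = Fluent::MessagePackEventStream.new(decompress(captured)).map { |t, r| [t, r] }
  assert_equal es.map { |t, r| [t, r] }, decoded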

Technical Details

  • Testing Framework: Test::Unit
  • Compression Method: gzip
  • Buffer Types: memory and file-based
  • Key Components: Fluent::Plugin::Compressable, StringIO for verification (see the sketch after this list)
  • Test Utilities: Timeout handling, temporary directory management
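
Fluent::Plugin::Compressable supplies the gzip helpers that the assertions rely on; the test case below includes the module and calls decompress on the captured chunk payload. A minimal round-trip sketch, assuming the module’s compress/decompress helpers (only decompress is used in the test file itself):

  require 'fluent/plugin/compressable'
  include Fluent::Plugin::Compressable

  payload = 'buffered output data'
  gzipped = compress(payload)          # gzip-compressed String
  decompress(gzipped) == payload       # => true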

Best Practices Demonstrated

The test suite exemplifies robust testing practices through:

  • Proper resource cleanup in teardown
  • Comprehensive configuration testing
  • Edge case handling with compressed input
  • Isolation of test cases
  • Clear separation of concerns between format and write operations

fluent/fluentd

test/plugin/test_output_as_buffered_compress.rb

require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/plugin/compressable'
require 'fluent/event'

require 'timeout'

module FluentPluginOutputAsBufferedCompressTest
  class DummyBareOutput < Fluent::Plugin::Output
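    # Lets a test install hooks (e.g. :write, :format) by storing the given
    # block in an instance variable such as @write or @format.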
    def register(name, &block)
      instance_variable_set("@#{name}", block)
    end
  end

  class DummyAsyncOutput < DummyBareOutput
    def initialize
      super
      @format = @write = nil
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
  end

  class DummyAsyncOutputWithFormat < DummyBareOutput
    def initialize
      super
      @format = nil
    end
    def write(chunk)
      @write ? @write.call(chunk) : nil
    end
    def format(tag, time, record)
      @format ? @format.call(tag, time, record) : [tag, time, record].to_json
    end
  end

  def self.dummy_event_stream
    Fluent::ArrayEventStream.new(
      [
        [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
        [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
        [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
      ]
    )
  end
end

class BufferedOutputCompressTest < Test::Unit::TestCase
  include Fluent::Plugin::Compressable

  def create_output(type=:async)
    case type
    when :async then FluentPluginOutputAsBufferedCompressTest::DummyAsyncOutput.new
    when :async_with_format then FluentPluginOutputAsBufferedCompressTest::DummyAsyncOutputWithFormat.new
    else
      raise ArgumentError, "unknown type: #{type}"
    end
  end

  def waiting(seconds)
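    # Dump the plugin's logs to STDERR if the block does not finish in time,
    # then re-raise so the failing test shows what happened.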
    begin
      Timeout.timeout(seconds) do
        yield
      end
    rescue Timeout::Error
      STDERR.print(*@i.log.out.logs)
      raise
    end
  end

  TMP_DIR = File.expand_path('../../tmp/test_output_as_buffered_compress', __FILE__)

  setup do
    FileUtils.rm_r TMP_DIR rescue nil
    FileUtils.mkdir_p TMP_DIR
  end

  teardown do
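    # Walk the plugin through its full shutdown sequence so no flush threads
    # or open chunks leak into the next test.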
    if @i
      @i.stop unless @i.stopped?
      @i.before_shutdown unless @i.before_shutdown?
      @i.shutdown unless @i.shutdown?
      @i.after_shutdown unless @i.after_shutdown?
      @i.close unless @i.closed?
      @i.terminate unless @i.terminated?
    end
  end

  data(
    :buffer_config,
    [
      config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
      config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
      config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
      config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
    ],
  )
  data(
    :input_es,
    [
      FluentPluginOutputAsBufferedCompressTest.dummy_event_stream,
      # If already compressed data is incoming, it must be written as is (i.e. without decompressed).
      # https://github.com/fluent/fluentd/issues/4146
      Fluent::CompressedMessagePackEventStream.new(FluentPluginOutputAsBufferedCompressTest.dummy_event_stream.to_compressed_msgpack_stream),
    ],
  )
  test 'call a standard format when output plugin adds data to chunk' do |data|
    buffer_config = data[:buffer_config]
    es = data[:input_es].dup # Note: the data matrix is shared in all patterns, so we need `dup` here.

    @i = create_output(:async)
    @i.configure(config_element('ROOT','', {}, [buffer_config]))
    @i.start
    @i.after_start

    io = StringIO.new
    expected = es.dup.map { |t, r| [t, r] }
    compressed_data = ''

    assert_equal :gzip, @i.buffer.compress

    @i.register(:write) do |c|
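      # @chunk holds the chunk's raw, still-compressed payload; file buffers
      # expose it as a File, which is read back below.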
      compressed_data = c.instance_variable_get(:@chunk)
      if compressed_data.is_a?(File)
        compressed_data.seek(0, IO::SEEK_SET)
        compressed_data = compressed_data.read
      end
      c.write_to(io)
    end

    @i.emit_events('tag', es)
    @i.enqueue_thread_wait
    @i.flush_thread_wakeup
    waiting(4) { Thread.pass until io.size > 0 }

    assert_equal expected, Fluent::MessagePackEventStream.new(decompress(compressed_data)).map { |t, r| [t, r] }
    assert_equal expected, Fluent::MessagePackEventStream.new(io.string).map { |t, r| [t, r] }
  end

  data(
    handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
    handle_stream_with_custom_format:  config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
    handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
    handle_stream_with_custom_format_and_file_chunk:  config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
  )
  test 'call a custom format when output plugin adds data to chunk' do |buffer_config|
    @i = create_output(:async_with_format)
    @i.configure(config_element('ROOT','', {}, [buffer_config]))
    @i.start
    @i.after_start

    io = StringIO.new
    es = FluentPluginOutputAsBufferedCompressTest.dummy_event_stream
    expected = es.map { |e| "#{e[1]}\n" }.join # e[1] is record
    compressed_data = ''

    assert_equal :gzip, @i.buffer.compress

    @i.register(:format) { |tag, time, record| "#{record}\n" }
    @i.register(:write) { |c|
      compressed_data = c.instance_variable_get(:@chunk)
      if compressed_data.is_a?(File)
        compressed_data.seek(0, IO::SEEK_SET)
        compressed_data = compressed_data.read
      end
      c.write_to(io)
    }

    @i.emit_events('tag', es)
    @i.enqueue_thread_wait
    @i.flush_thread_wakeup
    waiting(4) { sleep 0.1 until io.size > 0 }

    assert_equal expected, decompress(compressed_data)
    assert_equal expected, io.string
  end
end