Back to Repositories

Testing Fluent-Cat Command Line Utility Integration in Fluentd

This test suite validates the functionality of the fluent-cat command-line utility in Fluentd, focusing on event handling, data formats, and time management. It ensures reliable data ingestion and proper event processing across different input formats and configurations.

Test Coverage Overview

The test suite provides comprehensive coverage of fluent-cat’s core functionality, including:
  • JSON and MessagePack format handling
  • Secondary file output processing
  • Event time management and parsing
  • Network port configuration and communication
  • Data streaming and event validation

Implementation Analysis

The testing approach utilizes Test::Unit framework with custom drivers and fixtures. It implements isolated test cases for different data formats and scenarios, using setup/teardown patterns for clean test environments. The implementation leverages Fluentd’s plugin architecture to test input/output interactions.

Key patterns include mock output creation, event stream handling, and pipeline testing with Open3.

Technical Details

Testing tools and configuration:
  • Test::Unit for test execution
  • Open3 for command pipeline handling
  • Custom test drivers for input/output plugins
  • Temporary directory management for file operations
  • Network port allocation for communication testing
  • EventTime handling for timestamp validation

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Proper test isolation through setup/teardown methods
  • Comprehensive error handling and resource cleanup
  • Modular test organization using sub_test_case blocks
  • Clear separation of configuration and test logic
  • Consistent assertion patterns and validation approaches

fluent/fluentd

test/command/test_cat.rb

            
require_relative '../helper'

require 'test-unit'
require 'open3'
require 'fluent/plugin/output'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/out_secondary_file'
require 'fluent/test/driver/output'
require 'fluent/test/driver/input'

class TestFluentCat < ::Test::Unit::TestCase
  def setup
    Fluent::Test.setup
    FileUtils.mkdir_p(TMP_DIR)
    @record = { 'key' => 'value' }
    @time = event_time
    @es = Fluent::OneEventStream.new(@time, @record)
    @primary = create_primary
    metadata = @primary.buffer.new_metadata
    @chunk = create_chunk(@primary, metadata, @es)
    @port = unused_port(protocol: :all)
  end

  def teardown
    FileUtils.rm_rf(TMP_DIR)
    @port = nil
  end

  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluent_cat#{ENV['TEST_ENV_NUMBER']}")
  FLUENT_CAT_COMMAND = File.expand_path(File.dirname(__FILE__) + "/../../bin/fluent-cat")

  def config
    %[
      port #{@port}
      bind 127.0.0.1
    ]
  end

  SECONDARY_CONFIG = %[
    directory #{TMP_DIR}
  ]

  class DummyOutput < Fluent::Plugin::Output
    def write(chunk); end
  end

  def create_driver(conf=config)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf)
  end

  def create_primary(buffer_cofig = config_element('buffer'))
    DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig]))
  end

  def create_secondary_driver(conf=SECONDARY_CONFIG)
    c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
    c.instance.acts_as_secondary(@primary)
    c.configure(conf)
  end

  def create_chunk(primary, metadata, es)
    primary.buffer.generate_chunk(metadata).tap do |c|
      c.concat(es.to_msgpack_stream, es.size)
      c.commit
    end
  end

  sub_test_case "json" do
    def test_cat_json
      d = create_driver
      d.run(expect_records: 1) do
        Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} json") do |stdin|
          stdin.puts('{"key":"value"}')
          stdin.close
        end
      end
      event = d.events.first
      assert_equal([1, "json", @record],
                   [d.events.size, event.first, event.last])
    end
  end

  sub_test_case "msgpack" do
    def test_cat_secondary_file
      d = create_secondary_driver
      path = d.instance.write(@chunk)
      d = create_driver
      d.run(expect_records: 1) do
        Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} --format msgpack secondary") do |stdin|
          stdin.write(File.read(path, File.size(path)))
          stdin.close
        end
      end
      event = d.events.first
      assert_equal([1, "secondary", @record],
                   [d.events.size, event.first, event.last])
    end
  end

  sub_test_case "send specific event time" do
    def test_without_event_time
      event_time = Fluent::EventTime.now
      d = create_driver
      d.run(expect_records: 1) do
        Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} tag") do |stdin|
          stdin.puts('{"key":"value"}')
          stdin.close
        end
      end
      event = d.events.first
      assert_in_delta(event_time.to_f, event[1].to_f, 3.0) # expect command to be finished in 3 seconds
      assert_equal([1, "tag", true, @record],
                   [d.events.size, event.first, event_time.to_f < event[1].to_f, event.last])
    end

    def test_with_event_time
      event_time = "2021-01-02 13:14:15.0+00:00"
      d = create_driver
      d.run(expect_records: 1) do
        Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} --event-time '#{event_time}' tag") do |stdin|
          stdin.puts('{"key":"value"}')
          stdin.close
        end
      end
      assert_equal([["tag", Fluent::EventTime.parse(event_time), @record]], d.events)
    end
  end
end