Back to Repositories

Testing Parser Filter Implementation in Fluentd

This test suite verifies the functionality of Fluentd’s parser filter plugin, which handles parsing and transforming log data. The tests cover key features like JSON, LTSV, CSV and regexp parsing along with configuration options for data handling and error management.

Test Coverage Overview

The test suite provides comprehensive coverage of the parser filter’s core capabilities:
  • Configuration validation and error handling
  • Multiple parser format support (JSON, LTSV, CSV, TSV, regexp)
  • Time parsing and formatting options
  • Data preservation and field manipulation
  • Error handling for invalid inputs and parsing failures

Implementation Analysis

The testing approach uses Minitest with Fluentd’s test driver framework to validate parser behavior. Key patterns include:
  • Configuration testing with various parser types and options
  • Event time handling and validation
  • Data transformation verification
  • Edge case handling for malformed inputs

Technical Details

Testing tools and configuration:
  • Fluentd::Test::Driver::Filter for filter testing
  • Timecop for time manipulation
  • Multiple parser configurations (regexp, json, csv, etc.)
  • Custom assertions for time comparison
  • Error injection for edge case testing

Best Practices Demonstrated

The test suite demonstrates several testing best practices:
  • Thorough configuration validation
  • Comprehensive error case coverage
  • Time handling edge cases
  • Data preservation verification
  • Multiple format support testing
  • Clear test organization and structure

fluent/fluentd

test/plugin/test_filter_parser.rb

            
require_relative '../helper'
require 'timecop'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_parser'

class ParserFilterTest < Test::Unit::TestCase
  # Runs before every test: initializes Fluentd's test environment and freezes
  # wall-clock time at a fixed instant so time-dependent behavior is
  # deterministic (time is restored in #teardown).
  def setup
    Fluent::Test.setup
    @tag = 'test'  # default event tag used by the test drivers
    @default_time = Time.parse('2010-05-04 03:02:01 UTC')  # fixed reference time
    Timecop.freeze(@default_time)
  end

  # Runs after every test: restores real wall-clock time (undoes the
  # Timecop.freeze performed in #setup).
  def teardown
    super
    Timecop.return
  end

  # Compares an expected time against a filtered event's time. An Integer
  # expectation is compared on epoch seconds; anything else (an EventTime)
  # is compared with the event-time-aware assertion.
  def assert_equal_parsed_time(expected, actual)
    return assert_equal(expected, actual.to_i) if expected.is_a?(Integer)

    assert_equal_event_time(expected, actual)
  end

  # Shorthand for the parser error class raised on unparsable input.
  # NOTE(review): currently unreferenced in this file; the tests below spell
  # out the fully qualified constant instead.
  ParserError = Fluent::Plugin::Parser::ParserError
  # Default filter configuration: parse the 'message' field with a regexp
  # (two single-character captures plus a timestamp) and keep original fields.
  CONFIG = %[
    key_name     message
    reserve_data true
    <parse>
      @type regexp
      expression /^(?<x>.)(?<y>.) (?<time>.+)$/
      time_format %Y%m%d%H%M%S
    </parse>
  ]

  # Builds a filter test driver for the parser filter plugin, configured with
  # the given config text (defaults to CONFIG).
  def create_driver(conf = CONFIG)
    driver = Fluent::Test::Driver::Filter.new(Fluent::Plugin::ParserFilter)
    driver.configure(conf)
  end

  # Configuration validation:
  #   - an empty config must fail (key_name is mandatory)
  #   - an unknown parser type must raise NotFoundPluginError
  #   - regexp/json/ltsv parsers configure cleanly, via both the <parse>
  #     section and the legacy flat 'format' style
  #   - reserve_data defaults to false
  def test_configure
    assert_raise(Fluent::ConfigError) {
      create_driver('')
    }
    assert_raise(Fluent::NotFoundPluginError) {
      create_driver %[
        key_name foo
        <parse>
          @type unknown_format_that_will_never_be_implemented
        </parse>
      ]
    }
    assert_nothing_raised {
      create_driver %[
        key_name foo
        <parse>
          @type regexp
          expression /(?<x>.)/
        </parse>
      ]
    }
    assert_nothing_raised {
      create_driver %[
        key_name foo
        <parse>
          @type json
        </parse>
      ]
    }
    # legacy flat 'format' style (compat layer)
    assert_nothing_raised {
      create_driver %[
        key_name foo
        format json
      ]
    }
    assert_nothing_raised {
      create_driver %[
        key_name foo
        <parse>
          @type ltsv
        </parse>
      ]
    }
    assert_nothing_raised {
      create_driver %[
        key_name message
        <parse>
          @type regexp
          expression /^col1=(?<col1>.+) col2=(?<col2>.+)$/
        </parse>
      ]
    }
    # reserve_data must default to false when not specified
    d = create_driver %[
      key_name foo
      <parse>
        @type regexp
        expression /(?<x>.)/
      </parse>
    ]
    assert_false d.instance.reserve_data
  end

  # CONFIG = %[
  #   remove_prefix test
  #   add_prefix    parsed
  #   key_name      message
  #   format        /^(?<x>.)(?<y>.) (?<time>.+)$/
  #   time_format   %Y%m%d%H%M%S
  #   reserve_data  true
  # ]
  # End-to-end filtering:
  #   d1: regexp parser with time_format — the event time is replaced by the
  #       parsed 'time' capture; reserve_data keeps the original 'message'
  #   d2: legacy 'format' config without a time field — the original
  #       EventTime is kept and the source key is consumed
  #   d3: stricter regexp — non-matching records are dropped
  #   d4: json parser — records with invalid JSON are dropped
  def test_filter
    d1 = create_driver(CONFIG)
    time = event_time("2012-01-02 13:14:15")
    d1.run(default_tag: @tag) do
      d1.feed(time, {'message' => '12 20120402182059'})
      d1.feed(time, {'message' => '34 20120402182100'})
      d1.feed(time, {'message' => '56 20120402182100'})
      d1.feed(time, {'message' => '78 20120402182101'})
      d1.feed(time, {'message' => '90 20120402182100'})
    end
    filtered = d1.filtered
    assert_equal 5, filtered.length

    # event times come from the parsed 'time' capture, not the feed time
    first = filtered[0]
    assert_equal_event_time event_time("2012-04-02 18:20:59"), first[0]
    assert_equal '1', first[1]['x']
    assert_equal '2', first[1]['y']
    assert_equal '12 20120402182059', first[1]['message']

    second = filtered[1]
    assert_equal_event_time event_time("2012-04-02 18:21:00"), second[0]
    assert_equal '3', second[1]['x']
    assert_equal '4', second[1]['y']

    third = filtered[2]
    assert_equal_event_time event_time("2012-04-02 18:21:00"), third[0]
    assert_equal '5', third[1]['x']
    assert_equal '6', third[1]['y']

    fourth = filtered[3]
    assert_equal_event_time event_time("2012-04-02 18:21:01"), fourth[0]
    assert_equal '7', fourth[1]['x']
    assert_equal '8', fourth[1]['y']

    fifth = filtered[4]
    assert_equal_event_time event_time("2012-04-02 18:21:00"), fifth[0]
    assert_equal '9', fifth[1]['x']
    assert_equal '0', fifth[1]['y']

    d2 = create_driver(%[
      key_name data
      format   /^(?<x>.)(?<y>.) (?<t>.+)$/
    ])
    time = Fluent::EventTime.from_time(@default_time) # EventTime emit test
    d2.run(default_tag: @tag) do
      d2.feed(time, {'data' => '12 20120402182059'})
      d2.feed(time, {'data' => '34 20120402182100'})
    end
    filtered = d2.filtered
    assert_equal 2, filtered.length

    # no time capture here, so the original event time is preserved
    first = filtered[0]
    assert_equal_event_time time, first[0]
    assert_nil first[1]['data']
    assert_equal '1', first[1]['x']
    assert_equal '2', first[1]['y']
    assert_equal '20120402182059', first[1]['t']

    second = filtered[1]
    assert_equal_event_time time, second[0]
    assert_nil second[1]['data']
    assert_equal '3', second[1]['x']
    assert_equal '4', second[1]['y']
    assert_equal '20120402182100', second[1]['t']

    d3 = create_driver(%[
      key_name data
      <parse>
        @type regexp
        expression /^(?<x>[0-9])(?<y>[0-9]) (?<t>.+)$/
      </parse>
    ])
    time = Time.parse("2012-04-02 18:20:59").to_i
    d3.run(default_tag: @tag) do
      d3.feed(time, {'data' => '12 20120402182059'})
      d3.feed(time, {'data' => '34 20120402182100'})
      d3.feed(time, {'data' => 'xy 20120402182101'})
    end
    filtered = d3.filtered
    assert_equal 2, filtered.length # the non-matching 'xy ...' record is dropped

    d4 = create_driver(%[
      key_name data
      <parse>
        @type json
      </parse>
    ])
    time = Time.parse("2012-04-02 18:20:59").to_i
    d4.run(default_tag: @tag) do
      d4.feed(time, {'data' => '{"xxx":"first","yyy":"second"}', 'xxx' => 'x', 'yyy' => 'y'})
      d4.feed(time, {'data' => 'foobar', 'xxx' => 'x', 'yyy' => 'y'})
    end
    filtered = d4.filtered
    assert_equal 1, filtered.length # invalid JSON ('foobar') is dropped

  end

  # A JSON array in the source field must expand into one event per element:
  # feeding one record whose payload is a two-element array yields two
  # filtered events, each carrying one element as its record.
  def test_filter_with_multiple_records
    driver = create_driver(%[
      key_name data
      <parse>
        @type json
      </parse>
    ])
    event_ts = Fluent::EventTime.from_time(@default_time)
    driver.run(default_tag: @tag) do
      driver.feed(event_ts, {'data' => '[{"xxx_1":"first","yyy":"second"}, {"xxx_2":"first", "yyy_2":"second"}]'})
    end
    results = driver.filtered
    assert_equal 2, results.length
    assert_equal({"xxx_1"=>"first", "yyy"=>"second"}, results[0][1])
    assert_equal({"xxx_2"=>"first", "yyy_2"=>"second"}, results[1][1])
  end

  # With reserve_data on, unparsable records are still emitted carrying their
  # original fields; remove_key_name_field controls whether the source key is
  # dropped from successfully parsed records.
  data(:keep_key_name => false,
       :remove_key_name => true)
  def test_filter_with_reserved_data(remove_key_name)
    d1 = create_driver(%[
      key_name data
      reserve_data yes
      remove_key_name_field #{remove_key_name}
      <parse>
        @type regexp
        expression /^(?<x>\\d)(?<y>\\d) (?<t>.+)$/
      </parse>
    ])
    time = event_time("2012-04-02 18:20:59")
    d1.run(default_tag: @tag) do
      d1.feed(time, {'data' => '12 20120402182059'})
      d1.feed(time, {'data' => '34 20120402182100'})
      d1.feed(time, {'data' => 'xy 20120402182101'})
    end
    filtered = d1.filtered
    assert_equal 3, filtered.length # reserve_data keeps even the non-matching record

    d2 = create_driver(%[
      key_name data
      reserve_data yes
      remove_key_name_field #{remove_key_name}
      <parse>
        @type json
      </parse>
    ])
    time = Fluent::EventTime.from_time(@default_time)
    d2.run(default_tag: @tag) do
      d2.feed(time, {'data' => '{"xxx":"first","yyy":"second"}', 'xxx' => 'x', 'yyy' => 'y'})
      d2.feed(time, {'data' => 'foobar', 'xxx' => 'x', 'yyy' => 'y'})
    end
    filtered = d2.filtered
    assert_equal 2, filtered.length

    # parsed successfully: 'data' presence depends on remove_key_name_field
    first = filtered[0]
    assert_equal_event_time time, first[0]
    if remove_key_name
      assert_not_include first[1], 'data'
    else
      assert_equal '{"xxx":"first","yyy":"second"}', first[1]['data']
    end
    assert_equal 'first', first[1]['xxx']
    assert_equal 'second', first[1]['yyy']

    # JSON parsing fails on 'foobar', so the original fields survive intact
    second = filtered[1]
    assert_equal_event_time time, second[0]
    assert_equal 'foobar', second[1]['data']
    assert_equal 'x', second[1]['xxx']
    assert_equal 'y', second[1]['yyy']
  end


  # LTSV parsing config without type conversion.
  CONFIG_LTSV =  %[
    key_name data
    <parse>
      @type ltsv
    </parse>
  ]
  # LTSV parsing config with per-field type conversion
  # (i→integer, s→string, f→float, b→bool).
  CONFIG_LTSV_WITH_TYPES =  %[
    key_name data
    <parse>
      @type ltsv
      types i:integer,s:string,f:float,b:bool
    </parse>
  ]
  data(:event_time => lambda { |time| Fluent::EventTime.from_time(time) },
       :int_time => lambda { |time| time.to_i })
  # Verifies LTSV parsing in three phases, once with an EventTime timestamp
  # and once with an Integer timestamp:
  #   1. default config: parsed fields replace the source field
  #   2. reserve_data: each record keeps its own original 'data' field
  #   3. types option: parsed values are converted to the declared types
  def test_filter_ltsv(time_parse)
    d = create_driver(CONFIG_LTSV)
    time = time_parse.call(@default_time)
    d.run(default_tag: @tag) do
      d.feed(time, {'data' => "xxx:first\tyyy:second", 'xxx' => 'x', 'yyy' => 'y'})
      d.feed(time, {'data' => "xxx:first\tyyy:second2", 'xxx' => 'x', 'yyy' => 'y'})
    end
    filtered = d.filtered
    assert_equal 2, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]
    assert_nil first[1]['data']
    assert_equal 'first', first[1]['xxx']
    assert_equal 'second', first[1]['yyy']

    second = filtered[1]
    assert_equal_parsed_time time, second[0]
    # FIX: previously re-checked first[1]['data'] (copy-paste); the second
    # event's own 'data' field is what must have been consumed.
    assert_nil second[1]['data']
    assert_equal 'first', second[1]['xxx']
    assert_equal 'second2', second[1]['yyy']

    d = create_driver(CONFIG_LTSV + %[reserve_data yes])
    time = @default_time.to_i
    d.run(default_tag: @tag) do
      d.feed(time, {'data' => "xxx:first\tyyy:second", 'xxx' => 'x', 'yyy' => 'y'})
      d.feed(time, {'data' => "xxx:first\tyyy:second2", 'xxx' => 'x', 'yyy' => 'y'})
    end
    filtered = d.filtered
    assert_equal 2, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]
    assert_equal "xxx:first\tyyy:second", first[1]['data']
    assert_equal 'first', first[1]['xxx']
    assert_equal 'second', first[1]['yyy']

    second = filtered[1]
    assert_equal_parsed_time time, second[0]
    # FIX: previously asserted the FIRST record's payload against
    # first[1]['data'] (copy-paste); the second event must preserve its own
    # original payload ("...second2").
    assert_equal "xxx:first\tyyy:second2", second[1]['data']
    assert_equal 'first', second[1]['xxx']
    assert_equal 'second2', second[1]['yyy']

    # convert types
    d = create_driver(CONFIG_LTSV_WITH_TYPES)
    time = @default_time.to_i
    d.run do
      d.feed(@tag, time, {'data' => "i:1\ts:2\tf:3\tb:true\tx:123"})
    end
    filtered = d.filtered
    assert_equal 1, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]
    assert_equal 1, first[1]['i']       # integer conversion
    assert_equal '2', first[1]['s']     # declared string stays a string
    assert_equal 3.0, first[1]['f']     # float conversion
    assert_equal true, first[1]['b']    # bool conversion
    assert_equal '123', first[1]['x']   # undeclared field left untouched
  end

  # TSV parsing config: tab-separated values mapped positionally onto
  # key1/key2/key3.
  CONFIG_TSV =  %[
    key_name data
    <parse>
      @type tsv
      keys key1,key2,key3
    </parse>
  ]
  data(:event_time => lambda { |time| Fluent::EventTime.from_time(time) },
       :int_time => lambda { |time| time.to_i })
  # Tab-separated values must be mapped positionally onto the configured keys,
  # replacing the source field. Runs with both EventTime and Integer timestamps.
  def test_filter_tsv(time_parse)
    driver = create_driver(CONFIG_TSV)
    event_ts = time_parse.call(@default_time)
    driver.run do
      driver.feed(@tag, event_ts, {'data' => "value1\tvalue2\tvalueThree", 'xxx' => 'x', 'yyy' => 'y'})
    end
    results = driver.filtered
    assert_equal 1, results.length

    event_time, record = results[0]
    assert_equal_parsed_time event_ts, event_time
    assert_nil record['data']
    assert_equal 'value1', record['key1']
    assert_equal 'value2', record['key2']
    assert_equal 'valueThree', record['key3']
  end

  # CSV parsing config using the <parse> section style.
  CONFIG_CSV =  %[
    key_name data
    <parse>
      @type csv
      keys key1,key2,key3
    </parse>
  ]
  # Equivalent CSV config using the legacy flat 'format' style (compat layer).
  CONFIG_CSV_COMPAT =  %[
    key_name data
    format csv
    keys key1,key2,key3
  ]
  data(new_conf: CONFIG_CSV,
       compat_conf: CONFIG_CSV_COMPAT)
  # CSV parsing must honor quoting rules ("" escapes a literal quote) under
  # both the <parse> section config and the flat compat-style config.
  def test_filter_csv(conf)
    driver = create_driver(conf)
    event_ts = @default_time.to_i
    driver.run do
      driver.feed(@tag, event_ts, {'data' => 'value1,"value2","value""ThreeYes!"', 'xxx' => 'x', 'yyy' => 'y'})
    end
    results = driver.filtered
    assert_equal 1, results.length

    event_time, record = results[0]
    assert_equal event_ts, event_time
    assert_nil record['data']
    assert_equal 'value1', record['key1']
    assert_equal 'value2', record['key2']
    assert_equal 'value"ThreeYes!', record['key3']
  end

  # key_name accepts a record_accessor path ($.data.log), so a value nested
  # inside a sub-hash can be selected for parsing.
  def test_filter_with_nested_record
    driver = create_driver(%[
      key_name $.data.log
      <parse>
        @type csv
        keys key1,key2,key3
      </parse>
    ])
    event_ts = @default_time.to_i
    driver.run do
      driver.feed(@tag, event_ts, {'data' => {'log' => 'value1,"value2","value""ThreeYes!"'}, 'xxx' => 'x', 'yyy' => 'y'})
    end
    results = driver.filtered
    assert_equal 1, results.length

    event_time, record = results[0]
    assert_equal event_ts, event_time
    assert_nil record['data']
    assert_equal 'value1', record['key1']
    assert_equal 'value2', record['key2']
    assert_equal 'value"ThreeYes!', record['key3']
  end

  # Nest all parsed fields under the 'parsed' key.
  CONFIG_HASH_VALUE_FIELD = %[
    key_name data
    hash_value_field parsed
    <parse>
      @type json
    </parse>
  ]
  # Same nesting, but also keep the original record fields.
  CONFIG_HASH_VALUE_FIELD_RESERVE_DATA = %[
    key_name data
    reserve_data yes
    hash_value_field parsed
    <parse>
      @type json
    </parse>
  ]
  # Same nesting, with every parsed key prefixed by 'data.'.
  CONFIG_HASH_VALUE_FIELD_WITH_INJECT_KEY_PREFIX = %[
    key_name data
    hash_value_field parsed
    inject_key_prefix data.
    <parse>
      @type json
    </parse>
  ]
  # hash_value_field nests all parsed fields under a single key; combined with
  # reserve_data the original fields also survive; inject_key_prefix prefixes
  # every parsed key. Runs with both EventTime and Integer timestamps.
  data(:event_time => lambda { |time| Fluent::EventTime.from_time(time) },
       :int_time => lambda { |time| time.to_i })
  def test_filter_inject_hash_value_field(time_parse)
    original = {'data' => '{"xxx":"first","yyy":"second"}', 'xxx' => 'x', 'yyy' => 'y'}

    # phase 1: parsed fields nested under 'parsed', originals dropped
    d = create_driver(CONFIG_HASH_VALUE_FIELD)
    time = time_parse.call(@default_time)
    d.run do
      d.feed(@tag, time, original)
    end
    filtered = d.filtered
    assert_equal 1, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]

    record = first[1]
    assert_equal 1, record.keys.size
    assert_equal({"xxx"=>"first","yyy"=>"second"}, record['parsed'])

    # phase 2: reserve_data keeps all three original fields alongside 'parsed'
    d = create_driver(CONFIG_HASH_VALUE_FIELD_RESERVE_DATA)
    time = @default_time.to_i
    d.run do
      d.feed(@tag, time, original)
    end
    filtered = d.filtered
    assert_equal 1, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]

    record = first[1]
    assert_equal 4, record.keys.size
    assert_equal original['data'], record['data']
    assert_equal original['xxx'], record['xxx']
    assert_equal original['yyy'], record['yyy']
    assert_equal({"xxx"=>"first","yyy"=>"second"}, record['parsed'])

    # phase 3: inject_key_prefix prepends 'data.' to every parsed key
    d = create_driver(CONFIG_HASH_VALUE_FIELD_WITH_INJECT_KEY_PREFIX)
    time = @default_time.to_i
    d.run do
      d.feed(@tag, time, original)
    end
    filtered = d.filtered
    assert_equal 1, filtered.length

    first = filtered[0]
    assert_equal_parsed_time time, first[0]

    record = first[1]
    assert_equal 1, record.keys.size
    assert_equal({"data.xxx"=>"first","data.yyy"=>"second"}, record['parsed'])
  end

  # Keep the original event time (reserve_time) and leave the record's 'time'
  # field in place (keep_time_key) — <parse> section style.
  CONFIG_DONT_PARSE_TIME = %[
    key_name data
    reserve_time true
    <parse>
      @type json
      keep_time_key true
    </parse>
  ]
  # Equivalent config using the legacy flat 'format' style (compat layer).
  CONFIG_DONT_PARSE_TIME_COMPAT = %[
    key_name data
    reserve_time true
    format json
    keep_time_key true
  ]
  data(new_conf: CONFIG_DONT_PARSE_TIME,
       compat_conf: CONFIG_DONT_PARSE_TIME_COMPAT)
  # With reserve_time + keep_time_key, the event's original timestamp must be
  # kept and the record's 'time' field must pass through unmodified, whatever
  # its JSON type (integer, numeric string, or formatted date string).
  def test_time_should_be_reserved(conf)
    feed_time = Time.now.to_i
    driver = create_driver(conf)
    driver.run(default_tag: @tag) do
      driver.feed(feed_time, {'data' => '{"time":1383190430, "f1":"v1"}'})
      driver.feed(feed_time, {'data' => '{"time":"1383190430", "f1":"v1"}'})
      driver.feed(feed_time, {'data' => '{"time":"2013-10-31 12:34:03 +0900", "f1":"v1"}'})
    end
    results = driver.filtered
    assert_equal 3, results.length

    expected_time_fields = [1383190430, "1383190430", '2013-10-31 12:34:03 +0900']
    results.each_with_index do |(event_time, record), i|
      assert_equal 'v1', record['f1']
      assert_equal expected_time_fields[i], record['time']
      assert_equal feed_time, event_time
    end
  end

  sub_test_case "abnormal cases" do
    # Backport of Hash#except as a refinement so these tests can strip the
    # :additional_config key from data hashes on Rubies that lack it.
    module HashExcept
      refine Hash do
        def except(*keys)
          reject do |key, _|
            keys.include?(key)
          end
        end
      end
    end

    # Ruby 2.x does not support Hash#except.
    using HashExcept unless {}.respond_to?(:except)

    # Feeds the given records through the driver, then asserts in one shot:
    # the successfully filtered records, the records routed to the error
    # stream, and the class/message of each attached error.
    def run_and_assert(driver, records:, expected_records:, expected_error_records:, expected_errors:)
      driver.run do
        records.each { |record| driver.feed(@tag, Fluent::EventTime.now.to_i, record) }
      end

      actual_error_records = driver.error_events.collect { |_, _, record, _| record }
      actual_error_details = driver.error_events.collect { |_, _, _, e| [e.class, e.message] }
      expected_error_details = expected_errors.collect { |e| [e.class, e.message] }

      # single combined assertion so a failure shows all three facets at once
      assert_equal(
        [expected_records, expected_error_records, expected_error_details],
        [driver.filtered_records, actual_error_records, actual_error_details]
      )
    end

    # When the configured key_name is absent from the record, the record goes
    # to the error stream with an ArgumentError by default; reserve_data still
    # emits the original record, and emit_invalid_record_to_error false
    # suppresses the error event entirely.
    data(
      "with default" => {
        records: [{"foo" => "bar"}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{"foo" => "bar"}],
        expected_errors: [ArgumentError.new("data does not exist")],
      },
      "with reserve_data" => {
        records: [{"foo" => "bar"}],
        additional_config: "reserve_data",
        expected_records: [{"foo" => "bar"}],
        expected_error_records: [{"foo" => "bar"}],
        expected_errors: [ArgumentError.new("data does not exist")],
      },
      "with disabled emit_invalid_record_to_error" => {
        records: [{"foo" => "bar"}],
        additional_config: "emit_invalid_record_to_error false",
        expected_records: [],
        expected_error_records: [],
        expected_errors: [],
      },
      "with reserve_data and disabled emit_invalid_record_to_error" => {
        records: [{"foo" => "bar"}],
        additional_config: ["reserve_data", "emit_invalid_record_to_error false"].join("\n"),
        expected_records: [{"foo" => "bar"}],
        expected_error_records: [],
        expected_errors: [],
      },
      "with reserve_data and hash_value_field" => {
        records: [{"foo" => "bar"}],
        additional_config: ["reserve_data", "hash_value_field parsed"].join("\n"),
        expected_records: [{"foo" => "bar", "parsed" => {}}],
        expected_error_records: [{"foo" => "bar"}],
        expected_errors: [ArgumentError.new("data does not exist")],
      },
    )
    def test_filter_key_not_exist(data)
      driver = create_driver(<<~EOC)
        key_name data
        #{data[:additional_config]}
        <parse>
          @type json
        </parse>
      EOC

      run_and_assert(driver, **data.except(:additional_config))
    end

    # When the regexp does not match, the record goes to the error stream with
    # a ParserError by default; reserve_data still emits the original record,
    # emit_invalid_record_to_error false suppresses the error event, and a
    # matching record is parsed normally.
    data(
      "with default" => {
        records: [{"data" => "foo bar"}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{"data" => "foo bar"}],
        expected_errors: [Fluent::Plugin::Parser::ParserError.new("pattern not matched with data 'foo bar'")],
      },
      "with reserve_data" => {
        records: [{"data" => "foo bar"}],
        additional_config: "reserve_data",
        expected_records: [{"data" => "foo bar"}],
        expected_error_records: [{"data" => "foo bar"}],
        expected_errors: [Fluent::Plugin::Parser::ParserError.new("pattern not matched with data 'foo bar'")],
      },
      "with disabled emit_invalid_record_to_error" => {
        records: [{"data" => "foo bar"}],
        additional_config: "emit_invalid_record_to_error false",
        expected_records: [],
        expected_error_records: [],
        expected_errors: [],
      },
      "with reserve_data and disabled emit_invalid_record_to_error" => {
        records: [{"data" => "foo bar"}],
        additional_config: ["reserve_data", "emit_invalid_record_to_error false"].join("\n"),
        expected_records: [{"data" => "foo bar"}],
        expected_error_records: [],
        expected_errors: [],
      },
      "with matched pattern" => {
        records: [{"data" => "col1=foo col2=bar"}],
        additional_config: "",
        expected_records: [{"col1" => "foo", "col2" => "bar"}],
        expected_error_records: [],
        expected_errors: [],
      },
    )
    def test_pattern_not_matched(data)
      driver = create_driver(<<~EOC)
        key_name data
        #{data[:additional_config]}
        <parse>
          @type regexp
          expression /^col1=(?<col1>.+) col2=(?<col2>.+)$/
        </parse>
      EOC

      run_and_assert(driver, **data.except(:additional_config))
    end

    # A record whose 'time' field has an invalid JSON type raises a
    # ParserError and is routed to the error stream; valid records mixed in
    # with invalid ones are still filtered through.
    data(
      "invalid format with default" => {
        records: [{'data' => '{"time":[], "f1":"v1"}'}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{'data' => '{"time":[], "f1":"v1"}'}],
        expected_errors: [Fluent::Plugin::Parser::ParserError.new("value must be a string or a number: [](Array)")],
      },
      "invalid format with disabled emit_invalid_record_to_error" => {
        records: [{'data' => '{"time":[], "f1":"v1"}'}],
        additional_config: "emit_invalid_record_to_error false",
        expected_records: [],
        expected_error_records: [],
        expected_errors: [],
      },
      "mixed valid and invalid with default" => {
        records: [{'data' => '{"time":[], "f1":"v1"}'}, {'data' => '{"time":0, "f1":"v1"}'}],
        additional_config: "",
        expected_records: [{"f1" => "v1"}],
        expected_error_records: [{'data' => '{"time":[], "f1":"v1"}'}],
        expected_errors: [Fluent::Plugin::Parser::ParserError.new("value must be a string or a number: [](Array)")],
      },
    )
    def test_parser_error(data)
      driver = create_driver(<<~EOC)
        key_name data
        #{data[:additional_config]}
        <parse>
          @type json
        </parse>
      EOC

      run_and_assert(driver, **data.except(:additional_config))
    end

    # Invalid byte sequences (in UTF-8 and US-ASCII encodings) raise
    # ArgumentError by default; with replace_invalid_sequence the bad bytes
    # are replaced by '?' and the record parses, optionally keeping the
    # original field via reserve_data.
    data(
      "UTF-8 with default" => {
        records: [{"data" => "\xff"}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{"data" => "\xff"}],
        expected_errors: [ArgumentError.new("invalid byte sequence in UTF-8")],
      },
      "UTF-8 with replace_invalid_sequence" => {
        records: [{"data" => "\xff"}],
        additional_config: "replace_invalid_sequence",
        expected_records: [{"message" => "?"}],
        expected_error_records: [],
        expected_errors: [],
      },
      "UTF-8 with replace_invalid_sequence and reserve_data" => {
        records: [{"data" => "\xff"}],
        additional_config: ["replace_invalid_sequence", "reserve_data"].join("\n"),
        expected_records: [{"message" => "?", "data" => "\xff"}],
        expected_error_records: [],
        expected_errors: [],
      },
      "US-ASCII with default" => {
        records: [{"data" => "\xff".force_encoding("US-ASCII")}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{"data" => "\xff".force_encoding("US-ASCII")}],
        expected_errors: [ArgumentError.new("invalid byte sequence in US-ASCII")],
      },
      "US-ASCII with replace_invalid_sequence" => {
        records: [{"data" => "\xff".force_encoding("US-ASCII")}],
        additional_config: "replace_invalid_sequence",
        expected_records: [{"message" => "?".force_encoding("US-ASCII")}],
        expected_error_records: [],
        expected_errors: [],
      },
      "US-ASCII with replace_invalid_sequence and reserve_data" => {
        records: [{"data" => "\xff".force_encoding("US-ASCII")}],
        additional_config: ["replace_invalid_sequence", "reserve_data"].join("\n"),
        expected_records: [{"message" => "?".force_encoding("US-ASCII"), "data" => "\xff".force_encoding("US-ASCII")}],
        expected_error_records: [],
        expected_errors: [],
      },
    )
    def test_invalid_byte(data)
      driver = create_driver(<<~EOC)
        key_name data
        #{data[:additional_config]}
        <parse>
          @type regexp
          expression /^(?<message>.*)$/
        </parse>
      EOC

      run_and_assert(driver, **data.except(:additional_config))
    end

    test "replace_invalid_sequence should be applied only to invalid byte sequence errors" do
      # Force the JSON parser to raise an ArgumentError that is NOT an
      # encoding error; replace_invalid_sequence must not swallow it.
      message = "This is a dummy ArgumentError other than invalid byte sequence"
      any_instance_of(Fluent::Plugin::JSONParser) do |parser|
        stub(parser).parse { raise ArgumentError, message }
      end

      driver = create_driver(<<~EOC)
        key_name data
        replace_invalid_sequence
        <parse>
          @type json
        </parse>
      EOC

      # The error must surface as an error event instead of being replaced.
      run_and_assert(
        driver,
        records: [{'data' => '{"foo":"bar"}'}],
        expected_records: [],
        expected_error_records: [{'data' => '{"foo":"bar"}'}],
        expected_errors: [ArgumentError.new(message)]
      )
    end

    # Any unexpected exception raised inside the parser is wrapped in a
    # ParserError ("parse failed ...") and routed to the error stream, unless
    # emit_invalid_record_to_error is disabled.
    data(
      "with default" => {
        records: [{'data' => '{"foo":"bar"}'}],
        additional_config: "",
        expected_records: [],
        expected_error_records: [{'data' => '{"foo":"bar"}'}],
        expected_errors: [Fluent::Plugin::Parser::ParserError.new("parse failed This is a dummy unassumed error")],
      },
      "with disabled emit_invalid_record_to_error" => {
        records: [{'data' => '{"foo":"bar"}'}],
        additional_config: "emit_invalid_record_to_error false",
        expected_records: [],
        expected_error_records: [],
        expected_errors: [],
      },
    )
    def test_unassumed_error(data)
      # stub the parser so every parse attempt raises a non-Fluentd error
      any_instance_of(Fluent::Plugin::JSONParser) do |klass|
        stub(klass).parse do
          raise RuntimeError, "This is a dummy unassumed error"
        end
      end

      driver = create_driver(<<~EOC)
        key_name data
        #{data[:additional_config]}
        <parse>
          @type json
        </parse>
      EOC

      run_and_assert(driver, **data.except(:additional_config))
    end
  end
end