Back to Repositories

Testing HTTP Input Plugin Implementation in Fluentd

This test suite validates the HTTP input plugin functionality for Fluentd, covering various request handling scenarios, content types, and data formats. The tests ensure proper handling of HTTP requests, data parsing, and event routing within the Fluentd logging framework.

Test Coverage Overview

The test suite provides comprehensive coverage of HTTP input plugin features including:
  • Basic HTTP request handling and response codes
  • Multiple content type support (JSON, MessagePack, CSV, NDJSON)
  • Time handling and event timestamping
  • CORS configuration and preflight requests
  • Content encoding (gzip, deflate)
  • Query parameter handling

Implementation Analysis

The testing approach uses Ruby’s Test::Unit framework with detailed test cases for each feature. The implementation follows a structured pattern of setting up test drivers, executing HTTP requests, and validating responses and event processing. Tests utilize mock HTTP servers and custom configuration options to verify plugin behavior.

Technical Details

Key technical components include:
  • Fluent::Test::Driver::Input for test execution
  • Net::HTTP for request simulation
  • Custom handler hooks for content type validation
  • ServerEngine socket management
  • Time handling with Timecop

Best Practices Demonstrated

The test suite exemplifies testing best practices including:
  • Isolated test environment setup and teardown
  • Comprehensive edge case coverage
  • Clear test method naming and organization
  • Proper assertion usage and validation
  • Modular test configuration management

fluent/fluentd

test/plugin/test_in_http.rb

            
require_relative '../helper'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_http'
require 'net/http'
require 'timecop'

class HttpInputTest < Test::Unit::TestCase
  class << self
    def startup
      @server = ServerEngine::SocketManager::Server.open
      ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @server.path.to_s
    end

    def shutdown
      @server.close
    end
  end

  def setup
    Fluent::Test.setup
    @port = unused_port(protocol: :tcp)
  end

  def teardown
    Timecop.return
    @port = nil
  end

  def config
    %[
      port #{@port}
      bind "127.0.0.1"
      body_size_limit 10m
      keepalive_timeout 5
      respond_with_empty_img true
     use_204_response false
    ]
  end

  def create_driver(conf=config)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpInput).configure(conf)
  end

  def test_configure
    d = create_driver
    assert_equal @port, d.instance.port
    assert_equal '127.0.0.1', d.instance.bind
    assert_equal 10*1024*1024, d.instance.body_size_limit
    assert_equal 5, d.instance.keepalive_timeout
    assert_equal false, d.instance.add_http_headers
    assert_equal false, d.instance.add_query_params
  end

  def test_time
    d = create_driver
    time = event_time("2011-01-02 13:14:15.123 UTC")
    Timecop.freeze(Time.at(time))

    events = [
      ["tag1", time, {"a" => 1}],
      ["tag2", time, {"a" => 2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, _time, record|
        res = post("/#{tag}", {"json"=>record.to_json})
        res_codes << res.code
      end
    end

    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_time_as_float
    d = create_driver
    time = event_time("2011-01-02 13:14:15.123 UTC")
    float_time = time.to_f

    events = [
      ["tag1", time, {"a"=>1}],
    ]
    res_codes = []

    d.run(expect_records: 1) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
  end

  def test_json
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    events = [
      ["tag1", time_i, {"a"=>1}],
      ["tag2", time_i, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  data('json' => ['json', :to_json],
       'msgpack' => ['msgpack', :to_msgpack])
  def test_default_with_time_format(data)
    param, method_name = data
    d = create_driver(config + %[
      <parse>
        keep_time_key
        time_format %iso8601
      </parse>
    ])

    time = event_time("2020-06-10T01:14:27+00:00")
    events = [
      ["tag1", time, {"a" => 1, "time" => '2020-06-10T01:14:27+00:00'}],
      ["tag2", time, {"a" => 2, "time" => '2020-06-10T01:14:27+00:00'}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {param => record.__send__(method_name)})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_multi_json
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    records = [{"a"=>1},{"a"=>2}]
    events = [
      ["tag1", time_i, records[0]],
      ["tag1", time_i, records[1]],
    ]
    tag = "tag1"
    res_codes = []
    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_multi_json_with_time_field
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    time_f = time.to_f

    records = [{"a" => 1, 'time' => time_i},{"a" => 2, 'time' => time_f}]
    events = [
      ["tag1", time, {'a' => 1}],
      ["tag1", time, {'a' => 2}],
    ]
    tag = "tag1"
    res_codes = []
    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json" => records.to_json})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_instance_of Fluent::EventTime, d.events[0][1]
    assert_instance_of Fluent::EventTime, d.events[1][1]
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  data('json' => ['json', :to_json],
       'msgpack' => ['msgpack', :to_msgpack])
  def test_default_multi_with_time_format(data)
    param, method_name = data
    d = create_driver(config + %[
      <parse>
        keep_time_key
        time_format %iso8601
      </parse>
    ])
    time = event_time("2020-06-10T01:14:27+00:00")
    events = [
      ["tag1", time, {'a' => 1, 'time' => "2020-06-10T01:14:27+00:00"}],
      ["tag1", time, {'a' => 2, 'time' => "2020-06-10T01:14:27+00:00"}],
    ]
    tag = "tag1"
    res_codes = []
    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {param => events.map { |e| e[2] }.__send__(method_name)})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_multi_json_with_nonexistent_time_key
    d = create_driver(config + %[
      <parse>
        time_key missing
      </parse>
    ])
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    time_f = time.to_f

    records = [{"a" => 1, 'time' => time_i},{"a" => 2, 'time' => time_f}]
    tag = "tag1"
    res_codes = []
    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json" => records.to_json})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal 2, d.events.size
    assert_not_equal time_i, d.events[0][1].sec # current time is used because "missing" field doesn't exist
    assert_not_equal time_i, d.events[1][1].sec
  end

  def test_json_with_add_remote_addr
    d = create_driver(config + "add_remote_addr true")
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    events = [
      ["tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1}],
      ["tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2}],
    ]
    res_codes = []
    d.run(expect_records: 2) do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_exact_match_for_expect
    d = create_driver(config)
    records = [{ "a" => 1}, { "a" => 2 }]
    tag = "tag1"
    res_codes = []

    d.run(expect_records: 0, timeout: 5) do
      res = post("/#{tag}", { "json" => records.to_json }, { 'Expect' => 'something' })
      res_codes << res.code
    end
    assert_equal ["417"], res_codes
  end

  def test_exact_match_for_expect_with_other_header
    d = create_driver(config)

    records = [{ "a" => 1}, { "a" => 2 }]
    tag = "tag1"
    res_codes = []

    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", { "json" => records.to_json, 'x-envoy-expected-rq-timeout-ms' => 4 })
      res_codes << res.code
    end
    assert_equal ["200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal 1, d.events[0][2]["a"]
    assert_equal "tag1", d.events[1][0]
    assert_equal 2, d.events[1][2]["a"]
  end

  def test_multi_json_with_add_remote_addr
    d = create_driver(config + "add_remote_addr true")
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    records = [{"a"=>1},{"a"=>2}]
    tag = "tag1"
    res_codes = []

    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal_event_time time, d.events[0][1]
    assert_equal 1, d.events[0][2]["a"]
    assert{ d.events[0][2].has_key?("REMOTE_ADDR") && d.events[0][2]["REMOTE_ADDR"] =~ /^\d{1,4}(\.\d{1,4}){3}$/ }

    assert_equal "tag1", d.events[1][0]
    assert_equal_event_time time, d.events[1][1]
    assert_equal 2, d.events[1][2]["a"]
  end

  def test_json_with_add_remote_addr_given_x_forwarded_for
    d = create_driver(config + "add_remote_addr true")
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal_event_time time, d.events[0][1]
    assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}, d.events[0][2])

    assert_equal "tag2", d.events[1][0]
    assert_equal_event_time time, d.events[1][1]
    assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}, d.events[1][2])
  end

  def test_multi_json_with_add_remote_addr_given_x_forwarded_for
    d = create_driver(config + "add_remote_addr true")

    tag = "tag1"
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    records = [{"a"=>1},{"a"=>2}]
    events = [
      [tag, time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}],
      [tag, time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_add_remote_addr_given_multi_x_forwarded_for
    d = create_driver(config + "add_remote_addr true")

    tag = "tag1"
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    record = {"a" => 1}
    event = ["tag1", time, {"REMOTE_ADDR" => "129.78.138.66", "a" => 1}]
    res_code = nil

    d.run(expect_records: 1, timeout: 5) do
      res = post("/#{tag}", {"json" => record.to_json, "time" => time_i.to_s}) { |http, req|
        # net/http can't send multiple headers so overwrite it.
        def req.each_capitalized
          block_given? or return enum_for(__method__) { @header.size }
          @header.each do |k, vs|
            vs.each { |v|
              yield capitalize(k), v
            }
          end
        end
        req.add_field("X-Forwarded-For", "129.78.138.66, 127.0.0.1")
        req.add_field("X-Forwarded-For", "8.8.8.8")
      }
      res_code = res.code
    end
    assert_equal "200", res_code
    assert_equal event, d.events.first
    assert_equal_event_time time, d.events.first[1]
  end

  def test_multi_json_with_add_http_headers
    d = create_driver(config + "add_http_headers true")
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    records = [{"a"=>1},{"a"=>2}]
    tag = "tag1"
    res_codes = []

    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal_event_time time, d.events[0][1]
    assert_equal 1, d.events[0][2]["a"]

    assert_equal "tag1", d.events[1][0]
    assert_equal_event_time time, d.events[1][1]
    assert_equal 2, d.events[1][2]["a"]

    assert include_http_header?(d.events[0][2])
    assert include_http_header?(d.events[1][2])
  end

  def test_json_with_add_http_headers
    d = create_driver(config + "add_http_headers true")
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal_event_time time, d.events[0][1]
    assert_equal 1, d.events[0][2]["a"]

    assert_equal "tag2", d.events[1][0]
    assert_equal_event_time time, d.events[1][1]
    assert_equal 2, d.events[1][2]["a"]

    assert include_http_header?(d.events[0][2])
    assert include_http_header?(d.events[1][2])
  end

  def test_multi_json_with_custom_parser
    d = create_driver(config + %[
        <parse>
          @type json
          keep_time_key true
          time_key foo
          time_format %iso8601
        </parse>
    ])

    time = event_time("2011-01-02 13:14:15 UTC")
    time_s = Time.at(time).iso8601

    records = [{"foo"=>time_s,"bar"=>"test1"},{"foo"=>time_s,"bar"=>"test2"}]
    tag = "tag1"
    res_codes = []

    d.run(expect_records: 2, timeout: 5) do
      res = post("/#{tag}", records.to_json, {"Content-Type"=>"application/octet-stream"})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes

    assert_equal "tag1", d.events[0][0]
    assert_equal_event_time time, d.events[0][1]
    assert_equal d.events[0][2], records[0]

    assert_equal "tag1", d.events[1][0]
    assert_equal_event_time time, d.events[1][1]
    assert_equal d.events[1][2], records[1]
  end

  def test_application_json
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_csp_report
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/csp-report; charset=utf-8"})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_application_msgpack
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}?time=#{time_i.to_s}", record.to_msgpack, {"Content-Type"=>"application/msgpack"})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_application_ndjson
    d = create_driver
    events = [
        ["tag1", 1643935663, "{\"a\":1}\n{\"b\":2}"],
        ["tag2", 1643935664, "{\"a\":3}\r\n{\"b\":4}"]
    ]

    expected = [
        ["tag1", 1643935663, {"a"=>1}],
        ["tag1", 1643935663, {"b"=>2}],
        ["tag2", 1643935664, {"a"=>3}],
        ["tag2", 1643935664, {"b"=>4}]
    ]

    d.run(expect_records: 1) do
      events.each do |tag, time, record|
        res = post("/#{tag}?time=#{time}", record, {"Content-Type"=>"application/x-ndjson"})
        assert_equal("200", res.code)
      end
    end
    assert_equal(expected, d.events)
  end

  def test_msgpack
    d = create_driver
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_multi_msgpack
    d = create_driver

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i

    records = [{"a"=>1},{"a"=>2}]
    events = [
      ["tag1", time, records[0]],
      ["tag1", time, records[1]],
    ]
    tag = "tag1"
    res_codes = []
    d.run(expect_records: 2) do
      res = post("/#{tag}", {"msgpack"=>records.to_msgpack, "time"=>time_i.to_s})
      res_codes << res.code
    end
    assert_equal ["200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_with_regexp
    d = create_driver(config + %[
      format /^(?<field_1>\\d+):(?<field_2>\\w+)$/
      types field_1:integer
    ])

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"field_1" => 1, "field_2" => 'str'}],
      ["tag2", time, {"field_1" => 2, "field_2" => 'str'}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        body = record.map { |k, v|
          v.to_s
        }.join(':')
        res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'application/octet-stream'})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_with_csv
    require 'csv'

    d = create_driver(config + %[
      format csv
      keys foo,bar
    ])
    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"foo" => "1", "bar" => 'st"r'}],
      ["tag2", time, {"foo" => "2", "bar" => 'str'}],
    ]
    res_codes = []

    d.run(expect_records: 2) do
      events.each do |tag, t, record|
        body = record.map { |k, v| v }.to_csv
        res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'text/comma-separated-values'})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_response_with_empty_img
    d = create_driver(config)
    assert_equal true, d.instance.respond_with_empty_img

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []
    res_bodies = []

    d.run do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
        # Ruby returns ASCII-8 encoded string for GIF.
        res_bodies << res.body.force_encoding("UTF-8")
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal [Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE, Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE], res_bodies
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_response_without_empty_img
    d = create_driver(config + "respond_with_empty_img false")
    assert_equal false, d.instance.respond_with_empty_img

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []
    res_bodies = []

    d.run do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal [], res_bodies
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_response_use_204_response
    d = create_driver(config + %[
          respond_with_empty_img false
          use_204_response true
        ])
    assert_equal true, d.instance.use_204_response

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []
    res_bodies = []

    d.run do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["204", "204"], res_codes
    assert_equal [], res_bodies
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_cors_allowed
    d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]")
    assert_equal ["http://foo.com"], d.instance.cors_allow_origins

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []
    res_headers = []

    d.run do
      events.each do |tag, _t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://foo.com"})
        res_codes << res.code
        res_headers << res["Access-Control-Allow-Origin"]
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal ["http://foo.com", "http://foo.com"], res_headers
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_cors_allowed_wildcard
    d = create_driver(config + 'cors_allow_origins ["*"]')

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
    ]

    d.run do
      events.each do |tag, time, record|
        headers = {"Origin" => "http://foo.com"}

        res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers)

        assert_equal "200", res.code
        assert_equal "*", res["Access-Control-Allow-Origin"]
      end
    end
  end

  def test_get_request
    d = create_driver(config)

    d.run do
      res = get("/cors.test", {}, {})
      assert_equal "200", res.code
    end
  end

  def test_cors_preflight
    d = create_driver(config + 'cors_allow_origins ["*"]')

    d.run do
      header = {
        "Origin" => "http://foo.com",
        "Access-Control-Request-Method" => "POST",
        "Access-Control-Request-Headers" => "Content-Type",
      }
      res = options("/cors.test", {}, header)

      assert_equal "200", res.code
      assert_equal "*", res["Access-Control-Allow-Origin"]
      assert_equal "POST", res["Access-Control-Allow-Methods"]
    end
  end

  def test_cors_allowed_wildcard_for_subdomain
    d = create_driver(config + 'cors_allow_origins ["http://*.foo.com"]')

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
    ]

    d.run do
      events.each do |tag, time, record|
        headers = {"Origin" => "http://subdomain.foo.com"}

        res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers)

        assert_equal "200", res.code
        assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"]
      end
    end
  end

  def test_cors_allowed_exclude_empty_string
    d = create_driver(config + 'cors_allow_origins ["", "http://*.foo.com"]')

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
    ]

    d.run do
      events.each do |tag, time, record|
        headers = {"Origin" => "http://subdomain.foo.com"}

        res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers)

        assert_equal "200", res.code
        assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"]
      end
    end
  end

  def test_cors_allowed_wildcard_preflight_for_subdomain
    d = create_driver(config + 'cors_allow_origins ["http://*.foo.com"]')

    d.run do
      header = {
        "Origin" => "http://subdomain.foo.com",
        "Access-Control-Request-Method" => "POST",
        "Access-Control-Request-Headers" => "Content-Type",
      }
      res = options("/cors.test", {}, header)

      assert_equal "200", res.code
      assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"]
      assert_equal "POST", res["Access-Control-Allow-Methods"]
    end
  end

  def test_cors_allow_credentials
    d = create_driver(config + %[
          cors_allow_origins ["http://foo.com"]
          cors_allow_credentials
        ])
    assert_equal true, d.instance.cors_allow_credentials

    time = event_time("2011-01-02 13:14:15 UTC")
    event = ["tag1", time, {"a"=>1}]
    res_code = nil
    res_header = nil

    d.run do
      res = post("/#{event[0]}", {"json"=>event[2].to_json, "time"=>time.to_i.to_s}, {"Origin"=>"http://foo.com"})
      res_code = res.code
      res_header = res["Access-Control-Allow-Credentials"]
    end
    assert_equal(
      {
        response_code: "200",
        allow_credentials_header: "true",
        events: [event]
      },
      {
        response_code: res_code,
        allow_credentials_header: res_header,
        events: d.events
      }
    )
  end

  def test_cors_allow_credentials_for_wildcard_origins
    assert_raise(Fluent::ConfigError) do
      create_driver(config + %[
        cors_allow_origins ["*"]
        cors_allow_credentials
      ])
    end
  end

  def test_content_encoding_gzip
    d = create_driver

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run do
      events.each do |tag, time, record|
        header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'}
        res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header)
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_content_encoding_deflate
    d = create_driver

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag2", time, {"a"=>2}],
    ]
    res_codes = []

    d.run do
      events.each do |tag, time, record|
        header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'}
        res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header)
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal events, d.events
    assert_equal_event_time time, d.events[0][1]
    assert_equal_event_time time, d.events[1][1]
  end

  def test_cors_disallowed
    d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]")
    assert_equal ["http://foo.com"], d.instance.cors_allow_origins

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    res_codes = []

    d.end_if{ res_codes.size == 2 }
    d.run do
      res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"})
      res_codes << res.code
      res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"})
      res_codes << res.code
    end
    assert_equal ["403", "403"], res_codes
  end

  def test_add_query_params
    d = create_driver(config + "add_query_params true")
    assert_equal true, d.instance.add_query_params

    time = event_time("2011-01-02 13:14:15 UTC")
    time_i = time.to_i
    events = [
      ["tag1", time, {"a"=>1, "QUERY_A"=>"b"}],
      ["tag2", time, {"a"=>2, "QUERY_A"=>"b"}],
    ]
    res_codes = []
    res_bodies = []

    d.run do
      events.each do |tag, _t, record|
        res = post("/#{tag}?a=b", {"json"=>record.to_json, "time"=>time_i.to_s})
        res_codes << res.code
      end
    end
    assert_equal ["200", "200"], res_codes
    assert_equal [], res_bodies
    assert_equal events, d.events
  end

  def test_add_tag_prefix
    d = create_driver(config + "add_tag_prefix test")
    assert_equal "test", d.instance.add_tag_prefix

    time = event_time("2011-01-02 13:14:15.123 UTC")
    float_time = time.to_f

    events = [
      ["tag1", time, {"a"=>1}],
    ]
    res_codes = []

    d.run(expect_records: 1) do
      events.each do |tag, t, record|
        res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s})
        res_codes << res.code
      end
    end

    assert_equal ["200"], res_codes
    assert_equal [["test.tag1", time, {"a"=>1}]], d.events
  end

  $test_in_http_connection_object_ids = []
  $test_in_http_content_types = []
  $test_in_http_content_types_flag = false
  module ContentTypeHook
    def initialize(*args)
      @io_handler = nil
      super
    end
    def on_headers_complete(headers)
      super
      if $test_in_http_content_types_flag
        $test_in_http_content_types << self.content_type
      end
    end

    def on_message_begin
      super
      if $test_in_http_content_types_flag
        $test_in_http_connection_object_ids << @io_handler.object_id
      end
    end
  end

  class Fluent::Plugin::HttpInput::Handler
    prepend ContentTypeHook
  end

  def test_if_content_type_is_initialized_properly
    # This test is to check if Fluent::HttpInput::Handler's @content_type is initialized properly.
    # Especially when in Keep-Alive and the second request has no 'Content-Type'.

    begin
      d = create_driver

      $test_in_http_content_types_flag = true
      d.run do
        # Send two requests the second one has no Content-Type in Keep-Alive
        Net::HTTP.start("127.0.0.1", @port) do |http|
          req = Net::HTTP::Post.new("/foodb/bartbl", {"connection" => "keepalive", "Content-Type" => "application/json"})
          http.request(req)
          req = Net::HTTP::Get.new("/foodb/bartbl", {"connection" => "keepalive"})
          http.request(req)
        end

      end
    ensure
      $test_in_http_content_types_flag = false
    end
    assert_equal(['application/json', ''], $test_in_http_content_types)
    # Asserting keepalive
    assert_equal $test_in_http_connection_object_ids[0], $test_in_http_connection_object_ids[1]
  end

  def get(path, params, header = {})
    http = Net::HTTP.new("127.0.0.1", @port)
    req = Net::HTTP::Get.new(path, header)
    http.request(req)
  end

  def options(path, params, header = {})
    http = Net::HTTP.new("127.0.0.1", @port)
    req = Net::HTTP::Options.new(path, header)
    http.request(req)
  end

  def post(path, params, header = {}, &block)
    http = Net::HTTP.new("127.0.0.1", @port)
    req = Net::HTTP::Post.new(path, header)
    block.call(http, req) if block
    if params.is_a?(String)
      unless header.has_key?('Content-Type')
        header['Content-Type'] = 'application/octet-stream'
      end
      req.body = params
    else
      unless header.has_key?('Content-Type')
        header['Content-Type'] = 'application/x-www-form-urlencoded'
      end
      req.set_form_data(params)
    end
    http.request(req)
  end

  def compress_gzip(data)
    io = StringIO.new
    io.binmode
    Zlib::GzipWriter.wrap(io) { |gz| gz.write data }
    return io.string
  end

  def include_http_header?(record)
    record.keys.find { |header| header.start_with?('HTTP_') }
  end
end