Back to Repositories

Testing Server Protocol Implementation in Fluentd

This test suite validates the server plugin helper functionality in Fluentd, focusing on TCP, UDP, and TLS server creation and connection management. It ensures proper server setup, connection handling, data transmission, and TLS certificate management across different protocols.

Test Coverage Overview

The test suite provides comprehensive coverage of server functionality including:
  • Server creation and configuration for TCP, UDP and TLS protocols
  • Connection handling and data transmission
  • TLS certificate management and verification
  • Error handling and edge cases
  • Server lifecycle management including startup and shutdown

Implementation Analysis

The testing approach uses test-unit (Test::Unit::TestCase) to validate server behavior through:
  • Protocol-specific server creation with different configuration options
  • Connection establishment and data transfer verification
  • Certificate generation and validation for TLS
  • Server callback handling for events like data, write completion and connection close

Technical Details

Key technical components tested include:
  • Server helper methods for TCP/UDP/TLS protocols
  • TLS certificate management and verification
  • Connection callbacks and event handling
  • Socket options and configurations
  • IPv4/IPv6 support

Best Practices Demonstrated

The test suite demonstrates testing best practices including:
  • Comprehensive protocol coverage across TCP, UDP and TLS
  • Thorough validation of security features and certificate handling
  • Edge case testing for connection management
  • Clean test organization using sub_test_cases
  • Proper teardown and resource cleanup

fluent/fluentd

test/plugin_helper/test_server.rb

            
require_relative '../helper'
require 'fluent/plugin_helper/server'
require 'fluent/plugin_helper/cert_option' # to create certs for tests
require 'fluent/plugin/base'
require 'timeout'

require 'serverengine'
require 'fileutils'

class ServerPluginHelperTest < Test::Unit::TestCase
  # Minimal plugin under test: a TestBase plugin with only the :server helper enabled.
  class Dummy < Fluent::Plugin::TestBase
    helpers :server
  end

  # Absolute path of ../tmp/plugin_helper_server, resolved relative to this file's directory.
  TMP_DIR = File.expand_path("../tmp/plugin_helper_server", File.dirname(__FILE__))

  # Per-test fixture: picks a TCP port, opens a ServerEngine socket manager
  # server, exports its path through the environment and starts a Dummy plugin.
  setup do
    @port = unused_port(protocol: :tcp)

    if Fluent.windows?
      # Opened without a path; the path is read back from the server afterwards.
      @socket_manager_server = ServerEngine::SocketManager::Server.open
      @socket_manager_path = @socket_manager_server.path
    else
      @socket_manager_path = ServerEngine::SocketManager::Server.generate_path
      # Remove any file already present at the generated path before opening on it.
      leftover = @socket_manager_path.is_a?(String) && File.exist?(@socket_manager_path)
      FileUtils.rm_f(@socket_manager_path) if leftover
      @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path)
    end
    # NOTE(review): presumably read by the server helper to reach the socket manager — confirm.
    ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s

    @d = Dummy.new
    @d.under_plugin_development = true
    @d.start
    @d.after_start
  end

  # Walks the plugin through every lifecycle stage it has not reached yet, then
  # closes the socket manager server and removes its socket file if one exists.
  teardown do
    # [predicate, transition] pairs, in lifecycle order. Each step is
    # best-effort: any StandardError raised by a step is ignored.
    [
      [:stopped?, :stop],
      [:before_shutdown?, :before_shutdown],
      [:shutdown?, :shutdown],
      [:after_shutdown?, :after_shutdown],
      [:closed?, :close],
      [:terminated?, :terminate],
    ].each do |reached, advance|
      begin
        @d.__send__(reached) || @d.__send__(advance)
      rescue
        nil
      end
    end

    @socket_manager_server.close
    socket_file_left = @socket_manager_path.is_a?(String) && File.exist?(@socket_manager_path)
    FileUtils.rm_f(@socket_manager_path) if socket_file_left
  end

  # Checks on a fresh Dummy instance: exposed helper API and <transport> configuration.
  sub_test_case 'plugin instance' do
    # NOTE(review): the title mentions threads although the body checks the
    # server helper API — possibly copied from another helper's test; confirm.
    test 'can be instantiated to be able to create threads' do
      plugin = Dummy.new
      assert plugin.respond_to?(:_servers)
      assert plugin._servers.empty?

      # Entry points expected from the server helper.
      [
        :server_wait_until_start,
        :server_wait_until_stop,
        :server_create_connection,
        :server_create,
        :server_create_tcp,
        :server_create_udp,
        :server_create_tls,
      ].each do |helper_method|
        assert plugin.respond_to?(helper_method)
      end
    end

    test 'can be configured' do
      plugin = Dummy.new
      assert_nothing_raised { plugin.configure(config_element()) }
      assert plugin.plugin_id
      assert plugin.log
      # Value expected when no <transport> section is supplied.
      assert_equal 0, plugin.transport_config.linger_timeout
    end

    test 'can change linger_timeout option' do
      plugin = Dummy.new
      transport = config_element('transport', 'tcp', {'linger_timeout' => 1})
      source = config_element('source', 'tag.*', {}, [transport])

      assert_nothing_raised { plugin.configure(source) }
      assert plugin.plugin_id
      assert plugin.log
      assert_equal 1, plugin.transport_config.linger_timeout
    end

    test 'can change receive_buffer_size option' do
      plugin = Dummy.new
      transport = config_element('transport', 'tcp', {'receive_buffer_size' => 1024})
      source = config_element('source', 'tag.*', {}, [transport])

      assert_nothing_raised { plugin.configure(source) }
      assert plugin.plugin_id
      assert plugin.log
      assert_equal 1024, plugin.transport_config.receive_buffer_size
    end
  end

  # run tests for tcp, udp, tls and unix
  sub_test_case '#server_create and #server_create_connection' do
    # Data set for the data-driven tests below: each test runs once per helper
    # entry point, and the value is passed to the test block as the method name.
    methods = {server_create: :server_create, server_create_connection: :server_create_connection}

    # Anything other than a Symbol title is rejected; a Symbol title is accepted.
    data(methods)
    test 'raise error if title is not specified or not a symbol' do |helper|
      [nil, "", "title"].each do |bad_title|
        assert_raise(ArgumentError.new("BUG: title must be a symbol")) do
          @d.__send__(helper, bad_title, @port){|x| x }
        end
      end
      assert_nothing_raised { @d.__send__(helper, :myserver, @port){|x| x } }
    end

    # nil, a String and a Float are all rejected as ports; an Integer port is accepted.
    data(methods)
    test 'raise error if port is not specified or not an integer' do |helper|
      [nil, "1", 1.5].each do |bad_port|
        assert_raise(ArgumentError.new("BUG: port must be an integer")) do
          @d.__send__(helper, :myserver, bad_port){|x| x }
        end
      end
      assert_nothing_raised { @d.__send__(helper, :myserver, @port){|x| x } }
    end

    # Calling the helper without a block raises ArgumentError; with a block it succeeds.
    data(methods)
    test 'raise error if block is not specified' do |helper|
      assert_raise(ArgumentError) { @d.__send__(helper, :myserver, @port) }
      assert_nothing_raised { @d.__send__(helper, :myserver, @port){|x| x } }
    end

    # With no proto/bind given, exactly one server is registered: tcp on 0.0.0.0.
    data(methods)
    test 'creates tcp server, binds 0.0.0.0 in default' do |helper|
      @d.__send__(helper, :myserver, @port){|x| x }
      assert_equal 1, @d._servers.size

      info = @d._servers.first
      assert_equal :myserver, info.title
      assert_equal @port, info.port
      assert_equal :tcp, info.proto
      assert_equal "0.0.0.0", info.bind

      server = info.server
      assert server.is_a?(Coolio::TCPServer)
      # Inspect the listening socket itself, not just the recorded bind value.
      listen_socket = server.instance_variable_get(:@listen_socket)
      assert_equal "0.0.0.0", listen_socket.addr[3]
    end

    # An explicit proto: :tcp yields a Coolio::TCPServer recorded as :tcp.
    data(methods)
    test 'creates tcp server if specified in proto' do |helper|
      @d.__send__(helper, :myserver, @port, proto: :tcp){|x| x }

      info = @d._servers.first
      assert_equal :tcp, info.proto
      assert info.server.is_a?(Coolio::TCPServer)
    end

    # A <transport tcp> section with no explicit proto must yield a plain TCP
    # server. (The title previously said "tls", contradicting the assertions.)
    data(methods)
    test 'creates tcp server in default if transport section and tcp protocol specified' do |m|
      # Replace the plugin built in setup with one configured through a
      # <transport tcp> section; teardown cleans this instance up via @d.
      @d = d = Dummy.new
      transport_conf = config_element('transport', 'tcp', {}, [])
      d.configure(config_element('ROOT', '', {}, [transport_conf]))
      d.start
      d.after_start

      d.__send__(m, :myserver, @port){|x| x }

      created_server_info = @d._servers.first
      assert_equal :tcp, created_server_info.proto
      created_server = created_server_info.server
      assert created_server.is_a?(Coolio::TCPServer)
    end

    # proto: :tls without certificate options is rejected; with tls_options it
    # registers a :tls server backed by Coolio::TCPServer.
    data(methods)
    test 'creates tls server if specified in proto' do |helper|
      missing_cert_options = ArgumentError.new("BUG: TLS transport specified, but certification options are not specified")
      assert_raise(missing_cert_options) do
        @d.__send__(helper, :myserver, @port, proto: :tls){|x| x }
      end
      @d.__send__(helper, :myserver, @port, proto: :tls, tls_options: {insecure: true}){|x| x }

      info = @d._servers.first
      assert_equal :tls, info.proto
      assert info.server.is_a?(Coolio::TCPServer) # yes, TCP here
    end

    # A <transport tls> section makes TLS the protocol used when proto is omitted.
    data(methods)
    test 'creates tls server in default if transport section and tls protocol specified' do |helper|
      # Swap in a plugin configured with <transport tls>; teardown handles it via @d.
      @d = Dummy.new
      tls_transport = config_element('transport', 'tls', {'insecure' => 'true'}, [])
      @d.configure(config_element('ROOT', '', {}, [tls_transport]))
      @d.start
      @d.after_start

      @d.__send__(helper, :myserver, @port){|x| x }

      info = @d._servers.first
      assert_equal :tls, info.proto
      assert info.server.is_a?(Coolio::TCPServer) # OK, it's Coolio::TCPServer
    end

    # Placeholder: the body contains no assertions, so this test passes
    # without checking anything. The pend call is intentionally left commented out.
    data(methods)
    test 'creates unix server if specified in proto' do |m|
      # pend "not implemented yet"
    end

    # A protocol name the helper does not know (:quic here) is rejected.
    data(methods)
    test 'raise error if unknown protocol specified' do |helper|
      invalid_protocol = ArgumentError.new("BUG: invalid protocol name")
      assert_raise(invalid_protocol) { @d.__send__(helper, :myserver, @port, proto: :quic){|x| x } }
    end

    # max_bytes and flags are each rejected with ArgumentError for tcp and tls.
    data(
      'server_create tcp' => [:server_create, :tcp],
      'server_create tls' => [:server_create, :tls],
      # 'server_create unix' => [:server_create, :unix],
      'server_create_connection tcp' => [:server_create_connection, :tcp],
      'server_create_connection tls' => [:server_create_connection, :tls],
      # 'server_create_connection unix' => [:server_create_connection, :unix],
    )
    test 'raise error if udp options specified for tcp/tls/unix' do |(helper, proto)|
      port = unused_port(protocol: proto)
      [{max_bytes: 128}, {flags: 1}].each do |udp_only_option|
        assert_raise(ArgumentError) do
          @d.__send__(helper, :myserver, port, proto: proto, **udp_only_option){|x| x }
        end
      end
    end

    # linger_timeout is rejected for udp.
    data('server_create udp' => [:server_create, :udp])
    test 'raise error if tcp/tls options specified for udp' do |(helper, proto)|
      port = unused_port(protocol: proto)
      expected_error = ArgumentError.new("BUG: linger_timeout is available for tcp/tls")
      assert_raise(expected_error) do
        @d.__send__(helper, :myserver, port, proto: proto, linger_timeout: 1, max_bytes: 128){|x| x }
      end
    end

    # backlog is rejected for udp.
    data('server_create udp' => [:server_create, :udp])
    test 'raise error if tcp/tls/unix backlog options specified for udp' do |(helper, proto)|
      port = unused_port(protocol: proto)
      expected_error = ArgumentError.new("BUG: backlog is available for tcp/tls")
      assert_raise(expected_error) do
        @d.__send__(helper, :myserver, port, proto: proto, backlog: 500){|x| x }
      end
    end

    # send_keepalive_packet is rejected for udp.
    data('server_create udp' => [:server_create, :udp])
    test 'raise error if tcp/tls send_keepalive_packet option is specified for udp' do |(helper, proto)|
      port = unused_port(protocol: proto)
      expected_error = ArgumentError.new("BUG: send_keepalive_packet is available for tcp/tls")
      assert_raise(expected_error) do
        @d.__send__(helper, :myserver, port, proto: proto, send_keepalive_packet: true){|x| x }
      end
    end

    # Passing tls_options (even an empty hash) with a non-TLS protocol is rejected.
    data(
      'server_create tcp' => [:server_create, :tcp, {}],
      'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
      # 'server_create unix' => [:server_create, :unix, {}],
      'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
      # 'server_create_connection unix' => [:server_create_connection, :unix, {}],
    )
    test 'raise error if tls options specified for tcp/udp/unix' do |(helper, proto, extra)|
      port = unused_port(protocol: proto)
      expected_error = ArgumentError.new("BUG: tls_options is available only for tls")
      assert_raise(expected_error) do
        @d.__send__(helper, :myserver, port, proto: proto, tls_options: {}, **extra){|x| x }
      end
    end

    # bind: "127.0.0.1" is recorded on the server info and used by the actual socket.
    data(
      'server_create tcp' => [:server_create, :tcp, {}],
      'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
      'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
      'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
      'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
    )
    test 'can bind specified IPv4 address' do |(helper, proto, extra)|
      port = unused_port(protocol: proto)
      @d.__send__(helper, :myserver, port, proto: proto, bind: "127.0.0.1", **extra){|x| x }
      assert_equal "127.0.0.1", @d._servers.first.bind

      # The socket lives in @listen_socket when that ivar is defined, otherwise in @_io.
      server = @d._servers.first.server
      socket_ivar = server.instance_variable_defined?(:@listen_socket) ? :@listen_socket : :@_io
      assert_equal "127.0.0.1", server.instance_variable_get(socket_ivar).addr[3]
    end

    # Same as the IPv4 bind test, for "::1". Omitted when IPv6 is unavailable.
    data(
      'server_create tcp' => [:server_create, :tcp, {}],
      'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
      'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
      'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
      'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
    )
    test 'can bind specified IPv6 address' do |(helper, proto, extra)| # if available
      omit "IPv6 unavailable here" unless ipv6_enabled?
      port = unused_port(protocol: proto)
      @d.__send__(helper, :myserver, port, proto: proto, bind: "::1", **extra){|x| x }
      assert_equal "::1", @d._servers.first.bind

      # The socket lives in @listen_socket when that ivar is defined, otherwise in @_io.
      server = @d._servers.first.server
      socket_ivar = server.instance_variable_defined?(:@listen_socket) ? :@listen_socket : :@_io
      assert_equal "::1", server.instance_variable_get(socket_ivar).addr[3]
    end

    # Two plugin instances may listen on the same address/port when `shared` is
    # left at its default. The rows labelled server_create_connection now really
    # call server_create_connection (they previously repeated :server_create).
    data(
      'server_create tcp' => [:server_create, :tcp, {}],
      'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
      'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
      # 'server_create unix' => [:server_create, :unix, {}],
      'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
      'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
      # 'server_create_connection unix' => [:server_create_connection, :unix, {}],
    )
    test 'can create 2 or more servers which share same bind address and port if shared option is true' do |(m, proto, kwargs)|
      d2 = nil
      begin
        d2 = Dummy.new; d2.start; d2.after_start
        port = unused_port(protocol: proto)

        assert_nothing_raised do
          @d.__send__(m, :myserver, port, proto: proto, **kwargs){|x| x }
          d2.__send__(m, :myserver, port, proto: proto, **kwargs){|x| x }
        end
      ensure
        # Shut down the second plugin only if it was created, so a failure in
        # Dummy.new is not masked by a NoMethodError on nil.
        if d2
          d2.stop; d2.before_shutdown; d2.shutdown; d2.after_shutdown; d2.close; d2.terminate
        end
      end
    end

    # When the first server is created with shared: false, a second plugin
    # cannot listen on the same port. The rows labelled server_create_connection
    # now really call server_create_connection (they previously repeated :server_create).
    data(
      'server_create tcp' => [:server_create, :tcp, {}],
      # Disable udp test because the behaviour of SO_REUSEXXX option is different between BSD, Linux and others...
      # Need to find good way for testing on local, CI service and others.
      #'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
      'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
      # 'server_create unix' => [:server_create, :unix, {}],
      'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
      'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
      # 'server_create_connection unix' => [:server_create_connection, :unix, {}],
    )
    test 'cannot create 2 or more servers using same bind address and port if shared option is false' do |(m, proto, kwargs)|
      d2 = nil
      begin
        d2 = Dummy.new; d2.start; d2.after_start
        port = unused_port(protocol: proto)

        assert_nothing_raised do
          @d.__send__(m, :myserver, port, proto: proto, shared: false, **kwargs){|x| x }
        end
        # Either errno is accepted for the conflicting listen attempt.
        assert_raise(Errno::EADDRINUSE, Errno::EACCES) do
          d2.__send__(m, :myserver, port, proto: proto, **kwargs){|x| x }
        end
      ensure
        # Shut down the second plugin only if it was created, so a failure in
        # Dummy.new is not masked by a NoMethodError on nil.
        if d2
          d2.stop; d2.before_shutdown; d2.shutdown; d2.after_shutdown; d2.close; d2.terminate
        end
      end
    end
  end

  # Tests specific to #server_create: callback arity validation and UDP server creation.
  sub_test_case '#server_create' do
    data(
      'tcp' => [:tcp, {}],
      'udp' => [:udp, {max_bytes: 128}],
      'tls' => [:tls, {tls_options: {insecure: true}}],
      # 'unix' => [:unix, {}],
    )
    test 'raise error if block argument is not specified or too many' do |(proto, extra)|
      port = unused_port(protocol: proto)
      arity_error = ArgumentError.new("BUG: block must have 1 or 2 arguments")
      # A callback taking no arguments is rejected...
      assert_raise(arity_error) do
        @d.server_create(:myserver, port, proto: proto, **extra){ 1 }
      end
      # ...and so is one taking three.
      assert_raise(arity_error) do
        @d.server_create(:myserver, port, proto: proto, **extra){|sock, conn, what_is_this| 1 }
      end
    end

    test 'creates udp server if specified in proto' do
      udp_port = unused_port(protocol: :udp)
      @d.server_create(:myserver, udp_port, proto: :udp, max_bytes: 512){|x| x }

      info = @d._servers.first
      assert_equal :udp, info.proto
      assert info.server.is_a?(Fluent::PluginHelper::Server::EventHandler::UDPServer)
    end
  end

  sub_test_case '#server_create_tcp' do
    # Smoke test: every tcp keyword option used here is accepted together.
    test 'can accept all keyword arguments valid for tcp server' do
      tcp_options = {
        bind: '127.0.0.1',
        shared: false,
        resolve_name: true,
        linger_timeout: 10,
        backlog: 500,
        send_keepalive_packet: true,
      }
      assert_nothing_raised do
        @d.server_create_tcp(:s, @port, **tcp_options) do |data, conn|
          # ...
        end
      end
    end

    # A one-argument callback receives the payload from each of three client connections.
    test 'creates a tcp server just to read data' do
      received = ""
      @d.server_create_tcp(:s, @port) do |data|
        received << data
      end
      3.times do
        # Block form closes the client socket even if a write raises, matching
        # the other tcp tests in this file.
        TCPSocket.open("127.0.0.1", @port) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
      end
      # 3 connections x ("yay\n" + "foo\n") = 24 bytes.
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received
    end

    # A two-argument callback also gets the connection and can reply to the client.
    test 'creates a tcp server to read and write data' do
      inbound = ""
      acks = []
      @d.server_create_tcp(:s, @port) do |chunk, conn|
        inbound << chunk
        conn.write "ack\n"
      end
      3.times do
        TCPSocket.open("127.0.0.1", @port) do |client|
          client.puts "yay"
          client.puts "foo"
          acks << client.readline
        end
      end
      # 3 connections x 8 bytes each.
      waiting(10){ sleep 0.1 until inbound.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", inbound
      assert_equal ["ack\n","ack\n","ack\n"], acks
    end

    # Read/write round trip over the IPv6 loopback; omitted when IPv6 is unavailable.
    test 'creates a tcp server to read and write data using IPv6' do
      omit "IPv6 unavailable here" unless ipv6_enabled?

      inbound = ""
      acks = []
      @d.server_create_tcp(:s, @port, bind: "::1") do |chunk, conn|
        inbound << chunk
        conn.write "ack\n"
      end
      3.times do
        TCPSocket.open("::1", @port) do |client|
          client.puts "yay"
          client.puts "foo"
          acks << client.readline
        end
      end
      # 3 connections x 8 bytes each.
      waiting(10){ sleep 0.1 until inbound.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", inbound
      assert_equal ["ack\n","ack\n","ack\n"], acks
    end

    # Without resolve_name, conn.remote_host is the client's numeric address.
    test 'does not resolve name of client address in default' do
      inbound = ""
      remote_hosts = []
      @d.server_create_tcp(:s, @port) do |chunk, conn|
        inbound << chunk
        remote_hosts << conn.remote_host
      end
      3.times do
        TCPSocket.open("127.0.0.1", @port) do |client|
          client.puts "yay"
        end
      end
      # 3 connections x "yay\n".
      waiting(10){ sleep 0.1 until inbound.bytesize == 12 }
      assert_equal "yay\nyay\nyay\n", inbound
      assert{ remote_hosts.all?{|host| host == "127.0.0.1" } }
    end

    # With resolve_name: true, conn.remote_host equals what Socket.getnameinfo
    # returns for 127.0.0.1 on this machine.
    test 'does resolve name of client address if resolve_name is true' do
      expected_host = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"]).first

      inbound = ""
      remote_hosts = []
      @d.server_create_tcp(:s, @port, resolve_name: true) do |chunk, conn|
        inbound << chunk
        remote_hosts << conn.remote_host
      end
      3.times do
        TCPSocket.open("127.0.0.1", @port) do |client|
          client.puts "yay"
        end
      end
      # 3 connections x "yay\n".
      waiting(10){ sleep 0.1 until inbound.bytesize == 12 }
      assert_equal "yay\nyay\nyay\n", inbound
      assert{ remote_hosts.all?{|host| host == expected_host } }
    end

    # Placeholder: the body contains no assertions, so this test passes
    # without checking anything. The pend call is intentionally left commented out.
    test 'can keep connections alive for tcp if keepalive specified' do
      # pend "not implemented yet"
    end

    # Registering a second data callback from inside the #server_create callback
    # must raise; the error is captured in the callback and checked afterwards.
    test 'raises error if plugin registers data callback for connection object from #server_create' do
      received = ""
      errors = []
      @d.server_create_tcp(:s, @port) do |data, conn|
        received << data
        begin
          conn.data{|d| received << d.upcase }
        rescue => e
          errors << e
        end
      end
      TCPSocket.open("127.0.0.1", @port) do |sock|
        sock.puts "foo"
      end
      # Wait for BOTH the payload and at least one captured error. The callback
      # appends to `received` before `errors`, so waiting on either one alone
      # could resume this thread in between and make the assertions below flaky.
      waiting(10){ sleep 0.1 until received.bytesize == 4 && errors.size >= 1 }
      assert_equal "foo\n", received
      assert{ errors.size > 0 } # it might be called twice (or more) when connection was accepted, and then data arrived (or more)
      assert_equal "data callback can be registered just once, but registered twice", errors.first.message
    end

    # A :write_complete callback registered on the connection fires for each ack written.
    test 'can call write_complete callback if registered' do
      pending_input = ""
      full_lines = []
      acks = []
      completions = []
      @d.server_create_tcp(:s, @port) do |chunk, conn|
        conn.on(:write_complete){|_conn| completions << true }
        pending_input << chunk
        # Reply only once a complete newline-terminated line has accumulated.
        newline_at = pending_input.index("\n")
        if newline_at
          full_lines << pending_input.slice!(0, newline_at + 1)
          conn.write "ack\n"
        end
      end
      3.times do
        TCPSocket.open("127.0.0.1", @port) do |client|
          client.write "yay"
          client.write "foo\n"
          begin
            acks << client.readline
          rescue EOFError, IOError, Errno::ECONNRESET
            # ignore
          end
          client.close
        end
      end
      waiting(10){ sleep 0.1 until full_lines.size == 3 && completions.size == 3 }
      assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], full_lines
      assert_equal ["ack\n","ack\n","ack\n"], acks
      assert_equal [true, true, true], completions
    end

    # A :close callback registered on the connection fires once per client
    # connection after the client closes its socket.
    test 'can call close callback if registered' do
      buffer = ""
      lines = []
      callback_results = []
      @d.server_create_tcp(:s, @port) do |data, conn|
        conn.on(:close){|c| callback_results << "closed" }
        buffer << data
        # Reply only once a complete newline-terminated line has accumulated.
        if idx = buffer.index("\n")
          lines << buffer.slice!(0,idx+1)
          conn.write "ack\n"
        end
      end
      3.times do
        TCPSocket.open("127.0.0.1", @port) do |sock|
          sock.write "yay"
          sock.write "foo\n"
          begin
            # The client closes its own socket on "ack\n"; the loop then ends
            # through one of the exceptions rescued below on the next readline.
            while line = sock.readline
              if line == "ack\n"
                sock.close
              end
            end
          rescue EOFError, IOError, Errno::ECONNRESET
            # ignore
          end
        end
      end
      waiting(10){ sleep 0.1 until lines.size == 3 && callback_results.size == 3 }
      assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines
      assert_equal ["closed", "closed", "closed"], callback_results
    end

    # Non-shared servers on 0.0.0.0 and :: can use the same port side by side.
    test 'can listen IPv4 / IPv6 together' do
      omit "IPv6 unavailable here" unless ipv6_enabled?

      assert_nothing_raised do
        [[:s_ipv4, '0.0.0.0'], [:s_ipv6, '::']].each do |title, address|
          @d.server_create_tcp(title, @port, bind: address, shared: false) do |data, conn|
            # ...
          end
        end
      end
    end
  end

  sub_test_case '#server_create_udp' do
    # Smoke test: every udp keyword option used here is accepted together.
    test 'can accept all keyword arguments valid for udp server' do
      udp_options = {
        bind: '127.0.0.1',
        shared: false,
        resolve_name: true,
        max_bytes: 100,
        flags: 1,
      }
      assert_nothing_raised do
        port = unused_port(protocol: :udp)
        @d.server_create_udp(:s, port, **udp_options) do |data, conn|
          # ...
        end
      end
    end

    # A one-argument callback receives the payload sent by three short-lived UDP clients.
    test 'creates a udp server just to read data' do
      inbound = ""
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, max_bytes: 128) do |chunk|
        inbound << chunk
      end
      client_port = unused_port(protocol: :udp, bind: "127.0.0.1")
      3.times do
        client = UDPSocket.new(Socket::AF_INET)
        client.bind("127.0.0.1", client_port)
        client.connect("127.0.0.1", server_port)
        # Two separate puts calls, exactly as many writes as before.
        ["yay", "foo"].each {|line| client.puts line }
        client.close
      end
      # 3 clients x ("yay\n" + "foo\n") = 24 bytes.
      waiting(10){ sleep 0.1 until inbound.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", inbound
    end

    # A two-argument callback receives a socket-like object and can reply to
    # the sender; each of three clients expects one "ack\n" back.
    test 'creates a udp server to read and write data' do
      received = ""
      responses = []
      port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, port, max_bytes: 128) do |data, sock|
        received << data
        sock.write "ack\n"
      end
      bind_port = unused_port(protocol: :udp)
      3.times do
        begin
          sock = UDPSocket.new(Socket::AF_INET)
          sock.bind("127.0.0.1", bind_port)
          sock.connect("127.0.0.1", port)
          # Reader thread: polls non-blockingly until one reply arrives.
          th = Thread.new do
            while true
              begin
                in_data, _addr = sock.recvfrom_nonblock(16)
                if in_data
                  responses << in_data
                  break
                end
              rescue IO::WaitReadable
                # Nothing to read yet: block until the socket becomes readable.
                IO.select([sock])
              end
            end
            true
          end
          sock.write "yay\nfoo\n"
          # Give the reader at most 5 seconds before moving on.
          th.join(5)
        ensure
          sock.close
        end
      end
      # 3 clients x 8 bytes each.
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received
      assert_equal ["ack\n","ack\n","ack\n"], responses
    end

    # IPv6 variant of the UDP read/write round trip; omitted when IPv6 is unavailable.
    test 'creates a udp server to read and write data using IPv6' do
      omit "IPv6 unavailable here" unless ipv6_enabled?

      received = ""
      responses = []
      port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, port, bind: "::1", max_bytes: 128) do |data, sock|
        received << data
        sock.write "ack\n"
      end
      bind_port = unused_port(protocol: :udp)
      3.times do
        begin
          sock = UDPSocket.new(Socket::AF_INET6)
          sock.bind("::1", bind_port)
          # Reader thread: a blocking recv started before the request is sent.
          th = Thread.new do
            responses << sock.recv(16)
            true
          end
          sock.connect("::1", port)
          sock.write "yay\nfoo\n"
          # Give the reader at most 5 seconds before moving on.
          th.join(5)
        ensure
          sock.close
        end
      end
      # 3 clients x 8 bytes each.
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received
      assert_equal ["ack\n","ack\n","ack\n"], responses
    end

    # Without resolve_name, remote_host is the sender's numeric address.
    test 'does not resolve name of client address in default' do
      inbound = ""
      remote_hosts = []
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, max_bytes: 128) do |chunk, peer|
        inbound << chunk
        remote_hosts << peer.remote_host
      end
      3.times do
        client = UDPSocket.new(Socket::AF_INET)
        client.connect("127.0.0.1", server_port)
        client.puts "yay"
        client.close
      end
      # 3 datagrams x "yay\n".
      waiting(10){ sleep 0.1 until inbound.bytesize == 12 }
      assert_equal "yay\nyay\nyay\n", inbound
      assert{ remote_hosts.all?{|host| host == "127.0.0.1" } }
    end

    # With resolve_name: true, remote_host equals what Socket.getnameinfo
    # returns for 127.0.0.1 on this machine.
    test 'does resolve name of client address if resolve_name is true' do
      expected_host = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"]).first

      inbound = ""
      remote_hosts = []
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, resolve_name: true, max_bytes: 128) do |chunk, peer|
        inbound << chunk
        remote_hosts << peer.remote_host
      end
      3.times do
        client = UDPSocket.new(Socket::AF_INET)
        client.connect("127.0.0.1", server_port)
        client.puts "yay"
        client.close
      end
      # 3 datagrams x "yay\n".
      waiting(10){ sleep 0.1 until inbound.bytesize == 12 }
      assert_equal "yay\nyay\nyay\n", inbound
      assert{ remote_hosts.all?{|host| host == expected_host } }
    end

    # Registering a data callback on the object passed to a UDP callback raises;
    # the error is captured in the callback and checked afterwards.
    test 'raises error if plugin registers data callback for connection object from #server_create' do
      inbound = ""
      captured = []
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, max_bytes: 128) do |chunk, peer|
        inbound << chunk
        begin
          peer.data{|d| inbound << d.upcase }
        rescue => err
          captured << err
        end
      end
      client = UDPSocket.new(Socket::AF_INET)
      client.connect("127.0.0.1", server_port)
      client.write "foo\n"
      client.close

      waiting(10){ sleep 0.1 until inbound.bytesize == 4 && captured.size == 1 }
      assert_equal "foo\n", inbound
      assert_equal 1, captured.size
      assert_equal "BUG: this event is disabled for udp: data", captured.first.message
    end

    # Registering a :write_complete callback inside a UDP callback raises.
    test 'raise error if plugin registers write_complete callback for udp' do
      inbound = ""
      captured = []
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, max_bytes: 128) do |chunk, peer|
        inbound << chunk
        begin
          peer.on(:write_complete){|conn| "" }
        rescue => err
          captured << err
        end
      end
      client = UDPSocket.new(Socket::AF_INET)
      client.connect("127.0.0.1", server_port)
      client.write "foo\n"
      client.close

      waiting(10){ sleep 0.1 until inbound.bytesize == 4 && captured.size == 1 }
      assert_equal "foo\n", inbound
      assert_equal 1, captured.size
      assert_equal "BUG: this event is disabled for udp: write_complete", captured.first.message
    end

    # Registering a :close callback inside a UDP callback raises.
    test 'raises error if plugin registers close callback for udp' do
      inbound = ""
      captured = []
      server_port = unused_port(protocol: :udp)
      @d.server_create_udp(:s, server_port, max_bytes: 128) do |chunk, peer|
        inbound << chunk
        begin
          peer.on(:close){|d| "" }
        rescue => err
          captured << err
        end
      end
      client = UDPSocket.new(Socket::AF_INET)
      client.connect("127.0.0.1", server_port)
      client.write "foo\n"
      client.close

      waiting(10){ sleep 0.1 until inbound.bytesize == 4 && captured.size == 1 }
      assert_equal "foo\n", inbound
      assert_equal 1, captured.size
      assert_equal "BUG: this event is disabled for udp: close", captured.first.message
    end

    # Non-shared UDP servers on 0.0.0.0 and :: can use the same port side by side.
    test 'can bind IPv4 / IPv6 together' do
      omit "IPv6 unavailable here" unless ipv6_enabled?

      port = unused_port(protocol: :udp)
      assert_nothing_raised do
        [[:s_ipv4_udp, '0.0.0.0'], [:s_ipv6_udp, '::']].each do |title, address|
          @d.server_create_udp(title, port, bind: address, shared: false, max_bytes: 128) do |data, sock|
            # ...
          end
        end
      end
    end

    # Datagrams longer than max_bytes: the expectation is truncation on
    # non-Windows and no record at all on Windows. Both tests share the data
    # set registered with keep: true.
    sub_test_case 'over max_bytes' do
      data("cut off on Non-Windows", { max_bytes: 32, records: ["a" * 40], expected: ["a" * 32] }, keep: true) unless Fluent.windows?
      data("drop on Windows", { max_bytes: 32, records: ["a" * 40], expected: [] }, keep: true) if Fluent.windows?
      test 'with sock' do |params|
        max_bytes, records, expected = params.values_at(:max_bytes, :records, :expected)

        seen = []
        port = unused_port(protocol: :udp)
        # Two-argument callback form.
        @d.server_create_udp(:myserver, port, max_bytes: max_bytes) do |payload, sock|
          seen << payload
        end

        open_client(:udp, "127.0.0.1", port) do |client|
          records.each {|record| client.send(record, 0) }
        end

        waiting(10) { sleep 0.1 until seen.size >= expected.size }
        sleep 1 if expected.empty? # To confirm no record received.

        assert_equal expected, seen
      end

      test 'without sock' do |params|
        max_bytes, records, expected = params.values_at(:max_bytes, :records, :expected)

        seen = []
        port = unused_port(protocol: :udp)
        # One-argument callback form.
        @d.server_create_udp(:myserver, port, max_bytes: max_bytes) do |payload|
          seen << payload
        end

        open_client(:udp, "127.0.0.1", port) do |client|
          records.each {|record| client.send(record, 0) }
        end

        waiting(10) { sleep 0.1 until seen.size >= expected.size }
        sleep 1 if expected.empty? # To confirm no record received.

        assert_equal expected, seen
      end
    end
  end

  # Extends itself with Fluent::PluginHelper::CertOption so that the helper's
  # methods can be called as module-level functions (CertUtil.cert_option_*)
  # by the certificate fixture helpers in this file.
  module CertUtil
    extend Fluent::PluginHelper::CertOption
  end

  # Option hash used to generate the test CA certificate.
  #
  # Returns a new Hash on every call, so a caller may modify the result
  # without affecting later calls.
  def create_ca_options
    options = {}
    options[:private_key_length] = 2048
    options[:country] = 'US'
    options[:state] = 'CA'
    options[:locality] = 'Mountain View'
    options[:common_name] = 'ca.testing.fluentd.org'
    options[:expiration] = 86400 * 30 # presumably 30 days expressed in seconds
    options[:digest] = :sha256
    options
  end

  # Option hash used to generate the test server certificate.
  #
  # Returns a new Hash on every call.
  def create_server_options
    [
      [:private_key_length, 2048],
      [:country, 'US'],
      [:state, 'CA'],
      [:locality, 'Mountain View'],
      [:common_name, 'server.testing.fluentd.org'],
      [:expiration, 86400 * 30], # presumably 30 days expressed in seconds
      [:digest, :sha256],
    ].to_h
  end

  # Writes a certificate and its private key to disk in PEM format.
  #
  # cert_path  - destination file for cert.to_pem
  # cert       - object responding to #to_pem
  # key_path   - destination file for the exported key
  # key        - object responding to #export
  # passphrase - when truthy, the key is exported encrypted with AES-256-CBC
  #              using this passphrase; otherwise it is exported as is
  #
  # Both files are set to mode 0600 after being written. Returns the value of
  # File.chmod.
  def write_cert_and_key(cert_path, cert, key_path, key, passphrase)
    File.write(cert_path, cert.to_pem)

    # Secret key in PEM format: raw, or encrypted by AES256 when a passphrase is given
    exported_key =
      if passphrase
        key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase)
      else
        key.export
      end
    File.write(key_path, exported_key)

    File.chmod(0600, cert_path, key_path)
  end

  # Generates a self-signed server certificate/key pair from
  # create_server_options, writes both to the given paths (the key encrypted
  # when passphrase is given), and returns the generated certificate.
  def create_server_pair_signed_by_self(cert_path, private_key_path, passphrase)
    generated = CertUtil.cert_option_generate_server_pair_self_signed(create_server_options)
    server_cert, server_key = generated
    write_cert_and_key(cert_path, server_cert, private_key_path, server_key, passphrase)
    server_cert
  end

  # Generates a self-signed CA certificate/key pair from create_ca_options and
  # writes both to the given paths (the key encrypted when passphrase is
  # given). Returns whatever write_cert_and_key returns, not the certificate.
  def create_ca_pair_signed_by_self(cert_path, private_key_path, passphrase)
    ca_cert, ca_key = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options)
    write_cert_and_key(cert_path, ca_cert, private_key_path, ca_key, passphrase)
  end

  # Generates a server certificate/key pair via
  # CertUtil.cert_option_generate_server_pair_by_ca using the CA files at
  # ca_cert_path / ca_key_path, writes the pair to cert_path /
  # private_key_path (the key encrypted when passphrase is given), and
  # returns the generated certificate.
  def create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase)
    generated = CertUtil.cert_option_generate_server_pair_by_ca(
      ca_cert_path, ca_key_path, ca_key_passphrase, create_server_options
    )
    server_cert, server_key = generated
    write_cert_and_key(cert_path, server_cert, private_key_path, server_key, passphrase)
    server_cert
  end

  # Builds a three-level chain (root CA -> intermediate CA -> server) and
  # writes the resulting files:
  #   * ca_cert_path / ca_key_path   - the self-signed root CA pair
  #   * cert_path                    - server cert followed by the intermediate cert
  #   * private_key_path             - the server key (encrypted when passphrase is given)
  # The intermediate CA's key is never written to disk.
  def create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase)
    root_cert, root_key, _ = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options)
    write_cert_and_key(ca_cert_path, root_cert, ca_key_path, root_key, ca_key_passphrase)

    # Intermediate CA: same options as the root except for the common name.
    # The second argument is the root's subject (presumably used as issuer — confirm in CertOption).
    intermediate_ca_options = create_ca_options
    intermediate_ca_options[:common_name] = 'ca2.testing.fluentd.org'
    chain_cert, chain_key = CertUtil.cert_option_generate_pair(intermediate_ca_options, root_cert.subject)
    # basicConstraints built from a raw ASN.1 sequence holding a single TRUE boolean
    chain_cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(true)]))
    chain_cert.sign(root_key, "sha256")

    # Server certificate: generated against the intermediate's subject and signed by the intermediate's key.
    server_cert, server_key, _ = CertUtil.cert_option_generate_pair(create_server_options, chain_cert.subject)
    factory = OpenSSL::X509::ExtensionFactory.new
    server_cert.add_extension(factory.create_extension('basicConstraints', 'CA:FALSE'))
    server_cert.add_extension(factory.create_extension('nsCertType', 'server'))
    server_cert.sign(chain_key, "sha256")

    # write chained cert
    File.open(cert_path, "w") do |f|
      f.write server_cert.to_pem
      f.write chain_cert.to_pem
    end
    # Server key: encrypted with AES-256-CBC when a passphrase is given, raw otherwise.
    key_str = passphrase ? server_key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase) : server_key.export
    File.open(private_key_path, "w"){|f| f.write(key_str) }
    File.chmod(0600, cert_path, private_key_path)
  end

  # Opens a TLS client session to addr:port, yields the connected
  # OpenSSL::SSL::SSLSocket, and closes it afterwards.
  #
  # addr, port  - destination of the TCP connection
  # version:    - TLS version handed to Fluent::TLS.set_version_to_context
  # verify:     - true: verify the peer (VERIFY_PEER) against the default
  #               trust paths plus cert_path; false: VERIFY_NONE
  # cert_path:  - optional certificate file added to the trust store
  # selfsigned: - when true and the constant exists, sets
  #               V_FLAG_CHECK_SS_SIGNATURE on the trust store
  # hostname:   - when given, set on the socket before connecting (if the
  #               socket supports it); when nil, hostname verification is
  #               disabled
  #
  # Returns the block's value. Exceptions raised while connecting or inside
  # the block propagate to the caller; errors raised by the final close are
  # ignored.
  def open_tls_session(addr, port, version: Fluent::TLS::DEFAULT_VERSION, verify: true, cert_path: nil, selfsigned: true, hostname: nil)
    context = OpenSSL::SSL::SSLContext.new
    context.set_params({})
    if verify
      cert_store = OpenSSL::X509::Store.new
      cert_store.set_default_paths
      if selfsigned && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE')
        cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE
      end
      if cert_path
        cert_store.add_file(cert_path)
      end
      context.verify_mode = OpenSSL::SSL::VERIFY_PEER
      context.cert_store = cert_store
      if !hostname
        context.verify_hostname = false # In test code, using hostname to be connected is very difficult
      end
    else
      context.verify_mode = OpenSSL::SSL::VERIFY_NONE
    end
    Fluent::TLS.set_version_to_context(context, version, nil, nil)

    tcp = TCPSocket.new(addr, port)
    sock = OpenSSL::SSL::SSLSocket.new(tcp, context)
    # Closing the TLS socket must also close the TCP socket underneath;
    # otherwise its descriptor stays open until garbage collection
    # (same setting as open_client uses for :tls).
    sock.sync_close = true
    sock.hostname = hostname if hostname && sock.respond_to?(:hostname)
    sock.connect
    yield sock
  ensure
    # Best-effort cleanup: close whichever socket was created, including the
    # case where the handshake (or SSLSocket construction) failed.
    begin
      (sock || tcp)&.close
    rescue StandardError
      nil
    end
  end

  # Asserts basic properties of a generated certificate.
  #
  # cert                - certificate object (responds to #serial, #version,
  #                       #extensions)
  # expected_extensions - Array of [oid, value] pairs; for each pair the
  #                       first extension with that oid must exist and have
  #                       exactly that value
  #
  # Also asserts that the serial is greater than 1 and that #version is 2.
  # A missing extension is reported as an assertion failure naming the oid
  # rather than as a NoMethodError on nil.
  def assert_certificate(cert, expected_extensions)
    assert_true cert.serial > 1
    assert_equal 2, cert.version

    expected_extensions.each do |expected_oid, expected_value|
      extension = cert.extensions.find { |e| e.oid == expected_oid }
      assert_not_nil extension, "certificate has no #{expected_oid} extension"
      assert_equal expected_value, extension.value
    end
  end

  sub_test_case '#server_create_tls with various certificate options' do
    # Every test in this group starts with a fresh plugin instance and an
    # empty certificate directory tree under TMP_DIR.
    setup do
      @d = Dummy.new # to get plugin not configured/started yet

      @certs_dir = File.join(TMP_DIR, "tls_certs")
      @server_cert_dir = File.join(@certs_dir, "server")
      # Drop leftovers first; mkdir_p on the nested path recreates @certs_dir too.
      FileUtils.rm_rf @certs_dir
      FileUtils.mkdir_p @server_cert_dir
    end

    sub_test_case 'using tls_options arguments to specify cert options' do
      # In this group the TLS settings are passed per call via tls_options:,
      # so the plugin is configured with an empty element and started here.
      setup do
        @d.configure(config_element()); @d.start; @d.after_start
      end

      # Server created with `insecure: true` and only generate_* options.
      # A verifying client is expected to fail; a client with verify: false
      # can connect and deliver data.
      test 'create dynamic self-signed cert/key pair (without any verification from clients)' do
        # insecure
        tls_options = {
          protocol: :tls,
          version: :'TLSv1_2',
          ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
          insecure: true,
          generate_private_key_length: 2048,
          generate_cert_country: 'US',
          generate_cert_state: 'CA',
          generate_cert_locality: 'Mountain View',
          generate_cert_common_name: 'myserver.testing.fluentd.org',
          generate_cert_expiration: 10 * 365 * 86400,
          generate_cert_digest: :sha256,
        }

        received = ""
        @d.server_create_tls(:s, @port, tls_options: tls_options) do |data, conn|
          received << data
        end
        # NOTE(review): assert_raise is given only a message string and no
        # exception class — presumably any raised exception satisfies it; confirm.
        assert_raise "" do
          open_tls_session('127.0.0.1', @port) do |sock|
            sock.post_connection_check('myserver.testing.fluentd.org')
            # cannot connect ....
          end
        end
        open_tls_session('127.0.0.1', @port, verify: false) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        # "yay\nfoo\n" is 8 bytes
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Server loads a self-signed cert/key pair from files. Run once with an
      # encrypted private key and once with an unencrypted one. A client
      # without the cert in its trust store is expected to fail; a client that
      # trusts cert_path can deliver data.
      data('with passphrase' => 'yaaaaaaaaaaaaaaaaaaay',
           'without passphrase'  => nil)
      test 'load self-signed cert/key pair (files), verified from clients using cert files' do |private_key_passphrase|
        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        cert = create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase)

        # Extensions the generated self-signed server certificate must carry
        assert_certificate(cert,[
          ['basicConstraints', 'CA:FALSE'],
          ['nsCertType', 'SSL Server']
        ])

        tls_options = {
          protocol: :tls,
          version: :'TLSv1_2',
          ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
          insecure: false,
          cert_path: cert_path,
          private_key_path: private_key_path,
        }
        # The passphrase option is only set when the key was written encrypted
        tls_options[:private_key_passphrase] = private_key_passphrase if private_key_passphrase
        received = ""
        @d.server_create_tls(:s, @port, tls_options: tls_options) do |data, conn|
          received << data
        end
        # NOTE(review): no exception class is given to assert_raise — presumably
        # any raised exception satisfies it; confirm.
        assert_raise "" do
          open_tls_session('127.0.0.1', @port) do |sock|
            sock.post_connection_check('server.testing.fluentd.org')
            # cannot connect by failing verification without server cert
          end
        end
        open_tls_session('127.0.0.1', @port, cert_path: cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Server is given only a CA cert/key (plus a key length); no server cert
      # files are passed. The client trusts the CA cert file. Run with and
      # without a passphrase on the CA key.
      data('with passphrase' => "fooooooooooooooooooooooooo",
           'without passphrase'  => nil)
      test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do |ca_key_passphrase|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)

        tls_options = {
          protocol: :tls,
          version: :'TLSv1_2',
          ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
          insecure: false,
          ca_cert_path: ca_cert_path,
          ca_private_key_path: ca_key_path,
          generate_private_key_length: 2048,
        }
        # The passphrase option is only set when the CA key was written encrypted
        tls_options[:ca_private_key_passphrase] = ca_key_passphrase if ca_key_passphrase
        received = ""
        @d.server_create_tls(:s, @port, tls_options: tls_options) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Server loads a pre-generated, CA-signed server cert/key pair from
      # files; the client trusts the CA cert file. Each data set is a pair
      # [CA key passphrase, server key passphrase].
      data('with passphrase' => ["foooooooo", "yaaaaaaaaaaaaaaaaaaay"],
           'without passphrase'  => [nil, nil])
      test 'load static server cert by private CA cert file, verified from clients using CA cert file' do |(ca_key_passphrase, private_key_passphrase)|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)

        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        cert = create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)

        # Extensions the CA-signed server certificate must carry
        assert_certificate(cert,[
            ['basicConstraints', 'CA:FALSE'],
            ['nsCertType', 'SSL Server'],
            ['keyUsage', 'Digital Signature, Key Encipherment'],
            ['extendedKeyUsage', 'TLS Web Server Authentication']
        ])

        tls_options = {
          protocol: :tls,
          version: :'TLSv1_2',
          ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
          insecure: false,
          cert_path: cert_path,
          private_key_path: private_key_path,
        }
        # The passphrase option is only set when the server key was written encrypted
        tls_options[:private_key_passphrase] = private_key_passphrase if private_key_passphrase
        received = ""
        @d.server_create_tls(:s, @port, tls_options: tls_options) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Server loads a cert file holding a chain (server cert + intermediate
      # cert); the client trusts only the root CA cert file. Each data set is
      # a pair [CA key passphrase, server key passphrase].
      data('with passphrase' => ["foooooooo", "yaaaaaaaaaaaaaaaaaaay"],
           'without passphrase'  => [nil, nil])
      test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do |(ca_key_passphrase, private_key_passphrase)|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        # Writes the root CA pair, the chained cert file and the server key
        create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)

        tls_options = {
          protocol: :tls,
          version: :'TLSv1_2',
          ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
          insecure: false,
          cert_path: cert_path,
          private_key_path: private_key_path,
        }
        # The passphrase option is only set when the server key was written encrypted
        tls_options[:private_key_passphrase] = private_key_passphrase if private_key_passphrase
        received = ""
        @d.server_create_tls(:s, @port, tls_options: tls_options) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end
    end

    sub_test_case 'using configurations to specify cert options' do
      # Same scenario as the tls_options variant, but the TLS settings come
      # from a <transport tls> section in the plugin configuration; only
      # `insecure` is set there.
      test 'create dynamic self-signed cert/key pair (without any verification from clients)' do
        # insecure
        transport_opts = {
          'insecure' => 'true',
        }
        transport_conf = config_element('transport', 'tls', transport_opts)
        conf = config_element('match', 'tag.*', {}, [transport_conf])

        @d.configure(conf); @d.start; @d.after_start

        received = ""
        @d.server_create_tls(:s, @port) do |data, conn|
          received << data
        end
        # NOTE(review): no exception class is given to assert_raise — presumably
        # any raised exception satisfies it; confirm.
        assert_raise "" do
          open_tls_session('127.0.0.1', @port) do |sock|
            sock.post_connection_check('myserver.testing.fluentd.org')
            # cannot connect ....
          end
        end
        open_tls_session('127.0.0.1', @port, verify: false) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Self-signed cert/key files referenced from the <transport tls>
      # section. Run with an encrypted and with an unencrypted private key.
      data('with passphrase' => "yaaaaaaaaaaaaaaaaaaay",
           'without passphrase'  => nil)
      test 'load self-signed cert/key pair (files), verified from clients using cert files' do |private_key_passphrase|
        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase)

        transport_opts = {
          'cert_path' => cert_path,
          'private_key_path' => private_key_path,
        }
        # The passphrase parameter is only set when the key was written encrypted
        transport_opts['private_key_passphrase'] = private_key_passphrase if private_key_passphrase
        transport_conf = config_element('transport', 'tls', transport_opts)
        conf = config_element('match', 'tag.*', {}, [transport_conf])

        @d.configure(conf); @d.start; @d.after_start

        received = ""
        @d.server_create_tls(:s, @port) do |data, conn|
          received << data
        end
        # NOTE(review): no exception class is given to assert_raise — presumably
        # any raised exception satisfies it; confirm.
        assert_raise "" do
          open_tls_session('127.0.0.1', @port) do |sock|
            sock.post_connection_check('server.testing.fluentd.org')
            # cannot connect by failing verification without server cert
          end
        end
        open_tls_session('127.0.0.1', @port, cert_path: cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Only the CA cert/key are referenced from the <transport tls> section;
      # no server cert files are configured. The client trusts the CA cert
      # file. Run with and without a passphrase on the CA key.
      data('with passphrase' => "fooooooooooooooooooooooooo",
           'without passphrase'  => nil)
      test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do |ca_key_passphrase|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)

        transport_opts = {
          'ca_cert_path' => ca_cert_path,
          'ca_private_key_path' => ca_key_path,
        }
        # The passphrase parameter is only set when the CA key was written encrypted
        transport_opts['ca_private_key_passphrase'] = ca_key_passphrase if ca_key_passphrase
        transport_conf = config_element('transport', 'tls', transport_opts)
        conf = config_element('match', 'tag.*', {}, [transport_conf])

        @d.configure(conf); @d.start; @d.after_start

        received = ""
        @d.server_create_tls(:s, @port) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # CA-signed server cert/key files referenced from the <transport tls>
      # section; the client trusts the CA cert file. Each data set is a pair
      # [CA key passphrase, server key passphrase].
      data('with passphrase' => ["foooooooo", "yaaaaaaaaaaaaaaaaaaay"],
           'without passphrase'  => [nil, nil])
      test 'load static server cert by private CA cert file, verified from clients using CA cert file' do |(ca_key_passphrase, private_key_passphrase)|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)

        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)

        transport_opts = {
          'cert_path' => cert_path,
          'private_key_path' => private_key_path,
        }
        # The passphrase parameter is only set when the server key was written encrypted
        transport_opts['private_key_passphrase'] = private_key_passphrase if private_key_passphrase
        transport_conf = config_element('transport', 'tls', transport_opts)
        conf = config_element('match', 'tag.*', {}, [transport_conf])

        @d.configure(conf); @d.start; @d.after_start

        received = ""
        @d.server_create_tls(:s, @port) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Chained cert file (server cert + intermediate cert) referenced from
      # the <transport tls> section; the client trusts only the root CA cert
      # file. Each data set is a pair [CA key passphrase, server key passphrase].
      data('with passphrase' => ["foooooooo", "yaaaaaaaaaaaaaaaaaaay"],
           'without passphrase'  => [nil, nil])
      test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do |(ca_key_passphrase, private_key_passphrase)|
        ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
        ca_key_path = File.join(@certs_dir, "ca.key.pem")
        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        # Writes the root CA pair, the chained cert file and the server key
        create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)

        transport_opts = {
          'cert_path' => cert_path,
          'private_key_path' => private_key_path,
        }
        # The passphrase parameter is only set when the server key was written encrypted
        transport_opts['private_key_passphrase'] = private_key_passphrase if private_key_passphrase
        transport_conf = config_element('transport', 'tls', transport_opts)
        conf = config_element('match', 'tag.*', {}, [transport_conf])

        @d.configure(conf); @d.start; @d.after_start

        received = ""
        @d.server_create_tls(:s, @port) do |data, conn|
          received << data
        end
        open_tls_session('127.0.0.1', @port, cert_path: ca_cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
        waiting(10){ sleep 0.1 until received.bytesize == 8 }
        assert_equal "yay\nfoo\n", received
      end

      # Builds an SSL context from a transport section restricted to the
      # 'SHA256' cipher string and checks that the context contains at least
      # one TLS 1.2 cipher and that every TLS 1.2 cipher name matches it.
      test 'set ciphers' do
        cert_path = File.join(@server_cert_dir, "cert.pem")
        private_key_path = File.join(@certs_dir, "server.key.pem")
        create_server_pair_signed_by_self(cert_path, private_key_path, nil)
        tls_options = {
          protocol: :tls,
          version: :TLSv1_2,
          ciphers: 'SHA256',
          insecure: false,
          cert_path: cert_path,
          private_key_path: private_key_path,
        }
        conf = @d.server_create_transport_section_object(tls_options)
        ctx = @d.cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)

        # Version labels reported for TLS 1.2 ciphers:
        # OpenSSL 1.0.2: "TLSv1/SSLv3"
        # OpenSSL 1.1.1: "TLSv1.2"
        tls12_labels = ["TLSv1/SSLv3", "TLSv1.2"]
        tls12_cipher_names = ctx.ciphers
                                .select { |_name, tls_version| tls12_labels.include?(tls_version) }
                                .map(&:first)
        # Passes only if there is at least one such cipher and all of them match
        matched = !tls12_cipher_names.empty? &&
                  tls12_cipher_names.all? { |cipher_name| cipher_name.match?(/#{conf.ciphers}/) }

        error_msg = build_message("Unexpected ciphers for #{conf.version}",
                                  "<?>\nwas expected to include only <?> ciphers for #{conf.version}",
                                  ctx.ciphers, conf.ciphers)
        assert(matched, error_msg)
      end
    end
  end

  sub_test_case '#server_create_tls' do
    # Prepares a self-signed server cert (with a passphrase-protected key)
    # under TMP_DIR and the @tls_options hash shared by the tests below.
    setup do
      @certs_dir = File.join(TMP_DIR, "tls_certs")
      FileUtils.rm_rf @certs_dir
      FileUtils.mkdir_p @certs_dir

      @server_cert_dir = File.join(@certs_dir, "server")
      FileUtils.mkdir_p @server_cert_dir

      @cert_path = File.join(@server_cert_dir, "cert.pem")
      private_key_path = File.join(@certs_dir, "server.key.pem")
      private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
      create_server_pair_signed_by_self(@cert_path, private_key_path, private_key_passphrase)

      # Only referenced from commented-out open_tls_session calls in the tests below
      @default_hostname = ::Socket.gethostname

      @tls_options = {
        protocol: :tls,
        version: :'TLSv1_2',
        ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
        insecure: false,
        cert_path: @cert_path,
        private_key_path: private_key_path,
        private_key_passphrase: private_key_passphrase,
      }
    end

    # Only checks that the call with this full set of keyword arguments does
    # not raise; no client connects.
    test 'can accept all keyword arguments valid for tcp/tls server' do
      assert_nothing_raised do
        @d.server_create_tls(:s, @port, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500, tls_options: @tls_options, send_keepalive_packet: true) do |data, conn|
          # ...
        end
      end
    end

    # Three sequential client sessions each send "yay\n" and "foo\n"; the
    # server callback only accumulates what it receives.
    test 'creates a tls server just to read data' do
      received = ""
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        received << data
      end
      3.times do
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
        end
      end
      # 3 sessions x 8 bytes
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      # Counted with scan rather than compared as one string
      assert_equal 3, received.scan("yay\n").size
      assert_equal 3, received.scan("foo\n").size
    end

    # The server callback answers with "ack\n" each time it is invoked; every
    # client session reads one response line before closing.
    test 'creates a tls server to read and write data' do
      received = ""
      responses = []
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        received << data
        conn.write "ack\n"
      end
      3.times do
        # open_tls_session('127.0.0.1', @port, cert_path: @cert_path, hostname: @default_hostname) do |sock|
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
          responses << sock.readline
        end
      end
      # 3 sessions x 8 bytes
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      assert_equal 3, received.scan("yay\n").size
      assert_equal 3, received.scan("foo\n").size
      assert_equal ["ack\n","ack\n","ack\n"], responses
    end

    # Same read/write exchange as the IPv4 test, with the server bound to and
    # the client connecting to "::1". Omitted when IPv6 is not available.
    test 'creates a tls server to read and write data using IPv6' do
      omit "IPv6 unavailable here" unless ipv6_enabled?

      received = ""
      responses = []
      @d.server_create_tls(:s, @port, bind: "::1",  tls_options: @tls_options) do |data, conn|
        received << data
        conn.write "ack\n"
      end
      3.times do
        # open_tls_session('::1', @port, cert_path: @cert_path, hostname: @default_hostname) do |sock|
        open_tls_session('::1', @port, cert_path: @cert_path) do |sock|
          sock.puts "yay"
          sock.puts "foo"
          responses << sock.readline
        end
      end
      # 3 sessions x 8 bytes
      waiting(10){ sleep 0.1 until received.bytesize == 24 }
      assert_equal 3, received.scan("yay\n").size
      assert_equal 3, received.scan("foo\n").size
      assert_equal ["ack\n","ack\n","ack\n"], responses
    end

    # Without resolve_name, conn.remote_host is expected to be the literal
    # client address "127.0.0.1".
    test 'does not resolve name of client address in default' do
      received = ""
      sources = []
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        received << data
        sources << conn.remote_host
      end
      3.times do
        # open_tls_session('127.0.0.1', @port, cert_path: @cert_path, hostname: @default_hostname) do |sock|
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          sock.puts "yay"
        end
      end
      # 3 sessions x 4 bytes
      waiting(10){ sleep 0.1 until received.bytesize == 12 }
      assert_equal 3, received.scan("yay\n").size
      assert{ sources.all?{|s| s == "127.0.0.1" } }
    end

    # With resolve_name: true, conn.remote_host is expected to equal the name
    # that Socket.getnameinfo returns for 127.0.0.1 on this machine.
    test 'does resolve name of client address if resolve_name is true' do
      hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0]

      received = ""
      sources = []
      @d.server_create_tls(:s, @port, resolve_name: true, tls_options: @tls_options) do |data, conn|
        received << data
        sources << conn.remote_host
      end
      3.times do
        # open_tls_session('127.0.0.1', @port, cert_path: @cert_path, hostname: @default_hostname) do |sock|
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          sock.puts "yay"
        end
      end
      # 3 sessions x 4 bytes
      waiting(10){ sleep 0.1 until received.bytesize == 12 }
      assert_equal 3, received.scan("yay\n").size
      assert{ sources.all?{|s| s == hostname } }
    end

    # Placeholder: the body contains no assertions (the pend call is commented
    # out), so this test currently checks nothing.
    test 'can keep connections alive for tls if keepalive specified' do
      # pend "not implemented yet"
    end

    # The block given to #server_create_tls already acts as the data callback,
    # so registering another one through conn.data inside it is expected to
    # raise. The error is captured in the callback and checked afterwards.
    test 'raises error if plugin registers data callback for connection object from #server_create' do
      received = ""
      errors = []
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        received << data
        begin
          conn.data{|d| received << d.upcase }
        rescue => e
          errors << e
        end
      end
      open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
        sock.puts "foo"
      end
      # Wait for BOTH the payload and the rescued error. The callback appends
      # to `received` before `errors`, and it runs asynchronously from this
      # test body, so waiting for either one alone could let the assertions
      # below run while `errors` is still empty.
      waiting(10){ sleep 0.1 until received.bytesize == 4 && errors.size == 1 }
      assert_equal "foo\n", received
      assert_equal 1, errors.size
      assert_equal "data callback can be registered just once, but registered twice", errors.first.message
    end

    # The server buffers incoming data and, once a full line has arrived,
    # records it and writes "ack\n". A :write_complete handler registered on
    # the connection is expected to fire once per session.
    test 'can call write_complete callback if registered' do
      buffer = ""
      lines = []
      responses = []
      response_completes = []
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        conn.on(:write_complete){|c| response_completes << true }
        buffer << data
        # Assignment in the condition is intentional: idx is the newline position
        if idx = buffer.index("\n")
          lines << buffer.slice!(0,idx+1)
          conn.write "ack\n"
        end
      end
      3.times do
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          # The line is sent in two writes; only the second one completes it
          sock.write "yay"
          sock.write "foo\n"
          begin
            responses << sock.readline
          rescue EOFError, IOError, Errno::ECONNRESET
            # ignore
          end
          sock.close
        end
      end
      waiting(10){ sleep 0.1 until lines.size == 3 && response_completes.size == 3 }
      assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines
      assert_equal ["ack\n","ack\n","ack\n"], responses
      assert_equal [true, true, true], response_completes
    end

    # A :close handler registered on the connection is expected to fire once
    # per session after the client closes its socket.
    test 'can call close callback if registered' do
      buffer = ""
      lines = []
      callback_results = []
      @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        conn.on(:close){|c| callback_results << "closed" }
        buffer << data
        # Assignment in the condition is intentional: idx is the newline position
        if idx = buffer.index("\n")
          lines << buffer.slice!(0,idx+1)
          conn.write "ack\n"
        end
      end
      3.times do
        open_tls_session('127.0.0.1', @port, cert_path: @cert_path) do |sock|
          sock.write "yay"
          sock.write "foo\n"
          begin
            # The client closes as soon as it sees the ack; the next readline
            # on the closed socket then ends the loop through the rescue below.
            while line = sock.readline
              if line == "ack\n"
                sock.close
              end
            end
          rescue EOFError, IOError, Errno::ECONNRESET
            # ignore
          end
        end
      end
      waiting(10){ sleep 0.1 until lines.size == 3 && callback_results.size == 3 }
      assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines
      assert_equal ["closed", "closed", "closed"], callback_results
    end

    # Checks which client TLS versions can complete a handshake with the
    # server, for a single server version and for a min/max version range.
    sub_test_case 'TLS version connection check' do
      # Server uses @tls_options (version TLSv1_2); the client is forced to a
      # different version and the session is expected to fail.
      test "can't connect with different TLS version" do
        @d.server_create_tls(:s, @port, tls_options: @tls_options) do |data, conn|
        end
        # Pick a client version other than 1.2: 1.3 when this OpenSSL defines it, else 1.1
        if defined?(OpenSSL::SSL::TLS1_3_VERSION)
          version = :'TLS1_3'
        else
          version = :'TLS1_1'
        end
        assert_raise(OpenSSL::SSL::SSLError, Errno::ECONNRESET) {
          open_tls_session('127.0.0.1', @port, cert_path: @cert_path, version: version) do |sock|
          end
        }
      end

      # Server accepts the range min_version..max_version; a TLS1 client is
      # expected to fail while clients at both ends of the range connect.
      test "can specify multiple TLS versions by min_version/max_version" do
        omit "min_version=/max_version= is not supported" unless Fluent::TLS::MIN_MAX_AVAILABLE

        min_version = :'TLS1_2'
        # Upper bound is 1.3 when this OpenSSL defines it, otherwise the range is just 1.2
        if defined?(OpenSSL::SSL::TLS1_3_VERSION)
          max_version = :'TLS1_3'
        else
          max_version = :'TLS1_2'
        end

        opts = @tls_options.merge(min_version: min_version, max_version: max_version)
        @d.server_create_tls(:s, @port, tls_options: opts) do |data, conn|
        end
        assert_raise(OpenSSL::SSL::SSLError, Errno::ECONNRESET) {
          open_tls_session('127.0.0.1', @port, cert_path: @cert_path, version: :'TLS1') do |sock|
          end
        }
        [min_version, max_version].each { |ver|
          assert_nothing_raised {
            open_tls_session('127.0.0.1', @port, cert_path: @cert_path, version: ver) do |sock|
            end
          }
        }
      end
    end
  end

  # Empty group: it defines no tests. The commented-out lines below are test
  # titles kept as placeholders.
  sub_test_case '#server_create_unix' do
    # not implemented yet

    # test 'can accept all keyword arguments valid for unix server'
    # test 'creates a unix server just to read data'
    # test 'creates a unix server to read and write data'

    # test 'raises error if plugin registers data callback for connection object from #server_create'
    # test 'can call write_complete callback if registered'
    # test 'can call close callback if registered'
  end

  # Opens a client socket of the given protocol to addr:port, yields it to
  # the block, and closes it afterwards.
  #
  # proto - :udp (connected UDPSocket), :tcp (TCPSocket) or :tls (connected
  #         OpenSSL::SSL::SSLSocket with the default context); any other
  #         value raises ArgumentError ("unknown proto:...")
  # addr  - destination address
  # port  - destination port
  #
  # Returns the block's value. Errors raised while connecting or inside the
  # block propagate; errors raised by the final close are ignored. The
  # underlying socket is closed even when connecting or the TLS handshake
  # fails part-way.
  def open_client(proto, addr, port)
    raw = nil # underlying UDP/TCP socket, tracked so it can be closed if setup fails midway
    client = case proto
             when :udp
               raw = UDPSocket.open
               raw.connect(addr, port)
               raw
             when :tcp
               raw = TCPSocket.open(addr, port)
             when :tls
               raw = TCPSocket.open(addr, port)
               tls = OpenSSL::SSL::SSLSocket.new(raw)
               tls.sync_close = true # closing the TLS socket also closes the TCP socket
               tls.connect
               tls
             else
               raise ArgumentError, "unknown proto:#{proto}"
             end
    yield client
  ensure
    # Best-effort cleanup: close the finished client, or the raw socket when
    # setup failed before the client was assigned.
    begin
      (client || raw)&.close
    rescue StandardError
      nil
    end
  end

  # run tests for tcp, tls and unix
  sub_test_case '#server_create_connection' do
    # server_create_connection with proto: :udp is expected to raise an
    # ArgumentError with exactly this message.
    test 'raise error if udp is specified in proto' do
      assert_raise(ArgumentError.new("BUG: cannot create connection for UDP")) do
        @d.server_create_connection(:myserver, @port, proto: :udp){|c| c }
      end
    end

    # Signature under test, for reference:
    # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, tls_options: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block)
    #
    # Data sets for the data-driven tests below: label => [proto, extra keyword arguments].
    # Each test receives the pair as |(proto, kwargs)| and passes `proto: proto, **kwargs`
    # to #server_create_connection.
    protocols = {
      'tcp' => [:tcp, {}],
      'tls' => [:tls, {tls_options: {insecure: true}}],
      # 'unix' => [:unix, {path: ""}],
    }

    data(protocols)
    test 'raise error if block argument is not specified or too many' do |(proto, kwargs)|
      # The block given to #server_create_connection is expected to take exactly one
      # parameter: blocks taking zero or two parameters must both be rejected.
      zero_params = -> {}
      two_params = proc { |conn, what_is_this| [conn, what_is_this] }
      [zero_params, two_params].each do |bad_block|
        assert_raise(ArgumentError.new("BUG: block must have just one argument")) do
          @d.server_create_connection(:myserver, @port, proto: proto, **kwargs, &bad_block)
        end
      end
    end

    data(protocols)
    test 'does not resolve name of client address in default' do |(proto, kwargs)|
      payload = ""
      remote_hosts = []
      @d.server_create_connection(:s, @port, proto: proto, **kwargs) do |conn|
        remote_hosts << conn.remote_host
        conn.data { |chunk| payload << chunk }
      end
      # three separate clients, each sending one 4-byte line ("yay\n")
      3.times do
        open_client(proto, "127.0.0.1", @port) { |sock| sock.puts "yay" }
      end
      waiting(10) do
        sleep 0.1 until payload.bytesize == 12
      end
      assert_equal "yay\nyay\nyay\n", payload
      assert{ remote_hosts.all?{|host| host == "127.0.0.1" } }
    end

    data(protocols)
    test 'does resolve name of client address if resolve_name is true' do |(proto, kwargs)|
      # the name this machine resolves 127.0.0.1 to; the server should report the same
      expected_host = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"]).first

      payload = ""
      remote_hosts = []
      @d.server_create_connection(:s, @port, proto: proto, resolve_name: true, **kwargs) do |conn|
        remote_hosts << conn.remote_host
        conn.data { |chunk| payload << chunk }
      end
      # three separate clients, each sending one 4-byte line ("yay\n")
      3.times do
        open_client(proto, "127.0.0.1", @port) { |sock| sock.puts "yay" }
      end
      waiting(10) do
        sleep 0.1 until payload.bytesize == 12
      end
      assert_equal "yay\nyay\nyay\n", payload
      assert{ remote_hosts.all?{|host| host == expected_host } }
    end

    data(protocols)
    test 'creates a server to provide connection, which can read, write and close' do |(proto, kwargs)|
      received_lines = []
      pending = ""
      @d.server_create_connection(:s, @port, proto: proto, **kwargs) do |conn|
        conn.data do |chunk|
          pending << chunk
          # a lone "x" from the client makes the server close the connection
          if pending == "x"
            pending.slice!(0, 1)
            conn.close
          end
          # every complete line is recorded and answered with "foo!\n"
          newline_at = pending.index("\n")
          if newline_at
            received_lines << pending.slice!(0, newline_at + 1)
            conn.write "foo!\n"
          end
        end
      end
      replies = []
      close_observed = []
      3.times do
        open_client(proto, "127.0.0.1", @port) do |sock|
          sock.puts "yay"
          replies << sock.readline
          sock.write "x"
          saw_close = false
          begin
            # an empty read (EOF) or a connection reset both count as "closed by server"
            saw_close = true if sock.read.empty?
          rescue => e
            saw_close = true if e.is_a?(Errno::ECONNRESET)
          ensure
            close_observed << saw_close
          end
        end
      end
      waiting(10) { sleep 0.1 until received_lines.size == 3 }
      waiting(10) { sleep 0.1 until replies.size == 3 }
      waiting(10) { sleep 0.1 until close_observed.size == 3 }
      assert_equal ["yay\n"] * 3, received_lines
      assert_equal ["foo!\n"] * 3, replies
      assert_equal [true] * 3, close_observed
    end

    data(protocols)
    test 'creates a server to provide connection, which accepts callbacks for data, write_complete, and close' do |(proto, kwargs)|
      received_lines = []
      pending = ""
      write_completions = 0
      close_events = 0
      @d.server_create_connection(:s, @port, proto: proto, **kwargs) do |conn|
        conn.on(:write_complete) { |_conn| write_completions += 1 }
        conn.on(:close) { |_conn| close_events += 1 }
        conn.on(:data) do |chunk|
          pending << chunk
          # every complete line is recorded and answered with "foo!\n"
          newline_at = pending.index("\n")
          if newline_at
            received_lines << pending.slice!(0, newline_at + 1)
            conn.write "foo!\n"
          end
        end
      end
      replies = []
      3.times do
        open_client(proto, "127.0.0.1", @port) do |sock|
          sock.puts "yay"
          replies << sock.readline
        end # the client socket is closed once open_client returns
      end
      waiting(10) { sleep 0.1 until received_lines.size == 3 }
      waiting(10) { sleep 0.1 until replies.size == 3 }
      waiting(10) { sleep 0.1 until close_events == 3 }
      assert_equal ["yay\n"] * 3, received_lines
      assert_equal 3, write_completions
      assert_equal 3, close_events
      assert_equal ["foo!\n"] * 3, replies
    end

    data(protocols)
    test 'creates a server, and does not leak connections' do |(proto, kwargs)|
      payload = ""
      close_events = 0
      @d.server_create_connection(:s, @port, proto: proto, **kwargs) do |conn|
        conn.on(:close) { |_c| close_events += 1 }
        conn.on(:data) { |chunk| payload << chunk }
      end
      # three clients each send "yay\n" (4 bytes) and disconnect
      3.times do
        open_client(proto, "127.0.0.1", @port) { |sock| sock.puts "yay" }
      end
      waiting(10) { sleep 0.1 until payload.bytesize == 12 }
      waiting(10) { sleep 0.1 until close_events == 3 }
      # once every close callback has fired, the helper must not track any connection
      tracked = @d.instance_variable_get(:@_server_connections)
      assert_equal 0, tracked.size
    end

    data(protocols)
    test 'will refuse more connect requests after stop, but read data from sockets already connected, in non-shared server' do |(proto, kwargs)|
      # Phase 1: no server has been created yet, so connecting is expected to fail.
      connected = false
      begin
        open_client(proto, "127.0.0.1", @port) do |_sock|
          # expected behavior is connection refused...
          connected = true
        end
      rescue
        # any connection error is the expected outcome here
      end

      assert_false connected

      # Phase 2: create a non-shared server that acks every chunk of data it receives,
      # and check one full round trip.
      received = ""
      @d.server_create_connection(:s, @port, proto: proto, shared: false, **kwargs) do |conn|
        conn.on(:data) do |data|
          received << data
          conn.write "ack\n"
        end
      end

      th0 = Thread.new do
        open_client(proto, "127.0.0.1", @port) do |sock|
          sock.puts "yay"
          sock.readline
        end
      end

      value0 = waiting(5){ th0.value }
      assert_equal "ack\n", value0

      # Phase 3: a client that connects BEFORE stop but sends its data AFTER stop
      # must still be served. The flags below sequence the client thread and this thread.
      stopped = false
      sleeping = false
      ending = false

      th1 = Thread.new do
        open_client(proto, "127.0.0.1", @port) do |sock|
          sleeping = true
          sleep 0.1 until stopped
          sock.puts "yay"
          res = sock.readline
          ending = true
          res
        end
      end

      # Bounded wait: if th1 dies before setting the flag, fail by timeout instead of
      # hanging the whole test run.
      waiting(10){ sleep 0.1 until sleeping }

      @d.stop
      assert @d.stopped?
      stopped = true

      # Bounded for the same reason as above.
      waiting(10){ sleep 0.1 until ending }

      @d.before_shutdown
      @d.shutdown

      # Phase 4: after shutdown, a new connection attempt is expected to raise.
      th2 = Thread.new do
        begin
          open_client(proto, "127.0.0.1", @port) do |sock|
            sock.puts "foo"
          end
          false # failed
        rescue
          true # success
        end
      end

      value1 = waiting(5){ th1.value }
      value2 = waiting(5){ th2.value }

      # "yay\n" was received twice: once from th0 and once from th1
      assert_equal "yay\nyay\n", received
      assert_equal "ack\n", value1
      assert value2, "should be truthy value to show connection was correctly refused"
    end

    # Placeholder: the body contains no statements (the `pend` call is commented out),
    # so this test currently asserts nothing about keepalive.
    test 'can keep connections alive for tcp/tls if keepalive specified' do
      # pend "not implemented yet"
    end
  end
end