Back to Repositories

Testing Counter Client Operations in Fluentd

This test suite validates the Fluentd Counter Client functionality, focusing on core counter operations and state management. The tests cover client-server communication, counter manipulation, and time-based reset behaviors using the Test::Unit framework.

Test Coverage Overview

The test suite provides comprehensive coverage of the Counter Client API and functionality:
  • Core API validation for establish, init, delete, inc, reset, and get operations
  • Message handling and response processing
  • Error handling for invalid inputs and edge cases
  • Time-based counter reset behaviors
  • Client-server communication validation

Implementation Analysis

The testing approach utilizes Ruby’s Test::Unit framework with FlexMock for mocking and Timecop for time manipulation. The tests follow a structured pattern using sub_test_cases to organize related test scenarios, with setup/teardown hooks managing test state and server initialization.

Key testing patterns include:
  • Isolated server-client communication testing
  • Mocked response handling
  • Time-frozen state verification
  • Parameterized test cases using data-driven approach

Technical Details

Testing tools and configuration:
  • Test::Unit as the primary testing framework
  • FlexMock for mock object creation
  • Timecop for time manipulation
  • Coolio::Loop for event loop handling
  • Custom test server running on 127.0.0.1:8277
  • Event-based counter store implementation

Best Practices Demonstrated

The test suite exhibits several testing best practices:
  • Proper test isolation through setup/teardown
  • Comprehensive error case coverage
  • Clear test organization using sub_test_cases
  • Consistent validation patterns
  • Thorough API contract testing
  • Effective use of test helpers and utilities

fluent/fluentd

test/counter/test_client.rb

            
require_relative '../helper'
require 'fluent/counter/client'
require 'fluent/counter/store'
require 'fluent/counter/server'
require 'flexmock/test_unit'
require 'timecop'

# Exercises Fluent::Counter::Client against a Fluent::Counter::Server that is
# started for every test on TEST_ADDR:TEST_PORT.
#
# Time is frozen with Timecop in `setup`, so timestamps stored by the server
# can be compared against @now (plus whatever `travel` added).
#
# NOTE(review): `assert_raise '<text>' do ... end` hands the String to
# test-unit as the assertion's failure message; it does not appear to be
# matched against the raised exception's message. Confirm against test-unit
# before relying on these tests to pin the error text.
class CounterClientTest < ::Test::Unit::TestCase
  TEST_ADDR = '127.0.0.1'
  TEST_PORT = '8277'

  setup do
    # timecop isn't compatible with EventTime
    t = Time.parse('2016-09-22 16:59:59 +0900')
    Timecop.freeze(t)
    @now = Fluent::EventTime.now

    # The same options are handed to both the server and the client.
    @options = {
      addr: TEST_ADDR,
      port: TEST_PORT,
      log: $log,
    }

    @server_name = 'server1'
    @scope = "worker1\tplugin1"
    @loop = Coolio::Loop.new
    @server = Fluent::Counter::Server.new(@server_name, @options).start
    @client = Fluent::Counter::Client.new(@loop, @options).start
  end

  teardown do
    # Unfreeze time first, then stop both endpoints. The client is stopped even
    # when stopping the server raises, and the nil checks guard against `setup`
    # having failed before @server/@client were assigned.
    Timecop.return
    begin
      @server.stop if @server
    ensure
      @client.stop if @client
    end
  end

  test 'Callable API' do
    [:establish, :init, :delete, :inc, :reset, :get].each do |m|
      assert_true @client.respond_to?(m)
    end
  end

  sub_test_case 'on_message' do
    setup do
      # Register a mock future under id 1 in the client's @responses table.
      @future = flexmock('future')
      @client.instance_variable_set(:@responses, { 1 => @future })
    end

    test 'call a set method to a corresponding object' do
      @future.should_receive(:set).once.with(Hash)
      @client.send(:on_message, { 'id' => 1 })
    end

    test "output a warning log when passed id doesn't exist" do
      data = { 'id' => 2 }
      mock($log).warn("Receiving missing id data: #{data}")
      @client.send(:on_message, data)
    end
  end

  # Reads the value stored for (scope, name) straight out of the server by
  # walking its private instance variables (@store -> @storage -> @store) and
  # indexing the result with the key built by Fluent::Counter::Store.gen_key.
  # The tests expect nil here when the counter does not exist.
  def extract_value_from_server(server, scope, name)
    store = server.instance_variable_get(:@store).instance_variable_get(:@storage).instance_variable_get(:@store)
    key = Fluent::Counter::Store.gen_key(scope, name)
    store[key]
  end

  # Moves the frozen clock forward by `sec` seconds.
  def travel(sec)
    # Since Timecop.travel() causes test failures on Windows/AppVeyor by inducing
    # rounding errors to Time.now, we need to use Timecop.freeze() instead.
    Timecop.freeze(Time.now + sec)
  end

  sub_test_case 'establish' do
    test 'establish a scope' do
      @client.establish(@scope)
      # The client is expected to store the scope prefixed with the server name.
      assert_equal "#{@server_name}\t#{@scope}", @client.instance_variable_get(:@scope)
    end

    data(
      empty: '',
      invalid_string: '_scope',
      invalid_string2: 'Scope'
    )
    test 'raise an error when passed scope is invalid' do |scope|
      assert_raise do
        @client.establish(scope)
      end
    end
  end

  sub_test_case 'init' do
    setup do
      # Inject the scope directly instead of going through `establish`.
      @client.instance_variable_set(:@scope, @scope)
    end

    # Each entry is [init params, expected initial value for that type].
    data(
      numeric_type: [
        { name: 'key', reset_interval: 20, type: 'numeric' }, 0
      ],
      float_type: [
        { name: 'key', reset_interval: 20, type: 'float' }, 0.0
      ],
      integer_type: [
        { name: 'key', reset_interval: 20, type: 'integer' }, 0
      ]
    )
    test 'create a value' do |(param, initial_value)|
      assert_nil extract_value_from_server(@server, @scope, param[:name])

      response = @client.init(param).get
      data = response.data.first

      # The response must describe the freshly created counter ...
      assert_nil response.errors
      assert_equal param[:name], data['name']
      assert_equal param[:reset_interval], data['reset_interval']
      assert_equal param[:type], data['type']
      assert_equal initial_value, data['current']
      assert_equal initial_value, data['total']

      # ... and the same values must now be present in the server's store.
      v = extract_value_from_server(@server, @scope, param[:name])
      assert_equal param[:name], v['name']
      assert_equal param[:reset_interval], v['reset_interval']
      assert_equal param[:type], v['type']
      assert_equal initial_value, v['total']
      assert_equal initial_value, v['current']
    end

    test 'raise an error when @scope is nil' do
      @client.instance_variable_set(:@scope, nil)
      assert_raise 'Call `establish` method to get a `scope` before calling this method' do
        params = { name: 'key1', reset_interval: 10 }
        @client.init(params).get
      end
    end

    # Each entry is [init params, expected first error object].
    # '\tkey' is single-quoted: a literal backslash followed by "t", not a tab.
    data(
      already_exist_key: [
        { name: 'key1', reset_interval: 10 },
        { 'code' => 'invalid_params', 'message' => "worker1\tplugin1\tkey1 already exists in counter" }
      ],
      missing_name: [
        { reset_interval: 10 },
        { 'code' => 'invalid_params', 'message' => '`name` is required' },
      ],
      missing_reset_interval: [
        { name: 'key' },
        { 'code' => 'invalid_params', 'message' => '`reset_interval` is required' },
      ],
      invalid_name: [
        { name: '\tkey' },
        { 'code' => 'invalid_params', 'message' => '`name` is the invalid format' }
      ]
    )
    test 'return an error object' do |(param, expected_error)|
      # Pre-create 'key1' so the already_exist_key case has something to collide with.
      params = { name: 'key1', reset_interval: 10 }
      @client.init(params).get
      response = @client.init(param).get
      errors = response.errors.first

      assert_empty response.data
      assert_equal expected_error, errors

      # `wait` is expected to raise for the same failing request.
      assert_raise {
        @client.init(param).wait
      }
    end

    test 'return an existing value when passed key already exists and ignore option is true' do
      params = { name: 'key1', reset_interval: 10 }
      res1 = @client.init(params).get
      res2 = nil
      assert_nothing_raised do
        res2 = @client.init({ name: 'key1', reset_interval: 10 }, options: { ignore: true }).get
      end
      assert_equal res1.data, res2.data
    end

    test 'return an error object and data object' do
      param = { name: 'key1', reset_interval: 10 }
      param2 = { name: 'key2', reset_interval: 10 }
      @client.init(param).get

      # One new key and one existing key in a single request.
      response = @client.init([param2, param]).get
      data = response.data.first
      error = response.errors.first

      assert_equal param2[:name], data['name']
      assert_equal param2[:reset_interval], data['reset_interval']

      assert_equal 'invalid_params', error['code']
      assert_equal "#{@scope}\t#{param[:name]} already exists in counter", error['message']
    end

    test 'return a future object when async call' do
      param = { name: 'key', reset_interval: 10 }
      r = @client.init(param)
      assert_true r.is_a?(Fluent::Counter::Future)
      assert_nil r.errors
    end
  end

  sub_test_case 'delete' do
    setup do
      # Inject the scope directly and create the counter the tests operate on.
      @client.instance_variable_set(:@scope, @scope)
      @name = 'key'
      @key = Fluent::Counter::Store.gen_key(@scope, @name)

      @init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
      @client.init(@init_obj).get
    end

    test 'delete a value' do
      assert extract_value_from_server(@server, @scope, @name)

      response = @client.delete(@name).get
      v = response.data.first

      assert_nil response.errors
      assert_equal @init_obj[:name], v['name']
      assert_equal @init_obj[:type], v['type']
      assert_equal @init_obj[:reset_interval], v['reset_interval']

      assert_nil extract_value_from_server(@server, @scope, @name)
    end

    test 'raise an error when @scope is nil' do
      @client.instance_variable_set(:@scope, nil)
      assert_raise 'Call `establish` method to get a `scope` before calling this method' do
        @client.delete(@name).get
      end
    end

    # Each entry is [key name, expected first error object].
    # '\tkey' is single-quoted: a literal backslash followed by "t", not a tab.
    data(
      key_not_found: [
        'key2',
        { 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
      ],
      invalid_key: [
        '\tkey',
        { 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
      ]
    )
    test 'return an error object' do |(param, expected_error)|
      response = @client.delete(param).get
      errors = response.errors.first

      assert_empty response.data
      assert_equal expected_error, errors
    end

    test 'return an error object and data object' do
      unknown_name = 'key2'

      # One existing key and one unknown key in a single request.
      response = @client.delete(@name, unknown_name).get
      data = response.data.first
      error = response.errors.first

      assert_equal @name, data['name']
      assert_equal @init_obj[:reset_interval], data['reset_interval']

      assert_equal 'unknown_key', error['code']
      assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']

      assert_nil extract_value_from_server(@server, @scope, @name)
    end

    test 'return a future object when async call' do
      r = @client.delete(@name)
      assert_true r.is_a?(Fluent::Counter::Future)
      assert_nil r.errors
    end
  end

  sub_test_case 'inc' do
    setup do
      # Inject the scope directly and create the counter the tests operate on.
      @client.instance_variable_set(:@scope, @scope)
      @name = 'key'
      @key = Fluent::Counter::Store.gen_key(@scope, @name)

      @init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
      @client.init(@init_obj).get
    end

    test 'increment a value' do
      v = extract_value_from_server(@server, @scope, @name)
      assert_equal 0, v['total']
      assert_equal 0, v['current']

      # Advance the frozen clock so last_modified_at differs from creation time.
      travel(1)
      inc_obj = { name: @name, value: 10 }
      @client.inc(inc_obj).get

      v = extract_value_from_server(@server, @scope, @name)
      assert_equal inc_obj[:value], v['total']
      assert_equal inc_obj[:value], v['current']
      assert_equal(@now + 1, Fluent::EventTime.new(*v['last_modified_at']))
    end

    test 'create and increment a value when force option is true' do
      name = 'new_key'
      param = { name: name, value: 11, reset_interval: 1 }

      assert_nil extract_value_from_server(@server, @scope, name)

      @client.inc(param, options: { force: true }).get

      v = extract_value_from_server(@server, @scope, name)
      assert v
      assert_equal param[:name], v['name']
      assert_equal 1, v['reset_interval']
      assert_equal param[:value], v['current']
      assert_equal param[:value], v['total']
    end

    test 'raise an error when @scope is nil' do
      @client.instance_variable_set(:@scope, nil)
      assert_raise 'Call `establish` method to get a `scope` before calling this method' do
        params = { name: 'name', value: 1 }
        @client.inc(params).get
      end
    end

    # Each entry is [inc params, expected first error object].
    # '\tkey' is single-quoted: a literal backslash followed by "t", not a tab.
    data(
      not_exist_key: [
        { name: 'key2', value: 10 },
        { 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
      ],
      missing_name: [
        { value: 10 },
        { 'code' => 'invalid_params', 'message' => '`name` is required' },
      ],
      missing_value: [
        { name: 'key' },
        { 'code' => 'invalid_params', 'message' => '`value` is required' },
      ],
      invalid_name: [
        { name: '\tkey' },
        { 'code' => 'invalid_params', 'message' => '`name` is the invalid format' }
      ]
    )
    test 'return an error object' do |(param, expected_error)|
      response = @client.inc(param).get
      errors = response.errors.first
      assert_empty response.data
      assert_equal expected_error, errors
    end

    test 'return an error object and data object' do
      # One existing key and one unknown key in a single request.
      params = [
        { name: @name, value: 10 },
        { name: 'unknown_key', value: 9 },
      ]
      response = @client.inc(params).get

      data = response.data.first
      error = response.errors.first

      assert_equal @name, data['name']
      assert_equal 10, data['current']
      assert_equal 10, data['total']

      assert_equal 'unknown_key', error['code']
      assert_equal "`#{@scope}\tunknown_key` doesn't exist in counter", error['message']
    end

    test 'return a future object when async call' do
      param = { name: 'key', value: 10 }
      r = @client.inc(param)
      assert_true r.is_a?(Fluent::Counter::Future)
      assert_nil r.errors
    end
  end

  sub_test_case 'get' do
    setup do
      # Inject the scope directly and create the counter the tests operate on.
      @client.instance_variable_set(:@scope, @scope)
      @name = 'key'

      @init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
      @client.init(@init_obj).get
    end

    test 'get a value' do
      # Compare what the client returns with what the server actually stores.
      v1 = extract_value_from_server(@server, @scope, @name)
      v2 = @client.get(@name).data.first

      assert_equal v1['name'], v2['name']
      assert_equal v1['current'], v2['current']
      assert_equal v1['total'], v2['total']
      assert_equal v1['type'], v2['type']
    end

    test 'raise an error when @scope is nil' do
      @client.instance_variable_set(:@scope, nil)
      assert_raise 'Call `establish` method to get a `scope` before calling this method' do
        @client.get(@name).get
      end
    end

    # Each entry is [key name, expected first error object].
    # '\tkey' is single-quoted: a literal backslash followed by "t", not a tab.
    data(
      key_not_found: [
        'key2',
        { 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
      ],
      invalid_key: [
        '\tkey',
        { 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
      ]
    )
    test 'return an error object' do |(param, expected_error)|
      response = @client.get(param).get
      errors = response.errors.first
      assert_empty response.data
      assert_equal expected_error, errors
    end

    test 'return an error object and data object' do
      unknown_name = 'key2'

      # One existing key and one unknown key in a single request.
      response = @client.get(@name, unknown_name).get
      data = response.data.first
      error = response.errors.first

      assert_equal @name, data['name']
      assert_equal @init_obj[:reset_interval], data['reset_interval']

      assert_equal 'unknown_key', error['code']
      assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']
    end

    test 'return a future object when async call' do
      r = @client.get(@name)
      assert_true r.is_a?(Fluent::Counter::Future)
      assert_nil r.errors
    end
  end

  sub_test_case 'reset' do
    setup do
      # Inject the scope directly, then create a counter with a 5 second
      # reset_interval and increment it once so there is something to reset.
      @client.instance_variable_set(:@scope, @scope)
      @name = 'key'
      @key = Fluent::Counter::Store.gen_key(@scope, @name)

      @init_obj = { name: @name, reset_interval: 5, type: 'numeric' }
      @client.init(@init_obj).get
      @inc_obj = { name: @name, value: 10 }
      @client.inc(@inc_obj).get
    end

    test 'reset a value after `reset_interval` passed' do
      v1 = extract_value_from_server(@server, @scope, @name)
      assert_equal @inc_obj[:value], v1['total']
      assert_equal @inc_obj[:value], v1['current']
      assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])

      travel_sec = 6            # greater than reset_interval
      travel(travel_sec)

      v2 = @client.reset(@name).get
      data = v2.data.first

      c = data['counter_data']

      assert_equal travel_sec, data['elapsed_time']
      assert_true data['success']

      # The returned counter_data is asserted to still carry the pre-reset values.
      assert_equal @inc_obj[:value], c['current']
      assert_equal @inc_obj[:value], c['total']
      assert_equal @now, c['last_reset_at']

      # In the store, `current` is back to 0 while `total` is kept.
      v1 = extract_value_from_server(@server, @scope, @name)
      assert_equal 0, v1['current']
      assert_equal @inc_obj[:value], v1['total']
      assert_equal(@now + travel_sec, Fluent::EventTime.new(*v1['last_reset_at']))
      assert_equal(@now + travel_sec, Fluent::EventTime.new(*v1['last_modified_at']))
    end

    test 'return a value object before `reset_interval` passed' do
      v1 = extract_value_from_server(@server, @scope, @name)
      assert_equal @inc_obj[:value], v1['total']
      assert_equal @inc_obj[:value], v1['current']
      assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])

      travel_sec = 4            # less than reset_interval
      travel(travel_sec)

      v2 = @client.reset(@name).get
      data = v2.data.first

      c = data['counter_data']

      assert_equal travel_sec, data['elapsed_time']
      assert_equal false, data['success']

      assert_equal @inc_obj[:value], c['current']
      assert_equal @inc_obj[:value], c['total']
      assert_equal @now, c['last_reset_at']

      # Nothing in the store may have changed.
      v1 = extract_value_from_server(@server, @scope, @name)
      assert_equal @inc_obj[:value], v1['current']
      assert_equal @inc_obj[:value], v1['total']
      assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])
    end

    test 'raise an error when @scope is nil' do
      @client.instance_variable_set(:@scope, nil)
      assert_raise 'Call `establish` method to get a `scope` before calling this method' do
        @client.reset(@name).get
      end
    end

    # Each entry is [key name, expected first error object].
    # '\tkey' is single-quoted: a literal backslash followed by "t", not a tab.
    data(
      key_not_found: [
        'key2',
        { 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
      ],
      invalid_key: [
        '\tkey',
        { 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
      ]
    )
    test 'return an error object' do |(param, expected_error)|
      response = @client.reset(param).get
      errors = response.errors.first
      assert_empty response.data
      assert_equal expected_error, errors
    end

    test 'return an error object and data object' do
      unknown_name = 'key2'

      travel_sec = 6            # greater than reset_interval
      travel(travel_sec)

      # One existing key and one unknown key in a single request.
      response = @client.reset(@name, unknown_name).get
      data = response.data.first
      error = response.errors.first
      counter = data['counter_data']

      assert_true data['success']
      assert_equal travel_sec, data['elapsed_time']
      assert_equal @name, counter['name']
      assert_equal @init_obj[:reset_interval], counter['reset_interval']
      assert_equal @inc_obj[:value], counter['total']
      assert_equal @inc_obj[:value], counter['current']

      assert_equal 'unknown_key', error['code']
      assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']

      v1 = extract_value_from_server(@server, @scope, @name)
      assert_equal 0, v1['current']
      assert_equal @inc_obj[:value], v1['total']
      assert_equal(@now + travel_sec, Fluent::EventTime.new(*v1['last_reset_at']))
      assert_equal(@now + travel_sec, Fluent::EventTime.new(*v1['last_modified_at']))
    end

    test 'return a future object when async call' do
      r = @client.reset(@name)
      assert_true r.is_a?(Fluent::Counter::Future)
      assert_nil r.errors
    end
  end
end