Testing Counter Client Operations in Fluentd
This test suite validates the Fluentd Counter Client functionality, focusing on core counter operations and state management. The tests cover client-server communication, counter manipulation, and time-based reset behaviors using minitest framework.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
fluent/fluentd
test/counter/test_client.rb
require_relative '../helper'
require 'fluent/counter/client'
require 'fluent/counter/store'
require 'fluent/counter/server'
require 'flexmock/test_unit'
require 'timecop'
class CounterClientTest < ::Test::Unit::TestCase
TEST_ADDR = '127.0.0.1'
TEST_PORT = '8277'
setup do
# timecop isn't compatible with EventTime
t = Time.parse('2016-09-22 16:59:59 +0900')
Timecop.freeze(t)
@now = Fluent::EventTime.now
@options = {
addr: TEST_ADDR,
port: TEST_PORT,
log: $log,
}
@server_name = 'server1'
@scope = "worker1\tplugin1"
@loop = Coolio::Loop.new
@server = Fluent::Counter::Server.new(@server_name, @options).start
@client = Fluent::Counter::Client.new(@loop, @options).start
end
teardown do
Timecop.return
@server.stop
@client.stop
end
test 'Callable API' do
[:establish, :init, :delete, :inc, :reset, :get].each do |m|
assert_true @client.respond_to?(m)
end
end
sub_test_case 'on_message' do
setup do
@future = flexmock('future')
@client.instance_variable_set(:@responses, { 1 => @future })
end
test 'call a set method to a corresponding object' do
@future.should_receive(:set).once.with(Hash)
@client.send(:on_message, { 'id' => 1 })
end
test "output a warning log when passed id doesn't exist" do
data = { 'id' => 2 }
mock($log).warn("Receiving missing id data: #{data}")
@client.send(:on_message, data)
end
end
def extract_value_from_server(server, scope, name)
store = server.instance_variable_get(:@store).instance_variable_get(:@storage).instance_variable_get(:@store)
key = Fluent::Counter::Store.gen_key(scope, name)
store[key]
end
def travel(sec)
# Since Timecop.travel() causes test failures on Windows/AppVeyor by inducing
# rounding errors to Time.now, we need to use Timecop.freeze() instead.
Timecop.freeze(Time.now + sec)
end
sub_test_case 'establish' do
test 'establish a scope' do
@client.establish(@scope)
assert_equal "#{@server_name}\t#{@scope}", @client.instance_variable_get(:@scope)
end
data(
empty: '',
invalid_string: '_scope',
invalid_string2: 'Scope'
)
test 'raise an error when passed scope is invalid' do |scope|
assert_raise do
@client.establish(scope)
end
end
end
sub_test_case 'init' do
setup do
@client.instance_variable_set(:@scope, @scope)
end
data(
numeric_type: [
{ name: 'key', reset_interval: 20, type: 'numeric' }, 0
],
float_type: [
{ name: 'key', reset_interval: 20, type: 'float' }, 0.0
],
integer_type: [
{ name: 'key', reset_interval: 20, type: 'integer' }, 0
]
)
test 'create a value' do |(param, initial_value)|
assert_nil extract_value_from_server(@server, @scope, param[:name])
response = @client.init(param).get
data = response.data.first
assert_nil response.errors
assert_equal param[:name], data['name']
assert_equal param[:reset_interval], data['reset_interval']
assert_equal param[:type], data['type']
assert_equal initial_value, data['current']
assert_equal initial_value, data['total']
v = extract_value_from_server(@server, @scope, param[:name])
assert_equal param[:name], v['name']
assert_equal param[:reset_interval], v['reset_interval']
assert_equal param[:type], v['type']
assert_equal initial_value, v['total']
assert_equal initial_value, v['current']
end
test 'raise an error when @scope is nil' do
@client.instance_variable_set(:@scope, nil)
assert_raise 'Call `establish` method to get a `scope` before calling this method' do
params = { name: 'key1', reset_interval: 10 }
@client.init(params).get
end
end
data(
already_exist_key: [
{ name: 'key1', reset_interval: 10 },
{ 'code' => 'invalid_params', 'message' => "worker1\tplugin1\tkey1 already exists in counter" }
],
missing_name: [
{ reset_interval: 10 },
{ 'code' => 'invalid_params', 'message' => '`name` is required' },
],
missing_reset_interval: [
{ name: 'key' },
{ 'code' => 'invalid_params', 'message' => '`reset_interval` is required' },
],
invalid_name: [
{ name: '\tkey' },
{ 'code' => 'invalid_params', 'message' => '`name` is the invalid format' }
]
)
test 'return an error object' do |(param, expected_error)|
params = { name: 'key1', reset_interval: 10 }
@client.init(params).get
response = @client.init(param).get
errors = response.errors.first
assert_empty response.data
assert_equal expected_error, errors
assert_raise {
@client.init(param).wait
}
end
test 'return an existing value when passed key already exists and ignore option is true' do
params = { name: 'key1', reset_interval: 10 }
res1 = @client.init(params).get
res2 = nil
assert_nothing_raised do
res2 = @client.init({ name: 'key1', reset_interval: 10 }, options: { ignore: true }).get
end
assert_equal res1.data, res2.data
end
test 'return an error object and data object' do
param = { name: 'key1', reset_interval: 10 }
param2 = { name: 'key2', reset_interval: 10 }
@client.init(param).get
response = @client.init([param2, param]).get
data = response.data.first
error = response.errors.first
assert_equal param2[:name], data['name']
assert_equal param2[:reset_interval], data['reset_interval']
assert_equal 'invalid_params', error['code']
assert_equal "#{@scope}\t#{param[:name]} already exists in counter", error['message']
end
test 'return a future object when async call' do
param = { name: 'key', reset_interval: 10 }
r = @client.init(param)
assert_true r.is_a?(Fluent::Counter::Future)
assert_nil r.errors
end
end
sub_test_case 'delete' do
setup do
@client.instance_variable_set(:@scope, @scope)
@name = 'key'
@key = Fluent::Counter::Store.gen_key(@scope, @name)
@init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
@client.init(@init_obj).get
end
test 'delete a value' do
assert extract_value_from_server(@server, @scope, @name)
response = @client.delete(@name).get
v = response.data.first
assert_nil response.errors
assert_equal @init_obj[:name], v['name']
assert_equal @init_obj[:type], v['type']
assert_equal @init_obj[:reset_interval], v['reset_interval']
assert_nil extract_value_from_server(@server, @scope, @name)
end
test 'raise an error when @scope is nil' do
@client.instance_variable_set(:@scope, nil)
assert_raise 'Call `establish` method to get a `scope` before calling this method' do
@client.delete(@name).get
end
end
data(
key_not_found: [
'key2',
{ 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
],
invalid_key: [
'\tkey',
{ 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
]
)
test 'return an error object' do |(param, expected_error)|
response = @client.delete(param).get
errors = response.errors.first
assert_empty response.data
assert_equal expected_error, errors
end
test 'return an error object and data object' do
unknown_name = 'key2'
response = @client.delete(@name, unknown_name).get
data = response.data.first
error = response.errors.first
assert_equal @name, data['name']
assert_equal @init_obj[:reset_interval], data['reset_interval']
assert_equal 'unknown_key', error['code']
assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']
assert_nil extract_value_from_server(@server, @scope, @name)
end
test 'return a future object when async call' do
r = @client.delete(@name)
assert_true r.is_a?(Fluent::Counter::Future)
assert_nil r.errors
end
end
sub_test_case 'inc' do
setup do
@client.instance_variable_set(:@scope, @scope)
@name = 'key'
@key = Fluent::Counter::Store.gen_key(@scope, @name)
@init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
@client.init(@init_obj).get
end
test 'increment a value' do
v = extract_value_from_server(@server, @scope, @name)
assert_equal 0, v['total']
assert_equal 0, v['current']
travel(1)
inc_obj = { name: @name, value: 10 }
@client.inc(inc_obj).get
v = extract_value_from_server(@server, @scope, @name)
assert_equal inc_obj[:value], v['total']
assert_equal inc_obj[:value], v['current']
assert_equal (@now + 1), Fluent::EventTime.new(*v['last_modified_at'])
end
test 'create and increment a value when force option is true' do
name = 'new_key'
param = { name: name, value: 11, reset_interval: 1 }
assert_nil extract_value_from_server(@server, @scope, name)
@client.inc(param, options: { force: true }).get
v = extract_value_from_server(@server, @scope, name)
assert v
assert_equal param[:name], v['name']
assert_equal 1, v['reset_interval']
assert_equal param[:value], v['current']
assert_equal param[:value], v['total']
end
test 'raise an error when @scope is nil' do
@client.instance_variable_set(:@scope, nil)
assert_raise 'Call `establish` method to get a `scope` before calling this method' do
params = { name: 'name', value: 1 }
@client.inc(params).get
end
end
data(
not_exist_key: [
{ name: 'key2', value: 10 },
{ 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
],
missing_name: [
{ value: 10 },
{ 'code' => 'invalid_params', 'message' => '`name` is required' },
],
missing_value: [
{ name: 'key' },
{ 'code' => 'invalid_params', 'message' => '`value` is required' },
],
invalid_name: [
{ name: '\tkey' },
{ 'code' => 'invalid_params', 'message' => '`name` is the invalid format' }
]
)
test 'return an error object' do |(param, expected_error)|
response = @client.inc(param).get
errors = response.errors.first
assert_empty response.data
assert_equal expected_error, errors
end
test 'return an error object and data object' do
parmas = [
{ name: @name, value: 10 },
{ name: 'unknown_key', value: 9 },
]
response = @client.inc(parmas).get
data = response.data.first
error = response.errors.first
assert_equal @name, data['name']
assert_equal 10, data['current']
assert_equal 10, data['total']
assert_equal 'unknown_key', error['code']
assert_equal "`#{@scope}\tunknown_key` doesn't exist in counter", error['message']
end
test 'return a future object when async call' do
param = { name: 'key', value: 10 }
r = @client.inc(param)
assert_true r.is_a?(Fluent::Counter::Future)
assert_nil r.errors
end
end
sub_test_case 'get' do
setup do
@client.instance_variable_set(:@scope, @scope)
@name = 'key'
@init_obj = { name: @name, reset_interval: 20, type: 'numeric' }
@client.init(@init_obj).get
end
test 'get a value' do
v1 = extract_value_from_server(@server, @scope, @name)
v2 = @client.get(@name).data.first
assert_equal v1['name'], v2['name']
assert_equal v1['current'], v2['current']
assert_equal v1['total'], v2['total']
assert_equal v1['type'], v2['type']
end
test 'raise an error when @scope is nil' do
@client.instance_variable_set(:@scope, nil)
assert_raise 'Call `establish` method to get a `scope` before calling this method' do
@client.get(@name).get
end
end
data(
key_not_found: [
'key2',
{ 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
],
invalid_key: [
'\tkey',
{ 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
]
)
test 'return an error object' do |(param, expected_error)|
response = @client.get(param).get
errors = response.errors.first
assert_empty response.data
assert_equal expected_error, errors
end
test 'return an error object and data object' do
unknown_name = 'key2'
response = @client.get(@name, unknown_name).get
data = response.data.first
error = response.errors.first
assert_equal @name, data['name']
assert_equal @init_obj[:reset_interval], data['reset_interval']
assert_equal 'unknown_key', error['code']
assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']
end
test 'return a future object when async call' do
r = @client.get(@name)
assert_true r.is_a?(Fluent::Counter::Future)
assert_nil r.errors
end
end
sub_test_case 'reset' do
setup do
@client.instance_variable_set(:@scope, @scope)
@name = 'key'
@key = Fluent::Counter::Store.gen_key(@scope, @name)
@init_obj = { name: @name, reset_interval: 5, type: 'numeric' }
@client.init(@init_obj).get
@inc_obj = { name: @name, value: 10 }
@client.inc(@inc_obj).get
end
test 'reset a value after `reset_interval` passed' do
v1 = extract_value_from_server(@server, @scope, @name)
assert_equal @inc_obj[:value], v1['total']
assert_equal @inc_obj[:value], v1['current']
assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])
travel_sec = 6 # greater than reset_interval
travel(travel_sec)
v2 = @client.reset(@name).get
data = v2.data.first
c = data['counter_data']
assert_equal travel_sec, data['elapsed_time']
assert_true data['success']
assert_equal @inc_obj[:value], c['current']
assert_equal @inc_obj[:value], c['total']
assert_equal @now, c['last_reset_at']
v1 = extract_value_from_server(@server, @scope, @name)
assert_equal 0, v1['current']
assert_equal @inc_obj[:value], v1['total']
assert_equal (@now + travel_sec), Fluent::EventTime.new(*v1['last_reset_at'])
assert_equal (@now + travel_sec), Fluent::EventTime.new(*v1['last_modified_at'])
end
test 'areturn a value object before `reset_interval` passed' do
v1 = extract_value_from_server(@server, @scope, @name)
assert_equal @inc_obj[:value], v1['total']
assert_equal @inc_obj[:value], v1['current']
assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])
travel_sec = 4 # less than reset_interval
travel(travel_sec)
v2 = @client.reset(@name).get
data = v2.data.first
c = data['counter_data']
assert_equal travel_sec, data['elapsed_time']
assert_equal false, data['success']
assert_equal @inc_obj[:value], c['current']
assert_equal @inc_obj[:value], c['total']
assert_equal @now, c['last_reset_at']
v1 = extract_value_from_server(@server, @scope, @name)
assert_equal @inc_obj[:value], v1['current']
assert_equal @inc_obj[:value], v1['total']
assert_equal @now, Fluent::EventTime.new(*v1['last_reset_at'])
end
test 'raise an error when @scope is nil' do
@client.instance_variable_set(:@scope, nil)
assert_raise 'Call `establish` method to get a `scope` before calling this method' do
@client.reset(@name).get
end
end
data(
key_not_found: [
'key2',
{ 'code' => 'unknown_key', 'message' => "`worker1\tplugin1\tkey2` doesn't exist in counter" }
],
invalid_key: [
'\tkey',
{ 'code' => 'invalid_params', 'message' => '`key` is the invalid format' }
]
)
test 'return an error object' do |(param, expected_error)|
response = @client.reset(param).get
errors = response.errors.first
assert_empty response.data
assert_equal expected_error, errors
end
test 'return an error object and data object' do
unknown_name = 'key2'
travel_sec = 6 # greater than reset_interval
travel(travel_sec)
response = @client.reset(@name, unknown_name).get
data = response.data.first
error = response.errors.first
counter = data['counter_data']
assert_true data['success']
assert_equal travel_sec, data['elapsed_time']
assert_equal @name, counter['name']
assert_equal @init_obj[:reset_interval], counter['reset_interval']
assert_equal @inc_obj[:value], counter['total']
assert_equal @inc_obj[:value], counter['current']
assert_equal 'unknown_key', error['code']
assert_equal "`#{@scope}\t#{unknown_name}` doesn't exist in counter", error['message']
v1 = extract_value_from_server(@server, @scope, @name)
assert_equal 0, v1['current']
assert_equal @inc_obj[:value], v1['total']
assert_equal (@now + travel_sec), Fluent::EventTime.new(*v1['last_reset_at'])
assert_equal (@now + travel_sec), Fluent::EventTime.new(*v1['last_modified_at'])
end
test 'return a future object when async call' do
r = @client.reset(@name)
assert_true r.is_a?(Fluent::Counter::Future)
assert_nil r.errors
end
end
end