Testing File Buffer Implementation in Fluentd
This test suite validates Fluentd's file-based buffer plugin (buf_file): buffer chunk management, metadata serialization in both the current and legacy on-disk formats, multi-worker path handling, and recovery scenarios. The tests cover chunk creation, staging, queuing, and the handling of empty, broken, or metadata-less chunk files on resume.
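Before reading the code, it helps to know the on-disk layout the tests exercise: each buffer chunk is a data file whose name encodes its state and unique id, paired with a .meta sidecar holding msgpack metadata. A minimal sketch of the naming scheme (the paths and the SecureRandom stand-in are illustrative; the pattern substitution mirrors the gsub calls asserted in the tests below):

require 'securerandom'

# '*' in the configured path is replaced by a state marker plus the chunk's
# unique id in hex: 'b<id>' marks a staged chunk, 'q<id>' an enqueued one.
pattern = '/var/log/fluentd/buffer.*.log'
hex_id  = SecureRandom.hex(16)                 # stands in for Fluent::UniqueId.hex
staged  = pattern.gsub('.*.', ".b#{hex_id}.")  # staged chunk data file
queued  = pattern.gsub('.*.', ".q#{hex_id}.")  # enqueued chunk data file
meta    = staged + '.meta'                     # msgpack metadata sidecar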
Source: fluent/fluentd, test/plugin/test_buf_file.rb
require_relative '../helper'
require 'fluent/plugin/buf_file'
require 'fluent/plugin/output'
require 'fluent/unique_id'
require 'fluent/system_config'
require 'fluent/env'
require 'msgpack'
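# DummyOutputPlugin is the minimal owner plugin for the buffer under test:
# it defaults its <buffer> section to @type 'file', declares multi-worker
# support, and silently discards every chunk written to it.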
module FluentPluginFileBufferTest
class DummyOutputPlugin < Fluent::Plugin::Output
Fluent::Plugin.register_output('buffer_file_test_output', self)
config_section :buffer do
config_set_default :@type, 'file'
end
def multi_workers_ready?
true
end
def write(chunk)
# drop
end
end
end
class FileBufferTest < Test::Unit::TestCase
def metadata(timekey: nil, tag: nil, variables: nil, seq: 0)
m = Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
m.seq = seq
m
end
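# Writes a .meta sidecar in the old on-disk format: a bare msgpack map with
# no header and no seq field, as produced by older Fluentd releases. Used to
# verify that #resume still reads legacy metadata.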
def write_metadata_old(path, chunk_id, metadata, size, ctime, mtime)
metadata = {
timekey: metadata.timekey, tag: metadata.tag, variables: metadata.variables,
id: chunk_id,
s: size,
c: ctime,
m: mtime,
}
File.open(path, 'wb') do |f|
f.write metadata.to_msgpack
end
end
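# Writes a .meta sidecar in the current format: BUFFER_HEADER, then the
# msgpack payload size packed as a 32-bit big-endian integer ('N'), then the
# msgpack map itself, which now carries the seq field.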
def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
metadata = {
timekey: metadata.timekey, tag: metadata.tag, variables: metadata.variables,
seq: metadata.seq,
id: chunk_id,
s: size,
c: ctime,
m: mtime,
}
data = metadata.to_msgpack
size = [data.size].pack('N')
File.open(path, 'wb') do |f|
f.write(Fluent::Plugin::Buffer::FileChunk::BUFFER_HEADER + size + data)
end
end
sub_test_case 'non configured buffer plugin instance' do
setup do
Fluent::Test.setup
@dir = File.expand_path('../../tmp/buffer_file_dir', __FILE__)
FileUtils.rm_rf @dir
FileUtils.mkdir_p @dir
end
test 'path should include * normally' do
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
p = Fluent::Plugin::FileBuffer.new
p.owner = d
p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer.*.file')}))
assert_equal File.join(@dir, 'buffer.*.file'), p.path
end
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test 'existing directory will be used with additional default file name' do |params|
conf, suffix = params
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
p = Fluent::Plugin::FileBuffer.new
p.owner = d
c = {'path' => @dir}
c['path_suffix'] = conf if conf
p.configure(config_element('buffer', '', c))
assert_equal File.join(@dir, "buffer.*.#{suffix}"), p.path
end
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test 'nonexistent path without * is handled as a directory' do |params|
conf, suffix = params
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
p = Fluent::Plugin::FileBuffer.new
p.owner = d
c = {'path' => File.join(@dir, 'buffer')}
c['path_suffix'] = conf if conf
p.configure(config_element('buffer', '', c))
assert_equal File.join(@dir, 'buffer', "buffer.*.#{suffix}"), p.path
end
end
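# Multi-worker rules exercised below: a buffer path naming a fixed file or a
# '*' pattern cannot be shared across workers, while a directory path (or a
# path derived from the system root_dir) can, because each worker then gets
# its own subdirectory.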
sub_test_case 'buffer configurations and workers' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
FileUtils.rm_rf @bufdir
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
end
test 'raise error if configured path is of existing file' do
@bufpath = File.join(@bufdir, 'buf')
FileUtils.mkdir_p @bufdir
File.open(@bufpath, 'w'){|f| } # create and close the file
assert File.exist?(@bufpath)
assert File.file?(@bufpath)
buf_conf = config_element('buffer', '', {'path' => @bufpath})
assert_raise Fluent::ConfigError.new("Plugin 'file' does not support multi workers configuration (Fluent::Plugin::FileBuffer)") do
Fluent::SystemConfig.overwrite_system_config('workers' => 4) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}, [buf_conf]))
end
end
end
test 'raise error if fluentd is configured to use file path pattern and multi workers' do
@bufpath = File.join(@bufdir, 'testbuf.*.log')
buf_conf = config_element('buffer', '', {'path' => @bufpath})
assert_raise Fluent::ConfigError.new("Plugin 'file' does not support multi workers configuration (Fluent::Plugin::FileBuffer)") do
Fluent::SystemConfig.overwrite_system_config('workers' => 4) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}, [buf_conf]))
end
end
end
test 'enables multi worker configuration with nonexistent directory path' do
assert_false File.exist?(@bufdir)
buf_conf = config_element('buffer', '', {'path' => @bufdir})
assert_nothing_raised do
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do
@d.configure(config_element('ROOT', '', {}, [buf_conf]))
end
end
end
test 'enables multi worker configuration with existing directory path' do
FileUtils.mkdir_p @bufdir
buf_conf = config_element('buffer', '', {'path' => @bufdir})
assert_nothing_raised do
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do
@d.configure(config_element('ROOT', '', {}, [buf_conf]))
end
end
end
test 'enables multi worker configuration with root dir' do
buf_conf = config_element('buffer', '')
assert_nothing_raised do
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}, [buf_conf]))
end
end
end
end
sub_test_case 'buffer plugin configured only with path' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
@bufpath = File.join(@bufdir, 'testbuf.*.log')
FileUtils.rm_r @bufdir if File.exist?(@bufdir)
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test 'this is persistent plugin' do
assert @p.persistent?
end
test '#start creates directory for buffer chunks' do
plugin = Fluent::Plugin::FileBuffer.new
plugin.owner = @d
rand_num = rand(0..100)
bufpath = File.join(File.expand_path("../../tmp/buffer_file_#{rand_num}", __FILE__), 'testbuf.*.log')
bufdir = File.dirname(bufpath)
FileUtils.rm_r bufdir if File.exist?(bufdir)
assert !File.exist?(bufdir)
plugin.configure(config_element('buffer', '', {'path' => bufpath}))
assert !File.exist?(bufdir)
plugin.start
assert File.exist?(bufdir)
assert{ File.stat(bufdir).mode.to_s(8).end_with?('755') }
plugin.stop; plugin.before_shutdown; plugin.shutdown; plugin.after_shutdown; plugin.close; plugin.terminate
FileUtils.rm_r bufdir
end
test '#start creates directory for buffer chunks with specified permission' do
omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?
plugin = Fluent::Plugin::FileBuffer.new
plugin.owner = @d
rand_num = rand(0..100)
bufpath = File.join(File.expand_path("../../tmp/buffer_file_#{rand_num}", __FILE__), 'testbuf.*.log')
bufdir = File.dirname(bufpath)
FileUtils.rm_r bufdir if File.exist?(bufdir)
assert !File.exist?(bufdir)
plugin.configure(config_element('buffer', '', {'path' => bufpath, 'dir_permission' => '0700'}))
assert !File.exist?(bufdir)
plugin.start
assert File.exist?(bufdir)
assert{ File.stat(bufdir).mode.to_s(8).end_with?('700') }
plugin.stop; plugin.before_shutdown; plugin.shutdown; plugin.after_shutdown; plugin.close; plugin.terminate
FileUtils.rm_r bufdir
end
test '#start creates directory for buffer chunks with specified permission via system config' do
omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?
sysconf = {'dir_permission' => '700'}
Fluent::SystemConfig.overwrite_system_config(sysconf) do
plugin = Fluent::Plugin::FileBuffer.new
plugin.owner = @d
rand_num = rand(0..100)
bufpath = File.join(File.expand_path("../../tmp/buffer_file_#{rand_num}", __FILE__), 'testbuf.*.log')
bufdir = File.dirname(bufpath)
FileUtils.rm_r bufdir if File.exist?(bufdir)
assert !File.exist?(bufdir)
plugin.configure(config_element('buffer', '', {'path' => bufpath}))
assert !File.exist?(bufdir)
plugin.start
assert File.exist?(bufdir)
assert{ File.stat(bufdir).mode.to_s(8).end_with?('700') }
plugin.stop; plugin.before_shutdown; plugin.shutdown; plugin.after_shutdown; plugin.close; plugin.terminate
FileUtils.rm_r bufdir
end
end
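# #generate_chunk must create an unstaged FileChunk whose path is derived
# from the configured pattern by substituting '*' with 'b' plus the chunk's
# unique id in hex, honoring file_permission from the buffer config or system
# config (default 0644 via Fluent::DEFAULT_FILE_PERMISSION).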
test '#generate_chunk generates blank file chunk on path from unique_id of metadata' do
m1 = metadata()
c1 = @p.generate_chunk(m1)
assert c1.is_a? Fluent::Plugin::Buffer::FileChunk
assert_equal m1, c1.metadata
assert c1.empty?
assert_equal :unstaged, c1.state
assert_equal Fluent::DEFAULT_FILE_PERMISSION, c1.permission
assert_equal @bufpath.gsub('.*.', ".b#{Fluent::UniqueId.hex(c1.unique_id)}."), c1.path
assert{ File.stat(c1.path).mode.to_s(8).end_with?('644') }
m2 = metadata(timekey: event_time('2016-04-17 11:15:00 -0700').to_i)
c2 = @p.generate_chunk(m2)
assert c2.is_a? Fluent::Plugin::Buffer::FileChunk
assert_equal m2, c2.metadata
assert c2.empty?
assert_equal :unstaged, c2.state
assert_equal Fluent::DEFAULT_FILE_PERMISSION, c2.permission
assert_equal @bufpath.gsub('.*.', ".b#{Fluent::UniqueId.hex(c2.unique_id)}."), c2.path
assert{ File.stat(c2.path).mode.to_s(8).end_with?('644') }
c1.purge
c2.purge
end
test '#generate_chunk generates blank file chunk with specified permission' do
omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?
plugin = Fluent::Plugin::FileBuffer.new
plugin.owner = @d
rand_num = rand(0..100)
bufpath = File.join(File.expand_path("../../tmp/buffer_file_#{rand_num}", __FILE__), 'testbuf.*.log')
bufdir = File.dirname(bufpath)
FileUtils.rm_r bufdir if File.exist?(bufdir)
assert !File.exist?(bufdir)
plugin.configure(config_element('buffer', '', {'path' => bufpath, 'file_permission' => '0600'}))
assert !File.exist?(bufdir)
plugin.start
m = metadata()
c = plugin.generate_chunk(m)
assert c.is_a? Fluent::Plugin::Buffer::FileChunk
assert_equal m, c.metadata
assert c.empty?
assert_equal :unstaged, c.state
assert_equal 0600, c.permission
assert_equal bufpath.gsub('.*.', ".b#{Fluent::UniqueId.hex(c.unique_id)}."), c.path
assert{ File.stat(c.path).mode.to_s(8).end_with?('600') }
c.purge
plugin.stop; plugin.before_shutdown; plugin.shutdown; plugin.after_shutdown; plugin.close; plugin.terminate
FileUtils.rm_r bufdir
end
test '#generate_chunk generates blank file chunk with specified permission with system_config' do
omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?
begin
plugin = Fluent::Plugin::FileBuffer.new
plugin.owner = @d
rand_num = rand(0..100)
bufpath = File.join(File.expand_path("../../tmp/buffer_file_#{rand_num}", __FILE__), 'testbuf.*.log')
bufdir = File.dirname(bufpath)
FileUtils.rm_r bufdir if File.exist?(bufdir)
assert !File.exist?(bufdir)
plugin.configure(config_element('buffer', '', { 'path' => bufpath }))
assert !File.exist?(bufdir)
plugin.start
m = metadata()
c = nil
Fluent::SystemConfig.overwrite_system_config("file_permission" => "700") do
c = plugin.generate_chunk(m)
end
assert c.is_a? Fluent::Plugin::Buffer::FileChunk
assert_equal m, c.metadata
assert c.empty?
assert_equal :unstaged, c.state
assert_equal 0700, c.permission
assert_equal bufpath.gsub('.*.', ".b#{Fluent::UniqueId.hex(c.unique_id)}."), c.path
assert{ File.stat(c.path).mode.to_s(8).end_with?('700') }
c.purge
plugin.stop; plugin.before_shutdown; plugin.shutdown; plugin.after_shutdown; plugin.close; plugin.terminate
ensure
FileUtils.rm_r bufdir
end
end
end
sub_test_case 'configured with system root directory and plugin @id' do
setup do
@root_dir = File.expand_path('../../tmp/buffer_file_root', __FILE__)
FileUtils.rm_rf @root_dir
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
end
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test '#start creates directory for buffer chunks' do |params|
conf, suffix = params
c = {}
c['path_suffix'] = conf if conf
Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}))
@p.configure(config_element('buffer', '', c))
end
expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', "buffer.*.#{suffix}")
expected_buffer_dir = File.dirname(expected_buffer_path)
assert_equal expected_buffer_path, @p.path
assert_false Dir.exist?(expected_buffer_dir)
@p.start
assert Dir.exist?(expected_buffer_dir)
end
end
sub_test_case 'there are no existing file chunks' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
@bufpath = File.join(@bufdir, 'testbuf.*.log')
FileUtils.rm_r @bufdir if File.exist?(@bufdir)
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test '#resume returns empty buffer state' do
ary = @p.resume
assert_equal({}, ary[0])
assert_equal([], ary[1])
end
end
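# Fixture for a populated resume: '.q<hex id>.' files are enqueued chunks,
# '.b<hex id>.' files are staged chunks, each with a current-format .meta
# sidecar. #resume must rebuild the stage and the queue from these files.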
sub_test_case 'there are some existing file chunks' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p1 + '.meta', @c1id, metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i),
4, event_time('2016-04-17 13:58:00 -0700').to_i, event_time('2016-04-17 13:58:22 -0700').to_i
)
@c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "etest.q#{Fluent::UniqueId.hex(@c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p2 + '.meta', @c2id, metadata(timekey: event_time('2016-04-17 13:59:00 -0700').to_i),
3, event_time('2016-04-17 13:59:00 -0700').to_i, event_time('2016-04-17 13:59:23 -0700').to_i
)
@c3id = Fluent::UniqueId.generate
p3 = File.join(@bufdir, "etest.b#{Fluent::UniqueId.hex(@c3id)}.log")
File.open(p3, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p3 + '.meta', @c3id, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)
@c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "etest.b#{Fluent::UniqueId.hex(@c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p4 + '.meta', @c4id, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)
@bufpath = File.join(@bufdir, 'etest.*.log')
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test '#resume returns staged/queued chunks with metadata' do
assert_equal 2, @p.stage.size
assert_equal 2, @p.queue.size
stage = @p.stage
m3 = metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i)
assert_equal @c3id, stage[m3].unique_id
assert_equal 4, stage[m3].size
assert_equal :staged, stage[m3].state
m4 = metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i)
assert_equal @c4id, stage[m4].unique_id
assert_equal 3, stage[m4].size
assert_equal :staged, stage[m4].state
end
test '#resume returns queued chunks ordered by last modified time (FIFO)' do
assert_equal 2, @p.stage.size
assert_equal 2, @p.queue.size
queue = @p.queue
assert{ queue[0].modified_at < queue[1].modified_at }
assert_equal @c1id, queue[0].unique_id
assert_equal :queued, queue[0].state
assert_equal event_time('2016-04-17 13:58:00 -0700').to_i, queue[0].metadata.timekey
assert_nil queue[0].metadata.tag
assert_nil queue[0].metadata.variables
assert_equal Time.parse('2016-04-17 13:58:00 -0700').localtime, queue[0].created_at
assert_equal Time.parse('2016-04-17 13:58:22 -0700').localtime, queue[0].modified_at
assert_equal 4, queue[0].size
assert_equal @c2id, queue[1].unique_id
assert_equal :queued, queue[1].state
assert_equal event_time('2016-04-17 13:59:00 -0700').to_i, queue[1].metadata.timekey
assert_nil queue[1].metadata.tag
assert_nil queue[1].metadata.variables
assert_equal Time.parse('2016-04-17 13:59:00 -0700').localtime, queue[1].created_at
assert_equal Time.parse('2016-04-17 13:59:23 -0700').localtime, queue[1].modified_at
assert_equal 3, queue[1].size
end
end
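# The buffer directory name here literally contains '${test}'; #resume must
# treat such placeholder-style characters as literal path text when globbing
# for existing chunks.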
sub_test_case 'there are some existing file chunks with placeholders path' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_${test}_file', __FILE__)
FileUtils.rm_rf(@bufdir)
FileUtils.mkdir_p(@bufdir)
@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p1 + '.meta', @c1id, metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i),
1, event_time('2016-04-17 13:58:00 -0700').to_i, event_time('2016-04-17 13:58:22 -0700').to_i
)
@c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "etest.b#{Fluent::UniqueId.hex(@c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
p2 + '.meta', @c2id, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
1, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)
@bufpath = File.join(@bufdir, 'etest.*.log')
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
FileUtils.rm_rf(@bufdir)
end
test '#resume returns staged/queued chunks with metadata' do
assert_equal 1, @p.stage.size
assert_equal 1, @p.queue.size
end
end
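# Multi-worker resume: worker 0 additionally adopts chunks sitting directly
# under the configured directory (e.g. leftovers from a single-worker setup),
# while every worker resumes the chunks in its own 'worker<N>' subdirectory.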
sub_test_case 'there are some existing file chunks, both in specified path and per-worker directory under specified path, configured as multi workers' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file/path', __FILE__)
@worker0_dir = File.join(@bufdir, "worker0")
@worker1_dir = File.join(@bufdir, "worker1")
FileUtils.rm_rf @bufdir
FileUtils.mkdir_p @worker0_dir
FileUtils.mkdir_p @worker1_dir
@bufdir_chunk_1 = Fluent::UniqueId.generate
bc1 = File.join(@bufdir, "buffer.q#{Fluent::UniqueId.hex(@bufdir_chunk_1)}.log")
File.open(bc1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
bc1 + '.meta', @bufdir_chunk_1, metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i),
4, event_time('2016-04-17 13:58:00 -0700').to_i, event_time('2016-04-17 13:58:22 -0700').to_i
)
@bufdir_chunk_2 = Fluent::UniqueId.generate
bc2 = File.join(@bufdir, "buffer.q#{Fluent::UniqueId.hex(@bufdir_chunk_2)}.log")
File.open(bc2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
bc2 + '.meta', @bufdir_chunk_2, metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i),
4, event_time('2016-04-17 13:58:00 -0700').to_i, event_time('2016-04-17 13:58:22 -0700').to_i
)
@worker_dir_chunk_1 = Fluent::UniqueId.generate
wc0_1 = File.join(@worker0_dir, "buffer.q#{Fluent::UniqueId.hex(@worker_dir_chunk_1)}.log")
wc1_1 = File.join(@worker1_dir, "buffer.q#{Fluent::UniqueId.hex(@worker_dir_chunk_1)}.log")
[wc0_1, wc1_1].each do |chunk_path|
File.open(chunk_path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
chunk_path + '.meta', @worker_dir_chunk_1, metadata(timekey: event_time('2016-04-17 13:59:00 -0700').to_i),
3, event_time('2016-04-17 13:59:00 -0700').to_i, event_time('2016-04-17 13:59:23 -0700').to_i
)
end
@worker_dir_chunk_2 = Fluent::UniqueId.generate
wc0_2 = File.join(@worker0_dir, "buffer.b#{Fluent::UniqueId.hex(@worker_dir_chunk_2)}.log")
wc1_2 = File.join(@worker1_dir, "buffer.b#{Fluent::UniqueId.hex(@worker_dir_chunk_2)}.log")
[wc0_2, wc1_2].each do |chunk_path|
File.open(chunk_path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
chunk_path + '.meta', @worker_dir_chunk_2, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)
end
@worker_dir_chunk_3 = Fluent::UniqueId.generate
wc0_3 = File.join(@worker0_dir, "buffer.b#{Fluent::UniqueId.hex(@worker_dir_chunk_3)}.log")
wc1_3 = File.join(@worker1_dir, "buffer.b#{Fluent::UniqueId.hex(@worker_dir_chunk_3)}.log")
[wc0_3, wc1_3].each do |chunk_path|
File.open(chunk_path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
chunk_path + '.meta', @worker_dir_chunk_3, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)
end
Fluent::Test.setup
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
end
test 'worker(id=0) #resume returns staged/queued chunks with metadata, not only in worker dir, including the directory specified by path' do
ENV['SERVERENGINE_WORKER_ID'] = '0'
buf_conf = config_element('buffer', '', {'path' => @bufdir})
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
with_worker_config(workers: 2, worker_id: 0) do
@d.configure(config_element('output', '', {}, [buf_conf]))
end
@d.start
@p = @d.buffer
assert_equal 2, @p.stage.size
assert_equal 3, @p.queue.size
stage = @p.stage
m1 = metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i)
assert_equal @worker_dir_chunk_2, stage[m1].unique_id
assert_equal 4, stage[m1].size
assert_equal :staged, stage[m1].state
m2 = metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i)
assert_equal @worker_dir_chunk_3, stage[m2].unique_id
assert_equal 3, stage[m2].size
assert_equal :staged, stage[m2].state
queue = @p.queue
assert_equal [@bufdir_chunk_1, @bufdir_chunk_2, @worker_dir_chunk_1].sort, queue.map(&:unique_id).sort
assert_equal [3, 4, 4], queue.map(&:size).sort
assert_equal [:queued, :queued, :queued], queue.map(&:state)
end
test 'worker(id=1) #resume returns staged/queued chunks with metadata, only in worker dir' do
buf_conf = config_element('buffer', '', {'path' => @bufdir})
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
with_worker_config(workers: 2, worker_id: 1) do
@d.configure(config_element('output', '', {}, [buf_conf]))
end
@d.start
@p = @d.buffer
assert_equal 2, @p.stage.size
assert_equal 1, @p.queue.size
stage = @p.stage
m1 = metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i)
assert_equal @worker_dir_chunk_2, stage[m1].unique_id
assert_equal 4, stage[m1].size
assert_equal :staged, stage[m1].state
m2 = metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i)
assert_equal @worker_dir_chunk_3, stage[m2].unique_id
assert_equal 3, stage[m2].size
assert_equal :staged, stage[m2].state
queue = @p.queue
assert_equal @worker_dir_chunk_1, queue[0].unique_id
assert_equal 3, queue[0].size
assert_equal :queued, queue[0].state
end
end
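# Same fixture shape as above, but the sidecars are written with
# write_metadata_old; #resume must transparently parse the headerless,
# seq-less legacy metadata format.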
sub_test_case 'there are some existing file chunks with old format metadata' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata_old(
p1 + '.meta', @c1id, metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i),
4, event_time('2016-04-17 13:58:00 -0700').to_i, event_time('2016-04-17 13:58:22 -0700').to_i
)
@c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "etest.q#{Fluent::UniqueId.hex(@c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata_old(
p2 + '.meta', @c2id, metadata(timekey: event_time('2016-04-17 13:59:00 -0700').to_i),
3, event_time('2016-04-17 13:59:00 -0700').to_i, event_time('2016-04-17 13:59:23 -0700').to_i
)
@c3id = Fluent::UniqueId.generate
p3 = File.join(@bufdir, "etest.b#{Fluent::UniqueId.hex(@c3id)}.log")
File.open(p3, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata_old(
p3 + '.meta', @c3id, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)
@c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "etest.b#{Fluent::UniqueId.hex(@c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata_old(
p4 + '.meta', @c4id, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)
@bufpath = File.join(@bufdir, 'etest.*.log')
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test '#resume returns staged/queued chunks with metadata' do
assert_equal 2, @p.stage.size
assert_equal 2, @p.queue.size
stage = @p.stage
m3 = metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i)
assert_equal @c3id, stage[m3].unique_id
assert_equal 4, stage[m3].size
assert_equal :staged, stage[m3].state
m4 = metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i)
assert_equal @c4id, stage[m4].unique_id
assert_equal 3, stage[m4].size
assert_equal :staged, stage[m4].state
end
end
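# Legacy chunk files with no .meta sidecar at all: metadata cannot be
# restored, so #resume enqueues every such chunk with empty metadata,
# size 0, and modified_at taken from the file's mtime.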
sub_test_case 'there are some existing file chunks without metadata file' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.201604171358.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p1, mtime: Time.parse('2016-04-17 13:58:28 -0700'))
@c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "etest.201604171359.q#{Fluent::UniqueId.hex(@c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p2, mtime: Time.parse('2016-04-17 13:59:30 -0700'))
@c3id = Fluent::UniqueId.generate
p3 = File.join(@bufdir, "etest.201604171400.b#{Fluent::UniqueId.hex(@c3id)}.log")
File.open(p3, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p3, mtime: Time.parse('2016-04-17 14:00:29 -0700'))
@c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "etest.201604171401.b#{Fluent::UniqueId.hex(@c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p4, mtime: Time.parse('2016-04-17 14:01:22 -0700'))
@bufpath = File.join(@bufdir, 'etest.*.log')
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test '#resume returns queued chunks for files without metadata' do
assert_equal 0, @p.stage.size
assert_equal 4, @p.queue.size
queue = @p.queue
m = metadata()
assert_equal @c1id, queue[0].unique_id
assert_equal m, queue[0].metadata
assert_equal 0, queue[0].size
assert_equal :queued, queue[0].state
assert_equal Time.parse('2016-04-17 13:58:28 -0700'), queue[0].modified_at
assert_equal @c2id, queue[1].unique_id
assert_equal m, queue[1].metadata
assert_equal 0, queue[1].size
assert_equal :queued, queue[1].state
assert_equal Time.parse('2016-04-17 13:59:30 -0700'), queue[1].modified_at
assert_equal @c3id, queue[2].unique_id
assert_equal m, queue[2].metadata
assert_equal 0, queue[2].size
assert_equal :queued, queue[2].state
assert_equal Time.parse('2016-04-17 14:00:29 -0700'), queue[2].modified_at
assert_equal @c4id, queue[3].unique_id
assert_equal m, queue[3].metadata
assert_equal 0, queue[3].size
assert_equal :queued, queue[3].state
assert_equal Time.parse('2016-04-17 14:01:22 -0700'), queue[3].modified_at
end
end
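# Several staged chunks may share the same timekey; the metadata seq field
# disambiguates them. Chunk c4 below reuses c3's metadata (same seq), so on
# resume one of the colliding chunks is enqueued instead of staged, as the
# assertions verify (three staged with seq 0..2, one queued).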
sub_test_case 'there are the same timekey metadata in stage' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
@bufpath = File.join(@bufdir, 'testbuf.*.log')
FileUtils.rm_r(@bufdir) if File.exist?(@bufdir)
FileUtils.mkdir_p(@bufdir)
m = metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i)
c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay1"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay2"}].to_json + "\n"
end
write_metadata(p1 + '.meta', c1id, m, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)
c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay3"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay4"}].to_json + "\n"
end
m2 = m.dup_next
write_metadata(p2 + '.meta', c2id, m2, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)
c3id = Fluent::UniqueId.generate
p3 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c3id)}.log")
File.open(p3, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay5"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
m3 = m2.dup_next
write_metadata(p3 + '.meta', c3id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)
c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay5"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
write_metadata(p4 + '.meta', c4id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
# Windows does not permit deleting files that are still open in another
# process, so removal failures are ignored here.
File.delete(path) rescue nil
end
end
end
test '#resume returns each chunks' do
s, e = @p.resume
assert_equal 3, s.size
assert_equal [0, 1, 2], s.keys.map(&:seq).sort
assert_equal 1, e.size
assert_equal [0], e.map { |chunk| chunk.metadata.seq }
end
end
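# Files that match the path pattern but lack the 'b'/'q' + unique-id segment
# are not buffer chunks: #resume must ignore them and leave them on disk.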
sub_test_case 'there are some non-buffer chunk files, with a path without buffer chunk ids' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
FileUtils.rm_rf @bufdir
FileUtils.mkdir_p @bufdir
@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.201604171358.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p1, mtime: Time.parse('2016-04-17 13:58:28 -0700'))
@not_chunk = File.join(@bufdir, 'etest.20160416.log')
File.open(@not_chunk, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-16 23:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-16 23:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-16 23:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-16 23:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(@not_chunk, mtime: Time.parse('2016-04-17 00:00:00 -0700'))
@bufpath = File.join(@bufdir, 'etest.*.log')
Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end
test '#resume returns queued chunks for files without metadata, while ignoring non-chunk looking files' do
assert_equal 0, @p.stage.size
assert_equal 1, @p.queue.size
queue = @p.queue
m = metadata()
assert_equal @c1id, queue[0].unique_id
assert_equal m, queue[0].metadata
assert_equal 0, queue[0].size
assert_equal :queued, queue[0].state
assert_equal Time.parse('2016-04-17 13:58:28 -0700'), queue[0].modified_at
assert File.exist?(@not_chunk)
end
end
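# Broken-chunk recovery: an empty chunk file or an unreadable .meta sidecar
# found during #resume is moved to
#   <root_dir>/backup/worker<N>/<plugin id>/<chunk id>.log
# unless disable_chunk_backup is set, in which case it is discarded outright.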
sub_test_case 'there are existing broken file chunks' do
setup do
@id_output = 'backup_test'
@bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__)
FileUtils.rm_rf @bufdir rescue nil
FileUtils.mkdir_p @bufdir
@bufpath = File.join(@bufdir, 'broken_test.*.log')
Fluent::Test.setup
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
end
def setup_plugins(buf_conf)
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@d.configure(config_element('ROOT', '', {'@id' => @id_output}, [config_element('buffer', '', buf_conf)]))
@p = @d.buffer
end
def create_first_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)
return cid, path
end
def create_second_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)
return cid, path
end
def compare_staged_chunk(staged, id, time, num, mode)
assert_equal 1, staged.size
m = metadata(timekey: event_time(time).to_i)
assert_equal id, staged[m].unique_id
assert_equal num, staged[m].size
assert_equal mode, staged[m].state
end
def compare_queued_chunk(queued, id, num, mode)
assert_equal 1, queued.size
assert_equal id, queued[0].unique_id
assert_equal num, queued[0].size
assert_equal mode, queued[0].state
end
def compare_log(plugin, msg)
logs = plugin.log.out.logs
assert { logs.any? { |log| log.include?(msg) } }
end
test '#resume backups staged empty chunk' do
setup_plugins({'path' => @bufpath})
c1id, p1 = create_first_chunk('b')
File.open(p1, 'wb') { |f| } # create staged empty chunk file
c2id, _ = create_second_chunk('b')
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end
compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged)
compare_log(@p, 'staged file chunk is empty')
assert { not File.exist?(p1) }
assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") }
end
test '#resume backups staged broken metadata' do
setup_plugins({'path' => @bufpath})
c1id, _ = create_first_chunk('b')
c2id, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end
compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
assert { not File.exist?(p2) }
assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
end
test '#resume backups enqueued empty chunk' do
setup_plugins({'path' => @bufpath})
c1id, p1 = create_first_chunk('q')
File.open(p1, 'wb') { |f| } # create enqueued empty chunk file
c2id, _ = create_second_chunk('q')
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end
compare_queued_chunk(@p.queue, c2id, 3, :queued)
compare_log(@p, 'enqueued file chunk is empty')
assert { not File.exist?(p1) }
assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") }
end
test '#resume backups enqueued broken metadata' do
setup_plugins({'path' => @bufpath})
c1id, _ = create_first_chunk('q')
c2id, p2 = create_second_chunk('q')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end
compare_queued_chunk(@p.queue, c1id, 4, :queued)
compare_log(@p, 'enqueued meta file is broken')
assert { not File.exist?(p2) }
assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
end
test '#resume throws away broken chunk with disable_chunk_backup' do
setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true})
c1id, _ = create_first_chunk('b')
c2id, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end
compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
compare_log(@p, 'disable_chunk_backup is true')
assert { not File.exist?(p2) }
assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
end
end
end
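To run just this suite from a fluentd checkout, the standard Rake::TestTask invocation should work (command assumed from the repository's usual tooling; check its Rakefile):

bundle exec rake test TEST=test/plugin/test_buf_file.rb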