Back to Repositories

Testing File Output Plugin Implementation in Fluentd

This test suite validates the file output plugin functionality in Fluentd, focusing on file path handling, compression, buffering, and timestamp-based path generation. It ensures proper file writing, path templating, and configuration options for the file output plugin.

Test Coverage Overview

The test suite provides comprehensive coverage of the Fluentd file output plugin’s core functionality.

Key areas tested include:
  • File path generation and templating
  • Compression handling (gzip)
  • Time-based file rotation
  • Buffering configurations
  • Symlink management
  • File permissions and directory creation

Implementation Analysis

The tests use the Test::Unit framework (the test class extends `Test::Unit::TestCase`) with a structured approach focusing on isolated component testing.

Key testing patterns include:
  • Configuration validation using various path formats
  • Time manipulation using Timecop
  • File system operations verification
  • Buffer chunk management testing

Technical Details

Testing tools and setup:
  • Test::Unit (test-unit) as the testing framework
  • Timecop for time manipulation
  • FileUtils for filesystem operations
  • Zlib for compression testing
  • Fluent::Test::Driver::Output for plugin testing

Best Practices Demonstrated

The test suite showcases several testing best practices:

  • Comprehensive setup and teardown for clean test environment
  • Isolation of filesystem operations
  • Thorough edge case coverage
  • Clear test organization using sub_test_cases
  • Proper resource cleanup

fluent/fluentd

test/plugin/test_out_file.rb

            
require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_file'
require 'fileutils'
require 'time'
require 'timecop'
require 'zlib'
require 'fluent/file_wrapper'

class FileOutputTest < Test::Unit::TestCase
  # Runs before every test: resets the Fluentd test harness, recreates an
  # empty TMP_DIR, and records the newline sequence ("\r\n" when
  # Fluent.windows? is true, "\n" otherwise) used when building expected output.
  def setup
    Fluent::Test.setup
    FileUtils.rm_rf(TMP_DIR)
    FileUtils.mkdir_p(TMP_DIR)
    @default_newline = Fluent.windows? ? "\r\n" : "\n"
  end

  # Scratch directory for this suite, located at ../tmp relative to this file.
  # The name is suffixed with ENV['TEST_ENV_NUMBER'] (presumably so parallel
  # test workers get separate directories -- confirm against the test runner).
  TMP_DIR = File.expand_path("../tmp/out_file#{ENV['TEST_ENV_NUMBER']}", File.dirname(__FILE__))

  # Default plugin configuration used by create_driver: writes under TMP_DIR
  # with gzip compression, and sets both `utc` and the buffer's
  # `timekey_use_utc`.
  CONFIG = %[
    path #{TMP_DIR}/out_file_test
    compress gz
    utc
    <buffer>
      timekey_use_utc true
    </buffer>
  ]

  # Builds an output test driver around Fluent::Plugin::FileOutput and
  # configures it with +conf+ (CONFIG when omitted). +opts+ is passed to the
  # driver constructor as its `opts:` keyword. Returns the result of #configure.
  def create_driver(conf = CONFIG, opts = {})
    driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput, opts: opts)
    driver.configure(conf)
  end

  sub_test_case 'configuration' do
    # A minimal config is accepted; `compress gz` is stored as :gz and
    # resolved to the :gzip compression method.
    test 'basic configuration' do
      plugin = create_driver(%[
        path test_path
        compress gz
      ]).instance
      assert_equal 'test_path', plugin.path
      assert_equal :gz, plugin.compress
      assert_equal :gzip, plugin.instance_eval{ @compress_method }
    end

    # With a system-level root_dir, the buffer path is derived from
    # <root_dir>/worker0/<plugin id>.
    test 'using root_dir for buffer path' do
      root_dir = File.join(TMP_DIR, 'testrootdir')
      buffer_section = config_element('buffer', '', {'flush_interval' => '1s'})
      conf = config_element('match', '**', {'@id' => 'myout', 'path' => 'test_path', 'append' => 'true'}, [buffer_section])
      plugin = create_driver(conf, {'root_dir' => root_dir}).instance

      assert_equal 'test_path', plugin.path
      assert plugin.append

      assert plugin.buffer.respond_to?(:path) # file buffer
      assert_equal 1, plugin.buffer_config.flush_interval

      plugin_dir = File.join(root_dir, 'worker0', 'myout')
      assert_equal plugin_dir, plugin.plugin_root_dir
      assert_equal File.join(plugin_dir, 'buffer', 'buffer.*.log'), plugin.buffer.path
    end

    test 'path should be writable' do
      assert_raise(Fluent::ConfigError.new("'path' parameter is required")) do
        create_driver ""
      end

      assert_nothing_raised do
        create_driver %[path #{TMP_DIR}/test_path]
      end

      test_dir = "#{TMP_DIR}/test_dir"
      nested_conf = %[path #{TMP_DIR}/test_dir/foo/bar/baz]

      # Missing intermediate directories are fine while the parent is writable.
      assert_nothing_raised do
        FileUtils.mkdir_p(test_dir)
        File.chmod(0777, test_dir)
        create_driver nested_conf
      end

      # Skipped for uid 0 (presumably because root can write regardless of
      # the directory mode).
      unless Process.uid.zero?
        assert_raise(Fluent::ConfigError) do
          FileUtils.mkdir_p(test_dir)
          File.chmod(0555, test_dir)
          create_driver nested_conf
        end
      end
    end

    test 'default timezone is localtime' do
      driver = create_driver(%[path #{TMP_DIR}/out_file_test])
      time = event_time("2011-01-02 13:14:15 UTC")
      zone = Fluent.windows? ? 'NST-8' : 'Asia/Taipei'

      with_timezone(zone) do
        driver.run(default_tag: 'test') do
          driver.feed(time, {"a"=>1})
        end
      end
      assert_equal 1, driver.formatted.size
      assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}#{@default_newline}], driver.formatted[0]
    end

    test 'no configuration error raised for basic configuration using "*" (v0.12 style)' do
      params = {
        'path' => "#{TMP_DIR}/test_out.*.log",
        'time_slice_format' => '%Y%m%d',
      }
      conf = config_element('match', '**', params)
      assert_nothing_raised { create_driver(conf) }
    end

    # Only defined for non-root users (see the uid guard in 'path should be writable').
    unless Process.uid.zero?
      test 'configuration error raised if specified directory via template is not writable' do
        Timecop.freeze(Time.parse("2016-10-04 21:33:27 UTC")) do
          buffer_section = config_element('buffer', 'time,tag', {'timekey' => 86400, 'timekey_zone' => '+0000'})
          conf = config_element('match', '**', {
            'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log",
          }, [buffer_section])

          read_only_dir = "#{TMP_DIR}/prohibited"
          FileUtils.mkdir_p(read_only_dir)
          File.chmod(0555, read_only_dir)

          expected_error = Fluent::ConfigError.new("out_file: `#{TMP_DIR}/prohibited/a/file.20161004.log_**.log` is not writable")
          assert_raise(expected_error) do
            create_driver(conf)
          end
        end
      end
    end

    test 'configuration using inject/format/buffer sections fully' do
      inject_section = config_element('inject', '', {
        'hostname_key' => 'hostname',
        'hostname' => 'testing.local',
        'tag_key' => 'tag',
        'time_key' => 'time',
        'time_type' => 'string',
        'time_format' => '%Y/%m/%d %H:%M:%S %z',
        'timezone' => '+0900',
      })
      format_section = config_element('format', '', {
        '@type' => 'out_file',
        'include_tag' => 'true',
        'include_time' => 'true',
        'delimiter' => 'COMMA',
        'time_type' => 'string',
        'time_format' => '%Y-%m-%d %H:%M:%S %z',
        'utc' => 'true',
      })
      buffer_section = config_element('buffer', 'time,tag,type', {
        '@type' => 'file',
        'timekey' => '15m',
        'timekey_wait' => '5s',
        'timekey_zone' => '+0000',
        'path' => "#{TMP_DIR}/buf_conf_test",
        'chunk_limit_size' => '50m',
        'total_limit_size' => '1g',
        'compress' => 'gzip',
      })
      conf = config_element('match', '**', {
        'path' => "#{TMP_DIR}/${tag}/${type}/conf_test.%Y%m%d.%H%M.log",
        'add_path_suffix' => 'false',
        'append' => "true",
        'symlink_path' => "#{TMP_DIR}/${tag}/conf_test.current.log",
        'compress' => 'gzip',
        'recompress' => 'true',
      }, [inject_section, format_section, buffer_section])

      assert_nothing_raised { create_driver(conf) }
    end

    test 'configured as secondary with primary using chunk_key_tag and not using chunk_key_time' do
      require 'fluent/plugin/out_null'
      buffer_section = config_element('buffer', 'tag', {})
      secondary_section = config_element('secondary', '', {
        '@type' => 'file',
        'path' => "#{TMP_DIR}/testing_to_dump_by_out_file",
      })
      conf = config_element('match', '**', {}, [buffer_section, secondary_section])

      assert_nothing_raised do
        Fluent::Test::Driver::Output.new(Fluent::Plugin::NullOutput).configure(conf)
      end
    end

    test 'warning for symlink_path not including correct placeholders corresponding to chunk keys' do
      omit "Windows doesn't support symlink" if Fluent.windows?
      buffer_section = config_element('buffer', 'time,tag,key1,key2', {
        '@type' => 'file',
        'timekey' => '1d',
        'path' => "#{TMP_DIR}/buf_conf_test",
      })
      conf = config_element('match', '**', {
        'path' => "#{TMP_DIR}/${tag}/${key1}/${key2}/conf_test.%Y%m%d.%H%M.log",
        'symlink_path' => "#{TMP_DIR}/conf_test.current.log",
      }, [buffer_section])

      assert_nothing_raised do
        driver = create_driver(conf)
        assert do
          warnings = driver.logs.select { |log| log.include?("multiple chunks are competing for a single symlink_path") }
          warnings.size == 2
        end
      end
    end
  end

  sub_test_case 'fully configured output' do
    # Freezes the clock and builds a driver that exercises the inject, format
    # and file-buffer sections together with gzip, recompress and symlink_path.
    setup do
      Timecop.freeze(Time.parse("2016-10-03 23:58:00 UTC"))

      inject_section = config_element('inject', '', {
        'hostname_key' => 'hostname',
        'hostname' => 'testing.local',
        'tag_key' => 'tag',
        'time_key' => 'time',
        'time_type' => 'string',
        'time_format' => '%Y/%m/%d %H:%M:%S %z',
        'timezone' => '+0900',
      })
      format_section = config_element('format', '', {
        '@type' => 'out_file',
        'include_tag' => 'true',
        'include_time' => 'true',
        'delimiter' => 'COMMA',
        'time_type' => 'string',
        'time_format' => '%Y-%m-%d %H:%M:%S %z',
        'utc' => 'true',
      })
      buffer_section = config_element('buffer', 'time,tag,type', {
        '@type' => 'file',
        'timekey' => '15m',
        'timekey_wait' => '5s',
        'timekey_zone' => '+0000',
        'path' => "#{TMP_DIR}/buf_full",
        'chunk_limit_size' => '50m',
        'total_limit_size' => '1g',
        'compress' => 'gzip',
      })
      conf = config_element('match', '**', {
        'path' => "#{TMP_DIR}/${tag}/${type}/full.%Y%m%d.%H%M.log",
        'add_path_suffix' => 'false',
        'append' => "true",
        'symlink_path' => "#{TMP_DIR}/full.current.log",
        'compress' => 'gzip',
        'recompress' => 'true',
      }, [inject_section, format_section, buffer_section])

      @d = create_driver(conf)
    end

    # Removes everything the test wrote, then unfreezes the clock.
    teardown do
      %w[buf_full my.data your.data full.current.log].each do |leftover|
        FileUtils.rm_rf("#{TMP_DIR}/#{leftover}")
      end
      Timecop.return
    end

    test 'can format/write data correctly' do
      driver = @d
      plugin = driver.instance
      buffer_dir = "#{TMP_DIR}/buf_full"
      symlink = "#{TMP_DIR}/full.current.log"
      my_dir = "#{TMP_DIR}/my.data/a"
      your_dir = "#{TMP_DIR}/your.data/a"

      assert_equal 50*1024*1024, plugin.buffer.chunk_limit_size
      assert_equal 1*1024*1024*1024, plugin.buffer.total_limit_size

      assert !File.symlink?(symlink)

      t1 = event_time("2016-10-03 23:58:09 UTC")
      t2 = event_time("2016-10-03 23:59:33 UTC")
      t3 = event_time("2016-10-03 23:59:57 UTC")
      t4 = event_time("2016-10-04 00:00:17 UTC")
      t5 = event_time("2016-10-04 00:01:59 UTC")

      # Phase 1: feed three events and leave them staged (no flush, no shutdown).
      Timecop.freeze(Time.parse("2016-10-03 23:58:30 UTC"))

      driver.run(start: true, flush: false, shutdown: false) do
        driver.feed('my.data', t1, {"type" => "a", "message" => "data raw content"})
        driver.feed('my.data', t2, {"type" => "a", "message" => "data raw content"})
        driver.feed('your.data', t3, {"type" => "a", "message" => "data raw content"})
      end

      assert_equal 3, driver.formatted.size

      assert Dir.exist?(buffer_dir)
      assert !Dir.exist?(my_dir)
      assert !Dir.exist?(your_dir)
      staged_files = Dir.entries(buffer_dir).reject { |entry| entry =~ /^\.+$/ }
      meta_files, chunk_files = staged_files.partition { |name| name.end_with?('.meta') }
      assert_equal 2, meta_files.size
      assert_equal 2, chunk_files.size

      m1 = plugin.metadata('my.data', t1, {"type" => "a"})
      m2 = plugin.metadata('your.data', t3, {"type" => "a"})

      assert_equal 2, plugin.buffer.stage.size
      b1_path = plugin.buffer.stage[m1].path
      b1_size = File.lstat(b1_path).size

      unless Fluent.windows?
        assert File.symlink?(symlink)
        assert_equal plugin.buffer.stage[m2].path, File.readlink(symlink)
      end

      # Phase 2: move past the 15m timekey boundary, feed two more events,
      # then flush and shut down.
      Timecop.freeze(Time.parse("2016-10-04 00:00:06 UTC"))

      driver.run(start: false, flush: true, shutdown: true) do
        driver.feed('my.data', t4, {"type" => "a", "message" => "data raw content"})
        driver.feed('your.data', t5, {"type" => "a", "message" => "data raw content"})
      end

      assert Dir.exist?(buffer_dir)
      assert Dir.exist?(my_dir)
      assert Dir.exist?(your_dir)

      remaining = Dir.entries(buffer_dir).reject { |entry| entry =~ /^\.+$/ }
      assert_equal 0, remaining.size

      my_first = "#{my_dir}/full.20161003.2345.log.gz"
      my_second = "#{my_dir}/full.20161004.0000.log.gz"
      your_first = "#{your_dir}/full.20161003.2345.log.gz"
      your_second = "#{your_dir}/full.20161004.0000.log.gz"
      [my_first, my_second, your_first, your_second].each do |written|
        assert File.exist?(written)
      end

      assert{ File.lstat(my_first).size < b1_size } # recompress

      assert_equal 5, driver.formatted.size

      # Formatter timestamps are UTC; the injected "time" field uses +0900.
      expected_line = ->(utc_time, tag, injected_time) {
        %!#{utc_time},#{tag},{"type":"a","message":"data raw content","hostname":"testing.local","tag":"#{tag}","time":"#{injected_time}"}#{@default_newline}!
      }
      r1 = expected_line.call("2016-10-03 23:58:09 +0000", "my.data", "2016/10/04 08:58:09 +0900")
      r2 = expected_line.call("2016-10-03 23:59:33 +0000", "my.data", "2016/10/04 08:59:33 +0900")
      r3 = expected_line.call("2016-10-03 23:59:57 +0000", "your.data", "2016/10/04 08:59:57 +0900")
      r4 = expected_line.call("2016-10-04 00:00:17 +0000", "my.data", "2016/10/04 09:00:17 +0900")
      r5 = expected_line.call("2016-10-04 00:01:59 +0000", "your.data", "2016/10/04 09:01:59 +0900")
      [r1, r2, r3, r4, r5].each_with_index do |line, index|
        assert_equal line, driver.formatted[index]
      end

      read_gunzip = lambda do |gz_path|
        File.open(gz_path, 'rb') do |file|
          Zlib::GzipReader.new(StringIO.new(file.read)).read
        end
      end
      assert_equal r1 + r2, read_gunzip.call(my_first)
      assert_equal r3, read_gunzip.call(your_first)
      assert_equal r4, read_gunzip.call(my_second)
      assert_equal r5, read_gunzip.call(your_second)
    end
  end

  sub_test_case 'format' do
    # CONFIG sets `utc`, so timestamps are rendered with a trailing Z.
    test 'timezone UTC specified' do
      driver = create_driver
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      lines = driver.formatted
      assert_equal 2, lines.size
      assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}], lines[0]
      assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}], lines[1]
    end

    test 'time formatted with specified timezone, using area name' do
      driver = create_driver %[
        path #{TMP_DIR}/out_file_test
        timezone Asia/Taipei
      ]
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') { driver.feed(time, {"a"=>1}) }

      lines = driver.formatted
      assert_equal 1, lines.size
      assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}#{@default_newline}], lines[0]
    end

    test 'time formatted with specified timezone, using offset' do
      driver = create_driver %[
        path #{TMP_DIR}/out_file_test
        timezone -03:30
      ]
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') { driver.feed(time, {"a"=>1}) }

      lines = driver.formatted
      assert_equal 1, lines.size
      assert_equal %[2011-01-02T09:44:15-03:30\ttest\t{"a":1}#{@default_newline}], lines[0]
    end

    test 'configuration error raised for invalid timezone' do
      invalid_conf = %[
          path #{TMP_DIR}/out_file_test
          timezone Invalid/Invalid
        ]
      assert_raise(Fluent::ConfigError) do
        create_driver invalid_conf
      end
    end
  end

  # Asserts that the gzip file at +path+ decompresses to exactly +expect+.
  #
  # A file may hold several gzip members back to back, and Zlib::GzipReader
  # stops after the first one (https://bugs.ruby-lang.org/issues/9790), so
  # every member is decoded with its own reader and the results concatenated.
  #
  # The reader is attached to the File itself rather than to a StringIO copy
  # of its contents. GzipReader#unused only reports what is left in the
  # reader's own input buffer, so rewinding by that amount is only correct
  # for the IO the reader actually pulled its bytes from; rewinding the File
  # after slurping it into a StringIO lands inside a member as soon as more
  # than one read chunk follows it.
  #
  # path   - String path of the gzip file to read.
  # expect - String the concatenated, decompressed members must equal.
  #
  # Raises Zlib::GzipFile::Error if the file is empty or not gzip data.
  def check_gzipped_result(path, expect)
    result = +''
    File.open(path, "rb") do |io|
      loop do
        gzr = Zlib::GzipReader.new(io)
        result << gzr.read
        unused = gzr.unused
        gzr.finish # closes the reader only; the underlying File stays open
        # Step back over the bytes the reader buffered past the end of this member.
        io.pos -= unused.length if unused
        break if io.eof?
      end
    end

    assert_equal expect, result
  end

  # Asserts that the file at +path+, read in binary mode, equals +expect+.
  def check_result(path, expect)
    assert_equal expect, File.binread(path)
  end

  sub_test_case 'write' do
    # Two events fed to the default CONFIG end up in a single gzip file with
    # index suffix _0.
    test 'basic case' do
      driver = create_driver
      output_path = "#{TMP_DIR}/out_file_test.20110102_0.log.gz"

      assert_false File.exist?(output_path)

      time = event_time("2011-01-02 13:14:15 UTC")
      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      assert File.exist?(output_path)
      expected = %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]
      check_gzipped_result(output_path, expected)
    end
  end

  sub_test_case 'file/directory permissions' do
    TMP_DIR_WITH_SYSTEM = File.expand_path("../tmp/out_file_system#{ENV['TEST_ENV_NUMBER']}", File.dirname(__FILE__))
    # Both values are interpolated into the config text below in decimal.
    # Writing the directory mode as the Ruby octal literal 0750 would
    # interpolate as "488", and "488".to_i(8) # => 4, which yields the wrong
    # permission, so it is written as the plain integer 750.
    # NOTE(review): 0620 is 400 in decimal, so the config text contains
    # "file_permission 400" and the assertion at the end of the test compares
    # against 400 as well -- confirm this is the intended file mode.
    OVERRIDE_DIR_PERMISSION = 750
    OVERRIDE_FILE_PERMISSION = 0620
    CONFIG_WITH_SYSTEM = %[
      path #{TMP_DIR_WITH_SYSTEM}/out_file_test
      compress gz
      utc
      <buffer>
        timekey_use_utc true
      </buffer>
      <system>
        file_permission #{OVERRIDE_FILE_PERMISSION}
        dir_permission #{OVERRIDE_DIR_PERMISSION}
      </system>
    ]

    setup do
      omit "NTFS doesn't support UNIX like permissions" if Fluent.windows?
      FileUtils.rm_rf(TMP_DIR_WITH_SYSTEM)
    end

    # Parses +text+ as Fluentd configuration and returns its <system> element
    # (nil when there is none).
    def parse_system(text)
      basepath = File.expand_path('../..', File.dirname(__FILE__))
      root = Fluent::Config.parse(text, '(test)', basepath, true)
      root.elements.find { |element| element.name == 'system' }
    end

    test 'write to file with permission specifications' do
      system_conf = parse_system(CONFIG_WITH_SYSTEM)
      Fluent::Engine.init(Fluent::SystemConfig.new(system_conf))
      driver = create_driver CONFIG_WITH_SYSTEM
      output_path = "#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz"

      assert_false File.exist?(output_path)

      time = event_time("2011-01-02 13:14:15 UTC")
      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      assert File.exist?(output_path)

      check_gzipped_result(output_path, %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n])

      # Compare the last three octal digits of each mode, read as a decimal
      # integer, with the configured constants.
      dir_mode = File.stat(TMP_DIR_WITH_SYSTEM).mode.to_s(8)
      assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3..-1].to_i)
      file_mode = File.stat(output_path).mode.to_s(8)
      assert_equal(OVERRIDE_FILE_PERMISSION, file_mode[-3..-1].to_i)
    end
  end

  sub_test_case 'format specified' do
    # Each test appends formatter settings to CONFIG, feeds {"a"=>1} and
    # {"a"=>2}, and checks the content of the file that was written.
    test 'json' do
      driver = create_driver "#{CONFIG}\nformat json\ninclude_time_key true\ntime_as_epoch"
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      expected = [1, 2].map { |value|
        %[#{Yajl.dump({"a" => value, 'time' => time.to_i})}#{@default_newline}]
      }.join
      check_gzipped_result(driver.instance.last_written_path, expected)
    end

    test 'ltsv' do
      driver = create_driver "#{CONFIG}\nformat ltsv\ninclude_time_key true"
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      expected = %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}]
      check_gzipped_result(driver.instance.last_written_path, expected)
    end

    test 'single_value' do
      driver = create_driver "#{CONFIG}\nformat single_value\nmessage_key a"
      time = event_time("2011-01-02 13:14:15 UTC")

      driver.run(default_tag: 'test') do
        [1, 2].each { |value| driver.feed(time, {"a"=>value}) }
      end

      expected = %[1#{@default_newline}] + %[2#{@default_newline}]
      check_gzipped_result(driver.instance.last_written_path, expected)
    end
  end

  # Without `append`, each run with the same timekey writes a new file whose
  # index suffix (_0, _1, _2) increases.
  test 'path with index number' do
    time = event_time("2011-01-02 13:14:15 UTC")
    expected = %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]

    write_once = lambda do
      driver = create_driver
      driver.run(default_tag: 'test') do
        driver.feed(time, {"a"=>1})
        driver.feed(time, {"a"=>2})
      end
      driver.instance.last_written_path
    end

    assert !File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz")

    3.times do |index|
      path = write_once.call
      assert_equal "#{TMP_DIR}/out_file_test.20110102_#{index}.log.gz", path
      check_gzipped_result(path, expected)
      assert_equal index + 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size
    end
  end

  # With `append true`, repeated runs keep writing to the same file, with or
  # without gzip compression.
  data(
    "with compression" => true,
    "without compression" => false,
  )
  test 'append' do |compression|
    time = event_time("2011-01-02 13:14:15 UTC")
    one_run = %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]

    write_once = lambda do
      config = %[
        path #{TMP_DIR}/out_file_test
        utc
        append true
        <buffer>
          timekey_use_utc true
        </buffer>
      ]
      config << "        compress gz" if compression
      driver = create_driver(config)
      driver.run(default_tag: 'test') do
        driver.feed(time, {"a"=>1})
        driver.feed(time, {"a"=>2})
      end
      driver.instance.last_written_path
    end

    log_file_name = compression ? "out_file_test.20110102.log.gz" : "out_file_test.20110102.log"
    verify = compression ? method(:check_gzipped_result) : method(:check_result)

    (1..3).each do |runs|
      path = write_once.call
      assert_equal "#{TMP_DIR}/#{log_file_name}", path
      verify.call(path, one_run * runs)
    end
  end

  # Same as 'append', with the process timezone and timekey_zone both set to
  # Tokyo time.
  test 'append when JST' do
    zone = Fluent.windows? ? "JST-9" : "Asia/Tokyo"
    with_timezone(zone) do
      time = event_time("2011-01-02 03:14:15+09:00")
      one_run = %[2011-01-02T03:14:15+09:00\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T03:14:15+09:00\ttest\t{"a":2}#{@default_newline}]

      write_once = lambda do
        driver = create_driver %[
          path #{TMP_DIR}/out_file_test
          compress gz
          append true
          <buffer>
            timekey_use_utc false
            timekey_zone Asia/Tokyo
          </buffer>
        ]
        driver.run(default_tag: 'test') do
          driver.feed(time, {"a"=>1})
          driver.feed(time, {"a"=>2})
        end
        driver.instance.last_written_path
      end

      (1..3).each do |runs|
        path = write_once.call
        assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
        check_gzipped_result(path, one_run * runs)
      end
    end
  end

  # The file name's date follows timekey_zone (+0900), not the process
  # timezone, while the formatted timestamps keep the +02:00 offset.
  test 'append when UTC-02 but timekey_zone is +0900' do
    with_timezone("UTC-02") do # +0200
      time = event_time("2011-01-02 17:14:15+02:00")
      one_run = %[2011-01-02T17:14:15+02:00\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T17:14:15+02:00\ttest\t{"a":2}#{@default_newline}]

      write_once = lambda do
        driver = create_driver %[
          path #{TMP_DIR}/out_file_test
          compress gz
          append true
          <buffer>
            timekey_use_utc false
            timekey_zone +0900
          </buffer>
        ]
        driver.run(default_tag: 'test') do
          driver.feed(time, {"a"=>1})
          driver.feed(time, {"a"=>2})
        end
        driver.instance.last_written_path
      end

      # Rotated at 2011-01-02 17:00:00+02:00
      rotated_path = "#{TMP_DIR}/out_file_test.20110103.log.gz"
      (1..3).each do |runs|
        path = write_once.call
        assert_equal rotated_path, path
        check_gzipped_result(path, one_run * runs)
      end
    end
  end

  # The ${chunk_id} placeholder in `path` is replaced by a value of the same
  # length as a hex-encoded Fluent::UniqueId.
  test '${chunk_id}' do
    time = event_time("2011-01-02 13:14:15 UTC")

    write_once = ->(){
      d = create_driver %[
        path #{TMP_DIR}/out_file_chunk_id_${chunk_id}
        utc
        append true
        <buffer>
          timekey_use_utc true
        </buffer>
      ]
      d.run(default_tag: 'test'){
        d.feed(time, {"a"=>1})
        d.feed(time, {"a"=>2})
      }
      d.instance.last_written_path
    }

    path = write_once.call
    # The character class is repeated (instead of being followed by `.*`) and
    # the literal dots are escaped, so only a real chunk id followed by the
    # exact ".20110102.log" suffix is accepted.
    if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9]+)\.20110102\.log/
      unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate)
      assert_equal unique_id.size, $1.size, "chunk_id size is mismatched"
    else
      flunk "chunk_id is not included in the path"
    end
  end

  # Location of the symlink used by the 'symlink' tests, inside TMP_DIR.
  SYMLINK_PATH = File.expand_path('current', TMP_DIR)

  sub_test_case 'symlink' do
    test 'static symlink' do
      omit "Windows doesn't support symlink" if Fluent.windows?
      conf = CONFIG + %[
        symlink_path #{SYMLINK_PATH}
      ]
      link = SYMLINK_PATH

      driver = create_driver(conf)
      begin
        run_and_check(driver, link)
      ensure
        FileUtils.rm_rf(link)
      end
    end

    test 'symlink with placeholders' do
      omit "Windows doesn't support symlink" if Fluent.windows?
      conf = %[
        path #{TMP_DIR}/${tag}/out_file_test
        symlink_path #{SYMLINK_PATH}/foo/${tag}
        <buffer tag,time>
        </buffer>
      ]
      # run_and_check feeds with the tag 'tag', so ${tag} expands to "tag".
      link = "#{SYMLINK_PATH}/foo/tag"

      driver = create_driver(conf)
      begin
        run_and_check(driver, link)
      ensure
        FileUtils.rm_rf(link)
      end
    end

    # Feeds two events with different timestamps and, after each one, checks
    # that +symlink_path+ is a symlink whose target exists. Finally checks
    # that the link points at the staged chunk for the second event.
    def run_and_check(d, symlink_path)
      first_time = event_time("2011-01-02 13:14:15 UTC")
      second_time = event_time("2011-01-03 14:15:16 UTC")

      d.run(default_tag: 'tag') do
        d.feed(Fluent::OneEventStream.new(first_time, {"a"=>1}))

        assert File.symlink?(symlink_path)
        assert File.exist?(symlink_path) # This checks dest of symlink exists or not.

        d.feed(Fluent::OneEventStream.new(second_time, {"a"=>2}))

        assert File.symlink?(symlink_path)
        assert File.exist?(symlink_path)

        meta = d.instance.metadata('tag', second_time, {})
        staged_chunk_path = d.instance.buffer.instance_eval{ @stage[meta].path }
        assert_equal staged_chunk_path, File.readlink(symlink_path)
      end
    end
  end

  sub_test_case 'path' do
    # Plain path: the time slice, an index and ".log" are appended.
    test 'normal' do
      driver = create_driver(%[
        path #{TMP_DIR}/out_file_test
        time_slice_format %Y-%m-%d-%H
        utc true
      ])
      driver.run(default_tag: 'test') do
        driver.feed(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1})
      end
      assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.log", driver.instance.last_written_path
    end

    # With append, no index is added.
    test 'normal with append' do
      driver = create_driver(%[
        path #{TMP_DIR}/out_file_test
        time_slice_format %Y-%m-%d-%H
        utc true
        append true
      ])
      driver.run(default_tag: 'test') do
        driver.feed(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1})
      end
      assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.log", driver.instance.last_written_path
    end

    # A "*" in the path is replaced by the time slice plus index.
    test '*' do
      driver = create_driver(%[
        path #{TMP_DIR}/out_file_test.*.txt
        time_slice_format %Y-%m-%d-%H
        utc true
      ])
      driver.run(default_tag: 'test') do
        driver.feed(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1})
      end
      assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt", driver.instance.last_written_path
    end

    # With append, "*" is replaced by the time slice only.
    test '* with append' do
      driver = create_driver(%[
        path #{TMP_DIR}/out_file_test.*.txt
        time_slice_format %Y-%m-%d-%H
        utc true
        append true
      ])
      driver.run(default_tag: 'test') do
        driver.feed(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1})
      end
      assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.txt", driver.instance.last_written_path
    end
  end

  sub_test_case '#timekey_to_timeformat' do
    setup do
      @d = create_driver
      @i = @d.instance
    end

    test 'returns empty string for nil' do
      assert_equal '', @i.timekey_to_timeformat(nil)
    end

    test 'returns timestamp string with seconds for timekey smaller than 60' do
      [1, 30, 59].each do |timekey|
        assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(timekey)
      end
    end

    test 'returns timestamp string with minutes for timekey smaller than 3600' do
      [60, 180, 1800, 3599].each do |timekey|
        assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(timekey)
      end
    end

    test 'returns timestamp string with hours for timekey smaller than 86400 (1 day)' do
      [3600, 7200, 86399].each do |timekey|
        assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(timekey)
      end
    end

    test 'returns timestamp string with days for timekey equal or greater than 86400' do
      [86400, 1000000, 1000000000].each do |timekey|
        assert_equal '%Y%m%d', @i.timekey_to_timeformat(timekey)
      end
    end
  end

  sub_test_case '#compression_suffix' do
    setup do
      driver = create_driver
      @i = driver.instance
    end

    test 'returns empty string for nil (no compression method specified)' do
      suffix = @i.compression_suffix(nil)
      assert_equal '', suffix
    end

    test 'returns .gz for gzip' do
      suffix = @i.compression_suffix(:gzip)
      assert_equal '.gz', suffix
    end
  end

  # Exercises #generate_path_template. The calls below pass, in order: the
  # configured path, the timekey, the append flag and the compression method,
  # optionally followed by path_suffix: and/or time_slice_format:.
  # In the expected values, "_**" is the index placeholder.
  sub_test_case '#generate_path_template' do
    setup { @i = create_driver.instance }

    data(
      'day'    => [86400, '%Y%m%d', '%Y-%m-%d'],
      'hour'   => [3600, '%Y%m%d%H', '%Y-%m-%d_%H'],
      'minute' => [60, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'],
    )
    test 'generates path with timestamp placeholder for original path with tailing star with timekey' do |data|
      timekey, stamp, slice_format = data

      # append disabled, no compression: index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil))
      # append disabled, gzip: index placeholder followed by .gz
      assert_equal("/path/to/file.#{stamp}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip))
      # append enabled, no compression: no index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}", @i.generate_path_template('/path/to/file.*', timekey, true, nil))
      # append enabled, gzip: no index placeholder, .gz suffix
      assert_equal("/path/to/file.#{stamp}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip))

      # when time_slice_format is given, it is used instead of the computed placeholder
      assert_equal("/path/to/file.#{slice_format}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}", @i.generate_path_template('/path/to/file.*', timekey, true, nil, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip, time_slice_format: slice_format))
    end

    data(
      'day'    => [86400 * 2, '%Y%m%d', '%Y-%m-%d'],
      'hour'   => [7200, '%Y%m%d%H', '%Y-%m-%d_%H'],
      'minute' => [180, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'],
    )
    test 'generates path with timestamp placeholder for original path with star and suffix with timekey' do |data|
      timekey, stamp, slice_format = data

      # append disabled, no compression: index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil))
      # append disabled, gzip: index placeholder, .gz after the original suffix
      assert_equal("/path/to/file.#{stamp}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip))
      # append enabled, no compression: no index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil))
      # append enabled, gzip: no index placeholder, .gz after the original suffix
      assert_equal("/path/to/file.#{stamp}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip))

      # when time_slice_format is given, it is used instead of the computed placeholder
      assert_equal("/path/to/file.#{slice_format}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil, time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip, time_slice_format: slice_format))
    end

    test 'raise error to show it is a bug when path including * specified without timekey' do
      expected_error = RuntimeError.new("BUG: configuration error must be raised for path including '*' without timekey")
      assert_raise(expected_error) do
        @i.generate_path_template('/path/to/file.*.log', nil, false, nil)
      end
    end

    data(
      'day'    => [86400 * 7, '%Y%m%d', '%Y-%m-%d'],
      'hour'   => [3600 * 6, '%Y%m%d%H', '%Y-%m-%d_%H'],
      'minute' => [60 * 15, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'],
    )
    test 'generates path with timestamp placeholder for original path without time placeholders & star with timekey, and path_suffix configured' do |data|
      timekey, stamp, slice_format = data

      # append disabled, no compression: index placeholder before path_suffix
      assert_equal("/path/to/file.#{stamp}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log'))
      # append disabled, gzip: index placeholder, path_suffix, then .gz
      assert_equal("/path/to/file.#{stamp}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log'))
      # append enabled, no compression: path_suffix only
      assert_equal("/path/to/file.#{stamp}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log'))
      # append enabled, gzip: path_suffix, then .gz
      assert_equal("/path/to/file.#{stamp}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log'))

      # a specified time_slice_format is appended in place of the computed placeholder
      assert_equal("/path/to/file.#{slice_format}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log', time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log', time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log', time_slice_format: slice_format))
      assert_equal("/path/to/file.#{slice_format}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log', time_slice_format: slice_format))
    end

    data(
      'day'    => [86400, '%Y%m%d'],
      'hour'   => [3600, '%Y%m%d%H'],
      'minute' => [60, '%Y%m%d%H%M'],
    )
    test 'generates path with timestamp placeholder for original path without star with timekey, and path_suffix not configured' do |data|
      timekey, stamp = data

      # append disabled, no compression: index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}_**", @i.generate_path_template('/path/to/file', timekey, false, nil))
      # append disabled, gzip: index placeholder followed by .gz
      assert_equal("/path/to/file.#{stamp}_**.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip))
      # append enabled, no compression: no index placeholder, no compression suffix
      assert_equal("/path/to/file.#{stamp}", @i.generate_path_template('/path/to/file', timekey, true, nil))
      # append enabled, gzip: no index placeholder, .gz suffix
      assert_equal("/path/to/file.#{stamp}.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip))
    end

    test 'generates path without adding timestamp placeholder part if original path has enough placeholders for specified timekey' do
      # day-level placeholders already in the path: returned unchanged when append is enabled
      assert_equal("/path/to/file.%Y%m%d", @i.generate_path_template('/path/to/file.%Y%m%d', 86400, true, nil))
      assert_equal("/path/to/%Y%m%d/file", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil))

      # append disabled: only the index placeholder is added
      assert_equal("/path/to/%Y%m%d/file_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil))

      # placeholders coarser than the timekey are rejected as a configuration error
      assert_raise(Fluent::ConfigError.new("insufficient timestamp placeholders in path")) do
        @i.generate_path_template('/path/to/%Y%m/file', 86400, true, nil)
      end
      assert_raise(Fluent::ConfigError.new("insufficient timestamp placeholders in path")) do
        @i.generate_path_template('/path/to/file.%Y%m%d.log', 3600, true, nil)
      end

      # hour-level placeholders with index placeholder, path_suffix and gzip; ${tag} is left as-is
      assert_equal("/path/to/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log'))
      assert_equal("/path/to/${tag}/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/${tag}/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log'))
    end

    test 'generates path with specified time_slice_format appended even if path has sufficient timestamp placeholders' do
      # the directory part already has day placeholders, yet time_slice_format is still appended
      assert_equal("/path/to/%Y%m%d/file.%Y-%m-%d_%H_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H'))
      assert_equal("/path/to/%Y%m%d/file.%Y-%m-%d_%H", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H'))
      assert_equal("/path/to/%Y%m%d/file.%Y-%m-%d_%H_**.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log'))
      assert_equal("/path/to/%Y%m%d/file.%Y-%m-%d_%H.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log'))
      assert_equal("/path/to/%Y%m%d/file.%Y-%m-%d_%H.log.gz", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, :gzip, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log'))
    end

    test 'generates path without timestamp placeholder when path does not include * and timekey not specified' do
      # nil timekey: nothing time-related is added, only the index placeholder and path_suffix
      assert_equal('/path/to/file.log', @i.generate_path_template('/path/to/file.log', nil, true, nil))
      assert_equal('/path/to/file.log_**', @i.generate_path_template('/path/to/file.log', nil, false, nil))
      assert_equal('/path/to/file.${tag}.log_**', @i.generate_path_template('/path/to/file.${tag}.log', nil, false, nil))
      assert_equal('/path/to/file.${tag}_**.log', @i.generate_path_template('/path/to/file.${tag}', nil, false, nil, path_suffix: '.log'))
    end
  end

  # Covers #find_filepath_available: it is given a path template containing
  # the "_**" index placeholder and yields a concrete file path to its block.
  # Each test works inside a scratch directory that teardown removes.
  sub_test_case '#find_filepath_available' do
    setup do
      @tmp = File.join(TMP_DIR, 'find_filepath_test')
      FileUtils.mkdir_p @tmp
      @i = create_driver.instance
    end

    teardown do
      FileUtils.rm_rf @tmp
    end

    test 'raise error if argument path does not include index placeholder' do
      assert_raise RuntimeError.new("BUG: index placeholder not found in path: #{@tmp}/myfile") do
        @i.find_filepath_available("#{@tmp}/myfile") do |_path|
          # intentionally empty: the call itself is expected to raise
        end
      end
    end

    data(
      'without suffix' => ['myfile_0', 'myfile_**'],
      'with timestamp' => ['myfile_20161003_0', 'myfile_20161003_**'],
      'with base suffix' => ['myfile_0.log', 'myfile_**.log'],
      'with compression suffix' => ['myfile_0.log.gz', 'myfile_**.log.gz'],
    )
    test 'returns filepath with _0 at first' do |data|
      expected, argument = data
      yielded = nil
      @i.find_filepath_available(File.join(@tmp, argument)) { |path| yielded = path }
      # Asserted after the call so the test fails, rather than passing
      # vacuously, if the block is never invoked.
      assert_equal File.join(@tmp, expected), yielded
    end

    test 'returns filepath with index which does not exist yet' do
      5.times do |i|
        # block form: create the file and close the handle immediately
        Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') {}
      end
      yielded = nil
      @i.find_filepath_available(File.join(@tmp, "exist_**.log")) { |path| yielded = path }
      # indexes 0..4 are taken by the files created above
      assert_equal File.join(@tmp, "exist_5.log"), yielded
    end

    test 'creates lock directory when with_lock is true to exclude operations of other worker process' do
      5.times do |i|
        # Block form closes each handle right away. Without it the handles
        # stay open until GC, which can block the rm_rf in teardown on Windows.
        Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') {}
      end
      # Pre-existing lock directory for index 5; index 6 is expected instead.
      Dir.mkdir(File.join(@tmp, "exist_5.log.lock"))
      yielded = nil
      @i.find_filepath_available(File.join(@tmp, "exist_**.log"), with_lock: true) do |path|
        yielded = path
        # Checked while the block is running, as in the original test.
        assert Dir.exist?(File.join(@tmp, "exist_6.log.lock"))
      end
      # Asserted after the call so a block that is never invoked is a failure.
      assert_equal File.join(@tmp, "exist_6.log"), yielded
    end
  end
end