# File lib/fluent/plugin/in_tail.rb, line 398
#
# Builds the watcher state for one tailed path. No IO handler exists yet after
# construction (@io_handler is nil); on_rotate creates it on its first call.
#
# path::                      path to watch; passed to StatWatcher and RotateHandler.
# rotate_wait::               value interpolated into the rotation log message
#                             ("waiting N seconds").
# pe::                        position entry used to read/store inode and offset;
#                             a new MemoryPositionEntry is used when this is nil/false.
# log::                       logger handed to the watchers and kept in @log.
# read_from_head::            when truthy, on_rotate starts at offset 0 (instead of
#                             the end of file) for a file with no remembered inode.
# enable_watch_timer::        when truthy, a TimerWatcher is created as well.
# read_lines_limit::          forwarded to IOHandler by on_rotate.
# update_watcher::            callable invoked by on_rotate as
#                             call(path, position_entry) when the watcher must be replaced.
# line_buffer_timer_flusher:: optional object; on_notify calls its on_notify(self).
# receive_lines::             block invoked by wrap_receive_lines with (lines, self).
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines)
  @path = path
  @rotate_wait = rotate_wait
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @enable_watch_timer = enable_watch_timer
  @read_lines_limit = read_lines_limit
  @receive_lines = receive_lines
  @update_watcher = update_watcher

  # NOTE(review): the meaning of the (1, true) arguments is defined by
  # TimerWatcher, which is not visible here -- presumably interval and repeat.
  # @timer_trigger stays nil when the watch timer is disabled.
  @timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify)) if @enable_watch_timer

  @stat_trigger = StatWatcher.new(path, log, &method(:on_notify))

  @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))
  @io_handler = nil
  @log = log

  @line_buffer_timer_flusher = line_buffer_timer_flusher
end
# File lib/fluent/plugin/in_tail.rb, line 431
#
# Attaches this watcher's triggers to the given event loop and then runs one
# notification cycle immediately.
#
# loop:: the event loop object; passed unchanged to each trigger's #attach.
#
# Returns whatever on_notify returns.
def attach(loop)
  # The timer trigger only exists when the watch timer is enabled.
  @timer_trigger.attach(loop) if @enable_watch_timer
  @stat_trigger.attach(loop)
  on_notify
end
# File lib/fluent/plugin/in_tail.rb, line 442
#
# Shuts the watcher down.
#
# close_io:: when truthy (the default) and an IO handler exists, the handler
#            is notified one last time and then closed before detaching.
#            When falsy the IO handler is left untouched.
#
# The triggers are detached in either case.
def close(close_io = true)
  if close_io && @io_handler
    # Give the handler a final notification before closing it.
    @io_handler.on_notify
    @io_handler.close
  end
  detach
end
# File lib/fluent/plugin/in_tail.rb, line 437
#
# Detaches the timer and stat triggers from their event loop. Each trigger is
# only detached when it reports being attached, so calling this repeatedly is
# harmless.
def detach
  # @timer_trigger is only consulted when the watch timer is enabled.
  @timer_trigger.detach if @enable_watch_timer && @timer_trigger.attached?
  @stat_trigger.detach if @stat_trigger.attached?
end
# File lib/fluent/plugin/in_tail.rb, line 450
#
# Runs one notification cycle, forwarding it in this order to:
#   1. the rotate handler (if one is still set),
#   2. the line buffer timer flusher (if one was given), passing self,
#   3. the IO handler (if one has been created).
#
# Returns the IO handler's on_notify result, or nil when there is no IO handler.
def on_notify
  @rotate_handler.on_notify if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
# File lib/fluent/plugin/in_tail.rb, line 457
#
# Callback given to RotateHandler in initialize.
#
# io:: the currently opened file for @path, or nil when no file was found.
#      It must respond to #stat, #seek and #pos.
#
# First call (no IO handler yet): chooses the start offset from the position
# entry and creates the IOHandler, or a NullIOHandler when io is nil.
# Later calls: logs the rotation and then either replaces the IO handler
# (truncation / no previous file) or detaches and hands the previous position
# entry to @update_watcher so that the watcher can be replaced.
#
# swap_state is the sibling instance method; it is deliberately NOT defined
# inside this method, which would re-define it on every call.
def on_rotate(io)
  if @io_handler.nil?
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case, seek to the saved position
        pos = @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        pos = 0
        @pe.update(inode, pos)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    # wait rotate_time if previous file is exist
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode
        # truncated
        @pe.update_pos(stat.size)
        # Build the replacement first so the old handler is only closed once
        # the new one exists.
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil?
        # There is no previous file. Reuse TailWatcher
        @pe.update(inode, io.pos)
        @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      else
        # file is rotated and new file found
        detach
        @update_watcher.call(@path, swap_state(@pe))
      end
    else
      # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      detach
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end
# File lib/fluent/plugin/in_tail.rb, line 522
#
# Switches this watcher (and its existing IO handler) over to a new
# MemoryPositionEntry seeded with the inode and position read from +pe+.
#
# pe:: the position entry currently in use; must respond to #read_inode and
#      #read_pos.
#
# Returns the original +pe+ untouched, so the caller can hand it on.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  # Don't re-create IOHandler because IOHandler has an internal buffer.
  @io_handler.pe = mpe

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end
# File lib/fluent/plugin/in_tail.rb, line 423
#
# Derives a dotted tag from @path: every '/' becomes '.', runs of dots are
# collapsed into one, and a single leading dot is dropped.
# For example "/var/log/app.log" becomes "var.log.app.log".
#
# The result is memoized in @parsed_tag, so later changes to @path are not
# reflected.
def tag
  # \A anchors at the start of the whole string; ^ would also match after
  # every embedded newline and strip dots in the middle of the value.
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end
# File lib/fluent/plugin/in_tail.rb, line 427
#
# Adapter passed to IOHandler as its block: forwards the lines to the
# receive_lines block given at construction, adding this watcher as the
# second argument.
#
# lines:: passed through unchanged.
#
# Returns the result of the receive_lines block.
def wrap_receive_lines(lines)
  @receive_lines.call(lines, self)
end