Agent is a resource unit who manages emittable plugins
Next step: `fluentd/root_agent.rb` Next step: `fluentd/label.rb`
# File lib/fluent/agent.rb, line 32 def initialize(opts = {}) super() @context = nil @outputs = [] @filters = [] @started_outputs = [] @started_filters = [] @log = Engine.log @event_router = EventRouter.new(NoMatchMatch.new(log), self) @error_collector = nil end
# File lib/fluent/agent.rb, line 140 def add_filter(type, pattern, conf) log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type filter = Plugin.new_filter(type) filter.router = @event_router filter.configure(conf) @filters << filter @event_router.add_rule(pattern, filter) filter end
# File lib/fluent/agent.rb, line 128 def add_match(type, pattern, conf) log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) output.router = @event_router output.configure(conf) @outputs << output @event_router.add_rule(pattern, output) output end
# File lib/fluent/agent.rb, line 53 def configure(conf) super # initialize <match> and <filter> elements conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e| pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] || e['type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type if e.name == 'filter' add_filter(type, pattern, e) else add_match(type, pattern, e) end } end
For handling invalid record
# File lib/fluent/agent.rb, line 153 def emit_error_event(tag, time, record, error) end
# File lib/fluent/agent.rb, line 109 def flush! flush_recursive(@outputs) end
# File lib/fluent/agent.rb, line 113 def flush_recursive(array) array.each { |o| begin if o.is_a?(BufferedOutput) o.force_flush elsif o.is_a?(MultiOutput) flush_recursive(o.outputs) end rescue => e log.debug "error while force flushing", error_class: e.class, error: e log.debug_backtrace end } end
# File lib/fluent/agent.rb, line 156 def handle_emits_error(tag, es, error) end
# File lib/fluent/agent.rb, line 81 def shutdown @started_filters.map { |f| Thread.new do begin log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id f.shutdown rescue => e log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } # Output plugin as filter emits records at shutdown so emit problem still exist. # This problem will be resolved after actual filter mechanizm. @started_outputs.map { |o| Thread.new do begin log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id o.shutdown rescue => e log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } end
# File lib/fluent/agent.rb, line 69 def start @outputs.each { |o| o.start @started_outputs << o } @filters.each { |f| f.start @started_filters << f } end