Skip to content

Commit

Permalink
in_monitor_agent emits plugins info to use exisiting plugins. fix #667
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Sep 16, 2015
1 parent c529038 commit 76b2cc4
Showing 1 changed file with 63 additions and 1 deletion.
64 changes: 63 additions & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def initialize

config_param :bind, :string, :default => '0.0.0.0'
config_param :port, :integer, :default => 24220
config_param :tag, :string, :default => nil
config_param :emit_interval, :time, :default => 60

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -199,6 +201,36 @@ def process(req, res)
end
end

class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log

# Avoid long shutdown time
@num_call = 0
if interval >= 10
min_interval = 10
@call_interval = interval / 10
else
min_interval = interval
@call_interval = 0
end

super(min_interval, true)
end

def on_timer
@num_call += 1
if @num_call >= @call_interval
@num_call = 0
@callback.call
end
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

def start
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
Expand All @@ -214,6 +246,29 @@ def start
@thread = Thread.new {
@srv.start
}
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

@loop = Coolio::Loop.new
opts = {:with_config => false}
timer = TimerWatcher.new(@emit_interval, log) {
es = MultiEventStream.new
now = Engine.now
plugins_info_all(opts).each { |record|
es.add(now, record)
}
router.emit_stream(@tag, es)
}
@loop.attach(timer)
@thread_for_emit = Thread.new(&method(:run))
end
end

def run
@loop.run
rescue => e
log.error "unexpected error", :error => e.to_s
log.error_backtrace
end

def shutdown
Expand All @@ -225,6 +280,13 @@ def shutdown
@thread.join
@thread = nil
end
if @tag
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop = nil
@thread_for_emit.join
@thread_for_emit = nil
end
end

MONITOR_INFO = {
Expand Down Expand Up @@ -320,7 +382,7 @@ def get_monitor_info(pe, opts={})
obj['plugin_id'] = pe.plugin_id
obj['plugin_category'] = plugin_category(pe)
obj['type'] = pe.config['@type'] || pe.config['type']
obj['config'] = pe.config
obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config]

# run MONITOR_INFO in plugins' instance context and store the info to obj
MONITOR_INFO.each_pair {|key,code|
Expand Down

0 comments on commit 76b2cc4

Please sign in to comment.