Skip to content

Commit

Permalink
Stuff working with else events stack
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Nov 11, 2015
1 parent ee6745b commit 835ffa7
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 13 deletions.
1 change: 1 addition & 0 deletions .rbenv-gemsets
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-global
1 change: 1 addition & 0 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def execute

# TODO(sissel): Get pipeline completion status.
pipeline.run

return 0
rescue LogStash::ConfigurationError => e
@logger.unsubscribe(stdout_logs) if show_startup_errors
Expand Down
52 changes: 43 additions & 9 deletions lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def compile

definitions << "def #{type}_func(events)"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :events => LogStash::Json.dump(events))"
definitions << " else_events = []" # This always starts empty, needed later
definitions << " events_stack = []"
definitions << " else_events_stack = []"

sections.select { |s| s.plugin_type.text_value == type }.each do |s|
definitions << s.compile.split("\n", -1).map { |e| " #{e}" }
Expand Down Expand Up @@ -403,34 +406,65 @@ def cond_func_#{i}(input_events)
CODE
else
<<-CODE
#{super}
end
# BRANCH START
events_stack.push(events)
else_events_stack.push(else_events)
prev_events = events
prev_else_events = else_events
events = []
else_events = []
#{super}
else_events = else_events_stack.pop
events = events_stack.pop
CODE
end
end
end

class BranchEntry < Node; end

def if_style_condition
yield
end

class If < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "if #{condition.compile} # if #{condition.text_value_for_comments}\n" \
<< children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"

code = "\n#IF #{condition.text_value_for_comments} # #{self.object_id}\n"
code << "prev_events.each {|event| (#{condition.compile}) ? events << event : else_events << event }\n"
#code << "@logger.debug? && @logger.debug('IF Condition #{condition.compile}', :prev_events => prev_events)\n"
code << children.collect(&:compile).map { |s| s.split("\n", -1).map {|l| l }.join("\n") }.join("") << "\n"
code << "\n# ENDIF #{condition.text_value_for_comments} # #{self.object_id}\n"
code
end
end
class Elsif < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "elsif #{condition.compile} # else if #{condition.text_value_for_comments}\n" \
<< children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"

code = "\n #ELSIF #{condition.text_value_for_comments} # #{self.object_id}\n"
code << "events = []\n"
code << "prev_else_events = else_events\n"
code << "else_events = []\n"
code << "prev_else_events.each {|event| (#{condition.compile}) ? events << event : else_events << event }\n"
code << children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| l }.join("\n") }.join("") << "\n"
code << "\n# ENDELSIF #{condition.text_value_for_comments} # #{self.object_id}\n"
code
end
end
class Else < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "else\n" \
<< children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"

code = "\n#ELSE# #{self.object_id}\n"
code << "events = else_events\n"
code << children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"
code << "#ENDELSE # #{self.object_id}"
code
end
end

Expand All @@ -457,7 +491,7 @@ module ComparisonExpression; end
module InExpression
def compile
item, list = recursive_select(LogStash::Config::AST::RValue)
return "(x = #{list.compile}; x.respond_to?(:include?) && x.include?(#{item.compile}))"
return "(#{list.compile}; x.respond_to?(:include?) && x.include?(#{item.compile}))"
end
end

Expand Down
13 changes: 9 additions & 4 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def filters?
end

def run
LogStash::Util.set_thread_name(">lsipeline")
@logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))

start_workers
Expand All @@ -89,21 +90,24 @@ def run
@logger.terminal("Logstash startup completed")

@logger.info("Will run till input threads stopped")
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(self)
wait_inputs
@logger.info("Inputs stopped")

puts "WORKER SHUT"
shutdown_workers

puts "WORKER WAITTT"
@worker_threads.each do |t|
puts "WORKER WAIT #{t}"
@logger.debug("Shutdown waiting for worker thread #{t}")
t.join
end

@filters.each(&:do_close)
@outputs.each(&:do_close)

dump_inflight("/tmp/ls_current_batches_post_close")

@logger.info("Pipeline shutdown complete.")
@logger.terminal("Logstash shutdown completed")

Expand Down Expand Up @@ -163,7 +167,8 @@ def dump_inflight(file_path)
end

def shutdown_workers
dump_inflight("/tmp/ls_current_batches")
puts "DUMPIN"
puts "DUMPINDONE"
# Each worker will receive this exactly once!
@worker_threads.each do
@logger.debug("Pushing shutdown")
Expand Down Expand Up @@ -242,7 +247,7 @@ def inputworker(plugin)
begin
plugin.run(@input_queue)
rescue => e
# if plugin is stopping, ignore uncatched exceptions and exit worker
# if plugin is stop
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
:plugin => plugin.class.config_name, :exception => e,
Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/util/reporter.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# encoding: utf-8
Thread.abort_on_exception = true

class InflightEventsReporter
def self.logger=(logger)
@logger = logger
end

def self.start(pipeline)
return 1
Thread.new do
loop do
sleep 5
Expand Down
31 changes: 31 additions & 0 deletions spec/core/conditionals_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# encoding: utf-8
require 'spec_helper'
Thread.abort_on_exception = true


module ConditionalFanciness
def description
Expand All @@ -22,6 +24,35 @@ def conditional(expression, &block)
end
end

describe "something" do
extend ConditionalFanciness

it "should do something" do
expect(true).to eql(true)
end

puts "DECL CONFIG"

config <<-CONFIG
input {
generator {
message => '{"foo":{"bar"},"baz": "quux"}'
count => 1
}
}
output {
if [foo] == "bar" {
stdout { }
}
}
CONFIG

puts "DECL AGENT"
agent do
"fooey"
end
end

describe "conditionals in output" do
extend ConditionalFanciness

Expand Down

0 comments on commit 835ffa7

Please sign in to comment.