diff --git a/lib/logstash/config/config_ast.rb b/lib/logstash/config/config_ast.rb index 3f668750485..c957bd5d198 100644 --- a/lib/logstash/config/config_ast.rb +++ b/lib/logstash/config/config_ast.rb @@ -111,7 +111,6 @@ def compile definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :events => LogStash::Json.dump(events))" definitions << " else_events = []" # This always starts empty, needed later definitions << " events_stack = []" - definitions << " else_events_stack = []" sections.select { |s| s.plugin_type.text_value == type }.each do |s| definitions << s.compile.split("\n", -1).map { |e| " #{e}" } @@ -237,6 +236,8 @@ def compile when "filter" return <<-CODE events = #{variable_name}.multi_filter(events) + output_events.concat(events) + events CODE when "output" return "#{variable_name}.multi_handle(events)\n" @@ -375,6 +376,20 @@ def compile class BranchOrPlugin < Node; end class Branch < Node + def cond_wrapper + <<-CODE +# BRANCH START +events_stack.push(events) +prev_events = events +prev_else_events = else_events +events = [] +else_events = [] +#{yield} +events = events_stack.pop + CODE + + end + def compile # this construct is non obvious. we need to loop through each event and apply the conditional. @@ -388,15 +403,12 @@ def compile if type == "filter" i = LogStash::Config::AST.defered_conditionals_index += 1 source = <<-CODE - def cond_func_#{i}(input_events) - result = [] - input_events.each do |event| - events = [event] - #{super} - end - result += events - end - result + def cond_func_#{i}(events) + else_events = [] + events_stack = [] + output_events = [] + #{cond_wrapper { super }} + output_events end CODE LogStash::Config::AST.defered_conditionals << source @@ -405,21 +417,9 @@ def cond_func_#{i}(input_events) events = cond_func_#{i}(events) CODE else - <<-CODE - -# BRANCH START -events_stack.push(events) -else_events_stack.push(else_events) -prev_events = events -prev_else_events = else_events -events = [] -else_events = [] - -#{super} - -else_events = else_events_stack.pop -events = events_stack.pop - CODE + cond_wrapper do + super + end end end end @@ -434,7 +434,7 @@ class If < BranchEntry def compile children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) } - code = "\n#IF #{condition.text_value_for_comments} # #{self.object_id}\n" + code = "#IF #{condition.text_value_for_comments} # #{self.object_id}\n" code << "prev_events.each {|event| (#{condition.compile}) ? events << event : else_events << event }\n" #code << "@logger.debug? && @logger.debug('IF Condition #{condition.compile}', :prev_events => prev_events)\n" code << children.collect(&:compile).map { |s| s.split("\n", -1).map {|l| l }.join("\n") }.join("") << "\n" diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index 43d9477c853..4ccaeb430ff 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -167,8 +167,6 @@ def dump_inflight(file_path) end def shutdown_workers - puts "DUMPIN" - puts "DUMPINDONE" # Each worker will receive this exactly once! @worker_threads.each do @logger.debug("Pushing shutdown")