From 9673e45b9fb33c265c1bc97db9191274176a9724 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Thu, 22 Oct 2015 12:37:43 +0100 Subject: [PATCH] change DeadLetterPostOffice api from << to post --- lib/logstash/dead_letter_post_office.rb | 18 ++++++---------- lib/logstash/pipeline.rb | 6 +++--- lib/logstash/plugin.rb | 2 +- lib/logstash/shutdown_controller.rb | 2 +- spec/core/dead_letter_post_office_spec.rb | 26 ++++++----------------- spec/core/shutdown_controller_spec.rb | 8 ++++--- 6 files changed, 23 insertions(+), 39 deletions(-) diff --git a/lib/logstash/dead_letter_post_office.rb b/lib/logstash/dead_letter_post_office.rb index 6b50a3dfe5d..f28628bd147 100644 --- a/lib/logstash/dead_letter_post_office.rb +++ b/lib/logstash/dead_letter_post_office.rb @@ -15,22 +15,18 @@ def self.destination=(destination) @destination = destination end - def self.<<(events) - events = [events] unless events.is_a?(Array) - - events.each do |event| - logger.warn("dead letter received!", :event => event.to_hash) - event.tag("_dead_letter") - event.cancel - @destination << event - end + def self.post(event) + logger.warn("dead letter received!", :event => event.to_hash) + event.tag("_dead_letter") + event.cancel + @destination.post(event) end module Destination class Base def location; end - def <<(event); end + def post(event); end end class File < Base @@ -47,7 +43,7 @@ def location @path end - def <<(event) + def post(event) @file.puts(event.to_json) end end diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index c8db2810f50..397e7ecef9d 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -336,14 +336,14 @@ def inflight_count end def dump - dump = [] + event_dump = [] [@input_to_filter].each do |queue| until queue.empty? do event = queue.pop(true) rescue ThreadError # non-block pop next unless event.is_a?(LogStash::Event) - dump << event + event_dump << event end end - dump + event_dump end end; end diff --git a/lib/logstash/plugin.rb b/lib/logstash/plugin.rb index fb61e0fa3fc..e26192349b5 100644 --- a/lib/logstash/plugin.rb +++ b/lib/logstash/plugin.rb @@ -61,7 +61,7 @@ def inspect def dead_letter(event) return unless event.is_a?(LogStash::Event) - LogStash::DeadLetterPostOffice << event + LogStash::DeadLetterPostOffice.post(event) end # Look up a plugin by type and name. diff --git a/lib/logstash/shutdown_controller.rb b/lib/logstash/shutdown_controller.rb index 18b01d4c1a0..cf1a921b849 100644 --- a/lib/logstash/shutdown_controller.rb +++ b/lib/logstash/shutdown_controller.rb @@ -34,7 +34,7 @@ def start(cycle=REPORT_CYCLE) report(REPORTS.last) if self.class.force_shutdown? && stalled? logger.fatal("Stalled pipeline detected. Forcefully quitting logstash..") - DeadLetterPostOffice << @pipeline.dump + @pipeline.dump.each {|e| DeadLetterPostOffice.post(e) } @pipeline.force_exit() break end diff --git a/spec/core/dead_letter_post_office_spec.rb b/spec/core/dead_letter_post_office_spec.rb index 3c42b57e569..5da5115c102 100644 --- a/spec/core/dead_letter_post_office_spec.rb +++ b/spec/core/dead_letter_post_office_spec.rb @@ -3,7 +3,7 @@ describe LogStash::DeadLetterPostOffice do - describe ".<<" do + describe ".post" do subject { LogStash::DeadLetterPostOffice } let(:filter) { LogStash::Filter::Base.new } let(:event) { LogStash::Event.new("message" => "test") } @@ -11,36 +11,22 @@ before :each do subject.destination = destination - allow(destination).to receive(:<<) {|event| } + allow(destination).to receive(:post) {|event| } end it "should send event to destination" do - expect(destination).to receive(:<<).with(event) - subject << event + expect(destination).to receive(:post).with(event) + subject.post(event) end it "should tag the event with \"_dead_letter\"" do - subject << event + subject.post(event) expect(event["tags"]).to include("_dead_letter") end it "should cancel the event" do - subject << event + subject.post(event) expect(event).to be_cancelled end - - context "array of events" do - let(:event1) { LogStash::Event.new("message" => "test1") } - let(:event2) { LogStash::Event.new("message" => "test2") } - let(:event3) { LogStash::Event.new("message" => "test3") } - let(:events) { [event1, event2, event3] } - - it "should push each event to the destination" do - expect(destination).to receive(:<<).with(event1) - expect(destination).to receive(:<<).with(event2) - expect(destination).to receive(:<<).with(event3) - subject << events - end - end end end diff --git a/spec/core/shutdown_controller_spec.rb b/spec/core/shutdown_controller_spec.rb index 579a57c6789..4cc1ca21dfc 100644 --- a/spec/core/shutdown_controller_spec.rb +++ b/spec/core/shutdown_controller_spec.rb @@ -10,8 +10,8 @@ before :each do LogStash::ShutdownController::REPORTS.clear - allow(LogStash::DeadLetterPostOffice).to receive(:<<) - allow(pipeline).to receive(:dump) + allow(LogStash::DeadLetterPostOffice).to receive(:post) + allow(pipeline).to receive(:dump) { [] } allow(pipeline).to receive(:force_exit) allow(pipeline).to receive(:inflight_count) do subject.stop! if return_values.empty? @@ -62,7 +62,9 @@ end it "should post pipeline contents to DeadLetterPostOffice" do - expect(LogStash::DeadLetterPostOffice).to receive(:<<).once + stalled_events = [LogStash::Event.new("message" => "test")]*2 + allow(pipeline).to receive(:dump) { stalled_events } + expect(LogStash::DeadLetterPostOffice).to receive(:post).twice subject.start(0).join end end