Skip to content

Commit

Permalink
change DeadLetterPostOffice api from << to post
Browse files Browse the repository at this point in the history
  • Loading branch information
jsvd committed Oct 22, 2015
1 parent 53956cf commit 9673e45
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 39 deletions.
18 changes: 7 additions & 11 deletions lib/logstash/dead_letter_post_office.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@ def self.destination=(destination)
@destination = destination
end

def self.<<(events)
events = [events] unless events.is_a?(Array)

events.each do |event|
logger.warn("dead letter received!", :event => event.to_hash)
event.tag("_dead_letter")
event.cancel
@destination << event
end
def self.post(event)
logger.warn("dead letter received!", :event => event.to_hash)
event.tag("_dead_letter")
event.cancel
@destination.post(event)
end

module Destination

class Base
def location; end
def <<(event); end
def post(event); end
end

class File < Base
Expand All @@ -47,7 +43,7 @@ def location
@path
end

def <<(event)
def post(event)
@file.puts(event.to_json)
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,14 @@ def inflight_count
end

def dump
dump = []
event_dump = []
[@input_to_filter].each do |queue|
until queue.empty? do
event = queue.pop(true) rescue ThreadError # non-block pop
next unless event.is_a?(LogStash::Event)
dump << event
event_dump << event
end
end
dump
event_dump
end
end; end
2 changes: 1 addition & 1 deletion lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def inspect

def dead_letter(event)
return unless event.is_a?(LogStash::Event)
LogStash::DeadLetterPostOffice << event
LogStash::DeadLetterPostOffice.post(event)
end

# Look up a plugin by type and name.
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/shutdown_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def start(cycle=REPORT_CYCLE)
report(REPORTS.last)
if self.class.force_shutdown? && stalled?
logger.fatal("Stalled pipeline detected. Forcefully quitting logstash..")
DeadLetterPostOffice << @pipeline.dump
@pipeline.dump.each {|e| DeadLetterPostOffice.post(e) }
@pipeline.force_exit()
break
end
Expand Down
26 changes: 6 additions & 20 deletions spec/core/dead_letter_post_office_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,30 @@

describe LogStash::DeadLetterPostOffice do

describe ".<<" do
describe ".post" do
subject { LogStash::DeadLetterPostOffice }
let(:filter) { LogStash::Filter::Base.new }
let(:event) { LogStash::Event.new("message" => "test") }
let(:destination) { LogStash::DeadLetterPostOffice::Destination::Base.new }

before :each do
subject.destination = destination
allow(destination).to receive(:<<) {|event| }
allow(destination).to receive(:post) {|event| }
end

it "should send event to destination" do
expect(destination).to receive(:<<).with(event)
subject << event
expect(destination).to receive(:post).with(event)
subject.post(event)
end

it "should tag the event with \"_dead_letter\"" do
subject << event
subject.post(event)
expect(event["tags"]).to include("_dead_letter")
end

it "should cancel the event" do
subject << event
subject.post(event)
expect(event).to be_cancelled
end

context "array of events" do
let(:event1) { LogStash::Event.new("message" => "test1") }
let(:event2) { LogStash::Event.new("message" => "test2") }
let(:event3) { LogStash::Event.new("message" => "test3") }
let(:events) { [event1, event2, event3] }

it "should push each event to the destination" do
expect(destination).to receive(:<<).with(event1)
expect(destination).to receive(:<<).with(event2)
expect(destination).to receive(:<<).with(event3)
subject << events
end
end
end
end
8 changes: 5 additions & 3 deletions spec/core/shutdown_controller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

before :each do
LogStash::ShutdownController::REPORTS.clear
allow(LogStash::DeadLetterPostOffice).to receive(:<<)
allow(pipeline).to receive(:dump)
allow(LogStash::DeadLetterPostOffice).to receive(:post)
allow(pipeline).to receive(:dump) { [] }
allow(pipeline).to receive(:force_exit)
allow(pipeline).to receive(:inflight_count) do
subject.stop! if return_values.empty?
Expand Down Expand Up @@ -62,7 +62,9 @@
end

it "should post pipeline contents to DeadLetterPostOffice" do
expect(LogStash::DeadLetterPostOffice).to receive(:<<).once
stalled_events = [LogStash::Event.new("message" => "test")]*2
allow(pipeline).to receive(:dump) { stalled_events }
expect(LogStash::DeadLetterPostOffice).to receive(:post).twice
subject.start(0).join
end
end
Expand Down

0 comments on commit 9673e45

Please sign in to comment.