From d39c532441c20c367f2ba89de8dd465a4b01b8ea Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa <repeatedly@gmail.com>
Date: Thu, 11 Jun 2015 04:54:47 +0900
Subject: [PATCH] Add chunk format check in in_forward

---
 lib/fluent/plugin/in_forward.rb |  7 ++++++-
 test/plugin/test_in_forward.rb  | 20 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb
index 3c44a93ceb..2bfcee690d 100644
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -125,7 +125,12 @@ def on_message(msg, chunk_size, source)
         return
       end
 
-      # TODO format error
+      # TODO: raise an exception if broken chunk is generated by recoverable situation
+      unless msg.is_a?(Array)
+        log.warn "incoming chunk is broken:", source: source, msg: msg
+        return
+      end
+
       tag = msg[0].to_s
       entries = msg[1]
 
diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb
index 18c0c1139b..329725f19e 100644
--- a/test/plugin/test_in_forward.rb
+++ b/test/plugin/test_in_forward.rb
@@ -200,6 +200,26 @@ def test_send_large_chunk_limit
     }.size == 1, "large chunk warning is not logged"
   end
 
+  data('string chunk' => 'broken string',
+       'integer chunk' => 10)
+  def test_send_broken_chunk(data)
+    d = create_driver
+
+    # d.run => send_data
+    d.run do
+      d.instance.send(:on_message, data, 1000000000, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
+    end
+
+    # check emitted data
+    emits = d.emits
+    assert_equal 0, emits.size
+
+    # check log
+    assert d.instance.log.logs.select{|line|
+      line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/
+    }.size == 1, "should not accept broken chunk"
+  end
+
   def test_respond_to_message_requiring_ack
     d = create_driver