From 33cbc672b4112d5c67f2c815f04196051ea79989 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 14:32:25 +0900 Subject: [PATCH 1/7] in_http: Merge single and batch codes into 1 if Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 93 ++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 51 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 7916825f3e..5a9fa7bbde 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -185,51 +185,9 @@ def on_request(path_info, params) end end - unless record.is_a?(Array) - if @add_http_headers - params.each_pair { |k,v| - if k.start_with?("HTTP_") - record[k] = v - end - } - end - if @add_remote_addr - record['REMOTE_ADDR'] = params['REMOTE_ADDR'] - end - end - time = if param_time = params['time'] - param_time = param_time.to_f - param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - else - if record_time.nil? - if !record.is_a?(Array) - if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) - if @default_time_parser - @default_time_parser.parse(t) - else - Fluent::EventTime.from_time(Time.at(t)) - end - else - Fluent::EventTime.now - end - else - Fluent::EventTime.now - end - else - record_time - end - end - rescue => e - if @dump_error_log - log.error "failed to process request", error: e - end - return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] - end - - mes = nil - # Support batched requests - if record.is_a?(Array) - begin + mes = nil + # Support batched requests + if record.is_a?(Array) mes = Fluent::MultiEventStream.new record.each do |single_record| if @add_http_headers @@ -243,7 +201,10 @@ def on_request(path_info, params) single_record['REMOTE_ADDR'] = params['REMOTE_ADDR'] end - if @parser + if param_time = params['time'] + param_time = param_time.to_f + single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) + elsif @parser single_time = @parser.parse_time(single_record) single_time, single_record = @parser.convert_values(single_time, single_record) else @@ -254,18 +215,48 @@ def on_request(path_info, params) Fluent::EventTime.from_time(Time.at(t)) end else - time + Fluent::EventTime.now end end mes.add(single_time, single_record) end - rescue => e - if @dump_error_log - log.error "failed to process batch request", error: e + else + if @add_http_headers + params.each_pair { |k,v| + if k.start_with?("HTTP_") + record[k] = v + end + } end - return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] + if @add_remote_addr + record['REMOTE_ADDR'] = params['REMOTE_ADDR'] + end + + time = if param_time = params['time'] + param_time = param_time.to_f + param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) + else + if record_time.nil? + if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) + if @default_time_parser + @default_time_parser.parse(t) + else + Fluent::EventTime.from_time(Time.at(t)) + end + else + Fluent::EventTime.now + end + else + record_time + end + end end + rescue => e + if @dump_error_log + log.error "failed to process request", error: e + end + return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] end # TODO server error From 2af0d8f0f007f5106537b594035c3aff35c89874 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 14:36:51 +0900 Subject: [PATCH 2/7] in_http: extract add headers routine to method Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 5a9fa7bbde..58f177903a 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -190,16 +190,7 @@ def on_request(path_info, params) if record.is_a?(Array) mes = Fluent::MultiEventStream.new record.each do |single_record| - if @add_http_headers - params.each_pair { |k,v| - if k.start_with?("HTTP_") - single_record[k] = v - end - } - end - if @add_remote_addr - single_record['REMOTE_ADDR'] = params['REMOTE_ADDR'] - end + add_params_to_record(single_record, params) if param_time = params['time'] param_time = param_time.to_f @@ -222,16 +213,7 @@ def on_request(path_info, params) mes.add(single_time, single_record) end else - if @add_http_headers - params.each_pair { |k,v| - if k.start_with?("HTTP_") - record[k] = v - end - } - end - if @add_remote_addr - record['REMOTE_ADDR'] = params['REMOTE_ADDR'] - end + add_params_to_record(record, params) time = if param_time = params['time'] param_time = param_time.to_f @@ -327,6 +309,20 @@ def parse_params_with_parser(params) end end + def add_params_to_record(record, params) + if @add_http_headers + params.each_pair { |k, v| + if k.start_with?("HTTP_".freeze) + record[k] = v + end + } + end + + if @add_remote_addr + record['REMOTE_ADDR'] = params['REMOTE_ADDR'] + end + end + class Handler attr_reader :content_type From f4773f88532d71cbb11beeed031d92c4f7604b42 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 16:21:52 +0900 Subject: [PATCH 3/7] in_http: use constant for fixed values Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 58f177903a..0db4d86a69 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -165,6 +165,13 @@ def close super end + RES_TEXT_HEADER = {'Content-Type' => 'text/plain'}.freeze + RESPONSE_200 = ["200 OK".freeze, RES_TEXT_HEADER, "".freeze].freeze + RESPONSE_204 = ["204 No Content".freeze, {}.freeze].freeze + RESPONSE_IMG = ["200 OK".freeze, {'Content-Type'=>'image/gif; charset=utf-8'}.freeze, EMPTY_GIF_IMAGE].freeze + RES_400_STATUS = "400 Bad Request".freeze + RES_500_STATUS = "500 Internal Server Error".freeze + def on_request(path_info, params) begin path = path_info[1..-1] # remove / @@ -175,12 +182,12 @@ def on_request(path_info, params) if record.nil? log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } if @respond_with_empty_img - return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] + return RESPONSE_IMG else if @use_204_response - return ["204 No Content", {}] + return RESPONSE_204 else - return ["200 OK", {'Content-Type'=>'text/plain'}, ""] + return RESPONSE_200 end end end @@ -238,7 +245,7 @@ def on_request(path_info, params) if @dump_error_log log.error "failed to process request", error: e end - return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] + return [RES_400_STATUS, RES_TEXT_HEADER, "400 Bad Request\n#{e}\n"] end # TODO server error @@ -252,16 +259,16 @@ def on_request(path_info, params) if @dump_error_log log.error "failed to emit data", error: e end - return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"] + return [RES_500_STATUS, RES_TEXT_HEADER, "500 Internal Server Error\n#{e}\n"] end if @respond_with_empty_img - return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] + return RESPONSE_IMG else if @use_204_response - return ["204 No Content", {}] + return RESPONSE_204 else - return ["200 OK", {'Content-Type'=>'text/plain'}, ""] + return RESPONSE_200 end end end @@ -515,6 +522,7 @@ def on_message_complete code, header, body = *@callback.call(path_info, params) body = body.to_s + header = header.dup if header.frozen? unless @cors_allow_origins.nil? if @cors_allow_origins.include?('*') From b88304237473ee047dc696a247629d8ca0a576ec Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 16:36:41 +0900 Subject: [PATCH 4/7] in_http: No need '*' for array assignment Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 0db4d86a69..fef51b6f99 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -520,7 +520,7 @@ def on_message_complete params.merge!(@env) @env.clear - code, header, body = *@callback.call(path_info, params) + code, header, body = @callback.call(path_info, params) body = body.to_s header = header.dup if header.frozen? From 08e2d20a6bf7e1281feaf2d2795a6fd097e65fd6 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 16:43:42 +0900 Subject: [PATCH 5/7] in_http: Use constant and .freeze for fixed values Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index fef51b6f99..0980723c7d 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -412,7 +412,7 @@ def on_headers_complete(headers) end } if expect - if expect == '100-continue' + if expect == '100-continue'.freeze if !size || size < @body_size_limit send_response_nobody("100 Continue", {}) else @@ -434,17 +434,20 @@ def on_body(chunk) @body << chunk end + RES_200_STATUS = "200 OK".freeze + RES_403_STATUS = "403 Forbidden".freeze + # Web browsers can send an OPTIONS request before performing POST # to check if cross-origin requests are supported. def handle_options_request # Is CORS enabled in the first place? if @cors_allow_origins.nil? - return send_response_and_close("403 Forbidden", {}, "") + return send_response_and_close(RES_403_STATUS, {}, "") end # in_http does not support HTTP methods except POST if @access_control_request_method != 'POST' - return send_response_and_close("403 Forbidden", {}, "") + return send_response_and_close(RES_403_STATUS, {}, "") end header = { @@ -455,19 +458,19 @@ def handle_options_request # Check the origin and send back a CORS response if @cors_allow_origins.include?('*') header["Access-Control-Allow-Origin"] = "*" - send_response_and_close("200 OK", header, "") + send_response_and_close(RES_200_STATUS, header, "") elsif include_cors_allow_origin header["Access-Control-Allow-Origin"] = @origin - send_response_and_close("200 OK", header, "") + send_response_and_close(RES_200_STATUS, header, "") else - send_response_and_close("403 Forbidden", {}, "") + send_response_and_close(RES_403_STATUS, {}, "") end end def on_message_complete return if closing? - if @parser.http_method == 'OPTIONS' + if @parser.http_method == 'OPTIONS'.freeze return handle_options_request() end @@ -477,7 +480,7 @@ def on_message_complete # restrictions and white listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') or include_cors_allow_origin - send_response_and_close("403 Forbidden", {'Connection' => 'close'}, "") + send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") return end end @@ -487,14 +490,14 @@ def on_message_complete # Decode payload according to the "Content-Encoding" header. # For now, we only support 'gzip' and 'deflate'. begin - if @content_encoding == 'gzip' + if @content_encoding == 'gzip'.freeze @body = Zlib::GzipReader.new(StringIO.new(@body)).read - elsif @content_encoding == 'deflate' + elsif @content_encoding == 'deflate'.freeze @body = Zlib::Inflate.inflate(@body) end rescue @log.warn 'fails to decode payload', error: $!.to_s - send_response_and_close("400 Bad Request", {}, "") + send_response_and_close(RES_400_STATUS, {}, "") return end @@ -533,7 +536,7 @@ def on_message_complete end if @keep_alive - header['Connection'] = 'Keep-Alive' + header['Connection'] = 'Keep-Alive'.freeze send_response(code, header, body) else send_response_and_close(code, header, body) @@ -559,13 +562,13 @@ def closing? def send_response(code, header, body) header['Content-Length'] ||= body.bytesize - header['Content-Type'] ||= 'text/plain' + header['Content-Type'] ||= 'text/plain'.freeze data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } - data << "\r\n" + data << "\r\n".freeze @io.write(data) @io.write(body) @@ -576,7 +579,7 @@ def send_response_nobody(code, header) header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } - data << "\r\n" + data << "\r\n".freeze @io.write(data) end From ddddb3e856d408a6015a118f40c9efcff33c6a26 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 23:28:50 +0900 Subject: [PATCH 6/7] in_http: define initialize to list variables Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 0980723c7d..61c8bb5162 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -87,12 +87,29 @@ class HttpInput < Input EVENT_RECORD_PARAMETER = '_event_record' + def initialize + super + + @km = nil + @format_name = nil + @parser_time_key = nil + + # default parsers + @parser_msgpack = nil + @parser_json = nil + @default_time_parser = nil + @default_keep_time_key = nil + @float_time_parser = nil + + # configured parser + @custom_parser = nil + end + def configure(conf) compat_parameters_convert(conf, :parser) super - @parser = nil m = if @parser_configs.first['@type'] == 'in_http' @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') @parser_msgpack.time_key = nil @@ -108,9 +125,9 @@ def configure(conf) @default_keep_time_key = default_parser.keep_time_key method(:parse_params_default) else - @parser = parser_create + @custom_parser = parser_create @format_name = @parser_configs.first['@type'] - @parser_time_key = @parser.time_key + @parser_time_key = @custom_parser.time_key method(:parse_params_with_parser) end self.singleton_class.module_eval do @@ -202,9 +219,9 @@ def on_request(path_info, params) if param_time = params['time'] param_time = param_time.to_f single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - elsif @parser - single_time = @parser.parse_time(single_record) - single_time, single_record = @parser.convert_values(single_time, single_record) + elsif @custom_parser + single_time = @custom_parser.parse_time(single_record) + single_time, single_record = @custom_parser.convert_values(single_time, single_record) else single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key) if @default_time_parser @@ -307,7 +324,7 @@ def parse_params_default(params) def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] - @parser.parse(content) { |time, record| + @custom_parser.parse(content) { |time, record| raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } From 82545cc67b975edc4edbd6e422532ed329c653bf Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 19 Jun 2020 23:39:56 +0900 Subject: [PATCH 7/7] in_http: use method for time conversion Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_http.rb | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 61c8bb5162..8385587211 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -223,15 +223,7 @@ def on_request(path_info, params) single_time = @custom_parser.parse_time(single_record) single_time, single_record = @custom_parser.convert_values(single_time, single_record) else - single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key) - if @default_time_parser - @default_time_parser.parse(t) - else - Fluent::EventTime.from_time(Time.at(t)) - end - else - Fluent::EventTime.now - end + single_time = convert_time_field(single_record) end mes.add(single_time, single_record) @@ -244,15 +236,7 @@ def on_request(path_info, params) param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) else if record_time.nil? - if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) - if @default_time_parser - @default_time_parser.parse(t) - else - Fluent::EventTime.from_time(Time.at(t)) - end - else - Fluent::EventTime.now - end + convert_time_field(record) else record_time end @@ -347,6 +331,18 @@ def add_params_to_record(record, params) end end + def convert_time_field(record) + if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) + if @default_time_parser + @default_time_parser.parse(t) + else + Fluent::EventTime.from_time(Time.at(t)) + end + else + Fluent::EventTime.now + end + end + class Handler attr_reader :content_type