diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 7916825f3e..8385587211 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -87,12 +87,29 @@ class HttpInput < Input EVENT_RECORD_PARAMETER = '_event_record' + def initialize + super + + @km = nil + @format_name = nil + @parser_time_key = nil + + # default parsers + @parser_msgpack = nil + @parser_json = nil + @default_time_parser = nil + @default_keep_time_key = nil + @float_time_parser = nil + + # configured parser + @custom_parser = nil + end + def configure(conf) compat_parameters_convert(conf, :parser) super - @parser = nil m = if @parser_configs.first['@type'] == 'in_http' @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') @parser_msgpack.time_key = nil @@ -108,9 +125,9 @@ def configure(conf) @default_keep_time_key = default_parser.keep_time_key method(:parse_params_default) else - @parser = parser_create + @custom_parser = parser_create @format_name = @parser_configs.first['@type'] - @parser_time_key = @parser.time_key + @parser_time_key = @custom_parser.time_key method(:parse_params_with_parser) end self.singleton_class.module_eval do @@ -165,6 +182,13 @@ def close super end + RES_TEXT_HEADER = {'Content-Type' => 'text/plain'}.freeze + RESPONSE_200 = ["200 OK".freeze, RES_TEXT_HEADER, "".freeze].freeze + RESPONSE_204 = ["204 No Content".freeze, {}.freeze].freeze + RESPONSE_IMG = ["200 OK".freeze, {'Content-Type'=>'image/gif; charset=utf-8'}.freeze, EMPTY_GIF_IMAGE].freeze + RES_400_STATUS = "400 Bad Request".freeze + RES_500_STATUS = "500 Internal Server Error".freeze + def on_request(path_info, params) begin path = path_info[1..-1] # remove / @@ -175,97 +199,54 @@ def on_request(path_info, params) if record.nil? log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } if @respond_with_empty_img - return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] + return RESPONSE_IMG else if @use_204_response - return ["204 No Content", {}] + return RESPONSE_204 else - return ["200 OK", {'Content-Type'=>'text/plain'}, ""] + return RESPONSE_200 end end end - unless record.is_a?(Array) - if @add_http_headers - params.each_pair { |k,v| - if k.start_with?("HTTP_") - record[k] = v - end - } - end - if @add_remote_addr - record['REMOTE_ADDR'] = params['REMOTE_ADDR'] - end - end - time = if param_time = params['time'] - param_time = param_time.to_f - param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - else - if record_time.nil? - if !record.is_a?(Array) - if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) - if @default_time_parser - @default_time_parser.parse(t) - else - Fluent::EventTime.from_time(Time.at(t)) - end - else - Fluent::EventTime.now - end - else - Fluent::EventTime.now - end - else - record_time - end - end - rescue => e - if @dump_error_log - log.error "failed to process request", error: e - end - return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] - end - - mes = nil - # Support batched requests - if record.is_a?(Array) - begin + mes = nil + # Support batched requests + if record.is_a?(Array) mes = Fluent::MultiEventStream.new record.each do |single_record| - if @add_http_headers - params.each_pair { |k,v| - if k.start_with?("HTTP_") - single_record[k] = v - end - } - end - if @add_remote_addr - single_record['REMOTE_ADDR'] = params['REMOTE_ADDR'] - end - - if @parser - single_time = @parser.parse_time(single_record) - single_time, single_record = @parser.convert_values(single_time, single_record) + add_params_to_record(single_record, params) + + if param_time = params['time'] + param_time = param_time.to_f + single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) + elsif @custom_parser + single_time = @custom_parser.parse_time(single_record) + single_time, single_record = @custom_parser.convert_values(single_time, single_record) else - single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key) - if @default_time_parser - @default_time_parser.parse(t) - else - Fluent::EventTime.from_time(Time.at(t)) - end - else - time - end + single_time = convert_time_field(single_record) end mes.add(single_time, single_record) end - rescue => e - if @dump_error_log - log.error "failed to process batch request", error: e - end - return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"] + else + add_params_to_record(record, params) + + time = if param_time = params['time'] + param_time = param_time.to_f + param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) + else + if record_time.nil? + convert_time_field(record) + else + record_time + end + end end + rescue => e + if @dump_error_log + log.error "failed to process request", error: e + end + return [RES_400_STATUS, RES_TEXT_HEADER, "400 Bad Request\n#{e}\n"] end # TODO server error @@ -279,16 +260,16 @@ def on_request(path_info, params) if @dump_error_log log.error "failed to emit data", error: e end - return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"] + return [RES_500_STATUS, RES_TEXT_HEADER, "500 Internal Server Error\n#{e}\n"] end if @respond_with_empty_img - return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE] + return RESPONSE_IMG else if @use_204_response - return ["204 No Content", {}] + return RESPONSE_204 else - return ["200 OK", {'Content-Type'=>'text/plain'}, ""] + return RESPONSE_200 end end end @@ -327,7 +308,7 @@ def parse_params_default(params) def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] - @parser.parse(content) { |time, record| + @custom_parser.parse(content) { |time, record| raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } @@ -336,6 +317,32 @@ def parse_params_with_parser(params) end end + def add_params_to_record(record, params) + if @add_http_headers + params.each_pair { |k, v| + if k.start_with?("HTTP_".freeze) + record[k] = v + end + } + end + + if @add_remote_addr + record['REMOTE_ADDR'] = params['REMOTE_ADDR'] + end + end + + def convert_time_field(record) + if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key) + if @default_time_parser + @default_time_parser.parse(t) + else + Fluent::EventTime.from_time(Time.at(t)) + end + else + Fluent::EventTime.now + end + end + class Handler attr_reader :content_type @@ -418,7 +425,7 @@ def on_headers_complete(headers) end } if expect - if expect == '100-continue' + if expect == '100-continue'.freeze if !size || size < @body_size_limit send_response_nobody("100 Continue", {}) else @@ -440,17 +447,20 @@ def on_body(chunk) @body << chunk end + RES_200_STATUS = "200 OK".freeze + RES_403_STATUS = "403 Forbidden".freeze + # Web browsers can send an OPTIONS request before performing POST # to check if cross-origin requests are supported. def handle_options_request # Is CORS enabled in the first place? if @cors_allow_origins.nil? - return send_response_and_close("403 Forbidden", {}, "") + return send_response_and_close(RES_403_STATUS, {}, "") end # in_http does not support HTTP methods except POST if @access_control_request_method != 'POST' - return send_response_and_close("403 Forbidden", {}, "") + return send_response_and_close(RES_403_STATUS, {}, "") end header = { @@ -461,19 +471,19 @@ def handle_options_request # Check the origin and send back a CORS response if @cors_allow_origins.include?('*') header["Access-Control-Allow-Origin"] = "*" - send_response_and_close("200 OK", header, "") + send_response_and_close(RES_200_STATUS, header, "") elsif include_cors_allow_origin header["Access-Control-Allow-Origin"] = @origin - send_response_and_close("200 OK", header, "") + send_response_and_close(RES_200_STATUS, header, "") else - send_response_and_close("403 Forbidden", {}, "") + send_response_and_close(RES_403_STATUS, {}, "") end end def on_message_complete return if closing? - if @parser.http_method == 'OPTIONS' + if @parser.http_method == 'OPTIONS'.freeze return handle_options_request() end @@ -483,7 +493,7 @@ def on_message_complete # restrictions and white listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') or include_cors_allow_origin - send_response_and_close("403 Forbidden", {'Connection' => 'close'}, "") + send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") return end end @@ -493,14 +503,14 @@ def on_message_complete # Decode payload according to the "Content-Encoding" header. # For now, we only support 'gzip' and 'deflate'. begin - if @content_encoding == 'gzip' + if @content_encoding == 'gzip'.freeze @body = Zlib::GzipReader.new(StringIO.new(@body)).read - elsif @content_encoding == 'deflate' + elsif @content_encoding == 'deflate'.freeze @body = Zlib::Inflate.inflate(@body) end rescue @log.warn 'fails to decode payload', error: $!.to_s - send_response_and_close("400 Bad Request", {}, "") + send_response_and_close(RES_400_STATUS, {}, "") return end @@ -526,8 +536,9 @@ def on_message_complete params.merge!(@env) @env.clear - code, header, body = *@callback.call(path_info, params) + code, header, body = @callback.call(path_info, params) body = body.to_s + header = header.dup if header.frozen? unless @cors_allow_origins.nil? if @cors_allow_origins.include?('*') @@ -538,7 +549,7 @@ def on_message_complete end if @keep_alive - header['Connection'] = 'Keep-Alive' + header['Connection'] = 'Keep-Alive'.freeze send_response(code, header, body) else send_response_and_close(code, header, body) @@ -564,13 +575,13 @@ def closing? def send_response(code, header, body) header['Content-Length'] ||= body.bytesize - header['Content-Type'] ||= 'text/plain' + header['Content-Type'] ||= 'text/plain'.freeze data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } - data << "\r\n" + data << "\r\n".freeze @io.write(data) @io.write(body) @@ -581,7 +592,7 @@ def send_response_nobody(code, header) header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } - data << "\r\n" + data << "\r\n".freeze @io.write(data) end