From eec10fa20bfcfa5725f611ac06253252508179e4 Mon Sep 17 00:00:00 2001 From: Lei Qiu Date: Sat, 15 Feb 2020 15:52:27 -0800 Subject: [PATCH] Cherry-pick #16315 to 7.6: Add a TLS test and more debug output to httpjson input (#16351) * Add a TLS test and more debug output to httpjson input (#16315) (cherry picked from commit 20e9d695ce89a9aacb95a45821b4033f600cc127) --- CHANGELOG.next.asciidoc | 2 + .../filebeat/input/httpjson/httpjson_test.go | 41 +++++++++++++++---- x-pack/filebeat/input/httpjson/input.go | 28 +++++++------ 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1157bd71109c..3dbfac07c972 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -77,6 +77,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* +- Add a TLS test and more debug output to httpjson input {pull}16315[16315] + *Heartbeat* diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 6c8ad1a98b67..ceb0218dabad 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -35,11 +35,14 @@ func testSetup(t *testing.T) { }) } -func runTest(t *testing.T, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) { - // Setup httpbin environment +func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) { testSetup(t) - // Create test http server - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Create an http test server according to whether TLS is used + var newServer = httptest.NewServer + if isTLS { + newServer = httptest.NewTLSServer + } + ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPost { req, err := ioutil.ReadAll(r.Body) defer r.Body.Close() @@ -154,7 +157,29 @@ func TestGET(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(1) + if !ok { + t.Fatalf("Expected 1 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestGetHTTPS(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "ssl.verification_mode": "none", + } + runTest(t, true, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -176,7 +201,7 @@ func TestPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 0, } - runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -198,7 +223,7 @@ func TestRepeatedPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 10 ^ 9, } - runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -219,7 +244,7 @@ func TestRunStop(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { input.Run() input.Stop() input.Run() diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 00bc6496b325..d5d1c61185c4 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -154,31 +154,32 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl for { req, err := in.createHTTPRequest(ctx, ri) if err != nil { - return err + return errors.Wrapf(err, "failed to create http request") } msg, err := client.Do(req) if err != nil { - return errors.New("failed to do http request. Stopping input worker - ") - } - if msg.StatusCode != http.StatusOK { - return errors.Errorf("return HTTP status is %s - ", msg.Status) + return errors.Wrapf(err, "failed to execute http client.Do") } responseData, err := ioutil.ReadAll(msg.Body) - defer msg.Body.Close() + msg.Body.Close() if err != nil { - return err + return errors.Wrapf(err, "failed to read http.response.body") + } + if msg.StatusCode != http.StatusOK { + in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData)) + return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode) } var m, v interface{} err = json.Unmarshal(responseData, &m) if err != nil { - return err + return errors.Wrapf(err, "failed to unmarshal http.response.body") } switch mmap := m.(type) { case map[string]interface{}: if in.config.JSONObjects == "" { ok := in.outlet.OnEvent(makeEvent(string(responseData))) if !ok { - return errors.New("function OnEvent returned false - ") + return errors.New("function OnEvent returned false") } } else { v, err = common.MapStr(mmap).GetValue(in.config.JSONObjects) @@ -192,11 +193,11 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl case map[string]interface{}: d, err := json.Marshal(tv) if err != nil { - return errors.New("failed to process http response data - ") + return errors.Wrapf(err, "failed to marshal json_objects_array") } ok := in.outlet.OnEvent(makeEvent(string(d))) if !ok { - return errors.New("OnEvent returned false - ") + return errors.New("function OnEvent returned false") } default: return errors.New("invalid json_objects_array configuration") @@ -222,7 +223,7 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl case string: ri.URL = v.(string) default: - return errors.New("pagination ID is not string, which is required for URL - ") + return errors.New("pagination ID is not of string type") } } if in.config.Pagination.ExtraBodyContent != nil { @@ -232,7 +233,8 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl } return nil default: - return errors.New("response is not valid JSON - ") + in.log.Debugw("http.response.body is not valid JSON", string(responseData)) + return errors.New("http.response.body is not valid JSON") } } }