Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a TLS test and more debug output to httpjson input #16315

Merged
merged 4 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121]
- Add MQTT input. {issue}15602[15602] {pull}16204[16204]
- Add ECS categorization fields to activemq module. {issue}16151[16151] {pull}16201[16201]
- Add a TLS test and more debug output to httpjson input {pull}16315[16315]

*Heartbeat*

Expand Down
41 changes: 33 additions & 8 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) {
// Setup httpbin environment
func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create test http server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -154,7 +157,29 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestGetHTTPS(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -176,7 +201,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -198,7 +223,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -219,7 +244,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
28 changes: 15 additions & 13 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,32 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
for {
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
return err
return errors.Wrapf(err, "failed to create http request")
}
msg, err := client.Do(req)
if err != nil {
return errors.New("failed to do http request. Stopping input worker - ")
}
if msg.StatusCode != http.StatusOK {
return errors.Errorf("return HTTP status is %s - ", msg.Status)
return errors.Wrapf(err, "failed to execute http client.Do")
}
responseData, err := ioutil.ReadAll(msg.Body)
defer msg.Body.Close()
msg.Body.Close()
if err != nil {
return err
return errors.Wrapf(err, "failed to read http.response.body")
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
err = json.Unmarshal(responseData, &m)
if err != nil {
return err
return errors.Wrapf(err, "failed to unmarshal http.response.body")
}
switch mmap := m.(type) {
case map[string]interface{}:
if in.config.JSONObjects == "" {
ok := in.outlet.OnEvent(makeEvent(string(responseData)))
if !ok {
return errors.New("function OnEvent returned false - ")
return errors.New("function OnEvent returned false")
}
} else {
v, err = common.MapStr(mmap).GetValue(in.config.JSONObjects)
Expand All @@ -192,11 +193,11 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
case map[string]interface{}:
d, err := json.Marshal(tv)
if err != nil {
return errors.New("failed to process http response data - ")
return errors.Wrapf(err, "failed to marshal json_objects_array")
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return errors.New("OnEvent returned false - ")
return errors.New("function OnEvent returned false")
}
default:
return errors.New("invalid json_objects_array configuration")
Expand All @@ -222,7 +223,7 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
case string:
ri.URL = v.(string)
default:
return errors.New("pagination ID is not string, which is required for URL - ")
return errors.New("pagination ID is not of string type")
}
}
if in.config.Pagination.ExtraBodyContent != nil {
Expand All @@ -232,7 +233,8 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
return nil
default:
return errors.New("response is not valid JSON - ")
in.log.Debugw("http.response.body is not valid JSON", string(responseData))
return errors.New("http.response.body is not valid JSON")
}
}
}
Expand Down