Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include the response in returned 400 errors #3338

Merged
merged 3 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return c.postEvents(ctx, body, compressed)
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
c.wg.Add(1)
defer c.wg.Done()

Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
// pushLogDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) (err error) {
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) error {
// Length of retained bytes in buffer after truncation.
var bufLen int
// Buffer capacity.
Expand Down Expand Up @@ -186,7 +186,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(res, logs.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
if err = encoder.Encode(event); err != nil {
if err := encoder.Encode(event); err != nil {
permanentErrors = append(permanentErrors, consumererror.Permanent(fmt.Errorf("dropped log event: %v, error: %v", event, err)))
continue
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
// Truncating buffer at tracked length below capacity and sending.
buf.Truncate(bufLen)
if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
if err := send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
Expand All @@ -229,7 +229,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
}

if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
if err := send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
Expand All @@ -255,11 +255,13 @@ func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bo
if err != nil {
return err
}
defer resp.Body.Close()

err = splunk.HandleHTTPCode(resp)

io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

return splunk.HandleHTTPCode(resp)
return err
}

// subLogs returns a subset of `ld` starting from index `from` to the end.
Expand Down
51 changes: 51 additions & 0 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

type testRoundTripper func(req *http.Request) *http.Response

func (t testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return t(req), nil
}

func newTestClient(respCode int, respBody string) *http.Client {
return &http.Client{
Transport: testRoundTripper(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: respCode,
Body: ioutil.NopCloser(bytes.NewBufferString(respBody)),
Header: make(http.Header),
}
}),
}
}

func createMetricsData(numberOfDataPoints int) pdata.Metrics {

doubleVal := 1234.5678
Expand Down Expand Up @@ -712,6 +730,39 @@ func Test_pushLogData_PostError(t *testing.T) {
assert.Equal(t, (err.(consumererror.Logs)).GetLogs(), logs)
}

func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {
splunkClient := client{
url: &url.URL{Scheme: "http", Host: "splunk"},
zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
config: NewFactory().CreateDefaultConfig().(*Config),
}
logs := createLogData(1, 1, 1)

responseBody := `some error occurred`

// An HTTP client that returns status code 400 and response body responseBody.
splunkClient.client = newTestClient(400, responseBody)
// Sending logs using the client.
err := splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
//require.True(t, consumererror.IsPermanent(err), "Expecting permanent error")
require.Contains(t, err.Error(), "HTTP/0.0 400")
// The returned error should contain the response body responseBody.
assert.Contains(t, err.Error(), responseBody)

// An HTTP client that returns some other status code other than 400 and response body responseBody.
splunkClient.client = newTestClient(500, responseBody)
// Sending logs using the client.
err = splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
//require.False(t, consumererror.IsPermanent(err), "Expecting non-permanent error")
require.Contains(t, err.Error(), "HTTP 500")
// The returned error should not contain the response body responseBody.
assert.NotContains(t, err.Error(), responseBody)
}

func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
c := client{
zippers: sync.Pool{New: func() interface{} {
Expand Down
8 changes: 7 additions & 1 deletion internal/splunk/httprequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package splunk
import (
"fmt"
"net/http"
"net/http/httputil"
"strconv"
"time"

Expand Down Expand Up @@ -53,7 +54,12 @@ func HandleHTTPCode(resp *http.Response) error {
err = exporterhelper.NewThrottleRetry(err, time.Duration(retryAfter)*time.Second)
// Check for permanent errors.
case http.StatusBadRequest, http.StatusUnauthorized:
err = consumererror.Permanent(err)
dump, err2 := httputil.DumpResponse(resp, true)
if err2 == nil {
err = consumererror.Permanent(fmt.Errorf("%w", fmt.Errorf("%q", dump)))
} else {
err = consumererror.Combine([]error{err, err2})
}
}

return err
Expand Down