Skip to content

Commit

Permalink
Include the response in returned 400 errors (#3338)
Browse files Browse the repository at this point in the history
Include the response in the returned error for response status code 400. That way when the error is logged it will contain the response which is helpful for troubleshooting.

Testing: Test_pushLogData_ShouldAddResponseTo400Error
  • Loading branch information
bjsignalfx authored May 6, 2021
1 parent ef99484 commit b133761
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
16 changes: 9 additions & 7 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return c.postEvents(ctx, body, compressed)
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
c.wg.Add(1)
defer c.wg.Done()

Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
// pushLogDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) (err error) {
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) error {
// Length of retained bytes in buffer after truncation.
var bufLen int
// Buffer capacity.
Expand Down Expand Up @@ -186,7 +186,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(res, logs.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
if err = encoder.Encode(event); err != nil {
if err := encoder.Encode(event); err != nil {
permanentErrors = append(permanentErrors, consumererror.Permanent(fmt.Errorf("dropped log event: %v, error: %v", event, err)))
continue
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
// Truncating buffer at tracked length below capacity and sending.
buf.Truncate(bufLen)
if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
if err := send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
Expand All @@ -229,7 +229,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
}

if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
if err := send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
Expand All @@ -255,11 +255,13 @@ func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bo
if err != nil {
return err
}
defer resp.Body.Close()

err = splunk.HandleHTTPCode(resp)

io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

return splunk.HandleHTTPCode(resp)
return err
}

// subLogs returns a subset of `ld` starting from index `from` to the end.
Expand Down
51 changes: 51 additions & 0 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

type testRoundTripper func(req *http.Request) *http.Response

func (t testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return t(req), nil
}

func newTestClient(respCode int, respBody string) *http.Client {
return &http.Client{
Transport: testRoundTripper(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: respCode,
Body: ioutil.NopCloser(bytes.NewBufferString(respBody)),
Header: make(http.Header),
}
}),
}
}

func createMetricsData(numberOfDataPoints int) pdata.Metrics {

doubleVal := 1234.5678
Expand Down Expand Up @@ -712,6 +730,39 @@ func Test_pushLogData_PostError(t *testing.T) {
assert.Equal(t, (err.(consumererror.Logs)).GetLogs(), logs)
}

func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {
splunkClient := client{
url: &url.URL{Scheme: "http", Host: "splunk"},
zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
config: NewFactory().CreateDefaultConfig().(*Config),
}
logs := createLogData(1, 1, 1)

responseBody := `some error occurred`

// An HTTP client that returns status code 400 and response body responseBody.
splunkClient.client = newTestClient(400, responseBody)
// Sending logs using the client.
err := splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
//require.True(t, consumererror.IsPermanent(err), "Expecting permanent error")
require.Contains(t, err.Error(), "HTTP/0.0 400")
// The returned error should contain the response body responseBody.
assert.Contains(t, err.Error(), responseBody)

// An HTTP client that returns some other status code other than 400 and response body responseBody.
splunkClient.client = newTestClient(500, responseBody)
// Sending logs using the client.
err = splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
//require.False(t, consumererror.IsPermanent(err), "Expecting non-permanent error")
require.Contains(t, err.Error(), "HTTP 500")
// The returned error should not contain the response body responseBody.
assert.NotContains(t, err.Error(), responseBody)
}

func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
c := client{
zippers: sync.Pool{New: func() interface{} {
Expand Down
8 changes: 7 additions & 1 deletion internal/splunk/httprequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package splunk
import (
"fmt"
"net/http"
"net/http/httputil"
"strconv"
"time"

Expand Down Expand Up @@ -53,7 +54,12 @@ func HandleHTTPCode(resp *http.Response) error {
err = exporterhelper.NewThrottleRetry(err, time.Duration(retryAfter)*time.Second)
// Check for permanent errors.
case http.StatusBadRequest, http.StatusUnauthorized:
err = consumererror.Permanent(err)
dump, err2 := httputil.DumpResponse(resp, true)
if err2 == nil {
err = consumererror.Permanent(fmt.Errorf("%w", fmt.Errorf("%q", dump)))
} else {
err = consumererror.Combine([]error{err, err2})
}
}

return err
Expand Down

0 comments on commit b133761

Please sign in to comment.