diff --git a/exporter/sumologicexporter/sender.go b/exporter/sumologicexporter/sender.go new file mode 100644 index 000000000000..b8577be84778 --- /dev/null +++ b/exporter/sumologicexporter/sender.go @@ -0,0 +1,258 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sumologicexporter + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/pdata" +) + +type appendResponse struct { + // sent gives information if the data was sent or not + sent bool + // appended keeps state of appending new log line to the body + appended bool +} + +type sender struct { + buffer []pdata.LogRecord + config *Config + client *http.Client + filter filter + ctx context.Context +} + +const ( + logKey string = "log" + // maxBufferSize defines size of the buffer (maximum number of pdata.LogRecord entries) + maxBufferSize int = 1024 * 1024 +) + +func newAppendResponse() appendResponse { + return appendResponse{ + appended: true, + } +} + +func newSender(ctx context.Context, cfg *Config, cl *http.Client, f filter) *sender { + return &sender{ + config: cfg, + client: cl, + filter: f, + ctx: ctx, + } +} + +// send sends data to sumologic +func (s *sender) send(pipeline PipelineType, body io.Reader, flds fields) error { + // Add headers + req, err := http.NewRequestWithContext(s.ctx, http.MethodPost, s.config.HTTPClientSettings.Endpoint, body) + if err != nil { + return err + } + + req.Header.Add("X-Sumo-Client", s.config.Client) + + if len(s.config.SourceHost) > 0 { + req.Header.Add("X-Sumo-Host", s.config.SourceHost) + } + + if len(s.config.SourceName) > 0 { + req.Header.Add("X-Sumo-Name", s.config.SourceName) + } + + if len(s.config.SourceCategory) > 0 { + req.Header.Add("X-Sumo-Category", s.config.SourceCategory) + } + + switch pipeline { + case LogsPipeline: + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("X-Sumo-Fields", string(flds)) + case MetricsPipeline: + // ToDo: Implement metrics pipeline + return errors.New("current sender version doesn't support metrics") + default: + return errors.New("unexpected pipeline") + } + + resp, err := s.client.Do(req) + if err != nil { + return err + } + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + return fmt.Errorf("error during sending data: %s", resp.Status) + } + return nil +} + +// logToText converts LogRecord to a plain text line, returns it and error eventually +func (s *sender) logToText(record pdata.LogRecord) (string, error) { + return record.Body().StringVal(), nil +} + +// logToText converts LogRecord to a json line, returns it and error eventually +func (s *sender) logToJSON(record pdata.LogRecord) (string, error) { + data := s.filter.filterOut(record.Attributes()) + data[logKey] = record.Body().StringVal() + + nextLine, err := json.Marshal(data) + if err != nil { + return "", err + } + + return bytes.NewBuffer(nextLine).String(), nil +} + +// sendLogs sends log records from the buffer formatted according +// to configured LogFormat and as the result of execution +// returns array of records which has not been sent correctly and error +func (s *sender) sendLogs(flds fields) ([]pdata.LogRecord, error) { + var ( + body strings.Builder + errs []error + droppedRecords []pdata.LogRecord + currentRecords []pdata.LogRecord + ) + + for _, record := range s.buffer { + var ( + formattedLine string + err error + ) + + switch s.config.LogFormat { + case TextFormat: + formattedLine, err = s.logToText(record) + case JSONFormat: + formattedLine, err = s.logToJSON(record) + default: + err = errors.New("unexpected log format") + } + + if err != nil { + droppedRecords = append(droppedRecords, record) + errs = append(errs, err) + continue + } + + ar, err := s.appendAndSend(formattedLine, LogsPipeline, &body, flds) + if err != nil { + errs = append(errs, err) + if ar.sent { + droppedRecords = append(droppedRecords, currentRecords...) + } + + if !ar.appended { + droppedRecords = append(droppedRecords, record) + } + } + + // If data was sent, cleanup the currentTimeSeries counter + if ar.sent { + currentRecords = currentRecords[:0] + } + + // If log has been appended to body, increment the currentTimeSeries + if ar.appended { + currentRecords = append(currentRecords, record) + } + } + + if err := s.send(LogsPipeline, strings.NewReader(body.String()), flds); err != nil { + errs = append(errs, err) + droppedRecords = append(droppedRecords, currentRecords...) + } + + if len(errs) > 0 { + return droppedRecords, componenterror.CombineErrors(errs) + } + return droppedRecords, nil +} + +// appendAndSend appends line to the request body that will be sent and sends +// the accumulated data if the internal buffer has been filled (with maxBufferSize elements). +// It returns appendResponse +func (s *sender) appendAndSend( + line string, + pipeline PipelineType, + body *strings.Builder, + flds fields, +) (appendResponse, error) { + var errors []error + ar := newAppendResponse() + + if body.Len() > 0 && body.Len()+len(line) >= s.config.MaxRequestBodySize { + ar.sent = true + if err := s.send(pipeline, strings.NewReader(body.String()), flds); err != nil { + errors = append(errors, err) + } + body.Reset() + } + + if body.Len() > 0 { + // Do not add newline if the body is empty + if _, err := body.WriteString("\n"); err != nil { + errors = append(errors, err) + ar.appended = false + } + } + + if ar.appended { + // Do not append new line if separator was not appended + if _, err := body.WriteString(line); err != nil { + errors = append(errors, err) + ar.appended = false + } + } + + if len(errors) > 0 { + return ar, componenterror.CombineErrors(errors) + } + return ar, nil +} + +// cleanBuffer zeroes buffer +func (s *sender) cleanBuffer() { + s.buffer = (s.buffer)[:0] +} + +// batch adds log to the buffer and flushes them if buffer is full to avoid overflow +// returns list of log records which were not sent successfully +func (s *sender) batch(log pdata.LogRecord, metadata fields) ([]pdata.LogRecord, error) { + s.buffer = append(s.buffer, log) + + if s.count() >= maxBufferSize { + dropped, err := s.sendLogs(metadata) + s.cleanBuffer() + return dropped, err + } + + return nil, nil +} + +// count returns number of logs in buffer +func (s *sender) count() int { + return len(s.buffer) +} diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go new file mode 100644 index 000000000000..51ad7e64e938 --- /dev/null +++ b/exporter/sumologicexporter/sender_test.go @@ -0,0 +1,419 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sumologicexporter + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer/pdata" +) + +type senderTest struct { + srv *httptest.Server + s *sender +} + +func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request)) *senderTest { + reqCounter := 0 + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if len(cb) > 0 && assert.Greater(t, len(cb), reqCounter) { + cb[reqCounter](w, req) + reqCounter++ + } + })) + + cfg := &Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: testServer.URL, + Timeout: defaultTimeout, + }, + LogFormat: "text", + Client: "otelcol", + MaxRequestBodySize: 20_971_520, + } + + f, err := newFilter([]string{}) + require.NoError(t, err) + + return &senderTest{ + srv: testServer, + s: newSender( + context.Background(), + cfg, + &http.Client{ + Timeout: cfg.HTTPClientSettings.Timeout, + }, + f, + ), + } +} + +func extractBody(t *testing.T, req *http.Request) string { + buf := new(strings.Builder) + _, err := io.Copy(buf, req.Body) + require.NoError(t, err) + return buf.String() +} + +func exampleLog() []pdata.LogRecord { + buffer := make([]pdata.LogRecord, 1) + buffer[0] = pdata.NewLogRecord() + buffer[0].InitEmpty() + buffer[0].Body().SetStringVal("Example log") + + return buffer +} + +func exampleTwoLogs() []pdata.LogRecord { + buffer := make([]pdata.LogRecord, 2) + buffer[0] = pdata.NewLogRecord() + buffer[0].InitEmpty() + buffer[0].Body().SetStringVal("Example log") + buffer[0].Attributes().InsertString("key1", "value1") + buffer[0].Attributes().InsertString("key2", "value2") + buffer[1] = pdata.NewLogRecord() + buffer[1].InitEmpty() + buffer[1].Body().SetStringVal("Another example log") + buffer[1].Attributes().InsertString("key1", "value1") + buffer[1].Attributes().InsertString("key2", "value2") + + return buffer +} + +func TestSend(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, "Example log\nAnother example log", body) + assert.Equal(t, "test_metadata", req.Header.Get("X-Sumo-Fields")) + assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) + assert.Equal(t, "application/x-www-form-urlencoded", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + + test.s.buffer = exampleTwoLogs() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestSendSplit(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, "Example log", body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, "Another example log", body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.buffer = exampleTwoLogs() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} +func TestSendSplitFailedOne(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + assert.Equal(t, "Example log", body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, "Another example log", body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.config.LogFormat = TextFormat + test.s.buffer = exampleTwoLogs() + + dropped, err := test.s.sendLogs("test_metadata") + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, test.s.buffer[0:1], dropped) +} + +func TestSendSplitFailedAll(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + assert.Equal(t, "Example log", body) + }, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(404) + + body := extractBody(t, req) + assert.Equal(t, "Another example log", body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.config.LogFormat = TextFormat + test.s.buffer = exampleTwoLogs() + + dropped, err := test.s.sendLogs("test_metadata") + assert.EqualError( + t, + err, + "[error during sending data: 500 Internal Server Error; error during sending data: 404 Not Found]", + ) + assert.Equal(t, test.s.buffer[0:2], dropped) +} + +func TestSendJson(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `{"key1":"value1","key2":"value2","log":"Example log"} +{"key1":"value1","key2":"value2","log":"Another example log"}` + assert.Equal(t, expected, body) + assert.Equal(t, "test_metadata", req.Header.Get("X-Sumo-Fields")) + assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) + assert.Equal(t, "application/x-www-form-urlencoded", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.LogFormat = JSONFormat + test.s.buffer = exampleTwoLogs() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestSendJsonSplit(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.LogFormat = JSONFormat + test.s.config.MaxRequestBodySize = 10 + test.s.buffer = exampleTwoLogs() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestSendJsonSplitFailedOne(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.LogFormat = JSONFormat + test.s.config.MaxRequestBodySize = 10 + test.s.buffer = exampleTwoLogs() + + dropped, err := test.s.sendLogs("test_metadata") + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, test.s.buffer[0:1], dropped) +} + +func TestSendJsonSplitFailedAll(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + }, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(404) + + body := extractBody(t, req) + assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.LogFormat = JSONFormat + test.s.config.MaxRequestBodySize = 10 + test.s.buffer = exampleTwoLogs() + + dropped, err := test.s.sendLogs("test_metadata") + assert.EqualError( + t, + err, + "[error during sending data: 500 Internal Server Error; error during sending data: 404 Not Found]", + ) + assert.Equal(t, test.s.buffer[0:2], dropped) +} + +func TestSendUnexpectedFormat(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + }, + }) + defer func() { test.srv.Close() }() + test.s.config.LogFormat = "dummy" + test.s.buffer = exampleTwoLogs() + + _, err := test.s.sendLogs("test_metadata") + assert.Error(t, err) +} + +func TestOverrideSourceName(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "Test source name", req.Header.Get("X-Sumo-Name")) + }, + }) + defer func() { test.srv.Close() }() + + test.s.config.SourceName = "Test source name" + test.s.buffer = exampleLog() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestOverrideSourceCategory(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "Test source category", req.Header.Get("X-Sumo-Category")) + }, + }) + defer func() { test.srv.Close() }() + + test.s.config.SourceCategory = "Test source category" + test.s.buffer = exampleLog() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestOverrideSourceHost(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "Test source host", req.Header.Get("X-Sumo-Host")) + }, + }) + defer func() { test.srv.Close() }() + + test.s.config.SourceHost = "Test source host" + test.s.buffer = exampleLog() + + _, err := test.s.sendLogs("test_metadata") + assert.NoError(t, err) +} + +func TestBuffer(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + assert.Equal(t, test.s.count(), 0) + logs := exampleTwoLogs() + + droppedLogs, err := test.s.batch(logs[0], "") + require.NoError(t, err) + assert.Nil(t, droppedLogs) + assert.Equal(t, 1, test.s.count()) + assert.Equal(t, []pdata.LogRecord{logs[0]}, test.s.buffer) + + droppedLogs, err = test.s.batch(logs[1], "") + require.NoError(t, err) + assert.Nil(t, droppedLogs) + assert.Equal(t, 2, test.s.count()) + assert.Equal(t, logs, test.s.buffer) + + test.s.cleanBuffer() + assert.Equal(t, 0, test.s.count()) + assert.Equal(t, []pdata.LogRecord{}, test.s.buffer) +} + +func TestInvalidEndpoint(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + test.s.config.HTTPClientSettings.Endpoint = ":" + test.s.buffer = exampleLog() + + _, err := test.s.sendLogs("test_metadata") + assert.EqualError(t, err, `parse ":": missing protocol scheme`) +} + +func TestInvalidPostRequest(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + test.s.config.HTTPClientSettings.Endpoint = "" + test.s.buffer = exampleLog() + + _, err := test.s.sendLogs("test_metadata") + assert.EqualError(t, err, `Post "": unsupported protocol scheme ""`) +} + +func TestBufferOverflow(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + test.s.config.HTTPClientSettings.Endpoint = ":" + log := exampleLog() + + for test.s.count() < maxBufferSize-1 { + _, err := test.s.batch(log[0], "test_metadata") + require.NoError(t, err) + } + + _, err := test.s.batch(log[0], "test_metadata") + assert.EqualError(t, err, `parse ":": missing protocol scheme`) + assert.Equal(t, 0, test.s.count()) +} + +func TestMetricsPipeline(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + err := test.s.send(MetricsPipeline, strings.NewReader(""), "") + assert.EqualError(t, err, `current sender version doesn't support metrics`) +} + +func TestInvalidPipeline(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + err := test.s.send("invalidPipeline", strings.NewReader(""), "") + assert.EqualError(t, err, `unexpected pipeline`) +}