diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 9af84ab90044..1159d6a82cb2 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -681,14 +681,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/api v0.176.1 // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index fc9f00171bf6..a6f3c5248de7 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -1769,8 +1769,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 h1:/RIbNt/Zr7rVhIkQhooTxCxFcdWLGIKnZA4IXNFSrvo= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -2093,8 +2093,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= +golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index d5a2bbd6b77b..c77eb136c199 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -702,15 +702,15 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect - golang.org/x/mod v0.16.0 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/api v0.176.1 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 58152b760d3f..8a179f22d335 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -1773,8 +1773,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1803,8 +1803,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= -golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2098,8 +2098,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= +golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/exporter/sumologicexporter/exporter.go b/exporter/sumologicexporter/exporter.go index c36bceb73431..4a0413c1f4e2 100644 --- a/exporter/sumologicexporter/exporter.go +++ b/exporter/sumologicexporter/exporter.go @@ -55,6 +55,8 @@ type sumologicexporter struct { foundSumologicExtension bool sumologicExtension *sumologicextension.SumologicExtension + + id component.ID } func initExporter(cfg *Config, settings component.TelemetrySettings) (*sumologicexporter, error) { @@ -295,6 +297,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err } logsURL, metricsURL, tracesURL := se.getDataURLs() sdr := newSender( + se.logger, se.config, se.getHTTPClient(), se.filter, @@ -305,6 +308,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err logsURL, tracesURL, se.graphiteFormatter, + se.id, ) // Iterate over ResourceLogs @@ -392,6 +396,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met } logsURL, metricsURL, tracesURL := se.getDataURLs() sdr := newSender( + se.logger, se.config, se.getHTTPClient(), se.filter, @@ -402,6 +407,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met logsURL, tracesURL, se.graphiteFormatter, + se.id, ) // Iterate over ResourceMetrics diff --git a/exporter/sumologicexporter/exporter_test.go b/exporter/sumologicexporter/exporter_test.go index 77a2f28d9c7b..dbf56f28457f 100644 --- a/exporter/sumologicexporter/exporter_test.go +++ b/exporter/sumologicexporter/exporter_test.go @@ -98,7 +98,7 @@ func TestAllFailed(t *testing.T) { logs := logRecordsToLogs(exampleTwoLogs()) err := test.exp.pushLogsData(context.Background(), logs) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") var partial consumererror.Logs require.True(t, errors.As(err, &partial)) @@ -131,7 +131,7 @@ func TestPartiallyFailed(t *testing.T) { expected := logRecordsToLogs(records[:1]) err = test.exp.pushLogsData(context.Background(), logs) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") var partial consumererror.Logs require.True(t, errors.As(err, &partial)) @@ -224,7 +224,7 @@ func TestPushFailedBatch(t *testing.T) { } err := test.exp.pushLogsData(context.Background(), logs) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") } func TestAllMetricsSuccess(t *testing.T) { @@ -272,7 +272,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 }) err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") var partial consumererror.Metrics require.True(t, errors.As(err, &partial)) @@ -309,7 +309,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 expected := metricPairToMetrics(records[:1]) err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") var partial consumererror.Metrics require.True(t, errors.As(err, &partial)) @@ -375,7 +375,7 @@ gauge_metric_name{foo="bar",key2="value2",remote_name="156955",url="http://anoth expected := metricPairToMetrics(records[:1]) err = test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") var partial consumererror.Metrics require.True(t, errors.As(err, &partial)) @@ -418,7 +418,7 @@ func TestPushMetricsFailedBatch(t *testing.T) { } err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") } func TestGetSignalUrl(t *testing.T) { diff --git a/exporter/sumologicexporter/fields.go b/exporter/sumologicexporter/fields.go index 368a57b6720d..f468ff24ea11 100644 --- a/exporter/sumologicexporter/fields.go +++ b/exporter/sumologicexporter/fields.go @@ -4,46 +4,81 @@ package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" import ( - "fmt" - "sort" "strings" "go.opentelemetry.io/collector/pdata/pcommon" + "golang.org/x/exp/slices" ) // fields represents metadata type fields struct { - orig pcommon.Map - replacer *strings.Replacer + orig pcommon.Map + initialized bool } func newFields(attrMap pcommon.Map) fields { return fields{ - orig: attrMap, - replacer: strings.NewReplacer(",", "_", "=", ":", "\n", "_"), + orig: attrMap, + initialized: true, } } // string returns fields as ordered key=value string with `, ` as separator func (f fields) string() string { + if !f.initialized { + return "" + } + returnValue := make([]string, 0, f.orig.Len()) + f.orig.Range(func(k string, v pcommon.Value) bool { + // Don't add source related attributes to fields as they are handled separately + // and are added to the payload either as special HTTP headers or as resources + // attributes. + if k == attributeKeySourceCategory || k == attributeKeySourceHost || k == attributeKeySourceName { + return true + } + + sv := v.AsString() + + // Skip empty field + if len(sv) == 0 { + return true + } + + key := []byte(k) + f.sanitizeField(key) + value := []byte(sv) + f.sanitizeField(value) + sb := strings.Builder{} + sb.Grow(len(key) + len(value) + 1) + sb.Write(key) + sb.WriteRune('=') + sb.Write(value) + returnValue = append( returnValue, - fmt.Sprintf( - "%s=%s", - f.sanitizeField(k), - f.sanitizeField(v.AsString()), - ), + sb.String(), ) return true }) - sort.Strings(returnValue) + slices.Sort(returnValue) return strings.Join(returnValue, ", ") } // sanitizeFields sanitize field (key or value) to be correctly parsed by sumologic receiver -func (f fields) sanitizeField(fld string) string { - return f.replacer.Replace(fld) +// It modifies the field in place. +func (f fields) sanitizeField(fld []byte) { + for i := 0; i < len(fld); i++ { + switch fld[i] { + case ',': + fld[i] = '_' + case '=': + fld[i] = ':' + case '\n': + fld[i] = '_' + default: + } + } } diff --git a/exporter/sumologicexporter/fields_test.go b/exporter/sumologicexporter/fields_test.go index 93be0a159007..1acb084c16db 100644 --- a/exporter/sumologicexporter/fields_test.go +++ b/exporter/sumologicexporter/fields_test.go @@ -1,32 +1,88 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package sumologicexporter +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" ) -func TestFieldsAsString(t *testing.T) { - expected := "key1=value1, key2=value2, key3=value3" - flds := fieldsFromMap(map[string]string{ - "key1": "value1", - "key3": "value3", - "key2": "value2", - }) +func TestFields(t *testing.T) { + testcases := []struct { + name string + fields map[string]string + expected string + }{ + { + name: "string", + fields: map[string]string{ + "key1": "value1", + "key3": "value3", + "key2": "value2", + }, + expected: "key1=value1, key2=value2, key3=value3", + }, + { + name: "sanitization", + fields: map[string]string{ + "key1": "value,1", + "key3": "value\n3", + "key=,2": "valu,e=2", + }, + expected: "key1=value_1, key3=value_3, key:_2=valu_e:2", + }, + { + name: "empty element", + fields: map[string]string{ + "key1": "value1", + "key3": "value3", + "key2": "", + }, + expected: "key1=value1, key3=value3", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + flds := fieldsFromMap(tc.fields) - assert.Equal(t, expected, flds.string()) + assert.Equal(t, tc.expected, flds.string()) + }) + } } -func TestFieldsSanitization(t *testing.T) { - expected := "key1=value_1, key3=value_3, key:_2=valu_e:2" - flds := fieldsFromMap(map[string]string{ - "key1": "value,1", - "key3": "value\n3", - "key=,2": "valu,e=2", - }) +func BenchmarkFields(b *testing.B) { + attrMap := pcommon.NewMap() + flds := map[string]any{ + "key1": "value1", + "key3": "value3", + "key2": "", + "map": map[string]string{ + "key1": "value1", + "key3": "value3", + "key2": "", + }, + } + for k, v := range flds { + switch v := v.(type) { + case string: + attrMap.PutStr(k, v) + case map[string]string: + m := pcommon.NewValueMap() + mm := m.Map().AsRaw() + for kk, vv := range v { + mm[kk] = vv + } + m.CopyTo(attrMap.PutEmpty(k)) + } + } + sut := newFields(attrMap) - assert.Equal(t, expected, flds.string()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = sut.string() + } } diff --git a/exporter/sumologicexporter/generated_package_test.go b/exporter/sumologicexporter/generated_package_test.go index 06d9480a812b..f18b099d8cd7 100644 --- a/exporter/sumologicexporter/generated_package_test.go +++ b/exporter/sumologicexporter/generated_package_test.go @@ -9,5 +9,5 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) } diff --git a/exporter/sumologicexporter/go.mod b/exporter/sumologicexporter/go.mod index 3d4de9211b63..cc1237c242a2 100644 --- a/exporter/sumologicexporter/go.mod +++ b/exporter/sumologicexporter/go.mod @@ -19,6 +19,7 @@ require ( go.opentelemetry.io/otel/trace v1.25.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f ) require ( diff --git a/exporter/sumologicexporter/go.sum b/exporter/sumologicexporter/go.sum index aa6b4790e853..d0b5f37eef69 100644 --- a/exporter/sumologicexporter/go.sum +++ b/exporter/sumologicexporter/go.sum @@ -199,6 +199,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/exporter/sumologicexporter/metadata.yaml b/exporter/sumologicexporter/metadata.yaml index fbd8c3b6e03c..69bfad133f02 100644 --- a/exporter/sumologicexporter/metadata.yaml +++ b/exporter/sumologicexporter/metadata.yaml @@ -12,3 +12,8 @@ status: # TODO: Update the exporter to pass the tests tests: skip_lifecycle: true + goleak: + ignore: + top: + # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. + - "go.opencensus.io/stats/view.(*worker).start" diff --git a/exporter/sumologicexporter/sender.go b/exporter/sumologicexporter/sender.go index ccd2a8625785..b1ea5f70d332 100644 --- a/exporter/sumologicexporter/sender.go +++ b/exporter/sumologicexporter/sender.go @@ -12,18 +12,17 @@ import ( "io" "net/http" "strings" + "time" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" -) + "go.uber.org/zap" -type appendResponse struct { - // sent gives information if the data was sent or not - sent bool - // appended keeps state of appending new log line to the body - appended bool -} + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter/internal/observability" +) // metricPair represents information required to send one metric to the Sumo Logic type metricPair struct { @@ -31,7 +30,75 @@ type metricPair struct { metric pmetric.Metric } +// countingReader keeps number of records related to reader +type countingReader struct { + counter int64 + reader io.Reader +} + +// newCountingReader creates countingReader with given number of records +func newCountingReader(records int) *countingReader { + return &countingReader{ + counter: int64(records), + } +} + +// withString sets up reader to read from string data +func (c *countingReader) withString(data string) *countingReader { + c.reader = strings.NewReader(data) + return c +} + +// bodyBuilder keeps information about number of records related to data it keeps +type bodyBuilder struct { + builder strings.Builder + counter int +} + +// newBodyBuilder returns empty bodyBuilder +func newBodyBuilder() bodyBuilder { + return bodyBuilder{} +} + +// Reset resets both counter and builder content +func (b *bodyBuilder) Reset() { + b.counter = 0 + b.builder.Reset() +} + +// addLine adds multiple lines to builder and increments counter +func (b *bodyBuilder) addLines(lines []string) { + if len(lines) == 0 { + return + } + + // add the first line separately to avoid a conditional in the loop + b.builder.WriteString(lines[0]) + + for _, line := range lines[1:] { + b.builder.WriteByte('\n') + b.builder.WriteString(line) // WriteString can't actually return an error + } + b.counter += len(lines) +} + +// addNewLine adds newline to builder +func (b *bodyBuilder) addNewLine() { + b.builder.WriteByte('\n') // WriteByte can't actually return an error +} + +// Len returns builder content length +func (b *bodyBuilder) Len() int { + return b.builder.Len() +} + +// toCountingReader converts bodyBuilder to countingReader +func (b *bodyBuilder) toCountingReader() *countingReader { + return newCountingReader(b.counter).withString(b.builder.String()) +} + type sender struct { + logger *zap.Logger logBuffer []plog.LogRecord metricBuffer []metricPair config *Config @@ -44,6 +111,7 @@ type sender struct { dataURLLogs string dataURLTraces string graphiteFormatter graphiteFormatter + id component.ID } const ( @@ -59,6 +127,10 @@ const ( headerCategory string = "X-Sumo-Category" headerFields string = "X-Sumo-Fields" + attributeKeySourceHost = "_sourceHost" + attributeKeySourceName = "_sourceName" + attributeKeySourceCategory = "_sourceCategory" + contentTypeLogs string = "application/x-www-form-urlencoded" contentTypePrometheus string = "application/vnd.sumologic.prometheus" contentTypeCarbon2 string = "application/vnd.sumologic.carbon2" @@ -68,13 +140,8 @@ const ( contentEncodingDeflate string = "deflate" ) -func newAppendResponse() appendResponse { - return appendResponse{ - appended: true, - } -} - func newSender( + logger *zap.Logger, cfg *Config, cl *http.Client, f filter, @@ -85,8 +152,10 @@ func newSender( logsURL string, tracesURL string, gf graphiteFormatter, + id component.ID, ) *sender { return &sender{ + logger: logger, config: cfg, client: cl, filter: f, @@ -97,83 +166,160 @@ func newSender( dataURLLogs: logsURL, dataURLTraces: tracesURL, graphiteFormatter: gf, + id: id, } } -// send sends data to sumologic -func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader, flds fields) error { - var url string +var errUnauthorized = errors.New("unauthorized") - switch pipeline { - case MetricsPipeline: - url = s.dataURLMetrics - case LogsPipeline: - url = s.dataURLLogs - default: - return fmt.Errorf("unknown pipeline type: %s", pipeline) +// send sends data to sumologic +func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *countingReader, flds fields) error { + req, err := s.createRequest(ctx, pipeline, reader.reader) + if err != nil { + return err } - data, err := s.compressor.compress(body) - if err != nil { + if err = s.addRequestHeaders(req, pipeline, flds); err != nil { return err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data) + + s.logger.Debug("Sending data", + zap.String("pipeline", string(pipeline)), + zap.Any("headers", req.Header), + ) + + start := time.Now() + resp, err := s.client.Do(req) if err != nil { + s.recordMetrics(time.Since(start), reader.counter, req, nil, pipeline) return err } + defer resp.Body.Close() - // Add headers - switch s.config.CompressEncoding { - case GZIPCompression: - req.Header.Set(headerContentEncoding, contentEncodingGzip) - case DeflateCompression: - req.Header.Set(headerContentEncoding, contentEncodingDeflate) - case NoCompression: - default: - return fmt.Errorf("invalid content encoding: %s", s.config.CompressEncoding) - } + s.recordMetrics(time.Since(start), reader.counter, req, resp, pipeline) - req.Header.Add(headerClient, s.config.Client) + return s.handleReceiverResponse(resp) +} - if s.sources.host.isSet() { - req.Header.Add(headerHost, s.sources.host.format(flds)) +func (s *sender) handleReceiverResponse(resp *http.Response) error { + // API responds with a 200 or 204 with ConentLength set to 0 when all data + // has been successfully ingested. + if resp.ContentLength == 0 && (resp.StatusCode == 200 || resp.StatusCode == 204) { + return nil } - if s.sources.name.isSet() { - req.Header.Add(headerName, s.sources.name.format(flds)) + type ReceiverResponseCore struct { + Status int `json:"status,omitempty"` + ID string `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Message string `json:"message,omitempty"` } - if s.sources.category.isSet() { - req.Header.Add(headerCategory, s.sources.category.format(flds)) + // API responds with a 200 or 204 with a JSON body describing what issues + // were encountered when processing the sent data. + switch resp.StatusCode { + case 200, 204: + if resp.ContentLength < 0 { + s.logger.Warn("Unknown length of server response") + return nil + } + + var rResponse ReceiverResponseCore + var ( + b = bytes.NewBuffer(make([]byte, 0, resp.ContentLength)) + tr = io.TeeReader(resp.Body, b) + ) + + if err := json.NewDecoder(tr).Decode(&rResponse); err != nil { + s.logger.Warn("Error decoding receiver response", zap.ByteString("body", b.Bytes())) + return nil + } + + l := s.logger.With(zap.String("status", resp.Status)) + if len(rResponse.ID) > 0 { + l = l.With(zap.String("id", rResponse.ID)) + } + if len(rResponse.Code) > 0 { + l = l.With(zap.String("code", rResponse.Code)) + } + if len(rResponse.Message) > 0 { + l = l.With(zap.String("message", rResponse.Message)) + } + l.Warn("There was an issue sending data") + return nil + + case 401: + return errUnauthorized + + default: + type ReceiverErrorResponse struct { + ReceiverResponseCore + Errors []struct { + Code string `json:"code"` + Message string `json:"message"` + } `json:"errors,omitempty"` + } + + var rResponse ReceiverErrorResponse + if resp.ContentLength > 0 { + var ( + b = bytes.NewBuffer(make([]byte, 0, resp.ContentLength)) + tr = io.TeeReader(resp.Body, b) + ) + + if err := json.NewDecoder(tr).Decode(&rResponse); err != nil { + return fmt.Errorf("failed to decode API response (status: %s): %s", + resp.Status, b.String(), + ) + } + } + + errMsgs := []string{ + fmt.Sprintf("status: %s", resp.Status), + } + + if len(rResponse.ID) > 0 { + errMsgs = append(errMsgs, fmt.Sprintf("id: %s", rResponse.ID)) + } + if len(rResponse.Code) > 0 { + errMsgs = append(errMsgs, fmt.Sprintf("code: %s", rResponse.Code)) + } + if len(rResponse.Message) > 0 { + errMsgs = append(errMsgs, fmt.Sprintf("message: %s", rResponse.Message)) + } + if len(rResponse.Errors) > 0 { + errMsgs = append(errMsgs, fmt.Sprintf("errors: %+v", rResponse.Errors)) + } + + err := fmt.Errorf("failed sending data: %s", strings.Join(errMsgs, ", ")) + + if resp.StatusCode == http.StatusBadRequest { + // Report the failure as permanent if the server thinks the request is malformed. + return consumererror.NewPermanent(err) + } + + return err } +} + +func (s *sender) createRequest(ctx context.Context, pipeline PipelineType, data io.Reader) (*http.Request, error) { + var url string switch pipeline { - case LogsPipeline: - req.Header.Add(headerContentType, contentTypeLogs) - req.Header.Add(headerFields, flds.string()) case MetricsPipeline: - switch s.config.MetricFormat { - case PrometheusFormat: - req.Header.Add(headerContentType, contentTypePrometheus) - case Carbon2Format: - req.Header.Add(headerContentType, contentTypeCarbon2) - case GraphiteFormat: - req.Header.Add(headerContentType, contentTypeGraphite) - default: - return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat) - } + url = s.dataURLMetrics + case LogsPipeline: + url = s.dataURLLogs default: - return errors.New("unexpected pipeline") + return nil, fmt.Errorf("unknown pipeline type: %s", pipeline) } - resp, err := s.client.Do(req) + data, err := s.compressor.compress(data) if err != nil { - return err - } - if resp.StatusCode < 200 || resp.StatusCode >= 400 { - return fmt.Errorf("error during sending data: %s", resp.Status) + return nil, err } - return nil + + return http.NewRequestWithContext(ctx, http.MethodPost, url, data) } // logToText converts LogRecord to a plain text line, returns it and error eventually @@ -199,7 +345,7 @@ func (s *sender) logToJSON(record plog.LogRecord) (string, error) { // returns array of records which has not been sent correctly and error func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, error) { var ( - body strings.Builder + body = newBodyBuilder() errs []error droppedRecords []plog.LogRecord currentRecords []plog.LogRecord @@ -224,31 +370,22 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, e continue } - ar, err := s.appendAndSend(ctx, formattedLine, LogsPipeline, &body, flds) + sent, err := s.appendAndMaybeSend(ctx, []string{formattedLine}, LogsPipeline, &body, flds) if err != nil { errs = append(errs, err) - if ar.sent { - droppedRecords = append(droppedRecords, currentRecords...) - } - - if !ar.appended { - droppedRecords = append(droppedRecords, record) - } + droppedRecords = append(droppedRecords, currentRecords...) } // If data was sent, cleanup the currentTimeSeries counter - if ar.sent { + if sent { currentRecords = currentRecords[:0] } - // If log has been appended to body, increment the currentTimeSeries - if ar.appended { - currentRecords = append(currentRecords, record) - } + currentRecords = append(currentRecords, record) } if body.Len() > 0 { - if err := s.send(ctx, LogsPipeline, strings.NewReader(body.String()), flds); err != nil { + if err := s.send(ctx, LogsPipeline, body.toCountingReader(), flds); err != nil { errs = append(errs, err) droppedRecords = append(droppedRecords, currentRecords...) } @@ -260,7 +397,7 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, e // sendMetrics sends metrics in right format basing on the s.config.MetricFormat func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, error) { var ( - body strings.Builder + body = newBodyBuilder() errs []error droppedRecords []metricPair currentRecords []metricPair @@ -287,31 +424,24 @@ func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, er continue } - ar, err := s.appendAndSend(ctx, formattedLine, MetricsPipeline, &body, flds) + sent, err := s.appendAndMaybeSend(ctx, []string{formattedLine}, MetricsPipeline, &body, flds) if err != nil { errs = append(errs, err) - if ar.sent { + if sent { droppedRecords = append(droppedRecords, currentRecords...) } - - if !ar.appended { - droppedRecords = append(droppedRecords, record) - } } // If data was sent, cleanup the currentTimeSeries counter - if ar.sent { + if sent { currentRecords = currentRecords[:0] } - // If log has been appended to body, increment the currentTimeSeries - if ar.appended { - currentRecords = append(currentRecords, record) - } + currentRecords = append(currentRecords, record) } if body.Len() > 0 { - if err := s.send(ctx, MetricsPipeline, strings.NewReader(body.String()), flds); err != nil { + if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), flds); err != nil { errs = append(errs, err) droppedRecords = append(droppedRecords, currentRecords...) } @@ -320,42 +450,36 @@ func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, er return droppedRecords, errors.Join(errs...) } -// appendAndSend appends line to the request body that will be sent and sends -// the accumulated data if the internal logBuffer has been filled (with maxBufferSize elements). -// It returns appendResponse -func (s *sender) appendAndSend( +// appendAndMaybeSend appends line to the request body that will be sent and sends +// the accumulated data if the internal logBuffer has been filled (with config.MaxRequestBodySize bytes). +// It returns a boolean indicating if the data was sent and an error +func (s *sender) appendAndMaybeSend( ctx context.Context, - line string, + lines []string, pipeline PipelineType, - body *strings.Builder, + body *bodyBuilder, flds fields, -) (appendResponse, error) { - var errs []error - ar := newAppendResponse() +) (sent bool, err error) { + + linesTotalLength := 0 + for _, line := range lines { + linesTotalLength += len(line) + 1 // count the newline as well + } - if body.Len() > 0 && body.Len()+len(line) >= s.config.MaxRequestBodySize { - ar.sent = true - errs = append(errs, s.send(ctx, pipeline, strings.NewReader(body.String()), flds)) + if body.Len() > 0 && body.Len()+linesTotalLength >= s.config.MaxRequestBodySize { + sent = true + err = s.send(ctx, pipeline, body.toCountingReader(), flds) body.Reset() } if body.Len() > 0 { // Do not add newline if the body is empty - if _, err := body.WriteString("\n"); err != nil { - errs = append(errs, err) - ar.appended = false - } + body.addNewLine() } - if ar.appended { - // Do not append new line if separator was not appended - if _, err := body.WriteString(line); err != nil { - errs = append(errs, err) - ar.appended = false - } - } + body.addLines(lines) - return ar, errors.Join(errs...) + return sent, err } // cleanLogsBuffer zeroes logBuffer @@ -405,3 +529,93 @@ func (s *sender) batchMetric(ctx context.Context, metric metricPair, metadata fi func (s *sender) countMetrics() int { return len(s.metricBuffer) } + +func (s *sender) addSourcesHeaders(req *http.Request, flds fields) { + if s.sources.host.isSet() { + req.Header.Add(headerHost, s.sources.host.format(flds)) + } + + if s.sources.name.isSet() { + req.Header.Add(headerName, s.sources.name.format(flds)) + } + + if s.sources.category.isSet() { + req.Header.Add(headerCategory, s.sources.category.format(flds)) + } +} + +func addLogsHeaders(req *http.Request, _ LogFormatType, flds fields) { + req.Header.Add(headerContentType, contentTypeLogs) + + if fieldsStr := flds.string(); fieldsStr != "" { + req.Header.Add(headerFields, fieldsStr) + } +} + +func addMetricsHeaders(req *http.Request, mf MetricFormatType) error { + switch mf { + case PrometheusFormat: + req.Header.Add(headerContentType, contentTypePrometheus) + case Carbon2Format: + req.Header.Add(headerContentType, contentTypeCarbon2) + case GraphiteFormat: + req.Header.Add(headerContentType, contentTypeGraphite) + default: + return fmt.Errorf("unsupported metrics format: %s", mf) + } + return nil +} + +func (s *sender) addRequestHeaders(req *http.Request, pipeline PipelineType, flds fields) error { + req.Header.Add(headerClient, s.config.Client) + s.addSourcesHeaders(req, flds) + + // Add headers + switch s.config.CompressEncoding { + case GZIPCompression: + req.Header.Set(headerContentEncoding, contentEncodingGzip) + case DeflateCompression: + req.Header.Set(headerContentEncoding, contentEncodingDeflate) + case NoCompression: + default: + return fmt.Errorf("invalid content encoding: %s", s.config.CompressEncoding) + } + + switch pipeline { + case LogsPipeline: + addLogsHeaders(req, s.config.LogFormat, flds) + case MetricsPipeline: + if err := addMetricsHeaders(req, s.config.MetricFormat); err != nil { + return err + } + default: + return fmt.Errorf("unexpected pipeline: %v", pipeline) + } + return nil +} + +func (s *sender) recordMetrics(duration time.Duration, count int64, req *http.Request, resp *http.Response, pipeline PipelineType) { + statusCode := 0 + + if resp != nil { + statusCode = resp.StatusCode + } + + id := s.id.String() + + if err := observability.RecordRequestsDuration(duration, statusCode, req.URL.String(), string(pipeline), id); err != nil { + s.logger.Debug("error for recording metric for request duration", zap.Error(err)) + } + + if err := observability.RecordRequestsBytes(req.ContentLength, statusCode, req.URL.String(), string(pipeline), id); err != nil { + s.logger.Debug("error for recording metric for sent bytes", zap.Error(err)) + } + + if err := observability.RecordRequestsRecords(count, statusCode, req.URL.String(), string(pipeline), id); err != nil { + s.logger.Debug("error for recording metric for sent records", zap.Error(err)) + } + + if err := observability.RecordRequestsSent(statusCode, req.URL.String(), string(pipeline), id); err != nil { + s.logger.Debug("error for recording metric for sent request", zap.Error(err)) + } +} diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go index c278659cb5be..e84b42b38db5 100644 --- a/exporter/sumologicexporter/sender_test.go +++ b/exporter/sumologicexporter/sender_test.go @@ -15,10 +15,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" ) type senderTest struct { @@ -67,10 +69,14 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. err = exp.start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) + logger, err := zap.NewDevelopment() + require.NoError(t, err) + return &senderTest{ srv: testServer, exp: exp, s: newSender( + logger, cfg, &http.Client{ Timeout: cfg.ClientConfig.Timeout, @@ -87,6 +93,7 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. testServer.URL, testServer.URL, gf, + component.ID{}, ), } } @@ -244,7 +251,7 @@ func TestSendLogsSplitFailedOne(t *testing.T) { test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") assert.Equal(t, test.s.logBuffer[0:1], dropped) } @@ -272,7 +279,7 @@ func TestSendLogsSplitFailedAll(t *testing.T) { assert.EqualError( t, err, - "error during sending data: 500 Internal Server Error\nerror during sending data: 404 Not Found", + "failed sending data: status: 500 Internal Server Error\nfailed sending data: status: 404 Not Found", ) assert.Equal(t, test.s.logBuffer[0:2], dropped) } @@ -356,7 +363,7 @@ func TestSendLogsJsonSplitFailedOne(t *testing.T) { test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") assert.Equal(t, test.s.logBuffer[0:1], dropped) } @@ -384,7 +391,7 @@ func TestSendLogsJsonSplitFailedAll(t *testing.T) { assert.EqualError( t, err, - "error during sending data: 500 Internal Server Error\nerror during sending data: 404 Not Found", + "failed sending data: status: 500 Internal Server Error\nfailed sending data: status: 404 Not Found", ) assert.Equal(t, test.s.logBuffer[0:2], dropped) } @@ -519,7 +526,7 @@ func TestInvalidMetricFormat(t *testing.T) { test.s.config.MetricFormat = "invalid" - err := test.s.send(context.Background(), MetricsPipeline, strings.NewReader(""), newFields(pcommon.NewMap())) + err := test.s.send(context.Background(), MetricsPipeline, newCountingReader(0).withString(""), newFields(pcommon.NewMap())) assert.EqualError(t, err, `unsupported metrics format: invalid`) } @@ -527,7 +534,7 @@ func TestInvalidPipeline(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() - err := test.s.send(context.Background(), "invalidPipeline", strings.NewReader(""), newFields(pcommon.NewMap())) + err := test.s.send(context.Background(), "invalidPipeline", newCountingReader(0).withString(""), newFields(pcommon.NewMap())) assert.EqualError(t, err, `unknown pipeline type: invalidPipeline`) } @@ -550,7 +557,7 @@ func TestSendCompressGzip(t *testing.T) { require.NoError(t, err) test.s.compressor = c - reader := strings.NewReader("Some example log") + reader := newCountingReader(0).withString("Some example log") err = test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) require.NoError(t, err) @@ -575,7 +582,7 @@ func TestSendCompressDeflate(t *testing.T) { require.NoError(t, err) test.s.compressor = c - reader := strings.NewReader("Some example log") + reader := newCountingReader(0).withString("Some example log") err = test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) require.NoError(t, err) @@ -586,7 +593,7 @@ func TestCompressionError(t *testing.T) { defer func() { test.srv.Close() }() test.s.compressor = getTestCompressor(errors.New("read error"), nil) - reader := strings.NewReader("Some example log") + reader := newCountingReader(0).withString("Some example log") err := test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) assert.EqualError(t, err, "read error") @@ -597,7 +604,7 @@ func TestInvalidContentEncoding(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.CompressEncoding = "test" - reader := strings.NewReader("Some example log") + reader := newCountingReader(0).withString("Some example log") err := test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) assert.EqualError(t, err, "invalid content encoding: test") @@ -681,7 +688,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 } dropped, err := test.s.sendMetrics(context.Background(), newFields(pcommon.NewMap())) - assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") assert.Equal(t, test.s.metricBuffer[0:1], dropped) } @@ -715,7 +722,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 assert.EqualError( t, err, - "error during sending data: 500 Internal Server Error\nerror during sending data: 404 Not Found", + "failed sending data: status: 500 Internal Server Error\nfailed sending data: status: 404 Not Found", ) assert.Equal(t, test.s.metricBuffer[0:2], dropped) } diff --git a/go.mod b/go.mod index 18224334e627..469734eb84bd 100644 --- a/go.mod +++ b/go.mod @@ -672,8 +672,8 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect - golang.org/x/mod v0.16.0 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect golang.org/x/sync v0.7.0 // indirect @@ -681,7 +681,7 @@ require ( golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/api v0.176.1 // indirect diff --git a/go.sum b/go.sum index f031041fcb13..88a647104c34 100644 --- a/go.sum +++ b/go.sum @@ -1770,8 +1770,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 h1:/RIbNt/Zr7rVhIkQhooTxCxFcdWLGIKnZA4IXNFSrvo= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1800,8 +1800,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= -golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2094,8 +2094,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= +golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=