Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sumologicexporter] Enable exporter #1859

2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
Expand Down Expand Up @@ -140,6 +141,7 @@ func components() (component.Factories, error) {
signalfxexporter.NewFactory(),
splunkhecexporter.NewFactory(),
stackdriverexporter.NewFactory(),
sumologicexporter.NewFactory(),
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
Expand Down
5 changes: 4 additions & 1 deletion exporter/sumologicexporter/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"compress/flate"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
)
Expand Down Expand Up @@ -48,8 +49,10 @@ func newCompressor(format CompressEncodingType) (compressor, error) {
if err != nil {
return compressor{}, err
}
default:
case NoCompression:
writer = nil
default:
return compressor{}, fmt.Errorf("invalid format: %s", format)
}

return compressor{
Expand Down
114 changes: 111 additions & 3 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ import (
"context"
"errors"
"fmt"
"net/http"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type sumologicexporter struct {
config *Config
sources sourceFormats
config *Config
client *http.Client
filter filter
}

func initExporter(cfg *Config) (*sumologicexporter, error) {
Expand Down Expand Up @@ -62,9 +67,21 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
return nil, err
}

f, err := newFilter(cfg.MetadataAttributes)
if err != nil {
return nil, err
}

httpClient, err := cfg.HTTPClientSettings.ToClient()
if err != nil {
return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
}

se := &sumologicexporter{
config: cfg,
sources: sfs,
client: httpClient,
filter: f,
}

return se, nil
Expand All @@ -85,12 +102,103 @@ func newLogsExporter(
se.pushLogsData,
// Disable exporterhelper Timeout, since we are using a custom mechanism
// within exporter itself
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(cfg.RetrySettings),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

// pushLogsData groups data with common metadata and send them together to Sumo Logic
func (se *sumologicexporter) pushLogsData(context.Context, pdata.Logs) (droppedTimeSeries int, err error) {
// pushLogsData groups data with common metadata and sends them as separate batched requests.
// It returns the number of unsent logs and an error which contains a list of dropped records
// so they can be handled by OTC retry mechanism
func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (int, error) {
var (
currentMetadata fields
previousMetadata fields
errs []error
droppedRecords []pdata.LogRecord
err error
)

c, err := newCompressor(se.config.CompressEncoding)
if err != nil {
return 0, consumererror.PartialLogsError(fmt.Errorf("failed to initialize compressor: %w", err), ld)
}
sdr := newSender(se.config, se.client, se.filter, se.sources, c)

// Iterate over ResourceLogs
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)

ills := rl.InstrumentationLibraryLogs()
// iterate over InstrumentationLibraryLogs
for j := 0; j < ills.Len(); j++ {
ill := ills.At(j)

// iterate over Logs
logs := ill.Logs()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)

// copy resource attributes into logs attributes
// log attributes have precedence over resource attributes
rl.Resource().Attributes().ForEach(
func(k string, v pdata.AttributeValue) {
log.Attributes().Insert(k, v)
},
)

currentMetadata = sdr.filter.filterIn(log.Attributes())

// If metadata differs from currently buffered, flush the buffer
if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" {
var dropped []pdata.LogRecord
dropped, err = sdr.sendLogs(ctx, previousMetadata)
if err != nil {
errs = append(errs, err)
droppedRecords = append(droppedRecords, dropped...)
}
sdr.cleanBuffer()
}

// assign metadata
previousMetadata = currentMetadata

// add log to the buffer
var dropped []pdata.LogRecord
dropped, err = sdr.batch(ctx, log, previousMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
}
}
}
}

// Flush pending logs
dropped, err := sdr.sendLogs(ctx, previousMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
}

if len(droppedRecords) > 0 {
// Move all dropped records to Logs
droppedLogs := pdata.NewLogs()
rls = droppedLogs.ResourceLogs()
rls.Resize(1)

ills := rls.At(0).InstrumentationLibraryLogs()
ills.Resize(1)
logs := ills.At(0).Logs()

for _, log := range droppedRecords {
logs.Append(log)
}

return len(droppedRecords), consumererror.PartialLogsError(componenterror.CombineErrors(errs), droppedLogs)
}

return 0, nil
}
Loading