Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sumologicexporter] Enable metrics pipeline #2117

Merged
merged 11 commits into from
Feb 20, 2021
3 changes: 2 additions & 1 deletion exporter/sumologicexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Empty string means no compression
- `max_request_body_size` (optional): Max HTTP request body size in bytes before compression (if applied). By default 1MB is used.
- `metadata_attributes` (optional): List of regexes for attributes which should be send as metadata
- `log_format` (optional) (logs only): Format to use when sending logs to Sumo. (default `json`) (possible values: `json`, `text`)
- `metric_format` (optional) (metrics only): Format of the metrics to be sent, either graphite, carbon2 or prometheus (default is carbon2).
- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`).
`carbon2` and `graphite` are going to be supported soon.
- `source_category` (optional): Desired source category. Useful if you want to override the source category configured for the source.
- `source_name` (optional): Desired source name. Useful if you want to override the source name configured for the source.
- `source_host` (optional): Desired host name. Useful if you want to override the source host configured for the source.
Expand Down
4 changes: 2 additions & 2 deletions exporter/sumologicexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {
LogFormat LogFormatType `mapstructure:"log_format"`

// Metrics related configuration
// The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is carbon2)
// The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus)
MetricFormat MetricFormatType `mapstructure:"metric_format"`

// List of regexes for attributes which should be send as metadata
Expand Down Expand Up @@ -114,7 +114,7 @@ const (
// DefaultLogFormat defines default LogFormat
DefaultLogFormat LogFormatType = JSONFormat
// DefaultMetricFormat defines default MetricFormat
DefaultMetricFormat MetricFormatType = Carbon2Format
DefaultMetricFormat MetricFormatType = PrometheusFormat
// DefaultSourceCategory defines default SourceCategory
DefaultSourceCategory string = ""
// DefaultSourceName defines default SourceName
Expand Down
142 changes: 131 additions & 11 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

type sumologicexporter struct {
sources sourceFormats
config *Config
client *http.Client
filter filter
sources sourceFormats
config *Config
client *http.Client
filter filter
prometheusFormatter prometheusFormatter
}

func initExporter(cfg *Config) (*sumologicexporter, error) {
Expand Down Expand Up @@ -72,16 +73,22 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
return nil, err
}

pf, err := newPrometheusFormatter()
if err != nil {
return nil, err
}

httpClient, err := cfg.HTTPClientSettings.ToClient()
if err != nil {
return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
}

se := &sumologicexporter{
config: cfg,
sources: sfs,
client: httpClient,
filter: f,
config: cfg,
sources: sfs,
client: httpClient,
filter: f,
prometheusFormatter: pf,
}

return se, nil
Expand All @@ -108,6 +115,27 @@ func newLogsExporter(
)
}

func newMetricsExporter(
cfg *Config,
params component.ExporterCreateParams,
) (component.MetricsExporter, error) {
se, err := initExporter(cfg)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(
cfg,
params.Logger,
se.pushMetricsData,
// Disable exporterhelper Timeout, since we are using a custom mechanism
// within exporter itself
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(cfg.RetrySettings),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

// pushLogsData groups data with common metadata and sends them as separate batched requests.
// It returns the number of unsent logs and an error which contains a list of dropped records
// so they can be handled by OTC retry mechanism
Expand All @@ -124,7 +152,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
if err != nil {
return 0, consumererror.PartialLogsError(fmt.Errorf("failed to initialize compressor: %w", err), ld)
}
sdr := newSender(se.config, se.client, se.filter, se.sources, c)
sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)

// Iterate over ResourceLogs
rls := ld.ResourceLogs()
Expand Down Expand Up @@ -159,15 +187,15 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
errs = append(errs, err)
droppedRecords = append(droppedRecords, dropped...)
}
sdr.cleanBuffer()
sdr.cleanLogsBuffer()
}

// assign metadata
previousMetadata = currentMetadata

// add log to the buffer
var dropped []pdata.LogRecord
dropped, err = sdr.batch(ctx, log, previousMetadata)
dropped, err = sdr.batchLog(ctx, log, previousMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
Expand Down Expand Up @@ -202,3 +230,95 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i

return 0, nil
}

// pushMetricsData groups data with common metadata and send them as separate batched requests
// it returns number of unsent metrics and error which contains list of dropped records
// so they can be handle by the OTC retry mechanism
func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) {
var (
currentMetadata fields
previousMetadata fields
errs []error
droppedRecords []metricPair
attributes pdata.AttributeMap
)

c, err := newCompressor(se.config.CompressEncoding)
if err != nil {
return 0, consumererror.PartialMetricsError(fmt.Errorf("failed to initialize compressor: %w", err), md)
}
sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)

// Iterate over ResourceMetrics
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)

attributes = rm.Resource().Attributes()

// iterate over InstrumentationLibraryMetrics
ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)

// iterate over Metrics
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
mp := metricPair{
metric: m,
attributes: attributes,
}

currentMetadata = sdr.filter.filterIn(attributes)

// If metadata differs from currently buffered, flush the buffer
if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" {
var dropped []metricPair
dropped, err = sdr.sendMetrics(ctx, previousMetadata)
if err != nil {
errs = append(errs, err)
droppedRecords = append(droppedRecords, dropped...)
}
sdr.cleanMetricBuffer()
}

// assign metadata
previousMetadata = currentMetadata
var dropped []metricPair
// add metric to the buffer
dropped, err = sdr.batchMetric(ctx, mp, currentMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
}
}
}
}

// Flush pending metrics
dropped, err := sdr.sendMetrics(ctx, previousMetadata)
if err != nil {
droppedRecords = append(droppedRecords, dropped...)
errs = append(errs, err)
}

if len(droppedRecords) > 0 {
// Move all dropped records to Metrics
droppedMetrics := pdata.NewMetrics()
rms := droppedMetrics.ResourceMetrics()
rms.Resize(len(droppedRecords))
for num, record := range droppedRecords {
rm := droppedMetrics.ResourceMetrics().At(num)
record.attributes.CopyTo(rm.Resource().Attributes())

ilms := rm.InstrumentationLibraryMetrics()
ilms.Resize(1)
ilms.At(0).Metrics().Append(record.metric)
}

return len(droppedRecords), consumererror.PartialMetricsError(componenterror.CombineErrors(errs), droppedMetrics)
}

return 0, nil
}
Loading