Skip to content

Commit

Permalink
datadogexporter: Retry per network call (#6412)
Browse files Browse the repository at this point in the history
* [exporter/datadogexporter] Refactor metadata retrier to take RetrySettings into account

* [exporter/datadogexporter] Do retries per network call on the metrics side

* Address review comment

* Fix lint
  • Loading branch information
mx-psi authored Dec 2, 2021
1 parent bb386a3 commit 13e7a7a
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 44 deletions.
1 change: 1 addition & 0 deletions exporter/datadogexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The exporter will try to retrieve a hostname following the OpenTelemetry semanti

See the sample configuration files under the `example` folder for other available options, as well as an example K8s Manifest.
This exporter also supports the `exporterhelper` queuing, retry and timeout settings documented [here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration).
Retry settings only apply to metrics and host metadata payloads; traces are never retried, to avoid duplicated APM events.

## Trace exporter
### **Important Pipeline Setup Details**
Expand Down
6 changes: 4 additions & 2 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func createMetricsExporter(
pushMetricsFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
exporterhelper.WithRetry(cfg.RetrySettings),
// We use our own custom mechanism for retries, since we hit several endpoints.
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
Expand Down Expand Up @@ -210,7 +211,8 @@ func createTracesExporter(
pushTracesFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
exporterhelper.WithRetry(cfg.RetrySettings),
// We don't do retries on traces because of deduping concerns on APM Events.
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
Expand Down
14 changes: 7 additions & 7 deletions exporter/datadogexporter/internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes/gcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/system"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils"
)

Expand Down Expand Up @@ -174,19 +175,17 @@ func pushMetadata(cfg *config.Config, buildInfo component.BuildInfo, metadata *H
return nil
}

func pushMetadataWithRetry(params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) {
const maxRetries = 5

func pushMetadataWithRetry(retrier *utils.Retrier, params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) {
params.Logger.Debug("Sending host metadata payload", zap.Any("payload", hostMetadata))

numRetries, err := utils.DoWithRetries(maxRetries, func() error {
err := retrier.DoWithRetries(context.Background(), func(context.Context) error {
return pushMetadata(cfg, params.BuildInfo, hostMetadata)
})

if err != nil {
params.Logger.Warn("Sending host metadata failed", zap.Error(err))
} else {
params.Logger.Info("Sent host metadata", zap.Int("retries", numRetries))
params.Logger.Info("Sent host metadata")
}

}
Expand All @@ -197,6 +196,7 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
defer params.Logger.Debug("Shut down host metadata routine")
retrier := utils.NewRetrier(params.Logger, cfg.RetrySettings, scrub.NewScrubber())

// Get host metadata from resources and fill missing info using our exporter.
// Currently we only retrieve it once but still send the same payload
Expand All @@ -212,14 +212,14 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c
fillHostMetadata(params, cfg, hostMetadata)

// Run one first time at startup
pushMetadataWithRetry(params, cfg, hostMetadata)
pushMetadataWithRetry(retrier, params, cfg, hostMetadata)

for {
select {
case <-ctx.Done():
return
case <-ticker.C: // Send host metadata
pushMetadataWithRetry(params, cfg, hostMetadata)
pushMetadataWithRetry(retrier, params, cfg, hostMetadata)
}
}
}
16 changes: 0 additions & 16 deletions exporter/datadogexporter/internal/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,3 @@ func SetDDHeaders(reqHeader http.Header, buildInfo component.BuildInfo, apiKey s
reqHeader.Set("DD-Api-Key", apiKey)
reqHeader.Set("User-Agent", UserAgent(buildInfo))
}

// DoWithRetries repeats a fallible action up to `maxRetries` times
// with exponential backoff (1s, 2s, 4s, ...) between attempts.
//
// It returns the number of failed attempts that preceded the result together
// with the last error: (k, nil) when the action succeeded after k failures,
// or (maxRetries, err) when every attempt failed. When maxRetries is not
// positive, fn is never called and (0, nil) is returned.
func DoWithRetries(maxRetries int, fn func() error) (i int, err error) {
	wait := 1 * time.Second
	for i = 0; i < maxRetries; i++ {
		if err = fn(); err == nil {
			return i, nil
		}

		// Only back off when another attempt will follow; sleeping after the
		// final failure would just delay reporting the error to the caller.
		if i < maxRetries-1 {
			time.Sleep(wait)
			wait *= 2
		}
	}

	return i, err
}
11 changes: 0 additions & 11 deletions exporter/datadogexporter/internal/utils/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package utils

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
)

Expand All @@ -44,12 +42,3 @@ func TestDDHeaders(t *testing.T) {
assert.Equal(t, header.Get("USer-Agent"), "otelcontribcol/1.0")

}

// TestDoWithRetries checks that DoWithRetries reports zero retries for an
// action that succeeds immediately, and surfaces the error of an action that
// fails on every attempt.
func TestDoWithRetries(t *testing.T) {
	i, err := DoWithRetries(3, func() error { return nil })
	require.NoError(t, err)
	// testify expects (expected, actual); keeping that order makes failure
	// messages report the right values.
	assert.Equal(t, 0, i)

	_, err = DoWithRetries(1, func() error { return errors.New("action failed") })
	require.Error(t, err)
}
97 changes: 97 additions & 0 deletions exporter/datadogexporter/internal/utils/retrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils"

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
)

// Retrier runs fallible operations, retrying them according to the
// exporterhelper.RetrySettings it was built with (see DoWithRetries).
type Retrier struct {
// cfg holds the retry settings: whether retries are enabled and the
// backoff intervals used between attempts.
cfg exporterhelper.RetrySettings
// logger receives an Info message before each retry wait.
logger *zap.Logger
// scrubber is applied to the error of a failed attempt before that error
// is logged or returned (only on the retries-enabled path).
scrubber scrub.Scrubber
}

// NewRetrier builds a Retrier that retries according to settings, reports
// retry attempts on logger and passes attempt errors through scrubber.
func NewRetrier(logger *zap.Logger, settings exporterhelper.RetrySettings, scrubber scrub.Scrubber) *Retrier {
	retrier := new(Retrier)
	retrier.cfg = settings
	retrier.logger = logger
	retrier.scrubber = scrubber
	return retrier
}

// DoWithRetries calls fn, retrying it with exponential backoff until it
// succeeds. This is a condensed version of the code on the exporterhelper,
// which we reuse here since we want custom retry logic.
//
// If retries are disabled in the settings, fn is called exactly once and its
// result is returned unchanged. Otherwise every failed attempt's error is
// passed through the scrubber, and retrying stops when:
//   - fn returns nil (nil is returned),
//   - the error is permanent according to consumererror.IsPermanent,
//   - the backoff policy reports backoff.Stop (max elapsed time exceeded), or
//   - ctx is done while waiting for the next attempt.
//
// In the last two cases the returned error wraps the last attempt's error.
func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) error) error {
	if !r.cfg.Enabled {
		return fn(ctx)
	}

	// Do not use NewExponentialBackOff since it calls Reset and the code here must
	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
	expBackoff := backoff.ExponentialBackOff{
		InitialInterval:     r.cfg.InitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         r.cfg.MaxInterval,
		MaxElapsedTime:      r.cfg.MaxElapsedTime,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	expBackoff.Reset()

	for {
		err := fn(ctx)
		if err == nil {
			return nil
		}

		// Scrub before the error can reach logs or the caller.
		err = r.scrubber.Scrub(err)

		if consumererror.IsPermanent(err) {
			return err
		}

		backoffDelay := expBackoff.NextBackOff()
		if backoffDelay == backoff.Stop {
			return fmt.Errorf("max elapsed time expired %w", err)
		}

		r.logger.Info(
			"Request failed. Will retry the request after interval.",
			zap.Error(err),
			zap.String("interval", backoffDelay.String()),
		)

		// back-off, but get interrupted when shutting down or request is cancelled or timed out.
		// An explicit timer (rather than time.After) lets us release it right
		// away when the context ends first.
		timer := time.NewTimer(backoffDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("request is cancelled or timed out %w", err)
		case <-timer.C:
		}
	}
}
49 changes: 49 additions & 0 deletions exporter/datadogexporter/internal/utils/retrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
)

// TestDoWithRetries exercises Retrier.DoWithRetries with an action that
// succeeds on the first attempt and with one that always fails; the latter
// uses short intervals so the backoff gives up quickly.
func TestDoWithRetries(t *testing.T) {
	var (
		scrubber = scrub.NewScrubber()
		ctx      = context.Background()
	)
	alwaysOK := func(context.Context) error { return nil }
	alwaysFail := func(context.Context) error { return errors.New("action failed") }

	defaultRetrier := NewRetrier(zap.NewNop(), exporterhelper.DefaultRetrySettings(), scrubber)
	require.NoError(t, defaultRetrier.DoWithRetries(ctx, alwaysOK))

	shortSettings := exporterhelper.RetrySettings{
		Enabled:         true,
		InitialInterval: 5 * time.Millisecond,
		MaxInterval:     30 * time.Millisecond,
		MaxElapsedTime:  100 * time.Millisecond,
	}
	shortRetrier := NewRetrier(zap.NewNop(), shortSettings, scrubber)
	require.Error(t, shortRetrier.DoWithRetries(ctx, alwaysFail))
}
27 changes: 19 additions & 8 deletions exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
"gopkg.in/zorkian/go-datadog-api.v2"

Expand All @@ -42,6 +43,7 @@ type metricsExporter struct {
client *datadog.Client
tr *translator.Translator
scrubber scrub.Scrubber
retrier *utils.Retrier
}

// assert `hostProvider` implements HostnameProvider interface
Expand Down Expand Up @@ -104,13 +106,15 @@ func newMetricsExporter(ctx context.Context, params component.ExporterCreateSett
return nil, err
}

scrubber := scrub.NewScrubber()
return &metricsExporter{
params: params,
cfg: cfg,
ctx: ctx,
client: client,
tr: tr,
scrubber: scrub.NewScrubber(),
scrubber: scrubber,
retrier: utils.NewRetrier(params.Logger, cfg.RetrySettings, scrubber),
}, nil
}

Expand Down Expand Up @@ -173,17 +177,24 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pdata.Metric
ms, sl := consumer.All(pushTime, exp.params.BuildInfo)
metrics.ProcessMetrics(ms, exp.cfg)

err = nil
if len(ms) > 0 {
if err := exp.client.PostMetrics(ms); err != nil {
return err
}
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(context.Context) error {
return exp.client.PostMetrics(ms)
}),
)
}

if len(sl) > 0 {
if err := exp.pushSketches(ctx, sl); err != nil {
return err
}
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error {
return exp.pushSketches(ctx, sl)
}),
)
}

return nil
return err
}

0 comments on commit 13e7a7a

Please sign in to comment.