Skip to content

Commit

Permalink
Added timeout to the exporter path
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed Jun 26, 2020
1 parent 4eca960 commit 5c5a2f5
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 26 deletions.
5 changes: 3 additions & 2 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -71,7 +72,7 @@ func (s *protoGRPCSender) pushTraceData(

batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return td.SpanCount(), consumererror.Permanent(errors.Wrap(err, "failed to push trace data via Jaeger exporter"))
}

if s.metadata.Len() > 0 {
Expand All @@ -84,7 +85,7 @@ func (s *protoGRPCSender) pushTraceData(
ctx,
&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))
if err != nil {
return td.SpanCount() - sentSpans, err
return td.SpanCount() - sentSpans, errors.Wrap(err, "failed to push trace data via Jaeger exporter")
}
sentSpans += len(batch.Spans)
}
Expand Down
9 changes: 5 additions & 4 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"contrib.go.opencensus.io/exporter/ocagent"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/pkg/errors"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return len(td.Spans), err
return len(td.Spans), errors.Wrap(err, "failed to push trace data via OpenCensus exporter")
}

err := exporter.ExportTraceServiceRequest(
Expand All @@ -168,7 +169,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
)
oce.exporters <- exporter
if err != nil {
return len(td.Spans), err
return len(td.Spans), errors.Wrap(err, "failed to push trace data via OpenCensus exporter")
}
return 0, nil
}
Expand All @@ -181,7 +182,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), errors.Wrap(err, "failed to push metrics data via OpenCensus exporter")
}

req := &agentmetricspb.ExportMetricsServiceRequest{
Expand All @@ -192,7 +193,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
err := exporter.ExportMetricsServiceRequest(req)
oce.exporters <- exporter
if err != nil {
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), errors.Wrap(err, "failed to push metrics data via OpenCensus exporter")
}
return 0, nil
}
13 changes: 7 additions & 6 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"sync"

"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -193,7 +194,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
code: errAlreadyStopped,
msg: "OpenTelemetry exporter was already stopped.",
}
return td.SpanCount(), err
return td.SpanCount(), errors.Wrap(err, "failed to push trace data via OTLP exporter")
}

// Perform the request.
Expand All @@ -205,7 +206,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
// Return the exporter to the pool.
oce.exporters <- exporter
if err != nil {
return td.SpanCount(), err
return td.SpanCount(), errors.Wrap(err, "failed to push trace data via OTLP exporter")
}
return 0, nil
}
Expand All @@ -219,7 +220,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
code: errAlreadyStopped,
msg: "OpenTelemetry exporter was already stopped.",
}
return imd.MetricCount(), err
return imd.MetricCount(), errors.Wrap(err, "failed to push metrics data via OTLP exporter")
}

// Perform the request.
Expand All @@ -231,7 +232,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
// Return the exporter to the pool.
oce.exporters <- exporter
if err != nil {
return imd.MetricCount(), err
return imd.MetricCount(), errors.Wrap(err, "failed to push metrics data via OTLP exporter")
}
return 0, nil
}
Expand All @@ -244,7 +245,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
code: errAlreadyStopped,
msg: "OpenTelemetry exporter was already stopped.",
}
return logs.LogRecordCount(), err
return logs.LogRecordCount(), errors.Wrap(err, "failed to push log data via OTLP exporter")
}

request := &otlplogs.ExportLogServiceRequest{
Expand All @@ -255,7 +256,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
// Return the exporter to the pool.
oce.exporters <- exporter
if err != nil {
return logs.LogRecordCount(), err
return logs.LogRecordCount(), errors.Wrap(err, "failed to push log data via OTLP exporter")
}
return 0, nil
}
14 changes: 7 additions & 7 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"
"github.com/pkg/errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -96,33 +97,32 @@ func createZipkinExporter(config configmodels.Exporter) (*zipkinExporter, error)
return ze, nil
}

func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))

var resource *resourcepb.Resource = td.Resource

for _, span := range td.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(errors.Wrap(err, "failed to push trace data via Zipkin exporter"))
}
tbatch = append(tbatch, zs)
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(errors.Wrap(err, "failed to push trace data via Zipkin exporter"))
}

req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), err
return len(td.Spans), errors.Wrap(err, "failed to push trace data via Zipkin exporter")
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), err
return len(td.Spans), errors.Wrap(err, "failed to push trace data via Zipkin exporter")
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
Expand Down
6 changes: 4 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() {
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batchTraces.reset()
}

Expand Down
28 changes: 23 additions & 5 deletions processor/fanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package processor

import (
"context"
"time"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -25,6 +26,8 @@ import (
"go.opentelemetry.io/collector/internal/data"
)

const timeout = 5 * time.Second

// This file contains implementations of Trace/Metrics connectors
// that fan out the data to multiple other consumers.

Expand Down Expand Up @@ -63,9 +66,12 @@ var _ consumer.MetricsConsumerOld = (*metricsFanOutConnectorOld)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mfc metricsFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var errs []error
for _, mc := range mfc {
if err := mc.ConsumeMetricsData(ctx, md); err != nil {
if err := mc.ConsumeMetricsData(ctxWithTimeout, md); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -83,9 +89,12 @@ var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var errs []error
for _, mc := range mfc {
if err := mc.ConsumeMetrics(ctx, md); err != nil {
if err := mc.ConsumeMetrics(ctxWithTimeout, md); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -127,9 +136,12 @@ var _ consumer.TraceConsumerOld = (*traceFanOutConnectorOld)(nil)

// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
func (tfc traceFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var errs []error
for _, tc := range tfc {
if err := tc.ConsumeTraceData(ctx, td); err != nil {
if err := tc.ConsumeTraceData(ctxWithTimeout, td); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -147,9 +159,12 @@ var _ consumer.TraceConsumer = (*traceFanOutConnector)(nil)

// ConsumeTraces exports the span data to all trace consumers wrapped by the current one.
func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var errs []error
for _, tc := range tfc {
if err := tc.ConsumeTraces(ctx, td); err != nil {
if err := tc.ConsumeTraces(ctxWithTimeout, td); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -167,9 +182,12 @@ var _ consumer.LogConsumer = (*LogFanOutConnector)(nil)

// Consume exports the span data to all consumers wrapped by the current one.
func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var errs []error
for _, tc := range fc {
if err := tc.ConsumeLogs(ctx, ld); err != nil {
if err := tc.ConsumeLogs(ctxWithTimeout, ld); err != nil {
errs = append(errs, err)
}
}
Expand Down

0 comments on commit 5c5a2f5

Please sign in to comment.