Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up consumer/consumererror package. #2768

Merged
merged 6 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Remove ValidateConfig and add Validate on the Config struct (#2665)
- Rename pdata Size to OtlpProtoSize (#2726)
- Rename [Traces|Metrics|Logs]Consumer to [Traces|Metrics|Logs] (#2761)
- Refactored `consumererror` package, eliminating `PartialError` type (#TBD)
Aneurysm9 marked this conversation as resolved.
Show resolved Hide resolved
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

## 💡 Enhancements 💡

Expand Down
4 changes: 2 additions & 2 deletions config/configcheck/configcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func ValidateConfigFromFactories(factories component.Factories) error {
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// ValidateConfig enforces that given configuration object is following the patterns
Expand Down Expand Up @@ -109,7 +109,7 @@ func validateConfigDataType(t reflect.Type) error {
// reflect.UnsafePointer.
}

if err := consumererror.CombineErrors(errs); err != nil {
if err := consumererror.Combine(errs); err != nil {
return fmt.Errorf(
"type %q from package %q has invalid config settings: %v",
t.Name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ import (
"strings"
)

// CombineErrors converts a list of errors into one error.
func CombineErrors(errs []error) error {
// Combine converts a list of errors into one error.
//
// If any of the errors in errs are Permanent then the returned
// error will also be Permanent.
//
// Any signal data associated with a PartialError will be discarded
// and the resulting error will not be a PartialError.
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
func Combine(errs []error) error {
numErrors := len(errs)
if numErrors == 0 {
// No errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
)

func TestCombineErrors(t *testing.T) {
func TestCombine(t *testing.T) {
testCases := []struct {
errors []error
expected string
Expand Down Expand Up @@ -53,15 +53,15 @@ func TestCombineErrors(t *testing.T) {
}

for _, tc := range testCases {
got := CombineErrors(tc.errors)
got := Combine(tc.errors)
if (got == nil) != tc.expectNil {
t.Errorf("CombineErrors(%v) == nil? Got: %t. Want: %t", tc.errors, got == nil, tc.expectNil)
t.Errorf("Combine(%v) == nil? Got: %t. Want: %t", tc.errors, got == nil, tc.expectNil)
}
if got != nil && tc.expected != got.Error() {
t.Errorf("CombineErrors(%v) = %q. Want: %q", tc.errors, got, tc.expected)
t.Errorf("Combine(%v) = %q. Want: %q", tc.errors, got, tc.expected)
}
if tc.expectedPermanent && !IsPermanent(got) {
t.Errorf("CombineErrors(%v) = %q. Want: consumererror.permanent", tc.errors, got)
t.Errorf("Combine(%v) = %q. Want: consumererror.permanent", tc.errors, got)
}
}
}
68 changes: 0 additions & 68 deletions consumer/consumererror/partialerror.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ type permanent struct {
err error
}

// permanentError exists to test errors for "IsPermanent"
var permanentError = &permanent{}

// Permanent wraps an error to indicate that it is a permanent error, i.e.: an
// error that will be always returned if its source receives the same inputs.
func Permanent(err error) error {
Expand All @@ -42,8 +39,8 @@ func (p permanent) Error() string {
// is used to indicate that a given error will always be returned in the case
// that its sources receives the same input.
func IsPermanent(err error) bool {
if err != nil {
return errors.As(err, permanentError)
if err == nil {
return false
}
return false
return errors.As(err, &permanent{})
}
108 changes: 108 additions & 0 deletions consumer/consumererror/signalerrors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumererror

import (
"errors"

"go.opentelemetry.io/collector/consumer/pdata"
)

// Traces is an error that may carry associated Trace data for a subset of received data
// that faiiled to be processed or sent.
type Traces struct {
error
failed pdata.Traces
}

// NewTraces creates a Traces that can encapsulate received data that failed to be processed or sent.
func NewTraces(err error, failed pdata.Traces) error {
return Traces{
error: err,
failed: failed,
}
}

// AsTraces finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsTraces(err error, target *Traces) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetTraces returns failed traces from the associated error.
func (err Traces) GetTraces() pdata.Traces {
return err.failed
}

// Logs is an error that may carry associated Log data for a subset of received data
// that faiiled to be processed or sent.
type Logs struct {
error
failed pdata.Logs
}

// NewLogs creates a Logs that can encapsulate received data that failed to be processed or sent.
func NewLogs(err error, failed pdata.Logs) error {
return Logs{
error: err,
failed: failed,
}
}

// AsLogs finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsLogs(err error, target *Logs) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetLogs returns failed logs from the associated error.
func (err Logs) GetLogs() pdata.Logs {
return err.failed
}

// Metrics is an error that may carry associated Metrics data for a subset of received data
// that faiiled to be processed or sent.
type Metrics struct {
error
failed pdata.Metrics
}

// NewMetrics creates a Metrics that can encapsulate received data that failed to be processed or sent.
func NewMetrics(err error, failed pdata.Metrics) error {
return Metrics{
error: err,
failed: failed,
}
}

// AsMetrics finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsMetrics(err error, target *Metrics) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetMetrics returns failed metrics from the associated error.
func (err Metrics) GetMetrics() pdata.Metrics {
return err.failed
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,38 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestPartialError(t *testing.T) {
func TestTraces(t *testing.T) {
td := testdata.GenerateTraceDataOneSpan()
err := fmt.Errorf("some error")
partialErr := PartialTracesError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failed)
traceErr := NewTraces(err, td)
assert.Equal(t, err.Error(), traceErr.Error())
target := &Traces{}
assert.False(t, AsTraces(nil, target))
assert.False(t, AsTraces(err, target))
assert.True(t, AsTraces(traceErr, target))
assert.Equal(t, td, target.GetTraces())
}

func TestPartialErrorLogs(t *testing.T) {
func TestLogs(t *testing.T) {
td := testdata.GenerateLogDataOneLog()
err := fmt.Errorf("some error")
partialErr := PartialLogsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedLogs)
logsErr := NewLogs(err, td)
assert.Equal(t, err.Error(), logsErr.Error())
target := &Logs{}
assert.False(t, AsLogs(nil, target))
assert.False(t, AsLogs(err, target))
assert.True(t, AsLogs(logsErr, target))
assert.Equal(t, td, target.GetLogs())
}

func TestPartialErrorMetrics(t *testing.T) {
func TestMetrics(t *testing.T) {
td := testdata.GenerateMetricsOneMetric()
err := fmt.Errorf("some error")
partialErr := PartialMetricsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedMetrics)
metricErr := NewMetrics(err, td)
assert.Equal(t, err.Error(), metricErr.Error())
target := &Metrics{}
assert.False(t, AsMetrics(nil, target))
assert.False(t, AsMetrics(err, target))
assert.True(t, AsMetrics(metricErr, target))
assert.Equal(t, td, target.GetMetrics())
}
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/cloningconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.M
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewTracesCloning wraps multiple traces consumers in a single one and clones the data
Expand Down Expand Up @@ -93,7 +93,7 @@ func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Tra
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewLogsCloning wraps multiple trace consumers in a single one and clones the data
Expand Down Expand Up @@ -130,5 +130,5 @@ func (lfc logsCloningConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) e
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics)
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewTraces wraps multiple trace consumers in a single one.
Expand All @@ -72,7 +72,7 @@ func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) err
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewLogs wraps multiple log consumers in a single one.
Expand All @@ -96,5 +96,5 @@ func (lfc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)

// ComponentSettings for timeout. The timeout applies to individual attempts to send data to the backend.
Expand All @@ -46,8 +45,9 @@ type request interface {
// setContext updates the Context of the requests.
setContext(context.Context)
export(ctx context.Context) error
// Returns a new request that contains the items left to be sent.
onPartialError(consumererror.PartialError) request
// Returns a new request may contain the items left to be sent if some items failed to process and can be retried.
// Otherwise, it should return the original request.
onError(error) request
// Returns the count of spans/metric points or log records.
count() int
}
Expand Down
Loading