Skip to content

Commit

Permalink
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Browse files Browse the repository at this point in the history
…tor into consumererror_tidy
  • Loading branch information
Aneurysm9 committed Mar 23, 2021
2 parents 3f37d32 + c81a01b commit ebd3673
Show file tree
Hide file tree
Showing 16 changed files with 143 additions and 21 deletions.
4 changes: 2 additions & 2 deletions config/configauth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewAuthenticator(cfg Authentication) (Authenticator, error) {
return newOIDCAuthenticator(cfg)
}

func defaultUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate authenticateFunc) (interface{}, error) {
func defaultUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate authenticateFunc) (interface{}, error) {
headers, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMetadataNotFound
Expand All @@ -77,7 +77,7 @@ func defaultUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Un
return handler(ctx, req)
}

func defaultStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate authenticateFunc) error {
func defaultStreamInterceptor(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate authenticateFunc) error {
ctx := stream.Context()
headers, ok := metadata.FromIncomingContext(ctx)
if !ok {
Expand Down
52 changes: 52 additions & 0 deletions config/internal/configsource/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configsource

import (
"context"
)

// ConfigSource is the interface to be implemented by objects used by the collector
// to retrieve external configuration information.
type ConfigSource interface {
// NewSession must create a Session object that will be used to inject data into
// a configuration.
//
// The Session object should use its creation according to their ConfigSource needs:
// lock resources, suspend background tasks, etc. An implementation, for instance,
// can use the creation of the Session object to prevent torn configurations,
// by acquiring a lock (or some other mechanism) that prevents concurrent changes to the
// configuration during the middle of a session.
//
// The code managing the returned Session object must guarantee that the object is not used
// concurrently.
NewSession(ctx context.Context) (Session, error)
}

// Session is the interface used to inject configuration data from a ConfigSource. A Session
// object is created from a ConfigSource. The code using Session objects must guarantee that
// methods of a single instance are not called concurrently.
type Session interface {
// Apply goes to the configuration source, and according to the specified selector and
// parameters retrieves a configuration value that is injected into the configuration.
//
// The selector is a string that is required on all invocations, the params are optional.
Apply(ctx context.Context, selector string, params interface{}) (interface{}, error)

// Close signals that the object won't be used anymore to inject data into a configuration.
// Each Session object should use this call according to their needs: release resources,
// close communication channels, etc.
Close(ctx context.Context) error
}
2 changes: 1 addition & 1 deletion exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func exportMessageAsLine(e *fileExporter, message proto.Message) error {
return nil
}

func (e *fileExporter) Start(ctx context.Context, host component.Host) error {
func (e *fileExporter) Start(context.Context, component.Host) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/prometheusexporter/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type mockAccumulator struct {
metrics []pdata.Metric
}

func (a *mockAccumulator) Accumulate(rm pdata.ResourceMetrics) (n int) {
func (a *mockAccumulator) Accumulate(pdata.ResourceMetrics) (n int) {
return 0
}
func (a *mockAccumulator) Collect() []pdata.Metric {

func (a *mockAccumulator) Collect() []pdata.Metric {
return a.metrics
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func (c *errorCheckCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zap
}
return ce
}
func (c *errorCheckCore) Write(ent zapcore.Entry, field []zapcore.Field) error {
func (c *errorCheckCore) Write(ent zapcore.Entry, _ []zapcore.Field) error {
if ent.Level == zapcore.ErrorLevel {
c.errorMessages = append(c.errorMessages, ent.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/prometheusexporter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ func (pe *prometheusExporter) ConsumeMetrics(ctx context.Context, md pdata.Metri
return nil
}

func (pe *prometheusExporter) Shutdown(ctx context.Context) error {
func (pe *prometheusExporter) Shutdown(_ context.Context) error {
return pe.shutdownFunc()
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ type processHandlesMock struct {
handles []*processHandleMock
}

func (p *processHandlesMock) Pid(index int) int32 {
func (p *processHandlesMock) Pid(int) int32 {
return 1
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type customUnamarshaller struct {

var _ Unmarshaller = (*customUnamarshaller)(nil)

func (c customUnamarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
func (c customUnamarshaller) Unmarshal([]byte) (pdata.Traces, error) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ func (t testConsumerGroupSession) GenerationID() int32 {
panic("implement me")
}

func (t testConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) {
panic("implement me")
}

func (t testConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) {
panic("implement me")
}

func (t testConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {
func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {
}

func (t testConsumerGroupSession) Context() context.Context {
Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric

// Start is the method that starts Prometheus scraping and it
// is controlled by having previously defined a Configuration using perhaps New.
func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
r.cancelFunc = cancel

Expand Down
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func New(config *Config, nextConsumer consumer.Traces) (*ZipkinReceiver, error)
}

// Start spins up the receiver's HTTP server and makes the receiver start its processing.
func (zr *ZipkinReceiver) Start(ctx context.Context, host component.Host) error {
func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error {
if host == nil {
return errors.New("nil host")
}
Expand Down
4 changes: 2 additions & 2 deletions service/internal/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (exps Exporters) StartAll(ctx context.Context, host component.Host) error {
for _, exp := range exps {
exp.logger.Info("Exporter is starting...")

if err := exp.Start(ctx, host); err != nil {
if err := exp.Start(ctx, newHostWrapper(host, exp.logger)); err != nil {
return err
}
exp.logger.Info("Exporter started.")
Expand Down Expand Up @@ -299,7 +299,7 @@ func (eb *exportersBuilder) buildExporter(
}
}

eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name()))
eb.logger.Info("Exporter was built.", zap.String("exporter", config.Name()))

return exporter, nil
}
Expand Down
2 changes: 1 addition & 1 deletion service/internal/builder/extensions_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (exts Extensions) StartAll(ctx context.Context, host component.Host) error
for _, ext := range exts {
ext.logger.Info("Extension is starting...")

if err := ext.Start(ctx, host); err != nil {
if err := ext.Start(ctx, newHostWrapper(host, ext.logger)); err != nil {
return err
}

Expand Down
40 changes: 40 additions & 0 deletions service/internal/builder/host_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
)

// hostWrapper adds behavior on top of the component.Host being passed when starting the built components.
type hostWrapper struct {
component.Host
*zap.Logger
}

func newHostWrapper(host component.Host, logger *zap.Logger) component.Host {
return &hostWrapper{
host,
logger,
}
}

func (hw *hostWrapper) ReportFatalError(err error) {
// The logger from the built component already identifies the component.
hw.Logger.Error("Component fatal error", zap.Error(err))
hw.Host.ReportFatalError(err)
}
29 changes: 29 additions & 0 deletions service/internal/builder/host_wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"errors"
"testing"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
)

func Test_newHostWrapper(t *testing.T) {
hw := newHostWrapper(componenttest.NewNopHost(), zap.NewNop())
hw.ReportFatalError(errors.New("test error"))
}
5 changes: 3 additions & 2 deletions service/internal/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ type BuiltPipelines map[*configmodels.Pipeline]*builtPipeline
func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error {
for _, bp := range bps {
bp.logger.Info("Pipeline is starting...")
hostWrapper := newHostWrapper(host, bp.logger)
// Start in reverse order, starting from the back of processors pipeline.
// This is important so that processors that are earlier in the pipeline and
// reference processors that are later in the pipeline do not start sending
// data to later pipelines which are not yet started.
for i := len(bp.processors) - 1; i >= 0; i-- {
if err := bp.processors[i].Start(ctx, host); err != nil {
if err := bp.processors[i].Start(ctx, hostWrapper); err != nil {
return err
}
}
Expand Down Expand Up @@ -200,7 +201,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf

pipelineLogger := pb.logger.With(zap.String("pipeline_name", pipelineCfg.Name),
zap.String("pipeline_datatype", string(pipelineCfg.InputType)))
pipelineLogger.Info("Pipeline is enabled.")
pipelineLogger.Info("Pipeline was built.")

bp := &builtPipeline{
pipelineLogger,
Expand Down
4 changes: 2 additions & 2 deletions service/internal/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
for _, rcv := range rcvs {
rcv.logger.Info("Receiver is starting...")

if err := rcv.Start(ctx, host); err != nil {
if err := rcv.Start(ctx, newHostWrapper(host, rcv.logger)); err != nil {
return err
}
rcv.logger.Info("Receiver started.")
Expand Down Expand Up @@ -227,7 +227,7 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
}
rcv.receiver = createdReceiver

logger.Info("Receiver is enabled.", zap.String("datatype", string(dataType)))
logger.Info("Receiver was built.", zap.String("datatype", string(dataType)))

return nil
}
Expand Down

0 comments on commit ebd3673

Please sign in to comment.