diff --git a/exporter/jaegerexporter/exporter.go b/exporter/jaegerexporter/exporter.go index 188d4d3d1383..77d481438c01 100644 --- a/exporter/jaegerexporter/exporter.go +++ b/exporter/jaegerexporter/exporter.go @@ -17,10 +17,15 @@ package jaegerexporter import ( "context" "fmt" + "sync" + "time" jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/component" @@ -40,21 +45,23 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter return nil, err } - client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...) + conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...) if err != nil { return nil, err } - collectorServiceClient := jaegerproto.NewCollectorServiceClient(client) - s := &protoGRPCSender{ - logger: logger, - client: collectorServiceClient, - metadata: metadata.New(cfg.GRPCClientSettings.Headers), - waitForReady: cfg.WaitForReady, - } - + collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn) + s := newProtoGRPCSender(logger, + cfg.NameVal, + collectorServiceClient, + metadata.New(cfg.GRPCClientSettings.Headers), + cfg.WaitForReady, + conn, + ) exp, err := exporterhelper.NewTraceExporter( cfg, logger, s.pushTraceData, + exporterhelper.WithStart(s.start), + exporterhelper.WithShutdown(s.shutdown), exporterhelper.WithTimeout(cfg.TimeoutSettings), exporterhelper.WithRetry(cfg.RetrySettings), exporterhelper.WithQueue(cfg.QueueSettings), @@ -66,10 +73,40 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter // protoGRPCSender forwards spans encoded in the jaeger proto // format, to a grpc server. type protoGRPCSender struct { + name string logger *zap.Logger client jaegerproto.CollectorServiceClient metadata metadata.MD waitForReady bool + + conn stateReporter + connStateReporterInterval time.Duration + stateChangeCallbacks []func(connectivity.State) + + stopCh chan (struct{}) + stopped bool + stopLock sync.Mutex +} + +func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender { + s := &protoGRPCSender{ + name: name, + logger: logger, + client: cl, + metadata: md, + waitForReady: waitForReady, + + conn: conn, + connStateReporterInterval: time.Second, + + stopCh: make(chan (struct{})), + } + s.AddStateChangeCallback(s.onStateChange) + return s +} + +type stateReporter interface { + GetState() connectivity.State } func (s *protoGRPCSender) pushTraceData( @@ -100,3 +137,59 @@ func (s *protoGRPCSender) pushTraceData( return 0, nil } + +func (s *protoGRPCSender) shutdown(context.Context) error { + s.stopLock.Lock() + s.stopped = true + s.stopLock.Unlock() + close(s.stopCh) + return nil +} + +func (s *protoGRPCSender) start(context.Context, component.Host) error { + go s.startConnectionStatusReporter() + return nil +} + +func (s *protoGRPCSender) startConnectionStatusReporter() { + connState := s.conn.GetState() + s.propagateStateChange(connState) + + ticker := time.NewTicker(s.connStateReporterInterval) + for { + select { + case <-ticker.C: + s.stopLock.Lock() + if s.stopped { + s.stopLock.Unlock() + return + } + + st := s.conn.GetState() + if connState != st { + // state has changed, report it + connState = st + s.propagateStateChange(st) + } + s.stopLock.Unlock() + case <-s.stopCh: + return + } + } +} + +func (s *protoGRPCSender) propagateStateChange(st connectivity.State) { + for _, callback := range s.stateChangeCallbacks { + callback(st) + } +} + +func (s *protoGRPCSender) onStateChange(st connectivity.State) { + mCtx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name)) + stats.Record(mCtx, mLastConnectionState.M(int64(st))) + s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st)) +} + +func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) { + s.stateChangeCallbacks = append(s.stateChangeCallbacks, f) +} diff --git a/exporter/jaegerexporter/exporter_test.go b/exporter/jaegerexporter/exporter_test.go index 2d383ddf319a..77c8d3f7c320 100644 --- a/exporter/jaegerexporter/exporter_test.go +++ b/exporter/jaegerexporter/exporter_test.go @@ -20,6 +20,7 @@ import ( "path" "sync" "testing" + "time" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" @@ -27,9 +28,11 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/pdata" @@ -248,6 +251,83 @@ func TestMutualTLS(t *testing.T) { assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID) } +func TestConnectionStateChange(t *testing.T) { + var state connectivity.State + + wg := sync.WaitGroup{} + sr := &mockStateReporter{ + state: connectivity.Connecting, + } + sender := &protoGRPCSender{ + logger: zap.NewNop(), + stopCh: make(chan (struct{})), + conn: sr, + connStateReporterInterval: 10 * time.Millisecond, + } + + wg.Add(1) + sender.AddStateChangeCallback(func(c connectivity.State) { + state = c + wg.Done() + }) + + sender.start(context.Background(), componenttest.NewNopHost()) + defer sender.shutdown(context.Background()) + wg.Wait() // wait for the initial state to be propagated + + // test + wg.Add(1) + sr.SetState(connectivity.Ready) + + // verify + wg.Wait() // wait until we get the state change + assert.Equal(t, connectivity.Ready, state) +} + +func TestConnectionReporterEndsOnStopped(t *testing.T) { + sr := &mockStateReporter{ + state: connectivity.Connecting, + } + + sender := &protoGRPCSender{ + logger: zap.NewNop(), + stopCh: make(chan (struct{})), + conn: sr, + connStateReporterInterval: 10 * time.Millisecond, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + sender.startConnectionStatusReporter() + wg.Done() + }() + + sender.stopLock.Lock() + sender.stopped = true + sender.stopLock.Unlock() + + // if the test finishes, we are good... if it gets blocked, the conn status reporter didn't return when the sender was marked as stopped + wg.Wait() +} + +type mockStateReporter struct { + state connectivity.State + mu sync.RWMutex +} + +func (m *mockStateReporter) GetState() connectivity.State { + m.mu.RLock() + st := m.state + m.mu.RUnlock() + return st +} +func (m *mockStateReporter) SetState(st connectivity.State) { + m.mu.Lock() + m.state = st + m.mu.Unlock() +} + func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) { server := grpc.NewServer(opts...) lis, err := net.Listen("tcp", "localhost:0") diff --git a/exporter/jaegerexporter/metrics.go b/exporter/jaegerexporter/metrics.go new file mode 100644 index 000000000000..c351ee2d92fb --- /dev/null +++ b/exporter/jaegerexporter/metrics.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaegerexporter + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless) + vLastConnectionState = &view.View{ + Name: mLastConnectionState.Name(), + Measure: mLastConnectionState, + Description: mLastConnectionState.Description(), + Aggregation: view.LastValue(), + TagKeys: []tag.Key{ + tag.MustNewKey("exporter_name"), + }, + } +) + +// MetricViews return the metrics views according to given telemetry level. +func MetricViews() []*view.View { + return []*view.View{vLastConnectionState} +} diff --git a/exporter/jaegerexporter/metrics_test.go b/exporter/jaegerexporter/metrics_test.go new file mode 100644 index 000000000000..27d5171ceff1 --- /dev/null +++ b/exporter/jaegerexporter/metrics_test.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaegerexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProcessorMetrics(t *testing.T) { + expectedViewNames := []string{ + "jaegerexporter_conn_state", + } + + views := MetricViews() + for i, viewName := range expectedViewNames { + assert.Equal(t, viewName, views[i].Name) + } +} diff --git a/service/telemetry.go b/service/telemetry.go index 2cd67da87614..1b4c944e81ea 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/exporter/jaegerexporter" "go.opentelemetry.io/collector/internal/collector/telemetry" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/processor" @@ -62,12 +63,14 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u } var views []*view.View - views = append(views, obsreport.Configure(level)...) - views = append(views, processor.MetricViews()...) views = append(views, batchprocessor.MetricViews()...) + views = append(views, fluentobserv.MetricViews()...) + views = append(views, jaegerexporter.MetricViews()...) views = append(views, kafkareceiver.MetricViews()...) + views = append(views, obsreport.Configure(level)...) views = append(views, processMetricsViews.Views()...) - views = append(views, fluentobserv.MetricViews()...) + views = append(views, processor.MetricViews()...) + tel.views = views if err = view.Register(views...); err != nil { return err