From bfca1eb624a8db05ce056959bb0d1b00139b8bc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Fri, 22 Jan 2021 17:51:29 +0100 Subject: [PATCH] Improve connection state logging for Jaeger exporter (#2239) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Adds a connection state reporter to the Jaeger exporter sending state changes to the log and as a metric. **Link to tracking Issue:** Closes #1861, as it's the best we can do at this exporter level. On a more general level, the collector could provide more verbose info about the underlying gRPC connections. See #2237 for more information. **Testing:** unit and manual tests. **Documentation:** None. Here are the complete logs for an OpenTelemetry Collector that got started without a Jaeger backend available. Note that the config.yaml contains two exporters, one (`jaeger`) with the `insecure: true` option, and a second one (`jaeger/2`) without this option. A Jaeger all-in-one is then started, without TLS support. This ends with the first exporter eventually transitioning to "READY", but the second won't. ``` 2020-12-02T11:17:26.116+0100 INFO service/service.go:409 Starting OpenTelemetry Collector... {"Version": "latest", "GitHash": "", "NumCPU": 12} 2020-12-02T11:17:26.116+0100 INFO service/service.go:253 Setting up own telemetry... 2020-12-02T11:17:26.130+0100 INFO service/telemetry.go:101 Serving Prometheus metrics {"address": "localhost:8888", "level": 0, "service.instance.id": "b4f3bc7c-2593-48e5-9ef9-8b585bbcf4fc"} 2020-12-02T11:17:26.131+0100 INFO service/service.go:290 Loading configuration... 2020-12-02T11:17:26.131+0100 INFO service/service.go:301 Applying configuration... 2020-12-02T11:17:26.131+0100 INFO service/service.go:322 Starting extensions... 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:306 Exporter is enabled. 
{"component_kind": "exporter", "exporter": "jaeger"} 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:306 Exporter is enabled. {"component_kind": "exporter", "exporter": "jaeger/2"} 2020-12-02T11:17:26.132+0100 INFO service/service.go:337 Starting exporters... 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:92 Exporter is starting... {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger"} 2020-12-02T11:17:26.132+0100 INFO jaegerexporter/exporter.go:183 State of the connection with the Jaeger Collector backend {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "CONNECTING"} 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:97 Exporter started. {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger"} 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:92 Exporter is starting... {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2"} 2020-12-02T11:17:26.132+0100 INFO jaegerexporter/exporter.go:183 State of the connection with the Jaeger Collector backend {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2", "state": "CONNECTING"} 2020-12-02T11:17:26.132+0100 INFO builder/exporters_builder.go:97 Exporter started. {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2"} 2020-12-02T11:17:26.132+0100 INFO builder/pipelines_builder.go:207 Pipeline is enabled. {"pipeline_name": "traces", "pipeline_datatype": "traces"} 2020-12-02T11:17:26.132+0100 INFO service/service.go:350 Starting processors... 2020-12-02T11:17:26.132+0100 INFO builder/pipelines_builder.go:51 Pipeline is starting... {"pipeline_name": "traces", "pipeline_datatype": "traces"} 2020-12-02T11:17:26.132+0100 INFO builder/pipelines_builder.go:61 Pipeline is started. 
{"pipeline_name": "traces", "pipeline_datatype": "traces"} 2020-12-02T11:17:26.132+0100 INFO builder/receivers_builder.go:235 Receiver is enabled. {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp", "datatype": "traces"} 2020-12-02T11:17:26.132+0100 INFO service/service.go:362 Starting receivers... 2020-12-02T11:17:26.132+0100 INFO builder/receivers_builder.go:70 Receiver is starting... {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"} 2020-12-02T11:17:26.132+0100 INFO otlpreceiver/otlp.go:93 Starting GRPC server on endpoint 0.0.0.0:4317 {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"} 2020-12-02T11:17:26.132+0100 INFO otlpreceiver/otlp.go:130 Setting up a second GRPC listener on legacy endpoint 0.0.0.0:55680 {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"} 2020-12-02T11:17:26.132+0100 INFO otlpreceiver/otlp.go:93 Starting GRPC server on endpoint 0.0.0.0:55680 {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"} 2020-12-02T11:17:26.132+0100 INFO builder/receivers_builder.go:75 Receiver started. {"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"} 2020-12-02T11:17:26.132+0100 INFO service/service.go:265 Everything is ready. Begin running and processing data. 
2020-12-02T11:17:27.132+0100 INFO jaegerexporter/exporter.go:183 State of the connection with the Jaeger Collector backend {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2", "state": "TRANSIENT_FAILURE"} 2020-12-02T11:17:28.132+0100 INFO jaegerexporter/exporter.go:183 State of the connection with the Jaeger Collector backend {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "TRANSIENT_FAILURE"} 2020-12-02T11:17:43.132+0100 INFO jaegerexporter/exporter.go:183 State of the connection with the Jaeger Collector backend {"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "READY"} ``` And here are the metrics for the final state: ``` # HELP otelcol_jaegerexporter_conn_state Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown # TYPE otelcol_jaegerexporter_conn_state gauge otelcol_jaegerexporter_conn_state{exporter_name="jaeger",service_instance_id="341f6179-0c34-4ad1-b2e5-19b2bed4c9d1"} 2 otelcol_jaegerexporter_conn_state{exporter_name="jaeger/2",service_instance_id="341f6179-0c34-4ad1-b2e5-19b2bed4c9d1"} 3 ``` Signed-off-by: Juraci Paixão Kröhling --- exporter/jaegerexporter/exporter.go | 111 +++++++++++++++++++++-- exporter/jaegerexporter/exporter_test.go | 80 ++++++++++++++++ exporter/jaegerexporter/metrics.go | 39 ++++++++ exporter/jaegerexporter/metrics_test.go | 32 +++++++ service/telemetry.go | 9 +- 5 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 exporter/jaegerexporter/metrics.go create mode 100644 exporter/jaegerexporter/metrics_test.go diff --git a/exporter/jaegerexporter/exporter.go b/exporter/jaegerexporter/exporter.go index 188d4d3d1383..77d481438c01 100644 --- a/exporter/jaegerexporter/exporter.go +++ b/exporter/jaegerexporter/exporter.go @@ -17,10 +17,15 @@ package jaegerexporter import ( "context" "fmt" + "sync" + "time" jaegerproto 
"github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/component" @@ -40,21 +45,23 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter return nil, err } - client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...) + conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...) if err != nil { return nil, err } - collectorServiceClient := jaegerproto.NewCollectorServiceClient(client) - s := &protoGRPCSender{ - logger: logger, - client: collectorServiceClient, - metadata: metadata.New(cfg.GRPCClientSettings.Headers), - waitForReady: cfg.WaitForReady, - } - + collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn) + s := newProtoGRPCSender(logger, + cfg.NameVal, + collectorServiceClient, + metadata.New(cfg.GRPCClientSettings.Headers), + cfg.WaitForReady, + conn, + ) exp, err := exporterhelper.NewTraceExporter( cfg, logger, s.pushTraceData, + exporterhelper.WithStart(s.start), + exporterhelper.WithShutdown(s.shutdown), exporterhelper.WithTimeout(cfg.TimeoutSettings), exporterhelper.WithRetry(cfg.RetrySettings), exporterhelper.WithQueue(cfg.QueueSettings), @@ -66,10 +73,40 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter // protoGRPCSender forwards spans encoded in the jaeger proto // format, to a grpc server. 
type protoGRPCSender struct { + name string logger *zap.Logger client jaegerproto.CollectorServiceClient metadata metadata.MD waitForReady bool + + conn stateReporter + connStateReporterInterval time.Duration + stateChangeCallbacks []func(connectivity.State) + + stopCh chan (struct{}) + stopped bool + stopLock sync.Mutex +} + +func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender { + s := &protoGRPCSender{ + name: name, + logger: logger, + client: cl, + metadata: md, + waitForReady: waitForReady, + + conn: conn, + connStateReporterInterval: time.Second, + + stopCh: make(chan (struct{})), + } + s.AddStateChangeCallback(s.onStateChange) + return s +} + +type stateReporter interface { + GetState() connectivity.State } func (s *protoGRPCSender) pushTraceData( @@ -100,3 +137,59 @@ func (s *protoGRPCSender) pushTraceData( return 0, nil } + +func (s *protoGRPCSender) shutdown(context.Context) error { + s.stopLock.Lock() + s.stopped = true + s.stopLock.Unlock() + close(s.stopCh) + return nil +} + +func (s *protoGRPCSender) start(context.Context, component.Host) error { + go s.startConnectionStatusReporter() + return nil +} + +func (s *protoGRPCSender) startConnectionStatusReporter() { + connState := s.conn.GetState() + s.propagateStateChange(connState) + + ticker := time.NewTicker(s.connStateReporterInterval) + for { + select { + case <-ticker.C: + s.stopLock.Lock() + if s.stopped { + s.stopLock.Unlock() + return + } + + st := s.conn.GetState() + if connState != st { + // state has changed, report it + connState = st + s.propagateStateChange(st) + } + s.stopLock.Unlock() + case <-s.stopCh: + return + } + } +} + +func (s *protoGRPCSender) propagateStateChange(st connectivity.State) { + for _, callback := range s.stateChangeCallbacks { + callback(st) + } +} + +func (s *protoGRPCSender) onStateChange(st connectivity.State) { + mCtx, _ := 
tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name)) + stats.Record(mCtx, mLastConnectionState.M(int64(st))) + s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st)) +} + +func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) { + s.stateChangeCallbacks = append(s.stateChangeCallbacks, f) +} diff --git a/exporter/jaegerexporter/exporter_test.go b/exporter/jaegerexporter/exporter_test.go index 2d383ddf319a..77c8d3f7c320 100644 --- a/exporter/jaegerexporter/exporter_test.go +++ b/exporter/jaegerexporter/exporter_test.go @@ -20,6 +20,7 @@ import ( "path" "sync" "testing" + "time" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" @@ -27,9 +28,11 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/pdata" @@ -248,6 +251,83 @@ func TestMutualTLS(t *testing.T) { assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID) } +func TestConnectionStateChange(t *testing.T) { + var state connectivity.State + + wg := sync.WaitGroup{} + sr := &mockStateReporter{ + state: connectivity.Connecting, + } + sender := &protoGRPCSender{ + logger: zap.NewNop(), + stopCh: make(chan (struct{})), + conn: sr, + connStateReporterInterval: 10 * time.Millisecond, + } + + wg.Add(1) + sender.AddStateChangeCallback(func(c connectivity.State) { + state = c + wg.Done() + }) + + sender.start(context.Background(), componenttest.NewNopHost()) + defer sender.shutdown(context.Background()) + wg.Wait() // wait for the initial state to be propagated + + // test + wg.Add(1) + sr.SetState(connectivity.Ready) + + // 
verify + wg.Wait() // wait until we get the state change + assert.Equal(t, connectivity.Ready, state) +} + +func TestConnectionReporterEndsOnStopped(t *testing.T) { + sr := &mockStateReporter{ + state: connectivity.Connecting, + } + + sender := &protoGRPCSender{ + logger: zap.NewNop(), + stopCh: make(chan (struct{})), + conn: sr, + connStateReporterInterval: 10 * time.Millisecond, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + sender.startConnectionStatusReporter() + wg.Done() + }() + + sender.stopLock.Lock() + sender.stopped = true + sender.stopLock.Unlock() + + // if the test finishes, we are good... if it gets blocked, the conn status reporter didn't return when the sender was marked as stopped + wg.Wait() +} + +type mockStateReporter struct { + state connectivity.State + mu sync.RWMutex +} + +func (m *mockStateReporter) GetState() connectivity.State { + m.mu.RLock() + st := m.state + m.mu.RUnlock() + return st +} +func (m *mockStateReporter) SetState(st connectivity.State) { + m.mu.Lock() + m.state = st + m.mu.Unlock() +} + func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) { server := grpc.NewServer(opts...) lis, err := net.Listen("tcp", "localhost:0") diff --git a/exporter/jaegerexporter/metrics.go b/exporter/jaegerexporter/metrics.go new file mode 100644 index 000000000000..c351ee2d92fb --- /dev/null +++ b/exporter/jaegerexporter/metrics.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package jaegerexporter + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless) + vLastConnectionState = &view.View{ + Name: mLastConnectionState.Name(), + Measure: mLastConnectionState, + Description: mLastConnectionState.Description(), + Aggregation: view.LastValue(), + TagKeys: []tag.Key{ + tag.MustNewKey("exporter_name"), + }, + } +) + +// MetricViews returns the metric views for the Jaeger exporter. +func MetricViews() []*view.View { + return []*view.View{vLastConnectionState} +} diff --git a/exporter/jaegerexporter/metrics_test.go b/exporter/jaegerexporter/metrics_test.go new file mode 100644 index 000000000000..27d5171ceff1 --- /dev/null +++ b/exporter/jaegerexporter/metrics_test.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package jaegerexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProcessorMetrics(t *testing.T) { + expectedViewNames := []string{ + "jaegerexporter_conn_state", + } + + views := MetricViews() + for i, viewName := range expectedViewNames { + assert.Equal(t, viewName, views[i].Name) + } +} diff --git a/service/telemetry.go b/service/telemetry.go index 2cd67da87614..1b4c944e81ea 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/exporter/jaegerexporter" "go.opentelemetry.io/collector/internal/collector/telemetry" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/processor" @@ -62,12 +63,14 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u } var views []*view.View - views = append(views, obsreport.Configure(level)...) - views = append(views, processor.MetricViews()...) views = append(views, batchprocessor.MetricViews()...) + views = append(views, fluentobserv.MetricViews()...) + views = append(views, jaegerexporter.MetricViews()...) views = append(views, kafkareceiver.MetricViews()...) + views = append(views, obsreport.Configure(level)...) views = append(views, processMetricsViews.Views()...) - views = append(views, fluentobserv.MetricViews()...) + views = append(views, processor.MetricViews()...) + tel.views = views if err = view.Register(views...); err != nil { return err