Improve connection state logging for Jaeger exporter (#2239)
**Description:** 
Adds a connection state reporter to the Jaeger exporter. The reporter writes each state change to the log and records the latest state as a metric.

**Link to tracking Issue:** Closes #1861, as this is the best we can do at the exporter level. More generally, the collector could provide more detailed information about the underlying gRPC connections. See #2237 for more information.

**Testing:** unit and manual tests.

**Documentation:** None.

Here are the complete logs for an OpenTelemetry Collector that was started without a Jaeger backend available. The config.yaml contains two exporters: one (`jaeger`) with the `insecure: true` option, and a second one (`jaeger/2`) without it. A Jaeger all-in-one is then started without TLS support. The first exporter eventually transitions to "READY", while the second never does.
```
2020-12-02T11:17:26.116+0100	INFO	service/service.go:409	Starting OpenTelemetry Collector...	{"Version": "latest", "GitHash": "<NOT PROPERLY GENERATED>", "NumCPU": 12}
2020-12-02T11:17:26.116+0100	INFO	service/service.go:253	Setting up own telemetry...
2020-12-02T11:17:26.130+0100	INFO	service/telemetry.go:101	Serving Prometheus metrics	{"address": "localhost:8888", "level": 0, "service.instance.id": "b4f3bc7c-2593-48e5-9ef9-8b585bbcf4fc"}
2020-12-02T11:17:26.131+0100	INFO	service/service.go:290	Loading configuration...
2020-12-02T11:17:26.131+0100	INFO	service/service.go:301	Applying configuration...
2020-12-02T11:17:26.131+0100	INFO	service/service.go:322	Starting extensions...
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:306	Exporter is enabled.	{"component_kind": "exporter", "exporter": "jaeger"}
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:306	Exporter is enabled.	{"component_kind": "exporter", "exporter": "jaeger/2"}
2020-12-02T11:17:26.132+0100	INFO	service/service.go:337	Starting exporters...
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:92	Exporter is starting...	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger"}
2020-12-02T11:17:26.132+0100	INFO	jaegerexporter/exporter.go:183	State of the connection with the Jaeger Collector backend	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "CONNECTING"}
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:97	Exporter started.	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger"}
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:92	Exporter is starting...	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2"}
2020-12-02T11:17:26.132+0100	INFO	jaegerexporter/exporter.go:183	State of the connection with the Jaeger Collector backend	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2", "state": "CONNECTING"}
2020-12-02T11:17:26.132+0100	INFO	builder/exporters_builder.go:97	Exporter started.	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2"}
2020-12-02T11:17:26.132+0100	INFO	builder/pipelines_builder.go:207	Pipeline is enabled.	{"pipeline_name": "traces", "pipeline_datatype": "traces"}
2020-12-02T11:17:26.132+0100	INFO	service/service.go:350	Starting processors...
2020-12-02T11:17:26.132+0100	INFO	builder/pipelines_builder.go:51	Pipeline is starting...	{"pipeline_name": "traces", "pipeline_datatype": "traces"}
2020-12-02T11:17:26.132+0100	INFO	builder/pipelines_builder.go:61	Pipeline is started.	{"pipeline_name": "traces", "pipeline_datatype": "traces"}
2020-12-02T11:17:26.132+0100	INFO	builder/receivers_builder.go:235	Receiver is enabled.	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp", "datatype": "traces"}
2020-12-02T11:17:26.132+0100	INFO	service/service.go:362	Starting receivers...
2020-12-02T11:17:26.132+0100	INFO	builder/receivers_builder.go:70	Receiver is starting...	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"}
2020-12-02T11:17:26.132+0100	INFO	otlpreceiver/otlp.go:93	Starting GRPC server on endpoint 0.0.0.0:4317	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"}
2020-12-02T11:17:26.132+0100	INFO	otlpreceiver/otlp.go:130	Setting up a second GRPC listener on legacy endpoint 0.0.0.0:55680	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"}
2020-12-02T11:17:26.132+0100	INFO	otlpreceiver/otlp.go:93	Starting GRPC server on endpoint 0.0.0.0:55680	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"}
2020-12-02T11:17:26.132+0100	INFO	builder/receivers_builder.go:75	Receiver started.	{"component_kind": "receiver", "component_type": "otlp", "component_name": "otlp"}
2020-12-02T11:17:26.132+0100	INFO	service/service.go:265	Everything is ready. Begin running and processing data.
2020-12-02T11:17:27.132+0100	INFO	jaegerexporter/exporter.go:183	State of the connection with the Jaeger Collector backend	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger/2", "state": "TRANSIENT_FAILURE"}
2020-12-02T11:17:28.132+0100	INFO	jaegerexporter/exporter.go:183	State of the connection with the Jaeger Collector backend	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "TRANSIENT_FAILURE"}
2020-12-02T11:17:43.132+0100	INFO	jaegerexporter/exporter.go:183	State of the connection with the Jaeger Collector backend	{"component_kind": "exporter", "component_type": "jaeger", "component_name": "jaeger", "state": "READY"}
```
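
The transitions in the log above come from a goroutine that polls the connection state at a fixed interval (one second in this change) and notifies callbacks only when the state differs from the last one seen. The following is a minimal, standard-library-only sketch of that pattern, not the exporter's actual code: `State`, `stateSource`, `watch`, `fakeConn`, and `observe` are hypothetical names used for illustration, and `State` stands in for gRPC's `connectivity.State`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// State is a hypothetical stand-in for gRPC's connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

// stateSource is anything that can report its current state.
type stateSource interface {
	GetState() State
}

// watch reports the initial state, then polls src every interval and calls
// onChange only when the state differs from the last one reported.
// It returns once stop is closed.
func watch(src stateSource, interval time.Duration, stop <-chan struct{}, onChange func(State)) {
	last := src.GetState()
	onChange(last)

	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop exits
	for {
		select {
		case <-ticker.C:
			if cur := src.GetState(); cur != last {
				last = cur
				onChange(cur)
			}
		case <-stop:
			return
		}
	}
}

// fakeConn is a mutex-guarded test double for a connection.
type fakeConn struct {
	mu sync.Mutex
	st State
}

func (f *fakeConn) GetState() State {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.st
}

func (f *fakeConn) SetState(s State) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.st = s
}

// observe runs watch against a fakeConn and returns the states it reported.
func observe() []State {
	conn := &fakeConn{st: Connecting}
	stop := make(chan struct{})
	changes := make(chan State, 4)
	done := make(chan struct{})
	go func() {
		defer close(done)
		watch(conn, 5*time.Millisecond, stop, func(s State) { changes <- s })
	}()

	first := <-changes // initial state
	conn.SetState(Ready)
	second := <-changes // reported on a later tick

	close(stop)
	<-done // the watcher has returned
	return []State{first, second}
}

func main() {
	fmt.Println(observe()) // prints "[1 2]": Connecting, then Ready
}
```

Because the loop only compares against the last reported state, a state that flips and flips back between two ticks goes unnoticed; a shorter interval narrows that window at the cost of more polling.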

And here are the metrics for the final state:

```
# HELP otelcol_jaegerexporter_conn_state Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown
# TYPE otelcol_jaegerexporter_conn_state gauge
otelcol_jaegerexporter_conn_state{exporter_name="jaeger",service_instance_id="341f6179-0c34-4ad1-b2e5-19b2bed4c9d1"} 2
otelcol_jaegerexporter_conn_state{exporter_name="jaeger/2",service_instance_id="341f6179-0c34-4ad1-b2e5-19b2bed4c9d1"} 3
```
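
The gauge values can be mapped back to state names using the table in the metric's HELP text. The sketch below does this for the two samples shown above; `connStateNames` and `describeConnState` are hypothetical helpers written for illustration and are not part of the collector.

```go
package main

import "fmt"

// connStateNames follows the HELP text of otelcol_jaegerexporter_conn_state:
// 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown.
var connStateNames = []string{"Idle", "Connecting", "Ready", "TransientFailure", "Shutdown"}

// describeConnState is a hypothetical helper that turns a gauge value into
// the corresponding state name, or "Unknown(n)" for values outside the table.
func describeConnState(v int64) string {
	if v < 0 || v >= int64(len(connStateNames)) {
		return fmt.Sprintf("Unknown(%d)", v)
	}
	return connStateNames[v]
}

func main() {
	// The two samples from the metrics output above.
	samples := []struct {
		exporter string
		value    int64
	}{
		{"jaeger", 2},
		{"jaeger/2", 3},
	}
	for _, s := range samples {
		fmt.Printf("%s: %s\n", s.exporter, describeConnState(s.value))
	}
}
```

For the samples above this reports `jaeger` as Ready and `jaeger/2` as TransientFailure, matching the last log line for each exporter.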

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
jpkrohling authored Jan 22, 2021
1 parent 8f75efd commit bfca1eb
Showing 5 changed files with 259 additions and 12 deletions.
111 changes: 102 additions & 9 deletions exporter/jaegerexporter/exporter.go
@@ -17,10 +17,15 @@ package jaegerexporter
import (
"context"
"fmt"
"sync"
"time"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
@@ -40,21 +45,23 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
 return nil, err
 }

-client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
+conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
 if err != nil {
 return nil, err
 }

-collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
-s := &protoGRPCSender{
-logger: logger,
-client: collectorServiceClient,
-metadata: metadata.New(cfg.GRPCClientSettings.Headers),
-waitForReady: cfg.WaitForReady,
-}
-
+collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
+s := newProtoGRPCSender(logger,
+cfg.NameVal,
+collectorServiceClient,
+metadata.New(cfg.GRPCClientSettings.Headers),
+cfg.WaitForReady,
+conn,
+)
 exp, err := exporterhelper.NewTraceExporter(
 cfg, logger, s.pushTraceData,
+exporterhelper.WithStart(s.start),
+exporterhelper.WithShutdown(s.shutdown),
 exporterhelper.WithTimeout(cfg.TimeoutSettings),
 exporterhelper.WithRetry(cfg.RetrySettings),
 exporterhelper.WithQueue(cfg.QueueSettings),
@@ -66,10 +73,40 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
// protoGRPCSender forwards spans encoded in the jaeger proto
// format, to a grpc server.
type protoGRPCSender struct {
name string
logger *zap.Logger
client jaegerproto.CollectorServiceClient
metadata metadata.MD
waitForReady bool

conn stateReporter
connStateReporterInterval time.Duration
stateChangeCallbacks []func(connectivity.State)

stopCh chan (struct{})
stopped bool
stopLock sync.Mutex
}

func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender {
s := &protoGRPCSender{
name: name,
logger: logger,
client: cl,
metadata: md,
waitForReady: waitForReady,

conn: conn,
connStateReporterInterval: time.Second,

stopCh: make(chan (struct{})),
}
s.AddStateChangeCallback(s.onStateChange)
return s
}

type stateReporter interface {
GetState() connectivity.State
}

func (s *protoGRPCSender) pushTraceData(
@@ -100,3 +137,59 @@ func (s *protoGRPCSender) pushTraceData(

return 0, nil
}

func (s *protoGRPCSender) shutdown(context.Context) error {
s.stopLock.Lock()
s.stopped = true
s.stopLock.Unlock()
close(s.stopCh)
return nil
}

func (s *protoGRPCSender) start(context.Context, component.Host) error {
go s.startConnectionStatusReporter()
return nil
}

func (s *protoGRPCSender) startConnectionStatusReporter() {
connState := s.conn.GetState()
s.propagateStateChange(connState)

ticker := time.NewTicker(s.connStateReporterInterval)
for {
select {
case <-ticker.C:
s.stopLock.Lock()
if s.stopped {
s.stopLock.Unlock()
return
}

st := s.conn.GetState()
if connState != st {
// state has changed, report it
connState = st
s.propagateStateChange(st)
}
s.stopLock.Unlock()
case <-s.stopCh:
return
}
}
}

func (s *protoGRPCSender) propagateStateChange(st connectivity.State) {
for _, callback := range s.stateChangeCallbacks {
callback(st)
}
}

func (s *protoGRPCSender) onStateChange(st connectivity.State) {
mCtx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name))
stats.Record(mCtx, mLastConnectionState.M(int64(st)))
s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st))
}

func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) {
s.stateChangeCallbacks = append(s.stateChangeCallbacks, f)
}
80 changes: 80 additions & 0 deletions exporter/jaegerexporter/exporter_test.go
@@ -20,16 +20,19 @@ import (
"path"
"sync"
"testing"
"time"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/pdata"
@@ -248,6 +251,83 @@ func TestMutualTLS(t *testing.T) {
assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID)
}

func TestConnectionStateChange(t *testing.T) {
var state connectivity.State

wg := sync.WaitGroup{}
sr := &mockStateReporter{
state: connectivity.Connecting,
}
sender := &protoGRPCSender{
logger: zap.NewNop(),
stopCh: make(chan (struct{})),
conn: sr,
connStateReporterInterval: 10 * time.Millisecond,
}

wg.Add(1)
sender.AddStateChangeCallback(func(c connectivity.State) {
state = c
wg.Done()
})

sender.start(context.Background(), componenttest.NewNopHost())
defer sender.shutdown(context.Background())
wg.Wait() // wait for the initial state to be propagated

// test
wg.Add(1)
sr.SetState(connectivity.Ready)

// verify
wg.Wait() // wait until we get the state change
assert.Equal(t, connectivity.Ready, state)
}

func TestConnectionReporterEndsOnStopped(t *testing.T) {
sr := &mockStateReporter{
state: connectivity.Connecting,
}

sender := &protoGRPCSender{
logger: zap.NewNop(),
stopCh: make(chan (struct{})),
conn: sr,
connStateReporterInterval: 10 * time.Millisecond,
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
sender.startConnectionStatusReporter()
wg.Done()
}()

sender.stopLock.Lock()
sender.stopped = true
sender.stopLock.Unlock()

// if the test finishes, we are good... if it gets blocked, the conn status reporter didn't return when the sender was marked as stopped
wg.Wait()
}

type mockStateReporter struct {
state connectivity.State
mu sync.RWMutex
}

func (m *mockStateReporter) GetState() connectivity.State {
m.mu.RLock()
st := m.state
m.mu.RUnlock()
return st
}
func (m *mockStateReporter) SetState(st connectivity.State) {
m.mu.Lock()
m.state = st
m.mu.Unlock()
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
server := grpc.NewServer(opts...)
lis, err := net.Listen("tcp", "localhost:0")
39 changes: 39 additions & 0 deletions exporter/jaegerexporter/metrics.go
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaegerexporter

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless)
vLastConnectionState = &view.View{
Name: mLastConnectionState.Name(),
Measure: mLastConnectionState,
Description: mLastConnectionState.Description(),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{
tag.MustNewKey("exporter_name"),
},
}
)

// MetricViews returns the metric views exposed by the Jaeger exporter.
func MetricViews() []*view.View {
return []*view.View{vLastConnectionState}
}
32 changes: 32 additions & 0 deletions exporter/jaegerexporter/metrics_test.go
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaegerexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestProcessorMetrics(t *testing.T) {
expectedViewNames := []string{
"jaegerexporter_conn_state",
}

views := MetricViews()
for i, viewName := range expectedViewNames {
assert.Equal(t, viewName, views[i].Name)
}
}
9 changes: 6 additions & 3 deletions service/telemetry.go
@@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/jaegerexporter"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor"
@@ -62,12 +63,14 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
 }

 var views []*view.View
-views = append(views, obsreport.Configure(level)...)
-views = append(views, processor.MetricViews()...)
 views = append(views, batchprocessor.MetricViews()...)
+views = append(views, fluentobserv.MetricViews()...)
+views = append(views, jaegerexporter.MetricViews()...)
 views = append(views, kafkareceiver.MetricViews()...)
+views = append(views, obsreport.Configure(level)...)
 views = append(views, processMetricsViews.Views()...)
-views = append(views, fluentobserv.MetricViews()...)
+views = append(views, processor.MetricViews()...)
+
 tel.views = views
 if err = view.Register(views...); err != nil {
 return err