Skip to content

Commit

Permalink
Measure Jaeger request metrics in shared gRPC server (#5207)
Browse files Browse the repository at this point in the history
* beater: add Jaeger metrics to shared gRPC server

Add both Jaeger and OTLP method monitoring maps
to the shared gRPC server.

* systemtest: test request metrics for jaeger
  • Loading branch information
axw authored May 3, 2021
1 parent 28ee810 commit 365e191
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
19 changes: 14 additions & 5 deletions beater/interceptors/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,29 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
)

// Metrics returns a grpc.UnaryServerInterceptor that increments the metrics in
// a supplied registry keyed to its gRPC full method name.
// Metrics returns a grpc.UnaryServerInterceptor that increments metrics
// for gRPC method calls. The full gRPC method name is used to look up a
// monitoring map among the given maps; if the same method name appears in
// more than one map, the entry from the last map given wins.
func Metrics(
logger *logp.Logger,
registries map[string]map[request.ResultID]*monitoring.Int,
methodMetrics ...map[string]map[request.ResultID]*monitoring.Int,
) grpc.UnaryServerInterceptor {

allMethodMetrics := make(map[string]map[request.ResultID]*monitoring.Int)
for _, methodMetrics := range methodMetrics {
for method, metrics := range methodMetrics {
allMethodMetrics[method] = metrics
}
}

return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
m, prs := registries[info.FullMethod]
if !prs {
m, ok := allMethodMetrics[info.FullMethod]
if !ok {
logger.With(
"grpc.request.method", info.FullMethod,
).Error("metrics registry missing")
Expand Down
2 changes: 1 addition & 1 deletion beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func newGRPCServer(
apmInterceptor,
interceptors.ClientMetadata(),
interceptors.Logging(logger),
interceptors.Metrics(logger, otlp.RegistryMonitoringMaps),
interceptors.Metrics(logger, otlp.RegistryMonitoringMaps, jaeger.RegistryMonitoringMaps),
interceptors.Timeout(),
authInterceptor,
),
Expand Down
10 changes: 9 additions & 1 deletion systemtest/jaeger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -45,20 +46,24 @@ func TestJaegerGRPC(t *testing.T) {
GRPCEnabled: true,
GRPCHost: "localhost:0",
}
srv.Config.Monitoring = newFastMonitoringConfig()
err := srv.Start()
require.NoError(t, err)
testJaegerGRPC(t, srv, srv.JaegerGRPCAddr, grpc.WithInsecure())
}

// TestJaegerGRPCMuxed runs the shared testJaegerGRPC checks against the
// address returned by serverAddr(srv), dialing without transport security
// (grpc.WithInsecure).
func TestJaegerGRPCMuxed(t *testing.T) {
systemtest.CleanupElasticsearch(t)
// NOTE(review): the next two lines both declare srv with := in the same
// scope; this looks like the removed and added sides of a diff hunk shown
// together. srv.Start() is called explicitly below, which suggests the
// NewUnstartedServer variant is the intended one — confirm.
srv := apmservertest.NewServer(t)
srv := apmservertest.NewUnstartedServer(t)
// Monitoring is configured before the server is started.
srv.Config.Monitoring = newFastMonitoringConfig()
require.NoError(t, srv.Start())
testJaegerGRPC(t, srv, serverAddr(srv), grpc.WithInsecure())
}

// TestJaegerGRPCMuxedTLS starts a server with TLS and the fast monitoring
// configuration, then runs the shared testJaegerGRPC checks against the
// address returned by serverAddr(srv), dialing with TLS transport
// credentials built from srv.TLS.
func TestJaegerGRPCMuxedTLS(t *testing.T) {
	systemtest.CleanupElasticsearch(t)

	srv := apmservertest.NewUnstartedServer(t)
	srv.Config.Monitoring = newFastMonitoringConfig()
	startErr := srv.StartTLS()
	require.NoError(t, startErr)

	// Resolve the address before building the credentials so the calls
	// happen in the same order as when written as inline arguments.
	addr := serverAddr(srv)
	tlsCreds := credentials.NewTLS(srv.TLS)
	dialOpt := grpc.WithTransportCredentials(tlsCreds)
	testJaegerGRPC(t, srv, addr, dialOpt)
}
Expand All @@ -74,6 +79,9 @@ func testJaegerGRPC(t *testing.T, srv *apmservertest.Server, addr string, dialOp
_, err = client.PostSpans(context.Background(), request)
require.NoError(t, err)

doc := getBeatsMonitoringStats(t, srv, nil)
assert.Equal(t, int64(1), gjson.GetBytes(doc.RawSource, "beats_stats.metrics.apm-server.jaeger.grpc.collect.request.count").Int())

systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{
estest.TermQuery{Field: "processor.event", Value: "transaction"},
}})
Expand Down
14 changes: 9 additions & 5 deletions systemtest/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ import (

func TestAPMServerMonitoring(t *testing.T) {
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Monitoring = &apmservertest.MonitoringConfig{
Enabled: true,
MetricsPeriod: time.Duration(time.Second),
StatePeriod: time.Duration(time.Second),
}
srv.Config.Monitoring = newFastMonitoringConfig()
err := srv.Start()
require.NoError(t, err)

Expand Down Expand Up @@ -123,3 +119,11 @@ type BeatsState struct {
// BeatsStats is the JSON-decodable container for the "metrics" object.
type BeatsStats struct {
// Metrics holds the contents of the "metrics" JSON field as a generic,
// untyped map.
Metrics map[string]interface{} `json:"metrics"`
}

// newFastMonitoringConfig returns a monitoring configuration with
// monitoring enabled and both the metrics period and the state period
// set to 100 milliseconds.
func newFastMonitoringConfig() *apmservertest.MonitoringConfig {
	// Both periods share the same short interval.
	const period = 100 * time.Millisecond

	cfg := apmservertest.MonitoringConfig{Enabled: true}
	cfg.MetricsPeriod = period
	cfg.StatePeriod = period
	return &cfg
}
6 changes: 1 addition & 5 deletions systemtest/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ func TestOTLPGRPCTraces(t *testing.T) {
func TestOTLPGRPCMetrics(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Monitoring = &apmservertest.MonitoringConfig{
Enabled: true,
MetricsPeriod: time.Duration(time.Second),
StatePeriod: time.Duration(time.Second),
}
srv.Config.Monitoring = newFastMonitoringConfig()
err := srv.Start()
require.NoError(t, err)

Expand Down
6 changes: 1 addition & 5 deletions systemtest/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ func TestTailSampling(t *testing.T) {
Policies: []apmservertest.TailSamplingPolicy{{SampleRate: 0.5}},
},
}
srv1.Config.Monitoring = &apmservertest.MonitoringConfig{
Enabled: true,
MetricsPeriod: 100 * time.Millisecond,
StatePeriod: 100 * time.Millisecond,
}
srv1.Config.Monitoring = newFastMonitoringConfig()
require.NoError(t, srv1.Start())

srv2 := apmservertest.NewUnstartedServer(t)
Expand Down

0 comments on commit 365e191

Please sign in to comment.