-
Notifications
You must be signed in to change notification settings - Fork 705
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
18 changed files
with
1,415 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package kit | ||
|
||
import ( | ||
"path" | ||
"time" | ||
|
||
"context" | ||
|
||
"github.com/go-kit/kit/log" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
// UnaryClientInterceptor returns a new unary client interceptor that optionally logs the execution of external gRPC calls. | ||
func UnaryClientInterceptor(logger log.Logger, opts ...Option) grpc.UnaryClientInterceptor { | ||
o := evaluateClientOpt(opts) | ||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { | ||
fields := newClientLoggerFields(ctx, method) | ||
startTime := time.Now() | ||
err := invoker(ctx, method, req, reply, cc, opts...) | ||
logFinalClientLine(o, log.With(logger, fields...), startTime, err, "finished client unary call") | ||
return err | ||
} | ||
} | ||
|
||
// StreamClientInterceptor returns a new streaming client interceptor that optionally logs the execution of external gRPC calls. | ||
func StreamClientInterceptor(logger log.Logger, opts ...Option) grpc.StreamClientInterceptor { | ||
o := evaluateClientOpt(opts) | ||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
fields := newClientLoggerFields(ctx, method) | ||
startTime := time.Now() | ||
clientStream, err := streamer(ctx, desc, cc, method, opts...) | ||
logFinalClientLine(o, log.With(logger, fields...), startTime, err, "finished client streaming call") | ||
return clientStream, err | ||
} | ||
} | ||
|
||
func logFinalClientLine(o *options, logger log.Logger, startTime time.Time, err error, msg string) { | ||
code := o.codeFunc(err) | ||
logger = o.levelFunc(code, logger) | ||
args := []interface{}{"msg", msg, "error", err, "grpc.code", code.String()} | ||
args = append(args, o.durationFunc(time.Since(startTime))...) | ||
logger.Log(args...) | ||
} | ||
|
||
func newClientLoggerFields(ctx context.Context, fullMethodString string) []interface{} { | ||
service := path.Dir(fullMethodString)[1:] | ||
method := path.Base(fullMethodString) | ||
return []interface{}{ | ||
"system", "grpc", | ||
"span.kind", "client", | ||
"grpc.service", service, | ||
"grpc.method", method, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package kit_test | ||
|
||
import ( | ||
"io" | ||
"runtime" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
grpc_kit "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" | ||
pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/stretchr/testify/suite" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
) | ||
|
||
func customClientCodeToLevel(c codes.Code, logger log.Logger) log.Logger { | ||
if c == codes.Unauthenticated { | ||
// Make this a special case for tests, and an error. | ||
return level.Error(logger) | ||
} | ||
return grpc_kit.DefaultClientCodeToLevel(c, logger) | ||
} | ||
|
||
func TestKitClientSuite(t *testing.T) { | ||
opts := []grpc_kit.Option{ | ||
grpc_kit.WithLevels(customClientCodeToLevel), | ||
} | ||
b := newKitBaseSuite(t) | ||
b.logger = level.NewFilter(b.logger, level.AllowDebug()) // a lot of our stuff is on debug level by default | ||
b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{ | ||
grpc.WithUnaryInterceptor(grpc_kit.UnaryClientInterceptor(b.logger, opts...)), | ||
grpc.WithStreamInterceptor(grpc_kit.StreamClientInterceptor(b.logger, opts...)), | ||
} | ||
suite.Run(t, &kitClientSuite{b}) | ||
} | ||
|
||
type kitClientSuite struct { | ||
*kitBaseSuite | ||
} | ||
|
||
func (s *kitClientSuite) TestPing() { | ||
_, err := s.Client.Ping(s.SimpleCtx(), goodPing) | ||
assert.NoError(s.T(), err, "there must be not be an on a successful call") | ||
|
||
msgs := s.getOutputJSONs() | ||
require.Len(s.T(), msgs, 1, "one log statement should be logged") | ||
|
||
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") | ||
assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain the correct method name") | ||
assert.Equal(s.T(), msgs[0]["msg"], "finished client unary call", "handler's message must contain the correct message") | ||
assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") | ||
assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") | ||
|
||
assert.Contains(s.T(), msgs[0], "grpc.time_ms", "interceptor log statement should contain execution time (duration in ms)") | ||
} | ||
|
||
func (s *kitClientSuite) TestPingList() { | ||
stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) | ||
require.NoError(s.T(), err, "should not fail on establishing the stream") | ||
for { | ||
_, err := stream.Recv() | ||
if err == io.EOF { | ||
break | ||
} | ||
require.NoError(s.T(), err, "reading stream should not fail") | ||
} | ||
|
||
msgs := s.getOutputJSONs() | ||
require.Len(s.T(), msgs, 1, "one log statement should be logged") | ||
|
||
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") | ||
assert.Equal(s.T(), msgs[0]["grpc.method"], "PingList", "all lines must contain the correct method name") | ||
assert.Equal(s.T(), msgs[0]["msg"], "finished client streaming call", "handler's message must contain the correct message") | ||
assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") | ||
assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") | ||
assert.Contains(s.T(), msgs[0], "grpc.time_ms", "interceptor log statement should contain execution time (duration in ms)") | ||
} | ||
|
||
func (s *kitClientSuite) TestPingError_WithCustomLevels() { | ||
for _, tcase := range []struct { | ||
code codes.Code | ||
level level.Value | ||
msg string | ||
}{ | ||
{ | ||
code: codes.Internal, | ||
level: level.WarnValue(), | ||
msg: "Internal must remap to WarnLevel in DefaultClientCodeToLevel", | ||
}, | ||
{ | ||
code: codes.NotFound, | ||
level: level.DebugValue(), | ||
msg: "NotFound must remap to DebugLevel in DefaultClientCodeToLevel", | ||
}, | ||
{ | ||
code: codes.FailedPrecondition, | ||
level: level.DebugValue(), | ||
msg: "FailedPrecondition must remap to DebugLevel in DefaultClientCodeToLevel", | ||
}, | ||
{ | ||
code: codes.Unauthenticated, | ||
level: level.ErrorValue(), | ||
msg: "Unauthenticated is overwritten to ErrorLevel with customClientCodeToLevel override, which probably didn't work", | ||
}, | ||
} { | ||
s.SetupTest() | ||
_, err := s.Client.PingError( | ||
s.SimpleCtx(), | ||
&pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(tcase.code)}) | ||
|
||
assert.Error(s.T(), err, "each call here must return an error") | ||
|
||
msgs := s.getOutputJSONs() | ||
require.Len(s.T(), msgs, 1, "only a single log message is printed") | ||
|
||
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") | ||
assert.Equal(s.T(), msgs[0]["grpc.method"], "PingError", "all lines must contain the correct method name") | ||
assert.Equal(s.T(), msgs[0]["grpc.code"], tcase.code.String(), "all lines must contain a grpc code") | ||
assert.Equal(s.T(), msgs[0]["level"], tcase.level.String(), tcase.msg) | ||
} | ||
} | ||
|
||
func TestKitClientOverrideSuite(t *testing.T) { | ||
if strings.HasPrefix(runtime.Version(), "go1.7") { | ||
t.Skip("Skipping due to json.RawMessage incompatibility with go1.7") | ||
return | ||
} | ||
opts := []grpc_kit.Option{ | ||
grpc_kit.WithDurationField(grpc_kit.DurationToDurationField), | ||
} | ||
b := newKitBaseSuite(t) | ||
b.logger = level.NewFilter(b.logger, level.AllowDebug()) // a lot of our stuff is on debug level by default | ||
b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{ | ||
grpc.WithUnaryInterceptor(grpc_kit.UnaryClientInterceptor(b.logger, opts...)), | ||
grpc.WithStreamInterceptor(grpc_kit.StreamClientInterceptor(b.logger, opts...)), | ||
} | ||
suite.Run(t, &kitClientOverrideSuite{b}) | ||
} | ||
|
||
type kitClientOverrideSuite struct { | ||
*kitBaseSuite | ||
} | ||
|
||
func (s *kitClientOverrideSuite) TestPing_HasOverrides() { | ||
_, err := s.Client.Ping(s.SimpleCtx(), goodPing) | ||
assert.NoError(s.T(), err, "there must be not be an on a successful call") | ||
|
||
msgs := s.getOutputJSONs() | ||
require.Len(s.T(), msgs, 1, "one log statement should be logged") | ||
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") | ||
assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain the correct method name") | ||
assert.Equal(s.T(), msgs[0]["msg"], "finished client unary call", "handler's message must contain the correct message") | ||
|
||
assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "message must not contain default duration") | ||
assert.Contains(s.T(), msgs[0], "grpc.duration", "message must contain overridden duration") | ||
} | ||
|
||
func (s *kitClientOverrideSuite) TestPingList_HasOverrides() { | ||
stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) | ||
require.NoError(s.T(), err, "should not fail on establishing the stream") | ||
for { | ||
_, err := stream.Recv() | ||
if err == io.EOF { | ||
break | ||
} | ||
require.NoError(s.T(), err, "reading stream should not fail") | ||
} | ||
|
||
msgs := s.getOutputJSONs() | ||
require.Len(s.T(), msgs, 1, "one log statement should be logged") | ||
|
||
assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") | ||
assert.Equal(s.T(), msgs[0]["grpc.method"], "PingList", "all lines must contain the correct method name") | ||
assert.Equal(s.T(), msgs[0]["msg"], "finished client streaming call", "log message must be correct") | ||
assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") | ||
assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") | ||
|
||
assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "message must not contain default duration") | ||
assert.Contains(s.T(), msgs[0], "grpc.duration", "message must contain overridden duration") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package ctxkit | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-kit/kit/log" | ||
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" | ||
) | ||
|
||
type ctxMarker struct{} | ||
|
||
type ctxLogger struct { | ||
logger log.Logger | ||
fields []interface{} | ||
} | ||
|
||
var ( | ||
ctxMarkerKey = &ctxMarker{} | ||
) | ||
|
||
// AddFields adds fields to the logger. | ||
func AddFields(ctx context.Context, fields ...interface{}) { | ||
l, ok := ctx.Value(ctxMarkerKey).(*ctxLogger) | ||
if !ok || l == nil { | ||
return | ||
} | ||
l.fields = append(l.fields, fields...) | ||
} | ||
|
||
// Extract takes the call-scoped Logger from grpc_kit middleware. | ||
// | ||
// It always returns a Logger that has all the grpc_ctxtags updated. | ||
func Extract(ctx context.Context) log.Logger { | ||
l, ok := ctx.Value(ctxMarkerKey).(*ctxLogger) | ||
if !ok || l == nil { | ||
return log.NewNopLogger() | ||
} | ||
// Add grpc_ctxtags tags metadata until now. | ||
fields := TagsToFields(ctx) | ||
return log.With(l.logger, append(fields, l.fields...)...) | ||
} | ||
|
||
// TagsToFields transforms the Tags on the supplied context into kit fields. | ||
func TagsToFields(ctx context.Context) []interface{} { | ||
var fields []interface{} | ||
tags := grpc_ctxtags.Extract(ctx) | ||
for k, v := range tags.Values() { | ||
fields = append(fields, k, v) | ||
} | ||
return fields | ||
} | ||
|
||
// ToContext adds the kit.Logger to the context for extraction later. | ||
// Returning the new context that has been created. | ||
func ToContext(ctx context.Context, logger log.Logger) context.Context { | ||
l := &ctxLogger{ | ||
logger: logger, | ||
} | ||
return context.WithValue(ctx, ctxMarkerKey, l) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/* | ||
`ctxkit` is a ctxlogger that is backed by go-kit | ||
It accepts a user-configured `log.Logger` that will be used for logging. The same `log.Logger` will | ||
be populated into the `context.Context` passed into gRPC handler code. | ||
You can use `ctxkit.Extract` to log into a request-scoped `log.Logger` instance in your handler code. | ||
As `ctxkit.Extract` will iterate all tags on from `grpc_ctxtags` it is therefore expensive so it is advised that you | ||
extract once at the start of the function from the context and reuse it for the remainder of the function (see examples). | ||
Please see examples and tests for examples of use. | ||
*/ | ||
package ctxkit |
Oops, something went wrong.