Skip to content

Commit

Permalink
Add propagator option for gRPC instrumentation (#986)
Browse files Browse the repository at this point in the history
* Add propagator option for gRPC instrumentation

* Update CHANGELOG

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
XSAM and MrAlias authored Jul 29, 2020
1 parent 8fbaa9d commit 26e85e1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The Zipkin exporter now has `NewExportPipeline` and `InstallNewPipeline` constructor functions to match the common pattern.
These function build a new exporter with default SDK options and register the exporter with the `global` package respectively. (#944)
- Add propagator option for gRPC instrumentation. (#986)

### Changed

Expand Down
24 changes: 12 additions & 12 deletions instrumentation/grpctrace/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ var (

// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
func UnaryClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
callOpts ...grpc.CallOption,
) error {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
Expand All @@ -85,12 +85,12 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
)
defer span.End()

Inject(ctx, &metadataCopy)
Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

messageSent.Event(ctx, 1, req)

err := invoker(ctx, method, req, reply, cc, opts...)
err := invoker(ctx, method, req, reply, cc, callOpts...)

messageReceived.Event(ctx, 1, reply)

Expand Down Expand Up @@ -236,14 +236,14 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {

// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
func StreamClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
callOpts ...grpc.CallOption,
) (grpc.ClientStream, error) {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
Expand All @@ -257,10 +257,10 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
trace.WithAttributes(attr...),
)

Inject(ctx, &metadataCopy)
Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

s, err := streamer(ctx, desc, cc, method, opts...)
s, err := streamer(ctx, desc, cc, method, callOpts...)
stream := wrapClientStream(s, desc)

go func() {
Expand All @@ -282,7 +282,7 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {

// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
func UnaryServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
Expand All @@ -292,7 +292,7 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

entries, spanCtx := Extract(ctx, &metadataCopy)
entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
Expand Down Expand Up @@ -364,7 +364,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {

// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
func StreamServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
Expand All @@ -376,7 +376,7 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

entries, spanCtx := Extract(ctx, &metadataCopy)
entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
Expand Down

0 comments on commit 26e85e1

Please sign in to comment.