Skip to content

Commit

Permalink
executor: log trace fields in CoprocessorDAGHandler (#46506)
Browse files Browse the repository at this point in the history
ref #46071, close #46505
  • Loading branch information
lcwangchao authored Sep 4, 2023
1 parent 302786f commit 8b9a90d
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 8 deletions.
24 changes: 24 additions & 0 deletions executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,35 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tipb/go-tipb"
)

func copHandlerCtx(ctx context.Context, req *coprocessor.Request) context.Context {
source := req.Context.SourceStmt
if source == nil {
return ctx
}

traceInfo := &model.TraceInfo{
ConnectionID: source.ConnectionId,
SessionAlias: source.SessionAlias,
}

ctx = tracing.ContextWithTraceInfo(ctx, traceInfo)
ctx = logutil.WithTraceFields(ctx, traceInfo)
return ctx
}

// CoprocessorDAGHandler uses to handle cop dag request.
type CoprocessorDAGHandler struct {
sctx sessionctx.Context
Expand All @@ -50,6 +69,8 @@ func NewCoprocessorDAGHandler(sctx sessionctx.Context) *CoprocessorDAGHandler {

// HandleRequest handles the coprocessor request.
func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coprocessor.Request) *coprocessor.Response {
ctx = copHandlerCtx(ctx, req)

e, err := h.buildDAGExecutor(req)
if err != nil {
return h.buildErrorResponse(err)
Expand Down Expand Up @@ -90,6 +111,9 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces

// HandleStreamRequest handles the coprocessor stream request.
func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *coprocessor.Request, stream tikvpb.Tikv_CoprocessorStreamServer) error {
ctx = copHandlerCtx(ctx, req)
logutil.Logger(ctx).Debug("handle coprocessor stream request")

e, err := h.buildDAGExecutor(req)
if err != nil {
return stream.Send(h.buildErrorResponse(err))
Expand Down
9 changes: 8 additions & 1 deletion store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//store/driver/txn",
"//store/gcworker",
"//util/logutil",
"//util/tracing",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/deadlock",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
Expand All @@ -32,6 +33,7 @@ go_test(
name = "driver_test",
timeout = "short",
srcs = [
"client_test.go",
"config_test.go",
"main_test.go",
"snap_interceptor_test.go",
Expand All @@ -40,21 +42,26 @@ go_test(
],
embed = [":driver"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//domain",
"//kv",
"//parser/model",
"//session",
"//store/copr",
"//store/mockstore",
"//store/mockstore/unistore",
"//testkit/testsetup",
"//util",
"//util/tracing",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_uber_go_goleak//:goleak",
],
)
111 changes: 111 additions & 0 deletions store/driver/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package driver

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/tracing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)

type mockTiKVClient struct {
tikv.Client
mock.Mock
}

func (c *mockTiKVClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
args := c.Called(ctx, addr, req, timeout)
var resp *tikvrpc.Response
if v := args.Get(0); v != nil {
resp = v.(*tikvrpc.Response)
}
return resp, args.Error(1)
}

func TestInjectTracingClient(t *testing.T) {
cases := []struct {
name string
trace *model.TraceInfo
existSourceStmt *kvrpcpb.SourceStmt
}{
{
name: "trace is nil",
trace: nil,
},
{
name: "trace not nil",
trace: &model.TraceInfo{
ConnectionID: 123,
SessionAlias: "alias123",
},
},
{
name: "only connection id in trace valid",
trace: &model.TraceInfo{
ConnectionID: 456,
},
},
{
name: "only session alias in trace valid and sourceStmt exists",
trace: &model.TraceInfo{
SessionAlias: "alias456",
},
existSourceStmt: &kvrpcpb.SourceStmt{},
},
}

cli := &mockTiKVClient{}
inject := injectTraceClient{Client: cli}
for _, c := range cases {
ctx := context.Background()
if c.trace != nil {
ctx = tracing.ContextWithTraceInfo(ctx, c.trace)
}

req := &tikvrpc.Request{}
expectedResp := &tikvrpc.Response{}
verifySendRequest := func(args mock.Arguments) {
inj := args.Get(2).(*tikvrpc.Request)
if c.trace == nil {
require.Nil(t, inj.Context.SourceStmt, c.name)
} else {
require.NotNil(t, inj.Context.SourceStmt, c.name)
require.Equal(t, c.trace.ConnectionID, inj.Context.SourceStmt.ConnectionId, c.name)
require.Equal(t, c.trace.SessionAlias, inj.Context.SourceStmt.SessionAlias, c.name)
}
}

cli.On("SendRequest", ctx, "addr1", req, time.Second).Return(expectedResp, nil).Once().Run(verifySendRequest)
resp, err := inject.SendRequest(ctx, "addr1", req, time.Second)
cli.AssertExpectations(t)
require.NoError(t, err)
require.Same(t, expectedResp, resp)

expectedErr := errors.New("mockErr")
cli.On("SendRequest", ctx, "addr2", req, time.Minute).Return(nil, expectedErr).Once().Run(verifySendRequest)
resp, err = inject.SendRequest(ctx, "addr2", req, time.Minute)
require.Same(t, expectedErr, err)
require.Nil(t, resp)
}
}
22 changes: 21 additions & 1 deletion store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
txn_driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv
tikv.WithCodec(codec),
)

s, err = tikv.NewKVStore(uuid, pdClient, spkv, rpcClient, tikv.WithPDHTTPClient(tlsConfig, etcdAddrs))
s, err = tikv.NewKVStore(uuid, pdClient, spkv, &injectTraceClient{Client: rpcClient}, tikv.WithPDHTTPClient(tlsConfig, etcdAddrs))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -406,3 +407,22 @@ func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
func (s *tikvStore) GetCodec() tikv.Codec {
return s.codec
}

// injectTraceClient injects trace info to the tikv request
type injectTraceClient struct {
tikv.Client
}

// SendRequest sends Request.
func (c *injectTraceClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if info := tracing.TraceInfoFromContext(ctx); info != nil {
source := req.Context.SourceStmt
if source == nil {
source = &kvrpcpb.SourceStmt{}
req.Context.SourceStmt = source
}
source.ConnectionId = info.ConnectionID
source.SessionAlias = info.SessionAlias
}
return c.Client.SendRequest(ctx, addr, req, timeout)
}
30 changes: 25 additions & 5 deletions util/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ const (
DefaultTiDBEnableSlowLog = true
)

const (
// LogFieldCategory is the field name for log category
LogFieldCategory = "category"
// LogFieldConn is the field name for connection id in log
LogFieldConn = "conn"
// LogFieldSessionAlias is the field name for session_alias in log
LogFieldSessionAlias = "session_alias"
)

// EmptyFileLogConfig is an empty FileLogConfig.
var EmptyFileLogConfig = FileLogConfig{}

Expand Down Expand Up @@ -215,17 +224,28 @@ func LoggerWithTraceInfo(logger *zap.Logger, info *model.TraceInfo) *zap.Logger

// WithConnID attaches connId to context.
func WithConnID(ctx context.Context, connID uint64) context.Context {
return WithFields(ctx, zap.Uint64("conn", connID))
return WithFields(ctx, zap.Uint64(LogFieldConn, connID))
}

// WithSessionAlias attaches session_alias to context
func WithSessionAlias(ctx context.Context, alias string) context.Context {
return WithFields(ctx, zap.String("session_alias", alias))
return WithFields(ctx, zap.String(LogFieldSessionAlias, alias))
}

// WithCategory attaches category to context.
func WithCategory(ctx context.Context, category string) context.Context {
return WithFields(ctx, zap.String("category", category))
return WithFields(ctx, zap.String(LogFieldCategory, category))
}

// WithTraceFields attaches trace fields to context
func WithTraceFields(ctx context.Context, info *model.TraceInfo) context.Context {
if info == nil {
return WithFields(ctx)
}
return WithFields(ctx,
zap.Uint64(LogFieldConn, info.ConnectionID),
zap.String(LogFieldSessionAlias, info.SessionAlias),
)
}

func fieldsFromTraceInfo(info *model.TraceInfo) []zap.Field {
Expand All @@ -235,11 +255,11 @@ func fieldsFromTraceInfo(info *model.TraceInfo) []zap.Field {

fields := make([]zap.Field, 0, 2)
if info.ConnectionID != 0 {
fields = append(fields, zap.Uint64("conn", info.ConnectionID))
fields = append(fields, zap.Uint64(LogFieldConn, info.ConnectionID))
}

if info.SessionAlias != "" {
fields = append(fields, zap.String("session_alias", info.SessionAlias))
fields = append(fields, zap.String(LogFieldSessionAlias, info.SessionAlias))
}

return fields
Expand Down
9 changes: 8 additions & 1 deletion util/logutil/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,14 @@ func TestZapLoggerWithKeys(t *testing.T) {

err = InitLogger(conf)
require.NoError(t, err)
newLogger := LoggerWithTraceInfo(log.L(), &model.TraceInfo{ConnectionID: 456, SessionAlias: "alias789"})
ctx1 = WithTraceFields(context.Background(), &model.TraceInfo{ConnectionID: 456, SessionAlias: "alias789"})
testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithTraceInfoPattern)
err = os.Remove(fileCfg.Filename)
require.NoError(t, err)

err = InitLogger(conf)
require.NoError(t, err)
newLogger := LoggerWithTraceInfo(log.L(), &model.TraceInfo{ConnectionID: 789, SessionAlias: "alias012"})
ctx1 = context.WithValue(context.Background(), CtxLogKey, newLogger)
testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithTraceInfoPattern)
err = os.Remove(fileCfg.Filename)
Expand Down

0 comments on commit 8b9a90d

Please sign in to comment.