From bbb96236a73cb26464e7642833101f82bebfd112 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Tue, 4 Mar 2025 17:17:03 +0800 Subject: [PATCH] planner, executor: add PARAMS column to the output of `show traffic jobs` (#59872) close pingcap/tidb#59871 --- pkg/executor/traffic.go | 29 +++++++++--- pkg/executor/traffic_test.go | 66 ++++++++++++++++++---------- pkg/planner/core/planbuilder.go | 3 +- pkg/planner/core/planbuilder_test.go | 2 +- 4 files changed, 68 insertions(+), 32 deletions(-) diff --git a/pkg/executor/traffic.go b/pkg/executor/traffic.go index c9677f6ea75c8..a43d17c352257 100644 --- a/pkg/executor/traffic.go +++ b/pkg/executor/traffic.go @@ -50,13 +50,21 @@ var tiproxyAddrKey tiproxyAddrKeyType var trafficStoreKey trafficStoreKeyType type trafficJob struct { - Instance string `json:"-"` // not passed from TiProxy - Type string `json:"type"` - Status string `json:"status"` - StartTime string `json:"start_time"` - EndTime string `json:"end_time,omitempty"` - Progress string `json:"progress"` - Err string `json:"error,omitempty"` + Instance string `json:"-"` // not passed from TiProxy + Type string `json:"type"` + Status string `json:"status"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time,omitempty"` + Progress string `json:"progress"` + Err string `json:"error,omitempty"` + Output string `json:"output,omitempty"` + Duration string `json:"duration,omitempty"` + Compress bool `json:"compress,omitempty"` + EncryptionMethod string `json:"encryption-method,omitempty"` + Input string `json:"input,omitempty"` + Username string `json:"username,omitempty"` + Speed float64 `json:"speed,omitempty"` + ReadOnly bool `json:"readonly,omitempty"` } const ( @@ -222,11 +230,18 @@ func (e *TrafficShowExec) Next(ctx context.Context, req *chunk.Chunk) error { } else { req.AppendTime(1, parseTime(ctx, e.BaseExecutor, job.EndTime)) } + var params string + if job.Type == "capture" { + params = fmt.Sprintf("OUTPUT=\"%s\", DURATION=\"%s\", COMPRESS=%t, ENCRYPTION_METHOD=\"%s\"", job.Output, job.Duration, job.Compress, job.EncryptionMethod) + } else { + params = fmt.Sprintf("INPUT=\"%s\", USER=\"%s\", SPEED=%f, READ_ONLY=%t", job.Input, job.Username, job.Speed, job.ReadOnly) + } req.AppendString(2, job.Instance) req.AppendString(3, job.Type) req.AppendString(4, job.Progress) req.AppendString(5, job.Status) req.AppendString(6, job.Err) + req.AppendString(7, params) } return nil } diff --git a/pkg/executor/traffic_test.go b/pkg/executor/traffic_test.go index a7c31294491f2..332f4bd8e145d 100644 --- a/pkg/executor/traffic_test.go +++ b/pkg/executor/traffic_test.go @@ -327,16 +327,31 @@ func TestTrafficShow(t *testing.T) { ctx = fillCtxWithTiProxyAddr(ctx, ports) marshaledTime1, marshaledTime2 := "2020-01-01T00:00:00Z", "2020-01-01T01:00:00Z" - marshaledJob := `{ + marshaledCaptureJob := `{ "type": "capture", "status": "canceled", "start_time": "%s", "end_time": "2020-01-01T02:01:01Z", "progress": "50%%", - "error": "mock error" + "error": "mock error", + "output": "/tmp/traffic", + "duration": "1m", + "compress": true, + "encryption_method": "" + }` + marshaledReplayJob := `{ + "type": "replay", + "status": "running", + "start_time": "%s", + "progress": "50%%", + "input": "s3://bucket/tmp&access-key=xxx&secret-access-key=xxx", + "username": "root", + "speed": 1, + "read_only": true }` showTime1, showTime2 := "2020-01-01 00:00:00.000000", "2020-01-01 01:00:00.000000" - showResult := "%s, 2020-01-01 02:01:01.000000, 127.0.0.1:%d, capture, 50%%, canceled, mock error\n" + showCaptureResult := "%s, 2020-01-01 02:01:01.000000, 127.0.0.1:%d, capture, 50%%, canceled, mock error, OUTPUT=\"/tmp/traffic\", DURATION=\"1m\", COMPRESS=true, ENCRYPTION_METHOD=\"\"\n" + showReplayResult := "%s, NULL, 127.0.0.1:%d, replay, 50%%, running, , INPUT=\"s3://bucket/tmp&access-key=xxx&secret-access-key=xxx\", USER=\"root\", SPEED=1.000000, READ_ONLY=false\n" tests := []struct { resp []string chks []string @@ -346,34 +361,38 @@ func TestTrafficShow(t *testing.T) { chks: []string{}, }, { - resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1)), "[]"}, - chks: []string{fmt.Sprintf(showResult, showTime1, ports[0])}, + resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1)), "[]"}, + chks: []string{fmt.Sprintf(showCaptureResult, showTime1, ports[0])}, + }, + { + resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledReplayJob, marshaledTime1)), "[]"}, + chks: []string{fmt.Sprintf(showReplayResult, showTime1, ports[0])}, }, { - resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1)), fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1))}, - chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime1, ports[0]), fmt.Sprintf(showResult, showTime1, ports[1]))}, + resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1)), fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1))}, + chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showCaptureResult, showTime1, ports[0]), fmt.Sprintf(showCaptureResult, showTime1, ports[1]))}, }, { - resp: []string{fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledJob, marshaledTime1), fmt.Sprintf(marshaledJob, marshaledTime2)), - fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledJob, marshaledTime1), fmt.Sprintf(marshaledJob, marshaledTime2))}, - chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime2, ports[0]), fmt.Sprintf(showResult, showTime2, ports[1])), - fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime1, ports[0]), fmt.Sprintf(showResult, showTime1, ports[1]))}, + resp: []string{fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1), fmt.Sprintf(marshaledReplayJob, marshaledTime2)), + fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1), fmt.Sprintf(marshaledReplayJob, marshaledTime2))}, + chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showReplayResult, showTime2, ports[0]), fmt.Sprintf(showReplayResult, showTime2, ports[1])), + fmt.Sprintf("%s%s", fmt.Sprintf(showCaptureResult, showTime1, ports[0]), fmt.Sprintf(showCaptureResult, showTime1, ports[1]))}, }, } - for _, test := range tests { + for i, test := range tests { for j := range test.resp { handlers[j].setResponse(test.resp[j]) } executor := suite.build(ctx, "show traffic jobs") - require.NoError(t, executor.Open(ctx)) + require.NoError(t, executor.Open(ctx), "case %d", i) chk := chunk.New(fields, 2, 2) for j := 0; j < len(test.chks); j++ { - require.NoError(t, executor.Next(ctx, chk)) - require.Equal(t, test.chks[j], chk.ToString(fields)) + require.NoError(t, executor.Next(ctx, chk), "case %d, %d", i, j) + require.Equal(t, test.chks[j], chk.ToString(fields), "case %d, %d", i, j) } - require.NoError(t, executor.Next(ctx, chk)) - require.Equal(t, 0, chk.NumRows()) + require.NoError(t, executor.Next(ctx, chk), "case %d", i) + require.Equal(t, 0, chk.NumRows(), "case %d", i) } } @@ -572,13 +591,14 @@ func fillCtxWithTiProxyAddr(ctx context.Context, ports []int) context.Context { func trafficJobFields() []*types.FieldType { return []*types.FieldType{ + types.NewFieldType(mysql.TypeDatetime), types.NewFieldType(mysql.TypeDate), - types.NewFieldType(mysql.TypeDate), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeVarchar), + types.NewFieldType(mysql.TypeVarchar), + types.NewFieldType(mysql.TypeVarchar), + types.NewFieldType(mysql.TypeVarchar), + types.NewFieldType(mysql.TypeVarchar), + types.NewFieldType(mysql.TypeVarchar), } } diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 4f644f55ed311..a02c4772fe423 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3339,7 +3339,7 @@ func buildAddQueryWatchSchema() (*expression.Schema, types.NameSlice) { } func buildShowTrafficJobsSchema() (*expression.Schema, types.NameSlice) { - schema := newColumnsWithNames(7) + schema := newColumnsWithNames(8) schema.Append(buildColumnWithName("", "START_TIME", mysql.TypeDatetime, 19)) schema.Append(buildColumnWithName("", "END_TIME", mysql.TypeDatetime, 19)) schema.Append(buildColumnWithName("", "INSTANCE", mysql.TypeVarchar, 256)) @@ -3347,6 +3347,7 @@ func buildShowTrafficJobsSchema() (*expression.Schema, types.NameSlice) { schema.Append(buildColumnWithName("", "PROGRESS", mysql.TypeVarchar, 32)) schema.Append(buildColumnWithName("", "STATUS", mysql.TypeVarchar, 32)) schema.Append(buildColumnWithName("", "FAIL_REASON", mysql.TypeVarchar, 256)) + schema.Append(buildColumnWithName("", "PARAMS", mysql.TypeVarchar, 4096)) return schema.col2Schema(), schema.names } diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index dde836022d8e5..abcb231aec8c9 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -897,7 +897,7 @@ func TestTraffic(t *testing.T) { { sql: "show traffic jobs", privs: []string{"TRAFFIC_CAPTURE_ADMIN", "TRAFFIC_REPLAY_ADMIN"}, - cols: 7, + cols: 8, }, { sql: "cancel traffic jobs",