Skip to content

Commit

Permalink
planner, executor: add PARAMS column to the output of `show traffic j…
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Mar 4, 2025
1 parent 6b4d85b commit bbb9623
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 32 deletions.
29 changes: 22 additions & 7 deletions pkg/executor/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ var tiproxyAddrKey tiproxyAddrKeyType
var trafficStoreKey trafficStoreKeyType

type trafficJob struct {
Instance string `json:"-"` // not passed from TiProxy
Type string `json:"type"`
Status string `json:"status"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time,omitempty"`
Progress string `json:"progress"`
Err string `json:"error,omitempty"`
Instance string `json:"-"` // not passed from TiProxy
Type string `json:"type"`
Status string `json:"status"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time,omitempty"`
Progress string `json:"progress"`
Err string `json:"error,omitempty"`
Output string `json:"output,omitempty"`
Duration string `json:"duration,omitempty"`
Compress bool `json:"compress,omitempty"`
EncryptionMethod string `json:"encryption-method,omitempty"`
Input string `json:"input,omitempty"`
Username string `json:"username,omitempty"`
Speed float64 `json:"speed,omitempty"`
ReadOnly bool `json:"readonly,omitempty"`
}

const (
Expand Down Expand Up @@ -222,11 +230,18 @@ func (e *TrafficShowExec) Next(ctx context.Context, req *chunk.Chunk) error {
} else {
req.AppendTime(1, parseTime(ctx, e.BaseExecutor, job.EndTime))
}
var params string
if job.Type == "capture" {
params = fmt.Sprintf("OUTPUT=\"%s\", DURATION=\"%s\", COMPRESS=%t, ENCRYPTION_METHOD=\"%s\"", job.Output, job.Duration, job.Compress, job.EncryptionMethod)
} else {
params = fmt.Sprintf("INPUT=\"%s\", USER=\"%s\", SPEED=%f, READ_ONLY=%t", job.Input, job.Username, job.Speed, job.ReadOnly)
}
req.AppendString(2, job.Instance)
req.AppendString(3, job.Type)
req.AppendString(4, job.Progress)
req.AppendString(5, job.Status)
req.AppendString(6, job.Err)
req.AppendString(7, params)
}
return nil
}
Expand Down
66 changes: 43 additions & 23 deletions pkg/executor/traffic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,31 @@ func TestTrafficShow(t *testing.T) {
ctx = fillCtxWithTiProxyAddr(ctx, ports)

marshaledTime1, marshaledTime2 := "2020-01-01T00:00:00Z", "2020-01-01T01:00:00Z"
marshaledJob := `{
marshaledCaptureJob := `{
"type": "capture",
"status": "canceled",
"start_time": "%s",
"end_time": "2020-01-01T02:01:01Z",
"progress": "50%%",
"error": "mock error"
"error": "mock error",
"output": "/tmp/traffic",
"duration": "1m",
"compress": true,
"encryption_method": ""
}`
marshaledReplayJob := `{
"type": "replay",
"status": "running",
"start_time": "%s",
"progress": "50%%",
"input": "s3://bucket/tmp&access-key=xxx&secret-access-key=xxx",
"username": "root",
"speed": 1,
"read_only": true
}`
showTime1, showTime2 := "2020-01-01 00:00:00.000000", "2020-01-01 01:00:00.000000"
showResult := "%s, 2020-01-01 02:01:01.000000, 127.0.0.1:%d, capture, 50%%, canceled, mock error\n"
showCaptureResult := "%s, 2020-01-01 02:01:01.000000, 127.0.0.1:%d, capture, 50%%, canceled, mock error, OUTPUT=\"/tmp/traffic\", DURATION=\"1m\", COMPRESS=true, ENCRYPTION_METHOD=\"\"\n"
showReplayResult := "%s, NULL, 127.0.0.1:%d, replay, 50%%, running, , INPUT=\"s3://bucket/tmp&access-key=xxx&secret-access-key=xxx\", USER=\"root\", SPEED=1.000000, READ_ONLY=false\n"
tests := []struct {
resp []string
chks []string
Expand All @@ -346,34 +361,38 @@ func TestTrafficShow(t *testing.T) {
chks: []string{},
},
{
resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1)), "[]"},
chks: []string{fmt.Sprintf(showResult, showTime1, ports[0])},
resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1)), "[]"},
chks: []string{fmt.Sprintf(showCaptureResult, showTime1, ports[0])},
},
{
resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledReplayJob, marshaledTime1)), "[]"},
chks: []string{fmt.Sprintf(showReplayResult, showTime1, ports[0])},
},
{
resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1)), fmt.Sprintf("[%s]", fmt.Sprintf(marshaledJob, marshaledTime1))},
chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime1, ports[0]), fmt.Sprintf(showResult, showTime1, ports[1]))},
resp: []string{fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1)), fmt.Sprintf("[%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1))},
chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showCaptureResult, showTime1, ports[0]), fmt.Sprintf(showCaptureResult, showTime1, ports[1]))},
},
{
resp: []string{fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledJob, marshaledTime1), fmt.Sprintf(marshaledJob, marshaledTime2)),
fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledJob, marshaledTime1), fmt.Sprintf(marshaledJob, marshaledTime2))},
chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime2, ports[0]), fmt.Sprintf(showResult, showTime2, ports[1])),
fmt.Sprintf("%s%s", fmt.Sprintf(showResult, showTime1, ports[0]), fmt.Sprintf(showResult, showTime1, ports[1]))},
resp: []string{fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1), fmt.Sprintf(marshaledReplayJob, marshaledTime2)),
fmt.Sprintf("[%s,%s]", fmt.Sprintf(marshaledCaptureJob, marshaledTime1), fmt.Sprintf(marshaledReplayJob, marshaledTime2))},
chks: []string{fmt.Sprintf("%s%s", fmt.Sprintf(showReplayResult, showTime2, ports[0]), fmt.Sprintf(showReplayResult, showTime2, ports[1])),
fmt.Sprintf("%s%s", fmt.Sprintf(showCaptureResult, showTime1, ports[0]), fmt.Sprintf(showCaptureResult, showTime1, ports[1]))},
},
}

for _, test := range tests {
for i, test := range tests {
for j := range test.resp {
handlers[j].setResponse(test.resp[j])
}
executor := suite.build(ctx, "show traffic jobs")
require.NoError(t, executor.Open(ctx))
require.NoError(t, executor.Open(ctx), "case %d", i)
chk := chunk.New(fields, 2, 2)
for j := 0; j < len(test.chks); j++ {
require.NoError(t, executor.Next(ctx, chk))
require.Equal(t, test.chks[j], chk.ToString(fields))
require.NoError(t, executor.Next(ctx, chk), "case %d, %d", i, j)
require.Equal(t, test.chks[j], chk.ToString(fields), "case %d, %d", i, j)
}
require.NoError(t, executor.Next(ctx, chk))
require.Equal(t, 0, chk.NumRows())
require.NoError(t, executor.Next(ctx, chk), "case %d", i)
require.Equal(t, 0, chk.NumRows(), "case %d", i)
}
}

Expand Down Expand Up @@ -572,13 +591,14 @@ func fillCtxWithTiProxyAddr(ctx context.Context, ports []int) context.Context {

func trafficJobFields() []*types.FieldType {
return []*types.FieldType{
types.NewFieldType(mysql.TypeDatetime),
types.NewFieldType(mysql.TypeDate),
types.NewFieldType(mysql.TypeDate),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeVarchar),
types.NewFieldType(mysql.TypeVarchar),
types.NewFieldType(mysql.TypeVarchar),
types.NewFieldType(mysql.TypeVarchar),
types.NewFieldType(mysql.TypeVarchar),
types.NewFieldType(mysql.TypeVarchar),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3339,14 +3339,15 @@ func buildAddQueryWatchSchema() (*expression.Schema, types.NameSlice) {
}

func buildShowTrafficJobsSchema() (*expression.Schema, types.NameSlice) {
schema := newColumnsWithNames(7)
schema := newColumnsWithNames(8)
schema.Append(buildColumnWithName("", "START_TIME", mysql.TypeDatetime, 19))
schema.Append(buildColumnWithName("", "END_TIME", mysql.TypeDatetime, 19))
schema.Append(buildColumnWithName("", "INSTANCE", mysql.TypeVarchar, 256))
schema.Append(buildColumnWithName("", "TYPE", mysql.TypeVarchar, 32))
schema.Append(buildColumnWithName("", "PROGRESS", mysql.TypeVarchar, 32))
schema.Append(buildColumnWithName("", "STATUS", mysql.TypeVarchar, 32))
schema.Append(buildColumnWithName("", "FAIL_REASON", mysql.TypeVarchar, 256))
schema.Append(buildColumnWithName("", "PARAMS", mysql.TypeVarchar, 4096))

return schema.col2Schema(), schema.names
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func TestTraffic(t *testing.T) {
{
sql: "show traffic jobs",
privs: []string{"TRAFFIC_CAPTURE_ADMIN", "TRAFFIC_REPLAY_ADMIN"},
cols: 7,
cols: 8,
},
{
sql: "cancel traffic jobs",
Expand Down

0 comments on commit bbb9623

Please sign in to comment.