store/copr: handle scan details & record next wait details #41580

Merged: 1 commit, merged on Feb 23, 2023
42 changes: 41 additions & 1 deletion executor/distsql.go
@@ -75,7 +75,9 @@ type lookupTableTask struct {
idxRows *chunk.Chunk
cursor int

doneCh chan error
// buildDoneTime is set to the current instant once the cop task is built, and is used for the Next wait duration statistics.
buildDoneTime time.Time
doneCh chan error

// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
@@ -789,13 +791,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
return e.resultCurr, nil
}
var (
enableStats = e.stats != nil
start time.Time
indexFetchedInstant time.Time
)
if enableStats {
start = time.Now()
}
task, ok := <-e.resultCh
if !ok {
return nil, nil
}
if enableStats {
indexFetchedInstant = time.Now()
}
if err := <-task.doneCh; err != nil {
return nil, err
}
if enableStats {
e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
if task.buildDoneTime.After(indexFetchedInstant) {
e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant)
indexFetchedInstant = task.buildDoneTime
}
e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
}

// Release the memory usage of last task before we handle a new task.
if e.resultCurr != nil {
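
Taken together, the getResultTask change splits the time a `Next` call spends waiting into three buckets: waiting for the index side to hand over a task, waiting (if any) until that task's table reader is built, and waiting for the table lookup response. Below is a standalone sketch of that bookkeeping under simplified stand-in types; only `buildDoneTime`/`doneCh` mirror names from the diff, everything else is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// waitStats mirrors the three Next-wait counters added to IndexLookUpRunTimeStats.
type waitStats struct {
	nextWaitIndexScan        time.Duration
	nextWaitTableLookUpBuild time.Duration
	nextWaitTableLookUpResp  time.Duration
}

// task is a stripped-down stand-in for lookupTableTask (illustrative only).
type task struct {
	buildDoneTime time.Time
	doneCh        chan error
}

// waitForTask reproduces the timing logic of getResultTask: measure how long we
// wait for the index scan to hand over a task, then attribute the remaining wait
// either to building the table reader or to waiting for its response.
func waitForTask(resultCh <-chan *task, stats *waitStats) (*task, error) {
	start := time.Now()
	t, ok := <-resultCh
	if !ok {
		return nil, nil
	}
	indexFetched := time.Now()
	if err := <-t.doneCh; err != nil {
		return nil, err
	}
	stats.nextWaitIndexScan += indexFetched.Sub(start)
	if t.buildDoneTime.After(indexFetched) {
		stats.nextWaitTableLookUpBuild += t.buildDoneTime.Sub(indexFetched)
		indexFetched = t.buildDoneTime
	}
	stats.nextWaitTableLookUpResp += time.Since(indexFetched)
	return t, nil
}

func main() {
	resultCh := make(chan *task, 1)
	t := &task{doneCh: make(chan error, 1)}
	go func() {
		time.Sleep(5 * time.Millisecond) // index scan produces the task
		resultCh <- t
		time.Sleep(5 * time.Millisecond) // table worker builds the table reader
		t.buildDoneTime = time.Now()
		time.Sleep(5 * time.Millisecond) // table lookup response arrives
		t.doneCh <- nil
	}()
	var stats waitStats
	if _, err := waitForTask(resultCh, &stats); err != nil {
		panic(err)
	}
	// Each bucket ends up at roughly 5ms in this toy timeline.
	fmt.Printf("wait_index=%v wait_table_lookup_build=%v wait_table_lookup_resp=%v\n",
		stats.nextWaitIndexScan, stats.nextWaitTableLookUpBuild, stats.nextWaitTableLookUpResp)
}
```
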
@@ -1118,6 +1139,11 @@ type IndexLookUpRunTimeStats struct {
TableRowScan int64
TableTaskNum int64
Concurrency int

// The following fields record the wait duration details observed by the `Next` call.
NextWaitIndexScan time.Duration
NextWaitTableLookUpBuild time.Duration
NextWaitTableLookUpResp time.Duration
}

func (e *IndexLookUpRunTimeStats) String() string {
@@ -1141,6 +1167,16 @@ func (e *IndexLookUpRunTimeStats) String() string {
}
buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
}

if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
execdetails.FormatDuration(e.NextWaitIndexScan),
execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
execdetails.FormatDuration(e.NextWaitTableLookUpResp))
}
}
return buf.String()
}

@@ -1161,6 +1197,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.NextWaitIndexScan += tmp.NextWaitIndexScan
e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
}

// Tp implements the RuntimeStats interface.
@@ -1308,6 +1347,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
// Then we hold the returning rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.idxLookup.buildTableReader(ctx, task)
task.buildDoneTime = time.Now()
if err != nil {
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
23 changes: 15 additions & 8 deletions executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {

func TestIndexLookUpStats(t *testing.T) {
stats := &executor.IndexLookUpRunTimeStats{
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
NextWaitIndexScan: time.Second,
NextWaitTableLookUpBuild: 2 * time.Second,
NextWaitTableLookUpResp: 3 * time.Second,
}
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
}

func TestIndexLookUpGetResultChunk(t *testing.T) {
37 changes: 27 additions & 10 deletions store/copr/coprocessor.go
@@ -308,7 +308,15 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
return buildTiDBMemCopTasks(ranges, req)
}

hints := req.FixedRowCountHint
rangesLen := ranges.Len()
// Since ranges from multiple partitions may be pushed into one cop iterator,
// the correspondence between hints and ranges may be broken.
// However, multi-partition ranges and hints should not exist at the same time,
// so this check only guards against out-of-range access.
if len(hints) != rangesLen {
hints = nil
}

// TODO(youjiali1995): is there any request type that needn't be splitted by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
@@ -345,7 +353,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
nextI := mathutil.Min(i+rangesPerTask, rLen)
hint := -1
// calculate the row count hint
if req.FixedRowCountHint != nil {
if hints != nil {
startKey, endKey := loc.Ranges.At(i).StartKey, loc.Ranges.At(nextI-1).EndKey
// move to the previous range if startKey of current range is lower than endKey of previous location.
// In the following example, task1 will move origRangeIdx to region(i, z).
@@ -362,7 +370,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
origRangeIdx = nextOrigRangeIdx
break
}
hint += req.FixedRowCountHint[nextOrigRangeIdx]
hint += hints[nextOrigRangeIdx]
}
}
task := &copTask{
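
Conceptually, the hunk above just invalidates the row-count hints whenever their count no longer matches the number of ranges handed to this cop iterator; tasks then fall back to the default hint of -1. A standalone sketch of that guard (the helper name is illustrative, not TiDB's API):

```go
package main

import "fmt"

// pickHints mirrors the guard added to buildCopTasks: per-range row-count
// hints are only trustworthy when they still line up one-to-one with the
// ranges; otherwise they are dropped and the hint stays at its default (-1).
func pickHints(hints []int, rangesLen int) []int {
	if len(hints) != rangesLen {
		// Ranges from multiple partitions may have been merged into one cop
		// iterator, so the hint/range correspondence can no longer be trusted.
		return nil
	}
	return hints
}

func main() {
	fmt.Println(pickHints([]int{10, 20, 30}, 3)) // [10 20 30]
	fmt.Println(pickHints([]int{10, 20, 30}, 5)) // [] (nil): mismatch, ignore hints
}
```
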
@@ -1160,13 +1168,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.BatchResponses, task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1255,27 +1263,34 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
}
batchResps := resp.pbResp.BatchResponses
worker.sendToRespCh(resp, ch, true)
return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch)
return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch)
}

func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(task.batchTaskList) == 0 {
return remains, nil
}
batchedTasks := task.batchTaskList
task.batchTaskList = nil
batchedRemains, err := worker.handleBatchCopResponse(bo, batchResp, batchedTasks, ch)
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch)
if err != nil {
return nil, err
}
return append(remains, batchedRemains...), nil
}

// handle the batched cop response.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(tasks) == 0 {
return nil, nil
}
// Only the Addr is needed for recording execution details.
var dummyRPCCtx *tikv.RPCContext
if rpcCtx != nil {
dummyRPCCtx = &tikv.RPCContext{
Addr: rpcCtx.Addr,
}
}
var remainTasks []*copTask
for _, batchResp := range batchResps {
batchedTask, ok := tasks[batchResp.GetTaskId()]
@@ -1284,7 +1299,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
}
resp := &copResponse{
pbResp: &coprocessor.Response{
Data: batchResp.Data,
Data: batchResp.Data,
ExecDetailsV2: batchResp.ExecDetailsV2,
},
}
task := batchedTask.task
@@ -1331,8 +1347,9 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
}
return nil, errors.Trace(err)
}
worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp)
// TODO: check OOM
worker.sendToRespCh(resp, ch, false)
worker.sendToRespCh(resp, ch, true)
}
return remainTasks, nil
}
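
The reason the batched path now receives the RPC context is that each batched store response carries its own execution details (ExecDetailsV2), and those details should be attributed to the store address of the originating RPC. Below is a standalone sketch of that shape; all types are simplified stand-ins, not TiDB's real ones.

```go
package main

import "fmt"

type rpcContext struct {
	Addr string // store address; the only field the batched path needs
}

type storeBatchTaskResponse struct {
	TaskID      uint64
	Data        []byte
	ExecDetails string // stand-in for ExecDetailsV2
}

type copResponse struct {
	data        []byte
	execDetails string
	storeAddr   string
}

// handleBatchResponses mirrors the shape of handleBatchCopResponse: build a
// trimmed context holding only the address, then record the execution details
// of every batched response against that address.
func handleBatchResponses(rpcCtx *rpcContext, batch []storeBatchTaskResponse) []copResponse {
	var dummy *rpcContext
	if rpcCtx != nil {
		dummy = &rpcContext{Addr: rpcCtx.Addr} // keep Addr only
	}
	out := make([]copResponse, 0, len(batch))
	for _, b := range batch {
		resp := copResponse{data: b.Data, execDetails: b.ExecDetails}
		if dummy != nil {
			resp.storeAddr = dummy.Addr
		}
		out = append(out, resp)
	}
	return out
}

func main() {
	batch := []storeBatchTaskResponse{
		{TaskID: 1, ExecDetails: "scan_detail: {total_keys: 10}"},
		{TaskID: 2, ExecDetails: "scan_detail: {total_keys: 3}"},
	}
	for _, r := range handleBatchResponses(&rpcContext{Addr: "tikv-0:20160"}, batch) {
		fmt.Printf("store=%s details=%q\n", r.storeAddr, r.execDetails)
	}
}
```
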