-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
coprocessor: parallel build cop tasks #14384
Conversation
- CopClient.Send: removed buildCopTasks call - copIterator: - tasks is empty at the start - added tasksFilled field to indicate thet tasks are filled - Added buildCopTasksChan which is almost the same as buildCopTasks, except it returns a channel, where all new tasks will be sent - Added a new function type: splitRangesCallback - Added buildSplitRangesCallback to create a callback function wich is used in buildCopTasks & buildCopTasksChan for gettings tasks - Added buildTiDBMemCopTasksChan. almost the same as buildTiDBMemCopTasks, except it returns a channel - copIteratorTaskSender: - added iter field. to be able to fill copIterator.tasks in copIteratorTaskSender.run - added newTasksCh field. to get new tasks from buildCopTasksChan - removed tasks field - copIteratorTaskSender.run: - iterating via tasksCh instead of tasks - send a signal after the loop by tasksCh into iter.tasksFilled - copIterator.open: - used buildCopTasksChan to create a channel for getting new tasks - copIterator.Next: - for the KeepOrder branch only: start iterate over tasks slice only after tasksFilled channel got a message (from copIteratorTaskSender.run) - Added tests for buildCopTasksChan
Thanks for your contribution. If your PR get merged, you will be rewarded 2500 points. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @ekalinin do you have any clue to fix the tests you commented?
Not yet :( I hope today I will have time to fix them |
Hi @SunRunAway Found the reason of failed tests. It was So, I've removed it for a while. As far as i can see |
Anyway, found a way to use |
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @ekalinin , it looks great.
I'm very sorry for the late reply, I've left several comments and hope to merge it soon.
@@ -526,8 +611,9 @@ func (worker *copIteratorWorker) run(ctx context.Context) { | |||
} | |||
|
|||
// open starts workers and sender goroutines. | |||
func (it *copIterator) open(ctx context.Context) { | |||
func (it *copIterator) open(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you draw a text graph to illustrate how tasks will be transisted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do it today. What would better: just add it here, or add it as a comment into code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a comment is good to me.
…db into PCP/parallel-cop-tasks-build
@@ -231,28 +267,45 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv | |||
tableStart, tableEnd = keyRange[0].StartKey, keyRange[0].EndKey | |||
} | |||
|
|||
if shouldStop(finishCh) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not only check finishCh
here, but also check it each time a task sends to receiver, to make sure it will not be blocked by receiver channel if copIteratorTaskSender.run
exits.
replicaReadSeed: it.replicaReadSeed, | ||
} | ||
go worker.run(ctx) | ||
} | ||
bo := NewBackoffer(ctx, copBuildTaskMaxBackoff).WithVars(it.vars) | ||
_, err := buildCopTasksChan(bo, it.store.regionCache, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The errCh
should not be discarded here, and we should check errCh
each time it tries to retrieve a task somewhere, cancel all tasks and throw the error when error of buildCopTasksChan
happens.
} | ||
|
||
func (sender *copIteratorTaskSender) run() { | ||
// Send tasks to feed the worker goroutines. | ||
for _, t := range sender.tasks { | ||
for t := range sender.tasksFromBuilderCh { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This chanel should not be blocked when,
buildCopTasksChan
meets an errorbuildCopTasksChan
get an exit signal fromfinishCh
And copIterator.Next
should return error if buildCopTasksChan
meets an error.
…urrTask to keep order
@ekalinin, please update your pull request. |
1 similar comment
@ekalinin, please update your pull request. |
PCP #14320
What problem does this PR solve?
Build all cop tasks in a separate goroutine, and send them immediately to
copIteratorWorker
.What is changed and how it works?
Added three new functions which returns a channel instead of slice:
buildCopTasksChan
which is almost the same asbuildCopTasks
, except it does all requests in a separate goroutines and returns a channel, where all new tasks will be sent.buildTiDBMemCopTasksChan
function. almost the same asbuildTiDBMemCopTasks
, except it returns a channel and sends tasks immediatelyAlso:
splitRangesCallback
(used insplitRanges
)Changes in other functions and structures:
CopClient.Send
: removedbuildCopTasks
call (buildCopTasksChan
will be used incopIterator.open
)copIterator
:tasks
is a channel, will be filled fromcopIteratorTaskSender.run -> sendToTaskCh
copIteratorTaskSender
:iterator
field - reference tocopIterator
. Will be used to fillcopIterator.tasks
incopIteratorTaskSender.run
newTasksCh
field. It will be used to get new tasks frombuildCopTasksChan
tasks
is a channel to send tasks into copIterator.Next (order for processing)copIteratorTaskSender.run
:newTasksCh
instead oftasks
and fillcopIterator.tasks
during an iteration (viacopIteratorTaskSender.sendToTaskCh
)copIterator.open
:buildCopTasksChan
to create a channel for getting new tasksAdded tests for
buildCopTasksChan
Check List
Tests
Code changes
Side effects