Skip to content

Commit

Permalink
executor: Fix data race when getting snapshot ts in IndexLookupJoin a…
Browse files Browse the repository at this point in the history
…nd UnionExec (#30487) (#31351)

close #30468
  • Loading branch information
ti-srebot authored Apr 29, 2022
1 parent 382775c commit c679590
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 1 deletion.
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,17 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu
}

func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executor {
// A quick fix for avoiding a race mentioned in issue #30468.
// Fetch the snapshot ts to make the transaction's state ready. Otherwise, multiple threads in the Union executor
// may change the transaction's state concurrently, which causes a data race.
// This fix is a hack, but it works with minimal change to the current code. Ideally, the usage of the transaction
// states and the logic to access the snapshot ts should all be refactored.
_, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}

childExecs := make([]Executor, len(v.Children()))
for i, child := range v.Children() {
childExecs[i] = b.build(child)
Expand Down
24 changes: 23 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,29 @@ type innerWorker struct {

// Open implements the Executor interface.
func (e *IndexLookUpJoin) Open(ctx context.Context) error {
err := e.children[0].Open(ctx)
// Be careful, very dirty hack in this line!!!
// IndexLookUpJoin needs to rebuild the executor (the dataReaderBuilder) during
// execution. However, `executor.Next()` is evaluated lazily, only when the RecordSet
// result is drained.
// Lazy evaluation means the saved session context may change between the time the
// executor is built and the time it runs.
// A specific sequence for example:
//
// e := buildExecutor() // txn at build time
// recordSet := runStmt(e)
// session.CommitTxn() // txn closed
// recordSet.Next()
// e.dataReaderBuilder.Build() // txn is used again, which is already closed
//
// The trick here is that `getSnapshotTS` caches the snapshot ts in the dataReaderBuilder,
// so even if the txn is destroyed later, the dataReaderBuilder can still use the
// cached snapshot ts to construct the DAG.
_, err := e.innerCtx.readerBuilder.getSnapshotTS()
if err != nil {
return err
}

err = e.children[0].Open(ctx)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ type txnFuture struct {

func (tf *txnFuture) wait() (kv.Transaction, error) {
startTS, err := tf.future.Wait()
failpoint.Inject("txnFutureWait", func() {})
if err == nil {
return tf.store.Begin(tikv.WithTxnScope(tf.txnScope), tikv.WithStartTS(startTS))
} else if config.GetGlobalConfig().Store == "unistore" {
Expand Down

0 comments on commit c679590

Please sign in to comment.