From c679590945be5a232afc72a276d6c192ff95b861 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 29 Apr 2022 11:38:52 +0800 Subject: [PATCH] executor: Fix data race when getting snapshot ts in IndexLookupJoin and UnionExec (#30487) (#31351) close pingcap/tidb#30468 --- executor/builder.go | 11 +++++++++++ executor/index_lookup_join.go | 24 +++++++++++++++++++++++- session/txn.go | 1 + 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/executor/builder.go b/executor/builder.go index 52c8942b52932..9b10061ca21d7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1905,6 +1905,17 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu } func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executor { + // A quick fix for avoiding a race mentioned in issue #30468. + // Fetch the snapshot ts to make the transaction's state ready. Otherwise, multiple threads in the Union executor + // may change the transaction's state concurrently, which causes race. + // This fix is a hack, but with minimal change to the current code and works. Actually, the usage of the transaction + // states and the logic to access the snapshot ts should all be refactored. + _, err := b.getSnapshotTS() + if err != nil { + b.err = err + return nil + } + childExecs := make([]Executor, len(v.Children())) for i, child := range v.Children() { childExecs[i] = b.build(child) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 994385566dde3..6337b8a01dc09 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -162,7 +162,29 @@ type innerWorker struct { // Open implements the Executor interface. func (e *IndexLookUpJoin) Open(ctx context.Context) error { - err := e.children[0].Open(ctx) + // Be careful, very dirty hack in this line!!! + // IndexLookUpJoin need to rebuild executor (the dataReaderBuilder) during + // executing. However `executor.Next()` is lazy evaluation when the RecordSet + // result is drained. + // Lazy evaluation means the saved session context may change during executor's + // building and its running. + // A specific sequence for example: + // + // e := buildExecutor() // txn at build time + // recordSet := runStmt(e) + // session.CommitTxn() // txn closed + // recordSet.Next() + // e.dataReaderBuilder.Build() // txn is used again, which is already closed + // + // The trick here is `getSnapshotTS` will cache snapshot ts in the dataReaderBuilder, + // so even txn is destroyed later, the dataReaderBuilder could still use the + // cached snapshot ts to construct DAG. + _, err := e.innerCtx.readerBuilder.getSnapshotTS() + if err != nil { + return err + } + + err = e.children[0].Open(ctx) if err != nil { return err } diff --git a/session/txn.go b/session/txn.go index 2ed83e89c2fde..ce36621ba8afe 100644 --- a/session/txn.go +++ b/session/txn.go @@ -488,6 +488,7 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() + failpoint.Inject("txnFutureWait", func() {}) if err == nil { return tf.store.Begin(tikv.WithTxnScope(tf.txnScope), tikv.WithStartTS(startTS)) } else if config.GetGlobalConfig().Store == "unistore" {