diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 54aed9fef98cd..44a431f38bc7e 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -980,7 +980,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) ( // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. switch e.(type) { - case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec: + case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec, *ImportIntoExec: snapshotTS := sctx.GetSessionVars().SnapshotTS if snapshotTS != 0 { return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set") diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index bde8add3f6212..91481707422ca 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -1016,13 +1016,15 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor { } func (b *executorBuilder) buildImportInto(v *plannercore.ImportInto) exec.Executor { - tbl, ok := b.is.TableByID(v.Table.TableInfo.ID) + // see planBuilder.buildImportInto for detail why we use the latest schema here. 
+ latestIS := b.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) + tbl, ok := latestIS.TableByID(v.Table.TableInfo.ID) if !ok { b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID) return nil } if !tbl.Meta().IsBaseTable() { - b.err = plannererrors.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "LOAD") + b.err = plannererrors.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "IMPORT") return nil } diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index bbbb1e6fccb88..a2d5c5260f4fc 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -253,7 +253,9 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { e.dataFilled = true - // must use a new session to pre-check, else the stmt in show processlist will be changed. + // must use a new session as: + // - pre-check will execute other sql, the stmt in show processlist will be changed. + // - userSctx might be in stale read, we cannot do write. 
newSCtx, err2 := CreateSession(e.userSctx) if err2 != nil { return err2 @@ -334,7 +336,7 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error { return err } - if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil { + if err2 = flushStats(ctx, newSCtx, e.importPlan.TableInfo.ID, importResult); err2 != nil { logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2)) } diff --git a/pkg/planner/core/plan_test.go b/pkg/planner/core/plan_test.go index 263a80bcf5af5..154dbd3483278 100644 --- a/pkg/planner/core/plan_test.go +++ b/pkg/planner/core/plan_test.go @@ -18,11 +18,13 @@ import ( "bytes" "fmt" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -720,4 +722,16 @@ func TestImportIntoBuildPlan(t *testing.T) { plannererrors.ErrWrongValueCountOnRow) require.ErrorIs(t, tk.ExecToErr("IMPORT INTO t1(a) FROM select * from t2;"), plannererrors.ErrWrongValueCountOnRow) + + time.Sleep(100 * time.Millisecond) + now := tk.MustQuery("select now(6)").Rows()[0][0].(string) + time.Sleep(100 * time.Millisecond) + tk.MustExec("create table t3 (a int, b int);") + // set tidb_snapshot will fail without this + tk.MustExec(`replace into mysql.tidb(variable_name, variable_value) values ('tikv_gc_safe_point', '20240131-00:00:00.000 +0800')`) + tk.MustExec("set tidb_snapshot = '" + now + "'") + require.ErrorContains(t, tk.ExecToErr("IMPORT INTO t1 FROM select * from t2"), + "can not execute write statement when 'tidb_snapshot' is set") + require.ErrorIs(t, tk.ExecToErr("IMPORT INTO t3 FROM select * from t2"), + infoschema.ErrTableNotExists) } diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 
6a3fedcd74348..2c0f85c9cf25f 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -4217,8 +4217,22 @@ func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStm b.visitInfo = appendVisitInfo(b.visitInfo, mysql.FilePriv, "", "", "", plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("FILE")) } tableInfo := p.Table.TableInfo - tableInPlan, ok := b.is.TableByID(tableInfo.ID) + // we use the latest IS to support IMPORT INTO dst FROM SELECT * FROM src AS OF TIMESTAMP '2020-01-01 00:00:00' + // Note: we need to get p.Table when preprocessing, at that time, IS of session + // transaction is used, if the session ctx is already in snapshot read using tidb_snapshot, we might + // not get the schema or get a stale schema of the target table, so we don't + // support set 'tidb_snapshot' first and then import into the target table. + // + // tidb_read_staleness can be used to do stale read too, it's allowed as long as + // tableInfo.ID matches with the latest schema. + latestIS := b.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) + tableInPlan, ok := latestIS.TableByID(tableInfo.ID) if !ok { + // adaptor.handleNoDelayExecutor has a similar check, but we want to give + // a more specific error message here. 
+ if b.ctx.GetSessionVars().SnapshotTS != 0 { + return nil, errors.New("can not execute IMPORT statement when 'tidb_snapshot' is set") + } db := b.ctx.GetSessionVars().CurrentDB return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(db, tableInfo.Name.O) } diff --git a/tests/realtikvtest/importintotest2/from_select_test.go b/tests/realtikvtest/importintotest2/from_select_test.go index 8f4a884bb3e5e..37beab71be9c1 100644 --- a/tests/realtikvtest/importintotest2/from_select_test.go +++ b/tests/realtikvtest/importintotest2/from_select_test.go @@ -18,8 +18,10 @@ import ( "fmt" "slices" "strings" + "time" "github.com/pingcap/tidb/pkg/executor/importer" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" ) @@ -88,3 +90,67 @@ func (s *mockGCSSuite) TestWriteAfterImportFromSelect() { s.tk.MustExec("insert into dt values(4, 'aaaaaa'), (5, 'bbbbbb'), (6, 'cccccc'), (7, 'dddddd')") s.testWriteAfterImport(`import into t FROM select * from from_select.dt`, importer.DataSourceTypeQuery) } + +func (s *mockGCSSuite) TestImportFromSelectStaleRead() { + s.prepareAndUseDB("from_select") + // seed a GC safe point first: setting tidb_snapshot might fail without it (NOTE(review): the exact requirement is unconfirmed). 
+ s.tk.MustExec(`replace into mysql.tidb(variable_name, variable_value) values ('tikv_gc_safe_point', '20240131-00:00:00.000 +0800')`) + s.tk.MustExec("create table src(id int, v varchar(64))") + s.tk.MustExec("insert into src values(1, 'a')") + time.Sleep(100 * time.Millisecond) + now := s.tk.MustQuery("select now(6)").Rows()[0][0].(string) + time.Sleep(100 * time.Millisecond) + s.tk.MustExec("insert into src values(2, 'b')") + s.tk.MustQuery("select * from src").Check(testkit.Rows("1 a", "2 b")) + staleReadSQL := fmt.Sprintf("select * from src as of timestamp '%s'", now) + s.tk.MustQuery(staleReadSQL).Check(testkit.Rows("1 a")) + s.tk.MustExec("create table dst(id int, v varchar(64))") + + // + // in the cases below, the dst table does not exist at time 'now' + // + // using set tidb_snapshot: both imports are expected to fail with ErrTableNotExists + s.tk.MustExec("set tidb_snapshot = '" + now + "'") + s.ErrorIs(s.tk.ExecToErr("import into dst from "+staleReadSQL), infoschema.ErrTableNotExists) + s.ErrorIs(s.tk.ExecToErr("import into dst from select * from src"), infoschema.ErrTableNotExists) + // using AS OF TIMESTAMP only: the import is expected to succeed and copy the stale rows + s.tk.MustExec("set tidb_snapshot = ''") + s.tk.MustExec("import into dst from " + staleReadSQL) + s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a")) + + // + // in the cases below, the dst table exists at time 'now', and it is the latest version too. 
+ // + s.tk.MustExec("truncate table dst") + time.Sleep(100 * time.Millisecond) + now = s.tk.MustQuery("select now(6)").Rows()[0][0].(string) + time.Sleep(100 * time.Millisecond) + staleReadSQL = fmt.Sprintf("select * from src as of timestamp '%s'", now) + s.tk.MustExec("insert into src values(3, 'c')") + s.tk.MustQuery("select * from src").Check(testkit.Rows("1 a", "2 b", "3 c")) + // using set tidb_snapshot: rejected with the generic write-statement error + s.tk.MustExec("set tidb_snapshot = '" + now + "'") + s.ErrorContains(s.tk.ExecToErr("import into dst from "+staleReadSQL), + "can not execute write statement when 'tidb_snapshot' is set") + s.ErrorContains(s.tk.ExecToErr("import into dst from select * from src"), + "can not execute write statement when 'tidb_snapshot' is set") + // using AS OF TIMESTAMP only: the import is expected to succeed + s.tk.MustExec("set tidb_snapshot = ''") + s.tk.MustExec("import into dst from " + staleReadSQL) + s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a", "2 b")) + + // + // in the cases below, the dst table exists at time 'now', but it is NOT the latest version (dst is truncated again after 'now'). + // + s.tk.MustExec("truncate table dst") + // using set tidb_snapshot: rejected with the IMPORT-specific error + s.tk.MustExec("set tidb_snapshot = '" + now + "'") + s.ErrorContains(s.tk.ExecToErr("import into dst from "+staleReadSQL), + "can not execute IMPORT statement when 'tidb_snapshot' is set") + s.ErrorContains(s.tk.ExecToErr("import into dst from select * from src"), + "can not execute IMPORT statement when 'tidb_snapshot' is set") + // using AS OF TIMESTAMP only: the import is expected to succeed + s.tk.MustExec("set tidb_snapshot = ''") + s.tk.MustExec("import into dst from " + staleReadSQL) + s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a", "2 b")) +}