From d8655c0187ef227819d6258e32a7dffac43eb432 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 13 Nov 2024 11:02:14 +0800 Subject: [PATCH] *: Use strict validation for stale read ts & flashback ts (#57050) (#57315) close pingcap/tidb#56809 --- DEPS.bzl | 4 +- MODULE.bazel.lock | 110 ------------------------- ddl/cluster.go | 13 +-- executor/set.go | 6 +- executor/stale_txn_test.go | 25 +++++- go.mod | 2 +- go.sum | 4 +- pkg/sessionctx/BUILD.bazel | 0 planner/core/plan_cache_utils.go | 2 +- planner/core/planbuilder.go | 4 +- planner/core/preprocess.go | 2 +- sessionctx/BUILD.bazel | 2 - sessionctx/context.go | 43 +--------- sessiontxn/staleread/processor.go | 20 ++--- sessiontxn/staleread/processor_test.go | 30 +++---- sessiontxn/staleread/util.go | 17 +++- 16 files changed, 78 insertions(+), 206 deletions(-) delete mode 100644 MODULE.bazel.lock create mode 100644 pkg/sessionctx/BUILD.bazel diff --git a/DEPS.bzl b/DEPS.bzl index 042f4025e5db0..322bc9d8025ce 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3603,8 +3603,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:0YcirnuxtXC9eQRb231im1M5w/n7JFuOo0IgE/K9ffM=", - version = "v2.0.4-0.20241125064444-5f59e4e34c62", + sum = "h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8=", + version = "v2.0.4-0.20250109055446-ccec7efbf0f7", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock deleted file mode 100644 index d62a47c0f60cd..0000000000000 --- a/MODULE.bazel.lock +++ /dev/null @@ -1,110 +0,0 @@ -{ - "lockFileVersion": 11, - "registryFileHashes": { - "https://bcr.bazel.build/bazel_registry.json": "8a28e4aff06ee60aed2a8c281907fb8bcbf3b753c91fb5a5c57da3215d5b3497", - "https://bcr.bazel.build/modules/abseil-cpp/20210324.2/MODULE.bazel": "7cd0312e064fde87c8d1cd79ba06c876bd23630c83466e9500321be55c96ace2", - "https://bcr.bazel.build/modules/abseil-cpp/20211102.0/MODULE.bazel": "70390338f7a5106231d20620712f7cccb659cd0e9d073d1991c038eb9fc57589", - "https://bcr.bazel.build/modules/abseil-cpp/20211102.0/source.json": "7e3a9adf473e9af076ae485ed649d5641ad50ec5c11718103f34de03170d94ad", - "https://bcr.bazel.build/modules/apple_support/1.5.0/MODULE.bazel": "50341a62efbc483e8a2a6aec30994a58749bd7b885e18dd96aa8c33031e558ef", - "https://bcr.bazel.build/modules/apple_support/1.5.0/source.json": "eb98a7627c0bc486b57f598ad8da50f6625d974c8f723e9ea71bd39f709c9862", - "https://bcr.bazel.build/modules/bazel_features/1.11.0/MODULE.bazel": "f9382337dd5a474c3b7d334c2f83e50b6eaedc284253334cf823044a26de03e8", - "https://bcr.bazel.build/modules/bazel_features/1.11.0/source.json": "c9320aa53cd1c441d24bd6b716da087ad7e4ff0d9742a9884587596edfe53015", - "https://bcr.bazel.build/modules/bazel_skylib/1.0.3/MODULE.bazel": "bcb0fd896384802d1ad283b4e4eb4d718eebd8cb820b0a2c3a347fb971afd9d8", - "https://bcr.bazel.build/modules/bazel_skylib/1.2.1/MODULE.bazel": "f35baf9da0efe45fa3da1696ae906eea3d615ad41e2e3def4aeb4e8bc0ef9a7a", - "https://bcr.bazel.build/modules/bazel_skylib/1.3.0/MODULE.bazel": "20228b92868bf5cfc41bda7afc8a8ba2a543201851de39d990ec957b513579c5", - "https://bcr.bazel.build/modules/bazel_skylib/1.6.1/MODULE.bazel": "8fdee2dbaace6c252131c00e1de4b165dc65af02ea278476187765e1a617b917", - "https://bcr.bazel.build/modules/bazel_skylib/1.6.1/source.json": "082ed5f9837901fada8c68c2f3ddc958bb22b6d654f71dd73f3df30d45d4b749", - "https://bcr.bazel.build/modules/buildozer/7.1.2/MODULE.bazel": "2e8dd40ede9c454042645fd8d8d0cd1527966aa5c919de86661e62953cd73d84", - "https://bcr.bazel.build/modules/buildozer/7.1.2/source.json": "c9028a501d2db85793a6996205c8de120944f50a0d570438fcae0457a5f9d1f8", - "https://bcr.bazel.build/modules/googletest/1.11.0/MODULE.bazel": "3a83f095183f66345ca86aa13c58b59f9f94a2f81999c093d4eeaa2d262d12f4", - "https://bcr.bazel.build/modules/googletest/1.11.0/source.json": "c73d9ef4268c91bd0c1cd88f1f9dfa08e814b1dbe89b5f594a9f08ba0244d206", - "https://bcr.bazel.build/modules/platforms/0.0.4/MODULE.bazel": "9b328e31ee156f53f3c416a64f8491f7eb731742655a47c9eec4703a71644aee", - "https://bcr.bazel.build/modules/platforms/0.0.5/MODULE.bazel": "5733b54ea419d5eaf7997054bb55f6a1d0b5ff8aedf0176fef9eea44f3acda37", - "https://bcr.bazel.build/modules/platforms/0.0.6/MODULE.bazel": "ad6eeef431dc52aefd2d77ed20a4b353f8ebf0f4ecdd26a807d2da5aa8cd0615", - "https://bcr.bazel.build/modules/platforms/0.0.7/MODULE.bazel": "72fd4a0ede9ee5c021f6a8dd92b503e089f46c227ba2813ff183b71616034814", - "https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc", - "https://bcr.bazel.build/modules/platforms/0.0.9/source.json": "cd74d854bf16a9e002fb2ca7b1a421f4403cda29f824a765acd3a8c56f8d43e6", - "https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7", - "https://bcr.bazel.build/modules/protobuf/21.7/source.json": "bbe500720421e582ff2d18b0802464205138c06056f443184de39fbb8187b09b", - "https://bcr.bazel.build/modules/protobuf/3.19.0/MODULE.bazel": "6b5fbb433f760a99a22b18b6850ed5784ef0e9928a72668b66e4d7ccd47db9b0", - "https://bcr.bazel.build/modules/protobuf/3.19.6/MODULE.bazel": "9233edc5e1f2ee276a60de3eaa47ac4132302ef9643238f23128fea53ea12858", - "https://bcr.bazel.build/modules/rules_cc/0.0.1/MODULE.bazel": "cb2aa0747f84c6c3a78dad4e2049c154f08ab9d166b1273835a8174940365647", - "https://bcr.bazel.build/modules/rules_cc/0.0.2/MODULE.bazel": "6915987c90970493ab97393024c156ea8fb9f3bea953b2f3ec05c34f19b5695c", - "https://bcr.bazel.build/modules/rules_cc/0.0.8/MODULE.bazel": "964c85c82cfeb6f3855e6a07054fdb159aced38e99a5eecf7bce9d53990afa3e", - "https://bcr.bazel.build/modules/rules_cc/0.0.9/MODULE.bazel": "836e76439f354b89afe6a911a7adf59a6b2518fafb174483ad78a2a2fde7b1c5", - "https://bcr.bazel.build/modules/rules_cc/0.0.9/source.json": "1f1ba6fea244b616de4a554a0f4983c91a9301640c8fe0dd1d410254115c8430", - "https://bcr.bazel.build/modules/rules_java/4.0.0/MODULE.bazel": "5a78a7ae82cd1a33cef56dc578c7d2a46ed0dca12643ee45edbb8417899e6f74", - "https://bcr.bazel.build/modules/rules_java/7.6.5/MODULE.bazel": "481164be5e02e4cab6e77a36927683263be56b7e36fef918b458d7a8a1ebadb1", - "https://bcr.bazel.build/modules/rules_java/7.6.5/source.json": "a805b889531d1690e3c72a7a7e47a870d00323186a9904b36af83aa3d053ee8d", - "https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/MODULE.bazel": "a56b85e418c83eb1839819f0b515c431010160383306d13ec21959ac412d2fe7", - "https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/source.json": "a075731e1b46bc8425098512d038d416e966ab19684a10a34f4741295642fc35", - "https://bcr.bazel.build/modules/rules_license/0.0.3/MODULE.bazel": "627e9ab0247f7d1e05736b59dbb1b6871373de5ad31c3011880b4133cafd4bd0", - "https://bcr.bazel.build/modules/rules_license/0.0.7/MODULE.bazel": "088fbeb0b6a419005b89cf93fe62d9517c0a2b8bb56af3244af65ecfe37e7d5d", - "https://bcr.bazel.build/modules/rules_license/0.0.7/source.json": "355cc5737a0f294e560d52b1b7a6492d4fff2caf0bef1a315df5a298fca2d34a", - "https://bcr.bazel.build/modules/rules_pkg/0.7.0/MODULE.bazel": "df99f03fc7934a4737122518bb87e667e62d780b610910f0447665a7e2be62dc", - "https://bcr.bazel.build/modules/rules_pkg/0.7.0/source.json": "c2557066e0c0342223ba592510ad3d812d4963b9024831f7f66fd0584dd8c66c", - "https://bcr.bazel.build/modules/rules_proto/4.0.0/MODULE.bazel": "a7a7b6ce9bee418c1a760b3d84f83a299ad6952f9903c67f19e4edd964894e06", - "https://bcr.bazel.build/modules/rules_proto/5.3.0-21.7/MODULE.bazel": "e8dff86b0971688790ae75528fe1813f71809b5afd57facb44dad9e8eca631b7", - "https://bcr.bazel.build/modules/rules_proto/5.3.0-21.7/source.json": "d57902c052424dfda0e71646cb12668d39c4620ee0544294d9d941e7d12bc3a9", - "https://bcr.bazel.build/modules/rules_python/0.10.2/MODULE.bazel": "cc82bc96f2997baa545ab3ce73f196d040ffb8756fd2d66125a530031cd90e5f", - "https://bcr.bazel.build/modules/rules_python/0.22.1/MODULE.bazel": "26114f0c0b5e93018c0c066d6673f1a2c3737c7e90af95eff30cfee38d0bbac7", - "https://bcr.bazel.build/modules/rules_python/0.22.1/source.json": "57226905e783bae7c37c2dd662be078728e48fa28ee4324a7eabcafb5a43d014", - "https://bcr.bazel.build/modules/rules_python/0.4.0/MODULE.bazel": "9208ee05fd48bf09ac60ed269791cf17fb343db56c8226a720fbb1cdf467166c", - "https://bcr.bazel.build/modules/stardoc/0.5.1/MODULE.bazel": "1a05d92974d0c122f5ccf09291442580317cdd859f07a8655f1db9a60374f9f8", - "https://bcr.bazel.build/modules/stardoc/0.5.1/source.json": "a96f95e02123320aa015b956f29c00cb818fa891ef823d55148e1a362caacf29", - "https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/MODULE.bazel": "7298990c00040a0e2f121f6c32544bab27d4452f80d9ce51349b1a28f3005c43", - "https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/source.json": "f1ef7d3f9e0e26d4b23d1c39b5f5de71f584dd7d1b4ef83d9bbba6ec7a6a6459", - "https://bcr.bazel.build/modules/zlib/1.2.11/MODULE.bazel": "07b389abc85fdbca459b69e2ec656ae5622873af3f845e1c9d80fe179f3effa0", - "https://bcr.bazel.build/modules/zlib/1.2.12/MODULE.bazel": "3b1a8834ada2a883674be8cbd36ede1b6ec481477ada359cd2d3ddc562340b27", - "https://bcr.bazel.build/modules/zlib/1.3.1.bcr.3/MODULE.bazel": "af322bc08976524477c79d1e45e241b6efbeb918c497e8840b8ab116802dda79", - "https://bcr.bazel.build/modules/zlib/1.3.1.bcr.3/source.json": "2be409ac3c7601245958cd4fcdff4288be79ed23bd690b4b951f500d54ee6e7d" - }, - "selectedYankedVersions": {}, - "moduleExtensions": { - "@@apple_support~//crosstool:setup.bzl%apple_cc_configure_extension": { - "general": { - "bzlTransitiveDigest": "PjIds3feoYE8SGbbIq2SFTZy3zmxeO2tQevJZNDo7iY=", - "usagesDigest": "+hz7IHWN6A1oVJJWNDB6yZRG+RYhF76wAYItpAeIUIg=", - "recordedFileInputs": {}, - "recordedDirentsInputs": {}, - "envVariables": {}, - "generatedRepoSpecs": { - "local_config_apple_cc_toolchains": { - "bzlFile": "@@apple_support~//crosstool:setup.bzl", - "ruleClassName": "_apple_cc_autoconf_toolchains", - "attributes": {} - }, - "local_config_apple_cc": { - "bzlFile": "@@apple_support~//crosstool:setup.bzl", - "ruleClassName": "_apple_cc_autoconf", - "attributes": {} - } - }, - "recordedRepoMappingEntries": [ - [ - "apple_support~", - "bazel_tools", - "bazel_tools" - ] - ] - } - }, - "@@platforms//host:extension.bzl%host_platform": { - "general": { - "bzlTransitiveDigest": "xelQcPZH8+tmuOHVjL9vDxMnnQNMlwj0SlvgoqBkm4U=", - "usagesDigest": "pCYpDQmqMbmiiPI1p2Kd3VLm5T48rRAht5WdW0X2GlA=", - "recordedFileInputs": {}, - "recordedDirentsInputs": {}, - "envVariables": {}, - "generatedRepoSpecs": { - "host_platform": { - "bzlFile": "@@platforms//host:extension.bzl", - "ruleClassName": "host_platform_repo", - "attributes": {} - } - }, - "recordedRepoMappingEntries": [] - } - } - } -} diff --git a/ddl/cluster.go b/ddl/cluster.go index 598f1cda7de51..cf482cb6e8efe 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -108,16 +107,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { - currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD. + currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate flashback timestamp: %v", err) - } - currentTS = currentVer.Ver + return errors.Errorf("fail to validate flashback timestamp: %v", err) } + currentTS := currentVer.Ver + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") diff --git a/executor/set.go b/executor/set.go index 75e4938d41725..da6e1a58198b7 100644 --- a/executor/set.go +++ b/executor/set.go @@ -197,10 +197,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - if name == variable.TiDBTxnReadTS { - err = sessionctx.ValidateStaleReadTS(ctx, e.ctx, newSnapshotTS) - } else { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx, newSnapshotTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS) + if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts // Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor. diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index e621c33ccc675..5b141f75e443a 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -17,6 +17,7 @@ package executor_test import ( "context" "fmt" + "strconv" "testing" "time" @@ -1406,14 +1407,30 @@ func TestStaleTSO(t *testing.T) { tk.MustExec("create table t (id int)") tk.MustExec("insert into t values(1)") + ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + + // Wait until the physical advances for 1s + var currentTS uint64 + for { + tk.MustExec("begin") + currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + tk.MustExec("rollback") + if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) { + break + } + time.Sleep(time.Millisecond * 100) + } asOfExprs := []string{ - "now(3) - interval 1 second", - "current_time() - interval 1 second", - "curtime() - interval 1 second", + "now(3) - interval 10 second", + "current_time() - interval 10 second", + "curtime() - interval 10 second", } - nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second)) + nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second)) + nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS)) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO))) defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO") for _, expr := range asOfExprs { diff --git a/go.mod b/go.mod index 6945c5367ea86..b891b04dabb67 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62 + github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7 github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 3e24ce3c05608..a8833b4242b28 100644 --- a/go.sum +++ b/go.sum @@ -948,8 +948,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62 h1:0YcirnuxtXC9eQRb231im1M5w/n7JFuOo0IgE/K9ffM= -github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= +github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7 h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8= +github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/planner/core/plan_cache_utils.go b/planner/core/plan_cache_utils.go index ae2ed489cb7df..5aa35357eb1cd 100644 --- a/planner/core/plan_cache_utils.go +++ b/planner/core/plan_cache_utils.go @@ -515,7 +515,7 @@ type PlanCacheStmt struct { SQLDigest *parser.Digest PlanDigest *parser.Digest ForUpdateRead bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) NormalizedSQL4PC string SQLDigest4PC string diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index e6b0ce5e9e4f5..8ce34e4ce3a26 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3399,7 +3399,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3413,7 +3413,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 785ff61a615f3..ae64a9f23b188 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -168,7 +168,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS type PreprocessorReturn struct { initedLastSnapshotTS bool IsStaleness bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) // LastSnapshotTS is the last evaluated snapshotTS if any // otherwise it defaults to zero LastSnapshotTS uint64 diff --git a/sessionctx/BUILD.bazel b/sessionctx/BUILD.bazel index 800001fd426b3..7f893d0412173 100644 --- a/sessionctx/BUILD.bazel +++ b/sessionctx/BUILD.bazel @@ -8,7 +8,6 @@ go_library( deps = [ "//extension", "//kv", - "//metrics", "//parser/model", "//sessionctx/sessionstates", "//sessionctx/variable", @@ -17,7 +16,6 @@ go_library( "//util/kvcache", "//util/sli", "//util/topsql/stmtstats", - "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_tipb//go-binlog", "@com_github_tikv_client_go_v2//oracle", diff --git a/sessionctx/context.go b/sessionctx/context.go index 35eb7ba68ca1d..281c3f8d24a19 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -17,13 +17,10 @@ package sessionctx import ( "context" "fmt" - "time" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/extension" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/variable" @@ -223,44 +220,8 @@ const ( ) // ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error { - latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check - if err != nil || readTS > latestTS { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - if readTS > currentVer.Ver { - return errors.Errorf("cannot set read timestamp to a future time") - } - } - return nil -} - -// How far future from now ValidateStaleReadTS allows at most -const allowedTimeFromNow = 100 * time.Millisecond - -// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly. -func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error { - currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO() - if currentTS == 0 || err != nil { - currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - } - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD - if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - currentTS = currentVer.Ver - } - if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { - return errors.Errorf("cannot set read timestamp to a future time") - } - return nil +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { + return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } // SysProcTracker is used to track background sys processes diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index af91ffd1b175e..278d1b158a599 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -30,7 +30,7 @@ import ( var _ Processor = &staleReadProcessor{} // StalenessTSEvaluator is a function to get staleness ts -type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) +type StalenessTSEvaluator func(ctx context.Context, sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { @@ -100,7 +100,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { return err } - return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return p.setEvaluatedValues(ts, is, func(_ context.Context, sctx sessionctx.Context) (uint64, error) { return ts, nil }) } @@ -116,7 +116,7 @@ func (p *baseProcessor) setEvaluatedTSWithoutEvaluator(ts uint64) (err error) { } func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { - ts, err := evaluator(p.sctx) + ts, err := evaluator(p.ctx, p.sctx) if err != nil { return err } @@ -167,10 +167,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' - evaluateTS := func(sctx sessionctx.Context) (uint64, error) { - return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf) + evaluateTS := func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return parseAndValidateAsOf(ctx, p.sctx, tn.AsOf) } - stmtAsOfTS, err := evaluateTS(p.sctx) + stmtAsOfTS, err := evaluateTS(p.ctx, p.sctx) if err != nil { return err } @@ -200,7 +200,7 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness var stmtTS uint64 if preparedTSEvaluator != nil { // If the `preparedTSEvaluator` is not nil, it means the prepared statement is stale read - if stmtTS, err = preparedTSEvaluator(p.sctx); err != nil { + if stmtTS, err = preparedTSEvaluator(p.ctx, p.sctx); err != nil { return err } } @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateStaleReadTS(ctx, sctx, ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { return 0, err } @@ -298,8 +298,8 @@ func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluat return nil } - return func(sctx sessionctx.Context) (uint64, error) { - return CalculateTsWithReadStaleness(sctx, readStaleness) + return func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(ctx, sctx, readStaleness) } } diff --git a/sessiontxn/staleread/processor_test.go b/sessiontxn/staleread/processor_test.go index 204bb63a3d8de..d34c2712b73e1 100644 --- a/sessiontxn/staleread/processor_test.go +++ b/sessiontxn/staleread/processor_test.go @@ -51,7 +51,7 @@ func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.P evaluator := processor.GetStalenessTSEvaluatorForPrepare() if hasEvaluator { require.NotNil(t, evaluator) - ts, err := evaluator(p.tk.Session()) + ts, err := evaluator(context.Background(), p.tk.Session()) require.NoError(t, err) require.Equal(t, processor.GetStalenessReadTS(), ts) } else { @@ -108,6 +108,7 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { tn := astTableWithAsOf(t, "") p1 := genStaleReadPoint(t, tk) p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") @@ -157,19 +158,19 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator := processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err := evaluator(tk.Session()) + evaluatorTS, err := evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") tk.MustExec("do sleep(0.01)") - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) - expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS2, evaluatorTS) @@ -216,11 +217,11 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator = processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") @@ -233,13 +234,14 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { tk := testkit.NewTestKit(t, store) p1 := genStaleReadPoint(t, tk) //p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") // execute prepared stmt with ts evaluator processor := createProcessor(t, tk.Session()) - err := processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -247,7 +249,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // will get an error when ts evaluator fails processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return 0, errors.New("mock error") }) require.Error(t, err) @@ -272,7 +274,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // prepared ts is not allowed when @@txn_read_ts is set tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) @@ -285,7 +287,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -293,7 +295,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` tk.MustExec("set @@tidb_read_staleness=-5") processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -336,7 +338,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -376,7 +378,7 @@ func TestStaleReadProcessorInTxn(t *testing.T) { // return an error when execute prepared stmt with as of processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index d2cc7e4863446..30a446bbb1817 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -71,14 +71,25 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr as } // CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration -func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { +func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { nowVal, err := expression.GetStmtTimestamp(sctx) if err != nil { return 0, err } tsVal := nowVal.Add(readStaleness) - minTsVal := expression.GetMinSafeTime(sctx) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil + minSafeTSVal := expression.GetMinSafeTime(sctx) + calculatedTime := expression.CalAppropriateTime(tsVal, nowVal, minSafeTSVal) + readTS := oracle.GoTimeToTS(calculatedTime) + if calculatedTime.After(minSafeTSVal) { + // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that + // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). + // So in this case, do an extra check on it. + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + if err != nil { + return 0, err + } + } + return readTS, nil } // IsStmtStaleness indicates whether the current statement is staleness or not