Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support auto_random column in composite primary key (#41463) #41481

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 106 additions & 4 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ type genCol struct {
type autoIDConverter func(int64) int64

type tableKVEncoder struct {
tbl table.Table
se *session
recordCache []types.Datum
genCols []genCol
tbl table.Table
autoRandomColID int64
se *session
recordCache []types.Datum
genCols []genCol
// convert auto id for shard rowid or auto random id base on row id generated by lightning
autoIDFn autoIDConverter
}
Expand All @@ -74,7 +75,9 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

var autoRandomColID int64
autoIDFn := func(id int64) int64 { return id }
<<<<<<< HEAD
if meta.PKIsHandle && meta.ContainsAutoRandomBits() {
for _, col := range cols {
if mysql.HasPriKeyFlag(col.Flag) {
Expand All @@ -85,6 +88,16 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error
}
break
}
=======
if meta.ContainsAutoRandomBits() {
col := common.GetAutoRandomColumn(meta)
autoRandomColID = col.ID

shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63()
autoIDFn = func(id int64) int64 {
return shardFmt.Compose(shard, id)
>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463))
}
} else if meta.ShardRowIDBits > 0 {
rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec
Expand All @@ -104,10 +117,19 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error
}

return &tableKVEncoder{
<<<<<<< HEAD
tbl: tbl,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
=======
tbl: tbl,
autoRandomColID: autoRandomColID,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463))
}, nil
}

Expand Down Expand Up @@ -392,8 +414,13 @@ func (kvcodec *tableKVEncoder) Encode(

record = append(record, value)

<<<<<<< HEAD
if isAutoRandom && isPk {
incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits))
=======
if kvcodec.isAutoRandomCol(col.ToInfo()) {
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463))
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -450,6 +477,81 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

<<<<<<< HEAD
=======
func (kvcodec *tableKVEncoder) isAutoRandomCol(col *model.ColumnInfo) bool {
return kvcodec.tbl.Meta().ContainsAutoRandomBits() && col.ID == kvcodec.autoRandomColID
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
}

// GetEncoderIncrementalID return Auto increment id.
func GetEncoderIncrementalID(encoder Encoder, id int64) int64 {
return encoder.(*tableKVEncoder).autoIDFn(id)
}

// GetEncoderSe return session.
func GetEncoderSe(encoder Encoder) *session {
return encoder.(*tableKVEncoder).se
}

// GetActualDatum export getActualDatum function.
func GetActualDatum(encoder Encoder, rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
return encoder.(*tableKVEncoder).getActualDatum(70, 0, inputDatum)
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
err error
)

cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
// we can assume that the `colIndex` always have a valid input
col := cols[colIndex]

isBadNullValue := false
if inputDatum != nil {
value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false)
if err != nil {
return value, err
}
if err := col.CheckNotNull(&value); err == nil {
return value, nil // the most normal case
}
isBadNullValue = true
}
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case kvcodec.isAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
return value, err
}

>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463))
// get record value for auto-increment field
//
// See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852
Expand Down
118 changes: 118 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "common",
srcs = [
"conn.go",
"errors.go",
"once_error.go",
"pause.go",
"retry.go",
"security.go",
"storage.go",
"storage_unix.go",
"storage_windows.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/common",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/httputil",
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//errno",
"//parser/model",
"//store/driver/error",
"//table/tables",
"//util",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
] + select({
"@io_bazel_rules_go//go/platform:aix": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:android": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:darwin": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:illumos": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:ios": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:js": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:linux": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:plan9": [
"@org_golang_x_sys//unix",
],
"@io_bazel_rules_go//go/platform:solaris": [
"@org_golang_x_sys//unix",
],
"//conditions:default": [],
}),
)

go_test(
name = "common_test",
timeout = "short",
srcs = [
"errors_test.go",
"main_test.go",
"once_error_test.go",
"pause_test.go",
"retry_test.go",
"security_test.go",
"storage_test.go",
"util_test.go",
],
embed = [":common"],
flaky = True,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
"//errno",
"//parser",
"//store/driver/error",
"//testkit/testsetup",
"//util/dbutil",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_multierr//:multierr",
],
)
20 changes: 20 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -337,3 +338,22 @@ func StringSliceEqual(a, b []string) bool {
}
return true
}

// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it.
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
if !tblInfo.ContainsAutoRandomBits() {
return nil
}
if tblInfo.PKIsHandle {
return tblInfo.GetPkColInfo()
} else if tblInfo.IsCommonHandle {
pk := tables.FindPrimaryIndex(tblInfo)
if pk == nil {
return nil
}
offset := pk.Columns[0].Offset
return tblInfo.Columns[offset]
}
return nil
}
32 changes: 32 additions & 0 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/util/dbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463))
)

type utilSuite struct{}
Expand Down Expand Up @@ -170,3 +177,28 @@ func (s *utilSuite) TestInterpolateMySQLString(c *C) {
c.Assert(common.InterpolateMySQLString("1'23"), Equals, "'1''23'")
c.Assert(common.InterpolateMySQLString("1'2''3"), Equals, "'1''2''''3'")
}

func TestGetAutoRandomColumn(t *testing.T) {
tests := []struct {
ddl string
colName string
}{
{"create table t(c int)", ""},
{"create table t(c int auto_increment)", ""},
{"create table t(c bigint auto_random primary key)", "c"},
{"create table t(a int, c bigint auto_random primary key)", "c"},
{"create table t(c bigint auto_random, a int, primary key(c,a))", "c"},
{"create table t(a int, c bigint auto_random, primary key(c,a))", "c"},
}
p := parser.New()
for _, tt := range tests {
tableInfo, err := dbutil.GetTableInfoBySQL(tt.ddl, p)
require.NoError(t, err)
col := common.GetAutoRandomColumn(tableInfo)
if tt.colName == "" {
require.Nil(t, col, tt.ddl)
} else {
require.Equal(t, tt.colName, col.Name.L, tt.ddl)
}
}
}
Loading