Skip to content

Commit

Permalink
Merge branch 'master' into derived_topn_latest
Browse files Browse the repository at this point in the history
  • Loading branch information
ghazalfamilyusa committed Feb 20, 2023
2 parents 0e10f17 + 60dda69 commit 45a7402
Show file tree
Hide file tree
Showing 142 changed files with 3,936 additions and 1,443 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ def go_deps():
name = "com_github_blacktear23_go_proxyprotocol",
build_file_proto_mode = "disable_global",
importpath = "github.com/blacktear23/go-proxyprotocol",
sum = "h1:zR7PZeoU0wAkElcIXenFiy3R56WB6A+UEVi4c6RH8wo=",
version = "v1.0.2",
sum = "h1:moi4x1lJlrQj2uYUJdEyCxqj9UNmaSKZwaGZIXnbAis=",
version = "v1.0.5",
)
go_repository(
name = "com_github_blizzy78_varnamelen",
Expand Down
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -403,18 +403,19 @@ check-bazel-prepare:
@echo "make bazel_prepare"
./tools/check/check-bazel-prepare.sh

bazel_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) \
bazel_test: failpoint-enable bazel_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
--define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...


bazel_coverage_test: check-bazel-prepare failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
Expand Down
2 changes: 1 addition & 1 deletion Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ DUMPLING_GOTEST := CGO_ENABLED=1 GO111MODULE=on go test -ldflags '$(DUMPLING_LD
TEST_COVERAGE_DIR := "test_coverage"

ifneq ("$(CI)", "0")
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp --host_jvm_args=-XX:+UnlockExperimentalVMOptions --host_jvm_args=-XX:+UseZGC
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp
BAZEL_CMD_CONFIG := --config=ci --repository_cache=/home/jenkins/.tidb/tmp
endif
52 changes: 24 additions & 28 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ type genCol struct {
type autoIDConverter func(int64) int64

type tableKVEncoder struct {
tbl table.Table
se *session
recordCache []types.Datum
genCols []genCol
tbl table.Table
autoRandomColID int64
se *session
recordCache []types.Datum
genCols []genCol
// convert auto id for shard rowid or auto random id base on row id generated by lightning
autoIDFn autoIDConverter
metrics *metric.Metrics
Expand All @@ -84,17 +85,16 @@ func NewTableKVEncoder(
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

var autoRandomColID int64
autoIDFn := func(id int64) int64 { return id }
if meta.PKIsHandle && meta.ContainsAutoRandomBits() {
for _, col := range cols {
if mysql.HasPriKeyFlag(col.GetFlag()) {
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63()
autoIDFn = func(id int64) int64 {
return shardFmt.Compose(shard, id)
}
break
}
if meta.ContainsAutoRandomBits() {
col := common.GetAutoRandomColumn(meta)
autoRandomColID = col.ID

shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63()
autoIDFn = func(id int64) int64 {
return shardFmt.Compose(shard, id)
}
} else if meta.ShardRowIDBits > 0 {
rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec
Expand All @@ -114,11 +114,12 @@ func NewTableKVEncoder(
}

return &tableKVEncoder{
tbl: tbl,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
tbl: tbl,
autoRandomColID: autoRandomColID,
se: se,
genCols: genCols,
autoIDFn: autoIDFn,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -380,7 +381,7 @@ func (kvcodec *tableKVEncoder) Encode(

record = append(record, value)

if isTableAutoRandom(meta) && isPKCol(col.ToInfo()) {
if kvcodec.isAutoRandomCol(col.ToInfo()) {
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil {
Expand Down Expand Up @@ -438,18 +439,14 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

func isTableAutoRandom(tblMeta *model.TableInfo) bool {
return tblMeta.PKIsHandle && tblMeta.ContainsAutoRandomBits()
func (kvcodec *tableKVEncoder) isAutoRandomCol(col *model.ColumnInfo) bool {
return kvcodec.tbl.Meta().ContainsAutoRandomBits() && col.ID == kvcodec.autoRandomColID
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
}

func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

// GetEncoderIncrementalID return Auto increment id.
func GetEncoderIncrementalID(encoder Encoder, id int64) int64 {
return encoder.(*tableKVEncoder).autoIDFn(id)
Expand All @@ -471,7 +468,6 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
err error
)

tblMeta := kvcodec.tbl.Meta()
cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
Expand All @@ -494,7 +490,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
case isAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
case kvcodec.isAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,14 +1160,14 @@ func (local *local) WriteToTiKV(
if len(leaderPeerMetas) == 0 {
log.FromContext(ctx).Warn("write to tikv no leader", logutil.Region(region.Region), logutil.Leader(region.Leader),
zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize))
return nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
region.Region.Id, leaderID)
}

log.FromContext(ctx).Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", totalSize),
zap.Int64("buf_size", bytesBuf.TotalSize()),
zap.Stringer("takeTime", time.Since(begin)))

Expand All @@ -1176,7 +1176,7 @@ func (local *local) WriteToTiKV(
firstKey := append([]byte{}, iter.Key()...)
finishedRange = Range{start: regionRange.start, end: firstKey}
log.FromContext(ctx).Info("write to tikv partial finish", zap.Int64("count", totalCount),
zap.Int64("size", size), logutil.Key("startKey", regionRange.start), logutil.Key("endKey", regionRange.end),
zap.Int64("size", totalSize), logutil.Key("startKey", regionRange.start), logutil.Key("endKey", regionRange.end),
logutil.Key("remainStart", firstKey), logutil.Key("remainEnd", regionRange.end),
logutil.Region(region.Region), logutil.Leader(region.Leader))
}
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ func (h *scanRegionEmptyHook) AfterScanRegions(res []*split.RegionInfo, err erro
}

func TestBatchSplitRegionByRangesScanFailed(t *testing.T) {
backup := split.ScanRegionAttemptTimes
split.ScanRegionAttemptTimes = 3
defer func() {
split.ScanRegionAttemptTimes = backup
}()
doTestBatchSplitRegionByRanges(context.Background(), t, &scanRegionEmptyHook{}, "scan region return empty result", defaultHook{})
}

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//errno",
"//parser/model",
"//store/driver/error",
"//table/tables",
"//util",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down Expand Up @@ -99,8 +100,10 @@ go_test(
"//br/pkg/errors",
"//br/pkg/lightning/log",
"//errno",
"//parser",
"//store/driver/error",
"//testkit/testsetup",
"//util/dbutil",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -416,3 +417,22 @@ func StringSliceEqual(a, b []string) bool {
}
return true
}

// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it.
// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
if !tblInfo.ContainsAutoRandomBits() {
return nil
}
if tblInfo.PKIsHandle {
return tblInfo.GetPkColInfo()
} else if tblInfo.IsCommonHandle {
pk := tables.FindPrimaryIndex(tblInfo)
if pk == nil {
return nil
}
offset := pk.Columns[0].Offset
return tblInfo.Columns[offset]
}
return nil
}
27 changes: 27 additions & 0 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/util/dbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -179,3 +181,28 @@ func TestInterpolateMySQLString(t *testing.T) {
assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23"))
assert.Equal(t, "'1''2''''3'", common.InterpolateMySQLString("1'2''3"))
}

func TestGetAutoRandomColumn(t *testing.T) {
tests := []struct {
ddl string
colName string
}{
{"create table t(c int)", ""},
{"create table t(c int auto_increment)", ""},
{"create table t(c bigint auto_random primary key)", "c"},
{"create table t(a int, c bigint auto_random primary key)", "c"},
{"create table t(c bigint auto_random, a int, primary key(c,a))", "c"},
{"create table t(a int, c bigint auto_random, primary key(c,a))", "c"},
}
p := parser.New()
for _, tt := range tests {
tableInfo, err := dbutil.GetTableInfoBySQL(tt.ddl, p)
require.NoError(t, err)
col := common.GetAutoRandomColumn(tableInfo)
if tt.colName == "" {
require.Nil(t, col, tt.ddl)
} else {
require.Equal(t, tt.colName, col.Name.L, tt.ddl)
}
}
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ type CSVConfig struct {
EscapedBy string `toml:"escaped-by" json:"escaped-by"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
StartingBy string `toml:"-" json:"-"`
AllowEmptyLine bool `toml:"-" json:"-"`
// For non-empty Delimiter (for example quotes), null elements inside quotes are not considered as null except for
// `\N` (when escape-by is `\`). That is to say, `\N` is special for null because it always means null.
QuotedNullIsText bool
Expand Down
22 changes: 13 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ type CSVParser struct {
escFlavor escapeFlavor
// if set to true, csv parser will treat the first non-empty line as header line
shouldParseHeader bool
quotedNullIsText bool
// in LOAD DATA, empty line should be treated as a valid record
allowEmptyLine bool
quotedNullIsText bool
}

type field struct {
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewCSVParser(
unquoteByteSet: makeByteSet(unquoteStopSet),
newLineByteSet: makeByteSet(newLineStopSet),
shouldParseHeader: shouldParseHeader,
allowEmptyLine: cfg.AllowEmptyLine,
quotedNullIsText: cfg.QuotedNullIsText,
}, nil
}
Expand Down Expand Up @@ -446,7 +449,6 @@ outside:
}
foundStartingByThisLine = true
content = content[idx+len(parser.startingBy):]
content = append(content, parser.newLine...)
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
}
Expand Down Expand Up @@ -497,13 +499,15 @@ outside:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
if !parser.allowEmptyLine {
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
}
}
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
Expand Down
Loading

0 comments on commit 45a7402

Please sign in to comment.