Skip to content

Commit

Permalink
Merge branch 'master' into issue-31372
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Jan 7, 2022
2 parents 26633cc + c27f8f6 commit fa25af0
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 110 deletions.
44 changes: 18 additions & 26 deletions br/pkg/lightning/log/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,60 @@ package log_test
import (
"regexp"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var _ = Suite(&testFilterSuite{})

type testFilterSuite struct{}

func (s *testFilterSuite) TestFilter(c *C) {
func TestFilter(t *testing.T) {
logger, buffer := log.MakeTestLogger()
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`,
)

require.Equal(t, `{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`, buffer.Stripped())

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/br/")
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(buffer.Stripped(), HasLen, 0)
require.Len(t, buffer.Stripped(), 0)

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/tidb/br/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","a":"b","number":123456,"array":[7,8,9]}`,
)
require.Equal(t, `{"$lvl":"WARN","$msg":"the message","a":"b","number":123456,"array":[7,8,9]}`, buffer.Stripped())

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/br/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(buffer.Stripped(), HasLen, 0)
require.Len(t, buffer.Stripped(), 0)

// Fields won't trigger filter.
logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/check/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.String("stack", "github.com/pingcap/tidb/br/"))
c.Assert(buffer.Stripped(), HasLen, 0)
require.Len(t, buffer.Stripped(), 0)
}

// PASS: filter_test.go:82: testFilterSuite.BenchmarkFilterRegexMatchString 1000000 1163 ns/op
// PASS: filter_test.go:64: testFilterSuite.BenchmarkFilterStringsContains 10000000 159 ns/op
// BenchmarkFilterStringsContains-16 16693887 66.68 ns/op
// BenchmarkFilterRegexMatchString-16 2350828 510.6 ns/op
//
// Run `go test github.com/pingcap/tidb/br/pkg/lightning/log -check.b -test.v` to get benchmark result.
func (s *testFilterSuite) BenchmarkFilterStringsContains(c *C) {
c.ResetTimer()
// Run `go test -run='^$' -bench=. -v github.com/pingcap/tidb/br/pkg/lightning/log` to get benchmark result.
func BenchmarkFilterStringsContains(b *testing.B) {
b.ResetTimer()

inputs := []string{
"github.com/pingcap/tidb/some/package/path",
"github.com/tikv/pd/some/package/path",
"github.com/pingcap/tidb/br/some/package/path",
}
filters := []string{"github.com/pingcap/tidb/", "github.com/tikv/pd/"}
for i := 0; i < c.N; i++ {
for i := 0; i < b.N; i++ {
for i := range inputs {
for j := range filters {
_ = strings.Contains(inputs[i], filters[j])
Expand All @@ -75,16 +67,16 @@ func (s *testFilterSuite) BenchmarkFilterStringsContains(c *C) {
}
}

func (s *testFilterSuite) BenchmarkFilterRegexMatchString(c *C) {
c.ResetTimer()
func BenchmarkFilterRegexMatchString(b *testing.B) {
b.ResetTimer()

inputs := []string{
"github.com/pingcap/tidb/some/package/path",
"github.com/tikv/pd/some/package/path",
"github.com/pingcap/tidb/br/some/package/path",
}
filters := regexp.MustCompile(`github.com/(pingcap/tidb|tikv/pd)/`)
for i := 0; i < c.N; i++ {
for i := 0; i < b.N; i++ {
for i := range inputs {
_ = filters.MatchString(inputs[i])
}
Expand Down
43 changes: 0 additions & 43 deletions br/pkg/lightning/log/log_serial_test.go

This file was deleted.

60 changes: 42 additions & 18 deletions br/pkg/lightning/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,61 @@
package log_test

import (
"io"
"os"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestLog(t *testing.T) {
TestingT(t)
}

type logSuite struct{}

var _ = Suite(&logSuite{})

func (s *logSuite) TestConfigAdjust(c *C) {
func TestConfigAdjust(t *testing.T) {
cfg := &log.Config{}
cfg.Adjust()
c.Assert(cfg.Level, Equals, "info")
require.Equal(t, "info", cfg.Level)

cfg.File = "."
err := log.InitLogger(cfg, "info")
c.Assert(err, ErrorMatches, "can't use directory as log file name")
log.L().Named("xx")
require.EqualError(t, err, "can't use directory as log file name")
}

func (s *logSuite) TestTestLogger(c *C) {
func TestTestLogger(t *testing.T) {
logger, buffer := log.MakeTestLogger()
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`,
)
require.Equal(t, `{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`, buffer.Stripped())
}

func TestInitStdoutLogger(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
oldStdout := os.Stdout
os.Stdout = w

msg := "logger is initialized to stdout"
outputC := make(chan string, 1)
go func() {
buf := make([]byte, 4096)
n := 0
for {
nn, err := r.Read(buf[n:])
if nn == 0 || err == io.EOF {
break
}
require.NoError(t, err)
n += nn
}
outputC <- string(buf[:n])
}()

logCfg := &log.Config{File: "-"}
err = log.InitLogger(logCfg, "info")
require.NoError(t, err)
log.L().Info(msg)

os.Stdout = oldStdout
require.NoError(t, w.Close())
output := <-outputC
require.NoError(t, r.Close())
require.Contains(t, output, msg)
}
8 changes: 6 additions & 2 deletions ddl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ func TestDefaultValueInEnum(t *testing.T) {
tk.MustExec("insert into t values (1), (2);") // Use 1-base index to locate the value.
tk.MustQuery("select a from t;").Check(testkit.Rows("a", "")) // 0x91 is truncate.
tk.MustExec("drop table t;")
tk.MustExec("create table t (a enum('a', 0x91)) charset gbk;") // Test for table charset.
tk.MustExec("insert into t values (1), (2);")
tk.MustQuery("select a from t;").Check(testkit.Rows("a", ""))
tk.MustExec("drop table t;")
tk.MustGetErrMsg("create table t(a set('a', 0x91, '') charset gbk);",
"[types:1291]Column 'a' has duplicated value '' in SET")
// Test valid gbk string value in enum.
tk.MustExec("create table t (a enum('a', 0xC4E3BAC3) charset gbk);")
// Test valid utf-8 string value in enum. Note that the binary literal only can be decoded to utf-8.
tk.MustExec("create table t (a enum('a', 0xE4BDA0E5A5BD) charset gbk);")
tk.MustExec("insert into t values (1), (2);")
tk.MustQuery("select a from t;").Check(testkit.Rows("a", "你好"))
}
70 changes: 51 additions & 19 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
if err != nil {
task = &indexHashJoinTask{err: err}
if ow.keepOuterOrder {
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 1)
// The outerBuilder and innerFetcher run concurrently, we may
// get 2 errors at simultaneously. Thus the capacity of task.resultCh
// needs to be initialized to 2 to avoid waiting.
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 2)
ow.pushToChan(ctx, task, ow.taskCh)
}
ow.pushToChan(ctx, task, ow.innerCh)
Expand Down Expand Up @@ -520,19 +523,26 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
if joinResult.err != nil {
resultCh <- joinResult
return
}
// When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last
// joinResult will be checked when the a task has been processed, thus we do
// not need to check it here again.
if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
// When task.keepOuterOrder is TRUE (resultCh != iw.resultCh):
// - the last joinResult will be handled when the task has been processed,
// thus we DO NOT need to check it here again.
// - we DO NOT check the error here neither, because:
// - if the error is from task.err, the main thread will check the error of each task
// - if the error is from handleTask, the error will be handled in handleTask
// We should not check `task != nil && !task.keepOuterOrder` here since it's
// possible that `join.chk.NumRows > 0` is true even if task == nil.
if resultCh == iw.resultCh {
if joinResult.err != nil {
resultCh <- joinResult
return
}
if joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
return
}
}
}
}

Expand All @@ -550,6 +560,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil)
if iw.stats != nil {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -599,19 +610,26 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents)
}

func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) {
func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(resultCh chan *indexHashJoinResult, err error) {
defer func() {
iw.wg.Done()
iw.lookup.workerWg.Done()
}()
if r != nil {
iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)}
if err != nil {
resultCh <- &indexHashJoinResult{err: err}
}
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
defer func() {
iw.memTracker.Consume(-iw.memTracker.BytesConsumed())
if task.keepOuterOrder {
if err != nil {
joinResult.err = err
resultCh <- joinResult
}
close(resultCh)
}
}()
var joinStartTime time.Time
if iw.stats != nil {
Expand All @@ -631,9 +649,21 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
func(r interface{}) {
var err error
if r != nil {
err = errors.Errorf("%v", r)
}
iw.handleHashJoinInnerWorkerPanic(resultCh, err)
},
)
err = iw.fetchInnerResults(ctx, task.lookUpJoinTask)
iw.wg.Wait()
// check error after wg.Wait to make sure error message can be sent to
// resultCh even if panic happen in buildHashTableForOuterResult.
failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() {
err = errors.New("IndexHashJoinFetchInnerResultsErr")
})
if err != nil {
return err
}
Expand Down Expand Up @@ -783,13 +813,15 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind
joinResult.src <- joinResult.chk
}
}
close(resultCh)
}()
for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ {
for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ {
row := chk.GetRow(j)
ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf)
failpoint.Inject("TestIssue31129", func() {
err = errors.New("TestIssue31129")
})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit fa25af0

Please sign in to comment.