From feecb2db108e976f374e9b8e1f1ef2a6968307f2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 6 Aug 2020 17:28:47 +0800 Subject: [PATCH 1/5] *: don't auto-resume for dump unit error --- dm/worker/task_checker.go | 10 +--------- mydumper/mydumper.go | 4 ++++ pkg/retry/errors.go | 2 ++ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index e9d81ad346..6d787ea031 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -269,21 +269,13 @@ func isResumableError(err *pb.ProcessError) bool { return false } } - for _, msg := range retry.UnsupportedDMLMsgs { - if strings.Contains(strings.ToLower(err.RawCause), strings.ToLower(msg)) { - return false - } - } - switch err.ErrCode { - case int32(terror.ErrParserParseRelayLog.Code()): + if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(strings.ToLower(err.Message), strings.ToLower(msg)) { return false } } - case int32(terror.ErrDumpUnitGlobalLock.Code()): - return false } if _, ok := retry.UnresumableErrCodes[err.ErrCode]; ok { diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index fbbbe65544..09242f3e12 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -130,6 +130,10 @@ var mydumperLogRegexp = regexp.MustCompile( ) func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { + failpoint.Inject("dumpRuntimeError", func(_ failpoint.Value) { + failpoint.Return(0, terror.ErrDumpUnitRuntime) + }) + var ( stdout bytes.Buffer stderr bytes.Buffer diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index e4190c6109..88237adae5 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -49,6 +49,8 @@ var ( UnresumableErrCodes = map[int32]struct{}{ int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {}, int32(terror.ErrSyncerShardDDLConflict.Code()): {}, + int32(terror.ErrDumpUnitGlobalLock.Code()): {}, + int32(terror.ErrDumpUnitRuntime.Code()): {}, } ) From ff04b7a9c99cd213eea358e2d70832899af0ed46 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 6 Aug 2020 19:27:29 +0800 Subject: [PATCH 2/5] *: dump unit error can't auto-resume --- dm/worker/task_checker.go | 2 ++ mydumper/mydumper.go | 26 +++++++++++--------------- pkg/terror/error_list.go | 2 +- tests/full_mode/run.sh | 8 ++++++++ 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 6d787ea031..405166d4c6 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -294,8 +294,10 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, // TODO: use different strategies based on the error detail for _, processErr := range stStatus.Result.Errors { if !isResumableError(processErr) { + tsc.l.Info("error is not resumable", zap.Stringer("error", processErr)) return ResumeNoSense } + tsc.l.Info("error is resumable", zap.Stringer("error", processErr)) } // auto resume interval does not exceed backoff duration, skip this paused task diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 09242f3e12..7f7410b413 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -17,7 +17,6 @@ import ( "bufio" "bytes" "context" - "fmt" "os" "os/exec" "regexp" @@ -99,11 +98,11 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) { } // Cmd cannot be reused, so we create a new cmd when begin processing - output, err := m.spawn(ctx) + err = m.spawn(ctx) if err != nil { mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc() - errs = append(errs, unit.NewProcessError(fmt.Errorf("%s. %s", err.Error(), output))) + errs = append(errs, unit.NewProcessError(err)) } else { select { case <-ctx.Done(): @@ -129,9 +128,9 @@ var mydumperLogRegexp = regexp.MustCompile( `^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `, ) -func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { +func (m *Mydumper) spawn(ctx context.Context) error { failpoint.Inject("dumpRuntimeError", func(_ failpoint.Value) { - failpoint.Return(0, terror.ErrDumpUnitRuntime) + failpoint.Return(terror.ErrDumpUnitRuntime) }) var ( @@ -142,10 +141,10 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { cmd.Stdout = &stdout stderrPipe, err := cmd.StderrPipe() if err != nil { - return nil, terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, "") } if err = cmd.Start(); err != nil { - return nil, terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, "") } // Read the stderr from mydumper, which contained the logs. @@ -158,10 +157,8 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // store first error detected in mydumper's log // TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier // and using non-block IO to drain and output mydumper's stderr - var ( - firstErr error - errMsg []byte - ) + var firstErr error + for scanner.Scan() { line := scanner.Bytes() if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 { @@ -181,7 +178,6 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { m.logger.Error(string(msg)) if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") { firstErr = terror.ErrDumpUnitGlobalLock - errMsg = msg } continue } @@ -191,17 +187,17 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { } if firstErr != nil { - return errMsg, firstErr + return firstErr } if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) - return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes()) } err = cmd.Wait() stdout.Write(stderr.Bytes()) - return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes()) } // Close implements Unit.Close diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index afa406bbe9..4d4d5788a8 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -806,7 +806,7 @@ var ( ErrPreviousGTIDsNotValid = New(codePreviousGTIDsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "previousGTIDs %s not valid", "") // Dump unit error - ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "") + ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error, with output (may empty): %s", "") ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.") ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.") ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers") diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index cc2ee7b7d0..920af666e0 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -7,6 +7,8 @@ source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME function fail_acquire_global_lock() { + export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 @@ -32,8 +34,14 @@ function fail_acquire_global_lock() { sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml + # TaskCheckInterval set to 500ms + sleep 1 + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "Couldn't acquire global lock" + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "error is not resumable" check_log_contains $WORK_DIR/worker2/log/dm-worker.log "Couldn't acquire global lock" + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "error is not resumable" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"stage\": \"Paused\"" 4 \ From fa6d72a74c86fcd4ebd09fb7bcd3123d5b53f4ad Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 6 Aug 2020 19:31:35 +0800 Subject: [PATCH 3/5] remove failpoint --- mydumper/mydumper.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 7f7410b413..433601c277 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -129,10 +129,6 @@ var mydumperLogRegexp = regexp.MustCompile( ) func (m *Mydumper) spawn(ctx context.Context) error { - failpoint.Inject("dumpRuntimeError", func(_ failpoint.Value) { - failpoint.Return(terror.ErrDumpUnitRuntime) - }) - var ( stdout bytes.Buffer stderr bytes.Buffer From a3afed6cd08cb0e3be82974abedabc6ea8af297c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 6 Aug 2020 20:14:59 +0800 Subject: [PATCH 4/5] fix bug --- _utils/terror_gen/errors_release.txt | 2 +- dm/worker/task_checker.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index ae914c05ec..825fd29567 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -211,7 +211,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high] ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s" ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid" ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid" -ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error" +ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error, with output (may empty): %s" ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file." ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers" diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 405166d4c6..7cb6c194ff 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -270,6 +270,12 @@ func isResumableError(err *pb.ProcessError) bool { } } + for _, msg := range retry.UnsupportedDMLMsgs { + if strings.Contains(strings.ToLower(err.RawCause), strings.ToLower(msg)) { + return false + } + } + if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(strings.ToLower(err.Message), strings.ToLower(msg)) { From 223431249d19a49f309f8ebeac6a7c6e9f5b13f6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 7 Aug 2020 13:52:50 +0800 Subject: [PATCH 5/5] address comment --- dm/worker/task_checker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 7cb6c194ff..d09ad7d27f 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -300,10 +300,11 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, // TODO: use different strategies based on the error detail for _, processErr := range stStatus.Result.Errors { if !isResumableError(processErr) { - tsc.l.Info("error is not resumable", zap.Stringer("error", processErr)) + failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) { + tsc.l.Info("error is not resumable", zap.Stringer("error", processErr)) + }) return ResumeNoSense } - tsc.l.Info("error is resumable", zap.Stringer("error", processErr)) } // auto resume interval does not exceed backoff duration, skip this paused task