diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 1ef228b09..c34e41f40 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -130,19 +130,17 @@ func (h *realRelayHolder) Close() { } func (h *realRelayHolder) run() { + if !h.setStageIfNot(pb.Stage_Running, pb.Stage_Running) { + return + } h.ctx, h.cancel = context.WithCancel(context.Background()) - pr := make(chan pb.ProcessResult, 1) h.setResult(nil) // clear previous result - h.setStage(pb.Stage_Running) - h.relay.Process(h.ctx, pr) + r := h.relay.Process(h.ctx) - for len(pr) > 0 { - r := <-pr - h.setResult(&r) - for _, err := range r.Errors { - h.l.Error("process error", zap.Stringer("type", err)) - } + h.setResult(&r) + for _, err := range r.Errors { + h.l.Error("process error", zap.Stringer("type", err)) } h.setStageIfNot(pb.Stage_Stopped, pb.Stage_Paused) @@ -243,12 +241,6 @@ func (h *realRelayHolder) Stage() pb.Stage { return h.stage } -func (h *realRelayHolder) setStage(stage pb.Stage) { - h.Lock() - defer h.Unlock() - h.stage = stage -} - // setStageIfNot sets stage to newStage if its current value is not oldStage, similar to CAS. func (h *realRelayHolder) setStageIfNot(oldStage, newStage pb.Stage) bool { h.Lock() diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index cccc9649b..bd51d24ef 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -63,9 +63,9 @@ func (d *DummyRelay) InjectInitError(err error) { } // Process implements Process interface. -func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) { +func (d *DummyRelay) Process(ctx context.Context) pb.ProcessResult { <-ctx.Done() - pr <- d.processResult + return d.processResult } // InjectProcessResult injects process result. diff --git a/relay/relay.go b/relay/relay.go index 12cdf3a62..db40f900c 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/dm/pkg/binlog/common" binlogReader "github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/conn" - fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" pkgstreamer "github.com/pingcap/dm/pkg/streamer" @@ -74,7 +73,7 @@ type Process interface { // Init initial relat log unit Init(ctx context.Context) (err error) // Process run background logic of relay log unit - Process(ctx context.Context, pr chan pb.ProcessResult) + Process(ctx context.Context) pb.ProcessResult // ActiveRelayLog returns the earliest active relay log info in this operator ActiveRelayLog() *pkgstreamer.RelayLogInfo // Reload reloads config @@ -129,41 +128,13 @@ func NewRealRelay(cfg *Config) Process { } // Init implements the dm.Unit interface. +// NOTE when Init encounters an error, it will make DM-worker exit when it boots up and assigned relay. func (r *Relay) Init(ctx context.Context) (err error) { - rollbackHolder := fr.NewRollbackHolder("relay") - defer func() { - if err != nil { - rollbackHolder.RollbackReverseOrder() - } - }() - - err = r.setSyncConfig() - if err != nil { - return err - } - - db, err := conn.DefaultDBProvider.Apply(r.cfg.From) - if err != nil { - return terror.WithScope(err, terror.ScopeUpstream) - } - - r.db = db - rollbackHolder.Add(fr.FuncRollback{Name: "close-DB", Fn: r.closeDB}) - - if err2 := os.MkdirAll(r.cfg.RelayDir, 0o755); err2 != nil { - return terror.ErrRelayMkdir.Delegate(err2) - } - - err = r.meta.Load() - if err != nil { - return err - } - return reportRelayLogSpaceInBackground(ctx, r.cfg.RelayDir) } // Process implements the dm.Unit interface. -func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) { +func (r *Relay) Process(ctx context.Context) pb.ProcessResult { errs := make([]*pb.ProcessError, 0, 1) err := r.process(ctx) if err != nil && errors.Cause(err) != replication.ErrSyncClosed { @@ -181,13 +152,33 @@ func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } } - pr <- pb.ProcessResult{ + return pb.ProcessResult{ IsCanceled: isCanceled, Errors: errs, } } func (r *Relay) process(ctx context.Context) error { + err := r.setSyncConfig() + if err != nil { + return err + } + + db, err := conn.DefaultDBProvider.Apply(r.cfg.From) + if err != nil { + return terror.WithScope(err, terror.ScopeUpstream) + } + r.db = db + + if err2 := os.MkdirAll(r.cfg.RelayDir, 0o755); err2 != nil { + return terror.ErrRelayMkdir.Delegate(err2) + } + + err = r.meta.Load() + if err != nil { + return err + } + parser2, err := utils.GetParser(ctx, r.db.DB) // refine to use user config later if err != nil { return err diff --git a/tests/new_relay/run.sh b/tests/new_relay/run.sh index 6e987f67c..e25fff145 100755 --- a/tests/new_relay/run.sh +++ b/tests/new_relay/run.sh @@ -10,6 +10,39 @@ SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt" API_VERSION="v1alpha1" +function test_cant_dail_upstream() { + cleanup_data $TEST_NAME + cleanup_process + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 1 + + kill_dm_worker + + export GO_FAILPOINTS="github.com/pingcap/dm/pkg/conn/failDBPing=return()" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # make sure DM-worker doesn't exit + sleep 2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "injected error" 1 + + export GO_FAILPOINTS="" + cleanup_process + cleanup_data $TEST_NAME +} + function test_kill_dump_connection() { cleanup_data $TEST_NAME cleanup_process @@ -50,6 +83,8 @@ function test_kill_dump_connection() { } function run() { + test_cant_dail_upstream + export GO_FAILPOINTS="github.com/pingcap/dm/relay/ReportRelayLogSpaceInBackground=return(1)" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1