Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.

relay: move dailing upstream and other action out of Init #2227

Merged
merged 8 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,17 @@ func (h *realRelayHolder) Close() {
}

func (h *realRelayHolder) run() {
if !h.setStageIfNot(pb.Stage_Running, pb.Stage_Running) {
return
}
h.ctx, h.cancel = context.WithCancel(context.Background())
pr := make(chan pb.ProcessResult, 1)
h.setResult(nil) // clear previous result
h.setStage(pb.Stage_Running)

h.relay.Process(h.ctx, pr)
r := h.relay.Process(h.ctx)

for len(pr) > 0 {
r := <-pr
h.setResult(&r)
for _, err := range r.Errors {
h.l.Error("process error", zap.Stringer("type", err))
}
h.setResult(&r)
for _, err := range r.Errors {
h.l.Error("process error", zap.Stringer("type", err))
}

h.setStageIfNot(pb.Stage_Stopped, pb.Stage_Paused)
Expand Down Expand Up @@ -243,12 +241,6 @@ func (h *realRelayHolder) Stage() pb.Stage {
return h.stage
}

func (h *realRelayHolder) setStage(stage pb.Stage) {
h.Lock()
defer h.Unlock()
h.stage = stage
}

// setStageIfNot sets stage to newStage if its current value is not oldStage, similar to CAS.
func (h *realRelayHolder) setStageIfNot(oldStage, newStage pb.Stage) bool {
h.Lock()
Expand Down
4 changes: 2 additions & 2 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (d *DummyRelay) InjectInitError(err error) {
}

// Process implements Process interface.
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) {
func (d *DummyRelay) Process(ctx context.Context) pb.ProcessResult {
<-ctx.Done()
pr <- d.processResult
return d.processResult
}

// InjectProcessResult injects process result.
Expand Down
57 changes: 24 additions & 33 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/dm/pkg/binlog/common"
binlogReader "github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/conn"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
Expand Down Expand Up @@ -74,7 +73,7 @@ type Process interface {
// Init initial relat log unit
Init(ctx context.Context) (err error)
// Process run background logic of relay log unit
Process(ctx context.Context, pr chan pb.ProcessResult)
Process(ctx context.Context) pb.ProcessResult
// ActiveRelayLog returns the earliest active relay log info in this operator
ActiveRelayLog() *pkgstreamer.RelayLogInfo
// Reload reloads config
Expand Down Expand Up @@ -129,41 +128,13 @@ func NewRealRelay(cfg *Config) Process {
}

// Init implements the dm.Unit interface.
// NOTE when Init encounters an error, it will make DM-worker exit when it boots up and assigned relay.
func (r *Relay) Init(ctx context.Context) (err error) {
rollbackHolder := fr.NewRollbackHolder("relay")
defer func() {
if err != nil {
rollbackHolder.RollbackReverseOrder()
}
}()

err = r.setSyncConfig()
if err != nil {
return err
}

db, err := conn.DefaultDBProvider.Apply(r.cfg.From)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}

r.db = db
rollbackHolder.Add(fr.FuncRollback{Name: "close-DB", Fn: r.closeDB})

if err2 := os.MkdirAll(r.cfg.RelayDir, 0o755); err2 != nil {
return terror.ErrRelayMkdir.Delegate(err2)
}

err = r.meta.Load()
if err != nil {
return err
}

return reportRelayLogSpaceInBackground(ctx, r.cfg.RelayDir)
}

// Process implements the dm.Unit interface.
func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) {
func (r *Relay) Process(ctx context.Context) pb.ProcessResult {
errs := make([]*pb.ProcessError, 0, 1)
err := r.process(ctx)
if err != nil && errors.Cause(err) != replication.ErrSyncClosed {
Expand All @@ -181,13 +152,33 @@ func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) {
default:
}
}
pr <- pb.ProcessResult{
return pb.ProcessResult{
IsCanceled: isCanceled,
Errors: errs,
}
}

func (r *Relay) process(ctx context.Context) error {
err := r.setSyncConfig()
if err != nil {
return err
}

db, err := conn.DefaultDBProvider.Apply(r.cfg.From)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}
r.db = db

if err2 := os.MkdirAll(r.cfg.RelayDir, 0o755); err2 != nil {
return terror.ErrRelayMkdir.Delegate(err2)
}

err = r.meta.Load()
if err != nil {
return err
}

parser2, err := utils.GetParser(ctx, r.db.DB) // refine to use user config later
if err != nil {
return err
Expand Down
35 changes: 35 additions & 0 deletions tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,39 @@ SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt"

API_VERSION="v1alpha1"

function test_cant_dail_upstream() {
cleanup_data $TEST_NAME
cleanup_process

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1" \
"\"result\": true" 1

kill_dm_worker

export GO_FAILPOINTS="github.com/pingcap/dm/pkg/conn/failDBPing=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# make sure DM-worker doesn't exit
sleep 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"injected error" 1

export GO_FAILPOINTS=""
cleanup_process
cleanup_data $TEST_NAME
}

function test_kill_dump_connection() {
cleanup_data $TEST_NAME
cleanup_process
Expand Down Expand Up @@ -50,6 +83,8 @@ function test_kill_dump_connection() {
}

function run() {
test_cant_dail_upstream

export GO_FAILPOINTS="github.com/pingcap/dm/relay/ReportRelayLogSpaceInBackground=return(1)"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down