Skip to content

Commit

Permalink
test(dm): fix minor problems and improve CI (#5422)
Browse files Browse the repository at this point in the history
ref #4159, close #5385
  • Loading branch information
lance6716 authored May 19, 2022
1 parent 6148c45 commit c06ca1f
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 27 deletions.
15 changes: 9 additions & 6 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,14 @@ func (s *Server) waitOperationOk(
}

for num := 0; num < maxRetryNum; num++ {
if num > 0 {
select {
case <-ctx.Done():
return false, "", nil, ctx.Err()
case <-time.After(retryInterval):
}
}

// check whether source relative worker has been removed by scheduler
if _, ok := masterReq.(*pb.OperateSourceRequest); ok {
if expect == pb.Stage_Stopped {
Expand All @@ -1813,6 +1821,7 @@ func (s *Server) waitOperationOk(
}
}

// TODO: this is 30s, too long compared with retryInterval
resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout)
if err != nil {
log.L().Error("fail to query operation",
Expand Down Expand Up @@ -1925,12 +1934,6 @@ func (s *Server) waitOperationOk(
log.L().Info("fail to get expect operation result", zap.Int("retryNum", num), zap.String("task", taskName),
zap.String("source", sourceID), zap.Stringer("expect", expect), zap.Stringer("resp", queryResp))
}

select {
case <-ctx.Done():
return false, "", nil, ctx.Err()
case <-time.After(retryInterval):
}
}

return false, "", nil, terror.ErrMasterFailToGetExpectResult
Expand Down
30 changes: 14 additions & 16 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Server struct {
sync.Mutex
wg sync.WaitGroup
kaWg sync.WaitGroup
httpWg sync.WaitGroup
closed atomic.Bool
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -207,25 +208,11 @@ func (s *Server) Start() error {
}
}(s.ctx)

httpExitCh := make(chan struct{}, 1)
s.wg.Add(1)
s.httpWg.Add(1)
go func() {
s.httpWg.Done()
InitStatus(httpL) // serve status
httpExitCh <- struct{}{}
}()
go func(ctx context.Context) {
defer s.wg.Done()
select {
case <-ctx.Done():
if s.rootLis != nil {
err2 := s.rootLis.Close()
if err2 != nil && !common.IsErrNetClosing(err2) {
log.L().Error("fail to close net listener", log.ShortError(err2))
}
}
case <-httpExitCh:
}
}(s.ctx)

s.closed.Store(false)
log.L().Info("listening gRPC API and status request", zap.String("address", s.cfg.WorkerAddr))
Expand Down Expand Up @@ -450,10 +437,21 @@ func (s *Server) doClose() {
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()

// stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker)
if w := s.getSourceWorker(false); w != nil {
w.Stop(true)
}

// close listener at last, so we can get status from it if worker failed to close in previous step
if s.rootLis != nil {
err2 := s.rootLis.Close()
if err2 != nil && !common.IsErrNetClosing(err2) {
log.L().Error("fail to close net listener", log.ShortError(err2))
}
}
s.httpWg.Wait()

s.closed.Store(true)
}

Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (w *SourceWorker) DisableHandleSubtasks() {

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
w.l.Info("handling subtask enabled")
w.l.Info("handling subtask disabled")
}

// fetchSubTasksAndAdjust gets source's subtask stages and configs, adjust some values by worker's config and status
Expand Down
5 changes: 3 additions & 2 deletions dm/tests/_utils/kill_dm_worker
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/sh

while :; do
for ((k = 0; k < 100; k++)); do
dm_worker_num=$(ps aux >temp && grep "dm-worker.test" temp | wc -l && rm temp)
echo "$dm_worker_num dm-worker alive"
if [ $dm_worker_num -ne 0 ]; then
killall dm-worker.test || true
sleep 1
else
break
exit 0
fi
done
exit 1
11 changes: 9 additions & 2 deletions dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,15 @@ function run_case() {
done
DM_${case}_CASE $args

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"
if [[ "$task_conf" == *"single"* ]]; then
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 2
elif [[ "$task_conf" == *"double"* ]]; then
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 3
fi

eval ${clean_table_cmd}

Expand Down
14 changes: 14 additions & 0 deletions dm/tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ stop_services() {
mysql -u root -h $MYSQL_HOST2 -P $MYSQL_PORT2 -p$MYSQL_PASSWORD2 -e "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'"
}

print_worker_stacks() {
if [ $? != 0 ]; then
mkdir -p "$TEST_DIR/goroutines/stack/log"
# don't know which case failed, so we print them all
for port in $MASTER_PORT1 $MASTER_PORT2 $MASTER_PORT3 $MASTER_PORT4 $MASTER_PORT5 $MASTER_PORT6; do
curl -sS "127.0.0.1:$port/debug/pprof/goroutine?debug=2" >"$TEST_DIR/goroutines/stack/log/master-$port.log" || true
done
for port in $WORKER1_PORT $WORKER2_PORT $WORKER3_PORT $WORKER4_PORT $WORKER5_PORT; do
curl -sS "127.0.0.1:$port/debug/pprof/goroutine?debug=2" >"$TEST_DIR/goroutines/stack/log/worker-$port.log" || true
done
fi
}

check_mysql() {
host=$1
port=$2
Expand Down Expand Up @@ -85,6 +98,7 @@ if [ $should_run -eq 0 ]; then
fi

trap stop_services EXIT
trap print_worker_stacks EXIT
start_services

function run() {
Expand Down

0 comments on commit c06ca1f

Please sign in to comment.