From abc2c458f8926453c4a2263ad37c2d74e1385c62 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 6 Aug 2023 01:14:31 +0200 Subject: [PATCH 1/2] fix: incorrect_worker_state Signed-off-by: Valery Piashchynski --- .golangci.yml | 1 - events/events.go | 4 +++ fsm/fsm.go | 40 +++++------------------------ fsm/state.go | 24 +++++++---------- go.mod | 8 +++--- go.sum | 33 ++++++------------------ pool/static_pool/supervisor.go | 39 +++++++++++++++++++++------- pool/static_pool/supervisor_test.go | 2 +- worker/worker.go | 33 ++++++++++++++---------- worker_watcher/worker_watcher.go | 5 +++- 10 files changed, 86 insertions(+), 103 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 6a597e9..9e75523 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -50,7 +50,6 @@ linters: # All available linters list: Worker Stopped here */ - workers[i].State().Transition(fsm.StateInvalid) + + // if the worker in the StateReady, it means, that it's not working on the request and we can safely stop/kill it + // but if the worker in the any other state, we can't stop it, because it might be in the middle of the request execution, instead, we're setting the Invalid state + if workers[i].State().Compare(fsm.StateReady) { + workers[i].State().Transition(fsm.StateTTLReached) + } else { + workers[i].State().Transition(fsm.StateInvalid) + } + sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String())) continue } @@ -98,7 +111,15 @@ func (sp *Pool) control() { TTL Reached, state - invalid | -----> Worker Stopped here */ - workers[i].State().Transition(fsm.StateInvalid) + + // if the worker in the StateReady, it means, that it's not working on the request and we can safely stop/kill it + // but if the worker in the any other state, we can't stop it, because it might be in the middle of the request execution, instead, we're setting the Invalid state + if workers[i].State().Compare(fsm.StateReady) { + workers[i].State().Transition(fsm.StateMaxMemoryReached) + } else { + workers[i].State().Transition(fsm.StateInvalid) + } + sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventMaxMemory.String())) continue } diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index b112e63..aca0e09 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -290,7 +290,7 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { time.Sleep(time.Second) assert.NotEqual(t, pid, p.Workers()[0].Pid()) - require.Equal(t, p.Workers()[0].State().CurrentState(), fsm.StateReady) + assert.Equal(t, p.Workers()[0].State().CurrentState(), fsm.StateReady) pid = p.Workers()[0].Pid() resp, err = p.Exec(ctx, &payload.Payload{ diff --git a/worker/worker.go b/worker/worker.go index 292e052..f0fe944 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -235,10 +235,14 @@ func (w *Process) Exec(p *payload.Payload) (*payload.Payload, error) { rsp, err := w.execPayload(p) w.State().RegisterExec() if err != nil && !errors.Is(errors.Stop, err) { - // just to be more verbose - if !errors.Is(errors.SoftJob, err) { - w.State().Transition(fsm.StateErrored) + if errors.Is(errors.SoftJob, err) { + // transfer + if w.State().Compare(fsm.StateWorking) { + w.State().Transition(fsm.StateReady) + } } + + w.State().Transition(fsm.StateErrored) return nil, errors.E(op, err) } @@ -279,20 +283,25 @@ func (w *Process) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload go func() { rsp, err := w.execPayload(p) + w.State().RegisterExec() if err != nil { - // just to be more verbose - if !errors.Is(errors.SoftJob, err) { - w.State().Transition(fsm.StateErrored) - w.State().RegisterExec() - } c <- wexec{ err: errors.E(op, err), } + + // just to be more verbose + if errors.Is(errors.SoftJob, err) { + if w.State().Compare(fsm.StateWorking) { + w.State().Transition(fsm.StateReady) + } + return + } + + w.State().Transition(fsm.StateErrored) return } if !w.State().Compare(fsm.StateWorking) { - w.State().RegisterExec() c <- wexec{ payload: rsp, err: nil, @@ -301,7 +310,6 @@ func (w *Process) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload } w.State().Transition(fsm.StateReady) - w.State().RegisterExec() c <- wexec{ payload: rsp, @@ -334,9 +342,9 @@ func (w *Process) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { const op = errors.Op("process_stop") + w.fsm.Transition(fsm.StateStopping) go func() { - w.fsm.Transition(fsm.StateStopping) w.log.Debug("sending stop request to the worker", zap.Int("pid", w.pid)) err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) if err == nil { @@ -353,7 +361,6 @@ func (w *Process) Stop() error { case <-time.After(time.Second * 10): // kill process w.log.Warn("worker doesn't respond on stop command, killing process", zap.Int64("PID", w.Pid())) - w.fsm.Transition(fsm.StateKilling) _ = w.cmd.Process.Signal(os.Kill) w.fsm.Transition(fsm.StateStopped) return errors.E(op, errors.Network) @@ -363,7 +370,7 @@ func (w *Process) Stop() error { // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not wait for process completion! func (w *Process) Kill() error { - w.fsm.Transition(fsm.StateKilling) + w.fsm.Transition(fsm.StateStopping) err := w.cmd.Process.Kill() if err != nil { return err diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 601d3ff..95cab64 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -192,8 +192,11 @@ func (ww *WorkerWatcher) Release(w *worker.Process) { fsm.StateErrored, fsm.StateWorking, fsm.StateInvalid, + fsm.StateMaxMemoryReached, + fsm.StateMaxJobsReached, fsm.StateIdleTTLReached, - fsm.StateMaxJobsReached: + fsm.StateTTLReached, + fsm.StateExecTTLReached: err := w.Stop() if err != nil { From 0f9bb09eca9a8925f190c8cf610ed0449bb9a0fd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 6 Aug 2023 01:21:20 +0200 Subject: [PATCH 2/2] chore: update CI Signed-off-by: Valery Piashchynski --- .github/workflows/linux.yml | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 69f18f4..c8c83c6 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -1,22 +1,6 @@ name: Linux -on: - push: - branches: - - master - - beta - - stable - tags-ignore: - - "**" - paths-ignore: - - "**.md" - - "**.yaml" - - "**.yml" - pull_request: - paths-ignore: - - "**.md" - - "**.yaml" - - "**.yml" +on: [push, pull_request] jobs: golang: