Skip to content

Commit f9bbaf1

Browse files
authored
feat(sdk): allow to stop goroutine (#6754)
Signed-off-by: richardlt <richard.le.terrier@gmail.com>
1 parent 45814f6 commit f9bbaf1

File tree

9 files changed

+108
-68
lines changed

9 files changed

+108
-68
lines changed

engine/hooks/hooks_handlers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -547,9 +547,9 @@ func (s *Service) deleteTaskBulkHandler() service.Handler {
547547
return sdk.WithStack(err)
548548
}
549549

550-
for uuid := range hooks {
550+
for _, h := range hooks {
551551
//Load the task
552-
t := s.Dao.FindTask(ctx, uuid)
552+
t := s.Dao.FindTask(ctx, h.UUID)
553553
if t == nil {
554554
continue
555555
}

engine/hooks/kafka.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/Shopify/sarama"
1111
"github.com/fsamin/go-dump"
12+
"github.com/pkg/errors"
1213
"github.com/rockbears/log"
1314

1415
"github.com/ovh/cds/sdk"
@@ -29,6 +30,10 @@ func (s *Service) saveKafkaExecution(t *sdk.Task, error string, nbError int64) {
2930
s.Dao.SaveTaskExecution(exec)
3031
}
3132

33+
func (s *Service) stopKafkaHook(t *sdk.Task) {
34+
s.GoRoutines.Stop("kafka-consume-" + t.UUID)
35+
}
36+
3237
func (s *Service) startKafkaHook(ctx context.Context, t *sdk.Task) error {
3338
var kafkaIntegration, projectKey, topic string
3439
for k, v := range t.Config {
@@ -96,12 +101,12 @@ func (s *Service) startKafkaHook(ctx context.Context, t *sdk.Task) error {
96101
dao: &s.Dao,
97102
}
98103

99-
s.GoRoutines.Exec(context.Background(), "kafka-consume-"+topic, func(ctx context.Context) {
104+
s.GoRoutines.Run(s.Router.Background, "kafka-consume-"+t.UUID, func(ctx context.Context) {
100105
atomic.AddInt64(&nbKafkaConsumers, 1)
101106
defer atomic.AddInt64(&nbKafkaConsumers, -1)
102-
for {
107+
for ctx.Err() == nil {
103108
if err := consumerGroup.Consume(ctx, []string{topic}, h); err != nil {
104-
log.Error(ctx, "error on consume:%s", err)
109+
log.ErrorWithStackTrace(ctx, errors.WithMessage(err, "error on consume"))
105110
}
106111
}
107112
})

engine/hooks/tasks.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -382,15 +382,19 @@ func (s *Service) stopTask(ctx context.Context, t *sdk.Task) error {
382382
}
383383

384384
switch t.Type {
385-
case TypeWebHook, TypeScheduler, TypeRepoManagerWebHook, TypeRepoPoller, TypeKafka, TypeWorkflowHook:
385+
case TypeWebHook, TypeScheduler, TypeRepoManagerWebHook, TypeRepoPoller, TypeWorkflowHook:
386386
log.Debug(ctx, "Hooks> Tasks %s has been stopped", t.UUID)
387387
return nil
388+
case TypeKafka:
389+
s.stopKafkaHook(t)
390+
log.Debug(ctx, "Hooks> Kafka Task %s has been stopped", t.UUID)
391+
return nil
388392
case TypeGerrit:
389393
s.stopGerritHookTask(t)
390394
log.Debug(ctx, "Hooks> Gerrit Task %s has been stopped", t.UUID)
391395
return nil
392396
default:
393-
return fmt.Errorf("Unsupported task type %s", t.Type)
397+
return fmt.Errorf("unsupported task type %s", t.Type)
394398
}
395399
}
396400

sdk/goroutine.go

+39-3
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525

2626
type GoRoutine struct {
2727
ctx context.Context
28+
cancel func()
2829
Name string
2930
Func func(ctx context.Context)
3031
Restart bool
3132
Active bool
33+
mutex sync.RWMutex
3234
}
3335

3436
// GoRoutines contains list of routines that have to stay up
@@ -48,6 +50,20 @@ func (m *GoRoutines) GoRoutine(name string) *GoRoutine {
4850
return nil
4951
}
5052

53+
func (m *GoRoutines) Stop(name string) {
54+
m.mutex.Lock()
55+
defer m.mutex.Unlock()
56+
for i, g := range m.status {
57+
if g.Name == name {
58+
if g.cancel != nil {
59+
g.cancel()
60+
}
61+
m.status = append(m.status[:i], m.status[i+1:]...)
62+
break
63+
}
64+
}
65+
}
66+
5167
// NewGoRoutines instanciates a new GoRoutineManager
5268
func NewGoRoutines(ctx context.Context) *GoRoutines {
5369
m := &GoRoutines{}
@@ -63,6 +79,12 @@ func (m *GoRoutines) restartGoRoutines(ctx context.Context) {
6379
for {
6480
select {
6581
case <-ctx.Done():
82+
for _, g := range m.status {
83+
if g.cancel != nil {
84+
g.cancel()
85+
}
86+
}
87+
m.status = nil
6688
return
6789
case <-t.C:
6890
m.runRestartGoRoutines(ctx)
@@ -74,7 +96,10 @@ func (m *GoRoutines) runRestartGoRoutines(ctx context.Context) {
7496
m.mutex.Lock()
7597
defer m.mutex.Unlock()
7698
for _, g := range m.status {
77-
if !g.Active && g.Restart {
99+
g.mutex.RLock()
100+
active := g.Active
101+
g.mutex.RUnlock()
102+
if !active && g.Restart {
78103
log.Info(ctx, "restarting goroutine %q", g.Name)
79104
m.exec(g)
80105
}
@@ -83,10 +108,12 @@ func (m *GoRoutines) runRestartGoRoutines(ctx context.Context) {
83108

84109
// Run runs the function within a goroutine with a panic recovery, and keep GoRoutine status.
85110
func (m *GoRoutines) Run(c context.Context, name string, fn func(ctx context.Context)) {
111+
ctx, cancel := context.WithCancel(c)
86112
m.mutex.Lock()
87113
defer m.mutex.Unlock()
88114
g := &GoRoutine{
89-
ctx: c,
115+
ctx: ctx,
116+
cancel: cancel,
90117
Name: name,
91118
Func: fn,
92119
Active: true,
@@ -99,10 +126,12 @@ func (m *GoRoutines) Run(c context.Context, name string, fn func(ctx context.Con
99126
// RunWithRestart runs the function within a goroutine with a panic recovery, and keep GoRoutine status.
100127
// if the goroutine is stopped, it will ne restarted
101128
func (m *GoRoutines) RunWithRestart(c context.Context, name string, fn func(ctx context.Context)) {
129+
ctx, cancel := context.WithCancel(c)
102130
m.mutex.Lock()
103131
defer m.mutex.Unlock()
104132
g := &GoRoutine{
105-
ctx: c,
133+
ctx: ctx,
134+
cancel: cancel,
106135
Name: name,
107136
Func: fn,
108137
Active: true,
@@ -121,10 +150,12 @@ func (m *GoRoutines) GetStatus() []MonitoringStatusLine {
121150
for _, g := range m.status {
122151
status := MonitoringStatusAlert
123152
value := "NOT running"
153+
g.mutex.RLock()
124154
if g.Active {
125155
status = MonitoringStatusOK
126156
value = "Running"
127157
}
158+
g.mutex.RUnlock()
128159
lines[i] = MonitoringStatusLine{
129160
Status: status,
130161
Component: "goroutine/" + g.Name,
@@ -137,6 +168,7 @@ func (m *GoRoutines) GetStatus() []MonitoringStatusLine {
137168

138169
func (m *GoRoutines) exec(g *GoRoutine) {
139170
hostname, _ := os.Hostname()
171+
140172
go func(ctx context.Context) {
141173
ctx = context.WithValue(ctx, cdslog.Goroutine, g.Name)
142174

@@ -155,10 +187,14 @@ func (m *GoRoutines) exec(g *GoRoutine) {
155187
ctx = context.WithValue(ctx, cdslog.Stacktrace, string(buf))
156188
log.Error(ctx, "[PANIC][%s] %s failed", hostname, g.Name)
157189
}
190+
g.mutex.Lock()
158191
g.Active = false
192+
g.mutex.Unlock()
159193
}()
160194

195+
g.mutex.Lock()
161196
g.Active = true
197+
g.mutex.Unlock()
162198
g.Func(goroutineCtx)
163199
}(g.ctx)
164200
}

sdk/goroutine_test.go

+48-50
Original file line numberDiff line numberDiff line change
@@ -3,78 +3,76 @@ package sdk
33
import (
44
"bytes"
55
"context"
6-
"sync"
76
"testing"
87
"time"
98

10-
"github.com/stretchr/testify/assert"
119
"github.com/stretchr/testify/require"
1210
)
1311

1412
func Test_GoroutineTools(t *testing.T) {
1513
t.Run("GoroutineID()", func(t *testing.T) {
16-
id := GoroutineID()
17-
var zero uint64
18-
assert.NotEqual(t, zero, id)
14+
require.NotEqual(t, uint64(0), GoroutineID())
1915
})
2016

21-
t.Run("writeGoroutineStacks(...)", func(t *testing.T) {
22-
ctx := context.Background()
23-
var wg = new(sync.WaitGroup)
24-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
25-
defer cancel()
26-
NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
27-
wg.Add(1)
28-
<-ctx.Done()
29-
wg.Done()
30-
})
31-
17+
t.Run("GoRoutineStacks(...)", func(t *testing.T) {
3218
var w = new(bytes.Buffer)
33-
err := writeGoroutineStacks(w)
34-
assert.NoError(t, err)
35-
t.Log(w.String())
36-
wg.Wait()
19+
require.NoError(t, writeGoroutineStacks(w))
20+
_, err := parseGoRoutineStacks(w, nil)
21+
require.NoError(t, err)
3722
})
3823

39-
t.Run("parseGoRoutineStacks(...)", func(t *testing.T) {
40-
ctx := context.Background()
41-
var wg = new(sync.WaitGroup)
42-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
43-
defer cancel()
24+
t.Run("GoRoutineRun", func(t *testing.T) {
25+
ctx, cancel := context.WithCancel(context.TODO())
26+
t.Cleanup(cancel)
27+
m := NewGoRoutines(ctx)
4428

45-
NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
46-
wg.Add(1)
47-
<-ctx.Done()
48-
wg.Done()
29+
m.Run(context.TODO(), "test_goroutine_run", func(ctx context.Context) {
30+
time.Sleep(1 * time.Second)
4931
})
5032

51-
var w = new(bytes.Buffer)
52-
err := writeGoroutineStacks(w)
53-
assert.NoError(t, err)
33+
s := m.GoRoutine("test_goroutine_run")
34+
require.NotNil(t, s)
35+
require.True(t, s.Active)
36+
require.Len(t, m.GetStatus(), 1)
5437

55-
_, err = parseGoRoutineStacks(w, nil)
56-
assert.NoError(t, err)
57-
wg.Wait()
58-
})
38+
time.Sleep(1 * time.Second)
5939

60-
t.Run("GoRoutineLoop", func(t *testing.T) {
61-
ctx := context.Background()
62-
var wg = new(sync.WaitGroup)
63-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
64-
defer cancel()
40+
s = m.GoRoutine("test_goroutine_run")
41+
require.NotNil(t, s)
42+
require.False(t, s.Active)
43+
})
6544

45+
t.Run("GoRoutineRunCancel", func(t *testing.T) {
46+
ctx, cancel := context.WithCancel(context.TODO())
47+
t.Cleanup(cancel)
6648
m := NewGoRoutines(ctx)
67-
m.Run(ctx, "test_goroutine_loop", func(ctx context.Context) {
68-
wg.Add(1)
69-
s := m.GoRoutine("test_goroutine_loop")
70-
require.NotNil(t, s)
71-
require.True(t, s.Active)
49+
50+
ctxToCancelled, cancelRoutine := context.WithTimeout(context.TODO(), 5*time.Second)
51+
var cancelled bool
52+
m.Run(context.TODO(), "test_goroutine_run_cancel", func(ctx context.Context) {
7253
<-ctx.Done()
73-
wg.Done()
54+
cancelled = true
55+
cancelRoutine()
7456
})
7557

76-
s := m.GoRoutine("test_goroutine_loop")
77-
require.NotNil(t, s)
78-
require.Equal(t, 1, len(m.GetStatus()))
58+
require.False(t, cancelled)
59+
m.Stop("test_goroutine_run_cancel")
60+
<-ctxToCancelled.Done()
61+
require.True(t, cancelled)
62+
})
63+
64+
t.Run("GoRoutineRunWithRestart", func(t *testing.T) {
65+
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
66+
t.Cleanup(cancel)
67+
m := NewGoRoutines(ctx)
68+
69+
var count int
70+
m.RunWithRestart(context.TODO(), "test_goroutine_run_with_restart", func(ctx context.Context) {
71+
count++
72+
})
73+
74+
// the routine should have restart 1 time
75+
<-ctx.Done()
76+
require.Equal(t, 2, count)
7977
})
8078
}

sdk/namesgenerator/namesgenerator.go

-5
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,10 @@ package namesgenerator
55
import (
66
"fmt"
77
"math/rand"
8-
"time"
98

109
"github.com/ovh/cds/sdk/slug"
1110
)
1211

13-
func init() {
14-
rand.Seed(time.Now().UTC().UnixNano())
15-
}
16-
1712
var (
1813
left = [...]string{
1914
"admiring",

sdk/vcs/git/git_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func Test_gitCloneOverHTTPS(t *testing.T) {
8989
}
9090
for _, tt := range tests {
9191
os.RemoveAll(test.GetTestName(t))
92+
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
9293
out := new(bytes.Buffer)
9394
err := new(bytes.Buffer)
9495
tt.args.output = &OutputOpts{
@@ -142,6 +143,7 @@ func Test_gitCloneOverSSH(t *testing.T) {
142143
}
143144

144145
os.RemoveAll(test.GetTestName(t))
146+
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
145147
out := new(bytes.Buffer)
146148
err := new(bytes.Buffer)
147149
tt.args.output = &OutputOpts{
@@ -231,6 +233,7 @@ func Test_gitCommand(t *testing.T) {
231233
}
232234
for _, tt := range tests {
233235
os.RemoveAll(test.GetTestName(t))
236+
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
234237
os.MkdirAll(test.GetTestName(t), os.FileMode(0755))
235238
if _, got, _ := prepareGitCloneCommands(tt.args.repo, test.GetTestName(t), tt.args.path, tt.args.opts); !reflect.DeepEqual(got.Strings(), tt.want) {
236239
t.Errorf("%q. gitCloneCommand() = %v, want %v", tt.name, got, tt.want)

tests/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ test-docker-compose:
8080
./test.sh smoke_services cli;
8181

8282
merge-coverage:
83-
@docker run -v `pwd`:/workspace golang:1.16 sh -c "\
83+
@docker run -v `pwd`:/workspace golang:1.21 sh -c "\
8484
go get -u github.com/wadey/gocovmerge && \
8585
cd /workspace && \
8686
gocovmerge $(COVER_FILES) > /workspace/cdsctl.cover.out \

tests/test.sh

+1-2
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,7 @@ for target in $@; do
268268
export AWS_ACCESS_KEY_ID
269269
export AWS_SECRET_ACCESS_KEY
270270
export AWS_ENDPOINT_URL
271-
workflow_with_integration_tests
272-
admin_tests;;
271+
workflow_with_integration_tests;;
273272
workflow_with_third_parties)
274273
export CDS_REGION_REQ
275274
workflow_with_third_parties;;

0 commit comments

Comments
 (0)