feat(sdk): allow to stop goroutine #6754

Merged (1 commit) on Dec 28, 2023

4 changes: 2 additions & 2 deletions engine/hooks/hooks_handlers.go
@@ -547,9 +547,9 @@ func (s *Service) deleteTaskBulkHandler() service.Handler {
return sdk.WithStack(err)
}

for uuid := range hooks {
for _, h := range hooks {
//Load the task
t := s.Dao.FindTask(ctx, uuid)
t := s.Dao.FindTask(ctx, h.UUID)
if t == nil {
continue
}
11 changes: 8 additions & 3 deletions engine/hooks/kafka.go
@@ -9,6 +9,7 @@ import (

"github.com/Shopify/sarama"
"github.com/fsamin/go-dump"
"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/sdk"
@@ -29,6 +30,10 @@ func (s *Service) saveKafkaExecution(t *sdk.Task, error string, nbError int64) {
s.Dao.SaveTaskExecution(exec)
}

func (s *Service) stopKafkaHook(t *sdk.Task) {
s.GoRoutines.Stop("kafka-consume-" + t.UUID)
}

func (s *Service) startKafkaHook(ctx context.Context, t *sdk.Task) error {
var kafkaIntegration, projectKey, topic string
for k, v := range t.Config {
@@ -96,12 +101,12 @@ func (s *Service) startKafkaHook(ctx context.Context, t *sdk.Task) error {
dao: &s.Dao,
}

s.GoRoutines.Exec(context.Background(), "kafka-consume-"+topic, func(ctx context.Context) {
s.GoRoutines.Run(s.Router.Background, "kafka-consume-"+t.UUID, func(ctx context.Context) {
atomic.AddInt64(&nbKafkaConsumers, 1)
defer atomic.AddInt64(&nbKafkaConsumers, -1)
for {
for ctx.Err() == nil {
if err := consumerGroup.Consume(ctx, []string{topic}, h); err != nil {
log.Error(ctx, "error on consume:%s", err)
log.ErrorWithStackTrace(ctx, errors.WithMessage(err, "error on consume"))
}
}
})
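
The rewritten loop can now exit cleanly: instead of looping forever, it re-checks ctx.Err() on each pass, so cancelling the goroutine's context (which is what the new stopKafkaHook does through GoRoutines.Stop) lets the consumer return. The following is a minimal, self-contained sketch of that consume-until-cancelled pattern, not part of the diff; consumeOnce is a hypothetical stand-in for consumerGroup.Consume, and as above a failed consume call is only logged, it does not end the loop.

package main

import (
    "context"
    "errors"
    "log"
    "time"
)

// consumeLoop mirrors the pattern above: keep consuming until the context is
// cancelled, logging (but not aborting on) per-iteration errors.
func consumeLoop(ctx context.Context, consumeOnce func(context.Context) error) {
    for ctx.Err() == nil {
        if err := consumeOnce(ctx); err != nil {
            log.Printf("error on consume: %v", err)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    done := make(chan struct{})
    go func() {
        defer close(done)
        consumeLoop(ctx, func(ctx context.Context) error {
            time.Sleep(50 * time.Millisecond) // simulate one consume call
            return errors.New("transient consume error")
        })
    }()

    time.Sleep(200 * time.Millisecond)
    cancel() // the loop observes ctx.Err() != nil and exits
    <-done
}
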
8 changes: 6 additions & 2 deletions engine/hooks/tasks.go
@@ -382,15 +382,19 @@ func (s *Service) stopTask(ctx context.Context, t *sdk.Task) error {
}

switch t.Type {
case TypeWebHook, TypeScheduler, TypeRepoManagerWebHook, TypeRepoPoller, TypeKafka, TypeWorkflowHook:
case TypeWebHook, TypeScheduler, TypeRepoManagerWebHook, TypeRepoPoller, TypeWorkflowHook:
log.Debug(ctx, "Hooks> Tasks %s has been stopped", t.UUID)
return nil
case TypeKafka:
s.stopKafkaHook(t)
log.Debug(ctx, "Hooks> Kafka Task %s has been stopped", t.UUID)
return nil
case TypeGerrit:
s.stopGerritHookTask(t)
log.Debug(ctx, "Hooks> Gerrit Task %s has been stopped", t.UUID)
return nil
default:
return fmt.Errorf("Unsupported task type %s", t.Type)
return fmt.Errorf("unsupported task type %s", t.Type)
}
}

42 changes: 39 additions & 3 deletions sdk/goroutine.go
@@ -25,10 +25,12 @@ import (

type GoRoutine struct {
ctx context.Context
cancel func()
Name string
Func func(ctx context.Context)
Restart bool
Active bool
mutex sync.RWMutex
}

// GoRoutines contains list of routines that have to stay up
@@ -48,6 +50,20 @@ func (m *GoRoutines) GoRoutine(name string) *GoRoutine {
return nil
}

func (m *GoRoutines) Stop(name string) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i, g := range m.status {
if g.Name == name {
if g.cancel != nil {
g.cancel()
}
m.status = append(m.status[:i], m.status[i+1:]...)
break
}
}
}

// NewGoRoutines instanciates a new GoRoutineManager
func NewGoRoutines(ctx context.Context) *GoRoutines {
m := &GoRoutines{}
@@ -63,6 +79,12 @@ func (m *GoRoutines) restartGoRoutines(ctx context.Context) {
for {
select {
case <-ctx.Done():
for _, g := range m.status {
if g.cancel != nil {
g.cancel()
}
}
m.status = nil
return
case <-t.C:
m.runRestartGoRoutines(ctx)
@@ -74,7 +96,10 @@ func (m *GoRoutines) runRestartGoRoutines(ctx context.Context) {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, g := range m.status {
if !g.Active && g.Restart {
g.mutex.RLock()
active := g.Active
g.mutex.RUnlock()
if !active && g.Restart {
log.Info(ctx, "restarting goroutine %q", g.Name)
m.exec(g)
}
@@ -83,10 +108,12 @@ func (m *GoRoutines) runRestartGoRoutines(ctx context.Context) {

// Run runs the function within a goroutine with a panic recovery, and keep GoRoutine status.
func (m *GoRoutines) Run(c context.Context, name string, fn func(ctx context.Context)) {
ctx, cancel := context.WithCancel(c)
m.mutex.Lock()
defer m.mutex.Unlock()
g := &GoRoutine{
ctx: c,
ctx: ctx,
cancel: cancel,
Name: name,
Func: fn,
Active: true,
@@ -99,10 +126,12 @@ func (m *GoRoutines) Run(c context.Context, name string, fn func(ctx context.Con
// RunWithRestart runs the function within a goroutine with a panic recovery, and keep GoRoutine status.
// if the goroutine is stopped, it will ne restarted
func (m *GoRoutines) RunWithRestart(c context.Context, name string, fn func(ctx context.Context)) {
ctx, cancel := context.WithCancel(c)
m.mutex.Lock()
defer m.mutex.Unlock()
g := &GoRoutine{
ctx: c,
ctx: ctx,
cancel: cancel,
Name: name,
Func: fn,
Active: true,
@@ -121,10 +150,12 @@ func (m *GoRoutines) GetStatus() []MonitoringStatusLine {
for _, g := range m.status {
status := MonitoringStatusAlert
value := "NOT running"
g.mutex.RLock()
if g.Active {
status = MonitoringStatusOK
value = "Running"
}
g.mutex.RUnlock()
lines[i] = MonitoringStatusLine{
Status: status,
Component: "goroutine/" + g.Name,
@@ -137,6 +168,7 @@ func (m *GoRoutines) GetStatus() []MonitoringStatusLine {

func (m *GoRoutines) exec(g *GoRoutine) {
hostname, _ := os.Hostname()

go func(ctx context.Context) {
ctx = context.WithValue(ctx, cdslog.Goroutine, g.Name)

@@ -155,10 +187,14 @@ func (m *GoRoutines) exec(g *GoRoutine) {
ctx = context.WithValue(ctx, cdslog.Stacktrace, string(buf))
log.Error(ctx, "[PANIC][%s] %s failed", hostname, g.Name)
}
g.mutex.Lock()
g.Active = false
g.mutex.Unlock()
}()

g.mutex.Lock()
g.Active = true
g.mutex.Unlock()
g.Func(goroutineCtx)
}(g.ctx)
}
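
Taken together, these changes give every managed goroutine its own cancellable context and a mutex-guarded Active flag; Stop cancels that context and drops the entry from the manager. Below is a minimal usage sketch of the API as it appears in this diff, not part of the PR; the routine name is illustrative.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ovh/cds/sdk"
)

func main() {
    ctx := context.Background()
    m := sdk.NewGoRoutines(ctx)

    // Run registers a named goroutine whose context is derived from ctx
    // through context.WithCancel, as added in this diff.
    m.Run(ctx, "kafka-consume-example", func(ctx context.Context) {
        <-ctx.Done() // returns once Stop (or the parent context) cancels it
    })

    fmt.Println(len(m.GetStatus())) // 1: the routine is tracked

    // Stop cancels the routine's context and removes it from the manager.
    m.Stop("kafka-consume-example")

    time.Sleep(100 * time.Millisecond) // give the goroutine time to return
    fmt.Println(len(m.GetStatus()))    // 0
}
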
98 changes: 48 additions & 50 deletions sdk/goroutine_test.go
@@ -3,78 +3,76 @@ package sdk
import (
"bytes"
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_GoroutineTools(t *testing.T) {
t.Run("GoroutineID()", func(t *testing.T) {
id := GoroutineID()
var zero uint64
assert.NotEqual(t, zero, id)
require.NotEqual(t, uint64(0), GoroutineID())
})

t.Run("writeGoroutineStacks(...)", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
wg.Add(1)
<-ctx.Done()
wg.Done()
})

t.Run("GoRoutineStacks(...)", func(t *testing.T) {
var w = new(bytes.Buffer)
err := writeGoroutineStacks(w)
assert.NoError(t, err)
t.Log(w.String())
wg.Wait()
require.NoError(t, writeGoroutineStacks(w))
_, err := parseGoRoutineStacks(w, nil)
require.NoError(t, err)
})

t.Run("parseGoRoutineStacks(...)", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
t.Run("GoRoutineRun", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
m := NewGoRoutines(ctx)

NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
wg.Add(1)
<-ctx.Done()
wg.Done()
m.Run(context.TODO(), "test_goroutine_run", func(ctx context.Context) {
time.Sleep(1 * time.Second)
})

var w = new(bytes.Buffer)
err := writeGoroutineStacks(w)
assert.NoError(t, err)
s := m.GoRoutine("test_goroutine_run")
require.NotNil(t, s)
require.True(t, s.Active)
require.Len(t, m.GetStatus(), 1)

_, err = parseGoRoutineStacks(w, nil)
assert.NoError(t, err)
wg.Wait()
})
time.Sleep(1 * time.Second)

t.Run("GoRoutineLoop", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
s = m.GoRoutine("test_goroutine_run")
require.NotNil(t, s)
require.False(t, s.Active)
})

t.Run("GoRoutineRunCancel", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
m := NewGoRoutines(ctx)
m.Run(ctx, "test_goroutine_loop", func(ctx context.Context) {
wg.Add(1)
s := m.GoRoutine("test_goroutine_loop")
require.NotNil(t, s)
require.True(t, s.Active)

ctxToCancelled, cancelRoutine := context.WithTimeout(context.TODO(), 5*time.Second)
var cancelled bool
m.Run(context.TODO(), "test_goroutine_run_cancel", func(ctx context.Context) {
<-ctx.Done()
wg.Done()
cancelled = true
cancelRoutine()
})

s := m.GoRoutine("test_goroutine_loop")
require.NotNil(t, s)
require.Equal(t, 1, len(m.GetStatus()))
require.False(t, cancelled)
m.Stop("test_goroutine_run_cancel")
<-ctxToCancelled.Done()
require.True(t, cancelled)
})

t.Run("GoRoutineRunWithRestart", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
t.Cleanup(cancel)
m := NewGoRoutines(ctx)

var count int
m.RunWithRestart(context.TODO(), "test_goroutine_run_with_restart", func(ctx context.Context) {
count++
})

// the routine should have restart 1 time
<-ctx.Done()
require.Equal(t, 2, count)
})
}
5 changes: 0 additions & 5 deletions sdk/namesgenerator/namesgenerator.go
@@ -5,15 +5,10 @@ package namesgenerator
import (
"fmt"
"math/rand"
"time"

"github.com/ovh/cds/sdk/slug"
)

func init() {
rand.Seed(time.Now().UTC().UnixNano())
}

var (
left = [...]string{
"admiring",
3 changes: 3 additions & 0 deletions sdk/vcs/git/git_test.go
@@ -89,6 +89,7 @@ func Test_gitCloneOverHTTPS(t *testing.T) {
}
for _, tt := range tests {
os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
out := new(bytes.Buffer)
err := new(bytes.Buffer)
tt.args.output = &OutputOpts{
@@ -142,6 +143,7 @@ func Test_gitCloneOverSSH(t *testing.T) {
}

os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
out := new(bytes.Buffer)
err := new(bytes.Buffer)
tt.args.output = &OutputOpts{
@@ -231,6 +233,7 @@ func Test_gitCommand(t *testing.T) {
}
for _, tt := range tests {
os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
os.MkdirAll(test.GetTestName(t), os.FileMode(0755))
if _, got, _ := prepareGitCloneCommands(tt.args.repo, test.GetTestName(t), tt.args.path, tt.args.opts); !reflect.DeepEqual(got.Strings(), tt.want) {
t.Errorf("%q. gitCloneCommand() = %v, want %v", tt.name, got, tt.want)
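
The added t.Cleanup calls ensure the per-test working directory is removed even when a test fails or returns early, because cleanup functions run once the test and its subtests complete. A small sketch of the pattern, with a hypothetical directory name:

package example

import (
    "os"
    "testing"
)

func TestCloneWorkdir(t *testing.T) {
    dir := "testdata-clone" // hypothetical working directory

    // Remove any leftovers from a previous run, then register a cleanup so
    // the directory is also removed once this test finishes, pass or fail.
    os.RemoveAll(dir)
    t.Cleanup(func() { os.RemoveAll(dir) })

    if err := os.MkdirAll(dir, 0o755); err != nil {
        t.Fatal(err)
    }
    // ... run the git commands under test against dir ...
}
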
2 changes: 1 addition & 1 deletion tests/Makefile
@@ -80,7 +80,7 @@ test-docker-compose:
./test.sh smoke_services cli;

merge-coverage:
@docker run -v `pwd`:/workspace golang:1.16 sh -c "\
@docker run -v `pwd`:/workspace golang:1.21 sh -c "\
go get -u github.com/wadey/gocovmerge && \
cd /workspace && \
gocovmerge $(COVER_FILES) > /workspace/cdsctl.cover.out \
3 changes: 1 addition & 2 deletions tests/test.sh
@@ -268,8 +268,7 @@ for target in $@; do
export AWS_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY
export AWS_ENDPOINT_URL
workflow_with_integration_tests
admin_tests;;
workflow_with_integration_tests;;
workflow_with_third_parties)
export CDS_REGION_REQ
workflow_with_third_parties;;