Skip to content

Commit

Permalink
fix(api): try to fix lock between workers and jobs (#6689)
Browse files Browse the repository at this point in the history
Signed-off-by: François Samin <francois.samin@ovhcloud.com>
  • Loading branch information
fsamin authored Nov 17, 2023
1 parent 2c6b3d0 commit e94d2eb
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 20 deletions.
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -850,7 +851,9 @@ func (a *API) Serve(ctx context.Context) error {
a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
})

a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunJobDeletion", func(ctx context.Context) {
a.WorkflowRunJobDeletion(ctx, time.Duration(10*rand.Float64())*time.Second, 10)
})
a.GoRoutines.RunWithRestart(ctx, "api.V2WorkflowRunCraft", func(ctx context.Context) {
a.V2WorkflowRunCraft(ctx, 10*time.Second)
})
Expand Down
14 changes: 7 additions & 7 deletions engine/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package api

import (
"context"
"github.com/ovh/cds/engine/api/link"
"github.com/ovh/cds/engine/api/organization"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/api/authentication/builtin"
"github.com/ovh/cds/engine/api/authentication/local"
authdrivertest "github.com/ovh/cds/engine/api/authentication/test"
"github.com/ovh/cds/engine/api/bootstrap"
"github.com/ovh/cds/engine/api/link"
"github.com/ovh/cds/engine/api/organization"
apiTest "github.com/ovh/cds/engine/api/test"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"

"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
)

func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.FakeTransaction, *Router) {
Expand Down Expand Up @@ -63,7 +63,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak
// Clean all the pending crafting workflow runs
lockKey := cache.Key("api:workflowRunCraft")
require.NoError(t, store.DeleteAll(lockKey))
ids, _ := workflow.LoadCratingWorkflowRunIDs(api.mustDB())
ids, _ := workflow.LoadCraftingWorkflowRunIDs(api.mustDB())
for _, id := range ids {
require.NoError(t, workflow.UpdateCraftedWorkflowRun(api.mustDB(), id))
}
Expand Down
34 changes: 28 additions & 6 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func LoadNodeJobRunIDByNodeRunID(db gorp.SqlExecutor, runNodeID int64) ([]int64,
return ids, nil
}

//LoadNodeJobRun load a NodeJobRun given its ID
// LoadNodeJobRun load a NodeJobRun given its ID
func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
j := JobRun{}
query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1`
Expand All @@ -252,7 +252,7 @@ func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
return &jr, nil
}

//LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker
// LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker
func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store) ([]sdk.WorkflowNodeJobRun, error) {
var deadJobsDB []JobRun
query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE worker_id IS NULL`
Expand All @@ -279,7 +279,29 @@ func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.St
return deadJobs, nil
}

//LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID
func LoadAndLockTerminatedNodeJobRun(ctx context.Context, db gorp.SqlExecutor, limit int) ([]sdk.WorkflowNodeJobRun, error) {
var terminatedJobsDB []JobRun
query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE status IN ($1, $2, $3) ORDER BY id ASC LIMIT $4 FOR UPDATE SKIP LOCKED`
if _, err := db.Select(&terminatedJobsDB, query, sdk.StatusStopped, sdk.StatusSuccess, sdk.StatusFail, limit); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}

deadJobs := make([]sdk.WorkflowNodeJobRun, len(terminatedJobsDB))
for i, deadJob := range terminatedJobsDB {
jr, err := deadJob.WorkflowNodeRunJob()
if err != nil {
return nil, err
}
deadJobs[i] = jr
}

return deadJobs, nil
}

// LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID
func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
j := JobRun{}
query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1 for update`
Expand All @@ -294,7 +316,7 @@ func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store c
return &jr, nil
}

//LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID
// LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID
func LoadAndLockNodeJobRunSkipLocked(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
var end func()
_, end = telemetry.Span(ctx, "workflow.LoadAndLockNodeJobRunSkipLocked")
Expand Down Expand Up @@ -329,7 +351,7 @@ func insertWorkflowNodeJobRun(db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) er
return nil
}

//DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run
// DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run
func DeleteNodeJobRuns(db gorp.SqlExecutor, nodeID int64) error {
query := `delete from workflow_node_run_job where workflow_node_run_id = $1`
_, err := db.Exec(query, nodeID)
Expand All @@ -343,7 +365,7 @@ func DeleteNodeJobRun(db gorp.SqlExecutor, nodeRunJob int64) error {
return err
}

//UpdateNodeJobRun updates a workflow_node_run_job
// UpdateNodeJobRun updates a workflow_node_run_job
func UpdateNodeJobRun(ctx context.Context, db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) error {
var end func()
_, end = telemetry.Span(ctx, "workflow.UpdateNodeJobRun")
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
func LoadCraftingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
query := `
SELECT id
FROM workflow_run
Expand Down
5 changes: 1 addition & 4 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,7 @@ func executeNodeRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store
}
// End of temporary debug

// Delete the line in workflow_node_run_job
if err := DeleteNodeJobRuns(db, workflowNodeRun.ID); err != nil {
return nil, sdk.WrapError(err, "unable to delete node %d job runs", workflowNodeRun.ID)
}
// Delete the line in workflow_node_run_job is done asynchronously in a goroutine at api level

// If current node has a mutex, we want to trigger another node run that can be waiting for the mutex
node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID)
Expand Down
54 changes: 53 additions & 1 deletion engine/api/workflow_run_craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (api *API) WorkflowRunCraft(ctx context.Context, tick time.Duration) {
}
return
case <-ticker.C:
ids, err := workflow.LoadCratingWorkflowRunIDs(api.mustDB())
ids, err := workflow.LoadCraftingWorkflowRunIDs(api.mustDB())
if err != nil {
log.Error(ctx, "WorkflowRunCraft> unable to start tx: %v", err)
continue
Expand Down Expand Up @@ -166,3 +166,55 @@ func (api *API) workflowRunCraft(ctx context.Context, id int64) error {

return workflow.UpdateCraftedWorkflowRun(api.mustDB(), run.ID)
}

func (api *API) WorkflowRunJobDeletion(ctx context.Context, tick time.Duration, limit int) {
ticker := time.NewTicker(tick)
defer ticker.Stop()

mainLoop:
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "%v", ctx.Err())
}
return
case <-ticker.C:
tx, err := api.mustDB().Begin()
if err != nil {
log.ErrorWithStackTrace(ctx, err)
continue
}
jobs, err := workflow.LoadAndLockTerminatedNodeJobRun(ctx, tx, limit)
if err != nil {
log.Error(ctx, "WorkflowRunJobDeletion> unable to start tx: %v", err)
_ = tx.Rollback()
continue
}
for i := range jobs {
j := &jobs[i]
node, err := workflow.LoadNodeRunByID(ctx, tx, j.WorkflowNodeRunID, workflow.LoadRunOptions{})
if err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to load NodeRun %d", j.WorkflowNodeRunID))
_ = tx.Rollback()
continue mainLoop
}

if !sdk.StatusIsTerminated(node.Status) {
continue
}

if err := workflow.DeleteNodeJobRun(tx, j.ID); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to delete WorkflowNodeJobRun %d", j.ID))
_ = tx.Rollback()
continue mainLoop
}
}
if err := tx.Commit(); err != nil {
log.Error(ctx, "WorkflowRunJobDeletion> unable to commit tx: %v", err)
_ = tx.Rollback()
continue
}
}
}
}

0 comments on commit e94d2eb

Please sign in to comment.