Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task创建流程优化 #219

Merged
merged 6 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/backend/booster/server/pkg/api/v1/dcc/distcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func ApplyResource(req *restful.Request, resp *restful.Response) {
Extra: string(extraData),
})
if err != nil {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
if err == engine.ErrorProjectNoFound {
blog.Warnf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
} else {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
}
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrApplyResourceFailed, Message: err.Error()})
return
}
Expand Down
8 changes: 6 additions & 2 deletions src/backend/booster/server/pkg/api/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ func ApplyResource(req *restful.Request, resp *restful.Response) {

tb, err := defaultManager.CreateTask(param)
if err != nil {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
if err == engine.ErrorProjectNoFound {
blog.Warnf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
} else {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
}
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrApplyResourceFailed, Message: err.Error()})
return
}
Expand Down Expand Up @@ -89,7 +93,7 @@ func SendMessage(req *restful.Request, resp *restful.Response) {
return
}
if data, err = defaultManager.SendProjectMessage(param.ProjectID, []byte(param.Extra)); err != nil {
blog.Errorf("send message: send project(%s) message to engine failed, url(%s) message(%s): %v",
blog.Warnf("send message: send project(%s) message to engine failed, url(%s) message(%s): %v",
param.ProjectID, req.Request.URL.String(), param.Extra, err)
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrSendMessageFailed, Message: err.Error()})
return
Expand Down
64 changes: 58 additions & 6 deletions src/backend/booster/server/pkg/manager/normal/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type TaskBasicLayer interface {
// list task basic from cache, return a new pointer
ListTaskBasic(released bool, statusList ...engine.TaskStatusType) ([]*engine.TaskBasic, error)

// init task basic, create task basic table in database
InitTaskBasic(tb *engine.TaskBasic) error
// create task basic, create task basic table in database
CreateTaskBasic(tb *engine.TaskBasic) error

//insert task basic to cache first time
InsertTB(tb *engine.TaskBasic) error

//delete task basic from cache
DeleteTB(tb *engine.TaskBasic)

// update task basic, both database and cache, just update the field implements in task basic
UpdateTaskBasic(tb *engine.TaskBasic) error
Expand Down Expand Up @@ -237,9 +243,23 @@ func (tc *taskBasicLayer) ListTaskBasic(
return rl, nil
}

// InitTaskBasic init a task basic into layer cache and databases.
func (tc *taskBasicLayer) InitTaskBasic(tb *engine.TaskBasic) error {
return tc.updateTaskBasic(tb, true)
// CreateTaskBasic create a task basic in databases.
func (tc *taskBasicLayer) CreateTaskBasic(tb *engine.TaskBasic) error {
egn, err := tc.GetEngineByTypeName(tb.Client.EngineName)
if err != nil {
blog.Errorf("layer: try updating task basic(%s), get engine(%s) failed: %v", tb.ID, tb.Client.EngineName, err)
return err
}
err = engine.CreateTaskBasic(egn, tb)

if err != nil {
blog.Errorf("layer: update task basic(%s) via engine(%s) failed: %v", tb.ID, tb.Client.EngineName, err)
return err

}
blog.Infof("layer: success to init task basic(%s) in status(%s) with engine(%s) and queue(%s)",
tb.ID, tb.Status.Status, tb.Client.EngineName, tb.Client.QueueName)
return nil
}

// UpdateTaskBasic update a existing task basic into layer cache and databases.
Expand Down Expand Up @@ -337,11 +357,43 @@ func (tc *taskBasicLayer) updateTaskBasic(tbRaw *engine.TaskBasic, new bool) err
return nil
}

// InsertTB create a new record of init task in cache, do not need to create in queue
func (tc *taskBasicLayer) InsertTB(tbRaw *engine.TaskBasic) error {
tb := engine.CopyTaskBasic(tbRaw)
blog.Infof("layer: going to insertTB(%s) status(%s) to cache", tb.ID, tb.Status.Status)
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()
if tb.Status.Status != engine.TaskStatusInit {
return fmt.Errorf("taskId %s is not init status,can not insert into cache", tb.ID)
}
if _, ok := tc.tbm[tb.ID]; ok {
return fmt.Errorf("taskId %s already exist in cache", tb.ID)
}
selfMetric.TaskNumController.Inc(
tb.Client.EngineName.String(), tb.Client.QueueName, string(tb.Status.Status), "")
tc.tbm[tb.ID] = tb
return nil
}

// DeleteTB delete task from cache and queue if task exsited
func (tc *taskBasicLayer) DeleteTB(tb *engine.TaskBasic) {
blog.Infof("layer: going to deleteTB(%s) status(%s) from cache and queue", tb.ID, tb.Status.Status)
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()

if oldTask, ok := tc.tbm[tb.ID]; ok {
selfMetric.TaskNumController.Dec(
tb.Client.EngineName.String(), oldTask.Client.QueueName, string(oldTask.Status.Status), "")
}
tc.deleteTBFromQueue(tb)
delete(tc.tbm, tb.ID)
}

func (tc *taskBasicLayer) putTB(tb *engine.TaskBasic) {
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()

blog.Debugf("layer: get lock and going to putTB(%s) to cache and queue", tb.ID)
blog.Debugf("layer: get lock and going to putTB(%s) status(%s) to cache and queue", tb.ID, tb.Status.Status)

// update metric data of task num
// decrease last status num and add current status num, if the status is same as last one, then do nothing
Expand Down
48 changes: 29 additions & 19 deletions src/backend/booster/server/pkg/manager/normal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err

pb, egn, err := m.getBasicProject(param.ProjectID)
if err != nil {
blog.Errorf("manager: try creating task, get project(%s) failed: %v", param.ProjectID, err)
blog.Warnf("manager: try creating task, get project(%s) failed: %v", param.ProjectID, err)
return nil, err
}

Expand All @@ -332,18 +332,19 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err

// lock project when creating task for controlling concurrency
m.layer.LockProject(param.ProjectID)
defer m.layer.UnLockProject(param.ProjectID)

if err = m.invalidConcurrency(pb); err != nil {
blog.Errorf("manager: try creating task, check concurrency for project(%s) in engine(%s) failed: %v",
param.ProjectID, pb.EngineName.String(), err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}

taskID, err := m.generateTaskID(egn, param.ProjectID)
if err != nil {
blog.Errorf("manager: try creating task, generate taskID for project(%s) in engine(%s) failed: %v",
param.ProjectID, pb.EngineName.String(), err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}

Expand Down Expand Up @@ -379,14 +380,36 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err
}
if err = tb.Check(); err != nil {
blog.Errorf("manager: create task basic(%s) check failed: %v", taskID, err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}
tb.Status.Init()
tb.Status.Message = messageTaskInit
//creat task to cache, if task exsited, return error
if err = m.layer.InsertTB(tb); err != nil {
blog.Errorf("manager: create task basic(%s) insert db failed: %v", taskID, err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}
m.layer.UnLockProject(param.ProjectID)

ok, err = engine.CheckTaskIDValid(egn, taskID)
if !ok {
if err == nil {
err = fmt.Errorf("task %s is already exsit in db", taskID)
}
blog.Errorf("manager: check task valid(%s) for project(%s) in engine(%s) failed: %v",
taskID, param.ProjectID, pb.EngineName.String(), err)
//check task id failed, now task not in db, delete task from cache directly
m.layer.DeleteTB(tb)
return nil, err
}

if err = m.layer.InitTaskBasic(tb); err != nil {
if err = m.layer.CreateTaskBasic(tb); err != nil {
blog.Errorf("manager: create task basic(%s) for project(%s) in engine(%s) failed: %v",
taskID, param.ProjectID, pb.EngineName.String(), err)
//insert task to db failed, delete task from cache directly
m.layer.DeleteTB(tb)
return nil, err
}
if err = egn.CreateTaskExtension(tb, []byte(param.Extra)); err != nil {
Expand Down Expand Up @@ -417,7 +440,7 @@ func (m *manager) sendProjectMessage(projectID string, data []byte) ([]byte, err

_, egn, err := m.getBasicProject(projectID)
if err != nil {
blog.Errorf("manager: try sending project message, get project(%s) failed: %v", projectID, err)
blog.Warnf("manager: try sending project message, get project(%s) failed: %v", projectID, err)
return nil, err
}

Expand Down Expand Up @@ -535,7 +558,7 @@ func (m *manager) getBasicProject(projectID string) (*engine.ProjectBasic, engin
return pb, egn, nil
}

blog.Errorf("manager: get project(%s) no found", projectID)
blog.Warnf("manager: get project(%s) no found", projectID)
return nil, nil, engine.ErrorProjectNoFound
}

Expand Down Expand Up @@ -619,20 +642,7 @@ func (m *manager) invalidConcurrency(pb *engine.ProjectBasic) error {
}

func (m *manager) generateTaskID(egn engine.Engine, projectID string) (string, error) {
for i := 0; i < 3; i++ {
taskID := generateTaskID(egn.Name().String(), projectID)

ok, err := engine.CheckTaskIDValid(egn, taskID)
if err != nil {
return "", err
}

if ok {
return taskID, nil
}
}

return "", types.ErrorGenerateTaskIDFailed
return generateTaskID(egn.Name().String(), projectID), nil
}

func generateTaskID(egnName string, projectID string) string {
Expand Down
2 changes: 1 addition & 1 deletion src/backend/booster/server/pkg/manager/normal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
trackerCheckGapTime = 1 * time.Second
trackerTrackGapTime = 1 * time.Second

keeperHealthCheckGapTime = 10 * time.Second
keeperHealthCheckGapTime = 5 * time.Second
keeperFirstStartGraceTime = 1 * time.Minute
keeperInitTimeout = 20 * time.Second
keeperStartingTimeout = 120 * time.Second
Expand Down
Loading