Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing PodStatusHooks #78

Merged
merged 10 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ launchtask:
httptimeout: 20000
plugins:
pluginorder: general
podStatusHooks:
task_failed: []
task_finished: []
task_running: []
cleanpod:
cleanfailtask: false
timeout: 20
Expand Down
7 changes: 4 additions & 3 deletions dce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
"github.com/paypal/dce-go/config"
"github.com/paypal/dce-go/dce/monitor"
"github.com/paypal/dce-go/plugin"
_ "github.com/paypal/dce-go/plugin/example"
_ "github.com/paypal/dce-go/plugin/general"

_ "github.com/paypal/dce-go/pluginimpl/example"
_ "github.com/paypal/dce-go/pluginimpl/general"
"github.com/paypal/dce-go/types"
"github.com/paypal/dce-go/utils"
fileUtils "github.com/paypal/dce-go/utils/file"
Expand Down Expand Up @@ -82,6 +81,8 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
"pool": pod.GetLabel("pool", taskInfo),
})

go pod.ListenOnTaskStatus(driver, taskInfo)

task, err := json.Marshal(taskInfo)
if err != nil {
log.Println("Error marshalling taskInfo", err.Error())
Expand Down
6 changes: 3 additions & 3 deletions docs/how-to-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ PreLaunchTask and PostLaunchTask have Context object as first parameter. This is
Below sample code illustrates a sample plugin loading parsed compose file from context in an object of ServiceDetail type.

##### Sample Plugin implementation
Example plugin implementation can be found [here](../plugin/example/impl.go)
Example plugin implementation can be found [here](../pluginimpl/example/impl.go)
```
func (ex *exampleExt) PreLaunchTask(ctx *context.Context, composeFiles *[]string, executorId string, taskInfo *mesos.TaskInfo) error {
logger.Println("PreLaunchTask Starting")
Expand Down Expand Up @@ -156,7 +156,7 @@ func (ex *exampleExt) Shutdown(executor.ExecutorDriver) error {
```
Here is an example.
```
_ "github.com/paypal/dce-go/plugin/example"
_ "github.com/paypal/dce-go/pluginimpl/example"
```
3. Last but not least, an order of plugins need to be specified in the main configuration file([config.yaml](../config/config.yaml)). Plugins are automatically invoked in that order at appropriate executor callback interface .

Expand Down Expand Up @@ -361,7 +361,7 @@ healthcheck: health check is enabled or not in your plugins. (Required)
foldername: folder to keep temporary files generated by plugins. (Optional, default folder name is poddata)
-->

##### General Plugin configuration file(plugin/general/general.yaml)
##### General Plugin configuration file(pluginimpl/general/general.yaml)
Individual Plugin (such as General Plugin) configuration file caters to plugin relevant information. See details below:

```
Expand Down
68 changes: 54 additions & 14 deletions plugin/extpoints.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// generated by go-extpoints -- DO NOT EDIT
package plugin

Expand Down Expand Up @@ -69,6 +55,7 @@ func UnregisterExtension(name string) []string {
return ifaces
}


// Base extension point

type extensionPoint struct {
Expand Down Expand Up @@ -189,3 +176,56 @@ func (ep *composePluginExt) Names() []string {
}
return names
}


// PodStatusHook

var PodStatusHooks = &podStatusHookExt{
newExtensionPoint(new(PodStatusHook)),
}

type podStatusHookExt struct {
*extensionPoint
}

func (ep *podStatusHookExt) Unregister(name string) bool {
return ep.unregister(name)
}

func (ep *podStatusHookExt) Register(extension PodStatusHook, name string) bool {
return ep.register(extension, name)
}

func (ep *podStatusHookExt) Lookup(name string) PodStatusHook {
ext := ep.lookup(name)
if ext == nil {
return nil
}
return ext.(PodStatusHook)
}

func (ep *podStatusHookExt) Select(names []string) []PodStatusHook {
var selected []PodStatusHook
for _, name := range names {
selected = append(selected, ep.Lookup(name))
}
return selected
}

func (ep *podStatusHookExt) All() map[string]PodStatusHook {
all := make(map[string]PodStatusHook)
for k, v := range ep.all() {
all[k] = v.(PodStatusHook)
}
return all
}

func (ep *podStatusHookExt) Names() []string {
var names []string
for k := range ep.all() {
names = append(names, k)
}
return names
}


11 changes: 10 additions & 1 deletion plugin/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* limitations under the License.
*/

//go:generate go-extpoints . ComposePlugin
//go:generate go-extpoints . ComposePlugin PodStatusHook
package plugin

import (
Expand All @@ -30,3 +30,12 @@ type ComposePlugin interface {
PostKillTask(taskInfo *mesos.TaskInfo) error
Shutdown(executor.ExecutorDriver) error
}

// PodStatusHook allows custom implementations to be plugged when a Pod (mesos task) status changes. Currently this is
// designed to be executed on task status changes during LaunchTask.
type PodStatusHook interface {
// Execute is invoked when the pod.taskStatusCh channel has a new status. It returns an error on failure,
// and also a flag "failExec" indicating if the error needs to fail the execution when a series of hooks are executed
// This is to support cases where a few hooks can be executed in a best effort manner and need not fail the executor
Execute(podStatus string, data interface{}) (failExec bool, err error)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion plugin/general/impl.go → pluginimpl/general/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (gp *generalExt) PreKillTask(taskInfo *mesos.TaskInfo) error {
func (gp *generalExt) PostKillTask(taskInfo *mesos.TaskInfo) error {
logger.Println("PostKillTask begin, pod status:", pod.GetPodStatus())
var err error
if !pod.IsPodLaunched {
if !pod.LaunchCmdAttempted {
logger.Println("Pod hasn't started, no postKill work needed.")
return nil
}
Expand Down
File renamed without changes.
File renamed without changes.
135 changes: 122 additions & 13 deletions utils/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bufio"
"container/list"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
Expand All @@ -27,6 +26,8 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/mesos/mesos-go/executor"
mesos "github.com/mesos/mesos-go/mesosproto"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -59,7 +60,13 @@ var PluginOrder []string
var HealthCheckListId = make(map[string]bool)
var MonitorContainerList []string
var SinglePort bool
var IsPodLaunched = false

// LaunchCmdAttempted indicates that an attempt to run the command to launch the pod (docker compose up with params) was
// made. This does not indicate that the result of the command execution.
var LaunchCmdAttempted = false

// taskStatusCh is pushed with the task status sent to Mesos, so any custom pod task status hooks can be executed
var taskStatusCh = make(chan string, 1)

// Check exit code of all the containers in the pod.
// If all the exit codes are zero, then assign zero as pod's exit code,
Expand Down Expand Up @@ -306,8 +313,8 @@ func LaunchPod(files []string) types.PodStatus {
go dockerLogToPodLogFile(files, true)

err = cmd.Run()
IsPodLaunched = true
log.Println("Updated the state of IsPodLaunched to true.")
LaunchCmdAttempted = true
log.Println("Updated the state of LaunchCmdAttempted to true.")
if err != nil {
log.Printf("POD_LAUNCH_FAIL -- Error running launch task command : %v", err)
return types.POD_FAILED
Expand Down Expand Up @@ -381,7 +388,7 @@ func StopPod(files []string) error {

err = cmd.Run()
if err != nil {
logger.Errorf("POD_STOP_FAIL --", err.Error())
logger.Errorf("POD_STOP_FAIL -- %s", err.Error())
err = ForceKill(files)
if err != nil {
logger.Errorf("POD_STOP_FORCE_FAIL -- Error in force pod kill : %v", err)
Expand Down Expand Up @@ -799,6 +806,14 @@ func SetPodStatus(status types.PodStatus) {
log.Printf("Update Status : Update podStatus as %s", status)
}

// updatePodLaunched sets the CurPodStatus.Launched to true
func updatePodLaunched() {
CurPodStatus.Lock()
CurPodStatus.Launched = true
CurPodStatus.Unlock()
log.Printf("Updated Current Pod Status with Pod Launched ")
}

func SendPodStatus(status types.PodStatus) {
logger := log.WithFields(log.Fields{
"status": status,
Expand All @@ -814,6 +829,7 @@ func SendPodStatus(status types.PodStatus) {
logger.Println("Pod status:", status)
switch status {
case types.POD_RUNNING:
updatePodLaunched()
SendMesosStatus(ComposeExecutorDriver, ComposeTaskInfo.GetTaskId(), mesos.TaskState_TASK_RUNNING.Enum())
case types.POD_FINISHED:
// Stop pod after sending status to mesos
Expand All @@ -833,7 +849,7 @@ func SendPodStatus(status types.PodStatus) {
}
SendMesosStatus(ComposeExecutorDriver, ComposeTaskInfo.GetTaskId(), mesos.TaskState_TASK_FINISHED.Enum())
case types.POD_FAILED:
if IsPodLaunched {
if LaunchCmdAttempted {
err := StopPod(ComposeFiles)
if err != nil {
logger.Errorf("Error cleaning up pod : %v\n", err.Error())
Expand Down Expand Up @@ -885,14 +901,13 @@ func SendMesosStatus(driver executor.ExecutorDriver, taskId *mesos.TaskID, state
}
}

// Per @kkrishna, This delay is historical, and to have mesos process the status sent above. task failed or finished
// would stop the driver prematurely
time.Sleep(5 * time.Second)
if state.Enum().String() == mesos.TaskState_TASK_FAILED.Enum().String() ||
state.Enum().String() == mesos.TaskState_TASK_FINISHED.Enum().String() {
log.Println("====================Stop ExecutorDriver====================")
driver.Stop()
}

logger.Printf(" SendMesosStatus complete.")
// Push the state to Task status channel so any further steps on a given task status can be executed
taskStatusCh <- state.Enum().String()

return nil
}

Expand All @@ -904,6 +919,7 @@ func WaitOnPod(ctx *context.Context) {
log.Println("POD_LAUNCH_TIMEOUT")
if dump, ok := config.GetConfig().GetStringMap("dockerdump")["enable"].(bool); ok && dump {
DockerDump()

}
SendPodStatus(types.POD_FAILED)
} else if (*ctx).Err() == context.Canceled {
Expand Down Expand Up @@ -1131,7 +1147,7 @@ healthCheck:
log.Println("POD_INIT_HEALTH_CHECK_FAILURE -- Send Failed")
err = PrintInspectDetail(containers[i])
if err != nil {
log.Warnf("Error during docker inspect: ", err)
log.Warnf("Error during docker inspect: %v ", err)
}
out <- types.POD_FAILED.String()
return
Expand Down Expand Up @@ -1213,3 +1229,96 @@ func IsService(taskInfo *mesos.TaskInfo) bool {
d.Read(assignTask, taskInfo.GetData())
return assignTask.Task.IsService
}

// ListenOnTaskStatus listens
func ListenOnTaskStatus(driver executor.ExecutorDriver, taskInfo *mesos.TaskInfo) {
logger := log.WithFields(log.Fields{
"func": "ListenOnTaskStatus",
})
defer close(taskStatusCh)
// Copy taskInfo locally, it may be garbage collected when hooks are executed after LaunchTask
cachedTaskInfo := &taskInfo
for {
select {
case status, ok := <-taskStatusCh: // wait for task status from LaunchTask
if ok {
switch status {
case mesos.TaskState_TASK_RUNNING.String():
if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil {
logger.Errorf("executing hooks failed %v ", err)
}
case mesos.TaskState_TASK_FAILED.String():
/*
Tasks are marked as Failed at,
1. Initial launch failure (PodStatus.Launched == false)
2. Health Monitor or any plugin monitors and fails after the task has been running for
a longtime (PodStatus.Launched = true, and marked as failed later)
*/
if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil {
logger.Errorf("executing hooks failed %v ", err)
}
stopDriver(driver)
break
case mesos.TaskState_TASK_FINISHED.String():
stopDriver(driver)
break
default:
log.Infof("Nothing to do on task status %s", status)
}
} else {
log.Errorln("failure reading from task status channel")
}
}
}
}

// execPodStatusHooks finds the hooks (implementations of ExecutorHook interface) configured for executor phase and executes them
// error is returned if any of the hooks failed, and ExecutorHook.BestEffort() returns true
func execPodStatusHooks(status string, taskInfo *mesos.TaskInfo) error {
logger := log.WithFields(log.Fields{
"requuid": GetLabel("requuid", taskInfo),
"tenant": GetLabel("tenant", taskInfo),
"namespace": GetLabel("namespace", taskInfo),
"pool": GetLabel("pool", taskInfo),
})
var podStatusHooks []string
if podStatusHooks = config.GetConfig().GetStringSlice(fmt.Sprintf("podStatusHooks.%s",
status)); len(podStatusHooks) < 1 {
logger.Infof("No post podStatusHook implementations found in config, skipping")
return nil
}
logger.Infof("Executor Post Hooks found: %v", podStatusHooks)
if _, err := utils.PluginPanicHandler(utils.ConditionFunc(func() (string, error) {
for _, name := range podStatusHooks {
hook := plugin.PodStatusHooks.Lookup(name)
if hook == nil {
logger.Errorf("Hook %s is nil, not initialized? still continuing with available hooks", name)
continue
}
if failExec, pherr := hook.Execute(status, taskInfo); pherr != nil {
logger.Errorf(
"PodStatusHook %s failed with %v and is not best effort, so stopping further execution ",
name, pherr)
if failExec {
return "", errors.Wrapf(pherr, "executing hook %s failed", name)
}
} else {
logger.Infof("Executed hook %s", name)
}
}
return "", nil
})); err != nil {
logger.Errorf("executing hooks at pod status %s failed | err: %v", status, err)
return err
}
return nil
}

func stopDriver(driver executor.ExecutorDriver) {
log.Println("====================Stop ExecutorDriver====================")
status, err := driver.Stop()
if err != nil {
log.Errorf("attempt to stop driver failed with error %v", err)
}
log.Infof("driver stopped with status %s", status.String())
}
Loading