diff --git a/go.mod b/go.mod index c5787e7..fca68a2 100644 --- a/go.mod +++ b/go.mod @@ -50,4 +50,4 @@ replace ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver => github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver v0.46.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter => github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.46.0 go.opentelemetry.io/collector => go.opentelemetry.io/collector v0.46.0 -) \ No newline at end of file +) diff --git a/types/types.go b/types/types.go index f61a1bf..99a4d1d 100644 --- a/types/types.go +++ b/types/types.go @@ -188,4 +188,5 @@ type StepData struct { type SvcContainer struct { ServiceName string ContainerId string + Pid string } diff --git a/utils/pod/pod.go b/utils/pod/pod.go index cc2e857..bb17fd9 100644 --- a/utils/pod/pod.go +++ b/utils/pod/pod.go @@ -117,7 +117,7 @@ func checkContainerExitCode(containerId string) (int, error) { return exitCode, nil } -//get docker health check logs +// get docker health check logs func PrintInspectDetail(containerId string) error { dceLog := config.CreateFileAppendMode(types.DCE_OUT) dceErr := config.CreateFileAppendMode(types.DCE_ERR) @@ -191,14 +191,58 @@ func GetServiceContainers(files, services []string) ([]types.SvcContainer, error if err != nil { return nil, err } + pid, err := GetContainerPid(id) + if err != nil { + return nil, err + } ids = append(ids, types.SvcContainer{ ServiceName: s, ContainerId: id, + Pid: pid, }) } return ids, nil } +func GetContainerPid(containerID string) (string, error) { + logger := log.WithFields(log.Fields{ + "containerID": containerID, + "func": "GetContainerPid", + }) + + // Return err if container ID is empty + if containerID == "" { + return "", fmt.Errorf("container ID can't be empty") + } + + // Generate cmd -- docker inspect -f '{{.State.Pid}}' + parts := strings.Fields("inspect -f '{{.State.Pid}}' " + 
containerID) + + // Run cmd to get container pid + cmd := exec.Command("docker", parts...) + logger.Printf("Command to get container pid by container ID: %s", cmd.Args) + + out, err := waitUtil.RetryCmd(config.GetMaxRetry(), cmd) + if err != nil { + logger.Errorf("Error getting container pid %s : %v", containerID, err) + return "", err + } + + // Scan output + var pid string + scanner := bufio.NewScanner(strings.NewReader(string(out[:]))) + for scanner.Scan() { + pid += scanner.Text() + } + if err := scanner.Err(); err != nil { + logger.Errorln("stderr: ", err) + return "", err + } + pid = strings.Trim(pid, "'") + logger.Printf("container pid: %s", pid) + return pid, nil +} + // GetContainerIdByService does query container id by service name func GetContainerIdByService(files []string, service string) (string, error) { logger := log.WithFields(log.Fields{ @@ -333,9 +377,9 @@ func LaunchPod(files []string) (types.PodStatus, error) { return types.POD_STARTING, nil } -//these logs should be written in a file also along with stdout. +// these logs should be written in a file also along with stdout. // 'retry' parameter is to indicate if RetryCmdLogs func should keep retrying if logs cmd fails or just exit. -//This is to make sure that we don't go in an infinite loop in RetryCmdLogs func when pod is killed, finished or fails. +// This is to make sure that we don't go in an infinite loop in RetryCmdLogs func when pod is killed, finished or fails. 
func dockerLogToPodLogFile(files []string, retry bool) { parts, err := GenerateCmdParts(files, " logs -t --follow --no-color") if err != nil { @@ -399,7 +443,7 @@ func StopPod(ctx context.Context, files []string) error { err = cmd.Run() if err != nil { logger.Errorf("POD_STOP_FAIL -- %s", err.Error()) - err = ForceKill(files) + err = ForceKill() if err != nil { logger.Errorf("POD_STOP_FORCE_FAIL -- Error in force pod kill : %v", err) return err @@ -515,31 +559,23 @@ func RemoveNetwork(name string) error { // Force kill pod // docker kill -f -func ForceKill(files []string) error { +func ForceKill() error { log.Println("====================Force Kill Pod====================") - parts, err := GenerateCmdParts(files, " kill") - if err != nil { - log.Printf("POD_GENERATE_COMPOSE_PARTS_FAIL -- %v", err) - return err - } - - cmd := exec.Command("docker-compose", parts...) - Dcelog := config.CreateFileAppendMode(types.DCE_OUT) - Dceerr := config.CreateFileAppendMode(types.DCE_ERR) - cmd.Stdout = Dcelog - cmd.Stderr = Dceerr - - log.Println("Kill Pod : Command to kill task : docker-compose ", parts) - - err = cmd.Run() - if err != nil { - log.Printf("POD_FORCE_KILL_FAIL -- %v", err) - return err + var errs error + for _, c := range MonitorContainerList { + if err := KillContainer("SIGKILL", c); err != nil { + if errs == nil { + errs = err + } else { + errs = errors.Wrap(errs, err.Error()) + } + log.Errorf("fail to force kill container id %s, pid %s: %v", c.ContainerId, c.Pid, err) + } } - return nil + return errs } -//validate compose before image pull +// validate compose before image pull func ValidateCompose(files []string) error { parts, err := GenerateCmdParts(files, " config -q") if err != nil { @@ -603,8 +639,8 @@ func PullImage(files []string) error { return nil } -//CheckContainer does check container details -//return healthy,run,err +// CheckContainer does check container details +// return healthy,run,err func CheckContainer(containerId string, healthCheck 
bool) (types.HealthStatus, bool, int, error) { containerDetail, err := InspectContainerDetails(containerId, healthCheck) if err != nil { @@ -633,25 +669,44 @@ func CheckContainer(containerId string, healthCheck bool) (types.HealthStatus, b return types.HEALTHY, containerDetail.IsRunning, containerDetail.ExitCode, nil } -func KillContainer(sig string, containerId string) error { +func KillContainer(sig string, svcContainer types.SvcContainer) error { logger := log.WithFields(log.Fields{ - "containerId": containerId, - "signal": sig, - "func": "pod.KillContainer", + "signal": sig, + "func": "pod.KillContainer", }) var err error var cmd *exec.Cmd - if sig != "" { - cmd = exec.Command("docker", "kill", fmt.Sprintf("--signal=%s", sig), containerId) + + // Get container pid + if svcContainer.Pid == "" { + for _, c := range MonitorContainerList { + if c.ContainerId == svcContainer.ContainerId { + svcContainer.Pid = c.Pid + break + } + if c.ServiceName == svcContainer.ServiceName { + svcContainer.Pid = c.Pid + } + } + } + // If pid is not cached, still use docker kill sending signal + if svcContainer.Pid != "" && svcContainer.Pid != "0" { + cmd = exec.Command("kill", "-"+sig, svcContainer.Pid) } else { - cmd = exec.Command("docker", "kill", containerId) + logger.Info("pid not found from cache, sending docker kill instead") + if sig != "" { + cmd = exec.Command("docker", "kill", fmt.Sprintf("--signal=%s", sig), svcContainer.ContainerId) + } else { + cmd = exec.Command("docker", "kill", svcContainer.ContainerId) + } } + logger.Printf("Command to kill container: %v", cmd.Args) _, err = waitUtil.RetryCmd(config.GetMaxRetry(), cmd) if err != nil { - log.Printf("Error kill container %s : %v", containerId, err) + log.Printf("Error kill container %s : %v", svcContainer.ContainerId, err) return err } @@ -875,7 +930,7 @@ func SendPodStatus(ctx context.Context, status types.PodStatus) { logger.Printf("MesosStatus %s completed", status) } -//Update mesos and pod status +// Update 
mesos and pod status func SendMesosStatus(ctx context.Context, driver executor.ExecutorDriver, taskId *mesos.TaskID, state *mesos.TaskState) error { logger := log.WithFields(log.Fields{ "state": state.Enum().String(), @@ -1134,7 +1189,11 @@ func HealthCheck(files []string, podServices map[string]bool, out chan<- string) logger.Debugf("list of containers are launched : %v", containers) time.Sleep(interval) } - + MonitorContainerList = make([]types.SvcContainer, len(containers)) + copy(MonitorContainerList, containers) + for _, c := range MonitorContainerList { + logger.Infof("service : %s, containerid: %s, pid: %s", c.ServiceName, c.ContainerId, c.Pid) + } logger.Println("Initial Health Check : Expected number of containers in monitoring : ", len(podServices)) logger.Println("Initial Health Check : Actual number of containers in monitoring : ", len(containers)) logger.Println("Container List : ", containers) @@ -1176,7 +1235,7 @@ healthCheck: } if !(exitCode == 0 && !running) && healthy == types.UNHEALTHY { - err = errors.Errorf("service %s is unhealthy", containers[i].ServiceName) + err = fmt.Errorf("service %s is unhealthy", containers[i].ServiceName) EndStep(StepMetrics, fmt.Sprintf("HealthCheck-%s", containers[i].ServiceName), types.GetInstanceStatusTag(containers[i], healthy, running, exitCode), err) @@ -1235,8 +1294,6 @@ healthCheck: } UpdateHealthCheckStatus(StepMetrics) - - MonitorContainerList = make([]types.SvcContainer, len(containers)) copy(MonitorContainerList, containers) logger.Printf("Health Check List: %v", HealthCheckListId) @@ -1400,7 +1457,7 @@ func execPodStatusHooks(ctx context.Context, status string, taskInfo *mesos.Task "PodStatusHook %s failed with %v and is not best effort, so stopping further execution ", name, pherr) if failExec { - return "", errors.Wrapf(pherr, "executing hook %s failed", name) + return "", fmt.Errorf("executing hook %s failed: %v", name, pherr) } } else { logger.Infof("Executed hook %s", name) diff --git 
a/utils/pod/pod_test.go b/utils/pod/pod_test.go index 00f47c6..871958a 100644 --- a/utils/pod/pod_test.go +++ b/utils/pod/pod_test.go @@ -50,7 +50,7 @@ func TestLaunchPod(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, res, types.POD_STARTING) - err = ForceKill(files) + err = ForceKill() assert.NoError(t, err) } @@ -86,7 +86,7 @@ func TestForceKill(t *testing.T) { if res != types.POD_STARTING { t.Fatalf("expected pod status to be POD_STARTING, but got %s", res) } - err = ForceKill(files) + err = ForceKill() assert.NoError(t, err) } @@ -127,7 +127,7 @@ func TestGetContainerIdByService(t *testing.T) { } func TestKillContainer(t *testing.T) { - err := KillContainer("", "") + err := KillContainer("", types.SvcContainer{}) log.Println(err.Error()) assert.Error(t, err, "test kill invalid container") @@ -140,9 +140,11 @@ func TestKillContainer(t *testing.T) { id, err := wait.PollUntil(time.Duration(1)*time.Second, nil, time.Duration(5)*time.Second, wait.ConditionFunc(func() (string, error) { return GetContainerIdByService(files, "redis") })) - err = KillContainer("SIGUSR1", id) + err = KillContainer("SIGUSR1", types.SvcContainer{ + ContainerId: id, + }) assert.NoError(t, err, "Test sending kill signal to container") - err = KillContainer("", id) + err = KillContainer("", types.SvcContainer{ContainerId: id}) assert.NoError(t, err) config.GetConfig().Set(types.RM_INFRA_CONTAINER, true) @@ -358,6 +360,27 @@ type happyHook struct{} type mandatoryHook struct{} type panicHook struct{} +func (p *happyHook) TaskInfoInitializer(ctx context.Context, data interface{}) error { + return nil +} + +func (p *happyHook) Shutdown(ctx context.Context, podStatus string, data interface{}) { +} + +func (p *panicHook) TaskInfoInitializer(ctx context.Context, data interface{}) error { + return nil +} + +func (p *panicHook) Shutdown(ctx context.Context, podStatus string, data interface{}) { +} + +func (p *mandatoryHook) TaskInfoInitializer(ctx context.Context, data interface{}) error 
{ + return nil +} + +func (p *mandatoryHook) Shutdown(ctx context.Context, podStatus string, data interface{}) { +} + func (p *happyHook) Execute(ctx context.Context, podStatus string, data interface{}) (failExec bool, err error) { return true, nil }