Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quit all processes when "apiserver-boot run local" failed #255

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 45 additions & 31 deletions cmd/apiserver-boot/boot/run/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package run

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -98,6 +99,10 @@ func RunLocal(cmd *cobra.Command, args []string) {

WriteKubeConfig()

// parent context to indicate whether cmds quit
ctx, cancel := context.WithCancel(context.Background())
ctx = util.CancelWhenSignaled(ctx)

r := map[string]interface{}{}
for _, s := range toRun {
r[s] = nil
Expand All @@ -106,46 +111,38 @@ func RunLocal(cmd *cobra.Command, args []string) {
// Start etcd
if _, f := r["etcd"]; f {
etcd = "http://localhost:2379"
etcdCmd := RunEtcd()
defer etcdCmd.Process.Kill()
RunEtcd(ctx, cancel)
time.Sleep(time.Second * 2)
}

// Start apiserver
if _, f := r["apiserver"]; f {
go RunApiserver()
RunApiserver(ctx, cancel)
time.Sleep(time.Second * 2)
}

// Start controller manager
if _, f := r["controller-manager"]; f {
go RunControllerManager()
RunControllerManager(ctx, cancel)
}

fmt.Printf("to test the server run `kubectl --kubeconfig %s api-versions`\n", config)
select {} // wait forever
<-ctx.Done() // wait forever
}

func RunEtcd() *exec.Cmd {
func RunEtcd(ctx context.Context, cancel context.CancelFunc) *exec.Cmd {
etcdCmd := exec.Command("etcd")
if printetcd {
etcdCmd.Stderr = os.Stderr
etcdCmd.Stdout = os.Stdout
}

fmt.Printf("%s\n", strings.Join(etcdCmd.Args, " "))
go func() {
err := etcdCmd.Run()
defer etcdCmd.Process.Kill()
if err != nil {
log.Fatalf("Failed to run etcd %v", err)
os.Exit(-1)
}
}()
go runCommon(etcdCmd, ctx, cancel)

return etcdCmd
}

func RunApiserver() *exec.Cmd {
func RunApiserver(ctx context.Context, cancel context.CancelFunc) *exec.Cmd {
if len(server) == 0 {
server = "bin/apiserver"
}
Expand All @@ -162,45 +159,62 @@ func RunApiserver() *exec.Cmd {
apiserverCmd := exec.Command(server,
flags...,
)
fmt.Printf("%s\n", strings.Join(apiserverCmd.Args, " "))
if printapiserver {
apiserverCmd.Stderr = os.Stderr
apiserverCmd.Stdout = os.Stdout
}

err := apiserverCmd.Run()
if err != nil {
defer apiserverCmd.Process.Kill()
log.Fatalf("Failed to run apiserver %v", err)
os.Exit(-1)
}
go runCommon(apiserverCmd, ctx, cancel)

return apiserverCmd
}

func RunControllerManager() *exec.Cmd {
func RunControllerManager(ctx context.Context, cancel context.CancelFunc) *exec.Cmd {
if len(controllermanager) == 0 {
controllermanager = "bin/controller-manager"
}

controllerManagerCmd := exec.Command(controllermanager,
fmt.Sprintf("--kubeconfig=%s", config),
)
fmt.Printf("%s\n", strings.Join(controllerManagerCmd.Args, " "))
if printcontrollermanager {
controllerManagerCmd.Stderr = os.Stderr
controllerManagerCmd.Stdout = os.Stdout
}

err := controllerManagerCmd.Run()
if err != nil {
defer controllerManagerCmd.Process.Kill()
log.Fatalf("Failed to run controller-manager %v", err)
os.Exit(-1)
}
go runCommon(controllerManagerCmd, ctx, cancel)

return controllerManagerCmd
}

// run a command via goroutine
func runCommon(cmd *exec.Cmd, ctx context.Context, cancel context.CancelFunc) {
stopCh := make(chan error)
cmdName := cmd.Args[0]

fmt.Printf("%s\n", strings.Join(cmd.Args, " "))
go func() {
err := cmd.Run()
if err != nil {
log.Printf("Failed to run %s, error: %v\n", cmdName, err)
} else {
log.Printf("Command %s quitted normally\n", cmdName)
}
stopCh <- err
}()

select {
case <-stopCh:
// my command quited
cancel()
case <-ctx.Done():
// other commands quited
if cmd.Process != nil {
cmd.Process.Kill()
}
}
}

func WriteKubeConfig() {
// Write a kubeconfig
dir, err := os.Getwd()
Expand Down
15 changes: 10 additions & 5 deletions cmd/apiserver-boot/boot/run/local_aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package run

import (
"context"
"fmt"
"log"
"os"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/spf13/cobra"

"github.com/kubernetes-incubator/apiserver-builder/cmd/apiserver-boot/boot/build"
"github.com/kubernetes-incubator/apiserver-builder/cmd/apiserver-boot/boot/util"
"k8s.io/client-go/util/homedir"
)

Expand Down Expand Up @@ -93,6 +95,10 @@ func RunLocalMinikube(cmd *cobra.Command, args []string) {
build.RunBuildExecutables(cmd, args)
}

// parent context to indicate whether cmds quit
ctx, cancel := context.WithCancel(context.Background())
ctx = util.CancelWhenSignaled(ctx)

r := map[string]interface{}{}
for _, s := range toRun {
r[s] = nil
Expand All @@ -112,24 +118,23 @@ func RunLocalMinikube(cmd *cobra.Command, args []string) {
// Start etcd
if _, f := r["etcd"]; f {
etcd = "http://localhost:2379"
etcdCmd := RunEtcd()
defer etcdCmd.Process.Kill()
RunEtcd(ctx, cancel)
time.Sleep(time.Second * 2)
}

// Start apiserver
if _, f := r["apiserver"]; f {
go RunApiserverMinikube()
RunApiserver(ctx, cancel)
time.Sleep(time.Second * 2)
}

// Start controller manager
if _, f := r["controller-manager"]; f {
go RunControllerManager()
RunControllerManager(ctx, cancel)
}

fmt.Printf("to test the server run `kubectl api-versions`, if you specified --kubeconfig you must also provide the flag `--kubeconfig %s`\n", config)
select {} // wait forever
<-ctx.Done() // wait forever
}

func RunApiserverMinikube() *exec.Cmd {
Expand Down
16 changes: 16 additions & 0 deletions cmd/apiserver-boot/boot/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package util

import (
"context"
"fmt"
"io/ioutil"
"log"
Expand All @@ -27,6 +28,8 @@ import (
"strings"
"text/template"

"os/signal"

"github.com/markbates/inflect"
)

Expand Down Expand Up @@ -139,3 +142,16 @@ func CheckInstall() {
strings.Join(missing, ","))
}
}

func CancelWhenSignaled(parent context.Context) context.Context {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@interma any plan to bump the thread?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double-receiving should be a fine addition to this piece of code if that's what you mean, but the context is also used to support cancelling when one of the processes unexpectedly quits.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double-receiving should be a fine addition to this piece of code if that's what you mean

for now we're not catching signal in the code so there's no double-receiving. i mean the pull LGTM overall, but we should reuse the existing code from upstream as much as possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've attempted to address this final code-review comment in #410

It'd be great to get this fixed in the next release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trouble is that importing SetupSignalHandler from k8s.io/apiserver causes a clash with the k8s.io/kube-openapi dependency.
So I copied the code across instead.

ctx, cancel := context.WithCancel(parent)

go func() {
signalChannel := make(chan os.Signal)
signal.Notify(signalChannel, os.Interrupt, os.Kill)
<-signalChannel
cancel()
}()

return ctx
}