Skip to content

Commit

Permalink
UPSTREAM: <carry>: kube-apiserver: wire through isTerminating into ha…
Browse files Browse the repository at this point in the history
…ndler chain

Origin-commit: 5772e7285acbe901762d8cd8cb1fc33d8b459d04
  • Loading branch information
sttts authored and soltysh committed Sep 8, 2021
1 parent 641a8b0 commit b4df81a
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 23 deletions.
15 changes: 12 additions & 3 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
return nil, err
}

kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport, stopCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -335,13 +335,14 @@ func CreateKubeAPIServerConfig(
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
stopCh <-chan struct{},
) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport, stopCh)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -482,7 +483,7 @@ func CreateKubeAPIServerConfig(
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,

stopCh <-chan struct{},
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
Expand All @@ -493,6 +494,14 @@ func buildGenericConfig(
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.IsTerminating = func() bool {
select {
case <-stopCh:
return true
default:
return false
}
}
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()

if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver, nil)

return handler
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubeapiserver/server/insecure_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver, nil)

return handler
}
2 changes: 1 addition & 1 deletion pkg/kubelet/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ var statusesNoTracePred = httplog.StatusIsNot(

// ServeHTTP responds to HTTP requests on the Kubelet.
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred)
handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred, nil)

// monitor http requests
var serverType string
Expand Down
5 changes: 4 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ type Config struct {

// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager

// A func that returns whether the server is terminating. This can be nil.
IsTerminating func() bool
}

type RecommendedConfig struct {
Expand Down Expand Up @@ -759,7 +762,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver, c.IsTerminating)
return handler
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.

handler = WithTimeoutForNonLongRunningRequests(handler, longRunningRequestCheck, requestTimeout)
handler = apifilters.WithRequestInfo(handler, requestInfoFactory)
handler = WithPanicRecovery(handler, requestInfoFactory)
handler = WithPanicRecovery(handler, requestInfoFactory, func() bool { return false })
return handler
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestTimeout(t *testing.T) {
}), func(w http.ResponseWriter, req *http.Request, err interface{}) {
gotPanic <- err
http.Error(w, "This request caused apiserver to panic. Look in the logs for details.", http.StatusInternalServerError)
}),
}, nil),
)
defer ts.Close()

Expand Down Expand Up @@ -236,7 +236,7 @@ func TestErrConnKilled(t *testing.T) {
GrouplessAPIPrefixes: sets.NewString("api"),
}

ts := httptest.NewServer(WithPanicRecovery(handler, resolver))
ts := httptest.NewServer(WithPanicRecovery(handler, resolver, nil))
defer ts.Close()

_, err = http.Get(ts.URL)
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestErrConnKilledHTTP2(t *testing.T) {
}

// test server
ts := httptest.NewUnstartedServer(WithPanicRecovery(handler, resolver))
ts := httptest.NewUnstartedServer(WithPanicRecovery(handler, resolver, nil))
tsCert, err := tls.X509KeyPair(tsCrt, tsKey)
if err != nil {
t.Fatalf("backend: invalid x509/key pair: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/server/filters/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

// WithPanicRecovery wraps an http Handler to recover and log panics (except in the special case of http.ErrAbortHandler panics, which suppress logging).
func WithPanicRecovery(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
func WithPanicRecovery(handler http.Handler, resolver request.RequestInfoResolver, isTerminating func() bool) http.Handler {
return withPanicRecovery(handler, func(w http.ResponseWriter, req *http.Request, err interface{}) {
if err == http.ErrAbortHandler {
// Honor the http.ErrAbortHandler sentinel panic value
Expand Down Expand Up @@ -56,11 +56,11 @@ func WithPanicRecovery(handler http.Handler, resolver request.RequestInfoResolve
}
http.Error(w, "This request caused apiserver to panic. Look in the logs for details.", http.StatusInternalServerError)
klog.Errorf("apiserver panic'd on %v %v", req.Method, req.RequestURI)
})
}, isTerminating)
}

func withPanicRecovery(handler http.Handler, crashHandler func(http.ResponseWriter, *http.Request, interface{})) http.Handler {
handler = httplog.WithLogging(handler, httplog.DefaultStacktracePred)
func withPanicRecovery(handler http.Handler, crashHandler func(http.ResponseWriter, *http.Request, interface{}), isTerminating func() bool) http.Handler {
handler = httplog.WithLogging(handler, httplog.DefaultStacktracePred, isTerminating)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer runtime.HandleCrash(func(err interface{}) {
crashHandler(w, req, err)
Expand Down
17 changes: 14 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type respLogger struct {
statusStack string
addedInfo string
startTime time.Time
isTerminating bool

captureErrorOutput bool

Expand All @@ -76,16 +77,20 @@ func DefaultStacktracePred(status int) bool {
}

// WithLogging wraps the handler with logging.
func WithLogging(handler http.Handler, pred StacktracePred) http.Handler {
func WithLogging(handler http.Handler, pred StacktracePred, isTerminatingFn func() bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if old := respLoggerFromContext(req); old != nil {
panic("multiple WithLogging calls!")
}
rl := newLogged(req, w).StacktraceWhen(pred)
isTerminating := false
if isTerminatingFn != nil {
isTerminating = isTerminatingFn()
}
rl := newLogged(req, w).StacktraceWhen(pred).IsTerminating(isTerminating)
req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl))

if klog.V(3).Enabled() {
if klog.V(3).Enabled() || (rl.isTerminating && klog.V(1).Enabled()) {
defer func() { klog.InfoS("HTTP", rl.LogArgs()...) }()
}
handler.ServeHTTP(rl, req)
Expand Down Expand Up @@ -137,6 +142,12 @@ func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {
return rl
}

// IsTerminating informs the logger that the server is terminating.
func (rl *respLogger) IsTerminating(is bool) *respLogger {
rl.isTerminating = is
return rl
}

// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged
// for any status *not* in the given list.
func StatusIsNot(statuses ...int) StacktracePred {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestWithLogging(t *testing.T) {
}
var handler http.Handler
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
handler = WithLogging(WithLogging(handler, DefaultStacktracePred), DefaultStacktracePred)
handler = WithLogging(WithLogging(handler, DefaultStacktracePred, nil), DefaultStacktracePred, nil)

func() {
defer func() {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestLogOf(t *testing.T) {
}
})
if makeLogger {
handler = WithLogging(handler, DefaultStacktracePred)
handler = WithLogging(handler, DefaultStacktracePred, nil)
want = "*httplog.respLogger"
} else {
want = "*httplog.passthroughLogger"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestUnlogged(t *testing.T) {
}
})
if makeLogger {
handler = WithLogging(handler, DefaultStacktracePred)
handler = WithLogging(handler, DefaultStacktracePred, nil)
}

handler.ServeHTTP(origWriter, req)
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/controller-manager/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
}
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver, nil)

return handler
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/framework/test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport, nil)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit b4df81a

Please sign in to comment.