Skip to content

Commit

Permalink
UPSTREAM: <carry>: create termination events
Browse files Browse the repository at this point in the history
Origin-commit: a869af0c97e3d97bddedcd76af8a62da6c879c02

UPSTREAM: <carry>: apiserver: log new connections during termination

Origin-commit: 89d1c3ceeb91755aae9099cd5f76c42a22de18c5

UPSTREAM: <carry>: apiserver: create LateConnections events on events in the last 20% of graceful termination time

Origin-commit: 91bc33b6ddf9e1d80906717db5bd9096183e8795

UPSTREAM: <carry>: apiserver: log source in LateConnections event

Origin-commit: 575e54740eb7c2ba635c73f24c22ad77cb5a6e70

UPSTREAM: <carry>: apiserver: skip local IPs and probes for LateConnections

Origin-commit: 2109b95866e81b84a290f34f0806becc2cbd83e9

UPSTREAM: <carry>: only create valid LateConnections/GracefulTermination events

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: apiserver: create hasBeenReadyCh channel

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready
  • Loading branch information
sttts authored and soltysh committed Sep 8, 2021
1 parent b4df81a commit 01df7f3
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 6 deletions.
46 changes: 45 additions & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,20 @@ import (
"k8s.io/kubernetes/openshift-kube-apiserver/enablement"
"k8s.io/kubernetes/openshift-kube-apiserver/openshiftkubeapiserver"

"github.com/go-openapi/spec"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

corev1 "k8s.io/api/core/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
Expand Down Expand Up @@ -74,6 +76,8 @@ import (

"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
Expand All @@ -84,6 +88,7 @@ import (
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/serviceaccount"
)
Expand Down Expand Up @@ -384,6 +389,13 @@ func CreateKubeAPIServerConfig(
}
}

var eventStorage *eventstorage.REST
eventStorage, err = eventstorage.NewREST(genericConfig.RESTOptionsGetter, uint64(s.EventTTL.Seconds()))
if err != nil {
return nil, nil, nil, err
}
genericConfig.EventSink = eventRegistrySink{eventStorage}

config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
Expand Down Expand Up @@ -830,3 +842,35 @@ func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, ne
}
return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
}

// eventRegistrySink wraps an event registry in order to be used as direct event sync, without going through the API.
type eventRegistrySink struct {
*eventstorage.REST
}

var _ genericapiserver.EventSink = eventRegistrySink{}

func (s eventRegistrySink) Create(v1event *corev1.Event) (*corev1.Event, error) {
ctx := request.WithNamespace(request.NewContext(), v1event.Namespace)

var event core.Event
if err := v1.Convert_v1_Event_To_core_Event(v1event, &event, nil); err != nil {
return nil, err
}

obj, err := s.REST.Create(ctx, &event, nil, &metav1.CreateOptions{})
if err != nil {
return nil, err
}
ret, ok := obj.(*core.Event)
if !ok {
return nil, fmt.Errorf("expected corev1.Event, got %T", obj)
}

var v1ret corev1.Event
if err := v1.Convert_core_Event_To_v1_Event(ret, &v1ret, nil); err != nil {
return nil, err
}

return &v1ret, nil
}
89 changes: 89 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package server

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
goruntime "runtime"
"runtime/debug"
"sort"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/go-openapi/spec"
"github.com/google/uuid"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -66,6 +69,8 @@ import (
"k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -215,6 +220,9 @@ type Config struct {
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
MergedResourceConfig *serverstore.ResourceConfig

// EventSink receives events about the life cycle of the API server, e.g. readiness, serving, signals and termination.
EventSink EventSink

//===========================================================================
// values below here are targets for removal
//===========================================================================
Expand All @@ -234,10 +242,18 @@ type Config struct {
// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager

// hasBeenReadyCh is closed when /readyz succeeds for the first time.
hasBeenReadyCh chan struct{}

// A func that returns whether the server is terminating. This can be nil.
IsTerminating func() bool
}

// EventSink allows to create events.
type EventSink interface {
Create(event *corev1.Event) (*corev1.Event, error)
}

type RecommendedConfig struct {
Config

Expand Down Expand Up @@ -343,6 +359,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),

hasBeenReadyCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -505,6 +523,10 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
c.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: c.ExternalAddress}
}

if c.EventSink == nil {
c.EventSink = nullEventSink{}
}

AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)

if c.RequestInfoResolver == nil {
Expand Down Expand Up @@ -532,9 +554,58 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *RecommendedConfig) Complete() CompletedConfig {
if c.ClientConfig != nil {
ref, err := eventReference()
if err != nil {
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
c.EventSink = nullEventSink{}
} else {
ns := ref.Namespace
if len(ns) == 0 {
ns = "default"
}
c.EventSink = &v1.EventSinkImpl{
Interface: kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(ns),
}
}
}

return c.Config.Complete(c.SharedInformerFactory)
}

func eventReference() (*corev1.ObjectReference, error) {
ns := os.Getenv("POD_NAMESPACE")
pod := os.Getenv("POD_NAME")
if len(ns) == 0 && len(pod) > 0 {
serviceAccountNamespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
if _, err := os.Stat(serviceAccountNamespaceFile); err == nil {
bs, err := ioutil.ReadFile(serviceAccountNamespaceFile)
if err != nil {
return nil, err
}
ns = string(bs)
}
}
if len(ns) == 0 {
pod = ""
ns = "kube-system"
}
if len(pod) == 0 {
return &corev1.ObjectReference{
Kind: "Namespace",
Name: ns,
APIVersion: "v1",
}, nil
}

return &corev1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: pod,
APIVersion: "v1",
}, nil
}

// New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delgating.
// delegationTarget may not be nil.
Expand Down Expand Up @@ -587,6 +658,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
readinessStopCh: make(chan struct{}),
hasBeenReadyCh: c.hasBeenReadyCh,
livezGracePeriod: c.LivezGracePeriod,

DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
Expand All @@ -596,7 +668,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G

APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager,

eventSink: c.EventSink,
}

ref, err := eventReference()
if err != nil {
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
c.EventSink = nullEventSink{}
}
s.eventRef = ref

for {
if c.JSONPatchMaxCopyBytes <= 0 {
Expand Down Expand Up @@ -754,6 +835,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = WithNonReadyRequestLogging(handler, c.hasBeenReadyCh)
handler = WithLateConnectionFilter(handler)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
Expand Down Expand Up @@ -860,3 +943,9 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
authz.Authorizer = authorizerunion.New(tokenAuthorizer, authz.Authorizer)
}
}

type nullEventSink struct{}

func (nullEventSink) Create(event *corev1.Event) (*corev1.Event, error) {
return nil, nil
}
66 changes: 66 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server
import (
"fmt"
"net/http"
"os"
gpath "path"
"strings"
"sync"
Expand All @@ -27,6 +28,7 @@ import (
systemd "github.com/coreos/go-systemd/daemon"
"github.com/go-openapi/spec"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -176,6 +178,8 @@ type GenericAPIServer struct {
// the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this
// will cause readyz to return unhealthy.
readinessStopCh chan struct{}
// hasBeenReadyCh is closed when /readyz succeeds for the first time.
hasBeenReadyCh chan struct{}

// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
Expand Down Expand Up @@ -209,6 +213,10 @@ type GenericAPIServer struct {

// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager

// EventSink creates events.
eventSink EventSink
eventRef *corev1.ObjectReference
}

// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
Expand Down Expand Up @@ -338,22 +346,49 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// and stop sending traffic to this server.
close(s.readinessStopCh)

s.Eventf(corev1.EventTypeNormal, "TerminationStart", "Received signal to terminate, becoming unready, but keeping serving")

time.Sleep(s.ShutdownDelayDuration)

s.Eventf(corev1.EventTypeNormal, "TerminationMinimalShutdownDurationFinished", "The minimal shutdown duration of %v finished", s.ShutdownDelayDuration)
}()

lateStopCh := make(chan struct{})
if s.ShutdownDelayDuration > 0 {
go func() {
defer close(lateStopCh)

<-stopCh

time.Sleep(s.ShutdownDelayDuration * 8 / 10)
}()
}

s.SecureServingInfo.Listener = &terminationLoggingListener{
Listener: s.SecureServingInfo.Listener,
lateStopCh: lateStopCh,
}
unexpectedRequestsEventf.Store(s.Eventf)

// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
if err != nil {
return err
}

go func() {
<-delayedStopCh
s.Eventf(corev1.EventTypeNormal, "TerminationStoppedServing", "Server has stopped listening")
}()

<-stopCh

// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
s.Eventf(corev1.EventTypeNormal, "TerminationPreShutdownHooksFinished", "All pre-shutdown hooks have been finished")

// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-delayedStopCh
Expand All @@ -362,6 +397,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {

// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
s.Eventf(corev1.EventTypeNormal, "TerminationGracefulTerminationFinished", "All pending requests processed")

return nil
}
Expand Down Expand Up @@ -636,3 +672,33 @@ func getResourceNamesForGroup(apiPrefix string, apiGroupInfo *APIGroupInfo, path

return resourceNames, nil
}

// Eventf creates an event with the API server as source, either in default namespace against default namespace, or
// if POD_NAME/NAMESPACE are set against that pod.
func (s *GenericAPIServer) Eventf(eventType, reason, messageFmt string, args ...interface{}) {
t := metav1.Time{Time: time.Now()}
host, _ := os.Hostname() // expicitly ignore error. Empty host is fine

ref := *s.eventRef
if len(ref.Namespace) == 0 {
ref.Namespace = "default" // TODO: event broadcaster sets event ns to default. We have to match. Odd.
}

e := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: ref.Namespace,
},
InvolvedObject: ref,
Reason: reason,
Message: fmt.Sprintf(messageFmt, args...),
Type: eventType,
Source: corev1.EventSource{Component: "apiserver", Host: host},
}

klog.V(2).Infof("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)

if _, err := s.eventSink.Create(e); err != nil {
klog.Warningf("failed to create event %s/%s: %v", e.Namespace, e.Name, err)
}
}
Loading

0 comments on commit 01df7f3

Please sign in to comment.