[SHIPA-2066] ketch to monitor deployment better #177
Conversation
rm message filter; use recorder
I think the loop in watchDeployEvents looks like a good solution.
Yesterday @DavisFrench merged a PR to send events with annotations; maybe we can annotate events in this PR too, so it'll be easier to consume events in shipa.
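Something along these lines, maybe; a minimal sketch using client-go's AnnotatedEventf, where the annotation keys are placeholders rather than whatever Davis's PR settled on:

// Sketch: attach structured annotations to the event so shipa can consume
// them without parsing the message text. Keys below are placeholders.
annotations := map[string]string{
	"ketch.io/app-name":     app.Name,     // placeholder annotation key
	"ketch.io/process-name": process.Name, // placeholder annotation key
}
recorder.AnnotatedEventf(app, annotations, v1.EventTypeNormal, appReconcileStarted,
	"Updating units [%s]", process.Name)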
"time" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/pkg/errors" | ||
"helm.sh/helm/v3/pkg/release" | ||
appsv1 "k8s.io/api/apps/v1" | ||
apiv1 "k8s.io/api/core/v1" |
it's already imported as v1
var dep appsv1.Deployment
if err := r.Get(ctx, client.ObjectKey{
	Namespace: framework.Spec.NamespaceName,
	Name:      fmt.Sprintf("%s-%s-%d", app.GetName(), process.Name, len(app.Spec.Deployments)),
Should it be fmt.Sprintf("%s-%s-%d", app.GetName(), process.Name, latestDeployment.Version)?
Ketch generates a k8s Deployment name using the <app-name>-<process-name>-<deploymentVersion> template.
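A rough sketch of the lookup with that change, assuming latestDeployment is the last entry in app.Spec.Deployments (the variable name is mine):

// Sketch: derive the k8s Deployment name from the latest deployment's version
// instead of the slice length.
latestDeployment := app.Spec.Deployments[len(app.Spec.Deployments)-1]
var dep appsv1.Deployment
if err := r.Get(ctx, client.ObjectKey{
	Namespace: framework.Spec.NamespaceName,
	Name:      fmt.Sprintf("%s-%s-%d", app.GetName(), process.Name, latestDeployment.Version),
}, &dep); err != nil {
	return err
}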
Do we need this check?
if dep.Status.ObservedGeneration >= dep.Generation {
	continue
}
Inside watchDeployEvents we already wait while the dep.Status.ObservedGeneration < dep.Generation condition holds, and only then start monitoring things.
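For context, the waiting inside watchDeployEvents looks roughly like this; a sketch based on the shipa code linked in the description, not the exact ketch implementation:

// Sketch: wait until the deployment controller has observed the latest spec
// before starting to monitor rollout progress.
for dep.Status.ObservedGeneration < dep.Generation {
	select {
	case <-time.After(100 * time.Millisecond):
	case <-ctx.Done():
		return ctx.Err()
	}
	d, err := cli.AppsV1().Deployments(namespace).Get(ctx, dep.Name, metav1.GetOptions{})
	if err != nil {
		return errors.Wrap(err, "failed to get deployment")
	}
	dep = d
}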
Generation check removed.
if dep.Status.ObservedGeneration >= dep.Generation {
	continue
}
err = watchDeployEvents(ctx, app, framework.Spec.NamespaceName, &dep, &process, r.Recorder)
running this function in the app reconciler's goroutine blocks all other deployments.
We can either run it in a dedicated goroutine or set MaxConcurrentReconciles
to something more suitable.
func (r *AppReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&ketchv1.App{}).
		WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
		Complete(r)
}
Moreover, ketch doesn't use ObservedGeneration and Generation.
A nice write-up about it: https://alenkacz.medium.com/kubernetes-operator-best-practices-implementing-observedgeneration-250728868792
When ketch starts up, it goes through all apps and updates their states, meaning it'll run this function for all apps.
I modified the MaxConcurrentReconciles
}

opts := listOptsForPodEvent(app)
opts.Watch = true
I think we can use opts.ResourceVersion here
opts := listOptsForPodEvent(app)
opts.Watch = true
opts.ResourceVersion = app.ResourceVersion
https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
defer func() {
	watch.Stop()
	if watchCh != nil {
		// Drain watch channel to avoid goroutine leaks.
Not sure I get it. Why can't we go with:
watch, err := cli.CoreV1().Events(namespace).Watch(ctx, opts)
if err != nil {
	return err
}
defer watch.Stop()
loop:
for {
	select {
	case <-time.After(100 * time.Millisecond):
	case msg, isOpen := <-watch.ResultChan():
		if !isOpen {
			break loop // labeled break so we exit the loop, not just the select
		}
		_ = msg // ... process the event ...
	}
}
Me neither. I added the deferred watch.Stop().
// allNewPodsRunning returns true if a list of pods contains the same number of running pods with <app>-<process>-<deploymentVersion> as the
// process.Units requires.
func allNewPodsRunning(ctx context.Context, cli *kubernetes.Clientset, app *ketchv1.App, process *ketchv1.ProcessSpec, depRevision string) (bool, error) {
	pods, err := cli.CoreV1().Pods(app.GetNamespace()).List(ctx, listOptsForPodEvent(app))
When we update a k8s Deployment, the k8s controller creates a new ReplicaSet. The previous ReplicaSet starts removing pods, the new one starts creating pods.
We are interested in new pods, right?
idk, maybe there is another way to get them, but here's a working solution (sketched in Go right after this list):
1. get the k8s Deployment's deployment.kubernetes.io/revision annotation
2. find a ReplicaSet that has the same annotation with the same value
3. find all pods that have a link in their ownerReference list pointing to the ReplicaSet from step 2.
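Roughly what I have in mind, as a sketch; the helper name podsForCurrentRevision is made up and the usual client-go/apimachinery imports are assumed:

// Sketch of the revision/ownerReference approach described above.
func podsForCurrentRevision(ctx context.Context, cli kubernetes.Interface, dep *appsv1.Deployment) ([]apiv1.Pod, error) {
	// Step 1: read the deployment's revision annotation.
	revision := dep.Annotations["deployment.kubernetes.io/revision"]

	// Step 2: find the ReplicaSet owned by this deployment with the same revision.
	rsList, err := cli.AppsV1().ReplicaSets(dep.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	var current *appsv1.ReplicaSet
	for i := range rsList.Items {
		rs := &rsList.Items[i]
		if metav1.IsControlledBy(rs, dep) && rs.Annotations["deployment.kubernetes.io/revision"] == revision {
			current = rs
			break
		}
	}
	if current == nil {
		return nil, nil // the new ReplicaSet may not exist yet
	}

	// Step 3: keep only pods whose ownerReference points at that ReplicaSet.
	podList, err := cli.CoreV1().Pods(dep.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	var pods []apiv1.Pod
	for i := range podList.Items {
		if metav1.IsControlledBy(&podList.Items[i], current) {
			pods = append(pods, podList.Items[i])
		}
	}
	return pods, nil
}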
More about ownerReference: https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/
I removed this func and used the existing checkPodStatus.
opts := metav1.ListOptions{
	FieldSelector: "involvedObject.kind=Pod",
}
pods, err := cli.CoreV1().Pods(app.GetNamespace()).List(ctx, opts)
it looks like we get all pods here even ones not related to the app.
Good catch. I added a LabelSelector to limit by AppName.
and it might be that we need to check the pods of the current deployment's revision.
A use-case: as a user I deployed an application and it is failing now, several of the app's pods are not ready. I have built a new image and am deploying it right now.
I updated the label selector to LabelSelector: fmt.Sprintf("%s/app-name=%s,%s/app-deployment-version=%d", group, app.Name, group, deploymentVersion), so it also limits by deployment version. Is this what you're thinking?
eventsInterface := cli.CoreV1().Events(namespace)
selector := eventsInterface.GetFieldSelector(&pod.Name, &namespace, nil, nil)
options := metav1.ListOptions{FieldSelector: selector.String()}
events, err := eventsInterface.List(context.TODO(), options)
Should we use the passed-in ctx?
Good call. Changed.
deadlineExeceededProgressCond = "ProgressDeadlineExceeded"
DefaultPodRunningTimeout      = 10 * time.Minute
maxWaitTimeDuration           = time.Duration(120) * time.Second
maxConcurrentReconciles       = 10
so we can deploy only 10 apps simultaneously?
I removed this and opted to run all of the watch in a goroutine to prevent blocking.
config, err := GetRESTConfig()
if err != nil {
	return err
}
How about creating a new field in AppReconciler and using that instead of the in-cluster config getter?
type AppReconciler struct {
	...
	Config *restclient.Config
}
and instantiate it in main with
AppReconciler{
	Config: ctrl.GetConfigOrDie(),
}
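If we go that route, the reconciler can then build a typed clientset from the injected config wherever it needs one; a minimal sketch:

// Sketch: build a clientset from the injected REST config instead of calling
// an in-cluster config getter inside the reconcile loop.
cli, err := kubernetes.NewForConfig(r.Config)
if err != nil {
	return err
}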
There is also a way given here:
https://github.com/shipa-corp/ketch/blob/master/cmd/ketch/configuration/configuration.go#L93-L96
This would introduce a third way to initialize the config; do we want that?
Good call. I utilized the existing function in the config and initialized it in main as you suggest.
message: fmt.Sprintf("failed to get deployment: %v", err),
}
}
err = r.watchDeployEvents(ctx, app, framework.Spec.NamespaceName, &dep, &process, r.Recorder)
Is this line blocking? Meaning, should we watch processes in parallel, or with this do we wait for the first process, then monitor the second, and so on?
I believe it should be parallel. What do you guys think?
Good call. I put all of the Event watch code in a goroutine. I was struggling a lot with an issue where the reconciler would complete before creating all events, but realized it was fixable with a requeue parameter rather than blocking everything.
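Roughly what that looks like in the reconciler; a sketch only, where the exact wiring of watchFunc's arguments matches the call shown further down and the requeue interval is an arbitrary value I picked for illustration:

// Sketch: start watching in the background, then ask controller-runtime to
// requeue so the reconciler revisits the app instead of blocking until the
// rollout completes.
go r.watchFunc(ctx, app, namespace, dep, process, recorder, watcher, cli, timeout)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil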
}

opts := metav1.ListOptions{
	FieldSelector: "involvedObject.kind=Pod",
should it be for the given app?
I'm not sure there is a way to filter Pod events by App, since events don't have labels. isDeploymentEvent filters them by app name prefix before emitting them as App Events.
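Something like this is the shape I mean; a sketch of the prefix check, not necessarily the exact isDeploymentEvent implementation in the diff:

// Sketch: keep only events whose involved object name carries the
// <app>-<process>- prefix of this deployment.
func isDeploymentEvent(msg watch.Event, depName string) bool {
	evt, ok := msg.Object.(*apiv1.Event)
	if !ok {
		return false
	}
	return strings.HasPrefix(evt.InvolvedObject.Name, depName)
}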
oldUpdatedReplicas := int32(-1)
oldReadyUnits := int32(-1)
oldPendingTermination := int32(-1)
Is this always -1? Should we read the old status instead?
It gets set to pendingTermination each cycle here: https://github.com/theketchio/ketch/pull/177/files#diff-01570fd749623ed07d5f2e0b7097495a4ef86b6f3419378180bc43fc73c9223eR443. I'm okay with changing it, but was trying to keep output similar to Shipa's: https://github.com/shipa-corp/shipa/blob/master/provision/kubernetes/app_manager.go#L445
}
}()

for {
Will it always break? Do we always either get an error, or is the following condition always met?
readyUnits == specReplicas &&
	dep.Status.Replicas == specReplicas
Should we have a timeout?
Good call. I re-added the bit that checks for timeouts.
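For reference, the timeout handling is roughly this shape; a sketch where the error message and polling interval are assumptions, and the success check that breaks the loop is elided:

// Sketch: give the rollout a deadline instead of looping forever. The timeout
// channel is the <-chan time.Time passed into watchFunc, created from
// DefaultPodRunningTimeout.
for {
	select {
	case <-timeout:
		return errors.New("timeout waiting for deployment to finish")
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		// fall through and re-check the deployment status below
	}
	// ... re-fetch dep and return once readyUnits == specReplicas &&
	// dep.Status.Replicas == specReplicas ...
}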
}

// stringifyEvent accepts an event and returns relevant details as a string
func stringifyEvent(watchEvent watch.Event) string {
Maybe using annotations, similar to CanaryEvents, is a better solution. What do you think?
That sounds good. I removed this and added an AppDeploymentEvent type that includes annotations and String(), similar to the Canary work. Hoping it's close to what you are expecting.
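In case it helps review, the shape is roughly this; field names and annotation keys here are approximations of what ended up in the diff, not a verbatim copy:

// Sketch of the AppDeploymentEvent shape and its constructor.
type AppDeploymentEvent struct {
	Reason      string
	Description string
	Annotations map[string]string
}

func newAppDeploymentEvent(app *ketchv1.App, reason, desc, processName string) *AppDeploymentEvent {
	return &AppDeploymentEvent{
		Reason:      reason,
		Description: desc,
		Annotations: map[string]string{
			"app-name":           app.Name, // placeholder keys
			"process-name":       processName,
			"deployment-version": fmt.Sprintf("%d", len(app.Spec.Deployments)),
		},
	}
}

It gets consumed the way the later snippet shows: recorder.AnnotatedEventf(app, event.Annotations, v1.EventTypeNormal, event.Reason, event.Description).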
internal/controllers/k8s_config.go (Outdated)
// GetRESTConfig returns a rest.Config. It uses the presence of KUBERNETES_SERVICE_HOST
// to determine whether to use an InClusterConfig or the user's config.
func GetRESTConfig() (*rest.Config, error) {
There is a nice ctrl.GetConfigOrDie() function.
Yep. I removed this whole file as it's not used anymore.
internal/controllers/k8s_config.go (Outdated)
package controllers

import (
	"os"
	"path/filepath"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// GetRESTConfig returns a rest.Config. It uses the presence of KUBERNETES_SERVICE_HOST
// to determine whether to use an InClusterConfig or the user's config.
func GetRESTConfig() (*rest.Config, error) {
	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
		return externalConfig()
	}
	return rest.InClusterConfig()
}

// externalConfig returns a REST config to be run external to the cluster, e.g. testing locally.
func externalConfig() (*rest.Config, error) {
	home, err := os.UserHomeDir()
	if err != nil {
		return nil, err
	}

	configStr := filepath.Join(home, ".kube", "config")
	return clientcmd.BuildConfigFromFlags("", configStr)
}
If you load the config from ctrl, we don't need this file anymore, right?
Yep. Removed this dead code.
}
}

go r.watchFunc(ctx, app, namespace, dep, process, recorder, watcher, cli, timeout)
Do processes share a watcher? We instantiate a watcher for each process, but from the code I'd say we get the same events for each process, we just print them differently ("Updating units [%s]", process.Name).
How do we know that certain events belong to a certain process?
Each process has its own watcher (is that a good idea 🤷). The watcher does 2 things: 1) watch Pod Events and 2) keep checking the appsv1.Deployment for updates.
- The Pod Events are filtered by process here because the appsv1.Deployment name includes the process name. An appsv1.Deployment is 1-to-1 with a Ketch Process (which confuses because Ketch has Deployments too).
- The appsv1.Deployments that we keep checking are also filtered by appsv1.Deployment.Name, e.g. here, which is the Ketch Process Name.
At least, I think that's what's going on. I'm new here.
func (r *AppReconciler) watchFunc(ctx context.Context, app *ketchv1.App, namespace string, dep *appsv1.Deployment, process *ketchv1.ProcessSpec, recorder record.EventRecorder, watcher watch.Interface, cli kubernetes.Interface, timeout <-chan time.Time) error {
	var err error
	watchCh := watcher.ResultChan()
	// recorder.Eventf(app, v1.EventTypeNormal, appReconcileStarted, "Updating units [%s]", process.Name)
Can we remove this commented-out line?
}
}

func (a *AppDeploymentEvent) String() string {
do we need this function if we use annotations?
Good idea. I removed this func and moved the string presentation logic into the corresponding shipa PR: https://github.com/shipa-corp/shipa/pull/1028
…od-getter; improves annotations
awesome job
@@ -319,7 +320,8 @@ func TestNewApplicationChart(t *testing.T) {
 	Version: "0.0.1",
 	AppName: tt.application.Name,
 }
-client := HelmClient{cfg: &action.Configuration{KubeClient: &fake.PrintingKubeClient{}, Releases: storage.Init(driver.NewMemory())}, namespace: tt.framework.Spec.NamespaceName}
+client := HelmClient{cfg: &action.Configuration{KubeClient: &fake.PrintingKubeClient{}, Releases: storage.Init(driver.NewMemory())}, namespace: tt.framework.Spec.NamespaceName, c: clientfake.NewClientBuilder().Build()}
This test was panicking without a client.
reconcileStartedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileStarted, fmt.Sprintf("Updating units [%s]", process.Name), process.Name)
recorder.AnnotatedEventf(app, reconcileStartedEvent.Annotations, v1.EventTypeNormal, reconcileStartedEvent.Reason, reconcileStartedEvent.Description)
go r.watchFunc(ctx, app, namespace, dep, process.Name, recorder, watcher, cli, timeout, watcher.Stop)
Watch asynchronously.
looks good!
Description
A lot of this was borrowed from https://github.com/shipa-corp/shipa/blob/master/provision/kubernetes/app_manager.go#L404, as ticket 2066 indicates. Essentially, this change creates an event watcher and repeatedly checks deployments for updates. It then records new app events with relevant info.
Fixes #2066
Permits Shipa to generate the output shown below during an App Deployment using the Ketch provisioner. Corresponding shipa PR - we may want to remove the AppReconcileOutcome in the Shipa PR.
To use, make docker-build and docker-push this branch, run shipa specifying this new docker image for ketch in the shipa.yaml, and create & deploy a ketch-provisioned App.
Type of change
Testing
Documentation
Final Checklist: