Skip to content

Commit

Permalink
Add additional unit tests to event Recorder (kubeflow#888)
Browse files Browse the repository at this point in the history
* add unit tests for recorder

* adding additional tests

* add ConditionReady and assume the empty == False

* move record events to controller

* resolve comments
  • Loading branch information
ifilonenko authored Jun 18, 2020
1 parent b24ac47 commit 32bd954
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 60 deletions.
18 changes: 8 additions & 10 deletions pkg/apis/serving/v1alpha2/inferenceservice_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ const (

// PropagateDefaultStatus propagates the status for the default spec
func (ss *InferenceServiceStatus) PropagateDefaultStatus(component constants.InferenceServiceComponent,
defaultStatus *knservingv1.ServiceStatus) bool {
defaultStatus *knservingv1.ServiceStatus) {
if ss.Default == nil {
emptyStatusMap := make(map[constants.InferenceServiceComponent]StatusConfigurationSpec)
ss.Default = &emptyStatusMap
Expand All @@ -96,20 +96,20 @@ func (ss *InferenceServiceStatus) PropagateDefaultStatus(component constants.Inf
if defaultStatus == nil {
conditionSet.Manage(ss).ClearCondition(conditionType)
delete(*ss.Default, component)
return false
return
}

statusSpec, ok := (*ss.Default)[component]
if !ok {
statusSpec = StatusConfigurationSpec{}
(*ss.Default)[component] = statusSpec
}
return ss.propagateStatus(component, false, conditionType, defaultStatus)
ss.propagateStatus(component, false, conditionType, defaultStatus)
}

// PropagateCanaryStatus propagates the status for the canary spec
func (ss *InferenceServiceStatus) PropagateCanaryStatus(component constants.InferenceServiceComponent,
canaryStatus *knservingv1.ServiceStatus) bool {
canaryStatus *knservingv1.ServiceStatus) {
if ss.Canary == nil {
emptyStatusMap := make(map[constants.InferenceServiceComponent]StatusConfigurationSpec)
ss.Canary = &emptyStatusMap
Expand All @@ -119,32 +119,31 @@ func (ss *InferenceServiceStatus) PropagateCanaryStatus(component constants.Infe
if canaryStatus == nil {
conditionSet.Manage(ss).ClearCondition(conditionType)
delete(*ss.Canary, component)
return false
return
}

statusSpec, ok := (*ss.Canary)[component]
if !ok {
statusSpec = StatusConfigurationSpec{}
(*ss.Canary)[component] = statusSpec
}
return ss.propagateStatus(component, true, conditionType, canaryStatus)
ss.propagateStatus(component, true, conditionType, canaryStatus)
}

func (ss *InferenceServiceStatus) propagateStatus(component constants.InferenceServiceComponent, isCanary bool,
conditionType apis.ConditionType,
serviceStatus *knservingv1.ServiceStatus) bool {
serviceStatus *knservingv1.ServiceStatus) {
statusSpec := StatusConfigurationSpec{}
statusSpec.Name = serviceStatus.LatestCreatedRevisionName
serviceCondition := serviceStatus.GetCondition(knservingv1.ServiceConditionReady)
isReady := false

switch {
case serviceCondition == nil:
case serviceCondition.Status == v1.ConditionUnknown:
conditionSet.Manage(ss).MarkUnknown(conditionType, serviceCondition.Reason, serviceCondition.Message)
statusSpec.Hostname = ""
case serviceCondition.Status == v1.ConditionTrue:
conditionSet.Manage(ss).MarkTrue(conditionType)
isReady = true
if serviceStatus.URL != nil {
statusSpec.Hostname = serviceStatus.URL.Host
}
Expand All @@ -157,7 +156,6 @@ func (ss *InferenceServiceStatus) propagateStatus(component constants.InferenceS
} else {
(*ss.Default)[component] = statusSpec
}
return isReady
}

// PropagateRouteStatus propagates route's status to the service's status.
Expand Down
21 changes: 18 additions & 3 deletions pkg/controller/inferenceservice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ package service

import (
"context"
"fmt"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/apis"
"os"

"github.com/kubeflow/kfserving/pkg/controller/inferenceservice/reconcilers/istio"
Expand Down Expand Up @@ -163,7 +165,7 @@ func (r *ReconcileService) Reconcile(request reconcile.Request) (reconcile.Resul
}

reconcilers := []Reconciler{
knative.NewServiceReconciler(r.Client, r.scheme, r.Recorder, configMap),
knative.NewServiceReconciler(r.Client, r.scheme, configMap),
istio.NewVirtualServiceReconciler(r.Client, r.scheme, configMap),
}

Expand All @@ -174,20 +176,26 @@ func (r *ReconcileService) Reconcile(request reconcile.Request) (reconcile.Resul
return reconcile.Result{}, err
}
}

if err = r.updateStatus(isvc); err != nil {
r.Recorder.Eventf(isvc, v1.EventTypeWarning, "InternalError", err.Error())
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

func InferenceServiceReadiness(status v1alpha2.InferenceServiceStatus) bool {
return status.Conditions != nil &&
status.GetCondition(apis.ConditionReady) != nil &&
status.GetCondition(apis.ConditionReady).Status == v1.ConditionTrue
}

func (r *ReconcileService) updateStatus(desiredService *kfserving.InferenceService) error {
existing := &kfserving.InferenceService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existing); err != nil {
return err
}
wasReady := InferenceServiceReadiness(existing.Status)
if equality.Semantic.DeepEqual(existing.Status, desiredService.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
Expand All @@ -200,8 +208,15 @@ func (r *ReconcileService) updateStatus(desiredService *kfserving.InferenceServi
return err
} else {
// If there was a difference and there was no error.
isReady := InferenceServiceReadiness(desiredService.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(v1alpha2.InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(v1alpha2.InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
}
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, "Updated", "Updated InferenceService %q", desiredService.GetName())
}

return nil
}
151 changes: 127 additions & 24 deletions pkg/controller/inferenceservice/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package service

import (
"fmt"
"reflect"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -335,21 +337,16 @@ func TestInferenceServiceWithOnlyPredictor(t *testing.T) {
}
return &isvc.Status
}, timeout).Should(testutils.BeSematicEqual(&expectedKfsvcStatus))
// We are testing for a Ready event
expectedReadyEvents := []SimpleEvent{
{Count: 1, Type: v1.EventTypeNormal, Reason: string(kfserving.InferenceServiceReadyState)},
}
g.Eventually(func() error {
events := &v1.EventList{}
if err := c.List(context.TODO(), events); err != nil {
return fmt.Errorf("Test %q failed: returned error: %v", serviceName, err)
}
if len(events.Items) == 0 {
return fmt.Errorf("Test %q failed: no events were created", serviceName)
}
for _, event := range events.Items {
if event.Reason == string(kfserving.InferenceServiceReadyState) &&
event.Type == v1.EventTypeNormal {
return nil
}
events := getEvents()
if reflect.DeepEqual(events, expectedReadyEvents) {
return nil
}
return fmt.Errorf("Test %q failed: events [%v] did not contain ready", serviceName, events.Items)
return fmt.Errorf("test %q failed: [%v] did not equal [%v]", serviceName, events, expectedReadyEvents)
}, timeout).Should(gomega.Succeed())
// Testing that when service fails, that an event is thrown
failingService := &knservingv1.Service{}
Expand All @@ -366,24 +363,130 @@ func TestInferenceServiceWithOnlyPredictor(t *testing.T) {
}
g.Expect(c.Status().Update(context.TODO(), failingService)).NotTo(gomega.HaveOccurred())
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
g.Eventually(func() bool {
isvc := &kfserving.InferenceService{}
err := c.Get(context.TODO(), serviceKey, isvc)
if err != nil {
return false
}
if isvc.Status.GetCondition(apis.ConditionReady) == nil {
return true // Because ConditionReady might be removed, this is true
} else if isvc.Status.GetCondition(apis.ConditionReady).Status ==
v1.ConditionFalse {
return true
}
return false
}, timeout).Should(gomega.BeTrue())
// We are testing for a NonReady event
expectedNonReadyEvent := []SimpleEvent{
{Count: 1, Type: v1.EventTypeNormal, Reason: string(kfserving.InferenceServiceReadyState)},
{Count: 1, Type: v1.EventTypeWarning, Reason: string(kfserving.InferenceServiceNotReadyState)},
}
g.Eventually(func() error {
events := &v1.EventList{}
if err := c.List(context.TODO(), events); err != nil {
return fmt.Errorf("Test %q failed: returned error: %v", serviceName, err)
events := getEvents()
if reflect.DeepEqual(events, expectedNonReadyEvent) {
return nil
}
if len(events.Items) == 0 {
return fmt.Errorf("Test %q failed: no events were created", serviceName)
return fmt.Errorf("test %q failed: [%v] did not equal [%v]", serviceName, events, expectedNonReadyEvent)
}, timeout).Should(gomega.Succeed())
succedingService := &knservingv1.Service{}
g.Eventually(func() error { return c.Get(context.TODO(), predictorService, succedingService) }, timeout).
Should(gomega.Succeed())
succedingService.Status.LatestCreatedRevisionName = "revision-v3"
succedingService.Status.LatestReadyRevisionName = "revision-v3"
succedingService.Status.URL, _ = apis.ParseURL(
constants.InferenceServiceURL("http", constants.DefaultPredictorServiceName(serviceKey.Name), serviceKey.Namespace, domain))
succedingService.Status.Conditions = duckv1.Conditions{
{
Type: knservingv1.ServiceConditionReady,
Status: "True",
},
}
g.Expect(c.Status().Update(context.TODO(), succedingService)).NotTo(gomega.HaveOccurred())
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
g.Eventually(func() bool {
isvc := &kfserving.InferenceService{}
err := c.Get(context.TODO(), serviceKey, isvc)
if err != nil || isvc.Status.GetCondition(apis.ConditionReady) == nil {
return false
}
for _, event := range events.Items {
if event.Reason == string(kfserving.InferenceServiceNotReadyState) &&
event.Type == v1.EventTypeWarning {
return nil
}
if isvc.Status.GetCondition(apis.ConditionReady).Status ==
v1.ConditionTrue {
return true
}
return fmt.Errorf("Test %q failed: events [%v] did not contain warning", serviceName, events.Items)
return false
}, timeout).Should(gomega.BeTrue())
// We are testing for another Ready event
expectedTwoReadyEvents := []SimpleEvent{
{Count: 1, Type: v1.EventTypeWarning, Reason: string(kfserving.InferenceServiceNotReadyState)},
{Count: 2, Type: v1.EventTypeNormal, Reason: string(kfserving.InferenceServiceReadyState)},
}
g.Eventually(func() error {
events := getEvents()
if reflect.DeepEqual(events, expectedTwoReadyEvents) {
return nil
}
return fmt.Errorf("test %q failed: [%v] did not equal [%v]", serviceName, events, expectedTwoReadyEvents)
}, timeout).Should(gomega.Succeed())
}

type SimpleEvent struct {
Reason string
Count int32
Type string
}

type SimpleEventWithTime struct {
event SimpleEvent
LastTimestamp metav1.Time
}

type timeSlice []SimpleEventWithTime

func (p timeSlice) Len() int {
return len(p)
}

func (p timeSlice) Less(i, j int) bool {
return p[i].LastTimestamp.Before(&p[j].LastTimestamp)
}

func (p timeSlice) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

func getEvents() []SimpleEvent {
events := &v1.EventList{}
if err := c.List(context.TODO(), events); err != nil {
return nil
}
numEvents := len(events.Items)
if numEvents == 0 {
return nil
}
sortedEvents := make(timeSlice, 0, numEvents)
for _, event := range events.Items {
if event.Reason != "Updated" { // Not checking for updates
sortedEvents = append(sortedEvents, SimpleEventWithTime{
event: SimpleEvent{
Reason: event.Reason,
Count: event.Count,
Type: event.Type,
},
LastTimestamp: event.LastTimestamp,
})
}
}
sort.Slice(sortedEvents, func(i, j int) bool {
return sortedEvents[i].LastTimestamp.Before(&sortedEvents[j].LastTimestamp)
})
simpleEvents := make([]SimpleEvent, 0, len(sortedEvents))
for _, sEvent := range sortedEvents {
simpleEvents = append(simpleEvents, sEvent.event)
}
return simpleEvents
}

func TestInferenceServiceWithDefaultAndCanaryPredictor(t *testing.T) {
var expectedCanaryRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "bar", Namespace: "default"}}
var canaryServiceKey = expectedCanaryRequest.NamespacedName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package knative
import (
"context"
"fmt"
"k8s.io/client-go/tools/record"

"github.com/kubeflow/kfserving/pkg/apis/serving/v1alpha2"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/kubeflow/kfserving/pkg/controller/inferenceservice/resources/knative"
Expand All @@ -45,15 +43,13 @@ type ServiceReconciler struct {
client client.Client
scheme *runtime.Scheme
serviceBuilder *knative.ServiceBuilder
recorder record.EventRecorder
}

func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, config *v1.ConfigMap) *ServiceReconciler {
func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, config *v1.ConfigMap) *ServiceReconciler {
return &ServiceReconciler{
client: client,
scheme: scheme,
serviceBuilder: knative.NewServiceBuilder(client, config),
recorder: recorder,
}
}

Expand All @@ -73,9 +69,6 @@ func (r *ServiceReconciler) reconcileComponent(isvc *v1alpha2.InferenceService,
endpointSpec := &isvc.Spec.Default
serviceName := constants.DefaultServiceName(isvc.Name, component)
propagateStatusFn := isvc.Status.PropagateDefaultStatus
wasReady := isvc.Status.Conditions != nil &&
isvc.Status.GetCondition(knservingv1.ServiceConditionReady) != nil &&
isvc.Status.GetCondition(knservingv1.ServiceConditionReady).Status == v1.ConditionTrue
if isCanary {
endpointSpec = isvc.Spec.Canary
serviceName = constants.CanaryServiceName(isvc.Name, component)
Expand All @@ -93,29 +86,18 @@ func (r *ServiceReconciler) reconcileComponent(isvc *v1alpha2.InferenceService,
if err = r.finalizeService(serviceName, isvc.Namespace); err != nil {
return err
}
// If it was ready and is no longer ready
r.RecordServeEvent(isvc, wasReady, propagateStatusFn(component, nil))
propagateStatusFn(component, nil)
return nil
} else {
if status, err := r.reconcileService(isvc, service); err != nil {
return err
} else {
r.RecordServeEvent(isvc, wasReady, propagateStatusFn(component, status))
propagateStatusFn(component, status)
return nil
}
}
}

func (r *ServiceReconciler) RecordServeEvent(isvc *v1alpha2.InferenceService, old bool, new bool) {
if old && !new {
r.recorder.Event(isvc, v1.EventTypeWarning, string(v1alpha2.InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready ", isvc.Name))
} else if !old && new {
r.recorder.Event(isvc, v1.EventTypeNormal, string(v1alpha2.InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready ", isvc.Name))
}
}

func (r *ServiceReconciler) finalizeService(serviceName, namespace string) error {
existing := &knservingv1.Service{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: serviceName, Namespace: namespace}, existing); err != nil {
Expand Down
Loading

0 comments on commit 32bd954

Please sign in to comment.