From 41a888d6b60c030b913280c2a1eeff8b25e8aada Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 7 Mar 2023 10:01:07 -0800 Subject: [PATCH] fix: set readiness once only (#465) ## This PR The current readiness state can be changed because of an underlying sync provider error. This seems to be incorrect, as the **not ready state** makes flagd stop receiving traffic through K8s services [1]. For example, if an HTTP endpoint-backed sync provider gets an error, the current logic sets readiness to `false`. Similarly, if gRPC connectivity is lost for a gRPC provider, readiness goes to the `false` state until connection establishment completes. This PR fixes this behavior by setting the ready state **once only** per sync provider. This means that once there is at least a single successful sync, the sync provider sets its readiness to `true`. Going further, we might need to decide if we need better readiness state handling. [1] - https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes --------- Signed-off-by: Kavindu Dodanduwa Co-authored-by: Michael Beemer --- .../high_level_architecture.md | 20 +++++++++++++++- pkg/runtime/from_config.go | 1 - pkg/sync/file/filepath_sync_test.go | 20 ++++++++++------ pkg/sync/grpc/grpc_sync.go | 23 ++++++++----------- pkg/sync/grpc/grpc_sync_test.go | 3 --- pkg/sync/http/http_sync.go | 6 ++--- pkg/sync/http/http_sync_test.go | 8 ------- 7 files changed, 45 insertions(+), 36 deletions(-) diff --git a/docs/other_resources/high_level_architecture.md b/docs/other_resources/high_level_architecture.md index 15bbe4298..a6f2001b7 100644 --- a/docs/other_resources/high_level_architecture.md +++ b/docs/other_resources/high_level_architecture.md @@ -26,4 +26,22 @@ flag configurations watched by the respective implementation. For example, the f The update provided by sync implementation is pushed to the evaluator engine, which interprets the event and forwards it to the state store. 
Change notifications generated in the process gets pushed to event subscribers. - \ No newline at end of file + + +## Readiness & Liveness probes + +Flagd exposes HTTP liveness and readiness probes. These probes can be used for K8s deployments. With default +start-up configurations, these probes are exposed at the following URLs, + +- Liveness: http://localhost:8014/healthz +- Readiness: http://localhost:8014/readyz + +### Definition of Liveness + +The liveness probe becomes active and HTTP 200 status is served as soon as Flagd service is up and running. + +### Definition of Readiness + +The readiness probe becomes active similar to the liveness probe as soon as Flagd service is up and running. However, +the probe emits HTTP 412 until all sync providers are ready. This status changes to HTTP 200 when all sync providers at +least have one successful data sync. The status does not change from there on. \ No newline at end of file diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 6d320f543..0920601f0 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -108,7 +108,6 @@ func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync { zap.String("component", "sync"), zap.String("sync", "grpc"), ), - Mux: &msync.RWMutex{}, } } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 8a11a64c8..c927f092e 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -146,18 +146,19 @@ func TestSimpleSync(t *testing.T) { dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync)) + syncHandler := Sync{ + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, + } + go func() { - handler := Sync{ - URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), - Logger: logger.NewLogger(nil, false), - Mux: &msync.RWMutex{}, - } - err := handler.Init(ctx) + err := syncHandler.Init(ctx) if 
err != nil { log.Fatalf("Error init sync: %s", err.Error()) return } - err = handler.Sync(ctx, dataSyncChan) + err = syncHandler.Sync(ctx, dataSyncChan) if err != nil { log.Fatalf("Error start sync: %s", err.Error()) return @@ -187,6 +188,11 @@ func TestSimpleSync(t *testing.T) { case <-time.After(10 * time.Second): t.Errorf("event not found, timeout out after 10 seconds") } + + // validate readiness - readiness must not change + if syncHandler.ready != true { + t.Errorf("readiness must be set to true, but found: %t", syncHandler.ready) + } } }) } diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 892780c29..11a1667c3 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -30,6 +30,8 @@ const ( constantBackOffDelay = 60 ) +var once msync.Once + type Sync struct { Target string ProviderID string @@ -39,7 +41,6 @@ type Sync struct { client syncv1grpc.FlagSyncServiceClient options []grpc.DialOption ready bool - Mux *msync.RWMutex } func (g *Sync) connectClient(ctx context.Context) error { @@ -84,37 +85,28 @@ func (g *Sync) Init(ctx context.Context) error { } func (g *Sync) IsReady() bool { - g.Mux.RLock() - defer g.Mux.RUnlock() return g.ready } -func (g *Sync) setReady(val bool) { - g.Mux.Lock() - defer g.Mux.Unlock() - g.ready = val -} - func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening - g.setReady(true) err := g.handleFlagSync(g.syncClient, dataSync) if err == nil { return nil } + g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) + // retry connection establishment for { - g.setReady(false) syncClient, ok := g.connectWithRetry(ctx) if !ok { // We shall exit return nil } - g.setReady(true) + err = g.handleFlagSync(syncClient, dataSync) if err != nil { - g.setReady(false) g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) continue } @@ -168,6 +160,11 @@ func (g *Sync) connectWithRetry( // handleFlagSync wraps the 
stream listening and push updates through dataSync channel func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + // Set ready state once only + once.Do(func() { + g.ready = true + }) + for { data, err := stream.Recv() if err != nil { diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 0c7e84ba8..735031546 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -7,7 +7,6 @@ import ( "io" "log" "net" - msync "sync" "testing" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" @@ -147,7 +146,6 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { Target: "grpc://test", ProviderID: "", Logger: logger.NewLogger(nil, false), - Mux: &msync.RWMutex{}, } tests := []struct { @@ -335,7 +333,6 @@ func Test_StreamListener(t *testing.T) { Target: target, ProviderID: "", Logger: logger.NewLogger(nil, false), - Mux: &msync.RWMutex{}, } // initialize client diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 142445517..bd99226d9 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -62,6 +62,8 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { if err != nil { return err } + + // Set ready state hs.ready = true _ = hs.Cron.AddFunc("*/5 * * * *", func() { @@ -149,18 +151,16 @@ func (hs *Sync) generateSha(body []byte) string { func (hs *Sync) Fetch(ctx context.Context) (string, error) { if hs.URI == "" { - hs.ready = false return "", errors.New("no HTTP URL string set") } body, err := hs.fetchBodyFromURL(ctx, hs.URI) if err != nil { - hs.ready = false return "", err } if len(body) != 0 { hs.LastBodySHA = hs.generateSha(body) } - hs.ready = true + return string(body), nil } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index e3734f63a..3cb3b3f38 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -66,7 
+66,6 @@ func TestHTTPSync_Fetch(t *testing.T) { bearerToken string lastBodySHA string handleResponse func(*testing.T, Sync, string, error) - ready bool }{ "success": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -84,7 +83,6 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Errorf("expected fetched to be: '%s', got: '%s'", expected, fetched) } }, - ready: true, }, "return an error if no uri": { setup: func(t *testing.T, client *syncmock.MockClient) {}, @@ -93,7 +91,6 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Error("expected err, got nil") } }, - ready: false, }, "update last body sha": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -115,7 +112,6 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, - ready: true, }, "authorization header": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -137,7 +133,6 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, - ready: true, }, } @@ -156,9 +151,6 @@ func TestHTTPSync_Fetch(t *testing.T) { } fetched, err := httpSync.Fetch(context.Background()) - if httpSync.IsReady() != tt.ready { - t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready) - } tt.handleResponse(t, httpSync, fetched, err) }) }