diff --git a/docs/other_resources/high_level_architecture.md b/docs/other_resources/high_level_architecture.md
index 15bbe4298..a6f2001b7 100644
--- a/docs/other_resources/high_level_architecture.md
+++ b/docs/other_resources/high_level_architecture.md
@@ -26,4 +26,22 @@ flag configurations watched by the respective implementation. For example, the f
The update provided by sync implementation is pushed to the evaluator engine, which interprets the event and forwards it to the state store. Change notifications generated in the
process gets pushed to event subscribers.
-
\ No newline at end of file
+
+
+## Readiness & Liveness probes
+
+Flagd exposes HTTP liveness and readiness probes. These probes can be used for K8s deployments. With default
+start-up configurations, these probes are exposed at the following URLs,
+
+- Liveness: http://localhost:8014/healthz
+- Readiness: http://localhost:8014/readyz
+
+### Definition of Liveness
+
+The liveness probe becomes active and HTTP 200 status is served as soon as Flagd service is up and running.
+
+### Definition of Readiness
+
+The readiness probe becomes active similar to the liveness probe as soon as Flagd service is up and running. However,
+the probe emits HTTP 412 until all sync providers are ready. This status changes to HTTP 200 when all sync providers at
+least have one successful data sync. The status does not change from there on.
\ No newline at end of file
diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go
index 6d320f543..0920601f0 100644
--- a/pkg/runtime/from_config.go
+++ b/pkg/runtime/from_config.go
@@ -108,7 +108,6 @@ func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
- Mux: &msync.RWMutex{},
}
}
diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go
index 8a11a64c8..c927f092e 100644
--- a/pkg/sync/file/filepath_sync_test.go
+++ b/pkg/sync/file/filepath_sync_test.go
@@ -146,18 +146,19 @@ func TestSimpleSync(t *testing.T) {
dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync))
+ syncHandler := Sync{
+ URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName),
+ Logger: logger.NewLogger(nil, false),
+ Mux: &msync.RWMutex{},
+ }
+
go func() {
- handler := Sync{
- URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName),
- Logger: logger.NewLogger(nil, false),
- Mux: &msync.RWMutex{},
- }
- err := handler.Init(ctx)
+ err := syncHandler.Init(ctx)
if err != nil {
log.Fatalf("Error init sync: %s", err.Error())
return
}
- err = handler.Sync(ctx, dataSyncChan)
+ err = syncHandler.Sync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
@@ -187,6 +188,11 @@ func TestSimpleSync(t *testing.T) {
case <-time.After(10 * time.Second):
t.Errorf("event not found, timeout out after 10 seconds")
}
+
+ // validate readiness - readiness must not change
+ if syncHandler.ready != true {
+ t.Errorf("readiness must be set to true, but found: %t", syncHandler.ready)
+ }
}
})
}
diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go
index 892780c29..11a1667c3 100644
--- a/pkg/sync/grpc/grpc_sync.go
+++ b/pkg/sync/grpc/grpc_sync.go
@@ -30,6 +30,8 @@ const (
constantBackOffDelay = 60
)
+var once msync.Once
+
type Sync struct {
Target string
ProviderID string
@@ -39,7 +41,6 @@ type Sync struct {
client syncv1grpc.FlagSyncServiceClient
options []grpc.DialOption
ready bool
- Mux *msync.RWMutex
}
func (g *Sync) connectClient(ctx context.Context) error {
@@ -84,37 +85,28 @@ func (g *Sync) Init(ctx context.Context) error {
}
func (g *Sync) IsReady() bool {
- g.Mux.RLock()
- defer g.Mux.RUnlock()
return g.ready
}
-func (g *Sync) setReady(val bool) {
- g.Mux.Lock()
- defer g.Mux.Unlock()
- g.ready = val
-}
-
func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// initial stream listening
- g.setReady(true)
err := g.handleFlagSync(g.syncClient, dataSync)
if err == nil {
return nil
}
+
g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))
+
// retry connection establishment
for {
- g.setReady(false)
syncClient, ok := g.connectWithRetry(ctx)
if !ok {
// We shall exit
return nil
}
- g.setReady(true)
+
err = g.handleFlagSync(syncClient, dataSync)
if err != nil {
- g.setReady(false)
g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))
continue
}
@@ -168,6 +160,11 @@ func (g *Sync) connectWithRetry(
// handleFlagSync wraps the stream listening and push updates through dataSync channel
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
+ // Set ready state once only
+ once.Do(func() {
+ g.ready = true
+ })
+
for {
data, err := stream.Recv()
if err != nil {
diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go
index 0c7e84ba8..735031546 100644
--- a/pkg/sync/grpc/grpc_sync_test.go
+++ b/pkg/sync/grpc/grpc_sync_test.go
@@ -7,7 +7,6 @@ import (
"io"
"log"
"net"
- msync "sync"
"testing"
"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
@@ -147,7 +146,6 @@ func TestSync_BasicFlagSyncStates(t *testing.T) {
Target: "grpc://test",
ProviderID: "",
Logger: logger.NewLogger(nil, false),
- Mux: &msync.RWMutex{},
}
tests := []struct {
@@ -335,7 +333,6 @@ func Test_StreamListener(t *testing.T) {
Target: target,
ProviderID: "",
Logger: logger.NewLogger(nil, false),
- Mux: &msync.RWMutex{},
}
// initialize client
diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go
index 142445517..bd99226d9 100644
--- a/pkg/sync/http/http_sync.go
+++ b/pkg/sync/http/http_sync.go
@@ -62,6 +62,8 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if err != nil {
return err
}
+
+ // Set ready state
hs.ready = true
_ = hs.Cron.AddFunc("*/5 * * * *", func() {
@@ -149,18 +151,16 @@ func (hs *Sync) generateSha(body []byte) string {
func (hs *Sync) Fetch(ctx context.Context) (string, error) {
if hs.URI == "" {
- hs.ready = false
return "", errors.New("no HTTP URL string set")
}
body, err := hs.fetchBodyFromURL(ctx, hs.URI)
if err != nil {
- hs.ready = false
return "", err
}
if len(body) != 0 {
hs.LastBodySHA = hs.generateSha(body)
}
- hs.ready = true
+
return string(body), nil
}
diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go
index e3734f63a..3cb3b3f38 100644
--- a/pkg/sync/http/http_sync_test.go
+++ b/pkg/sync/http/http_sync_test.go
@@ -66,7 +66,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
bearerToken string
lastBodySHA string
handleResponse func(*testing.T, Sync, string, error)
- ready bool
}{
"success": {
setup: func(t *testing.T, client *syncmock.MockClient) {
@@ -84,7 +83,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
t.Errorf("expected fetched to be: '%s', got: '%s'", expected, fetched)
}
},
- ready: true,
},
"return an error if no uri": {
setup: func(t *testing.T, client *syncmock.MockClient) {},
@@ -93,7 +91,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
t.Error("expected err, got nil")
}
},
- ready: false,
},
"update last body sha": {
setup: func(t *testing.T, client *syncmock.MockClient) {
@@ -115,7 +112,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
)
}
},
- ready: true,
},
"authorization header": {
setup: func(t *testing.T, client *syncmock.MockClient) {
@@ -137,7 +133,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
)
}
},
- ready: true,
},
}
@@ -156,9 +151,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
}
fetched, err := httpSync.Fetch(context.Background())
- if httpSync.IsReady() != tt.ready {
- t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready)
- }
tt.handleResponse(t, httpSync, fetched, err)
})
}