Skip to content

Commit

Permalink
feat: Introduce flagd kube proxy (#495)
Browse files Browse the repository at this point in the history
<!-- Please use this template for your pull request. -->
<!-- Please use the sections that you need and delete other sections -->

## This PR
<!-- add the description of the PR here -->

- introduced flagd Kubernetes proxy as new binary 

requires testing and documentation

### Related Issues
<!-- add here the GitHub issue that this PR resolves if applicable -->


### Notes
<!-- any additional notes for this PR -->

### Follow-up Tasks
<!-- anything that is related to this PR but not done here should be
noted under this section -->
<!-- if there is a need for a new issue, please link it here -->
create issues for certificate handling
#524

### How to test
<!-- if applicable, add testing instructions under this section -->

Resolves: #500

---------

Signed-off-by: James Milligan <james@omnant.co.uk>
Signed-off-by: James Milligan <75740990+james-milligan@users.noreply.github.com>
Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
Co-authored-by: Kavindu Dodanduwa <Kavindu-Dodan@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 20, 2023
1 parent aa74951 commit 440864c
Show file tree
Hide file tree
Showing 28 changed files with 1,364 additions and 1,267 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-please.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:

env:
# Stringified list of items that should be published
PUBLISHABLE_ITEMS: '["flagd"]'
PUBLISHABLE_ITEMS: '["flagd","kube-flagd-proxy"]'
REGISTRY: ghcr.io
REPO_OWNER: ${{ github.repository_owner }}
DEFAULT_GO_VERSION: 1.19
Expand Down
18 changes: 18 additions & 0 deletions config/deployments/kube-flagd-proxy/crb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"rbac.authorization.k8s.io/v1","kind":"ClusterRoleBinding","metadata":{"annotations":{},"name":"open-feature-operator-flagd-kubernetes-sync"},"roleRef":{"apiGroup":"","kind":"ClusterRole","name":"open-feature-operator-flagd-kubernetes-sync"},"subjects":[{"apiGroup":"","kind":"ServiceAccount","name":"open-feature-operator-controller-manager","namespace":"system"}]}
name: open-feature-operator-flagd-kubernetes-sync
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: open-feature-operator-flagd-kubernetes-sync
subjects:
- kind: ServiceAccount
name: open-feature-operator-controller-manager
namespace: system
- kind: ServiceAccount
name: default
namespace: kube-proxy
31 changes: 31 additions & 0 deletions config/deployments/kube-flagd-proxy/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
namespace: kube-proxy
labels:
app: kube-proxy
name: kube-proxy
annotations:
openfeature.dev/allowkubernetessync: "true"
spec:
replicas: 1
selector:
matchLabels:
app: kube-proxy
template:
metadata:
creationTimestamp: null
labels:
app.kubernetes.io/name: kube-proxy
app: kube-proxy
annotations:
openfeature.dev/allowkubernetessync: "true"
spec:
containers:
- image: ghcr.io/open-feature/kube-flagd-proxy:v0.1.1 # x-release-please-version
name: kube-flagd-proxy
ports:
- containerPort: 8015
args:
- start
11 changes: 11 additions & 0 deletions config/deployments/kube-flagd-proxy/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Service
metadata:
name: kube-proxy-svc
namespace: kube-proxy
spec:
selector:
app.kubernetes.io/name: kube-proxy
ports:
- port: 8015
targetPort: 8015
6 changes: 0 additions & 6 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1286,18 +1286,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.26.2 h1:dM3cinp3PGB6asOySalOZxEG4CZ0IAdJsrYZXE/ovGQ=
k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU=
k8s.io/api v0.26.3 h1:emf74GIQMTik01Aum9dPP0gAypL8JTLl/lHa4V9RFSU=
k8s.io/api v0.26.3/go.mod h1:PXsqwPMXBSBcL1lJ9CYDKy7kIReUydukS5JiRlxC3qE=
k8s.io/apiextensions-apiserver v0.26.1 h1:cB8h1SRk6e/+i3NOrQgSFij1B2S0Y0wDoNl66bn8RMI=
k8s.io/apiextensions-apiserver v0.26.1/go.mod h1:AptjOSXDGuE0JICx/Em15PaoO7buLwTs0dGleIHixSM=
k8s.io/apimachinery v0.26.2 h1:da1u3D5wfR5u2RpLhE/ZtZS2P7QvDgLZTi9wrNZl/tQ=
k8s.io/apimachinery v0.26.2/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I=
k8s.io/apimachinery v0.26.3 h1:dQx6PNETJ7nODU3XPtrwkfuubs6w7sX0M8n61zHIV/k=
k8s.io/apimachinery v0.26.3/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I=
k8s.io/client-go v0.26.2 h1:s1WkVujHX3kTp4Zn4yGNFK+dlDXy1bAAkIl+cFAiuYI=
k8s.io/client-go v0.26.2/go.mod h1:u5EjOuSyBa09yqqyY7m3abZeovO/7D/WehVVlZ2qcqU=
k8s.io/client-go v0.26.3 h1:k1UY+KXfkxV2ScEL3gilKcF7761xkYsSD6BC9szIu8s=
k8s.io/client-go v0.26.3/go.mod h1:ZPNu9lm8/dbRIPAgteN30RSXea6vrCpFvq+MateTUuQ=
k8s.io/component-base v0.26.1 h1:4ahudpeQXHZL5kko+iDHqLj/FSGAEUnSVO0EBbgDd+4=
Expand Down
4 changes: 1 addition & 3 deletions core/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/open-feature/flagd/core/pkg/eval"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
service "github.com/open-feature/flagd/core/pkg/service/flag-evaluation"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/sync/file"
Expand Down Expand Up @@ -67,8 +67,6 @@ func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
func (r *Runtime) setService(logger *logger.Logger) {
r.Service = &service.ConnectService{
ConnectServiceConfiguration: &service.ConnectServiceConfiguration{
Port: r.config.ServicePort,
MetricsPort: r.config.MetricsPort,
ServerKeyPath: r.config.ServiceKeyPath,
ServerCertPath: r.config.ServiceCertPath,
ServerSocketPath: r.config.ServiceSocketPath,
Expand Down
9 changes: 6 additions & 3 deletions core/pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

type Runtime struct {
config Config
Service service.IService
Service service.IFlagEvaluationService
SyncImpl []sync.ISync

mu msync.Mutex
Expand All @@ -27,8 +27,8 @@ type Runtime struct {
}

type Config struct {
ServicePort int32
MetricsPort int32
ServicePort uint16
MetricsPort uint16
ServiceSocketPath string
ServiceCertPath string
ServiceKeyPath string
Expand All @@ -37,6 +37,7 @@ type Config struct {
CORS []string
}

// nolint: funlen
func (r *Runtime) Start() error {
if r.Service == nil {
return errors.New("no service set")
Expand Down Expand Up @@ -90,6 +91,8 @@ func (r *Runtime) Start() error {
g.Go(func() error {
return r.Service.Serve(gCtx, r.Evaluator, service.Configuration{
ReadinessProbe: r.isReady,
Port: r.config.ServicePort,
MetricsPort: r.config.MetricsPort,
})
})
<-gCtx.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/open-feature/flagd/core/pkg/eval"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/open-feature/flagd/core/pkg/service/metrics"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"github.com/rs/xid"
Expand All @@ -36,8 +38,6 @@ type ConnectService struct {
server http.Server
}
type ConnectServiceConfiguration struct {
Port int32
MetricsPort int32
ServerCertPath string
ServerKeyPath string
ServerSocketPath string
Expand All @@ -46,13 +46,13 @@ type ConnectServiceConfiguration struct {

type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan Notification
subs map[interface{}]chan iservice.Notification
}

func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error {
func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf iservice.Configuration) error {
s.Eval = eval
s.eventingConfiguration = &eventingConfiguration{
subs: make(map[interface{}]chan Notification),
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
}
lis, err := s.setupServer(svcConf)
Expand Down Expand Up @@ -88,14 +88,14 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon
}
}

func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error) {
func (s *ConnectService) setupServer(svcConf iservice.Configuration) (net.Listener, error) {
var lis net.Listener
var err error
mux := http.NewServeMux()
if s.ConnectServiceConfiguration.ServerSocketPath != "" {
lis, err = net.Listen("unix", s.ConnectServiceConfiguration.ServerSocketPath)
} else {
address := fmt.Sprintf(":%d", s.ConnectServiceConfiguration.Port)
address := fmt.Sprintf(":%d", svcConf.Port)
lis, err = net.Listen("tcp", address)
}
if err != nil {
Expand All @@ -108,12 +108,12 @@ func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error
return nil, err
}

mdlw := New(middlewareConfig{
mdlw := metrics.New(metrics.MiddlewareConfig{
Service: "openfeature/flagd",
MetricReader: exporter,
Logger: s.Logger,
})
h := Handler("", mdlw, mux)
h := metrics.Handler("", mdlw, mux)

go bindMetrics(s, svcConf)

Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *ConnectService) EventStream(
req *connect.Request[schemaV1.EventStreamRequest],
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan Notification, 1)
requestNotificationChan := make(chan iservice.Notification, 1)
s.eventingConfiguration.mu.Lock()
s.eventingConfiguration.subs[req] = requestNotificationChan
s.eventingConfiguration.mu.Unlock()
Expand All @@ -201,14 +201,14 @@ func (s *ConnectService) EventStream(
delete(s.eventingConfiguration.subs, req)
s.eventingConfiguration.mu.Unlock()
}()
requestNotificationChan <- Notification{
Type: ProviderReady,
requestNotificationChan <- iservice.Notification{
Type: iservice.ProviderReady,
}
for {
select {
case <-time.After(20 * time.Second):
err := stream.Send(&schemaV1.EventStreamResponse{
Type: string(KeepAlive),
Type: string(iservice.KeepAlive),
})
if err != nil {
s.Logger.Error(err.Error())
Expand All @@ -231,7 +231,7 @@ func (s *ConnectService) EventStream(
}
}

func (s *ConnectService) Notify(n Notification) {
func (s *ConnectService) Notify(n iservice.Notification) {
s.eventingConfiguration.mu.RLock()
defer s.eventingConfiguration.mu.RUnlock()
for _, send := range s.eventingConfiguration.subs {
Expand Down Expand Up @@ -367,10 +367,10 @@ func (s *ConnectService) newCORS() *cors.Cors {
})
}

func bindMetrics(s *ConnectService, svcConf Configuration) {
s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort))
func bindMetrics(s *ConnectService, svcConf iservice.Configuration) {
s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort))
server := &http.Server{
Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort),
Addr: fmt.Sprintf(":%d", svcConf.MetricsPort),
ReadHeaderTimeout: 3 * time.Second,
}
server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
mock "github.com/open-feature/flagd/core/pkg/eval/mock"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/model"
service "github.com/open-feature/flagd/core/pkg/service"
iservice "github.com/open-feature/flagd/core/pkg/service"
service "github.com/open-feature/flagd/core/pkg/service/flag-evaluation"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
},
Logger: logger.NewLogger(nil, false),
}
serveConf := service.Configuration{
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
return true
},
Expand Down
File renamed without changes.
15 changes: 13 additions & 2 deletions core/pkg/service/iservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,23 @@ type ReadinessProbe func() bool

type Configuration struct {
ReadinessProbe ReadinessProbe
Port uint16
MetricsPort uint16
}

/*
IService implementations define handlers for a particular transport, which call the IEvaluator implementation.
IFlagEvaluationService implementations define handlers for a particular transport,
which call the IEvaluator implementation.
*/
type IService interface {
type IFlagEvaluationService interface {
Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error
Notify(n Notification)
}

/*
IFlagEvaluationService implementations define handlers for a particular transport,
which call the IEvaluator implementation.
*/
type IKubeSyncService interface {
Serve(ctx context.Context, svcConf Configuration) error
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package metrics

import (
"bufio"
Expand Down Expand Up @@ -91,7 +91,7 @@ func (r OTelMetricsRecorder) OTelInFlightRequestEnd(p HTTPReqProperties) {
r.httpRequestsInflight.Add(context.TODO(), -1, r.setAttributes(p)...)
}

type middlewareConfig struct {
type MiddlewareConfig struct {
recorder Recorder
MetricReader metric.Reader
Logger *logger.Logger
Expand All @@ -101,16 +101,16 @@ type middlewareConfig struct {
}

type Middleware struct {
cfg middlewareConfig
cfg MiddlewareConfig
}

func New(cfg middlewareConfig) Middleware {
func New(cfg MiddlewareConfig) Middleware {
cfg.defaults()
m := Middleware{cfg: cfg}
return m
}

func (cfg *middlewareConfig) defaults() {
func (cfg *MiddlewareConfig) defaults() {
if cfg.Logger == nil {
log.Fatal("missing logger")
}
Expand All @@ -120,7 +120,7 @@ func (cfg *middlewareConfig) defaults() {
cfg.recorder = cfg.newOTelRecorder(cfg.MetricReader)
}

func (cfg *middlewareConfig) getDurationView(name string, bucket []float64) metric.View {
func (cfg *MiddlewareConfig) getDurationView(name string, bucket []float64) metric.View {
return metric.NewView(
metric.Instrument{
// we change aggregation only for instruments with this name and scope
Expand All @@ -135,7 +135,7 @@ func (cfg *middlewareConfig) getDurationView(name string, bucket []float64) metr
)
}

func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetricsRecorder {
func (cfg *MiddlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetricsRecorder {
const requestDurationName = "http_request_duration_seconds"
const responseSizeName = "http_response_size_bytes"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package metrics

import (
"context"
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestMiddleware(t *testing.T) {
const svcName = "mySvc"
exp := metric.NewManualReader()
l, _ := logger.NewZapLogger(zapcore.DebugLevel, "")
m := New(middlewareConfig{
m := New(MiddlewareConfig{
MetricReader: exp,
Service: svcName,
Logger: logger.NewLogger(l, true),
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestNew_AutowireOTel(t *testing.T) {
l, _ := logger.NewZapLogger(zapcore.DebugLevel, "")
log := logger.NewLogger(l, true)
exp := metric.NewManualReader()
mdw := New(middlewareConfig{
mdw := New(MiddlewareConfig{
MetricReader: exp,
Logger: log,
Service: "mySvc",
Expand Down
Loading

0 comments on commit 440864c

Please sign in to comment.