Skip to content

Commit

Permalink
pool-coordinator implementation of yurthub (openyurtio#1073)
Browse files Browse the repository at this point in the history
* pool-coordinator implementation of yurthub

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>
  • Loading branch information
Congrool committed Jan 17, 2023
1 parent 16a399e commit 3e69468
Show file tree
Hide file tree
Showing 33 changed files with 3,813 additions and 338 deletions.
22 changes: 20 additions & 2 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"strings"
"time"

componentbaseconfig "k8s.io/component-base/config"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -40,6 +38,7 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
Expand Down Expand Up @@ -91,6 +90,17 @@ type YurtHubConfiguration struct {
YurtHubProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubDummyProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubSecureProxyServerServing *apiserver.SecureServingInfo
YurtHubProxyServerAddr string
DiskCachePath string
CoordinatorPKIDir string
EnableCoordinator bool
CoordinatorServerURL *url.URL
CoordinatorStoragePrefix string
CoordinatorStorageAddr string // ip:port
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

Expand Down Expand Up @@ -148,6 +158,14 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
FilterManager: filterManager,
MinRequestTimeout: options.MinRequestTimeout,
TenantNs: tenantNs,
YurtHubProxyServerAddr: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort),
DiskCachePath: options.DiskCachePath,
CoordinatorPKIDir: filepath.Join(options.RootDir, "poolcoordinator"),
EnableCoordinator: options.EnableCoordinator,
CoordinatorServerURL: coordinatorServerURL,
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
}

certMgr, err := createCertManager(options, us)
Expand Down
83 changes: 47 additions & 36 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,47 @@ const (

// YurtHubOptions is the main settings for the yurthub
type YurtHubOptions struct {
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort int
YurtHubProxyPort int
YurtHubProxySecurePort int
GCFrequency int
YurtHubCertOrganizations []string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
CACertHashes []string
UnsafeSkipCAVerification bool
ClientForTest kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort int
YurtHubProxyPort int
YurtHubProxySecurePort int
GCFrequency int
YurtHubCertOrganizations []string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
CACertHashes []string
UnsafeSkipCAVerification bool
ClientForTest kubernetes.Interface
CoordinatorStoragePrefix string
CoordinatorStorageAddr string
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -113,6 +118,7 @@ func NewYurtHubOptions() *YurtHubOptions {
MinRequestTimeout: time.Second * 1800,
CACertHashes: make([]string, 0),
UnsafeSkipCAVerification: true,
CoordinatorStoragePrefix: "/registry",
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
Expand Down Expand Up @@ -195,6 +201,11 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringSliceVar(&o.CACertHashes, "discovery-token-ca-cert-hash", o.CACertHashes, "For token-based discovery, validate that the root CA public key matches this hash (format: \"<type>:<value>\").")
fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.")
fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Pool-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Pool-Coordinator etcd, in the format ip:port")
fs.StringVar(&o.CoordinatorStorageCaFile, "coordinator-storage-ca", o.CoordinatorStorageCaFile, "CA file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageCertFile, "coordinator-storage-cert", o.CoordinatorStorageCertFile, "Cert file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageKeyFile, "coordinator-storage-key", o.CoordinatorStorageKeyFile, "Key file path to communicate with Pool-Coordinator etcd")
bindFlags(&o.LeaderElection, fs)
}

Expand Down
87 changes: 58 additions & 29 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package app

import (
"context"
"fmt"
"net/url"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
Expand All @@ -42,7 +45,7 @@ import (
)

// NewCmdStartYurtHub creates a *cobra.Command object with default parameters
func NewCmdStartYurtHub(stopCh <-chan struct{}) *cobra.Command {
func NewCmdStartYurtHub(ctx context.Context) *cobra.Command {
yurtHubOptions := options.NewYurtHubOptions()

cmd := &cobra.Command{
Expand All @@ -69,7 +72,7 @@ func NewCmdStartYurtHub(stopCh <-chan struct{}) *cobra.Command {
}
klog.Infof("%s cfg: %#+v", projectinfo.GetHubName(), yurtHubCfg)

if err := Run(yurtHubCfg, stopCh); err != nil {
if err := Run(ctx, yurtHubCfg); err != nil {
klog.Fatalf("run %s failed, %v", projectinfo.GetHubName(), err)
}
},
Expand All @@ -80,40 +83,47 @@ func NewCmdStartYurtHub(stopCh <-chan struct{}) *cobra.Command {
}

// Run runs the YurtHubConfiguration. This should never exit
func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
defer cfg.CertManager.Stop()
trace := 1
klog.Infof("%d. new transport manager", trace)
transportManager, err := transport.NewTransportManager(cfg.CertManager, stopCh)
transportManager, err := transport.NewTransportManager(cfg.CertManager, ctx.Done())
if err != nil {
return fmt.Errorf("could not new transport manager, %w", err)
}
trace++

klog.Infof("%d. prepare for health checker clients", trace)
healthCheckerClientsForCloud, _, err := createHealthCheckerClient(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager)
cloudClients, coordinatorClient, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager)
if err != nil {
return fmt.Errorf("failed to create health checker clients, %w", err)
}
trace++

var healthChecker healthchecker.MultipleBackendsHealthChecker
var cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
var coordinatorHealthChecker healthchecker.HealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checker for remote servers ", trace)
healthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, healthCheckerClientsForCloud, stopCh)
klog.Infof("%d. create health checkers for remote servers and pool coordinator", trace)
cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, cloudClients, ctx.Done())
if err != nil {
return fmt.Errorf("could not new health checker, %w", err)
return fmt.Errorf("could not new cloud health checker, %w", err)
}
coordinatorHealthChecker, err = healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudHealthChecker, ctx.Done())
if err != nil {
return fmt.Errorf("failed to create coordinator health checker, %v", err)
}

} else {
klog.Infof("%d. disable health checker for node %s because it is a cloud node", trace, cfg.NodeName)
// In cloud mode, health checker is not needed.
// This fake checker will always report that the remote server is healthy.
healthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
// In cloud mode, the cloud health checker and the pool coordinator health checker are not needed.
// These fake checkers always report that the cloud is healthy and the pool coordinator is unhealthy.
cloudHealthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
coordinatorHealthChecker = healthchecker.NewFakeChecker(false, make(map[string]int))
}
trace++

klog.Infof("%d. new restConfig manager", trace)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, healthChecker)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, cloudHealthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
Expand All @@ -130,7 +140,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {

if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, ctx.Done())
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
}
Expand All @@ -141,36 +151,55 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new tenant sa manager", trace)
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, stopCh)
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done())
trace++

klog.Infof("%d. create yurthub elector", trace)
elector, err := poolcoordinator.NewHubElector(cfg, coordinatorClient, coordinatorHealthChecker, cloudHealthChecker, ctx.Done())
if err != nil {
klog.Errorf("failed to create hub elector, %v", err)
}
elector.Run(ctx.Done())
trace++

// TODO: cloud client load balance
klog.Infof("%d. create coordinator", trace)
coordinator, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, transportManager, elector)
if err != nil {
klog.Errorf("failed to create coordinator, %v", err)
}
coordinator.Run()
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, coordinator, cloudHealthChecker, coordinatorHealthChecker, tenantMgr, ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
}
trace++

if cfg.NetworkMgr != nil {
cfg.NetworkMgr.Run(stopCh)
cfg.NetworkMgr.Run(ctx.Done())
}

// start shared informers before start hub server
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)
cfg.SharedFactory.Start(ctx.Done())
cfg.YurtSharedFactory.Start(ctx.Done())

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, stopCh); err != nil {
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, ctx.Done()); err != nil {
return fmt.Errorf("could not run hub servers, %w", err)
}
<-stopCh
<-ctx.Done()
klog.Infof("hub agent exited")
return nil
}

func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) {
var healthCheckerClientForCoordinator kubernetes.Interface
healthCheckerClientsForCloud := make(map[string]kubernetes.Interface)
// createClients creates a client for each cloud APIServer and a client for the pool coordinator.
// It returns a map from each cloud APIServer URL to its client, together with the pool coordinator client.
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) {
var coordinatorClient kubernetes.Interface
cloudClients := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Host: remoteServers[i].String(),
Expand All @@ -179,9 +208,9 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url
}
c, err := kubernetes.NewForConfig(restConf)
if err != nil {
return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
return cloudClients, coordinatorClient, err
}
healthCheckerClientsForCloud[remoteServers[i].String()] = c
cloudClients[remoteServers[i].String()] = c
}

cfg := &rest.Config{
Expand All @@ -191,9 +220,9 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url
}
c, err := kubernetes.NewForConfig(cfg)
if err != nil {
return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
return cloudClients, coordinatorClient, err
}
healthCheckerClientForCoordinator = c
coordinatorClient = c

return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil
return cloudClients, coordinatorClient, nil
}
2 changes: 1 addition & 1 deletion cmd/yurthub/yurthub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func main() {
rand.Seed(time.Now().UnixNano())
cmd := app.NewCmdStartYurtHub(server.SetupSignalHandler())
cmd := app.NewCmdStartYurtHub(server.SetupSignalContext())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
if err := cmd.Execute(); err != nil {
panic(err)
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/vishvananda/netlink v1.1.1-0.20200603190939-5a869a71f0cb
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
github.com/wI2L/jsondiff v0.3.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8
google.golang.org/grpc v1.40.0
gopkg.in/cheggaaa/pb.v1 v1.0.25
gopkg.in/square/go-jose.v2 v2.2.2
Expand Down
Loading

0 comments on commit 3e69468

Please sign in to comment.