Skip to content

Commit

Permalink
pool-coordinator implementation of yurthub (#1073)
Browse files Browse the repository at this point in the history
* pool-coordinator implementation of yurthub

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>
  • Loading branch information
Congrool authored Dec 7, 2022
1 parent 7fa0805 commit 61b6716
Show file tree
Hide file tree
Showing 37 changed files with 3,828 additions and 361 deletions.
49 changes: 35 additions & 14 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"strings"
"time"

componentbaseconfig "k8s.io/component-base/config"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -37,6 +35,7 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
Expand Down Expand Up @@ -66,6 +65,7 @@ type YurtHubConfiguration struct {
YurtHubProxyServerSecureAddr string
YurtHubProxyServerDummyAddr string
YurtHubProxyServerSecureDummyAddr string
DiskCachePath string
GCFrequency int
CertMgrMode string
KubeletRootCAFilePath string
Expand Down Expand Up @@ -94,7 +94,13 @@ type YurtHubConfiguration struct {
CertIPs []net.IP
CoordinatorServer *url.URL
MinRequestTimeout time.Duration
CoordinatorStoragePrefix string
CoordinatorStorageAddr string // ip:port
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
LeaderElection componentbaseconfig.LeaderElectionConfiguration
CoordinatorClient kubernetes.Interface
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -130,7 +136,12 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort)
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
workingMode := util.WorkingMode(options.WorkingMode)
sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.EnableNodePool)
proxiedClient, err := buildProxiedClient(fmt.Sprintf("http://%s", proxyServerAddr))
if err != nil {
return nil, err
}
sharedFactory := informers.NewSharedInformerFactory(proxiedClient, 24*time.Hour)
yurtSharedFactory, err := createYurtSharedInformers(proxiedClient, options.EnableNodePool)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,6 +169,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
YurtHubProxyServerSecureAddr: proxySecureServerAddr,
YurtHubProxyServerDummyAddr: proxyServerDummyAddr,
YurtHubProxyServerSecureDummyAddr: proxySecureServerDummyAddr,
DiskCachePath: options.DiskCachePath,
GCFrequency: options.GCFrequency,
KubeletRootCAFilePath: options.KubeletRootCAFilePath,
KubeletPairFilePath: options.KubeletPairFilePath,
Expand All @@ -183,6 +195,11 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
FilterManager: filterManager,
CertIPs: certIPs,
MinRequestTimeout: options.MinRequestTimeout,
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
CoordinatorStorageCaFile: options.CoordinatorStorageCaFile,
CoordinatorStorageCertFile: options.CoordinatorStorageCertFile,
CoordinatorStorageKeyFile: options.CoordinatorStorageKeyFile,
LeaderElection: options.LeaderElection,
}

Expand Down Expand Up @@ -219,21 +236,26 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
return us, nil
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var yurtClient yurtclientset.Interface
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
func buildProxiedClient(proxyAddr string) (kubernetes.Interface, error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(proxyAddr, "")
if err != nil {
return nil, nil, err
return nil, err
}

client, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, err
}

return client, nil
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createYurtSharedInformers(proxiedClient kubernetes.Interface, enableNodePool bool) (yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var yurtClient yurtclientset.Interface
var err error

fakeYurtClient := &fake.Clientset{}
fakeWatch := watch.NewFake()
fakeYurtClient.AddWatchReactor("nodepools", core.DefaultWatchReactor(fakeWatch, nil))
Expand All @@ -242,12 +264,11 @@ func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.Sha
if enableNodePool {
yurtClient, err = yurtclientset.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, err
}
}

return informers.NewSharedInformerFactory(client, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
return yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
}

// registerInformers reconstruct node/nodePool/configmap informers
Expand Down
81 changes: 46 additions & 35 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,46 @@ const (

// YurtHubOptions is the main settings for the yurthub
type YurtHubOptions struct {
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort string
YurtHubProxyPort string
YurtHubProxySecurePort string
GCFrequency int
YurtHubCertOrganizations string
KubeletRootCAFilePath string
KubeletPairFilePath string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort string
YurtHubProxyPort string
YurtHubProxySecurePort string
GCFrequency int
YurtHubCertOrganizations string
KubeletRootCAFilePath string
KubeletPairFilePath string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
CoordinatorStoragePrefix string
CoordinatorStorageAddr string
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
EnableNodePool bool
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -110,6 +115,7 @@ func NewYurtHubOptions() *YurtHubOptions {
KubeletHealthGracePeriod: time.Second * 40,
EnableNodePool: true,
MinRequestTimeout: time.Second * 1800,
CoordinatorStoragePrefix: "/registry",
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
Expand Down Expand Up @@ -184,6 +190,11 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Pool-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Pool-Coordinator etcd, in the format ip:port")
fs.StringVar(&o.CoordinatorStorageCaFile, "coordinator-storage-ca", o.CoordinatorStorageCaFile, "CA file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageCertFile, "coordinator-storage-cert", o.CoordinatorStorageCertFile, "Cert file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageKeyFile, "coordinator-storage-key", o.CoordinatorStorageKeyFile, "Key file path to communicate with Pool-Coordinator etcd")
bindFlags(&o.LeaderElection, fs)
}

Expand Down
Loading

0 comments on commit 61b6716

Please sign in to comment.