Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool-coordinator implementation of yurthub #1073

Merged
merged 22 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c33f71f
init etcd storage
Congrool Nov 22, 2022
897fcc5
add ut for etcdStorage
Congrool Oct 12, 2022
6b28ef6
add unit tests for etcd storage
Congrool Oct 14, 2022
c94d3a2
improve lifecycle context management of yurthub and etcd storage
Congrool Nov 22, 2022
d10ad4b
add pool coordinator proxy
Congrool Nov 22, 2022
e48264c
implement coordinator
Congrool Nov 22, 2022
8950261
integrate coordinator into yurthub
Congrool Nov 22, 2022
ef97f73
avoid race condition when elector status changes
Congrool Nov 22, 2022
75aca26
use rest manager to generate cloud kubeclient to ensure that the serv…
Congrool Nov 22, 2022
2b77999
use separated user-agent for pool-scoped resources list/watch request
Congrool Nov 22, 2022
fedd1cc
only allow leader yurthub update pool-scoped cache
Congrool Nov 22, 2022
511f801
remove old pool-scoped resources before sync
Congrool Nov 22, 2022
2f82744
cache response to local when proxy to pool-coordinator
Congrool Nov 22, 2022
dd4e835
add comments and fix lint
Congrool Nov 30, 2022
53f8adb
only save cache to poolcoordinator when leader asks for pool-scoped r…
Congrool Nov 30, 2022
b7d60d2
optimize proxy handler for different resources
Congrool Nov 30, 2022
d58dc4b
fix cache response and add GZipReaderCloser
Congrool Dec 1, 2022
262d9ec
add ability of switching pool-scoped resource request from cloud to p…
Congrool Dec 1, 2022
eb45555
stop write to pool-coordinator when giving up leadership
Congrool Dec 1, 2022
82948b9
cancel local watch when pool-coordinator is ready
Congrool Dec 1, 2022
fcb0c31
only cache node and pod resources to pool-coordinator
Congrool Dec 1, 2022
9306fa0
add options for coordinator storage
Congrool Dec 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"strings"
"time"

componentbaseconfig "k8s.io/component-base/config"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -37,6 +35,7 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
Expand Down Expand Up @@ -66,6 +65,7 @@ type YurtHubConfiguration struct {
YurtHubProxyServerSecureAddr string
YurtHubProxyServerDummyAddr string
YurtHubProxyServerSecureDummyAddr string
DiskCachePath string
GCFrequency int
CertMgrMode string
KubeletRootCAFilePath string
Expand Down Expand Up @@ -94,7 +94,13 @@ type YurtHubConfiguration struct {
CertIPs []net.IP
CoordinatorServer *url.URL
MinRequestTimeout time.Duration
CoordinatorStoragePrefix string
CoordinatorStorageAddr string // ip:port
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
LeaderElection componentbaseconfig.LeaderElectionConfiguration
CoordinatorClient kubernetes.Interface
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -130,7 +136,12 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort)
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
workingMode := util.WorkingMode(options.WorkingMode)
sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.EnableNodePool)
proxiedClient, err := buildProxiedClient(fmt.Sprintf("http://%s", proxyServerAddr))
if err != nil {
return nil, err
}
sharedFactory := informers.NewSharedInformerFactory(proxiedClient, 24*time.Hour)
yurtSharedFactory, err := createYurtSharedInformers(proxiedClient, options.EnableNodePool)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,6 +169,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
YurtHubProxyServerSecureAddr: proxySecureServerAddr,
YurtHubProxyServerDummyAddr: proxyServerDummyAddr,
YurtHubProxyServerSecureDummyAddr: proxySecureServerDummyAddr,
DiskCachePath: options.DiskCachePath,
GCFrequency: options.GCFrequency,
KubeletRootCAFilePath: options.KubeletRootCAFilePath,
KubeletPairFilePath: options.KubeletPairFilePath,
Expand All @@ -183,6 +195,11 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
FilterManager: filterManager,
CertIPs: certIPs,
MinRequestTimeout: options.MinRequestTimeout,
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
CoordinatorStorageCaFile: options.CoordinatorStorageCaFile,
CoordinatorStorageCertFile: options.CoordinatorStorageCertFile,
CoordinatorStorageKeyFile: options.CoordinatorStorageKeyFile,
LeaderElection: options.LeaderElection,
}

Expand Down Expand Up @@ -219,21 +236,26 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
return us, nil
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var yurtClient yurtclientset.Interface
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
func buildProxiedClient(proxyAddr string) (kubernetes.Interface, error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(proxyAddr, "")
if err != nil {
return nil, nil, err
return nil, err
}

client, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, err
}

return client, nil
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createYurtSharedInformers(proxiedClient kubernetes.Interface, enableNodePool bool) (yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var yurtClient yurtclientset.Interface
var err error

fakeYurtClient := &fake.Clientset{}
fakeWatch := watch.NewFake()
fakeYurtClient.AddWatchReactor("nodepools", core.DefaultWatchReactor(fakeWatch, nil))
Expand All @@ -242,12 +264,11 @@ func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.Sha
if enableNodePool {
yurtClient, err = yurtclientset.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, err
}
}

return informers.NewSharedInformerFactory(client, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
return yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
}

// registerInformers reconstruct node/nodePool/configmap informers
Expand Down
81 changes: 46 additions & 35 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,46 @@ const (

// YurtHubOptions is the main settings for the yurthub
type YurtHubOptions struct {
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort string
YurtHubProxyPort string
YurtHubProxySecurePort string
GCFrequency int
YurtHubCertOrganizations string
KubeletRootCAFilePath string
KubeletPairFilePath string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
ServerAddr string
YurtHubHost string // YurtHub server host (e.g.: expose metrics API)
YurtHubProxyHost string // YurtHub proxy server host
YurtHubPort string
YurtHubProxyPort string
YurtHubProxySecurePort string
GCFrequency int
YurtHubCertOrganizations string
KubeletRootCAFilePath string
KubeletPairFilePath string
NodeName string
NodePoolName string
LBMode string
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Version bool
EnableProfiling bool
EnableDummyIf bool
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
DiskCachePath string
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
CoordinatorStoragePrefix string
CoordinatorStorageAddr string
CoordinatorStorageCaFile string
CoordinatorStorageCertFile string
CoordinatorStorageKeyFile string
EnableNodePool bool
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -110,6 +115,7 @@ func NewYurtHubOptions() *YurtHubOptions {
KubeletHealthGracePeriod: time.Second * 40,
EnableNodePool: true,
MinRequestTimeout: time.Second * 1800,
CoordinatorStoragePrefix: "/registry",
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
Expand Down Expand Up @@ -184,6 +190,11 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Pool-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Pool-Coordinator etcd, in the format ip:port")
fs.StringVar(&o.CoordinatorStorageCaFile, "coordinator-storage-ca", o.CoordinatorStorageCaFile, "CA file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageCertFile, "coordinator-storage-cert", o.CoordinatorStorageCertFile, "Cert file path to communicate with Pool-Coordinator etcd")
fs.StringVar(&o.CoordinatorStorageKeyFile, "coordinator-storage-key", o.CoordinatorStorageKeyFile, "Key file path to communicate with Pool-Coordinator etcd")
bindFlags(&o.LeaderElection, fs)
}

Expand Down
Loading