Skip to content

Commit

Permalink
update certificate automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
YRXING committed Oct 28, 2021
1 parent 56f6161 commit 7a4bd43
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/yurt-tunnel-server/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {

// 2. create a certificate manager for the tunnel server and run the
// csr approver for both yurttunnel-server and yurttunnel-agent
serverCertMgr, err := certmanager.NewYurttunnelServerCertManager(cfg.Client, cfg.CertDNSNames, cfg.CertIPs, stopCh)
serverCertMgr, err := certmanager.NewYurttunnelServerCertManager(cfg.Client, cfg.SharedInformerFactory, cfg.CertDNSNames, cfg.CertIPs, stopCh)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/yurttunnel/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
YurttunnelServerServiceName = "x-tunnel-server-svc"
YurttunnelServerAgentPortName = "tcp"
YurttunnelServerExternalAddrKey = "x-tunnel-server-external-addr"
YurttunnelServerLabel = "yurt-tunnel-server"
YurttunnelEndpointsNs = "kube-system"
YurttunnelEndpointsName = "x-tunnel-server-svc"

Expand Down
6 changes: 4 additions & 2 deletions pkg/yurttunnel/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ func NewCoreDNSRecordController(client clientset.Interface,

// newServiceInformer creates a shared index informer that returns only interested services
func newServiceInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("metadata.name=%v", constants.YurttunnelServerInternalServiceName)
// this informer will be used by coreDNSRecordController and certificate manager,
// so it should return x-tunnel-server-svc and x-tunnel-server-internal-svc
selector := fmt.Sprintf("name=%v", constants.YurttunnelServerLabel)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
options.LabelSelector = selector
}
return coreinformers.NewFilteredServiceInformer(cs, constants.YurttunnelServerServiceNs, resyncPeriod, nil, tweakListOptions)
}
Expand Down
68 changes: 43 additions & 25 deletions pkg/yurttunnel/pki/certmanager/certmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package certmanager

import (
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
Expand All @@ -32,19 +31,22 @@ import (
"github.com/openyurtio/openyurt/pkg/yurttunnel/server/serveraddr"

certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
clicert "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/certificate"
"k8s.io/klog/v2"
)

// NewYurttunnelServerCertManager creates a certificate manager for
// the yurttunnel-server
func NewYurttunnelServerCertManager(
clientset kubernetes.Interface,
factory informers.SharedInformerFactory,
clCertNames []string,
clIPs []net.IP,
stopCh <-chan struct{}) (certificate.Manager, error) {
Expand All @@ -54,32 +56,35 @@ func NewYurttunnelServerCertManager(
ips = []net.IP{}
err error
)

// add endPoints informer
factory.InformerFor(&v1.Endpoints{}, newEndPointsInformer)

// waite for the yurt tunnel server related resources synced
svcHasSynced := factory.Core().V1().Services().Informer().HasSynced
epsHasSynced := factory.Core().V1().Endpoints().Informer().HasSynced
nodesHasSynced := factory.Core().V1().Nodes().Informer().HasSynced
if !cache.WaitForCacheSync(stopCh, svcHasSynced, epsHasSynced, nodesHasSynced) {
return nil, fmt.Errorf("informer synced failed")
}

_ = wait.PollUntil(5*time.Second, func() (bool, error) {
dnsNames, ips, err = serveraddr.GetYurttunelServerDNSandIP(clientset)
dnsNames, _, err = serveraddr.GetYurttunelServerDNSandIP(clientset)
if err != nil {
return false, err
}

// get clusterIP for tunnel server internal service
svc, err := clientset.CoreV1().Services(constants.YurttunnelServerServiceNs).Get(context.Background(), constants.YurttunnelServerInternalServiceName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// compatible with versions that not supported x-tunnel-server-internal-svc
return true, nil
} else if err != nil {
return false, err
}

if svc.Spec.ClusterIP != "" && net.ParseIP(svc.Spec.ClusterIP) != nil {
ips = append(ips, net.ParseIP(svc.Spec.ClusterIP))
dnsNames = append(dnsNames, serveraddr.GetDefaultDomainsForSvc(svc.Namespace, svc.Name)...)
}

return true, nil
}, stopCh)
// add user specified DNS anems and IP addresses

// add user specified DNS anems
dnsNames = append(dnsNames, clCertNames...)
ips = append(ips, clIPs...)
klog.Infof("subject of tunnel server certificate, ips=%#+v, dnsNames=%#+v", ips, dnsNames)

getIPs := func() []net.IP {
ips = serveraddr.YurttunnelServerIPManager(factory)
ips = append(ips, clIPs...)
return ips
}

return newCertManager(
clientset,
Expand All @@ -94,7 +99,7 @@ func NewYurttunnelServerCertManager(
certificates.UsageServerAuth,
certificates.UsageClientAuth,
},
ips)
getIPs)
}

// NewYurttunnelAgentCertManager creates a certificate manager for
Expand All @@ -109,6 +114,10 @@ func NewYurttunnelAgentCertManager(
constants.YurttunnelAgentPodIPEnv)
}

getIPs := func() []net.IP {
return []net.IP{net.ParseIP(nodeIP)}
}

return newCertManager(
clientset,
projectinfo.GetAgentName(),
Expand All @@ -121,7 +130,7 @@ func NewYurttunnelAgentCertManager(
certificates.UsageDigitalSignature,
certificates.UsageClientAuth,
},
[]net.IP{net.ParseIP(nodeIP)})
getIPs)
}

// NewCertManager creates a certificate manager that will generates a
Expand All @@ -134,7 +143,7 @@ func newCertManager(
organizations,
dnsNames []string,
keyUsages []certificates.KeyUsage,
ipAddrs []net.IP) (certificate.Manager, error) {
getIPs serveraddr.GetIPs) (certificate.Manager, error) {
certificateStore, err :=
store.NewFileStoreWrapper(componentName, certDir, certDir, "", "")
if err != nil {
Expand All @@ -148,7 +157,7 @@ func newCertManager(
Organization: organizations,
},
DNSNames: dnsNames,
IPAddresses: ipAddrs,
IPAddresses: getIPs(),
}
}

Expand All @@ -167,3 +176,12 @@ func newCertManager(

return certManager, nil
}

// newEndPointsInformer creates a shared index informer that returns only interested endpoints
func newEndPointsInformer(cs kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("metadata.name=%v", constants.YurttunnelEndpointsName)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}
return coreinformers.NewFilteredEndpointsInformer(cs, constants.YurttunnelEndpointsNs, resyncPeriod, nil, tweakListOptions)
}
95 changes: 95 additions & 0 deletions pkg/yurttunnel/server/serveraddr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

type GetIPs func() []net.IP

// GetServerAddr gets the service address that exposes the tunnel server for
// tunnel agent to connect
func GetTunnelServerAddr(clientset kubernetes.Interface) (string, error) {
Expand Down Expand Up @@ -105,6 +109,97 @@ func GetYurttunelServerDNSandIP(
return extractTunnelServerDNSandIPs(svc, eps, nodeLst)
}

// YurttunelServerIPManager list the latest tunnel server resources, extract ips from them
func YurttunnelServerIPManager(factory informers.SharedInformerFactory) []net.IP {
var (
services []*v1.Service
eps []*v1.Endpoints
nodes []*v1.Node
ips = make([]net.IP, 0)
internalIp net.IP
err error
)

// list yurt-tunnel-server services
services, err = factory.Core().V1().Services().Lister().List(labels.Everything())
if err != nil {
return ips
}

// list x-tunnel-server-svc endpoints
eps, err = factory.Core().V1().Endpoints().Lister().List(labels.Everything())
if err != nil {
return ips
}

// list all of cloud nodes
label := fmt.Sprintf("%s=false", projectinfo.GetEdgeWorkerLabelKey())
selector, _ := labels.Parse(label)
// yurttunnel-server will be deployed on one of the cloud nodes
nodes, err = factory.Core().V1().Nodes().Lister().List(selector)
if err != nil {
return ips
}

// extract ip from the services
for _, svc := range services {
if svc.Name == constants.YurttunnelServerServiceName {
switch svc.Spec.Type {
case corev1.ServiceTypeLoadBalancer:
// make sure lb ip address is the first index in return ips slice
_, ips, err = getLoadBalancerDNSandIP(svc)
case corev1.ServiceTypeClusterIP:
// make sure annotation setting address is the first index in return ips slice
_, ips, err = getClusterIPDNSandIP(svc)
case corev1.ServiceTypeNodePort:
for _, node := range nodes {
if node.Labels[projectinfo.GetEdgeWorkerLabelKey()] == "false" {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
ips = append(ips, net.ParseIP(addr.Address))
}
}
}
}
default:
err = fmt.Errorf("unsupported service type: %s", string(svc.Spec.Type))
}

// extract ip from ClusterIP info
if svc.Spec.ClusterIP != "None" {
ips = append(ips, net.ParseIP(svc.Spec.ClusterIP))
}
ips = append(ips, net.ParseIP("127.0.0.1"))

if err != nil {
return ips
}
} else {
// get clusterIP for tunnel server internal service
if svc.Name == constants.YurttunnelServerInternalServiceName && svc.Spec.ClusterIP != "" && net.ParseIP(svc.Spec.ClusterIP) != nil {
internalIp = net.ParseIP(svc.Spec.ClusterIP)
}
}
}

if internalIp != nil {
ips = append(ips, internalIp)
}

// extract dns and ip from the endpoints
for _, ep := range eps {
for _, ss := range ep.Subsets {
for _, addr := range ss.Addresses {
if len(addr.IP) != 0 {
ips = append(ips, net.ParseIP(addr.IP))
}
}
}
}

return ips
}

// getTunnelServerResources get service, endpoints, and cloud nodes of tunnel server
func getTunnelServerResources(clientset kubernetes.Interface) (*v1.Service, *v1.Endpoints, *v1.NodeList, error) {
var (
Expand Down

0 comments on commit 7a4bd43

Please sign in to comment.