diff --git a/pkg/controllers/job/plugins/svc/svc.go b/pkg/controllers/job/plugins/svc/svc.go index c80070deee..b800365267 100644 --- a/pkg/controllers/job/plugins/svc/svc.go +++ b/pkg/controllers/job/plugins/svc/svc.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -97,6 +98,11 @@ func (sp *servicePlugin) OnJobAdd(job *batch.Job) error { return err } + // TODO: maybe add a flag + if err := sp.createNetworkPolicyIfNotExist(job); err != nil { + return err + } + job.Status.ControlledResources["plugin-"+sp.Name()] = sp.Name() return nil @@ -188,6 +194,56 @@ func (sp *servicePlugin) createServiceIfNotExist(job *batch.Job) error { return nil } +// Limit pods can be accessible only by pods belong to the job. +func (sp *servicePlugin) createNetworkPolicyIfNotExist(job *batch.Job) error { + // If network policy does not exist, create one for Job. + if _, err := sp.Clientset.KubeClients.NetworkingV1().NetworkPolicies(job.Namespace).Get(job.Name, metav1.GetOptions{}); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get NetworkPolicy for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + + networkpolicy := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: job.Name, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + batch.JobNameKey: job.Name, + batch.JobNamespaceKey: job.Namespace, + }, + }, + Ingress: []networkingv1.NetworkPolicyIngressRule{{ + From: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + batch.JobNameKey: job.Name, + batch.JobNamespaceKey: job.Namespace, + }, + }, + }}, + }}, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress}, + }, + } + + if _, e := sp.Clientset.KubeClients.NetworkingV1().NetworkPolicies(job.Namespace).Create(networkpolicy); e != nil { + glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", job.Namespace, job.Name, e) + return e + } + job.Status.ControlledResources["plugin-"+sp.Name()] = sp.Name() + + } + + return nil +} + func (sp *servicePlugin) cmName(job *batch.Job) string { return fmt.Sprintf("%s-%s", job.Name, sp.Name()) }