Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
danibachar committed Oct 6, 2021
1 parent 25c3102 commit 5ea9465
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 2 deletions.
98 changes: 98 additions & 0 deletions pkg/namespace/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
SPDX-License-Identifier: Apache-2.0
Copyright Contributors to the Submariner project.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

type Controller struct {
// Indirection hook for unit tests to supply fake client sets
NewClientset func(kubeConfig *rest.Config) (kubernetes.Interface, error)
namespaceInformer cache.Controller
namespaceStore cache.Store
stopCh chan struct{}
}

func NewController() *Controller {
return &Controller{
NewClientset: func(c *rest.Config) (kubernetes.Interface, error) {
return kubernetes.NewForConfig(c)
},
stopCh: make(chan struct{}),
}
}

func (c *Controller) Start(kubeConfig *rest.Config) error {
klog.Infof("Starting Namespace Controller")

clientSet, err := c.NewClientset(kubeConfig)
if err != nil {
return fmt.Errorf("error creating client set: %v", err)
}

c.namespaceStore, c.namespaceInformer = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientSet.CoreV1().Namespaces().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientSet.CoreV1().Namespaces().Watch(context.TODO(), options)
},
},
&v1.Namespace{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
)

go c.namespaceInformer.Run(c.stopCh)

return nil
}

func (c *Controller) Stop() {
close(c.stopCh)

klog.Infof("Namespace Controller stopped")
}

func (c *Controller) LocalNamespacesContains(namespace string) bool {
namespaces := c.namespaceStore.List()

for _, ns := range namespaces {
if ns.(*v1.Namespace).ObjectMeta.Name == ns {
return true
}
}

return false
}
4 changes: 4 additions & 0 deletions plugin/lighthouse/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (lh *Lighthouse) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
return lh.nextOrFailure(state.Name(), ctx, w, r, dns.RcodeNameError, "Only services supported")
}

if contains := lh.localNamespaces.LocalNamespacesContains(pReq.namespace); !contains {
return lh.nextOrFailure(state.Name(), ctx, w, r, dns.RcodeNameError, "requested namespace is not present in the local cluster")
}

return lh.getDNSRecord(zone, state, ctx, w, r, pReq)
}

Expand Down
30 changes: 29 additions & 1 deletion plugin/lighthouse/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ func (m *MockLocalServices) GetIP(name, namespace string) (*serviceimport.DNSRec
func getKey(name, namespace string) string {
return namespace + "/" + name
}

type MockLocalNamespaces struct {
LocalNamespacesMap map[string]string
}

func NewMockLocalNamespaces() *MockLocalNamespaces {
nsMap := make(map[string]string)
nsMap[namespace1] = namespace1
nsMap[namespace2] = namespace2
return &MockLocalNamespaces{LocalNamespacesMap: nsMap}
}

func (m *MockLocalNamespaces) LocalNamespacesContains(namespace string) bool {
_, found := m.LocalNamespacesMap[namespace]
return found
}

func (w *FailingResponseWriter) WriteMsg(m *dns.Msg) error {
return errors.New(w.errorMsg)
}
Expand All @@ -133,13 +150,15 @@ func testWithoutFallback() {
mockEs := NewMockEndpointStatus()
mockEs.endpointStatusMap[clusterID] = true
mockLs := NewMockLocalServices()
mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: setupServiceImportMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}

Expand Down Expand Up @@ -336,7 +355,7 @@ func testWithFallback() {
mockEs := NewMockEndpointStatus()
mockEs.endpointStatusMap[clusterID] = true
mockLs := NewMockLocalServices()

mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
Fall: fall.F{Zones: []string{"clusterset.local."}},
Expand All @@ -346,6 +365,7 @@ func testWithFallback() {
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}

Expand Down Expand Up @@ -465,13 +485,15 @@ func testClusterStatus() {
mockEs.endpointStatusMap[clusterID] = true
mockEs.endpointStatusMap[clusterID2] = true
mockLs := NewMockLocalServices()
mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: setupServiceImportMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}
lh.serviceImports.Put(newServiceImport(namespace1, service1, clusterID2, serviceIP2, portName2,
Expand Down Expand Up @@ -627,13 +649,15 @@ func testHeadlessService() {
mockEs.endpointStatusMap[clusterID] = true
mockEs.endpointStatusMap[clusterID2] = true
mockLs := NewMockLocalServices()
mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: serviceimport.NewMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}

Expand Down Expand Up @@ -836,13 +860,15 @@ func testLocalService() {
},
ClusterName: clusterID,
}
mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: setupServiceImportMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}
lh.serviceImports.Put(newServiceImport(namespace1, service1, clusterID2, serviceIP2, portName2, portNumber2,
Expand Down Expand Up @@ -972,13 +998,15 @@ func testSRVMultiplePorts() {
},
ClusterName: clusterID,
}
mockNs := NewMockLocalNamespaces()
lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: setupServiceImportMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
localNamespaces: mockNs,
ttl: defaultTTL,
}

Expand Down
5 changes: 5 additions & 0 deletions plugin/lighthouse/lighthouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Lighthouse struct {
clusterStatus ClusterStatus
endpointsStatus EndpointsStatus
localServices LocalServices
localNamespaces LocalNamespaces
}

type ClusterStatus interface {
Expand All @@ -63,6 +64,10 @@ type LocalServices interface {
GetIP(name, namespace string) (*serviceimport.DNSRecord, bool)
}

type LocalNamespaces interface {
LocalNamespacesContains(namespacce string) bool
}

type EndpointsStatus interface {
IsHealthy(name, namespace, clusterID string) bool
}
Expand Down
9 changes: 8 additions & 1 deletion plugin/lighthouse/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/submariner-io/lighthouse/pkg/endpointslice"
"github.com/submariner-io/lighthouse/pkg/gateway"
"github.com/submariner-io/lighthouse/pkg/namesapce"
"github.com/submariner-io/lighthouse/pkg/service"
"github.com/submariner-io/lighthouse/pkg/serviceimport"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -99,17 +100,23 @@ func lighthouseParse(c *caddy.Controller) (*Lighthouse, error) {
if err != nil {
return nil, fmt.Errorf("error starting the Service controller: %v", err)
}
nsController := namesapce.NewController()
err = nsController.Start(cfg)
if err != nil {
return nil, fmt.Errorf("error starting the namespace controller: %v", err)
}

c.OnShutdown(func() error {
siController.Stop()
epController.Stop()
gwController.Stop()
svcController.Stop()
nsController.Stop()
return nil
})

lh := &Lighthouse{ttl: defaultTTL, serviceImports: siMap, clusterStatus: gwController, endpointSlices: epMap,
endpointsStatus: epController, localServices: svcController}
endpointsStatus: epController, localServices: svcController, localNamespaces: nsController}

// Changed `for` to `if` to satisfy golint:
// SA4004: the surrounding loop is unconditionally terminated (staticcheck)
Expand Down

0 comments on commit 5ea9465

Please sign in to comment.