Skip to content

Commit

Permalink
Merge pull request #2045 from songrx1997/label-propagation-config
Browse files Browse the repository at this point in the history
Refactor code related to PodLabelPropagationConfig
  • Loading branch information
k8s-ci-robot authored Mar 29, 2023
2 parents 4d35858 + a7a437b commit 58c50d9
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 41 deletions.
3 changes: 2 additions & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
ingctx "k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
"k8s.io/ingress-gce/pkg/neg"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"

"k8s.io/ingress-gce/cmd/glbc/app"
Expand Down Expand Up @@ -332,7 +333,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
asmServiceNEGSkipNamespaces = cmconfig.ASMServiceNEGSkipNamespaces
}

lpConfig := negtypes.PodLabelPropagationConfig{}
lpConfig := labels.PodLabelPropagationConfig{}
if flags.F.EnableNEGLabelPropagation {
lpConfigEnvVar := os.Getenv("LABEL_PROPAGATION_CONFIG")
if err := json.Unmarshal([]byte(lpConfigEnvVar), &lpConfig); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/ingress-gce/pkg/neg/metrics"
syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewController(
enableNonGcpMode bool,
enableAsm bool,
asmServiceNEGSkipNamespaces []string,
lpConfig negtypes.PodLabelPropagationConfig,
lpConfig labels.PodLabelPropagationConfig,
logger klog.Logger,
) *Controller {
logger = logger.WithName("NEGController")
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -140,7 +141,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
false, //enableNonGcpMode
enableASM, //enableAsm
[]string{},
negtypes.PodLabelPropagationConfig{},
labels.PodLabelPropagationConfig{},
klog.TODO(),
)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
podlabels "k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -107,7 +108,7 @@ type syncerManager struct {
vmIpPortZoneMap map[string]struct{}

// lpConfig configures the pod label to be propagated to NEG endpoints.
lpConfig negtypes.PodLabelPropagationConfig
lpConfig podlabels.PodLabelPropagationConfig
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
Expand All @@ -124,7 +125,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
syncerMetrics *metrics.SyncerMetrics,
enableNonGcpMode bool,
numGCWorkers int,
lpConfig negtypes.PodLabelPropagationConfig,
lpConfig podlabels.PodLabelPropagationConfig,
logger klog.Logger) *syncerManager {

var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
Expand Down Expand Up @@ -221,7 +222,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg

// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter,
syncerKey, portInfo.EpCalculatorMode, manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName), manager.lpConfig)
syncerKey, portInfo.EpCalculatorMode, manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName))
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
manager.recorder,
Expand All @@ -239,6 +240,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.syncerMetrics,
!manager.namer.IsNEG(portInfo.NegName),
manager.logger,
manager.lpConfig,
)
manager.syncerMap[syncerKey] = syncer
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
Expand Down Expand Up @@ -94,7 +95,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
metrics.FakeSyncerMetrics(),
false, //enableNonGcpMode
testContext.NumGCWorkers,
negtypes.PodLabelPropagationConfig{},
labels.PodLabelPropagationConfig{},
klog.TODO(),
)
return manager, testContext.Cloud
Expand Down
6 changes: 2 additions & 4 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,15 @@ type L7EndpointsCalculator struct {
servicePortName string
podLister cache.Indexer
networkEndpointType types.NetworkEndpointType
lpConfig types.PodLabelPropagationConfig
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, lpConfig types.PodLabelPropagationConfig) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
networkEndpointType: endpointType,
lpConfig: lpConfig,
logger: logger.WithName("L7EndpointsCalculator"),
}
}
Expand All @@ -199,7 +197,7 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType, l.lpConfig)
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType)
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestValidateEndpoints(t *testing.T) {
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer())
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), negtypes.PodLabelPropagationConfig{})
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO())
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())

Expand Down
29 changes: 29 additions & 0 deletions pkg/neg/syncers/labels/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package labels

// PodLabelPropagationConfig contains a list of configurations for labels to be propagated to GCE network endpoints.
type PodLabelPropagationConfig struct {
Labels []Label
}

// Label contains configuration for a label to be propagated to GCE network endpoints.
type Label struct {
Key string
ShortKey string
MaxLabelSizeBytes int
}
52 changes: 29 additions & 23 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils/patch"
Expand Down Expand Up @@ -108,6 +109,9 @@ type transactionSyncer struct {

// enableDegradedMode indicates whether we do endpoint calculation using degraded mode procedures
enableDegradedMode bool

// podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints
podLabelPropagationConfig labels.PodLabelPropagationConfig
}

func NewTransactionSyncer(
Expand All @@ -126,32 +130,34 @@ func NewTransactionSyncer(
svcNegClient svcnegclient.Interface,
syncerMetrics *metrics.SyncerMetrics,
customName bool,
log klog.Logger) negtypes.NegSyncer {
log klog.Logger,
lpConfig labels.PodLabelPropagationConfig) negtypes.NegSyncer {

logger := log.WithName("Syncer").WithValues("service", klog.KRef(negSyncerKey.Namespace, negSyncerKey.Name), "negName", negSyncerKey.NegName)

// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
errorState: "",
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
errorState: "",
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
podLabelPropagationConfig: lpConfig,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
Expand All @@ -161,7 +167,7 @@ func NewTransactionSyncer(
return syncer
}

func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, lpConfig negtypes.PodLabelPropagationConfig) negtypes.NetworkEndpointsCalculator {
func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger) negtypes.NetworkEndpointsCalculator {
serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/")
if syncerKey.NegType == negtypes.VmIpEndpointType {
nodeLister := listers.NewNodeLister(nodeLister)
Expand All @@ -173,7 +179,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
}
}
return NewL7EndpointsCalculator(zoneGetter, podLister, syncerKey.PortTuple.Name,
syncerKey.NegType, logger, lpConfig)
syncerKey.NegType, logger)
}

func (s *transactionSyncer) sync() error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/endpointslices"
Expand Down Expand Up @@ -1390,7 +1391,7 @@ func TestUnknownNodes(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), negtypes.PodLabelPropagationConfig{})
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO())
return negsyncer, ts
}

Expand Down Expand Up @@ -1431,13 +1432,13 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
testContext.NodeInformer.GetIndexer(),
testContext.SvcNegInformer.GetIndexer(),
reflector,
GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter,
svcPort, mode, klog.TODO(), negtypes.PodLabelPropagationConfig{}),
GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO()),
string(kubeSystemUID),
testContext.SvcNegClient,
metrics.FakeSyncerMetrics(),
customName,
klog.TODO(),
labels.PodLabelPropagationConfig{},
)
transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer)
indexers := map[string]cache.IndexFunc{
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
}

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, lpConfig negtypes.PodLabelPropagationConfig) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
Expand Down
3 changes: 1 addition & 2 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,8 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
},
}

// TODO(songrx1997): Add endpoint annotations for the test after calculation code is in
for _, tc := range testCases {
retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType, negtypes.PodLabelPropagationConfig{})
retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType)
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
}
Expand Down

0 comments on commit 58c50d9

Please sign in to comment.