diff --git a/balancer/weightedtarget/weightedtarget.go b/balancer/weightedtarget/weightedtarget.go index c004e112c40a..a617f6a63a22 100644 --- a/balancer/weightedtarget/weightedtarget.go +++ b/balancer/weightedtarget/weightedtarget.go @@ -108,6 +108,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } addressesSplit := hierarchy.Group(s.ResolverState.Addresses) + endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) b.stateAggregator.PauseStateUpdates() defer b.stateAggregator.ResumeStateUpdates() @@ -155,6 +156,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: addressesSplit[name], + Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes.WithValue(localityKey, name), }, diff --git a/internal/hierarchy/hierarchy.go b/internal/hierarchy/hierarchy.go index c3baac3643ce..362c05fa2aa6 100644 --- a/internal/hierarchy/hierarchy.go +++ b/internal/hierarchy/hierarchy.go @@ -48,6 +48,18 @@ func (p pathValue) Equal(o any) bool { return true } +// FromEndpoint returns the hierarchical path of endpoint. +func FromEndpoint(endpoint resolver.Endpoint) []string { + path, _ := endpoint.Attributes.Value(pathKey).(pathValue) + return path +} + +// SetInEndpoint overrides the hierarchical path in endpoint with path. +func SetInEndpoint(endpoint resolver.Endpoint, path []string) resolver.Endpoint { + endpoint.Attributes = endpoint.Attributes.WithValue(pathKey, pathValue(path)) + return endpoint +} + // Get returns the hierarchical path of addr. func Get(addr resolver.Address) []string { attrs := addr.BalancerAttributes @@ -110,3 +122,50 @@ func Group(addrs []resolver.Address) map[string][]resolver.Address { } return ret } + +// GroupEndpoints splits a slice of endpoints into groups based on +// the first hierarchy path. The first hierarchy path will be removed from the +// result. +// +// Input: +// [ +// +// {endpoint0, path: [p0, wt0]} +// {endpoint1, path: [p0, wt1]} +// {endpoint2, path: [p1, wt2]} +// {endpoint3, path: [p1, wt3]} +// +// ] +// +// Endpoints will be split into p0/p1, and the p0/p1 will be removed from the +// path. +// +// Output: +// +// { +// p0: [ +// {endpoint0, path: [wt0]}, +// {endpoint1, path: [wt1]}, +// ], +// p1: [ +// {endpoint2, path: [wt2]}, +// {endpoint3, path: [wt3]}, +// ], +// } +// +// If hierarchical path is not set, or has no path in it, the endpoint is +// dropped. +func GroupEndpoints(endpoints []resolver.Endpoint) map[string][]resolver.Endpoint { + ret := make(map[string][]resolver.Endpoint) + for _, endpoint := range endpoints { + oldPath := FromEndpoint(endpoint) + if len(oldPath) == 0 { + continue + } + curPath := oldPath[0] + newPath := oldPath[1:] + newEndpoint := SetInEndpoint(endpoint, newPath) + ret[curPath] = append(ret[curPath], newEndpoint) + } + return ret +} diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index ef5b34ea4451..24ad2399ddd4 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -87,6 +87,7 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er // TODO: Get rid of handling hierarchy in addresses. This LB policy never // gets addresses from the resolver. addressesSplit := hierarchy.Group(s.ResolverState.Addresses) + endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) // Remove sub-balancers that are not in the new list from the aggregator and // balancergroup. @@ -139,6 +140,7 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: addressesSplit[childName], + Endpoints: endpointsSplit[childName], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }, diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 3b996989689e..ae2c5fe957a2 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -252,7 +252,6 @@ func (b *clusterResolverBalancer) updateChildConfig() { for i, a := range addrs { endpoints[i].Attributes = a.BalancerAttributes endpoints[i].Addresses = []resolver.Address{a} - endpoints[i].Addresses[0].BalancerAttributes = nil } if err := b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index c17c62f23a59..ba3fe52e5c0f 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -123,6 +123,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } addressesSplit := hierarchy.Group(s.ResolverState.Addresses) + endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints) b.mu.Lock() // Create and remove children, since we know all children from the config @@ -142,6 +143,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err cb := newChildBalancer(name, b, bb.Name(), b.cc) cb.updateConfig(newSubConfig, resolver.State{ Addresses: addressesSplit[name], + Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }) @@ -163,6 +165,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // be built, if it's a low priority). currentChild.updateConfig(newSubConfig, resolver.State{ Addresses: addressesSplit[name], + Endpoints: endpointsSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, })